You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
"""RAT (Retrieval Augmented Thinking) package.This package provides tools for enhanced AI responses through structured reasoning and research."""from .research.mainimportmain__all__= ['main']
__version__="0.1.0"
rat/research/__init__.py
"""RAT Research package initialization.This module provides research capabilities for the RAT system."""from .perplexity_clientimportPerplexityClientfrom .firecrawl_clientimportFirecrawlClientfrom .orchestratorimportResearchOrchestratorfrom .output_managerimportOutputManager__all__= [
'PerplexityClient',
'FirecrawlClient',
'ResearchOrchestrator',
'OutputManager'
]
rat/research/__main__.py
"""Main entry point for executing the research package as a module."""from .mainimportmainif__name__=='__main__':
main()
rat/research/agents/__init__.py
"""Multi-agent system for research orchestration.Provides specialized agents for search, exploration, and reasoning."""from .baseimportBaseAgent, ResearchDecision, DecisionTypefrom .contextimportResearchContext, ContextBranch, ContentTypefrom .searchimportSearchAgentfrom .exploreimportExploreAgentfrom .reasonimportReasoningAgent__all__= [
'BaseAgent',
'ResearchContext',
'ContextBranch',
'ResearchDecision',
'DecisionType',
'ContentType',
'SearchAgent',
'ExploreAgent',
'ReasoningAgent'
]
rat/research/agents/base.py
"""Base agent interface and core decision-making structures.Defines the contract that all specialized research agents must implement.This async version uses asyncio locks and awaits where appropriate."""fromabcimportABC, abstractmethodfromdataclassesimportdataclassfromtypingimportList, Dict, Any, Optional, Set, TYPE_CHECKINGifTYPE_CHECKING:
from .contextimportResearchContextfromenumimportEnumfromrichimportprintasrprintimportasyncioimporttimeimportlogginglogger=logging.getLogger(__name__)
classTokenBucket:
""" Implements the token bucket algorithm for rate limiting. Provides a more flexible and efficient way to control API request rates. """def__init__(self, rate_limit: float, burst_limit: Optional[float] =None):
""" Initialize the token bucket. Args: rate_limit: Number of tokens per minute burst_limit: Maximum number of tokens that can be accumulated (defaults to rate_limit) """self.rate_limit=float(rate_limit)
self.burst_limit=float(burst_limitifburst_limitisnotNoneelserate_limit)
self.tokens=self.burst_limitself.last_update=asyncio.get_event_loop().time()
self._lock=asyncio.Lock()
asyncdefacquire(self, tokens: float=1.0) ->float:
""" Acquire tokens from the bucket. Returns the wait time if tokens aren't available. Args: tokens: Number of tokens to acquire (default: 1.0) Returns: Float: Time to wait in seconds (0 if tokens are available immediately) """asyncwithself._lock:
now=asyncio.get_event_loop().time()
time_passed=now-self.last_updateself.tokens=min(
self.burst_limit,
self.tokens+time_passed* (self.rate_limit/60.0)
)
self.last_update=nowifself.tokens>=tokens:
self.tokens-=tokensreturn0.0else:
wait_time= (tokens-self.tokens) *60.0/self.rate_limitself.tokens=0returnwait_timeasyncdeftry_acquire(self, tokens: float=1.0) ->bool:
""" Try to acquire tokens without waiting. Returns: bool: True if tokens were acquired, False otherwise """wait_time=awaitself.acquire(tokens)
returnwait_time==0.0classDecisionType(Enum):
"""Types of decisions an agent can make during research."""SEARCH="search"# New search query neededEXPLORE="explore"# URL exploration neededREASON="reason"# Deep analysis needed (using Gemini now)EXECUTE="execute"# Execution of a decisionTERMINATE="terminate"# Research complete or no further steps@dataclassclassResearchDecision:
""" Represents a decision made by an agent during the research process. Attributes: decision_type: The type of action recommended priority: Priority level (0-1) for this decision context: Additional context or parameters for the decision rationale: Explanation for why this decision was made """decision_type: DecisionTypepriority: floatcontext: Dict[str, Any]
rationale: strdef__post_init__(self):
ifnot0<=self.priority<=1:
raiseValueError("Priority must be between 0 and 1")
class BaseAgent(ABC):
    """Abstract parent of every research agent.

    Subclasses implement :meth:`analyze` (propose decisions from a research
    context) and :meth:`execute_decision` (carry one out). This class
    provides decision bookkeeping, execution metrics, and token-bucket rate
    limiting configured through ``config`` (``rate_limit``,
    ``burst_limit``, ``max_workers``).
    """

    def __init__(self, name: str, config: Optional[Dict[str, Any]] = None):
        self.name = name
        self.config = config or {}
        self.decisions_made = []
        self.metrics = {
            "decisions_made": 0,
            "successful_executions": 0,
            "failed_executions": 0,
            "total_execution_time": 0.0,
            "parallel_executions": 0,
            "max_concurrent_tasks": 0,
            "rate_limit_delays": 0,
            "retry_attempts": 0,
            "tokens_consumed": 0.0,
        }

        settings = self.config
        self.max_workers = settings.get("max_workers", 5)
        self.rate_limit = settings.get("rate_limit", 100)  # tokens per minute
        self.burst_limit = settings.get("burst_limit", self.rate_limit * 1.5)

        self._active_tasks: Set[str] = set()
        self._tasks_lock = asyncio.Lock()
        self._token_bucket = TokenBucket(self.rate_limit, self.burst_limit)
        self.logger = logging.getLogger(f"{__name__}.{name}")

    async def _enforce_rate_limit(self, tokens: float = 1.0) -> None:
        """Wait until ``tokens`` may be spent, then record their use.

        Does nothing when ``rate_limit`` is non-positive. Each enforced wait
        increments the ``rate_limit_delays`` metric.

        Args:
            tokens: Number of tokens to consume (default: 1.0).
        """
        if self.rate_limit > 0:
            delay = await self._token_bucket.acquire(tokens)
            if delay > 0:
                self.metrics["rate_limit_delays"] += 1
                self.logger.debug(f"Rate limit enforced, waiting {delay:.2f}s")
                await asyncio.sleep(delay)
            self.metrics["tokens_consumed"] += tokens

    @abstractmethod
    async def analyze(self, context: 'ResearchContext') -> List[ResearchDecision]:
        """Inspect ``context`` and return the decisions this agent proposes."""
        ...

    def log_decision(self, decision: ResearchDecision, success: bool = True, execution_time: float = 0.0):
        """Append ``decision`` to the history and update the outcome metrics."""
        self.decisions_made.append(decision)
        outcome_key = "successful_executions" if success else "failed_executions"
        stats = self.metrics
        stats["decisions_made"] += 1
        stats[outcome_key] += 1
        stats["total_execution_time"] += execution_time

    @abstractmethod
    async def execute_decision(self, decision: ResearchDecision) -> Dict[str, Any]:
        """Carry out ``decision`` and return its results."""
        ...

    def get_decision_history(self) -> List[ResearchDecision]:
        """Return a shallow copy of the decisions logged so far."""
        return list(self.decisions_made)

    def get_metrics(self) -> Dict[str, Any]:
        """Return a shallow copy of the metrics dictionary."""
        return dict(self.metrics)

    def __repr__(self) -> str:
        class_name = type(self).__name__
        return f"{class_name}(name='{self.name}')"
rat/research/agents/context.py
"""Research context management system.Handles token counting, content branching, and state management for the research process."""fromdataclassesimportdataclass, fieldfromtypingimportList, Dict, Any, Optional, SetfromenumimportEnumimportjsonimporttimefromuuidimportuuid4classContentType(Enum):
"""Types of content that can be stored in the research context."""QUERY="query"SEARCH_RESULT="search_result"URL_CONTENT="url_content"ANALYSIS="analysis"EXPLORED_CONTENT="explored_content"OTHER="other"@dataclassclassContentItem:
""" A piece of content in the research context. Attributes: content_type: Type of this content content: The actual content data metadata: Additional information about this content token_count: Number of tokens in this content priority: Priority of this content (0-1) timestamp: When this content was added id: Unique identifier for this content item """content_type: ContentTypecontent: Anymetadata: Dict[str, Any]
token_count: intpriority: float=0.5timestamp: float=field(default_factory=time.time)
id: str=field(default_factory=lambda: str(uuid4()))
@dataclassclassContextBranch:
""" A branch of the research context for parallel processing. Attributes: branch_id: Unique identifier for this branch parent_id: ID of the parent branch (if any) content_items: Content items in this branch token_count: Total tokens in this branch created_at: When this branch was created metadata: Additional branch-specific metadata """branch_id: strparent_id: Optional[str]
content_items: List[ContentItem] =field(default_factory=list)
token_count: int=0created_at: float=field(default_factory=time.time)
metadata: Dict[str, Any] =field(default_factory=dict)
class ResearchContext:
    """Manage the state and evolution of a research session.

    Content lives in branches keyed by branch ID; a ``"main"`` branch is
    created up front. :meth:`create_branch` forks a branch (copying its item
    list) and :meth:`merge_branches` folds branches back into a target,
    de-duplicating items by ``ContentItem.id``. Each branch keeps a running
    token total that :meth:`add_content` caps at ``MAX_TOKENS_PER_BRANCH``.
    """

    # Per-branch cap on estimated tokens (1,048,576 = 2**20), sized to match a
    # large Gemini input allowance.
    MAX_TOKENS_PER_BRANCH = 1048576

    def __init__(self, initial_question: str):
        """Create a context for ``initial_question`` with an empty "main" branch.

        Args:
            initial_question: The research question to investigate.
        """
        self.initial_question = initial_question
        self.main_branch = ContextBranch(branch_id="main", parent_id=None)
        self.branches: Dict[str, ContextBranch] = {"main": self.main_branch}
        self.merged_branches: Set[str] = set()
        # Incremented after each successful merge_branches call.
        self.version = 0

    def create_branch(self, parent_branch_id: str = "main") -> ContextBranch:
        """Fork a new branch from an existing one.

        The new branch gets a random UUID4 id, a shallow copy of the parent's
        item list (the ContentItem objects themselves are shared) and the
        parent's token count.

        Args:
            parent_branch_id: ID of the branch to fork from.

        Returns:
            The newly created branch.

        Raises:
            ValueError: if the parent branch does not exist.
        """
        if parent_branch_id not in self.branches:
            raise ValueError(f"Parent branch {parent_branch_id} not found")

        parent = self.branches[parent_branch_id]
        new_branch = ContextBranch(
            branch_id=str(uuid4()),
            parent_id=parent_branch_id,
            content_items=parent.content_items.copy(),
            token_count=parent.token_count,
        )
        self.branches[new_branch.branch_id] = new_branch
        return new_branch

    def add_content(self,
                    branch_id: str,
                    content_item: Optional[ContentItem] = None,
                    content_type: Optional[ContentType] = None,
                    content: Optional[Any] = None,
                    metadata: Optional[Dict[str, Any]] = None,
                    token_count: Optional[int] = None,
                    priority: float = 0.5) -> ContentItem:
        """Add content to a branch and return the stored item.

        Call either with ``content_item``, or with ``content_type``,
        ``content`` and ``metadata`` (plus optional ``token_count`` and
        ``priority``). When ``token_count`` is omitted, it is estimated from
        ``str(content)`` at ~4 characters per token.

        Raises:
            ValueError: if the branch does not exist, if neither calling
                form is satisfied, or if the branch would exceed
                ``MAX_TOKENS_PER_BRANCH``.
        """
        if branch_id not in self.branches:
            raise ValueError(f"Branch {branch_id} not found")
        branch = self.branches[branch_id]

        if content_item is not None:
            item = content_item
            token_usage = item.token_count
        else:
            if content_type is None or content is None or metadata is None:
                raise ValueError("Must provide either content_item or all of: content_type, content, metadata")
            if token_count is None:
                token_usage = self._estimate_tokens(str(content))
            else:
                token_usage = token_count
            item = ContentItem(
                content_type=content_type,
                content=content,
                metadata=metadata,
                token_count=token_usage,
                priority=priority,
            )

        if branch.token_count + token_usage > self.MAX_TOKENS_PER_BRANCH:
            raise ValueError(
                f"Adding this content would exceed the token limit "
                f"({self.MAX_TOKENS_PER_BRANCH}) for branch {branch_id}"
            )

        branch.content_items.append(item)
        branch.token_count += token_usage
        return item

    def merge_branches(self, branch_ids: List[str], target_branch_id: str = "main"):
        """Merge the items of several branches into a target branch.

        Items are de-duplicated by ``id``; the target's existing items keep
        their position and new items are appended in branch order. The
        target's token count is recomputed from its items. This total is not
        checked against ``MAX_TOKENS_PER_BRANCH``.

        Args:
            branch_ids: IDs of the branches to merge.
            target_branch_id: Branch to merge into.

        Raises:
            ValueError: if the target or any source branch does not exist.
                In that case nothing is modified.
        """
        if target_branch_id not in self.branches:
            raise ValueError(f"Target branch {target_branch_id} not found")
        # Validate every source up front so a bad ID cannot leave the context
        # half-merged (merged_branches updated for only some branches).
        for branch_id in branch_ids:
            if branch_id not in self.branches:
                raise ValueError(f"Branch {branch_id} not found")

        target = self.branches[target_branch_id]
        merged_content: Dict[str, ContentItem] = {
            item.id: item for item in target.content_items
        }
        for branch_id in branch_ids:
            for item in self.branches[branch_id].content_items:
                merged_content.setdefault(item.id, item)
            self.merged_branches.add(branch_id)

        target.content_items = list(merged_content.values())
        target.token_count = sum(item.token_count for item in target.content_items)
        self.version += 1

    def get_content(self,
                    branch_id: str,
                    content_type: Optional[ContentType] = None) -> List[ContentItem]:
        """Return a branch's items, optionally filtered by ``content_type``.

        Without a filter, this returns the branch's own list object, not a
        copy.

        Raises:
            ValueError: if the branch does not exist.
        """
        if branch_id not in self.branches:
            raise ValueError(f"Branch {branch_id} not found")
        items = self.branches[branch_id].content_items
        if content_type is not None:
            items = [item for item in items if item.content_type == content_type]
        return items

    def _estimate_tokens(self, content: str) -> int:
        """Estimate tokens as ``len(content) // 4`` (~4 characters per token)."""
        return len(content) // 4

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the context into JSON-compatible primitives.

        ``merged_branches`` is emitted sorted so output is deterministic.
        """
        return {
            "initial_question": self.initial_question,
            "version": self.version,
            "branches": {
                bid: {
                    "branch_id": b.branch_id,
                    "parent_id": b.parent_id,
                    "token_count": b.token_count,
                    "created_at": b.created_at,
                    "metadata": b.metadata,
                    "content_items": [
                        {
                            "id": item.id,
                            "content_type": item.content_type.value,
                            "content": item.content,
                            "metadata": item.metadata,
                            "token_count": item.token_count,
                            "priority": item.priority,
                            "timestamp": item.timestamp,
                        }
                        for item in b.content_items
                    ],
                }
                for bid, b in self.branches.items()
            },
            "merged_branches": sorted(self.merged_branches),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ResearchContext':
        """Rebuild a context from the output of :meth:`to_dict`.

        A missing ``priority`` on an item defaults to 0.5. If the data has no
        ``"main"`` branch, an empty one is created.
        """
        context = cls(data["initial_question"])
        context.version = data["version"]
        context.merged_branches = set(data["merged_branches"])

        context.branches = {}
        for bid, bdata in data["branches"].items():
            branch = ContextBranch(
                branch_id=bdata["branch_id"],
                parent_id=bdata["parent_id"],
                token_count=bdata["token_count"],
                created_at=bdata["created_at"],
                metadata=bdata["metadata"],
            )
            for idata in bdata["content_items"]:
                branch.content_items.append(ContentItem(
                    content_type=ContentType(idata["content_type"]),
                    content=idata["content"],
                    metadata=idata["metadata"],
                    token_count=idata["token_count"],
                    priority=idata.get("priority", 0.5),
                    timestamp=idata["timestamp"],
                    id=idata["id"],
                ))
            context.branches[bid] = branch

        # Re-point main_branch at the restored "main" branch. Otherwise it
        # would keep referencing the empty branch built by __init__, which is
        # no longer part of context.branches.
        context.main_branch = context.branches.setdefault(
            "main", ContextBranch(branch_id="main", parent_id=None)
        )
        return context

    def save_to_file(self, file_path: str):
        """Write :meth:`to_dict` output to ``file_path`` as indented UTF-8 JSON.

        Args:
            file_path: Path where the context JSON should be saved.
        """
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(self.to_dict(), f, indent=2)

    @classmethod
    def load_from_file(cls, file_path: str) -> 'ResearchContext':
        """Load a context previously written by :meth:`save_to_file`.

        Args:
            file_path: Path to the JSON file.

        Returns:
            The reconstructed ResearchContext.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return cls.from_dict(data)
rat/research/agents/explore.py
"""Explore agent for extracting content from URLs.Now acts as a simple executor that processes EXPLORE decisions."""fromtypingimportList, Dict, Any, Optionalimportloggingfromrichimportprintasrprintimportasynciofrom .baseimportBaseAgent, ResearchDecision, DecisionTypefrom .contextimportResearchContext, ContentTypefrom ..firecrawl_clientimportFirecrawlClientlogger=logging.getLogger(__name__)
class ExploreAgent(BaseAgent):
    """Agent that fetches page content for EXPLORE decisions.

    It proposes no decisions of its own, since URL selection happens in
    ReasoningAgent. It only executes EXPLORE decisions by scraping
    ``decision.context["url"]`` through the Firecrawl client.
    """

    def __init__(self, firecrawl_client: FirecrawlClient, config=None):
        super().__init__("explore", config)
        self.firecrawl = firecrawl_client
        # Replaces BaseAgent's per-agent logger with this module's logger.
        self.logger = logging.getLogger(__name__)

    async def analyze(self, context: ResearchContext) -> List[ResearchDecision]:
        """Return no decisions: URL selection is handled by ReasoningAgent."""
        return []

    def can_handle(self, decision: ResearchDecision) -> bool:
        """Return True for EXPLORE decisions."""
        return decision.decision_type == DecisionType.EXPLORE

    async def execute_decision(self, decision: ResearchDecision) -> Dict[str, Any]:
        """Scrape the URL named in an EXPLORE decision.

        FirecrawlClient.extract_content reports failures by returning a dict
        with ``metadata["error"]`` instead of raising. That field is therefore
        checked explicitly, so failed scrapes are not passed on as successful,
        empty pages.

        Returns:
            ``{"url", "title", "text", "metadata"}`` on success, where
            metadata merges the scraper's metadata with the decision's
            ``relevance`` and ``rationale``; ``{}`` if no text was
            extracted; or ``{"url", "error"}`` if scraping failed.

        Raises:
            KeyError: if the decision context has no ``"url"``.
        """
        url = decision.context["url"]
        self.logger.info(f"Exploring URL: {url}")
        try:
            scrape_result = await self.firecrawl.extract_content(url)
            if not scrape_result:
                self.logger.warning(f"No content extracted from URL: {url}")
                return {}

            scrape_metadata = scrape_result.get("metadata") or {}
            scrape_error = scrape_metadata.get("error")
            if scrape_error:
                self.logger.error(f"Error exploring URL {url}: {scrape_error}")
                return {"url": url, "error": str(scrape_error)}

            if not scrape_result.get("text"):
                self.logger.warning(f"No content extracted from URL: {url}")
                return {}

            return {
                "url": url,
                "title": scrape_result.get("title", ""),
                "text": scrape_result.get("text", ""),
                "metadata": {
                    **scrape_metadata,
                    "relevance": decision.context.get("relevance", 0.0),
                    "rationale": decision.context.get("rationale", ""),
                },
            }
        except Exception as e:
            self.logger.error(f"Error exploring URL {url}: {str(e)}")
            return {"url": url, "error": str(e)}
rat/research/agents/reason.py
"""Reasoning agent for analyzing research content using the o3-mini model with high reasoning effort.Now acts as the "lead agent" that decides next steps.All methods are now asynchronous."""importasynciofromtypingimportList, Dict, Any, Optional, Setfromdataclassesimportdataclassimporttimefromrichimportprintasrprintimportloggingimportjsonimportrefromurllib.parseimporturlparseimportopenaifromopenaiimportOpenAIfrom .baseimportBaseAgent, ResearchDecision, DecisionTypefrom .contextimportResearchContext, ContentType, ContentItemlogger=logging.getLogger(__name__)
api_logger=logging.getLogger('api.o3mini')
@dataclass
class AnalysisTask:
    """Record describing a piece of content to analyze.

    Stored in ``ReasoningAgent.analysis_tasks``.
    """

    content: str
    priority: float
    rationale: str
    chunk_index: Optional[int] = None
    # default_factory runs once per instance. A plain "= time.time()" default
    # is evaluated once at import, which gave every task the same timestamp.
    timestamp: float = field(default_factory=time.time)
class ReasoningAgent(BaseAgent):
    """Lead research agent backed by the o3-mini reasoning model.

    ``analyze`` inspects the ``"main"`` branch of a ResearchContext and
    proposes SEARCH, EXPLORE, REASON and TERMINATE decisions.
    ``execute_decision`` carries out REASON decisions by sending content to
    the model and extracting list-style insights from its answer.

    Config keys read here: ``max_output_tokens``, ``o3_mini_timeout``,
    ``max_parallel_tasks``, ``min_priority``, ``min_url_relevance`` and
    ``flash_fix_rate_limit`` (plus the BaseAgent rate-limit keys).
    """

    # A bullet ("- x", "* x", "• x") or numbered ("1. x", "12) x") list item.
    # Group 1 is the item text without its marker.
    _LIST_ITEM_RE = re.compile(r"^(?:[-*•]|\d+[.)])\s+(.*)$")

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__("reason", config)
        self.model_name = "o3-mini"
        self.max_output_tokens = self.config.get("max_output_tokens", 50000)
        # Per-request timeout in seconds, applied to the OpenAI client.
        self.request_timeout = self.config.get("o3_mini_timeout", 180)
        self.reasoning_effort = "high"
        self.chunk_margin = 5000
        # Upper bound on concurrently analyzed chunks of one large content.
        self.max_parallel_tasks = self.config.get("max_parallel_tasks", 3)
        self.min_priority = self.config.get("min_priority", 0.3)
        self.min_url_relevance = self.config.get("min_url_relevance", 0.6)
        self.explored_urls: Set[str] = set()
        # IDs of content items whose REASON decision already completed, so
        # analyze() does not queue the same item again on every cycle.
        self._analyzed_item_ids: Set[str] = set()
        # Maximum analysis calls per minute enforced by _enforce_flash_fix_limit.
        self.flash_fix_rate_limit = self.config.get("flash_fix_rate_limit", 10)
        self._flash_fix_last_time = 0.0
        self._flash_fix_lock = asyncio.Lock()
        # OpenAI client, created on first request and reused afterwards.
        self._client: Optional[OpenAI] = None
        logger.info("ReasoningAgent initialized to use o3-mini model: %s", self.model_name)
        self.analysis_tasks: Dict[str, AnalysisTask] = {}

    def _get_client(self) -> OpenAI:
        """Return the cached OpenAI client, creating it on first use."""
        client = getattr(self, "_client", None)
        if client is None:
            client = OpenAI(timeout=self.request_timeout)
            self._client = client
        return client

    async def _enforce_flash_fix_limit(self):
        """Space analysis calls at least ``60 / flash_fix_rate_limit`` s apart.

        The lock is held while sleeping, so concurrent callers are serialized.
        A non-positive limit disables the check.
        """
        if self.flash_fix_rate_limit <= 0:
            return
        async with self._flash_fix_lock:
            loop = asyncio.get_running_loop()
            elapsed = loop.time() - self._flash_fix_last_time
            min_interval = 60.0 / self.flash_fix_rate_limit
            if elapsed < min_interval:
                await asyncio.sleep(min_interval - elapsed)
                self.metrics["rate_limit_delays"] += 1
            self._flash_fix_last_time = loop.time()

    async def _call_o3_mini(self, prompt: str, context: str = "",
                            json_mode: Optional[bool] = None) -> str:
        """Send ``prompt`` to the model and return its text reply.

        Args:
            prompt: The user message.
            context: If non-empty, sent first as an assistant-role message.
            json_mode: Request ``response_format={"type": "json_object"}``.
                ``None`` keeps the legacy heuristic ("json" appears in the
                prompt). Internal callers pass it explicitly, because scraped
                content that merely mentions JSON must not switch a plain-text
                request into JSON mode.

        Returns:
            The reply text, or ``""`` if the API returned no content.

        Raises:
            Exception: any API error, after logging it.
        """
        messages = []
        if context:
            messages.append({"role": "assistant", "content": context})
        messages.append({"role": "user", "content": prompt})

        if json_mode is None:
            json_mode = "json" in prompt.lower()
        extra_args = {"response_format": {"type": "json_object"}} if json_mode else {}

        api_logger.info(f"o3-mini API Request - Prompt length: {len(prompt)}")
        try:
            await self._enforce_rate_limit()
            response = await asyncio.to_thread(
                self._get_client().chat.completions.create,
                model=self.model_name,
                messages=messages,
                reasoning_effort=self.reasoning_effort,
                max_completion_tokens=self.max_output_tokens,
                **extra_args,
            )
            # message.content can be None (e.g. a refusal); callers expect str.
            return response.choices[0].message.content or ""
        except Exception as e:
            api_logger.error(f"o3-mini API error: {str(e)}")
            raise

    async def analyze(self, context: ResearchContext) -> List[ResearchDecision]:
        """Propose the next research steps for ``context``.

        With no search results yet, this returns a single SEARCH for the
        initial question. Otherwise it proposes:

        * EXPLORE for unvisited result URLs scoring at least
          ``min_url_relevance``;
        * REASON for search/explored items not yet analyzed whose priority
          is at least ``min_priority``;
        * SEARCH/EXPLORE for knowledge gaps suggested by the model;
        * TERMINATE if the model judges the question answered.
        """
        decisions = []
        search_results = context.get_content("main", ContentType.SEARCH_RESULT)
        if not search_results:
            decisions.append(
                ResearchDecision(
                    decision_type=DecisionType.SEARCH,
                    priority=1.0,
                    context={
                        "query": context.initial_question,
                        "rationale": "Initial broad search for the research question"
                    },
                    rationale="Starting research with a broad search query"
                )
            )
            return decisions

        explored_content = context.get_content("main", ContentType.EXPLORED_CONTENT)
        self.explored_urls.update(
            item.content.get("url", "")
            for item in explored_content
            if isinstance(item.content, dict)
        )

        unvisited_urls = set()
        for result in search_results:
            if isinstance(result.content, dict):
                urls = result.content.get("urls", [])
                unvisited_urls.update(url for url in urls if url not in self.explored_urls)

        relevant_urls = await self._filter_relevant_urls(sorted(unvisited_urls), context)
        for url, relevance in relevant_urls:
            if relevance >= self.min_url_relevance:
                decisions.append(
                    ResearchDecision(
                        decision_type=DecisionType.EXPLORE,
                        priority=relevance,
                        context={
                            "url": url,
                            "relevance": relevance,
                            "rationale": "URL deemed relevant to research question"
                        },
                        rationale=f"URL relevance score: {relevance:.2f}"
                    )
                )

        for item in search_results + explored_content:
            already_analyzed = (
                item.metadata.get("analyzed_by_reasoner")
                or item.id in self._analyzed_item_ids
            )
            if already_analyzed or item.priority < self.min_priority:
                continue
            decisions.append(
                ResearchDecision(
                    decision_type=DecisionType.REASON,
                    priority=0.9,
                    context={
                        "content": item.content,
                        "content_type": item.content_type.value,
                        "item_id": item.id
                    },
                    rationale=f"Analyze new {item.content_type.value} content"
                )
            )

        search_text = "\n".join(str(item.content) for item in search_results if isinstance(item.content, str))
        explored_text = "\n".join(str(item.content) for item in explored_content if isinstance(item.content, str))
        analysis_items = context.get_content("main", ContentType.ANALYSIS)
        analysis_text = "\n".join(
            (item.content.get("analysis", "") if isinstance(item.content, dict) else str(item.content))
            for item in analysis_items
        )
        combined_analysis = f"{search_text}\n\n{explored_text}\n\n{analysis_text}".strip()

        if combined_analysis:
            gaps = await self._identify_knowledge_gaps(context.initial_question, combined_analysis)
            for gap in gaps:
                query_str = str(gap.get("query", ""))
                url_str = str(gap.get("url", ""))
                if any(x in query_str or x in url_str for x in ("[", "]")):
                    self.logger.warning(f"Skipping gap with placeholders: {gap}")
                    continue
                if gap["type"] == "search":
                    decisions.append(
                        ResearchDecision(
                            decision_type=DecisionType.SEARCH,
                            priority=0.8,
                            context={
                                "query": gap["query"],
                                "rationale": gap["rationale"]
                            },
                            rationale=f"Fill knowledge gap: {gap['rationale']}"
                        )
                    )
                elif gap["type"] == "explore" and gap["url"] not in self.explored_urls:
                    decisions.append(
                        ResearchDecision(
                            decision_type=DecisionType.EXPLORE,
                            priority=0.75,
                            context={
                                "url": gap["url"],
                                "rationale": gap["rationale"]
                            },
                            rationale=f"Explore URL for more details: {gap['rationale']}"
                        )
                    )
        else:
            self.logger.info("Skipping knowledge gap analysis due to lack of context.")

        if await self._should_terminate(context):
            decisions.append(
                ResearchDecision(
                    decision_type=DecisionType.TERMINATE,
                    priority=1.0,
                    context={},
                    rationale="Research question appears to be sufficiently answered"
                )
            )

        return decisions

    def can_handle(self, decision: ResearchDecision) -> bool:
        """Return True for REASON decisions."""
        return decision.decision_type == DecisionType.REASON

    async def execute_decision(self, decision: ResearchDecision) -> Dict[str, Any]:
        """Run a REASON decision by analyzing ``decision.context["content"]``.

        Content whose estimated size (~4 characters per token) exceeds
        ``max_output_tokens`` is split and analyzed in chunks. Once analysis
        completes without raising, the item ID is remembered so ``analyze``
        will not queue it again.

        Returns:
            ``{"analysis", "insights", "analyzed_item_id"}`` when analysis
            completes, or ``{"error", "analysis", "insights"}`` if anything
            raised. The decision is always recorded via ``log_decision``,
            counted as a success only if the analysis text is non-empty.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        success = False
        try:
            content = decision.context["content"]
            content_type = decision.context["content_type"]
            item_id = decision.context["item_id"]

            content_str = str(content)
            tokens_estimated = len(content_str) // 4
            if tokens_estimated > self.max_output_tokens:
                chunk_results = await self._parallel_analyze_content(content_str, content_type)
                results = self._combine_chunk_results(chunk_results)
            else:
                results = await self._analyze_content_chunk(content_str, content_type)

            success = bool(results.get("analysis", "").strip())
            decision.context["analyzed_by_reasoner"] = True
            self._analyzed_item_ids.add(item_id)

            final_results = {
                "analysis": results.get("analysis", ""),
                "insights": results.get("insights", []),
                "analyzed_item_id": item_id
            }
            if success:
                rprint(f"[green]ReasoningAgent: Analysis completed for content type '{content_type}'[/green]")
            else:
                rprint(f"[yellow]ReasoningAgent: No analysis produced for '{content_type}'[/yellow]")
            return final_results
        except Exception as e:
            rprint(f"[red]ReasoningAgent error: {str(e)}[/red]")
            return {"error": str(e), "analysis": "", "insights": []}
        finally:
            execution_time = loop.time() - start_time
            self.log_decision(decision, success, execution_time)

    @staticmethod
    def _split_into_chunks(text: str, max_chars: int) -> List[str]:
        """Split ``text`` on whitespace into chunks of at most ~``max_chars`` chars.

        Words are kept whole and re-joined with single spaces. A single word
        longer than ``max_chars`` becomes its own oversized chunk.
        """
        chunks: List[str] = []
        current: List[str] = []
        current_len = 0
        for word in text.split():
            added = len(word) + (1 if current else 0)
            if current and current_len + added > max_chars:
                chunks.append(" ".join(current))
                current, current_len = [word], len(word)
            else:
                current.append(word)
                current_len += added
        if current:
            chunks.append(" ".join(current))
        return chunks

    async def _parallel_analyze_content(self, content: str, content_type: str) -> List[Dict[str, Any]]:
        """Analyze ``content`` in chunks, at most ``max_parallel_tasks`` at once.

        Chunks hold about ``max_output_tokens * 4`` characters, matching the
        4-characters-per-token estimate execute_decision uses to decide
        whether to chunk at all. A failed chunk is logged and dropped. If
        every chunk fails, the first error is re-raised.

        Returns:
            One result dict per successful chunk, tagged with ``chunk_index``.
        """
        chunks = self._split_into_chunks(content, self.max_output_tokens * 4)
        limiter = asyncio.Semaphore(max(1, int(self.max_parallel_tasks)))

        async def analyze_chunk(index: int, chunk: str) -> Dict[str, Any]:
            async with limiter:
                result = await self._analyze_content_chunk(chunk, f"{content_type}_chunk_{index}")
            result["chunk_index"] = index
            return result

        # return_exceptions=True so one failing chunk neither discards the
        # others' results nor leaves sibling tasks running unobserved.
        outcomes = await asyncio.gather(
            *(analyze_chunk(i, chunk) for i, chunk in enumerate(chunks)),
            return_exceptions=True,
        )

        chunk_results = []
        errors = []
        for index, outcome in enumerate(outcomes):
            if isinstance(outcome, Exception):
                self.logger.error(f"Analysis of chunk {index} failed: {outcome}")
                errors.append(outcome)
            elif isinstance(outcome, BaseException):
                # e.g. CancelledError: propagate rather than swallow.
                raise outcome
            else:
                chunk_results.append(outcome)
        if errors and not chunk_results:
            raise errors[0]
        return chunk_results

    async def _analyze_content_chunk(self, content: str, content_type: str) -> Dict[str, Any]:
        """Ask the model for a plain-text analysis of ``content``.

        Returns:
            ``{"analysis": <stripped reply>, "insights": <list items in it>}``.
        """
        await self._enforce_flash_fix_limit()
        prompt = (
            "You are an advanced reasoning model (o3-mini) with high reasoning effort. "
            "Analyze the following text for key insights, patterns, or relevant facts. "
            "Provide ONLY factual analysis and insights without placeholders or next-step suggestions.\n\n"
            f"CONTENT:\n{content}\n\n"
            "Please provide your analysis below (plain text only):"
        )
        response_text = await self._call_o3_mini(prompt, json_mode=False)
        analysis_text = response_text.strip()
        insights = self._extract_insights(analysis_text)
        return {"analysis": analysis_text, "insights": insights}

    def _extract_insights(self, analysis_text: str) -> List[str]:
        """Return the text of each bullet or numbered list item in ``analysis_text``.

        Recognizes "- x", "* x", "• x", "1. x" and "1) x". The marker must be
        followed by whitespace, so "**bold**" lines, "---" rules and lines
        that merely start with a number (e.g. "2024 was…") are not items.
        """
        insights = []
        for raw_line in analysis_text.split("\n"):
            match = self._LIST_ITEM_RE.match(raw_line.strip())
            if match:
                text = match.group(1).strip()
                if text:
                    insights.append(text)
        return insights

    def _combine_chunk_results(self, chunk_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Merge per-chunk results in ``chunk_index`` order.

        Non-empty analyses are joined with blank lines; insights are
        concatenated, blank ones dropped and duplicates removed (first
        occurrence kept).
        """
        sorted_chunks = sorted(chunk_results, key=lambda x: x.get("chunk_index", 0))
        combined_analysis = "\n\n".join(
            res["analysis"] for res in sorted_chunks if res.get("analysis", "").strip()
        )
        combined_insights = []
        for res in sorted_chunks:
            combined_insights.extend(insight for insight in res.get("insights", []) if insight.strip())
        unique_insights = list(dict.fromkeys(combined_insights))
        return {"analysis": combined_analysis, "insights": unique_insights, "chunk_count": len(chunk_results)}

    async def _identify_knowledge_gaps(self, question: str, current_analysis: str) -> List[Dict[str, Any]]:
        """Ask the model which searches or URL explorations would fill gaps.

        Returns:
            Validated gap dicts: ``type`` is "search" (with a non-empty string
            ``query``) or "explore" (with a non-empty string ``url``), plus a
            string ``rationale``; entries containing "[" or "]" placeholders
            are dropped. Returns [] on any error or unparseable reply.
        """
        prompt = (
            "You are an advanced research assistant. Given a research question and current analysis, "
            "identify specific missing information and suggest concrete next steps.\n\n"
            "IMPORTANT RULES:\n"
            "1. DO NOT use placeholders like [company name] or [person].\n"
            "2. Base suggestions solely on the provided content.\n"
            "3. If no specific gaps can be identified, return an empty array.\n"
            "4. Each suggestion must be actionable and clearly linked to the research question.\n\n"
            f"RESEARCH QUESTION: {question}\n\n"
            f"CURRENT ANALYSIS:\n{current_analysis}\n\n"
            "Return a JSON object with a 'gaps' array in this format:\n"
            "{\"gaps\": [{\"type\": \"search\"|\"explore\", \"query\"|\"url\": \"specific text\", \"rationale\": \"why needed\"}]}"
        )
        try:
            response = await self._call_o3_mini(prompt, json_mode=True)
            content_str = self._clean_json_response(response)
            if not content_str:
                return []
            try:
                result = json.loads(content_str)
            except json.JSONDecodeError:
                self.logger.error("Failed to parse knowledge gaps JSON response")
                return []

            gaps = result.get("gaps", []) if isinstance(result, dict) else []
            filtered_gaps = []
            for gap in gaps:
                if not isinstance(gap, dict):
                    continue
                gap_type = gap.get("type")
                if gap_type not in ("search", "explore"):
                    continue
                content_field = "query" if gap_type == "search" else "url"
                target = gap.get(content_field)
                rationale = gap.get("rationale")
                if not isinstance(target, str) or not isinstance(rationale, str) or not target.strip():
                    continue
                if any(x in target or x in rationale for x in ("[", "]")):
                    self.logger.warning(f"Skipping gap with placeholders: {gap}")
                    continue
                filtered_gaps.append(gap)
            return filtered_gaps
        except Exception as e:
            self.logger.error(f"Error identifying knowledge gaps: {str(e)}")
            return []

    def _clean_json_response(self, content: str) -> str:
        """Strip surrounding whitespace and any Markdown code fence from ``content``.

        Only the fence markers and a leading "json" language tag are removed.
        The payload itself is left intact, so strings that contain the word
        "json" (e.g. a URL like jsonapi.org) are not corrupted.
        """
        content = content.strip()
        if content.startswith("```"):
            content = content[3:]
            if content[:4].lower() == "json":
                content = content[4:]
            closing = content.rfind("```")
            if closing != -1:
                content = content[:closing]
            content = content.strip()
        return content

    async def _should_terminate(self, context: ResearchContext) -> bool:
        """Ask the model whether the stored analyses answer the question.

        Only consulted once there are at least three ANALYSIS items. Returns
        True only for a reply of "YES" (case-insensitive, trailing "." or
        "!" ignored); any error yields False.
        """
        analysis_items = context.get_content("main", ContentType.ANALYSIS)
        if len(analysis_items) < 3:
            return False
        combined_analysis = "\n".join(
            str(item.content.get("analysis", "")) for item in analysis_items if isinstance(item.content, dict)
        )
        prompt = (
            "You are an advanced research assistant. Given a research question and current analysis, "
            "determine if the question has been sufficiently answered.\n\n"
            f"QUESTION: {context.initial_question}\n\n"
            f"CURRENT ANALYSIS:\n{combined_analysis}\n\n"
            "Respond with a single word: YES if answered, NO if not."
        )
        try:
            # The analysis is already embedded in the prompt; sending it again
            # as an assistant message would only double the token cost.
            answer = await self._call_o3_mini(prompt, json_mode=False)
        except Exception:
            return False
        return answer.strip().upper().rstrip(".!") == "YES"

    def _parse_url_scores(self, parsed: Any, batch: List[str]) -> List[tuple]:
        """Map a model scoring reply back onto the URLs in ``batch``.

        Accepts ``{"scores": [...]}`` or a bare list. Each entry names its URL
        by 1-based ``index`` into ``batch`` (or by exact ``url``). Entries
        matching no URL in the batch, repeating a URL, or carrying a
        non-numeric or NaN score are dropped. Scores are clamped to
        [0.0, 1.0] because they become ResearchDecision priorities, which
        must lie in that range.

        Raises:
            ValueError: if the reply contains no list of scores.
        """
        entries = parsed.get("scores", []) if isinstance(parsed, dict) else parsed
        if not isinstance(entries, list):
            raise ValueError("URL scoring response has no list of scores")

        scored = []
        seen = set()
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            url = None
            index = entry.get("index")
            if isinstance(index, int) and not isinstance(index, bool) and 1 <= index <= len(batch):
                url = batch[index - 1]
            elif entry.get("url") in batch:
                url = entry["url"]
            if url is None or url in seen:
                continue
            try:
                score = float(entry.get("score", 0.0))
            except (TypeError, ValueError):
                continue
            if score != score:  # NaN
                continue
            seen.add(url)
            scored.append((url, min(max(score, 0.0), 1.0)))
        return scored

    async def _filter_relevant_urls(self, urls: List[str], context: ResearchContext) -> List[tuple]:
        """Score ``urls`` for relevance to the research question.

        URLs are scored by the model in batches of five. The prompt lists
        them by number and asks for a JSON object, since JSON mode cannot
        return a bare array. If a batch's model call or parsing fails, that
        batch falls back to ``_basic_url_relevance``.

        Returns:
            ``(url, score)`` pairs with scores in [0, 1], sorted by score
            descending. The urls are always the exact strings from ``urls``.
        """
        if not urls:
            return []

        batch_size = 5
        relevant_urls = []
        for start in range(0, len(urls), batch_size):
            batch = urls[start:start + batch_size]
            prompt = (
                "You are an expert at determining URL relevance for research questions.\n"
                "For each URL, analyze its potential relevance to the research question "
                "and provide a relevance score between 0.0 and 1.0.\n\n"
                f"RESEARCH QUESTION: {context.initial_question}\n\n"
                "URLs to evaluate:\n"
            )
            prompt += "".join(f"{number}. {url}\n" for number, url in enumerate(batch, start=1))
            prompt += (
                "\nRespond with a JSON object in this format:\n"
                "{\"scores\": [{\"index\": 1, \"score\": 0.X, \"reason\": \"...\"}]}\n"
                "where index is the number of the URL in the list above. "
                "ONLY return the JSON object, no other text."
            )
            try:
                raw = await self._call_o3_mini(prompt, json_mode=True)
                parsed = json.loads(self._clean_json_response(raw))
                relevant_urls.extend(self._parse_url_scores(parsed, batch))
            except Exception as e:
                logger.error(f"Error scoring URLs: {str(e)}")
                for url in batch:
                    relevance = self._basic_url_relevance(url, context.initial_question)
                    relevant_urls.append((url, relevance))

        return sorted(relevant_urls, key=lambda x: x[1], reverse=True)

    def _basic_url_relevance(self, url: str, question: str) -> float:
        """Keyword-overlap fallback score for ``url`` in [0, 1].

        Each domain label (split on ".") matching a question word adds 0.6
        and each path segment (split on "/") adds 0.4. The sum is divided by
        the number of distinct question words, then clamped.
        """
        keywords = set(re.findall(r'\w+', question.lower()))
        parsed = urlparse(url)
        domain_parts = parsed.netloc.lower().split('.')
        path_parts = parsed.path.lower().split('/')
        domain_matches = sum(1 for part in domain_parts if part in keywords)
        path_matches = sum(1 for part in path_parts if part in keywords)
        score = (domain_matches * 0.6 + path_matches * 0.4) / max(len(keywords), 1)
        return min(max(score, 0.0), 1.0)
"""Firecrawl client for web scraping functionality.Now uses asyncio.to_thread to wrap blocking calls."""importosfromtypingimportDict, Anyfromdotenvimportload_dotenvfromrichimportprintasrprintfromfirecrawlimportFirecrawlAppimportloggingimportasyncioload_dotenv()
api_logger=logging.getLogger('api.firecrawl')
class FirecrawlClient:
    """Async wrapper around the blocking Firecrawl SDK for page scraping.

    Failures are not raised. ``extract_content`` instead returns a result
    with empty text and ``metadata["error"]`` set.
    """

    def __init__(self):
        """Create the SDK client from the FIRECRAWL_API_KEY environment variable.

        Raises:
            ValueError: if FIRECRAWL_API_KEY is not set.
        """
        self.api_key = os.getenv("FIRECRAWL_API_KEY")
        if not self.api_key:
            raise ValueError("FIRECRAWL_API_KEY not found in environment variables")
        self.app = FirecrawlApp(api_key=self.api_key)
        self.logger = logging.getLogger(__name__)

    async def extract_content(self, url: str) -> Dict[str, Any]:
        """Scrape ``url`` as markdown and return a normalized result.

        ``https://`` is prepended when the URL has no http(s) scheme.

        Returns:
            ``{"title", "text", "metadata"}`` (see
            ``_process_extracted_content``); on failure, title and text are
            empty and ``metadata`` holds ``url`` and ``error``.
        """
        api_logger.info(f"Firecrawl API Request - URL: {url}")
        try:
            if not url.startswith(('http://', 'https://')):
                url = 'https://' + url
                api_logger.debug(f"Added https:// protocol to URL: {url}")

            result = await asyncio.to_thread(self.app.scrape_url, url, params={'formats': ['markdown']})
            page = self._unwrap_scrape_result(result)
            processed_result = self._process_extracted_content(page, url)
            api_logger.debug(f"Processed content from {url}: {len(processed_result.get('text', ''))} chars")
            return processed_result
        except Exception as e:
            api_logger.error(f"Firecrawl API request failed for {url}: {str(e)}")
            return {
                "title": "",
                "text": "",
                "metadata": {
                    "url": url,
                    "error": str(e)
                }
            }

    @staticmethod
    def _unwrap_scrape_result(result: Any) -> Dict[str, Any]:
        """Return the page payload (``markdown``/``metadata``) from a scrape result.

        Accepts either an envelope ``{"success": ..., "data": {...}}`` or the
        payload dict itself. Reading only ``result["data"]`` silently yielded
        an empty page whenever the payload came back unwrapped.
        NOTE(review): which shape scrape_url returns depends on the
        firecrawl-py version in use; confirm.

        Raises:
            RuntimeError: if the envelope reports ``success: False``.
            TypeError: if ``result`` is not a dict.
        """
        if not isinstance(result, dict):
            raise TypeError(f"Unexpected scrape_url result type: {type(result).__name__}")
        if result.get("success") is False:
            raise RuntimeError(str(result.get("error") or "Firecrawl scrape failed"))
        data = result.get("data")
        if isinstance(data, dict):
            return data
        return result

    def _process_extracted_content(self, data: Dict[str, Any], original_url: str) -> Dict[str, Any]:
        """Normalize a Firecrawl page payload into ``{"title", "text", "metadata"}``.

        ``text`` is the cleaned markdown. Metadata maps Firecrawl fields
        (sourceURL, author, ogSiteName, language, statusCode) onto local
        names; ``published_date`` is always empty.
        """
        metadata = data.get("metadata", {})
        markdown_content = data.get("markdown", "")
        processed = {
            "title": metadata.get("title", metadata.get("ogTitle", "")),
            "text": markdown_content,
            "metadata": {
                "url": metadata.get("sourceURL", original_url),
                "author": metadata.get("author", ""),
                "published_date": "",
                "domain": metadata.get("ogSiteName", ""),
                "word_count": len(markdown_content.split()) if markdown_content else 0,
                "language": metadata.get("language", ""),
                "status_code": metadata.get("statusCode", 200)
            }
        }
        if processed["text"]:
            processed["text"] = self._clean_text(processed["text"])
            api_logger.debug(f"Cleaned text for {original_url}: {len(processed['text'])} chars")
        else:
            api_logger.warning(f"No text content extracted from {original_url}")
        return processed

    def _clean_text(self, text: str) -> str:
        """Collapse whitespace in plain lines and drop blank lines.

        Lines whose stripped form starts with a markdown marker ("#", "-",
        "*", "1.", ">") are kept verbatim. Every other line has runs of
        whitespace collapsed to single spaces. Kept lines are joined with
        blank lines between them.
        """
        if not text:
            return ""
        cleaned_lines = []
        for line in text.split("\n"):
            if line.strip().startswith(("#", "-", "*", "1.", ">")):
                cleaned_lines.append(line)
            else:
                cleaned = " ".join(line.split())
                if cleaned:
                    cleaned_lines.append(cleaned)
        return "\n\n".join(cleaned_lines)
rat/research/main.py
"""Main entry point for the multi-agent research system.Now uses an async main loop."""importosimportsysimportjsonimportloggingfrompathlibimportPathfromtypingimportDict, Any, Optionalfromdotenvimportload_dotenvfromrich.consoleimportConsolefromrich.panelimportPanelfromrich.markdownimportMarkdownfromprompt_toolkitimportpromptfromprompt_toolkit.completionimportWordCompleterimportasynciofrom .orchestratorimportResearchOrchestratorfrom .output_managerimportOutputManager# Logging configurationroot_logger=logging.getLogger()
# Strip whatever handlers the root logger already carries: logging.basicConfig
# is a no-op when the root logger has handlers, so this makes the call below
# take effect. INFO+ records then go to rat.log (rewritten each start) and
# to stdout.
for _stale_handler in root_logger.handlers[:]:
    root_logger.removeHandler(_stale_handler)

_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[
        logging.FileHandler('rat.log', mode='w'),
        logging.StreamHandler(sys.stdout),
    ],
)

# The 'api' logger and its 'api.firecrawl' child both log at DEBUG into a
# dedicated rat_api.log. Propagation is disabled on both, so these records
# reach neither the root handlers nor (for the child) the parent's handler,
# and each record is written once.
api_handler = logging.FileHandler('rat_api.log', mode='w')
api_handler.setFormatter(logging.Formatter(_LOG_FORMAT))

api_logger = logging.getLogger('api')
firecrawl_logger = logging.getLogger('api.firecrawl')
for _api_channel in (api_logger, firecrawl_logger):
    _api_channel.setLevel(logging.DEBUG)
    _api_channel.addHandler(api_handler)
    _api_channel.propagate = False

logger = logging.getLogger(__name__)
console = Console()
def _env_value(name: str, default: str) -> str:
    """Return environment variable *name*, or *default* when it is unset or blank.

    A ``.env`` line such as ``MAX_URLS=`` yields an empty string. Treating it
    like an unset variable keeps int()/float()/json.loads() from failing on ''.
    """
    value = os.getenv(name)
    if value is None or not value.strip():
        return default
    return value


def load_config() -> Dict[str, Any]:
    """Assemble the research configuration from environment variables.

    load_dotenv() runs first, so a local .env file can supply the variables.
    Unset or blank variables fall back to the defaults below. API keys are
    passed through unchanged (None when unset).

    Returns:
        A dict with top-level loop settings plus ``search_config``,
        ``explore_config`` and ``reason_config`` sections for the agents.

    Raises:
        ValueError: If a numeric variable is non-numeric, or ALLOWED_DOMAINS is
            not valid JSON (json.JSONDecodeError subclasses ValueError).
    """
    load_dotenv()

    def env_int(name: str, default: str) -> int:
        return int(_env_value(name, default))

    def env_float(name: str, default: str) -> float:
        return float(_env_value(name, default))

    return {
        'max_iterations': env_int('MAX_ITERATIONS', '5'),
        'min_new_content': env_int('MIN_NEW_CONTENT', '3'),
        'min_confidence': env_float('MIN_CONFIDENCE', '0.7'),
        'search_config': {
            'max_results': env_int('MAX_SEARCH_RESULTS', '10'),
            'min_relevance': env_float('MIN_SEARCH_RELEVANCE', '0.6'),
            'api_key': os.getenv('PERPLEXITY_API_KEY'),
            'max_workers': env_int('MAX_PARALLEL_SEARCHES', '10'),
            'rate_limit': env_int('SEARCH_RATE_LIMIT', '100')
        },
        'explore_config': {
            'max_urls': env_int('MAX_URLS', '20'),
            'min_priority': env_float('MIN_URL_PRIORITY', '0.5'),
            'allowed_domains': json.loads(_env_value('ALLOWED_DOMAINS', '[]')),
            'api_key': os.getenv('FIRECRAWL_API_KEY'),
            'max_workers': env_int('MAX_PARALLEL_EXPLORES', '10'),
            'rate_limit': env_int('EXPLORE_RATE_LIMIT', '50')
        },
        'reason_config': {
            'max_chunk_size': env_int('MAX_CHUNK_SIZE', '4000'),
            'min_confidence': env_float('MIN_ANALYSIS_CONFIDENCE', '0.7'),
            'max_workers': env_int('MAX_PARALLEL_REASON', '5'),
            'rate_limit': env_int('REASON_RATE_LIMIT', '10'),
            'flash_fix_rate_limit': env_int('FLASH_FIX_RATE_LIMIT', '10'),
            'api_key': os.getenv('GEMINI_API_KEY'),
            'gemini_timeout': env_int('GEMINI_TIMEOUT', '180')
        }
    }


def display_welcome():
    """Print the welcome banner shown when interactive mode starts."""
    welcome_text = """# RAT - Retrieval Augmented Thinking

Welcome to the multi-agent research system! This tool helps you conduct comprehensive research using:

1. Search Agent (Perplexity) - Intelligent web searching
2. Explore Agent (Firecrawl) - URL content extraction
3. Reasoning Agent (Gemini) - Content analysis using Gemini 2.0 Flash Thinking

Enter your research question below, or type 'help' for more information."""
    console.print(Panel(Markdown(welcome_text), title="Welcome", border_style="blue"))
def display_help():
    """Show the interactive command reference and usage tips in a green panel."""
    help_lines = [
        "## Available Commands",
        "",
        "- `research <question>` - Start a new research session",
        "- `config` - View current configuration",
        "- `metrics` - View research metrics",
        "- `help` - Display this help message",
        "- `exit` - Exit the system",
        "",
        "## Tips",
        "",
        "- Be specific in your research questions",
        "- Use quotes for exact phrases",
        "- Type 'exit' to quit at any time",
    ]
    panel = Panel(Markdown("\n".join(help_lines)), title="Help", border_style="green")
    console.print(panel)
async def run_research(question: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """Run one research session for *question* and render the outcome.

    Args:
        question: The research question.
        config: Configuration dict, as produced by load_config().

    Returns:
        The results dict from ResearchOrchestrator.start_research(). Interactive
        callers use its "metrics" entry.
    """
    orchestrator = ResearchOrchestrator(config)
    results = await orchestrator.start_research(question)
    if "error" in results:
        console.print(f"[red]Research error: {results['error']}[/red]")
    else:
        console.print(Panel(Markdown(results["paper"]), title="Research Results", border_style="green"))
    return results


def _redact_config(value: Any) -> Any:
    """Return a copy of *value* with every non-empty ``api_key`` entry masked.

    Used before echoing the configuration to the console, so secrets loaded
    from the environment are not printed. Unset keys (None) stay visible as
    unset. The input is not modified.
    """
    if isinstance(value, dict):
        return {
            key: ("***" if key == "api_key" and item else _redact_config(item))
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [_redact_config(item) for item in value]
    return value


async def main_async():
    """Parse CLI arguments, then answer one question or run the interactive shell."""
    import argparse
    # prompt_async cooperates with the already-running asyncio loop, unlike
    # the synchronous prompt() helper.
    from prompt_toolkit import PromptSession

    parser = argparse.ArgumentParser(description="RAT - Retrieval Augmented Thinking")
    parser.add_argument("--interactive", "-i", action="store_true",
                        help="Start in interactive mode")
    parser.add_argument("question", nargs="?",
                        help="Research question (if not using interactive mode)")
    args = parser.parse_args()
    config = load_config()

    if not args.interactive:
        if not args.question:
            parser.error("Research question is required when not in interactive mode")
        await run_research(args.question, config)
        return

    display_welcome()
    commands = WordCompleter(['research', 'config', 'metrics', 'help', 'exit'])
    session = PromptSession(completer=commands)
    # Metrics from the most recent completed session; None until one finishes.
    last_metrics: Optional[Dict[str, Any]] = None

    while True:
        try:
            user_input = (await session.prompt_async('RAT> ')).strip()
            if not user_input:
                continue
            command = user_input.lower()

            if command == 'exit':
                console.print("[yellow]Exiting research system...[/yellow]")
                break
            if command == 'help':
                display_help()
                continue
            if command == 'config':
                console.print(Panel(json.dumps(_redact_config(config), indent=2),
                                    title="Configuration", border_style="cyan"))
                continue
            if command == 'metrics':
                if last_metrics is None:
                    console.print("[yellow]No research metrics yet. "
                                  "Run 'research <question>' first.[/yellow]")
                else:
                    # default=str: agent metrics may hold non-JSON values.
                    console.print(Panel(json.dumps(last_metrics, indent=2, default=str),
                                        title="Research Metrics", border_style="magenta"))
                continue
            if command == 'research' or command.startswith('research '):
                question = user_input[len('research'):].strip()
                if not question:
                    console.print("[red]Please provide a research question.[/red]")
                    continue
                results = await run_research(question, config)
                metrics = results.get("metrics") if isinstance(results, dict) else None
                if metrics is not None:
                    last_metrics = metrics
                continue

            console.print("[red]Unknown command. Type 'help' for available commands.[/red]")
        except KeyboardInterrupt:
            console.print("\n[yellow]Operation cancelled. Type 'exit' to quit.[/yellow]")
            continue
        except EOFError:
            # Ctrl-D at the prompt: leave the shell like the 'exit' command.
            console.print("[yellow]Exiting research system...[/yellow]")
            break
        except Exception as e:
            console.print(f"[red]Error: {str(e)}[/red]")
            continue
def main():
    """Synchronous console entry point that drives main_async() to completion."""
    coroutine = main_async()
    asyncio.run(coroutine)


if __name__ == '__main__':
    main()
rat/research/manager.py
"""Manager for coordinating the multi-agent research workflow.This module implements a central ResearchManager that: - Initializes a ResearchContext and persists it to a JSON file. - Dispatches agent decisions as concurrent tasks (using AgentTask wrappers). - Updates the context as agent tasks complete. - Generates a comprehensive research paper upon termination."""importjsonimporttimeimportloggingimportasynciofrompathlibimportPathfromtypingimportDict, Any, List, OptionalfromopenaiimportOpenAIfromrichimportprintasrprintfromrat.research.agents.searchimportSearchAgentfromrat.research.agents.exploreimportExploreAgentfromrat.research.agents.reasonimportReasoningAgentfromrat.research.agents.baseimportResearchDecision, DecisionTypefromrat.research.agents.contextimportResearchContext, ContentType, ContentItemfromrat.research.perplexity_clientimportPerplexityClientfromrat.research.firecrawl_clientimportFirecrawlClientfromrat.research.output_managerimportOutputManagerlogger=logging.getLogger(__name__)
class AgentTask:
    """Pair one research decision with the agent that should carry it out.

    run() executes the decision. Only when execution succeeds does it pass
    ``(decision, result)`` to the callback supplied at construction.
    """

    def __init__(self, decision: ResearchDecision, agent, callback):
        self.decision = decision
        self.agent = agent
        self.callback = callback

    async def run(self):
        """Execute the decision and return its result.

        Any exception from the agent or from the callback is logged and turned
        into ``{"error": <message>}`` rather than propagated.
        """
        try:
            outcome = await self.agent.execute_decision(self.decision)
            self.callback(self.decision, outcome)
        except Exception as exc:
            logger.error(f"Error executing decision: {str(exc)}")
            return {"error": str(exc)}
        return outcome
class ResearchManager:
    """Central manager for the multi-agent research process.

    Each iteration does four things:
      1. Collect decisions from the reasoning, search and explore agents.
      2. Run the decisions concurrently.
      3. Fold their results into a ResearchContext persisted as
         ``research_context.json``.
      4. Decide whether to stop.

    Finally an OpenAI model turns the collected corpus into a Markdown paper,
    which is also rendered to PDF.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        self.perplexity = PerplexityClient()
        self.firecrawl = FirecrawlClient()
        self.search_agent = SearchAgent(self.perplexity, self.config.get("search_config", {}))
        self.explore_agent = ExploreAgent(self.firecrawl, self.config.get("explore_config", {}))
        self.reason_agent = ReasoningAgent(self.config.get("reason_config", {}))
        self.output_manager = OutputManager()
        self.max_iterations = self.config.get("max_iterations", 5)
        self.current_context: Optional[ResearchContext] = None
        self.research_dir: Optional[Path] = None
        self.previous_searches = set()
        # One entry per dispatched iteration, with keys number/time/decisions/
        # content. Filled by start_research(), read by _calculate_metrics().
        self.iteration_log: List[Dict[str, Any]] = []

    async def start_research(self, question: str) -> Dict[str, Any]:
        """Run the research loop for *question* and return the final output dict.

        The loop stops early when the agents produce no decisions, any agent
        asks to terminate, or _should_terminate() says so.
        """
        start_time = time.time()
        self.research_dir = self.output_manager.create_research_dir(question)
        self.current_context = ResearchContext(initial_question=question)
        self.iteration_log = []
        self.persist_context()

        iteration = 0
        while iteration < self.max_iterations:
            iteration += 1
            logger.info(f"Starting iteration {iteration}")
            iteration_start = time.time()
            decisions = await self.collect_decisions()
            if not decisions:
                logger.info("No new decisions, terminating research")
                break
            if any(d.decision_type == DecisionType.TERMINATE for d in decisions):
                logger.info("Terminate decision received")
                break
            content_before = self._content_count()
            await self.dispatch_decisions(decisions, iteration)
            self.iteration_log.append({
                "number": iteration,
                "time": time.time() - iteration_start,
                "decisions": len(decisions),
                "content": self._content_count() - content_before,
            })
            self.persist_context()
            if await self._should_terminate():
                logger.info("Termination condition met based on context")
                break

        final_output = await self._generate_final_output()
        total_time = time.time() - start_time
        final_output["metrics"] = {
            "total_time": total_time,
            "iterations": iteration
        }
        if self.research_dir:
            self.output_manager.save_research_paper(self.research_dir, final_output)
        return final_output

    async def _safe_analyze(self, agent, label: str) -> List[ResearchDecision]:
        """Return ``agent.analyze(context)``, or [] after logging if it raises."""
        try:
            return await agent.analyze(self.current_context)
        except Exception as e:
            logger.error(f"Error in {label} agent analysis: {e}")
            return []

    async def collect_decisions(self) -> List[ResearchDecision]:
        """Gather decisions from all agents, drop repeat or empty searches, sort by priority.

        Accepted search queries are recorded in ``previous_searches`` so later
        iterations skip them.
        """
        decisions: List[ResearchDecision] = []
        decisions.extend(await self._safe_analyze(self.reason_agent, "reason"))
        decisions.extend(await self._safe_analyze(self.search_agent, "search"))
        decisions.extend(await self._safe_analyze(self.explore_agent, "explore"))

        filtered = []
        for d in decisions:
            if d.decision_type == DecisionType.SEARCH:
                query = d.context.get("query", "").strip()
                if not query:
                    logger.info("Skipping empty search query")
                    continue
                if query in self.previous_searches:
                    logger.info(f"Skipping duplicate search: {query}")
                    continue
                self.previous_searches.add(query)
            filtered.append(d)
        filtered.sort(key=lambda d: d.priority, reverse=True)
        return filtered

    async def dispatch_decisions(self, decisions: List[ResearchDecision], iteration: int):
        """Execute every non-terminate decision concurrently via AgentTask wrappers.

        Each successful task updates the context through
        update_context_with_result.
        """
        tasks = []
        for decision in decisions:
            if decision.decision_type == DecisionType.TERMINATE:
                continue
            agent = self._select_agent(decision)
            if not agent:
                continue
            task = AgentTask(decision, agent, self.update_context_with_result)
            tasks.append(asyncio.create_task(task.run()))
        if tasks:
            await asyncio.gather(*tasks)

    def update_context_with_result(self, decision: ResearchDecision, result: Dict[str, Any]):
        """Convert *result* into a ContentItem and add it to the "main" context branch."""
        if not self.current_context:
            # Nothing to update (and _create_content_item needs the context).
            return
        content_item = self._create_content_item(decision, result)
        if content_item:
            try:
                self.current_context.add_content("main", content_item=content_item)
            except Exception as e:
                logger.error(f"Error updating context: {e}")

    def _create_content_item(self, decision: ResearchDecision, result: Dict[str, Any]) -> ContentItem:
        """Build a ContentItem for *result*, typed by the decision that produced it.

        Search results store their ``content`` text and ``urls``. Other results
        are stored as returned; tokens are estimated from their ``analysis``
        text, or from their JSON dump when there is none.
        """
        if decision.decision_type == DecisionType.SEARCH:
            content_str = result.get("content", "")
            urls = result.get("urls", [])
            token_count = self.current_context._estimate_tokens(content_str)
            return ContentItem(
                content_type=ContentType.SEARCH_RESULT,
                content=content_str,
                metadata={"decision_type": decision.decision_type.value, "urls": urls},
                token_count=token_count,
                priority=decision.priority
            )
        if isinstance(result, dict):
            content_str = result.get("analysis", json.dumps(result))
        else:
            content_str = str(result)
        token_count = self.current_context._estimate_tokens(content_str)
        content_type = (ContentType.EXPLORED_CONTENT
                        if decision.decision_type == DecisionType.EXPLORE
                        else ContentType.ANALYSIS)
        return ContentItem(
            content_type=content_type,
            content=result,
            metadata={"decision_type": decision.decision_type.value},
            token_count=token_count,
            priority=decision.priority
        )

    def _content_count(self) -> int:
        """Number of items currently in the context's "main" branch (0 without a context)."""
        if not self.current_context:
            return 0
        return len(self.current_context.get_content("main"))

    def _select_agent(self, decision: ResearchDecision):
        """Return the agent that handles *decision*, or None for other types."""
        agents = {
            DecisionType.SEARCH: self.search_agent,
            DecisionType.EXPLORE: self.explore_agent,
            DecisionType.REASON: self.reason_agent,
        }
        return agents.get(decision.decision_type)

    async def _should_terminate(self) -> bool:
        """Decide whether the research loop should stop.

        Stops when the "main" branch holds at least ``min_new_content`` items in
        total. NOTE(review): this counts all content, not new content per
        iteration — confirm that is intended.

        Otherwise re-runs every agent's analysis and stops if none of the
        resulting decisions is actionable (searches already performed do not
        count). An error during this check means "continue".
        """
        if self.current_context:
            contents = self.current_context.get_content("main")
            if len(contents) >= self.config.get("min_new_content", 3):
                return True
        try:
            reason_decisions = await self.reason_agent.analyze(self.current_context)
            search_decisions = await self.search_agent.analyze(self.current_context)
            explore_decisions = await self.explore_agent.analyze(self.current_context)
            valid_decisions = [
                d for d in (reason_decisions + search_decisions + explore_decisions)
                if (d.decision_type != DecisionType.SEARCH
                    or d.context.get("query", "").strip() not in self.previous_searches)
            ]
            if not valid_decisions:
                rprint("[yellow]Terminating: No further valid decisions from any agent.[/yellow]")
                return True
        except Exception as e:
            rprint(f"[red]Error checking for new decisions: {str(e)}[/red]")
            return False
        return False

    def persist_context(self):
        """Write the current context to ``research_context.json`` in the research directory."""
        if self.current_context and self.research_dir:
            context_file = self.research_dir / "research_context.json"
            self.current_context.save_to_file(str(context_file))
            logger.info(f"Context persisted to {context_file}")

    async def _generate_comprehensive_markdown(self) -> str:
        """Ask the OpenAI model to write a Markdown paper from all gathered content.

        Returns a fixed error message instead of raising if the call fails.
        """
        search_items = self.current_context.get_content("main", ContentType.SEARCH_RESULT)
        explored_items = self.current_context.get_content("main", ContentType.EXPLORED_CONTENT)
        analysis_items = self.current_context.get_content("main", ContentType.ANALYSIS)

        corpus = ["### Consolidated Research\n", "#### Search Results\n"]
        corpus.extend(str(item.content) for item in search_items)
        corpus.append("\n#### Explored Content\n")
        corpus.extend(str(item.content) for item in explored_items)
        corpus.append("\n#### Analysis\n")
        for item in analysis_items:
            if isinstance(item.content, dict):
                corpus.append(item.content.get("analysis", ""))
            else:
                corpus.append(str(item.content))
        big_text = "\n\n".join(corpus)

        prompt = (
            "You are an advanced AI tasked with generating a comprehensive research paper in Markdown. "
            "Using the following research corpus, produce a detailed, well-structured paper with headings, subheadings, bullet points, and tables if necessary.\n\n"
            "RESEARCH CORPUS:\n"
            f"{big_text}\n\n"
            "Please produce the final research paper in Markdown:"
        )
        try:
            client = OpenAI()
            # The synchronous client is run in a worker thread so the event
            # loop is not blocked for the duration of the request.
            response = await asyncio.to_thread(
                client.chat.completions.create,
                model="o3-mini",
                messages=[{"role": "user", "content": prompt}],
                reasoning_effort="high",
                max_completion_tokens=50000
            )
            final_markdown = (response.choices[0].message.content or "").strip()
            return final_markdown
        except Exception as e:
            logger.error(f"Error generating comprehensive markdown: {str(e)}")
            return "Error generating research paper. Please try again."

    async def _generate_final_output(self) -> Dict[str, Any]:
        """Generate the paper, render it to PDF (best effort) and return the output dict."""
        comprehensive_md = await self._generate_comprehensive_markdown()
        if self.research_dir:
            pdf_path = self.research_dir / "research_paper.pdf"
            try:
                await self._convert_markdown_to_pdf(comprehensive_md, pdf_path)
            except Exception as e:
                # A PDF failure (e.g. missing weasyprint system libraries) must
                # not discard the Markdown paper, which is still returned/saved.
                logger.error(f"Error converting research paper to PDF: {e}")
        return {
            "paper": comprehensive_md,
            "title": self.current_context.initial_question,
            "sources": []
        }

    async def _convert_markdown_to_pdf(self, markdown_text: str, out_path: Path):
        """Render *markdown_text* to a styled PDF at *out_path* using weasyprint."""
        import markdown
        from weasyprint import HTML

        body = markdown.markdown(markdown_text, extensions=['fenced_code', 'tables'])
        html_content = f"""
        <html>
        <head>
        <style>
            body {{ font-family: Arial, sans-serif; line-height: 1.6; max-width: 800px; margin: 0 auto; padding: 2em; }}
            h1 {{ color: #2c3e50; border-bottom: 2px solid #eee; }}
            h2 {{ color: #34495e; margin-top: 2em; }}
            h3 {{ color: #7f8c8d; }}
            code {{ background: #f8f9fa; padding: 0.2em 0.4em; border-radius: 3px; }}
            pre {{ background: #f8f9fa; padding: 1em; border-radius: 5px; overflow-x: auto; }}
            blockquote {{ border-left: 4px solid #ddd; margin: 0; padding-left: 1em; color: #666; }}
            table {{ border-collapse: collapse; width: 100%; margin: 1em 0; }}
            th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
            th {{ background-color: #f8f9fa; }}
        </style>
        </head>
        <body>
        {body}
        </body>
        </html>
        """
        # write_pdf is synchronous; run it off the event loop.
        await asyncio.to_thread(HTML(string=html_content).write_pdf, str(out_path))

    def _calculate_metrics(self, total_time: float) -> Dict[str, Any]:
        """Summarise the iterations recorded in ``iteration_log`` plus agent metrics.

        Only iterations that reached dispatch are recorded, so ``iterations``
        can be lower than the loop counter reported by start_research().
        """
        log = getattr(self, "iteration_log", [])
        return {
            "total_time": total_time,
            "iterations": len(log),
            "total_decisions": sum(entry["decisions"] for entry in log),
            "total_content": sum(entry["content"] for entry in log),
            "agent_metrics": self._get_agent_metrics(),
            "iterations_data": [dict(entry) for entry in log],
        }

    def _get_agent_metrics(self) -> Dict[str, Any]:
        """Collect get_metrics() from each agent, keyed by agent role."""
        return {
            "search": self.search_agent.get_metrics(),
            "explore": self.explore_agent.get_metrics(),
            "reason": self.reason_agent.get_metrics()
        }
rat/research/orchestrator.py
"""Orchestrator for coordinating the multi-agent research workflow.Now fully asynchronous with parallel execution support."""importjsonimporttimeimportloggingfromtypingimportList, Dict, Any, Optional, Set, TuplefromdataclassesimportdataclassimportasynciofromrichimportprintasrprintfrompathlibimportPathimportmarkdownfromweasyprintimportHTMLimportopenaifromrat.research.agents.searchimportSearchAgentfromrat.research.agents.exploreimportExploreAgentfromrat.research.agents.reasonimportReasoningAgentfromrat.research.perplexity_clientimportPerplexityClientfromrat.research.firecrawl_clientimportFirecrawlClientfromrat.research.output_managerimportOutputManagerfromrat.research.agents.baseimportResearchDecision, DecisionTypefromrat.research.agents.contextimportResearchContext, ContentType, ContentItemlogger=logging.getLogger(__name__)
@dataclass
class ResearchIteration:
    """Record of one research iteration: decisions, added content and metrics.

    ``timestamp`` defaults to the creation time of each instance. A plain
    ``= time.time()`` default would be evaluated once, at class definition,
    and shared by every iteration.
    """

    iteration_number: int
    decisions_made: List[ResearchDecision]
    content_added: List[ContentItem]
    metrics: Dict[str, Any]
    timestamp: Optional[float] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()
@dataclassclassAgentTask:
""" Represents a single task to be executed by an agent. Includes the decision, agent reference, and any metadata needed for execution. """decision: ResearchDecisionagent: Anypriority: floatmetadata: Dict[str, Any]
asyncdefexecute(self) ->Tuple[ResearchDecision, Optional[Any]]:
"""Execute the task and return the result along with the original decision."""try:
result=awaitself.agent.execute_decision(self.decision)
returnself.decision, resultexceptExceptionase:
logger.error(f"Error executing task: {str(e)}")
returnself.decision, NoneclassAgentTaskManager:
""" Manages the parallel execution of agent tasks with rate limiting and error handling. """def__init__(self, max_concurrent_tasks: int=10):
self.max_concurrent_tasks=max_concurrent_tasksself.semaphore=asyncio.Semaphore(max_concurrent_tasks)
self.tasks: List[AgentTask] = []
defadd_task(self, task: AgentTask):
"""Add a task to the execution queue."""self.tasks.append(task)
asyncdef_execute_task_with_semaphore(self, task: AgentTask) ->Tuple[ResearchDecision, Optional[Any]]:
"""Execute a single task with semaphore-based concurrency control."""asyncwithself.semaphore:
returnawaittask.execute()
asyncdefexecute_all(self) ->List[Tuple[ResearchDecision, Optional[Any]]]:
"""Execute all tasks in parallel with controlled concurrency."""ifnotself.tasks:
return []
# Sort tasks by priorityself.tasks.sort(key=lambdax: x.priority, reverse=True)
# Create task coroutinescoroutines= [self._execute_task_with_semaphore(task) fortaskinself.tasks]
# Execute all tasks and gather resultsresults=awaitasyncio.gather(*coroutines, return_exceptions=True)
# Filter out exceptions and failed tasksvalid_results= []
forresultinresults:
ifisinstance(result, Exception):
logger.error(f"Task execution failed: {str(result)}")
elifresult[1] isnotNone: # Only include tasks with valid resultsvalid_results.append(result)
returnvalid_resultsclassResearchOrchestrator:
""" Coordinates the multi-agent research workflow. """def__init__(self, config: Optional[Dict[str, Any]] =None):
self.config=configor {}
self.perplexity=PerplexityClient()
self.firecrawl=FirecrawlClient()
self.search_agent=SearchAgent(
self.perplexity,
{
**(self.config.get("search_config") or {}),
"max_workers": self.config.get("max_parallel_searches", 10),
"rate_limit": self.config.get("search_rate_limit", 100)
}
)
self.explore_agent=ExploreAgent(
self.firecrawl,
{
**(self.config.get("explore_config") or {}),
"max_workers": self.config.get("max_parallel_explores", 10),
"rate_limit": self.config.get("explore_rate_limit", 50)
}
)
self.reason_agent=ReasoningAgent(
{
**(self.config.get("reason_config") or {}),
"max_workers": self.config.get("max_parallel_reason", 5),
"rate_limit": self.config.get("reason_rate_limit", 10),
"flash_fix_rate_limit": self.config.get("flash_fix_rate_limit", 10)
}
)
self.output_manager=OutputManager()
self.max_iterations=self.config.get("max_iterations", 5)
self.min_new_content=self.config.get("min_new_content", 1)
self.min_confidence=self.config.get("min_confidence", 0.7)
self.current_context: Optional[ResearchContext] =Noneself.iterations: List[ResearchIteration] = []
self.research_dir: Optional[Path] =Noneself.previous_searches=set()
asyncdefstart_research(self, question: str) ->Dict[str, Any]:
start_time=time.time()
self.research_dir=self.output_manager.create_research_dir(question)
self.current_context=ResearchContext(initial_question=question)
iteration=0whileiteration<self.max_iterations:
iteration+=1rprint(f"\n[bold cyan]Starting iteration {iteration}[/bold cyan]")
iteration_result=awaitself._run_iteration(iteration)
self.iterations.append(iteration_result)
ifself._should_terminate(iteration_result):
breakresults=awaitself._generate_final_output()
total_time=time.time() -start_timeresults["metrics"] =self._calculate_metrics(total_time)
ifself.research_dir:
self.output_manager.save_research_paper(self.research_dir, results)
returnresultsasyncdef_run_iteration(self, iteration_number: int) ->ResearchIteration:
""" Run a single research iteration with parallel execution of decisions. """iteration_start=time.time()
all_decisions= []
content_added= []
try:
# Get decisions from all agentsreason_decisions=awaitself.reason_agent.analyze(self.current_context)
# Check for termination decision firstifany(d.decision_type==DecisionType.TERMINATEfordinreason_decisions):
all_decisions.extend(reason_decisions)
returnself._create_iteration_result(
iteration_number,
all_decisions,
content_added,
time.time() -iteration_start
)
# Gather decisions from other agentssearch_decisions=awaitself.search_agent.analyze(self.current_context)
explore_decisions=awaitself.explore_agent.analyze(self.current_context)
all_decisions=reason_decisions+search_decisions+explore_decisions# Create task manager and add taskstask_manager=AgentTaskManager(
max_concurrent_tasks=self.config.get("max_concurrent_tasks", 10)
)
# Filter and prepare tasksfordecisioninsorted(all_decisions, key=lambdad: d.priority, reverse=True):
ifdecision.decision_type==DecisionType.TERMINATE:
continueagent=self._get_agent_for_decision(decision)
ifnotagent:
continue# Skip duplicate searchesifdecision.decision_type==DecisionType.SEARCH:
query_str=decision.context.get("query", "").strip()
ifnotquery_strorquery_strinself.previous_searches:
rprint(f"[yellow]Skipping duplicate search: '{query_str}'[/yellow]")
continueself.previous_searches.add(query_str)
# Create and add tasktask=AgentTask(
decision=decision,
agent=agent,
priority=decision.priority,
metadata={"iteration": iteration_number}
)
task_manager.add_task(task)
# Execute all tasks in parallelresults=awaittask_manager.execute_all()
# Process results and update contextfordecision, resultinresults:
ifresult:
content_item=self._create_content_item(decision, result, iteration_number)
self.current_context.add_content("main", content_item=content_item)
content_added.append(content_item)
exceptExceptionase:
rprint(f"[red]Iteration error: {str(e)}[/red]")
logger.error(f"Iteration error: {str(e)}", exc_info=True)
returnself._create_iteration_result(
iteration_number,
all_decisions,
content_added,
time.time() -iteration_start
)
def_create_iteration_result(
self,
iteration_number: int,
decisions: List[ResearchDecision],
content: List[ContentItem],
iteration_time: float
) ->ResearchIteration:
"""Helper method to create a ResearchIteration result."""metrics= {
"iteration_time": iteration_time,
"decisions_made": len(decisions),
"content_added": len(content),
"agent_metrics": self._get_agent_metrics()
}
returnResearchIteration(
iteration_number=iteration_number,
decisions_made=decisions,
content_added=content,
metrics=metrics
)
def_create_content_item(self, decision: ResearchDecision, result: Dict[str, Any], iteration_number: int) ->ContentItem:
ifdecision.decision_type==DecisionType.SEARCH:
content_str=result.get('content', '')
urls=result.get('urls', [])
token_count=self.current_context._estimate_tokens(content_str)
returnContentItem(
content_type=self._get_content_type(decision),
content=content_str,
metadata={"decision_type": decision.decision_type.value, "iteration": iteration_number, "urls": urls},
token_count=token_count,
priority=decision.priority
)
else:
content_str=resultifisinstance(result, str) elsejson.dumps(result)
token_count=self.current_context._estimate_tokens(content_str)
returnContentItem(
content_type=self._get_content_type(decision),
content=result,
metadata={"decision_type": decision.decision_type.value, "iteration": iteration_number},
token_count=token_count,
priority=decision.priority
)
def_should_terminate(self, iteration: ResearchIteration) ->bool:
terminate_decision=any(d.decision_type==DecisionType.TERMINATEfordiniteration.decisions_made)
ifterminate_decision:
rprint("[green]Terminating: ReasoningAgent indicated completion.[/green]")
returnTruetry:
reason_decisions=asyncio.run(self.reason_agent.analyze(self.current_context))
search_decisions=asyncio.run(self.search_agent.analyze(self.current_context))
explore_decisions=asyncio.run(self.explore_agent.analyze(self.current_context))
valid_decisions= [
dfordin (reason_decisions+search_decisions+explore_decisions)
if (d.decision_type!=DecisionType.SEARCHord.context.get("query", "").strip() notinself.previous_searches)
]
ifnotvalid_decisions:
rprint("[yellow]Terminating: No further valid decisions from any agent.[/yellow]")
returnTrueexceptExceptionase:
rprint(f"[red]Error checking for new decisions: {str(e)}[/red]")
returnFalsereturnFalsedef_get_agent_for_decision(self, decision: ResearchDecision) ->Optional[Any]:
agent_map= {
DecisionType.SEARCH: self.search_agent,
DecisionType.EXPLORE: self.explore_agent,
DecisionType.REASON: self.reason_agent
}
returnagent_map.get(decision.decision_type)
def_get_content_type(self, decision: ResearchDecision) ->ContentType:
type_map= {
DecisionType.SEARCH: ContentType.SEARCH_RESULT,
DecisionType.EXPLORE: ContentType.EXPLORED_CONTENT,
DecisionType.REASON: ContentType.ANALYSIS,
DecisionType.TERMINATE: ContentType.OTHER
}
returntype_map.get(decision.decision_type, ContentType.OTHER)
asyncdef_call_o3_mini_for_report(self, prompt: str) ->str:
try:
response=awaitopenai.ChatCompletion.acreate(
model="o3-mini",
messages=[{"role": "user", "content": prompt}],
reasoning_effort="high",
max_completion_tokens=self.reason_agent.max_output_tokens
)
text=response.choices[0].message.content.strip()
iftext.startswith("```"):
start_idx=text.find("\n") +1end_idx=text.rfind("```")
ifend_idx>start_idx:
text=text[start_idx:end_idx].strip()
else:
text=text.replace("```", "").strip()
returntextexceptExceptionase:
logger.error(f"Error in final paper LLM call: {e}")
return"## Error generating comprehensive paper"asyncdef_generate_comprehensive_paper_markdown(self, context: ResearchContext) ->str:
search_items=context.get_content("main", ContentType.SEARCH_RESULT)
explored_items=context.get_content("main", ContentType.EXPLORED_CONTENT)
analysis_items=context.get_content("main", ContentType.ANALYSIS)
combined_corpus= []
combined_corpus.append("### Final Consolidated Research\n")
combined_corpus.append("[SEARCH RESULTS]\n")
forsinsearch_items:
combined_corpus.append(str(s.content))
combined_corpus.append("\n[EXPLORED CONTENT]\n")
foreinexplored_items:
combined_corpus.append(str(e.content))
combined_corpus.append("\n[ANALYSIS TEXT]\n")
forainanalysis_items:
ifisinstance(a.content, dict):
combined_corpus.append(a.content.get("analysis", ""))
else:
combined_corpus.append(str(a.content))
big_text="\n\n".join(combined_corpus)
prompt= (
"You are an advanced AI that just completed a comprehensive multi-step research.\n""Now produce a SINGLE, richly detailed research paper in valid Markdown.\n""Incorporate all relevant facts, context, analysis, and insights from the text below.\n\n""Provide a thorough, well-structured breakdown:\n""- Large headings\n""- Subheadings\n""- Bullet points\n""- Tables if relevant\n""- Detailed comparisons and references\n\n""Return ONLY Markdown. RULE: ensure that all tables are valid Markdown tables. No extra JSON or placeholders.\n\n""RESEARCH CORPUS:\n"f"{big_text}\n\n""Please produce the final research paper in Markdown now:"
).strip()
final_markdown=awaitself._call_o3_mini_for_report(prompt)
returnfinal_markdownasyncdef_convert_markdown_to_pdf(self, markdown_text: str, out_path: Path):
importmarkdownhtml_content=f""" <html> <head> <style> body {{ font-family: Arial, sans-serif; line-height: 1.6; max-width: 800px; margin: 0 auto; padding: 2em; }} h1 {{ color: #2c3e50; border-bottom: 2px solid #eee; }} h2 {{ color: #34495e; margin-top: 2em; }} h3 {{ color: #7f8c8d; }} code {{ background: #f8f9fa; padding: 0.2em 0.4em; border-radius: 3px; }} pre {{ background: #f8f9fa; padding: 1em; border-radius: 5px; overflow-x: auto; }} blockquote {{ border-left: 4px solid #ddd; margin: 0; padding-left: 1em; color: #666; }} table {{ border-collapse: collapse; width: 100%; margin: 1em 0; }} th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }} th {{ background-color: #f8f9fa; }} </style> </head> <body>{markdown.markdown(markdown_text, extensions=['fenced_code', 'tables'])} </body> </html> """HTML(string=html_content).write_pdf(str(out_path))
asyncdef_generate_final_output(self) ->Dict[str, Any]:
comprehensive_md=awaitself._generate_comprehensive_paper_markdown(self.current_context)
ifself.research_dir:
pdf_path=self.research_dir/"research_paper.pdf"awaitself._convert_markdown_to_pdf(comprehensive_md, pdf_path)
return {
"paper": comprehensive_md,
"title": self.current_context.initial_question,
"sources": []
}
def _calculate_metrics(self, total_time: float) -> Dict[str, Any]:
    """Summarise a completed research run.

    Args:
        total_time: Duration of the run, as supplied by the caller.

    Returns:
        Dict with run totals (``total_time``, ``iterations``,
        ``total_decisions``, ``total_content``), per-agent metrics under
        ``agent_metrics`` and one row per iteration under ``iterations_data``.
    """
    decision_total = 0
    content_total = 0
    for record in self.iterations:
        decision_total += len(record.decisions_made)
        content_total += len(record.content_added)

    summary: Dict[str, Any] = {
        "total_time": total_time,
        "iterations": len(self.iterations),
        "total_decisions": decision_total,
        "total_content": content_total,
        "agent_metrics": self._get_agent_metrics(),
    }

    rows = []
    for record in self.iterations:
        rows.append({
            "number": record.iteration_number,
            "time": record.metrics["iteration_time"],
            "decisions": len(record.decisions_made),
            "content": len(record.content_added),
        })
    summary["iterations_data"] = rows
    return summary

def _get_agent_metrics(self) -> Dict[str, Any]:
    """Return each agent's ``get_metrics()`` result keyed by its role name."""
    return {
        role: getattr(self, f"{role}_agent").get_metrics()
        for role in ("search", "explore", "reason")
    }
rat/research/output_manager.py
"""Output manager for research results.Handles saving research outputs, intermediate results, and performance metrics."""importosimportjsonfromdatetimeimportdatetimefrompathlibimportPathfromtypingimportDict, Any, ListimportshutilclassOutputManager:
""" Manages research outputs and metrics. Handles: - Creating research directories - Saving research papers - Tracking intermediate results - Recording performance metrics """def__init__(self):
"""Initialize the output manager."""self.base_dir=Path("research_outputs")
defcreate_research_dir(self, question: str) ->Path:
""" Create a directory for research outputs. Args: question: Research question Returns: Path to created directory """# Create timestamped directory nametimestamp=datetime.now().strftime("%Y%m%d_%H%M%S")
dir_name=f"{timestamp}_{self._sanitize_filename(question[:50])}"# Create directoryresearch_dir=self.base_dir/dir_nameresearch_dir.mkdir(parents=True, exist_ok=True)
# Save initial metadataself.save_metadata(research_dir, {
"question": question,
"started_at": timestamp,
"status": "in_progress"
})
returnresearch_dirdefsave_research_paper(self, research_dir: Path, paper: Dict[str, Any]):
""" Save the research paper and update metadata. Args: research_dir: Research output directory paper: Paper content and metadata """# Save paper contentpaper_path=research_dir/"research_paper.md"paper_path.write_text(paper["paper"])
# Save paper infoinfo_path=research_dir/"research_info.json"info= {
"title": paper["title"],
"sources": paper["sources"],
"metrics": paper.get("metrics", {})
}
info_path.write_text(json.dumps(info, indent=2))
# Update metadataself.save_metadata(research_dir, {
"status": "completed",
"completed_at": datetime.now().strftime("%Y%m%d_%H%M%S"),
"metrics": paper.get("metrics", {})
})
defsave_context_state(self, research_dir: Path, context_data: Dict[str, Any]):
""" Save intermediate context state. Args: research_dir: Research output directory context_data: Context state to save """# Create states directorystates_dir=research_dir/"states"states_dir.mkdir(exist_ok=True)
# Save state with timestamptimestamp=datetime.now().strftime("%Y%m%d_%H%M%S")
state_path=states_dir/f"context_state_{timestamp}.json"state_path.write_text(json.dumps(context_data, indent=2))
# Keep only last 5 states to save spaceself._cleanup_old_states(states_dir)
defsave_iteration_metrics(
self,
research_dir: Path,
iterations: List[Dict[str, Any]]
):
""" Save iteration performance metrics. Args: research_dir: Research output directory iterations: List of iteration metrics """metrics_path=research_dir/"iteration_metrics.json"metrics_path.write_text(json.dumps({
"iterations": iterations,
"summary": self._calculate_metrics_summary(iterations)
}, indent=2))
defsave_metadata(self, research_dir: Path, updates: Dict[str, Any]):
""" Update research session metadata. Args: research_dir: Research output directory updates: Metadata updates """metadata_path=research_dir/"metadata.json"# Load existing metadataifmetadata_path.exists():
current_metadata=json.loads(metadata_path.read_text())
else:
current_metadata= {}
# Update metadatacurrent_metadata.update(updates)
# Save updated metadatametadata_path.write_text(json.dumps(current_metadata, indent=2))
def_sanitize_filename(self, name: str) ->str:
""" Create a safe filename from text. Args: name: Text to convert to filename Returns: Safe filename """# Replace unsafe characterssafe_chars="-_"filename="".join(
cifc.isalnum() orcinsafe_charselse"_"forcinname
)
returnfilename.strip("_")
def_cleanup_old_states(self, states_dir: Path):
""" Keep only the most recent state files. Args: states_dir: Directory containing state files """state_files=sorted(
states_dir.glob("context_state_*.json"),
key=lambdap: p.stat().st_mtime,
reverse=True
)
# Remove old filesforfileinstate_files[5:]: # Keep 5 most recentfile.unlink()
def_calculate_metrics_summary(
self,
iterations: List[Dict[str, Any]]
) ->Dict[str, Any]:
""" Calculate summary metrics across iterations. Args: iterations: List of iteration metrics Returns: Summary metrics """ifnotiterations:
return {}
return {
"total_iterations": len(iterations),
"total_decisions": sum(it["decisions"] foritiniterations),
"total_new_content": sum(it["new_content"] foritiniterations),
"total_time": sum(it["time"] foritiniterations),
"avg_decisions_per_iteration": (
sum(it["decisions"] foritiniterations) /len(iterations)
),
"avg_new_content_per_iteration": (
sum(it["new_content"] foritiniterations) /len(iterations)
),
"avg_time_per_iteration": (
sum(it["time"] foritiniterations) /len(iterations)
)
}
rat/research/perplexity_client.py
"""Perplexity API client for web search functionality.Now uses async OpenAI API calls."""importosimportreimportjsonimportloggingimportopenaifromtypingimportList, Dict, Anyfromrichimportprintasrprintfromdotenvimportload_dotenvimportasynciofromopenaiimportOpenAIload_dotenv()
api_logger=logging.getLogger('api.perplexity')
class PerplexityClient:
    """Async client for Perplexity web search.

    The API key is read from the ``PERPLEXITY_API_KEY`` environment variable.
    ``search`` posts to the chat-completions endpoint with ``requests`` in a
    worker thread. On any API or network failure it logs the error and
    returns an empty result of the same shape instead of raising.
    """

    # Seconds to wait for a search response. requests has no default timeout,
    # so without one a stalled connection would hang the research run.
    REQUEST_TIMEOUT = 60

    # "[Source: URL]" citations, the format requested by the system message.
    _CITATION_RE = re.compile(r'\[Source: (https?://[^\]]+)\]')
    # Any bare http(s) URL; trailing punctuation is trimmed by _trim_url.
    _URL_RE = re.compile(r'https?://\S+')

    def __init__(self):
        self.api_key = os.getenv("PERPLEXITY_API_KEY")
        # Dedicated Perplexity-configured OpenAI-compatible client (None when no
        # key is set). The global `openai` module is deliberately left alone:
        # assigning `openai.api_key` would swap the key used by every other
        # module-level OpenAI call in the process.
        self.client = (
            openai.OpenAI(api_key=self.api_key, base_url="https://api.perplexity.ai")
            if self.api_key
            else None
        )
        self.model = "sonar-pro"
        self.system_message = (
            "You are a research assistant helping to find accurate and up-to-date information. "
            "When providing information, always cite your sources in the format [Source: URL]. "
            "Focus on finding specific, factual information and avoid speculation."
        )

    @staticmethod
    def _empty_result(query: str) -> Dict[str, Any]:
        """Result returned on failure: the same keys as a success, with no data."""
        return {
            "content": "",
            "urls": [],
            "query": query,
            "metadata": {}
        }

    async def search(self, query: str) -> Dict[str, Any]:
        """
        Perform an asynchronous web search using the Perplexity API via requests.

        Returns:
            Dict with ``content`` (model answer), ``urls`` (URLs extracted from
            the answer), ``query`` and ``metadata`` (model name and token
            usage). On a non-200 response or any exception, the error is
            logged and an empty result is returned.
        """
        api_logger.info(f"Perplexity API Request - Query: {query}")
        model = "sonar-reasoning"
        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": self.system_message},
                {"role": "user", "content": query}
            ],
            "temperature": 0.2,
            "top_p": 0.9,
            # Deliberately no "search_domain_filter": it restricts results to
            # the listed domains, and e.g. ["perplexity.ai"] would confine
            # every research query to pages on perplexity.ai.
            "return_images": False,
            "return_related_questions": False,
            "search_recency_filter": "month",
            "top_k": 0,
            "stream": False,
            "presence_penalty": 0,
            "frequency_penalty": 1,
            "response_format": None
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        url = "https://api.perplexity.ai/chat/completions"
        try:
            import requests

            response = await asyncio.to_thread(
                requests.post,
                url,
                json=payload,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                api_logger.error(f"Perplexity API error: Error code: {response.status_code} - {response.text}")
                return self._empty_result(query)

            data = response.json()
            content = data["choices"][0]["message"]["content"]
            urls = self._extract_urls(content)
            api_logger.debug(f"Response data: {json.dumps({'content': content, 'urls': urls}, indent=2)}")
            return {
                "content": content,
                "urls": urls,
                "query": query,
                "metadata": {
                    # Report the model that was actually requested.
                    "model": model,
                    "usage": data.get("usage", {})
                }
            }
        except Exception as e:
            api_logger.error(f"Perplexity API error: {str(e)}")
            return self._empty_result(query)

    @staticmethod
    def _trim_url(url: str) -> str:
        """Drop punctuation that trails a bare URL in prose or Markdown.

        A closing ``)`` is only removed when it is unbalanced, so URLs such as
        ``https://en.wikipedia.org/wiki/A_(b)`` survive intact.
        """
        while url:
            last = url[-1]
            if last in '.,;:!?\'">]}':
                url = url[:-1]
            elif last == ')' and url.count('(') < url.count(')'):
                url = url[:-1]
            else:
                break
        return url

    def _extract_urls(self, text: str) -> List[str]:
        """Return the unique URLs found in ``text``, in first-seen order.

        Explicit ``[Source: URL]`` citations come first, followed by any other
        bare http(s) URLs with trailing punctuation trimmed. Variants such as
        ``https://x.org]`` and ``https://x.org.`` therefore collapse onto
        ``https://x.org``.
        """
        citation_urls = self._CITATION_RE.findall(text)
        raw_urls = [self._trim_url(u) for u in self._URL_RE.findall(text)]
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        return list(dict.fromkeys(u for u in citation_urls + raw_urls if u))

    async def validate_url(self, url: str) -> bool:
        """Return True if ``url`` has a scheme and host and answers a HEAD request with 200.

        Redirects are followed (e.g. http -> https), so a URL that ends at a 200
        page counts as valid. Any parse or network error yields False.
        """
        from urllib.parse import urlparse

        try:
            parsed = urlparse(url)
        except Exception:
            return False
        if not all([parsed.scheme, parsed.netloc]):
            return False

        import requests

        try:
            response = await asyncio.to_thread(
                requests.head, url, timeout=5, allow_redirects=True
            )
        except Exception:
            return False
        return response.status_code == 200
rat_agentic.py
"""Entry point for the multi-agent research system.Provides a command-line interface for conducting research using the agent-based approach."""importsysimportjsonfromrichimportprintasrprintfromrich.panelimportPanelfromrich.markdownimportMarkdownfromprompt_toolkitimportPromptSessionfromprompt_toolkit.stylesimportStylefromprompt_toolkit.completionimportWordCompleterfromtypingimportDict, Any, Optionalimportasyncioimportbuiltinsfromrat.research.managerimportResearchManager# Ensure that dynamic execution contexts have access to asyncio and rprintbuiltins.asyncio=asynciobuiltins.rprint=rprintdefcreate_default_config() ->Dict[str, Any]:
""" Create default configuration for the research system. Returns: Default configuration dictionary """return {
"max_iterations": 5,
"min_new_content": 2,
"min_confidence": 0.7,
"search_config": {
"max_queries": 5,
"min_priority": 0.3,
"refinement_threshold": 0.7,
"rate_limit": 20# Updated: Perplexity rate limit set to 20 requests per minute
},
"explore_config": {
"max_urls": 10,
"min_priority": 0.3,
"allowed_domains": []
},
"reason_config": {
"max_parallel_tasks": 3,
"chunk_size": 30000,
"min_priority": 0.3,
"rate_limit": 200, # Updated: O3 mini rate limit set to 200 requests per minute"flash_fix_rate_limit": 10
},
"execute_config": {
"model": "claude-3-5-sonnet-20241022",
"min_priority": 0.3,
"max_retries": 2
},
"max_workers": 20
}
def display_results(results: Dict[str, Any]):
    """
    Pretty-print a research result: the paper, run metrics and per-agent stats.

    Args:
        results: Result dict of a research run. If it contains an ``"error"``
            key, only the error message is printed.
    """
    if "error" in results:
        rprint(f"\n[red]Error during research: {results['error']}[/red]")
        return

    paper_panel = Panel(
        Markdown(results["paper"]),
        title="[bold green]Research Paper[/bold green]",
        border_style="green",
    )
    rprint(paper_panel)

    run_metrics = results.get("metrics", {})
    rprint("\n[bold cyan]Research Metrics:[/bold cyan]")
    summary_lines = (
        f"Total time: {run_metrics.get('total_time', 0):.2f} seconds",
        f"Iterations: {run_metrics.get('iterations', 0)}",
        f"Total decisions: {run_metrics.get('total_decisions', 0)}",
        f"Total content items: {run_metrics.get('total_content', 0)}",
    )
    for line in summary_lines:
        rprint(line)

    for role, stats in run_metrics.get("agent_metrics", {}).items():
        rprint(f"\n[bold]{role.title()} Agent:[/bold]")
        rprint(f"Decisions made: {stats.get('decisions_made', 0)}")
        rprint(f"Successful executions: {stats.get('successful_executions', 0)}")
        rprint(f"Failed executions: {stats.get('failed_executions', 0)}")
        # Average over decisions made; the divisor is clamped to at least 1.
        divisor = max(stats.get('decisions_made', 1), 1)
        average = stats.get('total_execution_time', 0) / divisor
        rprint(f"Average execution time: {average:.2f}s")
async def main_async():
    """
    Run the interactive research prompt until the user quits.

    Commands: ``quit`` exits, ``config`` shows ``manager.config`` as JSON, and
    ``metrics`` re-displays the latest results. Any other non-empty input is
    treated as a research question; blank input is ignored. KeyboardInterrupt
    at the prompt is swallowed, EOF ends the loop, and any other exception is
    printed without ending the session.
    """
    style = Style.from_dict({
        'prompt': 'orange bold',
    })
    session = PromptSession(style=style)
    manager = ResearchManager(create_default_config())

    rprint(Panel.fit(
        "[bold cyan]RAT Multi-Agent Research System[/bold cyan]\n"
        "Conduct research using a coordinated team of specialized AI agents",
        title="[bold cyan]🧠 RAT Research[/bold cyan]",
        border_style="cyan"
    ))
    rprint("[yellow]Commands:[/yellow]")
    rprint(" • Type [bold red]'quit'[/bold red] to exit")
    rprint(" • Type [bold magenta]'config'[/bold magenta] to view current configuration")
    rprint(" • Type [bold magenta]'metrics'[/bold magenta] to view latest metrics")
    rprint(" • Enter your research question to begin\n")

    latest_results: Optional[Dict[str, Any]] = None
    commands = WordCompleter(['quit', 'config', 'metrics'])

    while True:
        try:
            user_input = await session.prompt_async("\nResearch Question: ", completer=commands)
            user_input = user_input.strip()

            if not user_input:
                # Blank line: don't launch a research run on an empty question.
                continue

            if user_input.lower() == 'quit':
                rprint("\nGoodbye! 👋")
                break

            if user_input.lower() == 'config':
                rprint(Panel(
                    Markdown(f"```json\n{json.dumps(manager.config, indent=2)}\n```"),
                    title="[bold cyan]Current Configuration[/bold cyan]",
                    border_style="cyan"
                ))
                continue

            if user_input.lower() == 'metrics':
                if latest_results:
                    display_results(latest_results)
                else:
                    rprint("[yellow]No research has been conducted yet[/yellow]")
                continue

            rprint(f"\n[bold cyan]Starting research on:[/bold cyan] {user_input}")
            latest_results = await manager.start_research(user_input)
            display_results(latest_results)

        except KeyboardInterrupt:
            continue
        except EOFError:
            break
        except Exception as e:
            rprint(f"[red]Error: {str(e)}[/red]")
def main():
    """Synchronous console entry point: drive the async prompt loop to completion."""
    asyncio.run(main_async())


if __name__ == "__main__":
    main()
setup.py
from setuptools import setup, find_packages

# Read the README as UTF-8 explicitly, so the build does not depend on the
# platform's default encoding, and close the file handle afterwards.
with open("README.md", encoding="utf-8") as readme_file:
    long_description = readme_file.read()

setup(
    name="rat",
    version="0.1.0",
    packages=find_packages(),
    install_requires=[
        "openai",
        "python-dotenv",
        "rich",
        "prompt_toolkit",
        "requests",
        # Used by the research manager's PDF export (`import markdown`).
        "markdown",
        # NOTE(review): `HTML(string=...).write_pdf(...)` in the research
        # manager matches WeasyPrint's API; confirm that is the library in use.
        "weasyprint",
    ],
    entry_points={
        'console_scripts': [
            # NOTE(review): confirm that `rat.rat_research` exists and defines main().
            'rat-research=rat.rat_research:main',
        ],
    },
    author="Skirano",
    description="Retrieval Augmented Thinking - Enhanced AI responses through structured reasoning",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/Doriandarko/RAT-retrieval-augmented-thinking",
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    # The package uses asyncio.to_thread, which was added in Python 3.9.
    python_requires=">=3.9",
)
test_agent.py
"""Test script to run the research agent with a sample question.Now asynchronous."""importasynciofromrat.research.orchestratorimportResearchOrchestratorfromrichimportprintasrprintasyncdefmain():
orchestrator=ResearchOrchestrator()
question="What are the main features and pricing of Billit's accounting software, and how does it compare to competitors in Belgium?"rprint(f"[bold cyan]Starting research on: {question}[/bold cyan]")
results=awaitorchestrator.start_research(question)
if"error"inresults:
rprint(f"[red]Error: {results['error']}[/red]")
else:
rprint("\n[bold green]Research completed![/bold green]")
rprint("\n[bold]Results:[/bold]")
print(results["paper"])
rprint("\n[bold]Sources:[/bold]")
forsourceinresults.get("sources", []):
print(f"- {source}")
rprint("\n[bold]Metrics:[/bold]")
metrics=results.get("metrics", {})
print(f"Total time: {metrics.get('total_time', 0):.2f} seconds")
print(f"Iterations: {metrics.get('iterations', 0)}")
print(f"Total decisions: {metrics.get('total_decisions', 0)}")
print(f"Total content items: {metrics.get('total_content', 0)}")
if__name__=="__main__":
asyncio.run(main())