-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: llmAgent.py
More file actions
68 lines (57 loc) · 2.5 KB
/
llmAgent.py
File metadata and controls
68 lines (57 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from agents import Agent, set_tracing_disabled, handoff
from agents.extensions.models.litellm_model import LitellmModel
from openai import AsyncOpenAI
from agents.models.openai_chatcompletions import OpenAIChatCompletionsModel
import os
from prompt import career_assistant_prompt, extraction_agent_prompt
from tools import search_knowledge_base, search_indicators_by_report, search_by_victim, get_file_content, get_reportsID_by_technique, get_reports_by_reportID
from typing import List, Optional
from pydantic import BaseModel, Field
# Disable the Agents SDK's built-in tracing/telemetry export for all agent runs.
set_tracing_disabled(True)

# --- 1. Define the "Digital Form" the LLM must fill ---
class Indicator(BaseModel):
    """A single Indicator of Compromise (IoC) extracted from a report."""

    value: str = Field(..., description="The IoC value (e.g., 185.199.110.153)")
    # NOTE(review): `type` shadows the builtin, but renaming it would change the
    # model's JSON schema (and thus the LLM's structured-output contract).
    type: str = Field(..., description="Type of IoC: IP, Domain, Hash, URL")
class TTP(BaseModel):
    """A MITRE ATT&CK technique (tactic/technique/procedure) seen in a report."""

    technique_id: str = Field(..., description="MITRE ATT&CK ID (e.g., T1059)")
    name: str = Field(..., description="Technique name (e.g., PowerShell)")
class ReportExtraction(BaseModel):
    """Structured extraction of a threat report — the "digital form" the LLM fills.

    Used as the Extraction Agent's ``output_type`` so the SDK validates the
    model's response against this schema.
    """

    summary: str = Field(..., description="Brief summary of the incident")
    severity: str = Field(..., description="High, Medium, or Low")
    victim_sector: str = Field(..., description="e.g. Finance, Healthcare")
    # Bug fix: in Pydantic v2 an Optional annotation no longer implies a default,
    # so Field(description=...) without a default made these fields *required*.
    # default=None makes them genuinely optional, as the Optional[...] intends.
    timeline_start: Optional[str] = Field(default=None, description="ISO timestamp of start")
    timeline_end: Optional[str] = Field(default=None, description="ISO timestamp of end")
    iocs: List[Indicator]   # all IoCs found in the report
    ttps: List[TTP]         # all ATT&CK techniques found in the report
def log_analyses_handoff(context):
    """Stream a one-line status banner announcing the extraction handoff.

    *context* is accepted to satisfy the handoff-callback signature but is not
    used. Yields exactly one markdown-fenced message.
    """
    banner = "```Delegating Extraction to Extraction Agent```\n"
    yield banner
# Primary model backend: an OpenAI-compatible "LMaaS" endpoint configured
# entirely from environment variables.
# NOTE(review): os.environ.get returns None when a variable is unset, so a
# missing LMAAS_URL/LMAAS_KEY only fails later at request time — consider
# os.environ[...] to fail fast at startup. TODO confirm desired behavior.
custom_client = AsyncOpenAI(
    base_url=os.environ.get("LMAAS_URL"),
    api_key=os.environ.get("LMAAS_KEY"),
)
custom_model = OpenAIChatCompletionsModel(
    openai_client=custom_client,
    model=os.environ.get("LMAAS_MODEL"),
)
# Local fallback backend: a model served through Ollama's OpenAI-compatible API.
custom_client_ollama = AsyncOpenAI(
    base_url="http://localhost:11434/v1",
    api_key="ollama",  # required by the client, but Ollama ignores its value
)
custom_model_ollama = OpenAIChatCompletionsModel(
    # Bug fix: this model was previously constructed with `custom_client` (the
    # LMaaS endpoint), so "mistral-nemo" was requested from the wrong server and
    # `custom_client_ollama` was never used. Route it to the Ollama client.
    openai_client=custom_client_ollama,
    model="mistral-nemo",
)
# Specialist agent: fills the ReportExtraction "form" from uploaded report text.
extraction_assistant = Agent(
    name= "Extraction Agent",
    instructions= extraction_agent_prompt,
    handoff_description="Handles extraction of the uploaded text files for threats and provides recommendations",
    model = custom_model,
    # Structured output: the SDK validates the response against ReportExtraction.
    output_type=ReportExtraction,
)
# Front-line assistant agent that answers queries via the threat-intel tools.
# NOTE(review): handoffs is empty, so extraction_assistant is unreachable from
# this agent, and both `log_analyses_handoff` and the imported `handoff` helper
# are unused — confirm whether handoffs=[handoff(extraction_assistant, ...)]
# was intended. Also, the name/prompt suggest a "career assistant" while the
# tools are all threat-report lookups — verify the prompt matches the tools.
career_assistant = Agent(
    name= "Gaurav",
    instructions= career_assistant_prompt,
    handoffs=[],
    model = custom_model,
    tools = [search_indicators_by_report, search_by_victim, get_file_content, get_reportsID_by_technique, get_reports_by_reportID]
)