-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.py
More file actions
73 lines (56 loc) · 2.04 KB
/
agent.py
File metadata and controls
73 lines (56 loc) · 2.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import slack
import os
from pathlib import Path
from dotenv import load_dotenv
from flask import Flask
from slackeventsapi import SlackEventAdapter
from collections import deque
from config.graph import init_graph
from config.llm import init_llm
# Load environment variables from a .env file addressed relative to the
# current working directory; this runs before the os.environ lookups below.
env_path = Path(".") / ".env"
load_dotenv(dotenv_path=env_path)
# Flask application handed to the Slack events adapter below.
app = Flask(__name__)
# Project helpers from config.llm / config.graph. init_llm() is called before
# init_graph(); presumably the graph relies on the configured LLM — TODO confirm.
init_llm()
graph = init_graph()
# Slack events adapter bound to `app` with the "/slack/events" endpoint.
# A missing SLACK_SIGNING_SECRET raises KeyError while this module is loading.
slack_events_adapter = SlackEventAdapter(
os.environ["SLACK_SIGNING_SECRET"], "/slack/events", app
)
# Slack Web API client using the bot token (KeyError if SLACK_BOT_TOKEN is unset).
client = slack.WebClient(token=os.environ["SLACK_BOT_TOKEN"])
# The "user_id" field of the auth.test response; handle_message compares the
# event's user against it so the bot does not answer itself.
BOT_ID = client.api_call("auth.test")["user_id"]
# Event deduplication: store processed event IDs (automatically removes oldest when full)
processed_events = deque(maxlen=100)
@slack_events_adapter.on("app_mention")
def handle_message(payload):
    """Answer a Slack ``app_mention`` event with the graph's reply, in-thread.

    Args:
        payload: Event callback dict passed in by the Slack events adapter.
            The handler reads its ``event_id`` key and, from the nested
            ``event`` dict, the ``channel``, ``user``, ``text``, ``ts`` and
            ``thread_ts`` keys.

    Returns:
        None. The reply is delivered through ``client.chat_postMessage``.
    """
    event = payload.get("event", {})
    event_id = payload.get("event_id")

    # Deduplicate only when an id is present: recording None would make every
    # later payload without an id look like a duplicate of the first one.
    if event_id is not None:
        if event_id in processed_events:
            print(f"Duplicate event {event_id} ignored")
            return
        # Mark as processed; the bounded deque evicts its oldest entry itself.
        processed_events.append(event_id)

    channel_id = event.get("channel")
    user_id = event.get("user")
    text = event.get("text")
    message_ts = event.get("ts")
    # Reply inside the existing thread, or start one under the mentioning message.
    thread_ts = event.get("thread_ts", message_ts)

    # Never respond to events produced by the bot itself.
    if user_id == BOT_ID:
        return

    # Look up the author's Slack username for the prompt.
    username = None
    try:
        user_info = client.users_info(user=user_id)
        username = user_info["user"].get("name")
    except slack.errors.SlackApiError as e:
        print(f"Error fetching user info: {e}")
    # Use the same placeholder whether the lookup failed or the profile has
    # no name, so the prompt never contains the literal "None".
    if not username:
        username = "Unknown User"

    response = graph.invoke(
        {
            "messages": [
                f"You are a helpful assistant. The user with username {username} said: {text}"
            ]
        }
    )

    # Post the last message produced by the graph as a threaded reply.
    client.chat_postMessage(
        channel=channel_id,
        text=response["messages"][-1].content,
        thread_ts=thread_ts,
    )
if __name__ == "__main__":
    # Executed as a script: start Flask's built-in server with debug mode on.
    app.run(debug=True)