-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAdditional Utility Files.txt
More file actions
230 lines (196 loc) · 8.42 KB
/
Additional Utility Files.txt
File metadata and controls
230 lines (196 loc) · 8.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# utils/conversation_manager.py
"""
Conversation Manager for maintaining context across translation sessions
"""
import json
import time
from typing import Dict, List, Optional
import structlog
# Catch specific exceptions instead of a broad 'except' block.
try:
# Attempt to import redis for type hinting, though the class handles a None client
import redis
except ImportError:
redis = None
logger = structlog.get_logger()
class ConversationManager:
"""
Manages conversation history, using Redis for persistence with an in-memory fallback.
"""
def __init__(self, redis_client=None, max_history=10):
self.redis_client = redis_client
self.max_history = max_history
self.memory_store = {} # Fallback when Redis unavailable
def add_exchange(self, session_id: str, user_text: str, translation: str):
"""Add a new exchange to conversation history"""
exchange = {
'user_text': user_text,
'translation': translation,
'timestamp': time.time()
}
if self.redis_client:
try:
key = f"conversation:{session_id}"
history = self._get_history(session_id)
history.append(exchange)
# Keep only recent exchanges
if len(history) > self.max_history:
history = history[-self.max_history:]
# Use setex to set a key with an expiration time
self.redis_client.setex(key, 3600, json.dumps(history))
except (redis.exceptions.ConnectionError, json.JSONDecodeError) as e:
# Log specific Redis or JSON errors for better debugging
logger.warning(f"Redis operation failed, falling back to memory store: {e}")
self._add_to_memory_store(session_id, exchange)
except Exception as e:
# Catch any other unexpected errors
logger.error(f"Unexpected error in Redis conversation storage: {e}")
self._add_to_memory_store(session_id, exchange)
else:
# Fallback to memory
self._add_to_memory_store(session_id, exchange)
def _add_to_memory_store(self, session_id, exchange):
"""Helper to manage the in-memory store"""
if session_id not in self.memory_store:
self.memory_store[session_id] = []
self.memory_store[session_id].append(exchange)
if len(self.memory_store[session_id]) > self.max_history:
self.memory_store[session_id] = self.memory_store[session_id][-self.max_history:]
def get_context(self, session_id: str) -> str:
"""Get conversation context for translation"""
try:
history = self._get_history(session_id)
if not history:
return ""
# Build context from recent exchanges (last 3 is a good number)
context_parts = []
for exchange in history[-3:]:
context_parts.append(f"Previous: {exchange['user_text']} → {exchange['translation']}")
return " | ".join(context_parts)
except Exception as e:
# Keep a generic catch-all for `get_context` to be safe, but more specific logs are better in `_get_history`
logger.warning(f"Failed to get context: {e}")
return ""
def _get_history(self, session_id: str) -> List[Dict]:
"""Get conversation history for session"""
try:
if self.redis_client:
key = f"conversation:{session_id}"
data = self.redis_client.get(key)
# Decode JSON only if data exists to avoid errors on empty keys
return json.loads(data) if data else []
else:
return self.memory_store.get(session_id, [])
except (redis.exceptions.ConnectionError, json.JSONDecodeError):
# Return an empty list on specific errors to handle gracefully
return []
except Exception as e:
# Catch other unexpected errors
logger.error(f"Unexpected error retrieving history from Redis: {e}")
return []
# utils/rate_limiter.py
"""
Rate limiter for API protection
"""
import time
from typing import Dict, List
import structlog
# Catch specific exceptions instead of a broad 'except' block.
try:
import redis
except ImportError:
redis = None
logger = structlog.get_logger()
class RateLimiter:
"""
Implements a sliding window rate limiter using Redis sorted sets.
"""
def __init__(self, redis_client=None, limit=100, window=60):
self.redis_client = redis_client
self.limit = limit
self.window = window
self.memory_store = {} # Fallback
def is_allowed(self, client_ip: str) -> bool:
"""Check if request is within rate limit"""
try:
current_time = int(time.time())
if self.redis_client:
key = f"rate_limit:{client_ip}"
# Use a pipeline for atomicity and efficiency
pipe = self.redis_client.pipeline()
# Remove timestamps older than the window
pipe.zremrangebyscore(key, 0, current_time - self.window)
# Get the current count of requests
pipe.zcard(key)
# Add the new request timestamp
# Note: ZADD takes a dict of {member: score}
pipe.zadd(key, {str(current_time): current_time})
# Set or refresh the key's expiration time
pipe.expire(key, self.window)
results = pipe.execute()
request_count = results[1]
return request_count < self.limit
else:
# Memory fallback
if client_ip not in self.memory_store:
self.memory_store[client_ip] = []
# Clean old requests
cutoff = current_time - self.window
self.memory_store[client_ip] = [
req_time for req_time in self.memory_store[client_ip]
if req_time > cutoff
]
# Check limit
if len(self.memory_store[client_ip]) >= self.limit:
return False
self.memory_store[client_ip].append(current_time)
return True
except (redis.exceptions.ConnectionError, redis.exceptions.RedisError) as e:
# Catch specific Redis errors
logger.warning(f"Redis rate limiter error, allowing request: {e}")
return True # Fail-open policy on error
except Exception as e:
# Catch any other unexpected errors
logger.error(f"Unexpected error in rate limiter: {e}")
return True # Fail-open policy
# config/settings.py
"""
Configuration management
"""
import os
from typing import Dict
import structlog
# Ensure that the logger is configured
structlog.configure(
processors=[
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
]
)
logger = structlog.get_logger()
class Config:
"""
Configuration class to load settings from environment variables with defaults.
"""
def __init__(self):
# Use a more robust check for boolean values from environment variables
self.debug = os.getenv('FLASK_DEBUG', 'false').lower() in ('true', '1', 't')
self.secret_key = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
self.redis_host = os.getenv('REDIS_HOST', 'localhost')
# Use a try-except block to handle potential conversion errors
try:
self.redis_port = int(os.getenv('REDIS_PORT', 6379))
except ValueError:
logger.error("Invalid value for REDIS_PORT. Defaulting to 6379.")
self.redis_port = 6379
self.model_cache_dir = os.getenv('MODEL_CACHE_DIR', './models')
self.log_level = os.getenv('LOG_LEVEL', 'INFO')
def get_flask_config(self) -> Dict:
"""Returns a dictionary of settings suitable for Flask configuration"""
return {
'DEBUG': self.debug,
'SECRET_KEY': self.secret_key,
'JSON_SORT_KEYS': False
}