# main_async.py
import json
import subprocess
import tempfile
import threading
import time
from pathlib import Path
from typing import Any

import redis
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, HttpUrl

ANALYSIS_RESULT_TTL = 3600  # completed results are cached for 1 hour
ANALYSIS_INPROGRESS_TTL = 600  # the in-progress lock expires after 10 minutes

app = FastAPI()
app.mount("/assets", StaticFiles(directory="static"), name="assets")

# Redis connection
redis_client = redis.Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True,
)
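
# Key layout used by the handlers below:
#   analysis:{owner}/{repo}          -> final result JSON (TTL: ANALYSIS_RESULT_TTL)
#   analysis:progress:{owner}/{repo} -> in-progress lock (TTL: ANALYSIS_INPROGRESS_TTL)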


@app.get("/")
def read_index():
    return FileResponse("templates/main_async.html")


class AnalysisRequest(BaseModel):
    github_url: HttpUrl


def run_command(cmd: list[str], cwd: str | None = None) -> subprocess.CompletedProcess:
    start_time = time.perf_counter()
    print(cmd)
    try:
        result = subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=True,
            text=True,
            check=True,
        )
        duration = time.perf_counter() - start_time
        print(f"Duration: {duration:.1f} s")
        return result
    except subprocess.CalledProcessError as e:
        raise HTTPException(
            status_code=500,
            detail={
                "message": "An error occurred while executing the command.",
                "command": cmd,
                "stdout": e.stdout,
                "stderr": e.stderr,
            },
        ) from e
    except FileNotFoundError as e:
        raise HTTPException(
            status_code=500,
            detail=f"Required executable not found: {cmd[0]}",
        ) from e
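
# A minimal usage sketch for run_command (hypothetical call, not part of the
# request flow): run_command(["git", "--version"]) returns a CompletedProcess
# whose .stdout holds the tool's version string; a missing binary or a
# non-zero exit status surfaces as an HTTPException with status 500.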


def extract_top_languages(cloc_json: dict[str, Any], top_n: int = 5) -> list[dict[str, Any]]:
    """
    Extract the top N languages from cloc output, ranked by lines of code.
    """
    ignored_keys = {"header", "SUM"}
    languages = []
    for key, value in cloc_json.items():
        if key in ignored_keys:
            continue
        if not isinstance(value, dict):
            continue
        code_count = value.get("code", 0)
        file_count = value.get("nFiles", 0)
        languages.append({
            "language": key,
            "lines": code_count,
            "files": file_count,
        })
    languages.sort(key=lambda x: x["lines"], reverse=True)
    return languages[:top_n]
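
# Illustrative cloc --json shape and the corresponding extraction result
# (hypothetical counts, assuming cloc's standard per-language summary):
#
#   {"header": {...},
#    "Python":   {"nFiles": 12, "blank": 40, "comment": 30, "code": 980},
#    "Markdown": {"nFiles": 3,  "blank": 10, "comment": 0,  "code": 120},
#    "SUM": {...}}
#
# extract_top_languages on that input would return:
#   [{"language": "Python", "lines": 980, "files": 12},
#    {"language": "Markdown", "lines": 120, "files": 3}]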


def run_analysis_job(owner: str, repo_name: str, github_url: str, redis_key: str, progress_key: str):
    try:
        with tempfile.TemporaryDirectory() as tmp_dir:
            repo_dir = Path(tmp_dir) / "repo"
            # git clone
            run_command([
                "git",
                "clone",
                "--depth",
                "1",
                github_url,
                str(repo_dir),
            ])
            if not repo_dir.exists():
                raise RuntimeError("Failed to clone the repository.")
            # run cloc
            cloc_result = run_command([
                "cloc",
                "--json",
                str(repo_dir),
            ])
            try:
                cloc_json = json.loads(cloc_result.stdout)
            except json.JSONDecodeError as e:
                raise RuntimeError("Failed to parse cloc output") from e
            top_languages = extract_top_languages(cloc_json, top_n=5)
            result_data = {
                "status": "completed",
                "github_url": github_url,
                "top_languages": top_languages,
            }
            # store the result
            redis_client.setex(
                redis_key,
                ANALYSIS_RESULT_TTL,
                json.dumps(result_data, ensure_ascii=False),
            )
    except Exception as e:
        # cache the failure briefly so the frontend can detect it
        error_data = {
            "status": "failed",
            "github_url": github_url,
            "detail": str(e),
        }
        redis_client.setex(
            redis_key,
            300,  # failed results are cached only briefly
            json.dumps(error_data, ensure_ascii=False),
        )
    finally:
        # remove the in-progress key
        redis_client.delete(progress_key)


@app.get("/analysis/{owner}/{repo_name:path}")
def analyze_repository_get(owner: str, repo_name: str):
    if not owner.strip() or not repo_name.strip():
        raise HTTPException(status_code=400, detail="owner or repo_name is empty.")
    repo_name = repo_name.removesuffix(".git")
    github_url = f"https://github.com/{owner}/{repo_name}.git"
    redis_key = f"analysis:{owner}/{repo_name}"
    progress_key = f"analysis:progress:{owner}/{repo_name}"
    # 1) check the result cache
    cached_data = redis_client.get(redis_key)
    if cached_data:
        return json.loads(cached_data)
    # 2) check whether an analysis is already in progress
    if redis_client.exists(progress_key):
        return {
            "status": "processing",
            "github_url": github_url,
            "detail": "Analysis is in progress.",
        }
    # 3) create the in-progress lock (prevents duplicate worker threads)
    # nx=True: set only if the key does not already exist (SETNX semantics)
    is_new_job = redis_client.set(progress_key, "1", ex=ANALYSIS_INPROGRESS_TTL, nx=True)
    if is_new_job:
        thread = threading.Thread(
            target=run_analysis_job,
            args=(owner, repo_name, github_url, redis_key, progress_key),
            daemon=True,
        )
        thread.start()
    # 4) respond immediately with a processing status
    return {
        "status": "processing",
        "github_url": github_url,
        "detail": "Analysis has started.",
    }
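
# A minimal way to run and exercise this app (assumed environment: Redis on
# localhost:6379, and git plus cloc available on PATH):
#
#   uvicorn main_async:app --reload
#
# Then poll the endpoint; the status flips from "processing" to "completed"
# (or "failed") once the background thread finishes, e.g.:
#
#   curl http://127.0.0.1:8000/analysis/pallets/flask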