Feature/recruiter ai service #3

Open
sai-sushma-maddali wants to merge 4 commits into main from feature/recruiter-ai-service
sai-sushma-maddali (Collaborator) commented Apr 27, 2026

Enhanced recruiter AI service to support batch data processing and modular skill-based services.

Key Changes

  • Introduced batch processing capability in AI service

  • Added new agent modules for:

    • Interview question generation
    • Ranking explanation

  • Implemented multiple skill services:

    • Resume parsing
    • Candidate matching
    • Outreach drafting
    • Interview questions
    • Ranking explanation

  • Added metrics tracking for evaluation

  • Included end-to-end smoke testing tools

Notes

This improves scalability and modularity of the recruiter AI pipeline.

Summary by Sourcery

Introduce an AI recruiter microservice that orchestrates multi-step hiring workflows over Kafka with batch candidate support, persistence, and real-time updates.

New Features:

  • Add FastAPI-based AI agent service with REST and WebSocket endpoints for submitting hiring workflows, tracking status, and streaming results.
  • Support batch candidate processing for a job, including per-candidate approvals and aggregate trace status.
  • Introduce modular skill microservices for resume parsing, job-candidate matching, outreach drafting, ranking explanation, and interview question generation.
  • Add Kafka producer/consumer integration using a standardized event envelope and idempotency handling for AI requests and approval events.
  • Persist traces, steps, events, approvals, and metrics in MongoDB with query helpers and index management.
  • Add Redis-backed status caching and idempotency keys for consumer-side deduplication.
  • Expose metrics APIs for approval rates and match quality summaries to support evaluation and reporting.

Enhancements:

  • Implement a hiring workflow supervisor that orchestrates skill calls per candidate, ranks candidates, and surfaces summary statistics and human-review state.
  • Provide ranking result shaping and slimming utilities to return UI-friendly views while keeping full history in persistence.
  • Add LLM-based utilities for grounded resume parsing, outreach message drafting, interview question generation, and ranking explanations with provider-agnostic configuration.

Build:

  • Add Dockerfile for the AI service and docker-compose stack wiring Kafka, MongoDB, Redis, the main service, and all skill microservices.

Documentation:

  • Add README documenting the AI Agent Service architecture, tech stack, workflow, endpoints, Kafka topics, environment configuration, and local run instructions.

Tests:

  • Add in-container end-to-end smoke test tool covering happy path, idempotent approvals, low-match behavior, metrics endpoints, and REST validations.
  • Add host-level extended e2e tool to exercise failure handling when a skill service (matcher) is unavailable.

Chores:

  • Add project configuration and tooling files including requirements.txt, Ruff configuration, and initial model/schema placeholders for Kafka events and tasks.

Copilot AI review requested due to automatic review settings April 27, 2026 23:56

sourcery-ai Bot commented Apr 27, 2026

Reviewer's Guide

Adds a new standalone AI recruiter microservice that orchestrates a Kafka-driven, skill-based hiring workflow with batch candidate processing, trace persistence, metrics, and e2e tooling.

Sequence diagram for batch hiring workflow orchestration

sequenceDiagram
    actor Recruiter
    participant UI
    participant AgentAPI as AgentService_API
    participant Kafka as Kafka_AI_Topics
    participant Consumer as Kafka_Consumer
    participant Supervisor as HiringSupervisor
    participant ResumeSvc as ResumeParser_Service
    participant MatcherSvc as Matcher_Service
    participant OutreachSvc as OutreachDrafter_Service
    participant Mongo as MongoDB
    participant Redis as Redis
    participant WS as WebSocketHub

    Recruiter->>UI: Configure job + candidates
    UI->>AgentAPI: POST /agent/request (AgentRequest)
    AgentAPI->>Mongo: upsert_trace(status=queued)
    AgentAPI->>Redis: set_status(queued)
    AgentAPI->>Kafka: publish_event(topic=ai.requests, event_type=ai.requested)
    AgentAPI-->>UI: {trace_id, status=queued}

    loop consume ai.requested
        Consumer->>Kafka: poll ai.requests
        Kafka-->>Consumer: event ai.requested
        Consumer->>Redis: claim_idempotency
        alt first_delivery
            Consumer->>Mongo: upsert_trace(status=queued)
            Consumer->>Redis: set_status(queued)
            Consumer->>Supervisor: run_hiring_workflow(task, trace_id)

            loop for_each_candidate
                Supervisor->>Redis: set_status(in_progress, current_step=resume_parsed)
                Supervisor->>ResumeSvc: POST /run (resume_text)
                ResumeSvc-->>Supervisor: parsed_resume
                Supervisor->>Mongo: add_step(step=resume_parsed)
                Supervisor->>WS: push_update(trace_id, progress)

                Supervisor->>Redis: set_status(in_progress, current_step=match_scored)
                Supervisor->>MatcherSvc: POST /run (job, parsed_resume)
                MatcherSvc-->>Supervisor: match
                Supervisor->>Mongo: add_step(step=match_scored)
                Supervisor->>WS: push_update(trace_id, progress)

                Supervisor->>Redis: set_status(in_progress, current_step=outreach_drafted)
                Supervisor->>OutreachSvc: POST /run (job, parsed_resume, match)
                OutreachSvc-->>Supervisor: outreach
                Supervisor->>Mongo: add_step(step=outreach_drafted)
                Supervisor->>WS: push_update(trace_id, progress)
            end

            Supervisor->>Mongo: add_step(step=candidates_ranked, data=stats+ranked)
            Supervisor->>Redis: set_status(awaiting_approval, requires_human_review=true)
            Supervisor-->>Consumer: results (ranked_candidates, stats)

            Consumer->>Kafka: publish_event(topic=ai.results, event_type=ai.completed)
            Consumer->>WS: push_update(trace_id, results)
        else duplicate_delivery
            Consumer-->>Consumer: skip (idempotency)
        end
    end
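
The `claim_idempotency` branch in the diagram above can be sketched as follows. This is a minimal illustration only: the in-memory store is a hypothetical stand-in for Redis (it mimics the semantics of `SET key value NX EX ttl`), and the names `InMemoryIdempotencyStore`, `claim`, and `handle_event` are assumptions, not the PR's actual helpers.

```python
import time
from typing import Optional


class InMemoryIdempotencyStore:
    """Hypothetical stand-in for Redis; mimics SET key value NX EX ttl."""

    def __init__(self) -> None:
        self._expires_at: dict = {}

    def claim(self, key: str, ttl_s: float = 3600.0, now: Optional[float] = None) -> bool:
        """Return True only for the first claim of `key` within its TTL."""
        current = time.monotonic() if now is None else now
        expiry = self._expires_at.get(key)
        if expiry is not None and expiry > current:
            return False  # duplicate delivery: the key is already claimed
        self._expires_at[key] = current + ttl_s
        return True


def handle_event(store: InMemoryIdempotencyStore, event: dict, processed: list) -> str:
    """Process an event once; skip redeliveries carrying the same idempotency key."""
    if not store.claim(event["idempotency_key"]):
        return "skipped"
    processed.append(event["trace_id"])
    return "processed"
```

Claiming the key before running the workflow means a redelivered Kafka message becomes a no-op, which matches the `duplicate_delivery` branch of the diagram.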

Sequence diagram for approval handling and batch status resolution

sequenceDiagram
    actor Recruiter
    participant UI
    participant AgentAPI as AgentService_API
    participant Kafka as Kafka_AI_Topics
    participant Consumer as Kafka_Consumer
    participant Mongo as MongoDB
    participant Redis as Redis
    participant WS as WebSocketHub

    Recruiter->>UI: Review ranked candidates
    UI->>AgentAPI: POST /agent/approve/{trace_id} (ApprovalRequest)
    AgentAPI->>Kafka: publish_event(topic=ai.results, event_type=ai.approval.recorded)
    AgentAPI-->>UI: {status=recorded}

    loop consume ai.approval.recorded
        Consumer->>Kafka: poll ai.results
        Kafka-->>Consumer: event ai.approval.recorded
        Consumer->>Redis: claim_idempotency(idempotency_key)
        alt first_delivery
            Consumer->>Mongo: add_approval(trace_id, action, candidate_id)
            Consumer->>Mongo: get_ranked_candidate_count(trace_id)
            Consumer->>Mongo: count_distinct_candidate_approvals(trace_id)

            alt batch_incomplete
                Consumer->>Mongo: upsert_trace(status=awaiting_approval)
                Consumer->>Redis: set_status(awaiting_approval, progress)
                Consumer->>Mongo: add_step(step=candidate_approval, status=recorded)
            else batch_complete
                Consumer->>Mongo: _latest_approval_actions_by_candidate
                Consumer-->>Consumer: derive_final_status(approved|edited|rejected)
                Consumer->>Mongo: upsert_trace(status=final_status)
                Consumer->>Redis: set_status(final_status, requires_human_review=false)
                Consumer->>Mongo: add_step(step=batch_approval_completed, status=final_status)
            end

            Consumer->>Mongo: get_latest_result(trace_id)
            Consumer->>Redis: get_status(trace_id)
            Consumer->>WS: push_update(trace_id, latest_trace+steps+status)
        else duplicate_delivery
            Consumer-->>Consumer: skip (no-op, idempotent)
        end
    end
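
The batch resolution step in the diagram above might look roughly like the sketch below. The precedence rule used to collapse per-candidate actions into one final status is an assumption made for illustration (the diagram only names the possible outcomes), and `latest_actions_by_candidate` and `derive_batch_status` are hypothetical helpers rather than the PR's functions.

```python
def latest_actions_by_candidate(approvals: list) -> dict:
    """Keep the most recent action per candidate; approvals are in chronological order."""
    latest = {}
    for approval in approvals:
        latest[approval["candidate_id"]] = approval["action"]
    return latest


def derive_batch_status(ranked_count: int, approvals: list) -> str:
    """Return awaiting_approval until every ranked candidate has a decision."""
    latest = latest_actions_by_candidate(approvals)
    if len(latest) < ranked_count:
        return "awaiting_approval"
    actions = set(latest.values())
    # Assumed precedence: rejected only if everyone was rejected,
    # edited if any draft was edited, otherwise approved.
    if actions == {"rejected"}:
        return "rejected"
    if "edited" in actions:
        return "edited"
    return "approved"
```

Counting distinct candidates rather than raw approval records keeps a repeated decision on the same candidate from completing the batch early.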

Class diagram for core configuration, API models, and Kafka envelope

classDiagram
    class Settings {
        +str kafka_bootstrap
        +str mongo_uri
        +str redis_url
        +str groq_api_key
        +str groq_base_url
        +str groq_model
        +str openrouter_api_key
        +str openrouter_base_url
        +str openrouter_model
        +str resume_parser_url
        +str matcher_url
        +str ranking_explainer_url
        +str interview_questions_url
        +str outreach_drafter_url
    }

    class AgentRequest {
        +str actor_id
        +dict job
        +str resume_text
        +list~dict~ candidates
    }

    class ApprovalRequest {
        +str action
        +str edited_draft
        +str candidate_id
    }

    class KafkaEnvelope {
        +str event_type
        +str trace_id
        +str timestamp
        +str actor_id
        +dict entity
        +dict payload
        +str idempotency_key
        +model_post_init(_)
    }

    class ResumeParseRequest {
        +str resume_text
    }

    class MatcherRequest {
        +dict job
        +dict candidate
    }

    class OutreachRequest {
        +dict job
        +dict candidate
        +dict match
    }

    class ExplainRequest {
        +dict job
        +dict candidate
        +dict match
    }

    class InterviewQuestionsRequest {
        +dict job
        +dict candidate
        +dict match
    }

    Settings <.. AgentRequest : uses_for_urls
    AgentRequest --> KafkaEnvelope : wrapped_into_event
    ApprovalRequest --> KafkaEnvelope : wrapped_into_event

    ResumeParseRequest --> ResumeParser_Service : request_body
    MatcherRequest --> Matcher_Service : request_body
    OutreachRequest --> Outreach_Drafter_Service : request_body
    ExplainRequest --> Ranking_Explainer_Service : request_body
    InterviewQuestionsRequest --> Interview_Questions_Service : request_body
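
As a rough illustration of the `KafkaEnvelope` shape in the class diagram, the sketch below uses a standard-library dataclass in place of the Pydantic model, with `__post_init__` standing in for `model_post_init`. The default-filling behaviour and the key format are assumptions; the diagram only lists the fields.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class KafkaEnvelope:
    event_type: str
    trace_id: str
    actor_id: str
    entity: dict = field(default_factory=dict)
    payload: dict = field(default_factory=dict)
    timestamp: str = ""
    idempotency_key: str = ""

    def __post_init__(self) -> None:
        # Assumed behaviour: fill in missing metadata so every event is complete.
        if not self.timestamp:
            self.timestamp = datetime.now(timezone.utc).isoformat()
        if not self.idempotency_key:
            # Hypothetical key format; callers may pass their own stable key instead.
            self.idempotency_key = f"{self.trace_id}:{self.event_type}:{uuid.uuid4().hex}"
```

A caller that retries a publish would pass the same `idempotency_key` explicitly so the consumer can recognise the redelivery.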

File-Level Changes

Change Details Files
Introduce FastAPI-based AI agent service with Kafka integration and WebSocket updates for orchestrating hiring workflows.
  • Create FastAPI app entrypoint wiring REST, WebSocket, and Kafka consumer lifecycle management.
  • Add WebSocket router to stream JSON updates per trace_id and manage in-memory connection registry.
  • Define API routes for submitting AI requests, recording approvals, fetching status/results, and exposing metrics summaries.
  • Implement Kafka producer helper that enriches events, ensures idempotency/timestamps, and persists envelopes to Mongo before publishing.
ai-service/app/main.py
ai-service/app/api/routes.py
ai-service/app/api/websocket.py
ai-service/app/kafka/consumer.py
ai-service/app/kafka/producer.py
ai-service/app/models/events.py
ai-service/app/kafka/schemas.py
Persist AI workflow traces, steps, events, approvals, and metrics in MongoDB with helper query utilities.
  • Add Mongo connection helper, index initialization, and CRUD helpers for traces, steps, approvals, and raw events.
  • Provide utility queries to derive ranked candidate counts, distinct approvals, slimmed step/result views, and latest workflow snapshot.
  • Implement metrics recording for match quality and approval actions plus aggregate summary queries for reporting APIs.
ai-service/app/db/mongo.py
ai-service/app/metrics.py
Implement Redis-backed task status cache and consumer idempotency guard.
  • Create Redis client wrapper with helpers to set/get per-trace status payloads.
  • Add idempotency key tracking with TTL to safely skip duplicate Kafka messages in the consumer.
ai-service/app/db/redis_client.py
Add modular skill services (resume parsing, matching, outreach drafting, ranking explanation, interview questions) driven by LLMs and embeddings.
  • Implement LLM-based resume parser with robust JSON extraction and repair to return structured candidate data.
  • Implement sentence-transformer-based job matcher that combines semantic similarity and skill overlap into a composite score.
  • Implement LLM-based outreach drafter that generates grounded, non-hallucinatory recruiter messages conditioned on match strength.
  • Implement LLM-based ranking explainer and interview question generator that reason only over provided job/candidate/match fields.
  • Expose each skill as its own FastAPI microservice with simple /health and /run endpoints.
ai-service/app/agents/resume_parser.py
ai-service/app/agents/job_matcher.py
ai-service/app/agents/outreach_drafter.py
ai-service/app/agents/ranking_explainer.py
ai-service/app/agents/interview_question_generator.py
ai-service/app/skill_services/resume_parser_service.py
ai-service/app/skill_services/matcher_service.py
ai-service/app/skill_services/outreach_drafter_service.py
ai-service/app/skill_services/ranking_explainer_service.py
ai-service/app/skill_services/interview_questions_service.py
Add supervisor orchestration layer to support batch candidate workflows and human-in-the-loop approvals.
  • Implement run_hiring_workflow to iterate over candidates, call skill services with retry/backoff, collect steps, and compute ranking stats.
  • Track per-step success/failure in Mongo and Redis while emitting WebSocket progress updates during processing.
  • Mark traces as awaiting_approval with summary stats and integrate metrics recording for match quality.
  • Extend Kafka consumer to handle ai.requested and ai.approval.recorded events, including batch approval aggregation and final status computation.
ai-service/app/agents/supervisor.py
ai-service/app/kafka/consumer.py
Provide end-to-end and failure-injection smoke tests plus Dockerized local stack for the AI service.
  • Add container-internal e2e smoke script that exercises request/approval flow, idempotent approvals, low-match handling, and metrics endpoints.
  • Add host-side e2e script that runs container smoke tests and injects matcher failures to verify graceful degradation.
  • Introduce Dockerfile, docker-compose stack (Kafka, Mongo, Redis, AI service, and all skill services), and service configuration via env-based settings.
  • Document service behavior, architecture, endpoints, and setup instructions in README, and configure Ruff plus requirements for reproducible env.
ai-service/app/tools/e2e_smoke.py
ai-service/tools/e2e_extended_host.py
ai-service/docker-compose.yml
ai-service/Dockerfile
ai-service/requirements.txt
ai-service/app/config.py
ai-service/README.md
ai-service/ruff.toml
ai-service/.gitignore
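
The retry/backoff that the supervisor is described as applying around skill calls could be sketched like this. It is an illustrative helper under assumed defaults (three attempts, exponential delays); `call_with_retry` and its parameters are hypothetical names, not the PR's implementation.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay_s: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on any exception with exponentially growing delays."""
    last_error: Optional[Exception] = None
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:  # broad on purpose for the sketch
            last_error = exc
            if attempt < attempts - 1:
                sleep(base_delay_s * (2 ** attempt))
    raise RuntimeError(f"skill call failed after {attempts} attempts") from last_error
```

Injecting `sleep` keeps the helper testable without real waiting; a failed final attempt surfaces as one error that the caller can record as a failed step.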

@sourcery-ai sourcery-ai Bot left a comment

Hey - I've found 1 security issue, 1 other issue, and left some high level feedback:

Security issues:

  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.quote()'.

General comments:

  • The LLM client selection logic (_llm_client) is duplicated across multiple agent modules (resume parser, outreach drafter, interview question generator, ranking explainer); consider centralizing this into a shared helper to avoid drift and make provider/config changes easier.
  • The score-to-tier mapping (_tier_from_score) is implemented both in the supervisor and again inside mongo.get_latest_result; extracting this into a single shared utility would reduce the risk of inconsistent thresholds over time.
  • In metrics.match_quality_summary, the match_filter variable is constructed but never used, while match_filter_or is; either remove match_filter or refactor to use one consistent filter to keep the aggregation logic clearer.
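
One way the first suggestion (a single shared helper for LLM client selection) might be approached is sketched below. The field names follow the `Settings` class in the class diagram; the preference order (Groq first, then OpenRouter) and the names `LLMProvider` and `select_llm_provider` are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class LLMProvider:
    name: str
    api_key: str
    base_url: str
    model: str


def select_llm_provider(settings: dict) -> Optional[LLMProvider]:
    """Return the first configured provider, or None when no API key is set."""
    for name in ("groq", "openrouter"):  # assumed preference order
        api_key = settings.get(f"{name}_api_key") or ""
        if api_key:
            return LLMProvider(
                name=name,
                api_key=api_key,
                base_url=settings.get(f"{name}_base_url", ""),
                model=settings.get(f"{name}_model", ""),
            )
    return None
```

Each agent module would then import this one function, so a change to provider order or configuration happens in a single place.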
Comment thread ai-service/app/agents/job_matcher.py
Comment on line 7
# 40% exact skills overlap — giving both meaning-aware matching and keyword precision.
from sentence_transformers import SentenceTransformer, util

_model = SentenceTransformer("all-MiniLM-L6-v2")

suggestion (performance): Consider lazy-loading the SentenceTransformer model to reduce import-time latency and startup failures.

Initializing SentenceTransformer("all-MiniLM-L6-v2") at import time can noticeably slow startup and will fail if the model/cache isn’t available (e.g., fresh containers). Instead, consider a cached lazy getter, e.g.:

_model: SentenceTransformer | None = None

def get_model() -> SentenceTransformer:
    global _model
    if _model is None:
        _model = SentenceTransformer("all-MiniLM-L6-v2")
    return _model

and call get_model() from compute_match_score to keep cold starts and failures more predictable in ephemeral/serverless environments.

Suggested implementation:

# Skill: embedding-based match score

# The final score is a weighted blend — 60% semantic similarity from embeddings,
# 40% exact skills overlap — giving both meaning-aware matching and keyword precision.
from sentence_transformers import SentenceTransformer, util


# Lazy-loaded SentenceTransformer model to avoid import-time latency and failures.
_model: SentenceTransformer | None = None


def get_model() -> SentenceTransformer:
    """
    Lazily initialize and cache the SentenceTransformer model.

    This avoids importing/loading the model at module import time, which can slow
    startup or fail in environments where the model cache isn't yet populated.
    """
    global _model
    if _model is None:
        _model = SentenceTransformer("all-MiniLM-L6-v2")
    return _model


def compute_match_score(job: dict, candidate: dict) -> dict:
    job_skills = job.get("skills_required", [])
    cand_skills = candidate.get("skills", [])

    # Ensure the embedding model is loaded lazily when needed.
    model = get_model()

Wherever _model was previously used in this file (e.g., to encode skills or compute similarities), replace direct references like _model.encode(...) with get_model().encode(...) or reuse the model local variable inside compute_match_score and any other functions that depend on the SentenceTransformer instance.
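
For context on what `compute_match_score` blends, the module comment describes a 60% semantic similarity and 40% exact skills overlap weighting. A minimal sketch of that blend follows, using plain-Python cosine similarity on toy vectors instead of Sentence Transformers embeddings. The overlap definition (share of required skills the candidate has) and the function names are assumptions.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def skill_overlap(required: Sequence[str], candidate: Sequence[str]) -> float:
    """Assumed definition: fraction of required skills present, case-insensitive."""
    required_set = {s.strip().lower() for s in required if s.strip()}
    candidate_set = {s.strip().lower() for s in candidate if s.strip()}
    if not required_set:
        return 0.0
    return len(required_set & candidate_set) / len(required_set)


def composite_score(semantic: float, overlap: float,
                    w_semantic: float = 0.6, w_overlap: float = 0.4) -> float:
    """Weighted blend of semantic similarity and exact skill overlap."""
    semantic = max(0.0, min(1.0, semantic))  # clamp negative cosine values
    return w_semantic * semantic + w_overlap * overlap
```

In the real module the vectors would come from the lazily loaded model's `encode` call; the blend itself does not depend on how the embeddings are produced.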



Comment thread ai-service/tools/e2e_extended_host.py
Comment on line 16
def sh(cmd: list[str]) -> None:
    subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.quote()'.

Source: opengrep
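
One possible way to narrow the audit surface of `sh` is sketched below. It keeps the argument-list form (no `shell=True`, so arguments are not interpreted by a shell) and adds an allowlist of executables. The allowlist contents and the choice to return captured output are assumptions; the original helper returns `None`.

```python
import subprocess
import sys

# Hypothetical allowlist; a host-side e2e tool would list only what it really runs.
ALLOWED_EXECUTABLES = {"docker", sys.executable}


def sh(cmd: list) -> str:
    """Run an allowlisted command given as an argument list and return its output."""
    if not cmd or cmd[0] not in ALLOWED_EXECUTABLES:
        raise ValueError(f"executable not allowed: {cmd[:1]}")
    completed = subprocess.run(
        cmd,
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    return completed.stdout
```

Because the command is never passed through a shell, shell quoting is only needed if a caller later builds a single command string.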


Copilot AI left a comment

Pull request overview

Enhances the recruiter AI service by moving to an event-driven, batch-capable workflow orchestrator (Kafka + FastAPI) and splitting core capabilities into separate “skill” microservices, with added metrics and E2E tooling.

Changes:

  • Added batch candidate processing and expanded approval handling + metrics endpoints.
  • Introduced Kafka producer/consumer, Mongo/Redis persistence, and WebSocket streaming for real-time updates.
  • Added skill microservices (resume parsing, matching, outreach drafting, interview questions, ranking explanation) plus E2E smoke tools and Docker compose wiring.

Reviewed changes

Copilot reviewed 31 out of 34 changed files in this pull request and generated 14 comments.

File Description
ai-service/tools/e2e_extended_host.py Host-side extended E2E runner with failure injection (matcher stop/start).
ai-service/tests/test_supervisor.py Placeholder test file (currently empty).
ai-service/tests/test_resume_parser.py Placeholder test file (currently empty).
ai-service/tests/test_job_matcher.py Placeholder test file (currently empty).
ai-service/ruff.toml Adds Ruff lint/format configuration.
ai-service/requirements.txt Defines Python dependency set for the service stack.
ai-service/docker-compose.yml Brings up Kafka/ZK, Mongo, Redis, main service, and skill services.
ai-service/app/tools/e2e_smoke.py In-container E2E smoke script (REST + Kafka idempotency + metrics).
ai-service/app/skill_services/resume_parser_service.py FastAPI wrapper around resume parsing skill.
ai-service/app/skill_services/ranking_explainer_service.py FastAPI wrapper around ranking explanation skill.
ai-service/app/skill_services/outreach_drafter_service.py FastAPI wrapper around outreach drafting skill.
ai-service/app/skill_services/matcher_service.py FastAPI wrapper around job-candidate matcher skill.
ai-service/app/skill_services/interview_questions_service.py FastAPI wrapper around interview question generation skill.
ai-service/app/models/task.py Placeholder for task models (currently only a comment).
ai-service/app/models/events.py Adds a Kafka event envelope Pydantic model.
ai-service/app/metrics.py Persists and aggregates match-quality + approval-action metrics in Mongo.
ai-service/app/main.py FastAPI app entrypoint; runs Kafka consumer with retry/backoff.
ai-service/app/kafka/schemas.py Placeholder for shared schema (currently only a comment).
ai-service/app/kafka/producer.py Kafka producer singleton + Mongo event persistence.
ai-service/app/kafka/consumer.py Kafka consumer orchestration for requests and approvals.
ai-service/app/db/redis_client.py Redis status cache + idempotency key claims.
ai-service/app/db/mongo.py Mongo persistence for traces/steps/approvals/events + “latest result” view.
ai-service/app/config.py Settings for Kafka/Mongo/Redis + LLM providers + skill service URLs.
ai-service/app/api/websocket.py WebSocket endpoint and push-update helper for UI streaming.
ai-service/app/api/routes.py REST endpoints for request/status/result/approval + metrics.
ai-service/app/agents/supervisor.py Batch workflow orchestrator calling skill services with retries.
ai-service/app/agents/resume_parser.py LLM-based resume parsing + JSON repair/extraction.
ai-service/app/agents/ranking_explainer.py LLM-based ranking explanation grounded by computed sets.
ai-service/app/agents/outreach_drafter.py LLM-based outreach drafting with low-match guardrails.
ai-service/app/agents/job_matcher.py Embedding + overlap match scoring (Sentence Transformers).
ai-service/app/agents/interview_question_generator.py LLM-based interview question generator with gap grounding.
ai-service/README.md Documents architecture, endpoints, topics, and local setup.
ai-service/Dockerfile Container build for the main FastAPI service and dependencies.
ai-service/.gitignore Ignores env/venv/pycache/pytest caches.

Comment on lines +42 to +50
payload: dict = {
    "actor_id": body.actor_id,
    "job": body.job,
}
if body.candidates is not None:
    payload["candidates"] = body.candidates
else:
    payload["resume_text"] = body.resume_text or ""

Copilot AI commented Apr 28, 2026

request_agent will accept requests with neither candidates nor resume_text and silently enqueue an empty resume. This can trigger unnecessary LLM calls and confusing results; return 400 when candidates is empty/None and resume_text is missing/blank.
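
A minimal sketch of the validation this comment asks for, written as a plain function so it runs without FastAPI. `build_payload` and `RequestValidationError` are hypothetical names; in the route, the error would presumably be translated into `HTTPException(status_code=400, ...)`.

```python
from typing import Optional


class RequestValidationError(ValueError):
    """Raised when a request has neither candidates nor usable resume text."""


def build_payload(actor_id: str, job: dict,
                  candidates: Optional[list] = None,
                  resume_text: Optional[str] = None) -> dict:
    """Build the event payload, rejecting requests with no candidate input."""
    payload: dict = {"actor_id": actor_id, "job": job}
    if candidates:
        payload["candidates"] = candidates
    elif resume_text and resume_text.strip():
        payload["resume_text"] = resume_text
    else:
        raise RequestValidationError(
            "Provide a non-empty candidates list or a non-blank resume_text"
        )
    return payload
```

Rejecting the request before it is enqueued avoids spending an LLM call on an empty resume.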

Comment thread ai-service/app/metrics.py
Comment on lines +102 to +103
match_filter: dict[str, Any] = {"metric_type": "match_quality", "created_at": {"$gte": since.isoformat()}}

Copilot AI commented Apr 28, 2026

match_filter is assigned but never used, which will fail ruff (F841). Remove it or use it consistently (e.g., as the base filter for both aggregation + sampling).

from app.db.mongo import add_step, upsert_trace
from app.db.redis_client import set_status
from app.config import settings
from app.kafka.producer import publish_event
Copilot AI commented Apr 28, 2026

Unused import publish_event will fail ruff/pyflakes (F401). Remove it or use it (e.g., to emit progress/completion events) but avoid leaving dead imports.

Suggested change (delete this line)
-from app.kafka.producer import publish_event

Comment on lines +5 to +12
import sys
import time
from typing import Any

import httpx
import asyncio
import datetime
import uuid
Copilot AI commented Apr 28, 2026

Unused imports (sys, uuid) will fail ruff/pyflakes (F401). Remove them to keep the e2e tool runnable under the repo’s lint settings.

Suggested change
-import sys
-import time
-from typing import Any
-import httpx
-import asyncio
-import datetime
-import uuid
+import time
+from typing import Any
+import httpx
+import asyncio
+import datetime

Comment on lines +131 to +135
_poll_status(client, trace_good, {"approved", "edited", "rejected"}, timeout_s=60)
res2 = client.get(f"{BASE_URL}/agent/result/{trace_good}").json()
approval_steps = [s for s in res2.get("steps", []) if s.get("step") == "approval"]
_assert(len(approval_steps) >= 1, "approval step missing")
print("[e2e] approval transition ok")

Copilot AI Apr 28, 2026

This assertion looks for a step named approval, but the consumer currently writes candidate_approval/batch_approval_completed when get_ranked_candidate_count(trace_id) > 0 (which includes single-candidate traces). Either adjust the consumer to always emit an approval step for backwards compatibility, or update this e2e check to match the new step naming.
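
If the e2e check is the side that changes, a sketch of a tolerant lookup could accept both the legacy and the new step names. The helper name `find_approval_steps` is hypothetical; the step names are the ones mentioned above.

```python
from typing import Any

# Legacy name plus the names the consumer currently writes.
APPROVAL_STEP_NAMES = {"approval", "candidate_approval", "batch_approval_completed"}


def find_approval_steps(result: dict[str, Any]) -> list[dict[str, Any]]:
    """Return every step in a result payload that records an approval."""
    return [s for s in result.get("steps", []) if s.get("step") in APPROVAL_STEP_NAMES]
```

The existing assertion would then become `_assert(len(find_approval_steps(res2)) >= 1, "approval step missing")`.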

Comment on lines +6 to +13
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.db.mongo import get_latest_result, get_ranked_candidate_count, upsert_trace
from app.db.redis_client import get_status as get_cached_status, set_status
from app.kafka.producer import publish_event
from fastapi import Query

Copilot AI Apr 28, 2026

Import order will fail ruff/isort: Query should be imported alongside the other fastapi imports (and third-party imports should be grouped before local app.* imports).

Suggested change
- from fastapi import APIRouter, HTTPException
- from pydantic import BaseModel
- from app.db.mongo import get_latest_result, get_ranked_candidate_count, upsert_trace
- from app.db.redis_client import get_status as get_cached_status, set_status
- from app.kafka.producer import publish_event
- from fastapi import Query
+ from fastapi import APIRouter, HTTPException, Query
+ from pydantic import BaseModel
+ from app.db.mongo import get_latest_result, get_ranked_candidate_count, upsert_trace
+ from app.db.redis_client import get_status as get_cached_status, set_status
+ from app.kafka.producer import publish_event

Comment on lines +85 to +91
expected = await get_ranked_candidate_count(trace_id)
if expected > 0:
    if not (body.candidate_id or "").strip():
        raise HTTPException(
            status_code=400,
            detail="candidate_id is required for batch traces (multi-candidate workflows)",
        )

Copilot AI Apr 28, 2026

approve_outreach requires candidate_id whenever expected > 0, which includes single-candidate traces (expected == 1). This breaks backward compatibility for existing clients that don't send candidate_id. Consider requiring it only for true batch traces (e.g., expected > 1), or persist an is_batch/applied_count flag on the trace and validate against that instead.
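
A sketch of the narrower rule, assuming a hypothetical helper `candidate_id_error` that the route would call with the count from `get_ranked_candidate_count(trace_id)`. Returning a message instead of raising keeps the example free of FastAPI; the route would map a message to a 400 response.

```python
from typing import Optional


def candidate_id_error(expected: int, candidate_id: Optional[str]) -> Optional[str]:
    """Return an error message only when a true batch trace lacks candidate_id."""
    # Only traces with more than one ranked candidate are treated as batch here.
    is_batch = expected > 1
    if is_batch and not (candidate_id or "").strip():
        return "candidate_id is required for batch traces (multi-candidate workflows)"
    return None
```

With this rule, single-candidate traces (`expected == 1`) keep accepting approvals without `candidate_id`.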

Comment on lines +220 to +242
results["ranked_candidates"] = ranked
results["ranked_count"] = len(ranked)
results["stats"] = stats
await add_step(
    trace_id=trace_id,
    step="candidates_ranked",
    status="completed",
    data={"count": len(ranked), "stats": stats, "ranked_candidates": ranked},
)
results["steps"].append(
    {"step": "candidates_ranked", "status": "completed", "data": {"count": len(ranked)}}
)

await _set_status(
    "awaiting_approval",
    extra={
        "requires_human_review": True,
        "ranked_count": len(ranked),
        "applied_count": applied_count,
        "good_count": stats["good_count"],
        "irrelevant_count": stats["irrelevant_count"],
    },
)

Copilot AI Apr 28, 2026

The workflow always sets the trace to awaiting_approval even when every candidate failed and ranked is empty. This conflicts with the failure-injection e2e flow and will leave users stuck “awaiting approval” with nothing to approve; if success_count == 0 (or len(ranked)==0), set the trace to failed and record a terminal step/error.
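
A sketch of the branching this implies, assuming a hypothetical pure function `status_after_ranking` whose result the supervisor would pass to `_set_status`. The `error` key and its message are illustrative and not taken from the PR.

```python
from typing import Any


def status_after_ranking(ranked: list[dict[str, Any]]) -> tuple[str, dict[str, Any]]:
    """Choose the trace status once ranking has finished."""
    if not ranked:
        # Nothing to approve: end the trace instead of waiting on a human.
        return "failed", {
            "requires_human_review": False,
            "ranked_count": 0,
            "error": "no candidates were ranked successfully",
        }
    return "awaiting_approval", {
        "requires_human_review": True,
        "ranked_count": len(ranked),
    }
```

In the failure branch the supervisor would also record a terminal step with status `failed`, so the trace history explains why no approval was requested.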

"timestamp": datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat(),
"actor_id": "recruiter",
"entity": {"entity_type": "ai_task", "entity_id": trace_idem},
"payload": {"action": "approve", "edited_draft": None},

Copilot AI Apr 28, 2026

The injected ai.approval.recorded Kafka event omits candidate_id. With the current consumer logic, approvals without candidate_id don’t count toward count_distinct_candidate_approvals, so the trace will remain awaiting_approval and this test will time out. Include candidate_id in the event payload (or update the consumer to handle single-candidate approvals without it).

Suggested change
- "payload": {"action": "approve", "edited_draft": None},
+ "payload": {"action": "approve", "candidate_id": "good_1", "edited_draft": None},

Comment on lines +57 to +64
# With matcher down, we expect failure at match_scored
_ = client.get(f"{BASE_URL}/agent/result/{trace_id}").json()
steps = [(x.get("step"), x.get("status")) for x in _.get("steps", [])]
print("[host-e2e] status", status, "steps", steps)
if status != "failed":
    raise RuntimeError(f"expected failed with matcher down; got status={status}")
if not any(step == "match_scored" and st == "failed" for step, st in steps):
    raise RuntimeError(f"expected match_scored failed; got steps={steps}")

Copilot AI Apr 28, 2026

This host e2e expects overall status failed when the matcher is down, but the supervisor currently always sets awaiting_approval even if no candidates succeeded. Either update the supervisor to mark the trace failed when ranked is empty (preferred), or relax this assertion to accept awaiting_approval and assert on the failed step only.
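
If the assertion is relaxed instead, a sketch could look like the following. The helper `check_matcher_down` and its `strict` flag are hypothetical; the failed `match_scored` step is always required, and only the overall status check is loosened.

```python
def check_matcher_down(status: str, steps: list[tuple[str, str]], strict: bool = True) -> None:
    """Raise RuntimeError unless the trace reflects a matcher outage."""
    # The failed match_scored step is required in both modes.
    if not any(step == "match_scored" and st == "failed" for step, st in steps):
        raise RuntimeError(f"expected match_scored failed; got steps={steps}")
    # Non-strict mode tolerates the supervisor's current awaiting_approval status.
    allowed = {"failed"} if strict else {"failed", "awaiting_approval"}
    if status not in allowed:
        raise RuntimeError(f"expected one of {sorted(allowed)}; got status={status}")
```

Once the supervisor marks empty rankings as `failed`, the call site can go back to `strict=True`.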

