Implement Watchlist Poller Deduplication (Remove the Existing TODO) #51

@Vedant1703

📋 Description

There is an explicit // TODO in the production codebase at backend/core_service/internal/watchlist/poller.go, line 55:

// TODO: Fetch distinct repos to avoid duplicate checks if multiple users watch the same repo
// For MVP, we iterate all entries. Optimization: implement "ListAllUniqueRepos" in repository.

The problem in plain terms: if 50 users all watch facebook/react, the poller currently fires 50 separate GitHub API calls per poll cycle to check that one repo for new issues. The number of calls scales as O(total_watchlist_entries) rather than O(unique_repos). GitHub's REST API allows 60 requests/hour unauthenticated and 5,000 requests/hour authenticated, so a large deployment can easily exhaust its quota.

The fix is a two-phase poll cycle:

  1. Phase 1 (deduplicated): Fetch only the distinct set of watched repos. Call GitHub once per unique (owner, repo) pair. Store the results in a local map.
  2. Phase 2 (fan-out): Iterate all watchlist entries. For each entry, look up the pre-fetched issue number from the map (no API call). If a new issue was found, notify that user.

This requires changes at the database layer (a new query), the repository layer (a new method), and the poller logic (rewriting poll()).
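To make the two-phase idea concrete, here is a minimal standalone sketch. It is not the project's code: Entry, fetchFunc, and pollOnce are hypothetical names, the GitHub client is replaced by a plain function, and deduplication happens in memory rather than through the proposed SQL query. It only illustrates that the number of fetches tracks unique repos while notifications still go out per watcher.

```go
package main

import (
	"errors"
	"fmt"
)

// Entry is a hypothetical stand-in for one watchlist row.
type Entry struct {
	UserID      string
	Owner, Name string
	LastSeen    int // latest issue number already recorded for this user
}

// fetchFunc is a hypothetical stand-in for the GitHub client call.
type fetchFunc func(owner, name string) (int, error)

// pollOnce returns the users to notify and how many fetch calls were made.
func pollOnce(entries []Entry, fetch fetchFunc) (notify []string, calls int) {
	type result struct {
		number int
		err    error
	}

	// Phase 1: one fetch per unique owner/name pair.
	results := make(map[string]result)
	for _, e := range entries {
		key := e.Owner + "/" + e.Name
		if _, seen := results[key]; seen {
			continue
		}
		n, err := fetch(e.Owner, e.Name)
		calls++
		results[key] = result{number: n, err: err}
	}

	// Phase 2: fan out to every watcher using only the map.
	for _, e := range entries {
		r := results[e.Owner+"/"+e.Name]
		if r.err != nil {
			continue // a failed repo only affects its own watchers
		}
		if r.number > e.LastSeen {
			notify = append(notify, e.UserID)
		}
	}
	return notify, calls
}

func main() {
	entries := []Entry{
		{"alice", "facebook", "react", 10},
		{"bob", "facebook", "react", 12},
		{"carol", "golang", "go", 5},
	}
	fetch := func(owner, name string) (int, error) {
		if owner == "golang" {
			return 0, errors.New("simulated API failure")
		}
		return 12, nil
	}
	notify, calls := pollOnce(entries, fetch)
	fmt.Println(notify, calls) // prints "[alice] 2"
}
```

Three entries produce two fetches, and the simulated failure for golang/go does not stop alice from being notified about facebook/react.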


📍 Files to Change

  • backend/core_service/internal/watchlist/repository.go — add ListUniqueRepos() method
  • backend/core_service/internal/watchlist/poller.go — rewrite poll() with two-phase logic
  • backend/core_service/db/migrations/ (optional): a plain SELECT DISTINCT needs no migration; add one only if you also want an index on (repo_owner, repo_name)

🔍 Current Broken Logic

// poller.go — O(N_entries) GitHub API calls
func (p *Poller) poll(ctx context.Context) {
    entries, _ := p.repo.ListAll(ctx)  // e.g. 500 entries

    for _, entry := range entries {
        // ❌ BUG: calls GitHub for EVERY entry, even if same repo
        latestNum, title, issueURL, err := p.githubClient.GetLatestIssue(entry.RepoOwner, entry.RepoName)
        // ...
    }
}

✅ What To Build

Step 1 — Add ListUniqueRepos() to repository.go:

This returns only the distinct (owner, name) pairs, with no user context:

type UniqueRepo struct {
    Owner string
    Name  string
}

func (r *Repository) ListUniqueRepos(ctx context.Context) ([]UniqueRepo, error) {
    query := `
        SELECT DISTINCT repo_owner, repo_name
        FROM watchlist
    `
    rows, err := r.db.Query(ctx, query)
    if err != nil {
        return nil, fmt.Errorf("listing unique repos: %w", err)
    }
    defer rows.Close()

    var repos []UniqueRepo
    for rows.Next() {
        var u UniqueRepo
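        if err := rows.Scan(&u.Owner, &u.Name); err != nil {
            return nil, fmt.Errorf("scanning unique repo: %w", err)
        }
        repos = append(repos, u)
    }
    // rows.Next() also returns false when iteration fails midway; surface that error.
    if err := rows.Err(); err != nil {
        return nil, fmt.Errorf("iterating unique repos: %w", err)
    }
    return repos, nil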
}

Step 2 — Rewrite poll() in poller.go with two phases:

func (p *Poller) poll(ctx context.Context) {
    log.Println("Poller: Starting poll cycle")

    // ─── PHASE 1: One API call per unique repo ───────────────────────
    uniqueRepos, err := p.repo.ListUniqueRepos(ctx)
    if err != nil {
        log.Printf("Poller: Error listing unique repos: %v", err)
        return
    }
    log.Printf("Poller: Checking %d unique repos (deduplicated)", len(uniqueRepos))

    // Map: "owner/name" -> latest issue data
    type issueResult struct {
        number int
        title  string
        url    string
        err    error
    }
    results := make(map[string]issueResult, len(uniqueRepos))

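    for _, repo := range uniqueRepos {
        key := repo.Owner + "/" + repo.Name
        num, title, url, err := p.githubClient.GetLatestIssue(repo.Owner, repo.Name)
        if err != nil {
            // Log once per repo here; Phase 2 then skips its watchers without repeating the error.
            log.Printf("Poller: Error fetching latest issue for %s: %v", key, err)
        }
        results[key] = issueResult{number: num, title: title, url: url, err: err}
    }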

    // ─── PHASE 2: Fan out notifications to all watchers ──────────────
    entries, err := p.repo.ListAll(ctx)
    if err != nil {
        log.Printf("Poller: Error listing watchlist entries: %v", err)
        return
    }

    for _, entry := range entries {
        key := entry.RepoOwner + "/" + entry.RepoName
        result, ok := results[key]
        if !ok || result.err != nil {
            continue
        }

        if result.number > entry.LatestIssueNumber {
            log.Printf("Poller: New issue #%d in %s — notifying user %s", result.number, key, entry.UserID)

            if err := p.repo.UpdateLastChecked(ctx, entry.ID, result.number); err != nil {
                log.Printf("Poller: DB update failed for entry %s: %v", entry.ID, err)
            }

            payload := map[string]interface{}{
                "type":         "new_issue",
                "repo":         key,
                "issue_number": result.number,
                "issue_title":  result.title,
                "issue_url":    result.url,
                "message":      "New issue detected!",
            }
            for _, notifier := range p.notifiers {
                if err := notifier.NotifyUser(entry.UserID, payload); err != nil {
                    log.Printf("Poller: Notification failed for user %s: %v", entry.UserID, err)
                }
            }
        }
    }
    log.Println("Poller: Poll cycle finished")
}

Step 3 (Bonus) — Make Phase 1 concurrent:

For very large deployments with hundreds of unique repos, Phase 1 could itself be slow if done sequentially. You can use a sync.WaitGroup + a sync.Mutex-protected map to fetch all unique repos in parallel, respecting a concurrency limit with a semaphore channel:

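sem := make(chan struct{}, 10) // at most 10 concurrent GitHub calls
var wg sync.WaitGroup
var mu sync.Mutex // guards the results map from Step 2

for _, repo := range uniqueRepos {
    wg.Add(1)
    sem <- struct{}{} // blocks while 10 calls are already in flight
    go func(r UniqueRepo) {
        defer wg.Done()
        defer func() { <-sem }() // free the slot when this call finishes

        num, title, url, err := p.githubClient.GetLatestIssue(r.Owner, r.Name)

        // Maps are not safe for concurrent writes, so store the result under mu.
        mu.Lock()
        results[r.Owner+"/"+r.Name] = issueResult{number: num, title: title, url: url, err: err}
        mu.Unlock()
    }(repo)
}
wg.Wait() // Phase 2 must not start until every fetch has finished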
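If you want to check that the semaphore really bounds concurrency before wiring it into the poller, the pattern can be run on its own. The sketch below is an assumption-laden illustration, not project code: fetchAll and measurePeak are hypothetical helpers, the fetch function sleeps instead of calling GitHub, and failed keys are simply dropped from the result.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fetchAll calls fetch once per key with at most limit calls in flight.
// Keys whose fetch fails are absent from the returned map.
func fetchAll(keys []string, limit int, fetch func(string) (int, error)) map[string]int {
	results := make(map[string]int, len(keys))
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var mu sync.Mutex // guards results

	for _, key := range keys {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot before starting the goroutine
		go func(k string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			n, err := fetch(k)
			if err != nil {
				return
			}
			mu.Lock()
			results[k] = n
			mu.Unlock()
		}(key)
	}
	wg.Wait()
	return results
}

// measurePeak runs fetchAll over n synthetic keys and reports the highest
// number of fetches observed in flight at once, plus how many succeeded.
func measurePeak(n, limit int) (peak, fetched int) {
	keys := make([]string, n)
	for i := range keys {
		keys[i] = fmt.Sprintf("owner/repo-%d", i)
	}
	var mu sync.Mutex
	inFlight := 0
	fetch := func(string) (int, error) {
		mu.Lock()
		inFlight++
		if inFlight > peak {
			peak = inFlight
		}
		mu.Unlock()
		time.Sleep(time.Millisecond) // simulate network latency
		mu.Lock()
		inFlight--
		mu.Unlock()
		return 1, nil
	}
	results := fetchAll(keys, limit, fetch)
	return peak, len(results)
}

func main() {
	peak, fetched := measurePeak(50, 10)
	fmt.Printf("fetched=%d peak_in_flight=%d\n", fetched, peak)
}
```

Because the slot is acquired in the loop before each goroutine starts, the observed peak can never exceed the limit, regardless of how many keys are queued.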

🏁 Acceptance Criteria

  • ListUniqueRepos() is added to watchlist/repository.go and uses SELECT DISTINCT
  • poll() in poller.go calls GitHub once per unique repo, not once per watchlist entry
  • Notifications are still correctly delivered per user (multiple watchers of the same repo each get notified)
  • The // TODO comment is removed from poller.go
  • If a repo returns an error from GitHub API, the error does not prevent other repos from being checked
  • All core_service Go code compiles: cd backend/core_service && go build ./...
  • (Bonus) Phase 1 fetches repos concurrently with a bounded concurrency limit

💡 Technical Hints

  • The watchlist schema has repo_owner and repo_name as separate columns — your DISTINCT query must use both
  • After Phase 1, if results[key].err != nil, skip that repo silently in Phase 2 — don't crash the whole cycle
  • The ListAll() method already exists in repository.go — don't rewrite it, just add ListUniqueRepos() alongside it
  • To verify correctness manually: add two different users to the same repo in the watchlist, start the poller with a short interval, and observe logs showing 1 API call instead of 2
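The first hint, that deduplication must consider both columns, can be illustrated in memory. This is only a sketch: repoKey and uniqueRepos are hypothetical names, and example-user/react is a made-up repo. Using a struct with both fields as the map key mirrors what SELECT DISTINCT repo_owner, repo_name does in SQL, whereas keying by name alone would merge unrelated repos.

```go
package main

import "fmt"

// repoKey is a hypothetical comparable key holding both columns.
type repoKey struct{ Owner, Name string }

// uniqueRepos returns the distinct (owner, name) pairs in first-seen order.
func uniqueRepos(rows []repoKey) []repoKey {
	seen := make(map[repoKey]struct{}, len(rows))
	var out []repoKey
	for _, r := range rows {
		if _, ok := seen[r]; ok {
			continue
		}
		seen[r] = struct{}{}
		out = append(out, r)
	}
	return out
}

func main() {
	rows := []repoKey{
		{"facebook", "react"},
		{"facebook", "react"},     // second watcher of the same repo
		{"example-user", "react"}, // same name, different owner: a distinct repo
	}
	fmt.Println(len(uniqueRepos(rows))) // prints 2
}
```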

🚀 Getting Started

  1. Fork the repository
  2. Create a branch: git checkout -b fix/issue-51-poller-deduplication
  3. Add ListUniqueRepos to backend/core_service/internal/watchlist/repository.go
  4. Rewrite poll() in backend/core_service/internal/watchlist/poller.go
  5. Run: cd backend/core_service && go build ./...
  6. Open a PR — include a log snippet showing Checking N unique repos where N < total watchlist entries
