Hexagon Feed System - Low-Level Design (LLD)

Table of Contents

  1. Database Schema Design
  2. API Contracts & Endpoints
  3. Class Design & Architecture
  4. Algorithms & Data Structures
  5. Component Interactions
  6. Sequence Diagrams
  7. Error Handling & Edge Cases

1. Database Schema Design

1.1 Post Database (NoSQL - Cassandra/DynamoDB)

Why NoSQL?

  • Need to partition data by HexID for horizontal scaling
  • High write throughput (1M users creating posts)
  • Query pattern: "Give me all posts in these 7 hexagons sorted by time"
  • No complex joins needed

Posts Table Structure:

Table: posts
─────────────────────────────────────────────────────────
Partition Key: hex_id (VARCHAR)
Sort Key: timestamp (BIGINT, descending) + post_id (UUID)
─────────────────────────────────────────────────────────
Columns:
  - hex_id          : VARCHAR(15)      // H3 hex identifier
  - timestamp       : BIGINT           // Unix ms (for sorting)
  - post_id         : UUID             // Unique identifier
  - user_id         : UUID             // Author
  - content         : TEXT             // Post content (max 10K)
  - media_urls      : LIST<TEXT>       // Optional attachments
  - metadata        : JSON/MAP         // hashtags, mentions, etc.
  - created_at      : TIMESTAMP
  - is_deleted      : BOOLEAN          // Soft delete flag

Key Design Decisions:

  1. Partition by hex_id: All posts in same hexagon stored together

    • Enables efficient query: "Get all posts from hex X"
    • Distributes load across multiple partitions
  2. Composite Sort Key (timestamp + post_id):

    • timestamp DESC: Recent posts first
    • post_id: Ensures uniqueness when timestamps collide
    • Allows efficient range queries
  3. Why this works for our use case:

    Query for 7 hexagons = 7 parallel partition reads
    Each partition already sorted by timestamp DESC
    Just merge results!
    

Secondary Indexes:

  • user_posts_index: Query by user_id + timestamp (for user profile page)
  • post_lookup_index: Direct lookup by post_id (for delete/update)
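The key design above translates into CQL roughly as follows. This is a sketch, not a final schema — the exact types (especially the metadata map) and table options would be tuned per driver and workload:

```sql
CREATE TABLE posts (
    hex_id     text,        -- partition key: all posts of a hex stored together
    timestamp  bigint,      -- clustering column: newest first
    post_id    uuid,        -- clustering tie-breaker for equal timestamps
    user_id    uuid,
    content    text,
    media_urls list<text>,
    metadata   map<text, text>,
    created_at timestamp,
    is_deleted boolean,
    PRIMARY KEY ((hex_id), timestamp, post_id)
) WITH CLUSTERING ORDER BY (timestamp DESC, post_id DESC);
```

With this clustering order, a per-hex read returns posts already sorted newest-first, which is what the merge step relies on.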

1.2 User Database (PostgreSQL - Relational)

Why SQL here?

  • User data has relationships (sessions, preferences)
  • Need ACID properties for user operations
  • Complex queries (user auth, profile management)

Users Table:

Table: users
─────────────────────────────────────────
Primary Key: user_id (UUID)
─────────────────────────────────────────
Columns:
  - user_id              : UUID PRIMARY KEY
  - username             : VARCHAR(50) UNIQUE NOT NULL
  - email                : VARCHAR(255) UNIQUE NOT NULL
  - password_hash        : VARCHAR(255) NOT NULL
  - current_hex_id       : VARCHAR(15)
  - last_location_update : TIMESTAMP
  - profile_data         : JSONB
  - created_at           : TIMESTAMP
  - updated_at           : TIMESTAMP

Indexes:
  - idx_users_hex_id ON (current_hex_id)
  - idx_users_email ON (email)

User Sessions Table:

Table: user_sessions
─────────────────────────────────────────
Primary Key: session_id (UUID)
Foreign Key: user_id -> users(user_id)
─────────────────────────────────────────
Columns:
  - session_id              : UUID PRIMARY KEY
  - user_id                 : UUID REFERENCES users
  - websocket_connection_id : VARCHAR(255)
  - subscribed_hex_ids      : TEXT[] (Array of 7 hex IDs)
  - connected_at            : TIMESTAMP
  - last_heartbeat          : TIMESTAMP
  - expires_at              : TIMESTAMP

Indexes:
  - idx_sessions_user ON (user_id)
  - idx_sessions_hex ON (subscribed_hex_ids) USING GIN

Why separate sessions table?

  • Multiple concurrent sessions per user (mobile + web)
  • Need to track WebSocket connections
  • Easy cleanup of expired sessions

1.3 Cache Schema (Redis)

Cache Strategy: Write-Through + TTL-based Invalidation

Key Patterns:

1. Feed Cache (Most Important):
   ──────────────────────────────────────
   Key:   feed:{hex_id}:{page_number}
   Value: JSON Array of Post objects
   TTL:   300 seconds (5 minutes)
   
   Example:
   Key:   feed:8c2a1072c23ffff:1
   Value: [
     {
       "post_id": "uuid-1",
       "content": "Hello world",
       "timestamp": 1727654400000,
       ...
     },
     ...
   ]
   
   Why?: Most users read page 1 repeatedly
         Cache hit saves 7 database queries!


2. User Session Cache:
   ──────────────────────────────────────
   Key:   session:{user_id}
   Value: {
     "hex_id": "8c2a1072c23ffff",
     "neighbor_hex_ids": ["...", "...", ...],
     "ws_connection_id": "conn-uuid"
   }
   TTL:   3600 seconds (1 hour)
   
   Why?: Avoid location service calls on every feed request


3. Hex Neighbor Cache:
   ──────────────────────────────────────
   Key:   hex_neighbors:{hex_id}
   Value: ["neighbor1", "neighbor2", ..., "neighbor6"]
   TTL:   86400 seconds (24 hours)
   
   Why?: Hex neighbors rarely change
         Can cache very long


4. Hot Post Cache:
   ──────────────────────────────────────
   Key:   post:{post_id}
   Value: Full post JSON
   TTL:   600 seconds (10 minutes)
   
   Why?: Recently created posts accessed frequently

Cache Invalidation Rules:

  • When new post created → Delete feed:{hex_id}:1 (first page only)
  • When post deleted → Delete feed:{hex_id}:* (all pages) + post:{post_id}
  • Let TTL handle everything else

2. API Contracts & Endpoints

2.1 REST APIs

GET /api/v1/feed

Purpose: Retrieve paginated feed for user's location

Request Format:

GET /api/v1/feed?latitude=37.7749&longitude=-122.4194&page=1&limit=20

Headers:
  Authorization: Bearer <jwt_token>
  Content-Type: application/json

Query Parameters:

  • latitude (required): User's latitude (-90 to 90)
  • longitude (required): User's longitude (-180 to 180)
  • page (optional, default=1): Page number
  • limit (optional, default=20, max=50): Posts per page

Success Response (200 OK):

{
  "success": true,
  "data": {
    "hex_id": "8c2a1072c23ffff",
    "neighbor_hex_ids": [
      "8c2a1072c27ffff",
      "8c2a1072c2fffff",
      "8c2a1072c37ffff",
      "8c2a1072c3fffff",
      "8c2a1072c47ffff",
      "8c2a1072c4fffff"
    ],
    "posts": [
      {
        "post_id": "123e4567-e89b-12d3-a456-426614174000",
        "user_id": "user-uuid-here",
        "username": "john_doe",
        "content": "Just posted from San Francisco!",
        "timestamp": 1727654400000,
        "created_at": "2025-09-30T10:30:00Z",
        "metadata": {
          "hashtags": ["#sanfrancisco", "#tech"],
          "mentions": ["@alice"]
        }
      }
      // ... more posts
    ],
    "pagination": {
      "current_page": 1,
      "total_pages": 50,
      "has_next": true,
      "has_previous": false,
      "next_cursor": "MTcyNzY1NDQwMDAwMHwxMjNlNDU2Nw=="
    }
  },
  "timestamp": 1727654500000
}

Error Responses:

// 400 Bad Request - Invalid location
{
  "success": false,
  "error": {
    "code": "INVALID_LOCATION",
    "message": "Latitude must be between -90 and 90"
  }
}

// 401 Unauthorized
{
  "success": false,
  "error": {
    "code": "UNAUTHORIZED",
    "message": "Invalid or expired token"
  }
}

// 429 Too Many Requests
{
  "success": false,
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "Too many requests. Please try again in 60 seconds"
  }
}

POST /api/v1/posts

Purpose: Create a new post

Request Format:

POST /api/v1/posts

Headers:
  Authorization: Bearer <jwt_token>
  Content-Type: application/json

Body:
{
  "content": "This is my post content!",
  "latitude": 37.7749,
  "longitude": -122.4194,
  "metadata": {
    "hashtags": ["#test", "#feed"],
    "mentions": ["@user123"]
  }
}

Validation Rules:

  • content: Required, 1-10,000 characters
  • latitude: Required, -90 to 90
  • longitude: Required, -180 to 180
  • metadata: Optional, max 100 hashtags, max 50 mentions

Success Response (201 Created):

{
  "success": true,
  "data": {
    "post_id": "123e4567-e89b-12d3-a456-426614174000",
    "hex_id": "8c2a1072c23ffff",
    "content": "This is my post content!",
    "timestamp": 1727654400000,
    "created_at": "2025-09-30T10:30:00Z",
    "user_id": "user-uuid-here"
  }
}

Error Responses:

// 400 Bad Request - Content too long
{
  "success": false,
  "error": {
    "code": "CONTENT_TOO_LONG",
    "message": "Content exceeds maximum length of 10000 characters"
  }
}

// 429 Rate Limited
{
  "success": false,
  "error": {
    "code": "RATE_LIMIT_EXCEEDED",
    "message": "Maximum 10 posts per minute exceeded"
  }
}

DELETE /api/v1/posts/:postId

Purpose: Soft delete a post (user can only delete their own posts)

Request:

DELETE /api/v1/posts/123e4567-e89b-12d3-a456-426614174000

Headers:
  Authorization: Bearer <jwt_token>

Success Response (200 OK):

{
  "success": true,
  "message": "Post deleted successfully"
}

2.2 WebSocket Protocol

Connection URL:

ws://api.example.com/ws?token=<jwt_token>

Message Types (Client → Server):

1. Subscribe to Feed:

{
  "type": "subscribe",
  "payload": {
    "latitude": 37.7749,
    "longitude": -122.4194
  }
}

2. Heartbeat Pong:

{
  "type": "pong",
  "timestamp": 1727654400000
}

3. Unsubscribe:

{
  "type": "unsubscribe"
}

Message Types (Server → Client):

1. Connection Acknowledgment:

{
  "type": "connection_ack",
  "connection_id": "conn-uuid-abc123",
  "timestamp": 1727654400000
}

2. Subscription Acknowledgment:

{
  "type": "subscribe_ack",
  "payload": {
    "hex_id": "8c2a1072c23ffff",
    "subscribed_hex_ids": [
      "8c2a1072c23ffff",
      "8c2a1072c27ffff",
      "8c2a1072c2fffff",
      "8c2a1072c37ffff",
      "8c2a1072c3fffff",
      "8c2a1072c47ffff",
      "8c2a1072c4fffff"
    ]
  }
}

3. New Post Notification (Real-time):

{
  "type": "new_post",
  "payload": {
    "post_id": "new-post-uuid",
    "user_id": "author-uuid",
    "username": "jane_doe",
    "content": "Just posted this in real-time!",
    "timestamp": 1727654500000,
    "hex_id": "8c2a1072c23ffff",
    "metadata": {
      "hashtags": ["#realtime"]
    }
  }
}

4. Heartbeat Ping (every 30 seconds):

{
  "type": "ping",
  "timestamp": 1727654400000
}

5. Error Messages:

{
  "type": "error",
  "error": {
    "code": "INVALID_LOCATION",
    "message": "Invalid latitude or longitude provided"
  }
}

3. Class Design & Architecture

3.1 Overall Package Structure (Java)

com.hexfeed
├── controller         // REST & WebSocket endpoints
│   ├── FeedController
│   ├── PostController
│   └── WebSocketController
│
├── service           // Business logic
│   ├── LocationService
│   ├── FeedAggregationService
│   ├── PostIngestionService
│   ├── WebSocketManagerService
│   └── CacheInvalidationService
│
├── repository        // Data access layer
│   ├── PostRepository
│   ├── UserRepository
│   └── SessionRepository
│
├── model            // Domain entities
│   ├── entity
│   │   ├── Post
│   │   ├── User
│   │   └── UserSession
│   └── dto
│       ├── FeedRequest
│       ├── FeedResponse
│       ├── PostRequest
│       └── WebSocketMessage
│
├── config           // Configuration
│   ├── CassandraConfig
│   ├── RedisConfig
│   ├── KafkaConfig
│   └── WebSocketConfig
│
└── util             // Utilities
    ├── H3Util
    ├── PaginationUtil
    └── ValidationUtil

3.2 Core Classes - Detailed Design

LocationService Class

Responsibility: Convert lat/long to HexID and find neighbors

Class: LocationService
───────────────────────────────────────────────
Dependencies:
  - H3Core h3                  // H3 library
  - RedisTemplate cache
  - int resolution = 7         // H3 resolution level

Methods:
───────────────────────────────────────────────
+ HexLocationResult getHexIdsForLocation(double lat, double lon)
  Purpose: Get center hex + 6 neighbors
  Algorithm:
    1. Check cache: "hex_location:{lat}:{lon}"
    2. If miss:
       - Convert lat/lon to hexId using H3
       - Get neighbors using gridDisk(hexId, k=1)
       - Take first 6 neighbors
       - Cache result for 24 hours
    3. Return HexLocationResult
  
+ String getHexId(double lat, double lon)
  Purpose: Simple conversion to hexId
  
+ List<String> getNeighborHexIds(String hexId)
  Purpose: Get 6 immediate neighbors
  Algorithm: Use H3.gridDisk(hexId, 1) - exclude center

───────────────────────────────────────────────
Internal Helper:
- String buildCacheKey(double lat, double lon)
- boolean isValidCoordinate(double lat, double lon)

DTO: HexLocationResult

Class: HexLocationResult
───────────────────────────────────────────────
Fields:
  - String centerHexId
  - List<String> neighborHexIds (size = 6)
  - List<String> allHexIds (size = 7)

Methods:
  - Standard getters/setters
  - Constructor
  - toJson() / fromJson()

FeedAggregationService Class

Responsibility: Fetch and merge posts from 7 hexagons

Class: FeedAggregationService
───────────────────────────────────────────────
Constants:
  - int PAGE_SIZE = 20
  - int MAX_LIMIT = 50
  - int CACHE_TTL = 300 seconds

Dependencies:
  - PostRepository postRepository
  - RedisTemplate cache
  - LocationService locationService

Methods:
───────────────────────────────────────────────
+ FeedResponse getFeed(String userId, double lat, double lon, 
                       int page, int limit)
  Purpose: Main method to get user's feed
  Algorithm:
    1. Validate inputs
    2. Get hex IDs from LocationService
    3. Try cache for each hex: "feed:{hexId}:{page}"
    4. If cache miss:
       a. Query PostRepository for each hex (parallel)
       b. Merge results using K-way merge algorithm
       c. Update cache
    5. Build pagination info
    6. Return FeedResponse
  
- List<Post> fetchFromCache(List<String> hexIds, int page)
  Purpose: Try to get cached feed pages
  
- List<Post> fetchFromDatabase(List<String> hexIds, int page, int limit)
  Purpose: Query database when cache miss
  Algorithm:
    - Create 7 parallel CompletableFutures
    - Each queries one hexagon
    - Wait for all to complete
    - Merge using mergePosts()
  
- List<Post> mergePosts(List<List<Post>> hexFeeds, int limit)
  Purpose: K-way merge of sorted lists
  Algorithm: Min-heap based merge (explained in section 4)
  
- void updateCache(String hexId, int page, List<Post> posts)
  Purpose: Cache the merged feed
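The fetchFromDatabase fan-out can be sketched with CompletableFutures. Here `queryHex` is a stand-in for the real PostRepository call, and bare timestamps stand in for Post objects; the final sort is the naive merge, which section 4's K-way merge replaces in the real service:

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

public class ParallelHexFetch {
    // Fan out one async query per hex, wait for all, then merge and trim.
    public static List<Long> fetchAndMerge(List<String> hexIds,
                                           Function<String, List<Long>> queryHex,
                                           int limit) {
        ExecutorService pool = Executors.newFixedThreadPool(hexIds.size());
        try {
            List<CompletableFuture<List<Long>>> futures = new ArrayList<>();
            for (String hex : hexIds) {
                futures.add(CompletableFuture.supplyAsync(() -> queryHex.apply(hex), pool));
            }
            List<Long> all = new ArrayList<>();
            for (CompletableFuture<List<Long>> f : futures) {
                all.addAll(f.join()); // block until each hex query completes
            }
            // Naive merge for brevity: sort DESC and trim to the page size.
            all.sort(Comparator.reverseOrder());
            return new ArrayList<>(all.subList(0, Math.min(limit, all.size())));
        } finally {
            pool.shutdown();
        }
    }
}
```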

PostIngestionService Class

Responsibility: Handle new post creation

Class: PostIngestionService
───────────────────────────────────────────────
Constants:
  - int MAX_CONTENT_LENGTH = 10000
  - int RATE_LIMIT_PER_MINUTE = 10

Dependencies:
  - PostRepository postRepository
  - LocationService locationService
  - KafkaTemplate messageQueue
  - CacheInvalidationService cacheInvalidation
  - RateLimiter rateLimiter

Methods:
───────────────────────────────────────────────
+ Post createPost(String userId, PostRequest request)
  Purpose: Create and publish new post
  Steps:
    1. Validate content (length, profanity, etc.)
    2. Check rate limit for userId
    3. Get hexId from location
    4. Create Post entity with UUID
    5. Save to PostRepository (Cassandra)
    6. Publish event to Kafka topic "new-post-events"
    7. Async cache invalidation
    8. Return created Post
  
- void validatePost(PostRequest request)
  Purpose: Validation logic
  Checks:
    - Content not empty
    - Content <= 10000 chars
    - Valid coordinates
    - Valid metadata format
  
- void publishPostEvent(Post post)
  Purpose: Publish to Kafka
  Message Format:
    {
      "event_type": "post_created",
      "hex_id": "...",
      "post": {...},
      "timestamp": 1727654400000
    }

WebSocketManagerService Class

Responsibility: Manage WebSocket connections and real-time updates

Class: WebSocketManagerService
───────────────────────────────────────────────
Data Structures:
  - ConcurrentHashMap<String, WebSocketConnection> connections
    Key: userId, Value: Connection object
    
  - ConcurrentHashMap<String, Set<String>> subscriptions
    Key: hexId, Value: Set of userIds subscribed to this hex

Dependencies:
  - KafkaConsumer messageQueueConsumer
  - ExecutorService executorService

Methods:
───────────────────────────────────────────────
+ void handleConnection(String userId, WebSocketSession session)
  Purpose: Register new WebSocket connection
  Steps:
    1. Create WebSocketConnection object
    2. Add to connections map
    3. Send "connection_ack" message
    4. Start heartbeat timer
  
+ void handleDisconnection(String userId)
  Purpose: Clean up when user disconnects
  Steps:
    1. Remove from connections map
    2. Remove from all hex subscriptions
    3. Update session in database
  
+ void handleSubscribe(String userId, double lat, double lon)
  Purpose: Subscribe user to hex feed
  Steps:
    1. Get 7 hex IDs from LocationService
    2. Unsubscribe from old hexes
    3. Subscribe to new hexes
       - For each hexId:
         subscriptions.get(hexId).add(userId)
    4. Send "subscribe_ack" message
  
+ void broadcastToHex(String hexId, WebSocketMessage message)
  Purpose: Push message to all users in a hex
  Algorithm:
    1. Get userIds from subscriptions.get(hexId)
    2. For each userId:
       - Get connection from connections map
       - Send message via WebSocket
       - Handle errors (connection closed, etc.)
  
@KafkaListener(topics = "new-post-events")
+ void consumePostEvents(PostEvent event)
  Purpose: Listen to Kafka and broadcast
  Steps:
    1. Receive event from Kafka
    2. Extract hexId and post data
    3. Build WebSocketMessage
    4. Call broadcastToHex(hexId, message)

Background Tasks:
───────────────────────────────────────────────
- void heartbeatChecker() (runs every 30 sec)
  Purpose: Send ping, detect dead connections
  
- void cleanupStaleConnections() (runs every 5 min)
  Purpose: Remove connections with no heartbeat

Inner Class: WebSocketConnection

Class: WebSocketConnection
───────────────────────────────────────────────
Fields:
  - String userId
  - String connectionId
  - WebSocketSession session
  - Set<String> subscribedHexIds
  - LocalDateTime lastHeartbeat
  - boolean isAlive

Methods:
  - void send(WebSocketMessage message)
  - void close()
  - void updateHeartbeat()
  - boolean isAlive()
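The hexId → subscribers map and broadcastToHex can be sketched as below. The `send` callback stands in for the real WebSocket session write (an assumption here); per-user connection handling, heartbeats, and error recovery are omitted:

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.BiConsumer;

public class HexSubscriptions {
    // hexId -> set of userIds currently subscribed to that hex
    private final ConcurrentHashMap<String, Set<String>> subsByHex = new ConcurrentHashMap<>();

    // Subscribe a user to a new set of hexes, dropping any previous hexes first.
    public void subscribe(String userId, List<String> hexIds) {
        unsubscribe(userId);
        for (String hex : hexIds) {
            subsByHex.computeIfAbsent(hex, h -> ConcurrentHashMap.newKeySet()).add(userId);
        }
    }

    public void unsubscribe(String userId) {
        subsByHex.values().forEach(users -> users.remove(userId));
    }

    // Deliver a message to every user subscribed to hexId; returns fan-out size.
    public int broadcastToHex(String hexId, String message, BiConsumer<String, String> send) {
        Set<String> users = subsByHex.getOrDefault(hexId, Set.of());
        for (String userId : users) {
            send.accept(userId, message);
        }
        return users.size();
    }
}
```

Concurrent sets keep subscribe/unsubscribe safe while a broadcast is iterating the same hex's member set.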

3.3 Repository Layer

PostRepository Interface

Interface: PostRepository
───────────────────────────────────────────────
Methods:

+ Post save(Post post)
  Purpose: Insert new post into Cassandra
  CQL: INSERT INTO posts (hex_id, timestamp, post_id, ...) 
       VALUES (?, ?, ?, ...)

+ List<Post> findByHexId(String hexId, int page, int limit)
  Purpose: Get posts for a hexagon
  CQL: SELECT * FROM posts 
       WHERE hex_id = ? 
       ORDER BY timestamp DESC, post_id DESC 
       LIMIT ?
  
+ List<Post> findByHexIdWithCursor(String hexId, 
                                   PaginationCursor cursor, int limit)
  Purpose: Cursor-based pagination
  Note: CQL has no OR operator; the cursor condition is written
        as a tuple relation on the clustering columns
  CQL: SELECT * FROM posts 
       WHERE hex_id = ? 
         AND (timestamp, post_id) < (?, ?)
       ORDER BY timestamp DESC, post_id DESC 
       LIMIT ?

+ Optional<Post> findByPostId(UUID postId)
  Purpose: Get single post
  Uses: Secondary index

+ void softDelete(UUID postId)
  Purpose: Mark post as deleted
  Note: Cassandra updates require the full primary key, so first
        resolve (hex_id, timestamp) via post_lookup_index
  CQL: UPDATE posts SET is_deleted = true 
       WHERE hex_id = ? AND timestamp = ? AND post_id = ?

+ List<Post> findByUserId(String userId, int page, int limit)
  Purpose: Get user's posts (for profile)
  Uses: Secondary index on user_id

4. Algorithms & Data Structures

4.1 K-Way Merge Algorithm for Feed Aggregation

Problem:

  • Have 7 sorted lists (one per hexagon)
  • Each list sorted by timestamp DESC
  • Need to merge into single sorted list efficiently

Solution: Min-Heap K-Way Merge

Algorithm: K-Way Merge
───────────────────────────────────────────────
Input: 
  - List<List<Post>> hexFeeds (7 lists, each sorted DESC)
  - int limit (number of posts to return)

Output:
  - List<Post> merged feed (sorted DESC)

Time Complexity: O(N log K)
  where N = limit, K = 7 (number of hexagons)

Space Complexity: O(K) = O(7) for heap

Steps:
───────────────────────────────────────────────
1. Create min-heap (PriorityQueue in Java)
   Comparator: order on negated timestamp, so the heap
   always surfaces the newest remaining post first

2. Initialize heap:
   For each hexFeed (index i):
     If feed is not empty:
       Add (timestamp, hexIndex, postIndex, post) to heap

3. Extract posts:
   result = []
   while heap is not empty AND result.size() < limit:
     (timestamp, hexIdx, postIdx, post) = heap.poll()
     result.add(post)
     
     // Add next post from same hex if available
     nextIdx = postIdx + 1
     if nextIdx < hexFeeds[hexIdx].size():
       nextPost = hexFeeds[hexIdx][nextIdx]
       heap.add((nextPost.timestamp, hexIdx, nextIdx, nextPost))

4. Return result

Example Walkthrough:

Hex 1: [Post(ts=1000), Post(ts=800), Post(ts=600)]
Hex 2: [Post(ts=950), Post(ts=750)]
Hex 3: [Post(ts=900), Post(ts=700)]

Step 1: Initialize heap
  Heap: [(1000, 0, 0), (950, 1, 0), (900, 2, 0)]

Step 2: Extract 1000 (from Hex 1)
  Add Post(ts=800) from Hex 1
  Heap: [(950, 1, 0), (900, 2, 0), (800, 0, 1)]
  Result: [1000]

Step 3: Extract 950 (from Hex 2)
  Add Post(ts=750) from Hex 2
  Heap: [(900, 2, 0), (800, 0, 1), (750, 1, 1)]
  Result: [1000, 950]

... continue until limit reached

Final Result: [1000, 950, 900, 800, 750, 700, 600]

Why not simple merge?

  • Simple merge: Fetch ALL posts from all hexes → Sort → Take top N
    • Problem: If 1000 posts/hex × 7 = 7000 posts loaded into memory!
  • K-way merge: Only keep K posts in memory at a time
    • Memory efficient
    • Stop early when limit reached
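The heap-based merge above can be written compactly in Java. Posts are reduced to their timestamps for brevity; in the real service the heap entries would carry full Post objects:

```java
import java.util.*;

public class KWayMerge {
    // hexFeeds: K lists, each already sorted by timestamp DESC.
    // Returns up to `limit` timestamps in globally DESC order.
    public static List<Long> merge(List<List<Long>> hexFeeds, int limit) {
        // Heap entry: {timestamp, hexIndex, postIndex}; newest timestamp polled first.
        PriorityQueue<long[]> heap =
            new PriorityQueue<>((a, b) -> Long.compare(b[0], a[0]));
        for (int i = 0; i < hexFeeds.size(); i++) {
            if (!hexFeeds.get(i).isEmpty()) {
                heap.add(new long[]{hexFeeds.get(i).get(0), i, 0});
            }
        }
        List<Long> result = new ArrayList<>();
        while (!heap.isEmpty() && result.size() < limit) {
            long[] top = heap.poll();
            result.add(top[0]);
            // Refill the heap with the next post from the same hex, if any.
            int hexIdx = (int) top[1], nextIdx = (int) top[2] + 1;
            if (nextIdx < hexFeeds.get(hexIdx).size()) {
                heap.add(new long[]{hexFeeds.get(hexIdx).get(nextIdx), hexIdx, nextIdx});
            }
        }
        return result;
    }
}
```

Running this on the walkthrough's three hexes reproduces the merged order shown above, and stops early as soon as `limit` posts are extracted.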

4.2 Cursor-Based Pagination

Problem with Offset-Based Pagination:

Page 1: SELECT * FROM posts WHERE hex_id = ? ORDER BY timestamp DESC LIMIT 20
Page 2: SELECT * FROM posts WHERE hex_id = ? ORDER BY timestamp DESC LIMIT 20 OFFSET 20

Issues:
1. If new post added between page 1 and page 2, user sees duplicate
2. Offset 1000 means database skips 1000 rows (slow for large offsets)
3. Inconsistent results

Cursor-Based Pagination Solution:

Cursor Format:
───────────────────────────────────────────────
base64(timestamp|post_id)

Example:
  Raw: "1727654400000|123e4567-e89b-12d3-a456-426614174000"
  Encoded: "MTcyNzY1NDQwMDAwMHwxMjNlNDU2Ny1lODliLTEyZDMtYTQ1Ni00MjY2MTQxNzQwMDA="

Algorithm:

Page 1 (No cursor):
  SELECT * FROM posts 
  WHERE hex_id = ?
  ORDER BY timestamp DESC, post_id DESC
  LIMIT 20
  
  Last post: timestamp=1727654380000, post_id=xyz
  Next cursor: base64("1727654380000|xyz")

Page 2 (With cursor "1727654380000|xyz"):
  Decode cursor → timestamp=1727654380000, post_id=xyz
  
  SELECT * FROM posts 
  WHERE hex_id = ?
    AND (timestamp < 1727654380000 
         OR (timestamp = 1727654380000 AND post_id < 'xyz'))
  ORDER BY timestamp DESC, post_id DESC
  LIMIT 20

Benefits:

  • ✅ Consistent results (no duplicates)
  • ✅ Constant cost per page (seeks via the index directly; no rows skipped, regardless of page depth)
  • ✅ Works even when data changes between pages
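Encoding and decoding the cursor format above is straightforward. One deviation from the examples in the text: URL-safe, unpadded Base64 is used here so the cursor survives query strings unescaped, whereas the examples show standard Base64:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PaginationCursor {
    public final long timestamp;
    public final String postId;

    public PaginationCursor(long timestamp, String postId) {
        this.timestamp = timestamp;
        this.postId = postId;
    }

    // "timestamp|post_id" -> URL-safe Base64
    public String encode() {
        String raw = timestamp + "|" + postId;
        return Base64.getUrlEncoder().withoutPadding()
                     .encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    public static PaginationCursor decode(String cursor) {
        String raw = new String(Base64.getUrlDecoder().decode(cursor),
                                StandardCharsets.UTF_8);
        int sep = raw.indexOf('|');
        if (sep < 0) throw new IllegalArgumentException("Malformed cursor");
        return new PaginationCursor(Long.parseLong(raw.substring(0, sep)),
                                    raw.substring(sep + 1));
    }
}
```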

4.3 Rate Limiting - Token Bucket Algorithm

Requirement: Max 10 posts per minute per user

Token Bucket Algorithm:

Concept:
───────────────────────────────────────────────
- Bucket holds "tokens"
- Max capacity: 10 tokens
- Refill rate: 10 tokens per minute
- Action costs 1 token
- If no tokens available → reject

Implementation (Redis):
───────────────────────────────────────────────
Key: rate_limit:{user_id}
Value: {
  "tokens": 8,
  "last_refill": 1727654400000
}
TTL: 60 seconds

Algorithm:

Function: checkRateLimit(userId)
───────────────────────────────────────────────
1. Get current state from Redis
   key = "rate_limit:" + userId
   
2. If key doesn't exist:
   - Initialize with 10 tokens
   - Set TTL 60 seconds
   - Allow request
   
3. If key exists:
   a. Calculate tokens to add:
      elapsed = now - last_refill (in seconds)
      tokensToAdd = (elapsed / 60.0) * 10  // 10 per minute; use
                                           // floating-point division
      newTokens = min(current_tokens + tokensToAdd, 10)
   
   b. If newTokens >= 1:
      - Consume 1 token
      - Update Redis: tokens = newTokens - 1
      - Update last_refill = now
      - Allow request
   
   c. If newTokens < 1:
      - Reject request (rate limit exceeded)
      - Return retry_after = 60 - elapsed seconds

4. Set TTL 60 seconds on every update

Java Pseudocode:

public boolean checkRateLimit(String userId) {
    String key = "rate_limit:" + userId;
    RateLimitState state = redis.get(key);
    
    long now = System.currentTimeMillis();
    
    if (state == null) {
        state = new RateLimitState(10, now);
        redis.setex(key, 60, state);
        return true;
    }
    
    long elapsed = (now - state.lastRefill) / 1000; // seconds
    double tokensToAdd = (elapsed / 60.0) * 10;
    double newTokens = Math.min(state.tokens + tokensToAdd, 10);
    
    if (newTokens >= 1) {
        state.tokens = newTokens - 1;
        state.lastRefill = now;
        redis.setex(key, 60, state);
        return true;
    }
    
    return false; // Rate limit exceeded
}
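For completeness, here is a runnable in-memory variant of the same bucket with an injectable clock, useful for unit-testing the refill arithmetic without Redis. Note that the Redis version above additionally needs its get-then-set to be atomic (e.g., via a Lua script), otherwise concurrent requests can race:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

public class TokenBucketLimiter {
    private static final double CAPACITY = 10.0;          // max tokens
    private static final double REFILL_PER_MINUTE = 10.0; // refill rate

    private static class State { double tokens; long lastRefill; }

    private final ConcurrentHashMap<String, State> buckets = new ConcurrentHashMap<>();
    private final LongSupplier clock; // current time in millis (injectable for tests)

    public TokenBucketLimiter(LongSupplier clock) { this.clock = clock; }

    public synchronized boolean allow(String userId) {
        long now = clock.getAsLong();
        State s = buckets.computeIfAbsent(userId, id -> {
            State st = new State();
            st.tokens = CAPACITY;     // new bucket starts full
            st.lastRefill = now;
            return st;
        });
        // Lazily credit tokens for the time elapsed since the last call.
        double elapsedSec = (now - s.lastRefill) / 1000.0;
        s.tokens = Math.min(s.tokens + (elapsedSec / 60.0) * REFILL_PER_MINUTE, CAPACITY);
        s.lastRefill = now;
        if (s.tokens >= 1.0) {
            s.tokens -= 1.0;          // consume one token for this request
            return true;
        }
        return false;                 // rate limit exceeded
    }
}
```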

4.4 Cache Invalidation Strategy

Challenge: Keep cache consistent when posts are created/deleted

Strategies:

1. Write-Through on Create:

When new post is created:
───────────────────────────────────────────────
Problem: First page cache (feed:{hex_id}:1) is now stale

Solution A - Simple Invalidation:
  - Delete cache key: feed:{hex_id}:1
  - Next request will rebuild from DB
  
Solution B - Optimized Update (Prepend):
  - Get cached page 1
  - Prepend new post to array
  - Keep only top 20 posts
  - Update cache
  - Benefit: No DB query needed
  
We use Solution A for simplicity (cache rebuild is fast with indexes)

2. Invalidate on Delete:

When post is deleted:
───────────────────────────────────────────────
Problem: Post might be cached in multiple pages

Solution:
  - Delete all pages for that hex: feed:{hex_id}:*
  - Delete post cache: post:{post_id}
  - Let TTL handle cleanup

3. TTL-Based Expiry:

All caches have TTL:
───────────────────────────────────────────────
- Feed pages: 300 seconds (5 minutes)
- User sessions: 3600 seconds (1 hour)
- Hex neighbors: 86400 seconds (24 hours)

Why TTL?
- Automatic cleanup of stale data
- Handles edge cases (DB update failed, etc.)
- Reduces complexity

5. Component Interactions

5.1 Data Flow Diagrams

Flow 1: User Requests Feed

┌─────────┐
│ Client  │
└────┬────┘
     │ 1. GET /api/v1/feed?lat=37.7749&lon=-122.4194
     ▼
┌─────────────────┐
│ FeedController  │
└────┬────────────┘
     │ 2. getFeed(userId, lat, lon, page)
     ▼
┌──────────────────────┐
│ FeedAggregation      │
│ Service              │
└────┬─────────────────┘
     │ 3. getHexIdsForLocation()
     ▼
┌──────────────────┐
│ LocationService  │
└────┬─────────────┘
     │ 4. Return [hex1, hex2, ..., hex7]
     │
     ▼
┌──────────────────────┐
│ FeedAggregation      │ ──5. Check Cache──▶ ┌─────────┐
│ Service              │ ◀──Cache Hit/Miss─── │ Redis   │
└────┬─────────────────┘                      └─────────┘
     │
     │ 6. If Cache Miss
     ▼
┌──────────────────┐    7. Query 7 hexagons   ┌───────────┐
│ PostRepository   │ ◀─────(Parallel)────────▶ │ Cassandra │
└────┬─────────────┘                           └───────────┘
     │ 8. Return List<List<Post>>
     ▼
┌──────────────────────┐
│ FeedAggregation      │
│ Service              │
│ - mergePosts()       │ ← K-way merge algorithm
│ - updateCache()      │
└────┬─────────────────┘
     │ 9. Return FeedResponse
     ▼
┌─────────────────┐
│ FeedController  │
└────┬────────────┘
     │ 10. JSON Response
     ▼
┌─────────┐
│ Client  │
└─────────┘

Flow 2: User Creates Post (Real-time Distribution)

┌─────────┐
│ Client  │
└────┬────┘
     │ 1. POST /api/v1/posts
     ▼
┌─────────────────┐
│ PostController  │
└────┬────────────┘
     │ 2. createPost(userId, request)
     ▼
┌──────────────────────┐
│ PostIngestion        │
│ Service              │
└────┬─────────────────┘
     │ 3. Check rate limit
     ▼
┌──────────────────┐  ┌─────────┐
│ RateLimiter      │──│ Redis   │
└────┬─────────────┘  └─────────┘
     │ 4. Allowed
     ▼
┌──────────────────────┐
│ LocationService      │
└────┬─────────────────┘
     │ 5. getHexId(lat, lon)
     ▼
┌──────────────────────┐
│ PostIngestion        │
│ Service              │
│ - Create Post entity │
└────┬─────────────────┘
     │
     ├─6. Save Post────────────▶ ┌───────────┐
     │                           │ Cassandra │
     │                           └───────────┘
     │
     ├─7. Publish Event────────▶ ┌───────────┐
     │  topic: "new-post-events" │   Kafka   │
     │  partition_key: hex_id    └─────┬─────┘
     │                                  │
     │                                  │ 8. Consume Event
     │                                  ▼
     │                           ┌──────────────────┐
     │                           │ WebSocket        │
     │                           │ Manager          │
     │                           └────┬─────────────┘
     │                                │ 9. broadcastToHex()
     │                                │
     │                                ▼
     │                           ┌─────────────────────┐
     │                           │ All subscribed      │
     │                           │ clients in hex      │
     │                           │ receive real-time   │
     │                           │ notification        │
     │                           └─────────────────────┘
     │
     └─10. Async: Invalidate Cache─▶ ┌─────────┐
          DELETE feed:{hex_id}:1      │ Redis   │
                                      └─────────┘

Flow 3: WebSocket Real-time Updates

┌─────────┐
│ Client  │ ──1. ws://api/ws?token=jwt──▶ ┌──────────────────┐
└────┬────┘                                │ WebSocket        │
     │                                     │ Manager          │
     │ ◀─2. connection_ack──────────────── └────┬─────────────┘
     │                                          │
     │ ──3. subscribe(lat, lon)──────────────▶ │
     │                                          │ 4. getHexIds()
     │                                          ▼
     │                                     ┌──────────────────┐
     │                                     │ LocationService  │
     │                                     └────┬─────────────┘
     │                                          │ 5. Return 7 hexIds
     │                                          ▼
     │                                     ┌──────────────────┐
     │                                     │ WebSocket        │
     │                                     │ Manager          │
     │                                     │ - Add to         │
     │ ◀─6. subscribe_ack─────────────────│   subscriptions  │
     │                                     │   map            │
     │                                     └────┬─────────────┘
     │                                          │
     │                                          │ 7. Listen to Kafka
     │                                          ▼
     │                                     ┌──────────────────┐
     │                                     │   Kafka          │
     │                                     │   Consumer       │
     │                                     └────┬─────────────┘
     │                                          │ 8. New post event
     │                                          ▼
     │                                     ┌──────────────────┐
     │                                     │ WebSocket        │
     │                                     │ Manager          │
     │ ◀─9. new_post notification──────── │ broadcastToHex() │
     │                                     └──────────────────┘
     │
     │ ◀─10. ping (every 30s)─────────────
     │ ──11. pong───────────────────────▶
     │
└─────────┘

6. Sequence Diagrams

6.1 Feed Request Sequence (Detailed)

Client          Controller      FeedService     LocationService     Cache       PostRepo      Cassandra
  │                 │                │                 │              │             │              │
  │──GET /feed─────▶│                │                 │              │             │              │
  │                 │                │                 │              │             │              │
  │                 │──getFeed()────▶│                 │              │             │              │
  │                 │                │                 │              │             │              │
  │                 │                │──getHexIds()───▶│              │             │              │
  │                 │                │                 │              │             │              │
  │                 │                │                 │──get()──────▶│             │              │
  │                 │                │                 │◀─cache hit───│             │              │
  │                 │                │◀──7 hex IDs─────│              │             │              │
  │                 │                │                 │              │             │              │
  │                 │                │──Check cache────────────────▶  │             │              │
  │                 │                │   for each hex                 │             │              │
  │                 │                │◀──Cache MISS──────────────────│             │              │
  │                 │                │                                │             │              │
  │                 │                │──findByHexId(hex1)────────────────────────▶│              │
  │                 │                │──findByHexId(hex2)────────────────────────▶│              │
  │                 │                │──findByHexId(hex3)────────────────────────▶│              │
  │                 │                │     ... (parallel)                          │              │
  │                 │                │                                             │──Query──────▶│
  │                 │                │                                             │◀─Results─────│
  │                 │                │◀──7 Lists of Posts─────────────────────────│              │
  │                 │                │                                │             │              │
  │                 │                │  [K-way merge                 │             │              │
  │                 │                │   algorithm here]              │             │              │
  │                 │                │                                │             │              │
  │                 │                │──Update cache──────────────────▶│             │              │
  │                 │                │                                │             │              │
  │                 │◀──FeedResponse─│                                │             │              │
  │◀──200 OK────────│                │                                │             │              │
  │   JSON          │                │                                │             │              │

6.2 Post Creation with Real-time Distribution

Client       Controller   PostService   RateLimiter   LocationSvc   PostRepo   Kafka   WSManager   Redis
  │              │             │              │            │           │         │        │          │
  │──POST────────▶│             │              │            │           │         │        │          │
  │              │             │              │            │           │         │        │          │
  │              │──create()──▶│              │            │           │         │        │          │
  │              │             │              │            │           │         │        │          │
  │              │             │──check()────▶│            │           │         │        │          │
  │              │             │              │──Redis────────────────────────────────────▶│          │
  │              │             │              │◀─allowed──────────────────────────────────│          │
  │              │             │◀─OK──────────│            │           │         │        │          │
  │              │             │                           │           │         │        │          │
  │              │             │──getHexId()───────────────▶│           │         │        │          │
  │              │             │◀─hex_id────────────────────│           │         │        │          │
  │              │             │                           │           │         │        │          │
  │              │             │──save()────────────────────────────────▶│         │        │          │
  │              │             │                                        │──Write──▶        │          │
  │              │             │◀─Post created──────────────────────────│         │        │          │
  │              │             │                                        │         │        │          │
  │              │             │──publish()─────────────────────────────────────▶│        │          │
  │              │             │   {hex_id, post_data}                          │        │          │
  │              │             │                                                 │        │          │
  │              │             │──invalidate()───────────────────────────────────────────────────────▶│
  │              │             │   DELETE feed:{hex_id}:1                                 │          │
  │              │             │                                                 │        │          │
  │              │◀─201────────│                                                 │        │          │
  │◀─Created─────│             │                                                 │        │          │
  │              │             │                                                 │        │          │
  │              │             │                                        [Async]  │        │          │
  │              │             │                                                 │        │          │
  │              │             │                                         consume()        │          │
  │              │             │                                                 │───────▶│          │
  │              │             │                                                 │        │          │
  │              │             │                                                 │   broadcast()     │
  │              │             │                                                 │   to all users    │
  │              │             │                                                 │   in hex          │
All subscribed  │             │                                                 │        │          │
users in hex    │             │                                                 │        │          │
◀───new_post notification─────────────────────────────────────────────────────────────────│          │

7. Error Handling & Edge Cases

7.1 Error Categories and Handling

Validation Errors (400 Bad Request)

Error Cases:
───────────────────────────────────────────────
1. Invalid Location:
   - Latitude not in [-90, 90]
   - Longitude not in [-180, 180]
   Response: {
     "code": "INVALID_LOCATION",
     "message": "Latitude must be between -90 and 90"
   }

2. Content Validation:
   - Empty content
   - Content > 10,000 characters
   - Invalid characters (SQL injection attempts)
   Response: {
     "code": "INVALID_CONTENT",
     "message": "Content exceeds maximum length of 10000 characters"
   }

3. Pagination Errors:
   - Invalid page number (< 1)
   - Invalid limit (< 1 or > 50)
   Response: {
     "code": "INVALID_PAGINATION",
     "message": "Page must be greater than 0"
   }

Handling Strategy:
  - Validate at controller layer
  - Use @Valid annotations in Spring
  - Return 400 with clear error message

Authentication/Authorization Errors (401/403)

Error Cases:
───────────────────────────────────────────────
1. Missing Token:
   Response: 401 Unauthorized
   {
     "code": "MISSING_TOKEN",
     "message": "Authorization token required"
   }

2. Invalid/Expired Token:
   Response: 401 Unauthorized
   {
     "code": "INVALID_TOKEN",
     "message": "Token is invalid or expired"
   }

3. Delete Other User's Post:
   Response: 403 Forbidden
   {
     "code": "FORBIDDEN",
     "message": "You can only delete your own posts"
   }

Handling Strategy:
  - JWT validation at API Gateway/Controller
  - Check post ownership before delete
  - Use Spring Security

Rate Limiting Errors (429 Too Many Requests)

Error Cases:
───────────────────────────────────────────────
1. Post Rate Limit Exceeded:
   Response: 429
   Headers: Retry-After: 45
   {
     "code": "RATE_LIMIT_EXCEEDED",
     "message": "Maximum 10 posts per minute exceeded",
     "retry_after": 45
   }

2. API Request Rate Limit:
   Response: 429
   {
     "code": "TOO_MANY_REQUESTS",
     "message": "API rate limit exceeded. Try again later"
   }

Handling Strategy:
  - Check rate limit before processing
  - Return time until next allowed request
  - Log excessive rate limit violations (potential abuse)
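The "10 posts per minute" limit with a `Retry-After` value can be sketched as a fixed-window counter (an in-memory sketch only; the real implementation would use Redis `INCR` + `EXPIRE` per user, and the `PostRateLimiter` class name is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal fixed-window rate limiter: 10 posts per 60s window per user.
// Illustrates how the Retry-After seconds are computed.
public class PostRateLimiter {
    private static final int LIMIT = 10;
    private static final long WINDOW_MS = 60_000;

    // userId -> {windowStartMs, countInWindow}
    private final Map<String, long[]> windows = new HashMap<>();

    // Returns 0 if the post is allowed, otherwise seconds until reset.
    public long check(String userId, long nowMs) {
        long[] w = windows.computeIfAbsent(userId, k -> new long[]{nowMs, 0});
        if (nowMs - w[0] >= WINDOW_MS) { // window expired: reset
            w[0] = nowMs;
            w[1] = 0;
        }
        if (w[1] < LIMIT) {
            w[1]++;
            return 0;
        }
        return (w[0] + WINDOW_MS - nowMs + 999) / 1000; // ceil to seconds
    }
}
```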

Database Errors (500/503)

Error Cases:
───────────────────────────────────────────────
1. Cassandra Write Timeout:
   Scenario: High load, write timeout
   Response: 503 Service Unavailable
   {
     "code": "SERVICE_UNAVAILABLE",
     "message": "Unable to process request. Please retry"
   }
   
   Handling:
     - Retry with exponential backoff (3 attempts)
     - If all fail, return 503
     - Alert monitoring system

2. Database Connection Lost:
   Scenario: Network partition
   Response: 500 Internal Server Error
   {
     "code": "INTERNAL_ERROR",
     "message": "An internal error occurred"
   }
   
   Handling:
     - Connection pool auto-retry
     - Circuit breaker pattern
     - Failover to replica

3. Query Timeout:
   Scenario: Large dataset scan
   Response: 504 Gateway Timeout
   
   Handling:
     - Set read timeout (5 seconds)
     - Return partial results if possible
     - Suggest pagination

Cache Errors (Degraded Mode)

Error Cases:
───────────────────────────────────────────────
1. Redis Connection Lost:
   Handling:
     - Log error
     - Continue without cache (direct DB query)
     - Alert monitoring
     - System still functional, just slower

2. Cache Deserialization Error:
   Handling:
     - Delete corrupted cache entry
     - Fetch from database
     - Log error for investigation

Strategy: Cache failures should NOT break the system
  - Always have fallback to database
  - Graceful degradation
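The fallback-to-database behavior can be sketched as a cache-aside read where any cache failure degrades to a direct DB query (a sketch; `cacheGet` and `dbLoad` stand in for the Redis and Cassandra calls, and the class name is illustrative):

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Cache-aside read with graceful degradation: a cache failure falls
// through to the database instead of failing the request.
public class DegradedCacheReader {

    public static <T> T read(String key,
                             Function<String, T> cacheGet,
                             Supplier<T> dbLoad) {
        try {
            T cached = cacheGet.apply(key);
            if (cached != null) return cached;
        } catch (RuntimeException e) {
            // Log and continue: cache failures must not break reads.
        }
        return dbLoad.get();
    }
}
```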

WebSocket Errors

Error Cases:
───────────────────────────────────────────────
1. Connection Dropped:
   Client Side:
     - Detect via missed pings
     - Auto-reconnect with exponential backoff
     - Resubscribe to hex feed

   Server Side:
     - Heartbeat checker removes dead connections
     - Clean up subscription maps

2. Invalid Subscribe Message:
   Response to client:
   {
     "type": "error",
     "error": {
       "code": "INVALID_LOCATION",
       "message": "Invalid coordinates provided"
     }
   }

3. Message Send Failure:
   Handling:
     - Try-catch around websocket.send()
     - If fails, mark connection as dead
     - Remove from active connections

4. Kafka Consumer Lag:
   Scenario: Kafka behind, delayed real-time updates
   Handling:
     - Monitor consumer lag
     - Scale consumers if lag > threshold
     - Set max.poll.records appropriately

7.2 Edge Cases

Edge Case 1: User on Hexagon Boundary

Problem:
───────────────────────────────────────────────
User is standing exactly on the edge between two hexagons.
Small GPS drift might cause hex_id to flip back and forth.

Example:
  Request 1: lat=37.7749001 → hex_id = A
  Request 2: lat=37.7749002 → hex_id = B (neighbor of A)

Impact:
  - User sees different feeds on each request
  - Real-time updates might miss posts

Solution:
───────────────────────────────────────────────
1. Location Smoothing (Client Side):
   - Don't update location for every GPS change
   - Only update if moved > 50 meters
   - Prevents flickering between hexagons

2. Always Include Neighbors:
   - Even if on boundary, we fetch 7 hexagons
   - Coverage area overlaps ensure no missed posts

3. Session-based Hex Locking:
   - Cache user's hex_id for 5 minutes
   - Only update if significantly moved
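The "only update if moved > 50 meters" check can be sketched with the haversine formula (a client-side sketch; the `LocationSmoother` class and 50m threshold follow the text above, the rest is illustrative):

```java
// Client-side location smoothing: only publish a location update when
// the user has moved more than ~50 meters, to avoid hex flickering.
public class LocationSmoother {
    private static final double EARTH_RADIUS_M = 6_371_000.0;
    private static final double MIN_MOVE_M = 50.0;

    // Great-circle distance between two lat/lon points, in meters.
    static double haversineMeters(double lat1, double lon1,
                                  double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                 + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                 * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_M * Math.asin(Math.sqrt(a));
    }

    public static boolean shouldUpdate(double oldLat, double oldLon,
                                       double newLat, double newLon) {
        return haversineMeters(oldLat, oldLon, newLat, newLon) > MIN_MOVE_M;
    }
}
```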

Edge Case 2: Very Active Hexagon

Problem:
───────────────────────────────────────────────
One hexagon has 10,000 posts (e.g., Times Square, NYC)
Other hexagons have 100 posts each

Impact on Feed:
  - Feed dominated by posts from one hexagon
  - Posts from smaller hexagons rarely visible

Solution Options:
───────────────────────────────────────────────
Option A: Fair Distribution (Recommended)
  - Take top 3 posts from each hexagon
  - Then fill remaining with most recent across all
  - Ensures visibility for all hexagons

Option B: Weighted by Distance
  - Prioritize center hexagon
  - Lower weight for neighbor hexagons

Option C: Accept as-is
  - If active area, it should dominate feed
  - User likely interested in nearby activity

For MVP: Use Option C (simplest)
For v2: Implement Option A (better UX)
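Option A can be sketched as a two-pass merge: quota picks first, then fill by recency (a hedged sketch; posts are reduced to `(hexIndex, timestamp)` pairs and the class name is illustrative):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Fair distribution (Option A): take the top 'perHexQuota' posts from
// each hexagon, then fill the rest of the page with the most recent
// remaining posts across all hexagons.
public class FairDistributor {

    // Each inner list is one hexagon's posts, sorted newest-first.
    public static List<long[]> distribute(List<List<long[]>> perHex,
                                          int perHexQuota, int pageSize) {
        List<long[]> picked = new ArrayList<>();
        List<long[]> leftovers = new ArrayList<>();
        for (List<long[]> hexPosts : perHex) {
            int i = 0;
            for (long[] post : hexPosts) {
                if (i++ < perHexQuota) picked.add(post);
                else leftovers.add(post);
            }
        }
        leftovers.sort(Comparator.comparingLong((long[] p) -> p[1]).reversed());
        for (long[] post : leftovers) {
            if (picked.size() >= pageSize) break;
            picked.add(post);
        }
        picked.sort(Comparator.comparingLong((long[] p) -> p[1]).reversed());
        return picked.size() > pageSize ? picked.subList(0, pageSize) : picked;
    }
}
```

Note the quiet hexagon's posts survive even when a busy neighbor has many newer posts, because quota picks happen before the recency fill.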

Edge Case 3: New User in Empty Hexagon

Problem:
───────────────────────────────────────────────
User in remote area where no posts exist yet

Query Result:
  - All 7 hexagons return empty arrays
  - Feed shows "No posts available"

Solutions:
───────────────────────────────────────────────
1. Empty State UI:
   Response: {
     "posts": [],
     "message": "Be the first to post in your area!"
   }

2. Expand Search Radius:
   - If no posts in 7 hexagons
   - Query k=2 ring (19 hexagons)
   - Show "Posts from nearby areas"

3. Show Popular Global Posts:
   - Fallback to trending posts
   - "While you wait, check these out"
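The expand-radius fallback can be sketched as follows (a sketch; `fetchPostsForRing` stands in for the H3 ring lookup plus the per-hex database queries, and the class name is illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Fallback: if the k=1 ring (7 hexagons) has no posts, widen the
// query to the k=2 ring (19 hexagons).
public class FeedFallback {

    public static List<String> feedWithFallback(
            Function<Integer, List<String>> fetchPostsForRing) {
        List<String> posts = fetchPostsForRing.apply(1);   // 7 hexagons
        if (posts.isEmpty()) {
            posts = fetchPostsForRing.apply(2);            // 19 hexagons
        }
        return posts;
    }
}
```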

Edge Case 4: Clock Skew / Timestamp Issues

Problem:
───────────────────────────────────────────────
Different servers have slightly different clocks
Post created with timestamp in the "future"

Example:
  Server A clock: 10:00:00
  Server B clock: 10:00:05 (5 sec ahead)
  Post created on B: timestamp = 10:00:05
  User on server A queries at 10:00:02
  Post appears to be from the future!

Impact:
  - K-way merge might place "future" posts incorrectly
  - Cache invalidation timing issues

Solution:
───────────────────────────────────────────────
1. Use a Single Time Source:
   - Generate timestamps in one place (e.g., the database coordinator)
   - Don't use individual application servers' clocks

2. NTP Synchronization:
   - All servers synchronized to NTP
   - Acceptable skew: < 100ms

3. Timestamp Validation:
   - Reject posts with future timestamps
   - If timestamp > now + 5 seconds → Error

4. Database-Generated Timestamps:
   - Let the Cassandra coordinator assign the write timestamp
   - The WRITETIME() function exposes that timestamp at read time
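The future-timestamp rejection (point 3) is a one-line check (a sketch; the class name and the 5-second skew constant from the text are the only inputs):

```java
// Clock-skew guard: reject posts whose client-supplied timestamp is
// more than 5 seconds ahead of the server's clock.
public class TimestampValidator {
    private static final long MAX_SKEW_MS = 5_000;

    public static boolean isAcceptable(long postTimestampMs, long serverNowMs) {
        return postTimestampMs <= serverNowMs + MAX_SKEW_MS;
    }
}
```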

Edge Case 5: Deleted Posts in Cache

Problem:
───────────────────────────────────────────────
1. User requests feed → Page cached
2. Post is deleted
3. Cache not invalidated properly
4. User sees deleted post

Solution:
───────────────────────────────────────────────
1. Soft Delete Flag:
   - Don't physically delete from DB
   - Set is_deleted = true
   - Filter deleted posts at query time

2. Cache Invalidation on Delete:
   - Delete all cache pages for that hex
   - feed:{hex_id}:*

3. TTL Fallback:
   - Cache expires in 5 minutes anyway
   - Deleted post disappears after TTL

4. Client-side Filtering:
   - Real-time delete notification via WebSocket
   - Client removes post from local state

Edge Case 6: Race Condition in WebSocket Subscribe

Problem:
───────────────────────────────────────────────
Timeline:
  T1: User subscribes to hex A (at location X)
  T2: New post created in hex A
  T3: Kafka event published
  T4: User moves to location Y (hex B)
  T5: User unsubscribes from hex A, subscribes to hex B
  T6: WebSocket manager processes T3 event
      → Tries to send to hex A subscribers
      → User already unsubscribed!

Result: User misses the post

Solution:
───────────────────────────────────────────────
1. Idempotent Subscribe:
   - Subscribe operation includes timestamp
   - Ignore events older than subscribe time

2. Event Buffering:
   - Buffer last N events per hex (e.g., 100)
   - On subscribe, replay recent events

3. Pull on Subscribe:
   - When user subscribes to new hex
   - Immediately fetch latest posts (HTTP)
   - Then listen for real-time updates

4. Accept Minor Data Loss:
   - User can refresh feed
   - TTL-based cache ensures consistency eventually

Edge Case 7: Pagination Consistency with Real-time Updates

Problem:
───────────────────────────────────────────────
User on page 1, sees 20 posts
While viewing, 5 new posts arrive via WebSocket
User clicks "Next Page"
What should page 2 show?

Options:
───────────────────────────────────────────────
Option A: Include New Posts
  - Page 2 = posts 21-40 from ORIGINAL query
  - Problem: User might see posts 16-20 again (duplicates)

Option B: Exclude New Posts
  - Use cursor-based pagination
  - Cursor = timestamp of last post on page 1
  - Page 2 = posts BEFORE cursor
  - New posts stay on page 1

Recommended: Option B (Cursor-based)
  - Consistent pagination
  - New posts stay at top
  - User can refresh to see them
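Option B can be sketched over a merged, newest-first feed (a sketch; posts are reduced to timestamps and the `CursorPager` name is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Cursor-based pagination: page N+1 is "the next 'limit' posts strictly
// older than the cursor", so posts arriving after page N was served
// never shift the page boundary.
public class CursorPager {

    // 'sortedDesc' is the merged feed, newest first; 'cursorTs' is the
    // timestamp of the last post the client already has (0 = first page).
    public static List<Long> nextPage(List<Long> sortedDesc,
                                      long cursorTs, int limit) {
        List<Long> page = new ArrayList<>();
        for (long ts : sortedDesc) {
            if (cursorTs != 0 && ts >= cursorTs) continue; // already seen
            page.add(ts);
            if (page.size() == limit) break;
        }
        return page;
    }
}
```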

8. Additional Design Considerations

8.1 Monitoring & Observability

Key Metrics to Track:
───────────────────────────────────────────────
1. API Metrics:
   - Request rate (per endpoint)
   - Response time (p50, p95, p99)
   - Error rate (per error code)
   - Rate limit hits

2. Database Metrics:
   - Query latency (per table)
   - Read/Write throughput
   - Connection pool usage
   - Partition hot spots

3. Cache Metrics:
   - Hit rate (should be > 80%)
   - Miss rate
   - Eviction rate
   - Memory usage

4. WebSocket Metrics:
   - Active connections
   - Messages sent/received per second
   - Connection duration
   - Reconnection rate

5. Kafka Metrics:
   - Producer throughput
   - Consumer lag
   - Message processing time
   - Failed messages

6. Business Metrics:
   - Posts created per minute
   - Active users per hexagon
   - Feed request distribution by location
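The p50/p95/p99 response-time metrics above can be computed from raw samples with the nearest-rank method (a sketch for illustration only; in production these would come from Micrometer/Prometheus histograms rather than raw sample sorting):

```java
import java.util.Arrays;

// Nearest-rank percentile over raw latency samples.
public class LatencyStats {

    public static double percentile(double[] samples, double p) {
        double[] sorted = samples.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length); // nearest rank
        return sorted[Math.max(0, rank - 1)];
    }
}
```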

8.2 Security Considerations

Security Measures:
───────────────────────────────────────────────
1. Input Validation:
   - Sanitize all user inputs
   - Prevent SQL injection (use prepared statements)
   - XSS protection (escape HTML in content)
   - Max content length enforcement

2. Authentication:
   - JWT tokens with expiration
   - Refresh token mechanism
   - Token blacklist for logout

3. Authorization:
   - Users can only delete their own posts
   - Admin roles for moderation
   - Rate limiting per user

4. Data Privacy:
   - Location data encrypted in transit
   - Don't expose exact coordinates in responses
   - GDPR compliance (user data deletion)

5. DDoS Protection:
   - API Gateway rate limiting
   - CloudFlare/WAF in front
   - Captcha for suspicious activity

6. Content Moderation:
   - Profanity filter
   - Spam detection
   - Report/block functionality
   - ML-based content classification (future)

8.3 Scalability Optimizations

Future Optimizations:
───────────────────────────────────────────────
1. Read Replicas:
   - Cassandra replicas for read scaling
   - Read from nearest datacenter

2. CDN for Static Content:
   - Cache user profiles, avatars
   - Reduce load on application servers

3. Database Partitioning Strategy:
   - Archive old posts (> 6 months) to cold storage
   - Reduce active dataset size
   - Faster queries on recent data

4. Smart Caching:
   - Predictive cache warming
   - Cache most active hexagons preemptively
   - Use ML to predict hot hexagons

5. Connection Pooling:
   - Optimize connection pool sizes
   - Per-hexagon connection pools
   - Reduce connection overhead

6. Batch Operations:
   - Batch read multiple hexagons in single query
   - Batch write for analytics
   - Reduce network round trips

7. Edge Computing:
   - Deploy Feed Service to edge locations
   - Reduce latency for global users
   - Regional data centers

8.4 Failure Scenarios & Recovery

Failure Scenario 1: Cassandra Node Failure
───────────────────────────────────────────────
Impact:
  - Some partitions unavailable
  - Read/write failures for specific hexagons

Recovery:
  1. Cassandra replication (RF=3)
  2. Automatic failover to replica
  3. Read from remaining nodes
  4. Write to available replicas
  5. Hinted handoff when node recovers

User Impact: None (transparent failover)

---

Failure Scenario 2: Redis Cache Failure
───────────────────────────────────────────────
Impact:
  - All cache misses
  - Increased load on Cassandra
  - Slower response times

Recovery:
  1. Detect Redis failure (health check)
  2. Bypass cache, query DB directly
  3. Alert operations team
  4. Redis cluster failover to standby

User Impact: Slower feed loads (500ms → 2s)
System Status: Degraded but functional

---

Failure Scenario 3: Kafka Broker Failure
───────────────────────────────────────────────
Impact:
  - Real-time updates delayed
  - Post creation still works (async)
  - WebSocket notifications delayed

Recovery:
  1. Kafka replication (min.insync.replicas=2)
  2. Producer retries to other brokers
  3. Consumer continues from last offset
  4. No data loss due to replication

User Impact: Real-time updates delayed 5-10 seconds
Feed still works, just not instant updates

---

Failure Scenario 4: WebSocket Manager Crash
───────────────────────────────────────────────
Impact:
  - All WebSocket connections dropped
  - Real-time updates stopped

Recovery:
  1. Load balancer detects failure
  2. Routes new connections to healthy instances
  3. Clients auto-reconnect (exponential backoff)
  4. Resubscribe to hexagons

User Impact: Brief disconnection (5-10 seconds)
Auto-recovery, minimal user action needed

---

Failure Scenario 5: Location Service Failure
───────────────────────────────────────────────
Impact:
  - Cannot convert lat/lon to hex_id
  - Feed requests fail

Recovery:
  1. Use cached hex_id from user session
  2. If no cache, use default hex resolution
  3. Fallback to nearby hexagon
  4. Auto-restart service

User Impact: May see slightly different feed
Critical: Need high availability for this service

---

Failure Scenario 6: Database Partition
───────────────────────────────────────────────
Problem: Network split between datacenters

Impact:
  - Write conflicts (CAP theorem)
  - Stale reads possible

Cassandra Strategy (AP system):
  - Availability over consistency
  - Accept writes to both partitions
  - Resolve conflicts using last-write-wins
  - Eventual consistency when partition heals

Recovery:
  1. Cassandra repairs after partition heals
  2. Conflict resolution (timestamp-based)
  3. May have temporary inconsistencies

User Impact: Might see different feed versions
Post count might be slightly off temporarily
Resolves automatically within minutes

9. Java Implementation Guidelines

9.1 Project Structure (Spring Boot)

hexfeed-backend/
├── src/main/java/com/hexfeed/
│   ├── HexFeedApplication.java
│   │
│   ├── controller/
│   │   ├── FeedController.java
│   │   ├── PostController.java
│   │   └── WebSocketController.java
│   │
│   ├── service/
│   │   ├── LocationService.java
│   │   ├── FeedAggregationService.java
│   │   ├── PostIngestionService.java
│   │   ├── WebSocketManagerService.java
│   │   ├── CacheInvalidationService.java
│   │   └── RateLimiterService.java
│   │
│   ├── repository/
│   │   ├── PostRepository.java (Cassandra)
│   │   ├── UserRepository.java (PostgreSQL)
│   │   └── SessionRepository.java (PostgreSQL)
│   │
│   ├── model/
│   │   ├── entity/
│   │   │   ├── Post.java
│   │   │   ├── User.java
│   │   │   └── UserSession.java
│   │   │
│   │   └── dto/
│   │       ├── request/
│   │       │   ├── FeedRequest.java
│   │       │   ├── PostRequest.java
│   │       │   └── SubscribeRequest.java
│   │       │
│   │       └── response/
│   │           ├── FeedResponse.java
│   │           ├── PostResponse.java
│   │           └── ApiResponse.java
│   │
│   ├── config/
│   │   ├── CassandraConfig.java
│   │   ├── RedisConfig.java
│   │   ├── KafkaConfig.java
│   │   ├── WebSocketConfig.java
│   │   └── SecurityConfig.java
│   │
│   ├── util/
│   │   ├── H3Util.java
│   │   ├── PaginationUtil.java
│   │   ├── ValidationUtil.java
│   │   └── JsonUtil.java
│   │
│   ├── exception/
│   │   ├── GlobalExceptionHandler.java
│   │   ├── ValidationException.java
│   │   ├── RateLimitException.java
│   │   └── ResourceNotFoundException.java
│   │
│   ├── messaging/
│   │   ├── PostEventProducer.java
│   │   └── PostEventConsumer.java
│   │
│   └── websocket/
│       ├── WebSocketHandler.java
│       ├── WebSocketConnection.java
│       └── MessageBroker.java
│
├── src/main/resources/
│   ├── application.yml
│   ├── application-dev.yml
│   ├── application-prod.yml
│   └── schema.cql (Cassandra schema)
│
└── pom.xml

9.2 Key Dependencies (Maven)

<dependencies>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    
    <!-- Cassandra -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-cassandra</artifactId>
    </dependency>
    
    <!-- Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    
    <!-- Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    <!-- H3 (Uber's Hexagonal Spatial Index) -->
    <dependency>
        <groupId>com.uber</groupId>
        <artifactId>h3</artifactId>
        <version>4.1.1</version>
    </dependency>
    
    <!-- JWT (0.9.x is the legacy artifact; the maintained line is
         jjwt-api / jjwt-impl / jjwt-jackson 0.11+) -->
    <dependency>
        <groupId>io.jsonwebtoken</groupId>
        <artifactId>jjwt</artifactId>
        <version>0.9.1</version>
    </dependency>
    
    <!-- Validation -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    
    <!-- Lombok (reduces boilerplate) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    
    <!-- Jackson for JSON -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
    <!-- PostgreSQL Driver -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
    </dependency>
    
    <!-- Monitoring -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    
    <!-- Metrics -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
</dependencies>

9.3 Implementation Tips for Java

Tip 1: Use CompletableFuture for Parallel Queries

// Fetching from 7 hexagons in parallel
// ('executor' is assumed to be a dedicated thread pool for I/O-bound queries)
List<CompletableFuture<List<Post>>> futures = hexIds.stream()
    .map(hexId -> CompletableFuture.supplyAsync(
        () -> postRepository.findByHexId(hexId, page, limit),
        executor
    ))
    .collect(Collectors.toList());

// Wait for all to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

// Extract results
List<List<Post>> allPosts = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

Tip 2: Use @Cacheable for Spring Cache Abstraction

@Service
public class FeedAggregationService {
    
    @Cacheable(
        value = "feed",
        key = "#hexId + ':' + #page",
        unless = "#result.isEmpty()"
    )
    public List<Post> getFeedForHex(String hexId, int page) {
        return postRepository.findByHexId(hexId, page, 20);
    }
    
    @CacheEvict(value = "feed", key = "#hexId + ':1'")
    public void invalidateFirstPage(String hexId) {
        // Method body can be empty
        // Annotation handles cache eviction
    }
}

Tip 3: Use PriorityQueue for K-Way Merge

public class FeedMerger {
    
    public List<Post> merge(List<List<Post>> hexFeeds, int limit) {
        PriorityQueue<PostIterator> heap = new PriorityQueue<>(
            (a, b) -> Long.compare(b.current().getTimestamp(), 
                                   a.current().getTimestamp()) // DESC
        );
        
        // Initialize heap
        for (int i = 0; i < hexFeeds.size(); i++) {
            if (!hexFeeds.get(i).isEmpty()) {
                heap.offer(new PostIterator(hexFeeds.get(i), i));
            }
        }
        
        List<Post> result = new ArrayList<>();
        
        while (!heap.isEmpty() && result.size() < limit) {
            PostIterator iterator = heap.poll();
            result.add(iterator.current());
            
            if (iterator.hasNext()) {
                iterator.next();
                heap.offer(iterator);
            }
        }
        
        return result;
    }
    
    private static class PostIterator {
        private final List<Post> posts;
        private final int hexIndex; // which hexagon this list came from
        private int index;
        
        PostIterator(List<Post> posts, int hexIndex) {
            this.posts = posts;
            this.hexIndex = hexIndex;
            this.index = 0;
        }
        
        Post current() {
            return posts.get(index);
        }
        
        boolean hasNext() {
            return index + 1 < posts.size();
        }
        
        void next() {
            index++;
        }
    }
}

Tip 4: WebSocket with STOMP Protocol

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic", "/queue");
        config.setApplicationDestinationPrefixes("/app");
    }
    
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOrigins("*")
                .withSockJS();
    }
}

@Controller
public class WebSocketController {
    
    @Autowired
    private LocationService locationService;
    
    @Autowired
    private WebSocketManagerService webSocketManager;
    
    @MessageMapping("/subscribe")
    @SendToUser("/queue/feed")
    public SubscribeResponse subscribe(SubscribeRequest request, 
                                       Principal principal) {
        String userId = principal.getName();
        HexLocationResult hexes = locationService.getHexIds(
            request.getLatitude(), 
            request.getLongitude()
        );
        
        // Subscribe user to hex topics
        webSocketManager.subscribe(userId, hexes);
        
        return new SubscribeResponse(hexes);
    }
}

Tip 5: Kafka Producer with Spring

@Slf4j  // Lombok: provides the 'log' logger used below
@Service
public class PostEventProducer {
    
    @Autowired
    private KafkaTemplate<String, PostEvent> kafkaTemplate;
    
    private static final String TOPIC = "new-post-events";
    
    public void publishPostCreated(Post post) {
        PostEvent event = PostEvent.builder()
            .eventType("post_created")
            .hexId(post.getHexId())
            .post(post)
            .timestamp(System.currentTimeMillis())
            .build();
        
        // Partition by hex_id so events for one hexagon stay ordered
        // (addCallback is the Spring Kafka 2.x API; 3.x returns a
        //  CompletableFuture, so use whenComplete there instead)
        kafkaTemplate.send(TOPIC, post.getHexId(), event)
            .addCallback(
                success -> log.info("Published event: {}", event),
                failure -> log.error("Failed to publish event", failure)
            );
    }
}

@Service
public class PostEventConsumer {
    
    @Autowired
    private WebSocketManagerService webSocketManager;
    
    @KafkaListener(
        topics = "new-post-events",
        groupId = "websocket-group",
        concurrency = "3"
    )
    public void consume(PostEvent event) {
        WebSocketMessage message = WebSocketMessage.builder()
            .type("new_post")
            .payload(event.getPost())
            .build();
        
        webSocketManager.broadcastToHex(event.getHexId(), message);
    }
}

Tip 6: Custom Exception Handling

@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler {
    
    @ExceptionHandler(ValidationException.class)
    public ResponseEntity<ApiResponse> handleValidation(
        ValidationException ex
    ) {
        return ResponseEntity
            .badRequest()
            .body(ApiResponse.error("VALIDATION_ERROR", ex.getMessage()));
    }
    
    @ExceptionHandler(RateLimitException.class)
    public ResponseEntity<ApiResponse> handleRateLimit(
        RateLimitException ex
    ) {
        return ResponseEntity
            .status(HttpStatus.TOO_MANY_REQUESTS)
            .header("Retry-After", String.valueOf(ex.getRetryAfter()))
            .body(ApiResponse.error("RATE_LIMIT_EXCEEDED", ex.getMessage()));
    }
    
    @ExceptionHandler(Exception.class)
    public ResponseEntity<ApiResponse> handleGeneral(Exception ex) {
        log.error("Unexpected error", ex);
        return ResponseEntity
            .status(HttpStatus.INTERNAL_SERVER_ERROR)
            .body(ApiResponse.error("INTERNAL_ERROR", 
                                   "An unexpected error occurred"));
    }
}

Tip 7: Configuration Management

# application.yml
spring:
  application:
    name: hexfeed-service
    
  # Cassandra
  data:
    cassandra:
      keyspace-name: hexfeed
      contact-points: localhost
      port: 9042
      local-datacenter: datacenter1
      
  # Redis
  redis:
    host: localhost
    port: 6379
    timeout: 2000ms
    jedis:
      pool:
        max-active: 100
        max-idle: 50
        
  # Kafka
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: hexfeed-consumers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      
# Application-specific
hexfeed:
  h3:
    resolution: 7
  feed:
    page-size: 20
    max-limit: 50
    cache-ttl: 300
  rate-limit:
    posts-per-minute: 10
  websocket:
    heartbeat-interval: 30000

9.4 Testing Strategy

Unit Tests:
───────────────────────────────────────────────
1. Service Layer:
   - Test LocationService hex conversion
   - Test FeedMerger algorithm
   - Test RateLimiter logic
   - Mock repositories

2. Repository Layer:
   - Test Cassandra queries
   - Test pagination logic
   - Use embedded Cassandra for tests

3. Utility Classes:
   - Test H3Util methods
   - Test PaginationUtil cursor encoding/decoding
   - Test ValidationUtil
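
As a concrete sketch of the cursor round-trip test above — assuming a hypothetical PaginationUtil that Base64-encodes a "timestamp:postId" pair as the opaque cursor (names are illustrative, not taken from the codebase) — the test can be as small as:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical cursor codec: "timestamp:postId" wrapped in URL-safe Base64.
final class PaginationUtil {
    static String encodeCursor(long timestamp, String postId) {
        String raw = timestamp + ":" + postId;
        return Base64.getUrlEncoder().withoutPadding()
                .encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    static String[] decodeCursor(String cursor) {
        String raw = new String(Base64.getUrlDecoder().decode(cursor),
                StandardCharsets.UTF_8);
        return raw.split(":", 2);  // [timestamp, postId]
    }
}

public class PaginationUtilTest {
    public static void main(String[] args) {
        // Round-trip property: decode(encode(x)) must recover the original parts.
        String cursor = PaginationUtil.encodeCursor(1700000000000L, "post-123");
        String[] parts = PaginationUtil.decodeCursor(cursor);
        if (!parts[0].equals("1700000000000")) throw new AssertionError(parts[0]);
        if (!parts[1].equals("post-123")) throw new AssertionError(parts[1]);
        System.out.println("cursor round-trip OK: " + cursor);
    }
}
```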

Integration Tests:
───────────────────────────────────────────────
1. API Tests:
   - Test full feed request flow
   - Test post creation flow
   - Use TestContainers for Cassandra/Redis/Kafka

2. WebSocket Tests:
   - Test connection/subscription
   - Test message broadcasting
   - Use WebSocket test client

3. Cache Tests:
   - Test cache hit/miss scenarios
   - Test invalidation logic

Performance Tests:
───────────────────────────────────────────────
1. Load Testing:
   - Use JMeter or Gatling
   - Simulate 1M concurrent users
   - Test feed response times under load

2. Stress Testing:
   - Test rate limiting
   - Test database connection pool limits
   - Test WebSocket connection limits

10. Summary & Next Steps

Key Design Decisions Summary

✓ Hexagonal Partitioning
  - Efficient spatial queries
  - Natural data distribution
  - Parallel query execution

✓ NoSQL for Posts (Cassandra)
  - Horizontal scaling
  - Time-series optimized
  - High write throughput

✓ SQL for Users (PostgreSQL)
  - ACID guarantees
  - Complex relationships
  - Strong consistency

✓ Multi-Layer Caching (Redis)
  - 80%+ hit rate target
  - Reduced DB load
  - Fast response times

✓ WebSocket for Real-time
  - Persistent connections
  - Low latency updates
  - Pub/Sub per hexagon

✓ Kafka for Event Streaming
  - Decoupled architecture
  - At-least-once delivery
  - Horizontal scaling

✓ K-Way Merge Algorithm
  - Memory efficient
  - O(N log K) complexity
  - Early termination

✓ Cursor-Based Pagination
  - Consistent results
  - No duplicates
  - Constant-cost page seeks (no OFFSET scans)
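
The K-way merge decision above can be sketched with a bounded PriorityQueue: each of the K hex partitions already returns posts sorted by timestamp descending, the heap never holds more than K entries (so each step is O(log K)), and the loop stops the moment the page is full — the early termination noted above. `PostStub` and the method names are illustrative:

```java
import java.util.*;

// Minimal stand-in for a post: only the fields the merge needs.
record PostStub(String hexId, long timestamp) {}

public class KWayMerge {
    // Merge K per-hex lists (each sorted by timestamp DESC) into one page.
    // Heap entries are {listIndex, offsetInList}; total cost O(pageSize * log K).
    static List<PostStub> mergeFeeds(List<List<PostStub>> perHex, int pageSize) {
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            (a, b) -> Long.compare(perHex.get(b[0]).get(b[1]).timestamp(),
                                   perHex.get(a[0]).get(a[1]).timestamp()));
        for (int i = 0; i < perHex.size(); i++) {
            if (!perHex.get(i).isEmpty()) heap.add(new int[]{i, 0});
        }
        List<PostStub> page = new ArrayList<>(pageSize);
        while (!heap.isEmpty() && page.size() < pageSize) {  // early termination
            int[] top = heap.poll();
            page.add(perHex.get(top[0]).get(top[1]));
            if (top[1] + 1 < perHex.get(top[0]).size()) {
                heap.add(new int[]{top[0], top[1] + 1});   // advance that hex's list
            }
        }
        return page;
    }

    public static void main(String[] args) {
        List<List<PostStub>> feeds = List.of(
            List.of(new PostStub("hexA", 500), new PostStub("hexA", 100)),
            List.of(new PostStub("hexB", 400), new PostStub("hexB", 300)),
            List.of(new PostStub("hexC", 200)));
        for (PostStub p : mergeFeeds(feeds, 3)) {
            System.out.println(p.hexId() + " @ " + p.timestamp());
        }
        // Page of 3, newest first: timestamps 500, 400, 300
    }
}
```

Because only the heads of the K lists sit in the heap, memory stays O(K) regardless of how many posts each hexagon holds.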

Implementation Phases

Phase 1: Core Infrastructure (Week 1-2)
───────────────────────────────────────────────
□ Setup Spring Boot project
□ Configure Cassandra, PostgreSQL, Redis
□ Implement LocationService with H3
□ Design and create database schemas
□ Setup basic REST API structure

Phase 2: Feed System (Week 3-4)
───────────────────────────────────────────────
□ Implement PostRepository
□ Implement FeedAggregationService
□ Implement K-way merge algorithm
□ Implement caching layer
□ Implement pagination (cursor-based)
□ Write unit tests

Phase 3: Post Creation (Week 5)
───────────────────────────────────────────────
□ Implement PostIngestionService
□ Implement rate limiting
□ Implement validation
□ Setup Kafka producer
□ Implement cache invalidation

Phase 4: Real-time Updates (Week 6-7)
───────────────────────────────────────────────
□ Setup WebSocket infrastructure
□ Implement WebSocketManagerService
□ Implement Kafka consumer
□ Implement heartbeat mechanism
□ Test real-time distribution

Phase 5: Testing & Optimization (Week 8)
───────────────────────────────────────────────
□ Integration tests
□ Load testing (simulate 1M users)
□ Performance optimization
□ Cache tuning
□ Query optimization

Phase 6: Production Ready (Week 9-10)
───────────────────────────────────────────────
□ Security hardening
□ Monitoring setup (Prometheus, Grafana)
□ Logging (ELK stack)
□ API documentation (Swagger)
□ Deployment automation
□ Disaster recovery plan

Key Files to Start With

1. Post.java (Entity)
   - Define Post model with all fields
   - Cassandra annotations

2. PostRepository.java
   - CQL queries for CRUD operations
   - Pagination support

3. LocationService.java
   - H3 integration
   - Hex ID conversion logic

4. FeedAggregationService.java
   - Core feed logic
   - K-way merge implementation

5. PostIngestionService.java
   - Post creation flow
   - Kafka integration

6. WebSocketConfig.java
   - WebSocket setup
   - STOMP configuration

7. application.yml
   - All configuration centralized

This LLD document provides a comprehensive blueprint for implementing the Hexagon Feed System in Java. Start with Phase 1 and build incrementally, testing each component thoroughly before moving to the next phase.

Good luck with your implementation! 🚀