Skip to content

Commit 95179c7

Browse files
sharpninjaclaude
andcommitted
Port worker endpoints to CQRS command handlers
Phase D-1 step 2j. Every worker-facing minimal API endpoint now dispatches a McpServer.Cqrs command through IDispatcher; the endpoint layer is pure HTTP mapping with no business logic. New command handlers (all in Cqrs/Commands/): - RegisterWorkerCommand: accepts the authenticated client_id and the wire-format WorkerRegistrationRequest, upserts the worker row via SqliteWorkerRegistryStore, computes recommended task tokens via TaskSizingCalculator, and returns the full WorkerRegistrationResponse. - ClaimNextTaskCommand: atomically dequeues the next pending task from SqliteWorkQueueStore using a 2x task-duration lease and maps the domain WorkTaskRecord to the WorkTaskAssignment wire DTO. Returns Result.Success(null) on an empty queue. - SubmitHeartbeatCommand: validates the request, touches the worker's last-heartbeat, and returns a HeartbeatResponse. Uses Result.Failure("unregistered") as a sentinel so the endpoint layer can map it to 410 Gone. - SubmitGradientCommand: validates ownership (client_id vs submission.WorkerId), marks the task Done, logs the gradient metadata, returns a GradientAcceptance value object. Sentinel failure codes WorkerMismatchCode / TaskNotAssignedCode route to 403 / 409 respectively. Program.cs: - /register, /work, /heartbeat, /gradient endpoints all become thin async lambdas: read client_id claim, build command, dispatcher.SendAsync, map Result<T> to HTTP. - Total endpoint body shrinks from ~180 lines to ~85 lines and no longer directly references SqliteWorkerRegistryStore, SqliteWorkQueueStore, or TimeProvider — all DI injection moves into the handler constructors. Fast-lane regression 250/250 on net10 slice (5 WebApplicationFactory endpoint smoke tests still green because the CQRS refactor preserved the external HTTP shape byte-for-byte; handlers resolve cleanly via AddCqrsHandlers assembly scan). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 5fb6991 commit 95179c7

5 files changed

Lines changed: 393 additions & 121 deletions

File tree

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using BitNetSharp.Distributed.Contracts;
4+
using BitNetSharp.Distributed.Coordinator.Configuration;
5+
using BitNetSharp.Distributed.Coordinator.Persistence;
6+
using McpServer.Cqrs;
7+
using Microsoft.Extensions.Options;
8+
9+
namespace BitNetSharp.Distributed.Coordinator.Cqrs.Commands;
10+
11+
/// <summary>
12+
/// State-changing command dispatched by <c>GET /work</c>. Atomically
13+
/// claims the oldest pending task for the authenticated worker and
14+
/// returns the matching wire-format assignment, or a null payload
15+
/// wrapped in <see cref="Result{T}.Success"/> when the queue is
16+
/// empty. Treated as a command rather than a query because the
17+
/// dequeue transitions task state from Pending → Assigned.
18+
/// </summary>
19+
public sealed record ClaimNextTaskCommand(string ClientId) : ICommand<WorkTaskAssignment?>;
20+
21+
/// <summary>
22+
/// Handler that owns the claim transaction and the DTO mapping from
23+
/// <see cref="WorkTaskRecord"/> to
24+
/// <see cref="WorkTaskAssignment"/>.
25+
/// </summary>
26+
public sealed class ClaimNextTaskCommandHandler : ICommandHandler<ClaimNextTaskCommand, WorkTaskAssignment?>
27+
{
28+
private readonly SqliteWorkQueueStore _workQueue;
29+
private readonly IOptionsMonitor<CoordinatorOptions> _options;
30+
private readonly TimeProvider _time;
31+
32+
public ClaimNextTaskCommandHandler(
33+
SqliteWorkQueueStore workQueue,
34+
IOptionsMonitor<CoordinatorOptions> options,
35+
TimeProvider time)
36+
{
37+
_workQueue = workQueue;
38+
_options = options;
39+
_time = time;
40+
}
41+
42+
public Task<Result<WorkTaskAssignment?>> HandleAsync(
43+
ClaimNextTaskCommand command,
44+
CallContext context)
45+
{
46+
ArgumentNullException.ThrowIfNull(command);
47+
if (string.IsNullOrWhiteSpace(command.ClientId))
48+
{
49+
return Task.FromResult(Result<WorkTaskAssignment?>.Failure("Authenticated client_id is missing."));
50+
}
51+
52+
var opts = _options.CurrentValue;
53+
var leaseDuration = TimeSpan.FromSeconds(opts.TargetTaskDurationSeconds * 2);
54+
var claimed = _workQueue.TryClaimNextPending(command.ClientId, leaseDuration);
55+
if (claimed is null)
56+
{
57+
return Task.FromResult(Result<WorkTaskAssignment?>.Success(null));
58+
}
59+
60+
var baseUrl = opts.BaseUrl.TrimEnd('/');
61+
WorkTaskAssignment? assignment = new WorkTaskAssignment(
62+
TaskId: claimed.TaskId,
63+
WeightVersion: claimed.WeightVersion,
64+
WeightUrl: $"{baseUrl}/weights/{claimed.WeightVersion}",
65+
ShardId: claimed.ShardId,
66+
ShardOffset: claimed.ShardOffset,
67+
ShardLength: claimed.ShardLength,
68+
TokensPerTask: claimed.TokensPerTask,
69+
KLocalSteps: claimed.KLocalSteps,
70+
HyperparametersJson: claimed.HyperparametersJson,
71+
DeadlineUtc: claimed.DeadlineUtc ?? _time.GetUtcNow().Add(leaseDuration));
72+
73+
return Task.FromResult(Result<WorkTaskAssignment?>.Success(assignment));
74+
}
75+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using BitNetSharp.Distributed.Contracts;
4+
using BitNetSharp.Distributed.Coordinator.Configuration;
5+
using BitNetSharp.Distributed.Coordinator.Persistence;
6+
using McpServer.Cqrs;
7+
using Microsoft.Extensions.Logging;
8+
using Microsoft.Extensions.Options;
9+
10+
namespace BitNetSharp.Distributed.Coordinator.Cqrs.Commands;
11+
12+
/// <summary>
13+
/// Command dispatched by the <c>/register</c> endpoint when an
14+
/// authenticated worker sends its initial
15+
/// <see cref="WorkerRegistrationRequest"/>. Carries the bits the
16+
/// handler needs (already-authenticated client id, request payload)
17+
/// and returns the fields the worker expects in response.
18+
/// </summary>
19+
public sealed record RegisterWorkerCommand(
20+
string ClientId,
21+
WorkerRegistrationRequest Request) : ICommand<WorkerRegistrationResponse>;
22+
23+
/// <summary>
24+
/// Handler for <see cref="RegisterWorkerCommand"/>. Upserts the
25+
/// worker row, computes the recommended task size from the
26+
/// capability report, and composes the <see cref="WorkerRegistrationResponse"/>
27+
/// using values from <see cref="CoordinatorOptions"/>.
28+
/// </summary>
29+
public sealed class RegisterWorkerCommandHandler : ICommandHandler<RegisterWorkerCommand, WorkerRegistrationResponse>
30+
{
31+
private readonly SqliteWorkerRegistryStore _workerStore;
32+
private readonly IOptionsMonitor<CoordinatorOptions> _options;
33+
private readonly TimeProvider _time;
34+
private readonly ILogger<RegisterWorkerCommandHandler> _logger;
35+
36+
public RegisterWorkerCommandHandler(
37+
SqliteWorkerRegistryStore workerStore,
38+
IOptionsMonitor<CoordinatorOptions> options,
39+
TimeProvider time,
40+
ILogger<RegisterWorkerCommandHandler> logger)
41+
{
42+
_workerStore = workerStore;
43+
_options = options;
44+
_time = time;
45+
_logger = logger;
46+
}
47+
48+
public Task<Result<WorkerRegistrationResponse>> HandleAsync(
49+
RegisterWorkerCommand command,
50+
CallContext context)
51+
{
52+
ArgumentNullException.ThrowIfNull(command);
53+
if (command.Request is null)
54+
{
55+
return Task.FromResult(Result<WorkerRegistrationResponse>.Failure("Request body is missing."));
56+
}
57+
58+
if (string.IsNullOrWhiteSpace(command.ClientId))
59+
{
60+
return Task.FromResult(Result<WorkerRegistrationResponse>.Failure("Authenticated client_id is missing."));
61+
}
62+
63+
var opts = _options.CurrentValue;
64+
var recommendedTokens = TaskSizingCalculator.RecommendedTokensPerTask(
65+
command.Request.Capability.TokensPerSecond,
66+
TimeSpan.FromSeconds(opts.TargetTaskDurationSeconds),
67+
opts.FullStepEfficiency);
68+
69+
var now = _time.GetUtcNow();
70+
_workerStore.Upsert(new WorkerRecord(
71+
WorkerId: command.ClientId,
72+
Name: string.IsNullOrWhiteSpace(command.Request.WorkerName) ? command.ClientId : command.Request.WorkerName,
73+
CpuThreads: command.Request.Capability.CpuThreads,
74+
TokensPerSecond: command.Request.Capability.TokensPerSecond,
75+
RecommendedTokensPerTask: recommendedTokens,
76+
ProcessArchitecture: command.Request.ProcessArchitecture,
77+
OsDescription: command.Request.OsDescription,
78+
RegisteredAtUtc: now,
79+
LastHeartbeatUtc: now,
80+
State: WorkerState.Active));
81+
82+
_logger.LogInformation(
83+
"Registered worker {ClientId} ({Name}) at {RegisteredAt} with recommended task size {Tokens}.",
84+
command.ClientId,
85+
command.Request.WorkerName,
86+
now,
87+
recommendedTokens);
88+
89+
return Task.FromResult(Result<WorkerRegistrationResponse>.Success(
90+
new WorkerRegistrationResponse(
91+
WorkerId: command.ClientId,
92+
BearerToken: string.Empty,
93+
InitialWeightVersion: opts.InitialWeightVersion,
94+
RecommendedTokensPerTask: recommendedTokens,
95+
HeartbeatIntervalSeconds: opts.HeartbeatIntervalSeconds,
96+
ServerTime: now)));
97+
}
98+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using BitNetSharp.Distributed.Contracts;
4+
using BitNetSharp.Distributed.Coordinator.Persistence;
5+
using McpServer.Cqrs;
6+
using Microsoft.Extensions.Logging;
7+
8+
namespace BitNetSharp.Distributed.Coordinator.Cqrs.Commands;
9+
10+
/// <summary>
11+
/// Command dispatched by the <c>/gradient</c> endpoint when a
12+
/// worker reports a completed task. Validates worker ownership and
13+
/// transitions the task state from <c>Assigned</c> to <c>Done</c>.
14+
/// Gradient decoding + global weight apply are deferred to the
15+
/// Phase D-4 commit.
16+
/// </summary>
17+
public sealed record SubmitGradientCommand(
18+
string ClientId,
19+
GradientSubmission Submission) : ICommand<GradientAcceptance>;
20+
21+
/// <summary>
22+
/// Lightweight value object returned by the gradient handler on
23+
/// the happy path. The endpoint layer serializes this straight to
24+
/// JSON as the HTTP response body.
25+
/// </summary>
26+
public sealed record GradientAcceptance(
27+
string TaskId,
28+
string WorkerId,
29+
long TokensSeen,
30+
bool Accepted);
31+
32+
/// <summary>
33+
/// Handler for <see cref="SubmitGradientCommand"/>. Uses
34+
/// <c>Result.Failure</c> with sentinel codes so the endpoint layer
35+
/// can branch to the right HTTP status without re-validating.
36+
/// </summary>
37+
public sealed class SubmitGradientCommandHandler : ICommandHandler<SubmitGradientCommand, GradientAcceptance>
38+
{
39+
private readonly SqliteWorkQueueStore _workQueue;
40+
private readonly ILogger<SubmitGradientCommandHandler> _logger;
41+
42+
/// <summary>Returned when the submission's worker_id does not match the JWT.</summary>
43+
public const string WorkerMismatchCode = "worker_mismatch";
44+
45+
/// <summary>Returned when the task is not currently assigned to this worker.</summary>
46+
public const string TaskNotAssignedCode = "task_not_assigned";
47+
48+
public SubmitGradientCommandHandler(
49+
SqliteWorkQueueStore workQueue,
50+
ILogger<SubmitGradientCommandHandler> logger)
51+
{
52+
_workQueue = workQueue;
53+
_logger = logger;
54+
}
55+
56+
public Task<Result<GradientAcceptance>> HandleAsync(
57+
SubmitGradientCommand command,
58+
CallContext context)
59+
{
60+
ArgumentNullException.ThrowIfNull(command);
61+
if (command.Submission is null)
62+
{
63+
return Task.FromResult(Result<GradientAcceptance>.Failure("Gradient body is missing."));
64+
}
65+
66+
if (string.IsNullOrWhiteSpace(command.ClientId)
67+
|| command.ClientId != command.Submission.WorkerId)
68+
{
69+
return Task.FromResult(Result<GradientAcceptance>.Failure(WorkerMismatchCode));
70+
}
71+
72+
var completed = _workQueue.MarkCompleted(command.Submission.TaskId, command.ClientId);
73+
if (!completed)
74+
{
75+
return Task.FromResult(Result<GradientAcceptance>.Failure(TaskNotAssignedCode));
76+
}
77+
78+
_logger.LogInformation(
79+
"Accepted gradient for task {TaskId} from worker {ClientId}: format={Format}, bytes={Size}, tokens={Tokens}, loss={Loss}",
80+
command.Submission.TaskId,
81+
command.ClientId,
82+
command.Submission.GradientFormat,
83+
command.Submission.GradientPayload?.Length ?? 0,
84+
command.Submission.TokensSeen,
85+
command.Submission.LossAfter);
86+
87+
return Task.FromResult(Result<GradientAcceptance>.Success(
88+
new GradientAcceptance(
89+
TaskId: command.Submission.TaskId,
90+
WorkerId: command.ClientId,
91+
TokensSeen: command.Submission.TokensSeen,
92+
Accepted: true)));
93+
}
94+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using BitNetSharp.Distributed.Contracts;
4+
using BitNetSharp.Distributed.Coordinator.Persistence;
5+
using McpServer.Cqrs;
6+
7+
namespace BitNetSharp.Distributed.Coordinator.Cqrs.Commands;
8+
9+
/// <summary>
10+
/// Command dispatched by the <c>/heartbeat</c> endpoint. Touches
11+
/// the worker's last-heartbeat timestamp and returns the
12+
/// <see cref="HeartbeatResponse"/> the worker echoes back.
13+
/// </summary>
14+
public sealed record SubmitHeartbeatCommand(
15+
string ClientId,
16+
HeartbeatRequest Request) : ICommand<HeartbeatResponse>;
17+
18+
/// <summary>
19+
/// Outcome indicator used by the endpoint layer to decide whether
20+
/// the caller should be told to re-register (410 Gone) instead of
21+
/// receiving the normal 200 OK response.
22+
/// </summary>
23+
public sealed class SubmitHeartbeatCommandHandler : ICommandHandler<SubmitHeartbeatCommand, HeartbeatResponse>
24+
{
25+
private readonly SqliteWorkerRegistryStore _workerStore;
26+
private readonly TimeProvider _time;
27+
28+
/// <summary>
29+
/// Machine-readable failure code the endpoint layer looks for so
30+
/// it can respond with HTTP 410 Gone instead of 500. See
31+
/// <see cref="ICommandHandler{TCommand,TResult}"/> contract —
32+
/// the handler uses <c>Result.Failure(code)</c> as a lightweight
33+
/// channel for the "worker must re-register" outcome.
34+
/// </summary>
35+
public const string UnregisteredFailureCode = "unregistered";
36+
37+
public SubmitHeartbeatCommandHandler(
38+
SqliteWorkerRegistryStore workerStore,
39+
TimeProvider time)
40+
{
41+
_workerStore = workerStore;
42+
_time = time;
43+
}
44+
45+
public Task<Result<HeartbeatResponse>> HandleAsync(
46+
SubmitHeartbeatCommand command,
47+
CallContext context)
48+
{
49+
ArgumentNullException.ThrowIfNull(command);
50+
if (command.Request is null)
51+
{
52+
return Task.FromResult(Result<HeartbeatResponse>.Failure("Heartbeat body is missing."));
53+
}
54+
55+
if (string.IsNullOrWhiteSpace(command.ClientId))
56+
{
57+
return Task.FromResult(Result<HeartbeatResponse>.Failure("Authenticated client_id is missing."));
58+
}
59+
60+
var touched = _workerStore.TouchHeartbeat(command.ClientId);
61+
if (!touched)
62+
{
63+
return Task.FromResult(Result<HeartbeatResponse>.Failure(UnregisteredFailureCode));
64+
}
65+
66+
return Task.FromResult(Result<HeartbeatResponse>.Success(
67+
new HeartbeatResponse(
68+
ShouldDrain: false,
69+
RecommendedTokensPerTaskOverride: null,
70+
ServerTime: _time.GetUtcNow())));
71+
}
72+
}

0 commit comments

Comments
 (0)