diff --git a/docker-compose.api.yml b/docker-compose.api.yml index 010978c1..19383b8f 100644 --- a/docker-compose.api.yml +++ b/docker-compose.api.yml @@ -10,6 +10,12 @@ services: DOCKER_GIT_PROJECTS_ROOT_VOLUME: ${DOCKER_GIT_PROJECTS_ROOT_VOLUME:-docker-git-projects} DOCKER_GIT_FEDERATION_PUBLIC_ORIGIN: ${DOCKER_GIT_FEDERATION_PUBLIC_ORIGIN:-} DOCKER_GIT_FEDERATION_ACTOR: ${DOCKER_GIT_FEDERATION_ACTOR:-docker-git} + DOCKER_GIT_EXCHANGE_TARGETS: ${DOCKER_GIT_EXCHANGE_TARGETS:-} + DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL: ${DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL:-} + DOCKER_GIT_EXCHANGE_AGENT_PROVIDER: ${DOCKER_GIT_EXCHANGE_AGENT_PROVIDER:-codex} + DOCKER_GIT_EXCHANGE_AGENT_COMMAND: ${DOCKER_GIT_EXCHANGE_AGENT_COMMAND:-} + DOCKER_GIT_EXCHANGE_AGENT_TIMEOUT_MS: ${DOCKER_GIT_EXCHANGE_AGENT_TIMEOUT_MS:-3600000} + DOCKER_GIT_OUTBOX_POLLING_INTERVAL_MS: ${DOCKER_GIT_OUTBOX_POLLING_INTERVAL_MS:-5000} ports: - "${DOCKER_GIT_API_BIND_HOST:-127.0.0.1}:${DOCKER_GIT_API_PORT:-3334}:${DOCKER_GIT_API_PORT:-3334}" dns: diff --git a/docker-compose.yml b/docker-compose.yml index 5f4ed5db..49581a53 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,12 @@ services: DOCKER_GIT_PROJECTS_ROOT_VOLUME: ${DOCKER_GIT_PROJECTS_ROOT_VOLUME:-docker-git-projects} DOCKER_GIT_FEDERATION_PUBLIC_ORIGIN: ${DOCKER_GIT_FEDERATION_PUBLIC_ORIGIN:-} DOCKER_GIT_FEDERATION_ACTOR: ${DOCKER_GIT_FEDERATION_ACTOR:-docker-git} + DOCKER_GIT_EXCHANGE_TARGETS: ${DOCKER_GIT_EXCHANGE_TARGETS:-} + DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL: ${DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL:-} + DOCKER_GIT_EXCHANGE_AGENT_PROVIDER: ${DOCKER_GIT_EXCHANGE_AGENT_PROVIDER:-codex} + DOCKER_GIT_EXCHANGE_AGENT_COMMAND: ${DOCKER_GIT_EXCHANGE_AGENT_COMMAND:-} + DOCKER_GIT_EXCHANGE_AGENT_TIMEOUT_MS: ${DOCKER_GIT_EXCHANGE_AGENT_TIMEOUT_MS:-3600000} + DOCKER_GIT_OUTBOX_POLLING_INTERVAL_MS: ${DOCKER_GIT_OUTBOX_POLLING_INTERVAL_MS:-5000} ports: - "${DOCKER_GIT_API_BIND_HOST:-127.0.0.1}:${DOCKER_GIT_API_PORT:-3334}:${DOCKER_GIT_API_PORT:-3334}" dns: diff --git a/packages/api/README.md b/packages/api/README.md index e1bcb2c5..c40f5092 100644 --- a/packages/api/README.md +++ b/packages/api/README.md @@ -45,6 +45,12 @@ Optional env: - `DOCKER_GIT_PROJECTS_ROOT_VOLUME` (Docker volume name for controller state, default: `docker-git-projects`) - `DOCKER_GIT_FEDERATION_PUBLIC_ORIGIN` (optional public ActivityPub origin) - `DOCKER_GIT_FEDERATION_ACTOR` (default: `docker-git`) +- `DOCKER_GIT_EXCHANGE_TARGETS` (optional comma-separated exchange targets, e.g. `https://exchange.lefine.pro` or `code@exchange.lefine.pro`) +- `DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL` (fallback repo for exchange Tickets without a GitHub URL) +- `DOCKER_GIT_EXCHANGE_AGENT_PROVIDER` (default: `codex`; also supports `claude`, `opencode`, `custom`) +- `DOCKER_GIT_EXCHANGE_AGENT_COMMAND` (optional command template; `{{prompt}}` is replaced with the task prompt) +- `DOCKER_GIT_EXCHANGE_AGENT_TIMEOUT_MS` (default: `3600000`) +- `DOCKER_GIT_OUTBOX_POLLING_INTERVAL_MS` (default: `5000`) ## Endpoints @@ -56,6 +62,9 @@ Optional env: - `GET /federation/followers` - `GET /federation/following` - `GET /federation/liked` +- `POST /federation/exchange/subscriptions` (discover remote actor, persist metadata, send signed `Follow`) +- `GET /federation/exchange/subscriptions` +- `POST /federation/exchange/poll` (manual remote outbox poll) - `POST /federation/follows` (create ActivityPub `Follow` subscription) - `GET /federation/follows` - `GET /projects` @@ -77,6 +86,23 @@ Optional env: ## Subscription workflow (ActivityPub Follow + ForgeFed issues) +Exchange targets must be explicit. Use `https://exchange.lefine.pro`, an actor URL, or a handle like `code@exchange.lefine.pro`; the API resolves the code actor document, stores its `inbox/outbox/followers/publicKey`, sends `Follow`, and polls the stored `outbox`. + +```bash +./ctl request POST /federation/exchange/subscriptions '{ + "domain":"https://social.provercoder.ai", + "target":"https://exchange.lefine.pro", + "projectRepoUrl":"https://github.com/ProverCoderAI/docker-git", + "agentProvider":"codex" +}' + +./ctl request POST /federation/exchange/poll '{}' +./ctl request GET /federation/exchange/subscriptions +./ctl request GET /federation/issues +``` + +When a polled `Create(Ticket)` has no GitHub URL in the Ticket payload, `projectRepoUrl` or `DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL` is required for the automatic docker-git project/agent run. + 1. Read actor profile (contains `inbox/outbox/followers/following/liked`): ```bash diff --git a/packages/api/src/api/contracts.ts b/packages/api/src/api/contracts.ts index fb18164d..f8338d57 100644 --- a/packages/api/src/api/contracts.ts +++ b/packages/api/src/api/contracts.ts @@ -399,21 +399,46 @@ export type ForgeFedTicket = { readonly summary: string readonly content: string readonly mediaType?: string | undefined - readonly source?: string | undefined + readonly source?: string | ForgeFedTicketSource | undefined readonly published?: string | undefined readonly updated?: string | undefined readonly url?: string | undefined + readonly context?: string | undefined + readonly workType?: string | undefined + readonly attachment?: ReadonlyArray | undefined + readonly raw?: unknown | undefined } -export type FederationIssueStatus = "offered" | "accepted" | "rejected" +export type ForgeFedTicketSource = { + readonly content?: string | undefined + readonly mediaType?: string | undefined +} + +export type FederationIssueStatus = + | "offered" + | "accepted" + | "rejected" + | "queued" + | "running" + | "completed" + | "failed" export type FederationIssueRecord = { readonly issueId: string readonly offerId?: string | undefined + readonly activityId?: string | undefined + readonly actor?: string | undefined readonly tracker?: string | undefined readonly status: FederationIssueStatus readonly receivedAt: string + readonly updatedAt?: string | undefined readonly ticket: ForgeFedTicket + readonly projectId?: string | undefined + readonly agentId?: string | undefined + readonly remoteInbox?: string | undefined + readonly remoteOutbox?: string | undefined + readonly result?: string | undefined + readonly error?: string | undefined } export type CreateFollowRequest = { @@ -437,6 +462,12 @@ export type ActivityPubFollowActivity = { readonly capability?: string | undefined } +export type ActivityPubPublicKey = { + readonly id: string + readonly owner: string + readonly publicKeyPem: string +} + export type ActivityPubPerson = { readonly "@context": "https://www.w3.org/ns/activitystreams" readonly type: "Person" @@ -449,10 +480,14 @@ export type ActivityPubPerson = { readonly followers: string readonly following: string readonly liked: string + readonly publicKey?: ActivityPubPublicKey | undefined + readonly endpoints?: { + readonly sharedInbox?: string | undefined + } | undefined } export type ActivityPubOrderedCollection = { - readonly "@context": "https://www.w3.org/ns/activitystreams" + readonly "@context": "https://www.w3.org/ns/activitystreams" | ReadonlyArray readonly type: "OrderedCollection" readonly id: string readonly totalItems: number @@ -465,6 +500,18 @@ export type FollowSubscription = { readonly actor: string readonly object: string readonly inbox?: string | undefined + readonly remoteActor?: string | undefined + readonly remoteInbox?: string | undefined + readonly remoteOutbox?: string | undefined + readonly remoteFollowers?: string | undefined + readonly remoteSharedInbox?: string | undefined + readonly remotePublicKeyId?: string | undefined + readonly remotePublicKeyPem?: string | undefined + readonly subscriptionName?: string | undefined + readonly queue?: string | undefined + readonly projectRepoUrl?: string | undefined + readonly agentProvider?: AgentProvider | undefined + readonly agentCommand?: string | undefined readonly to: ReadonlyArray readonly capability?: string | undefined status: FollowStatus @@ -487,6 +534,10 @@ export type FederationInboxResult = readonly kind: "issue.ticket" readonly issue: FederationIssueRecord } + | { + readonly kind: "issue.create" + readonly issue: FederationIssueRecord + } | { readonly kind: "follow.accept" readonly subscription: FollowSubscription @@ -496,6 +547,30 @@ export type FederationInboxResult = readonly subscription: FollowSubscription } +export type ExchangeSubscribeRequest = { + readonly target: string + readonly domain?: string | undefined + readonly actor?: string | undefined + readonly inbox?: string | undefined + readonly projectRepoUrl?: string | undefined + readonly agentProvider?: AgentProvider | undefined + readonly agentCommand?: string | undefined +} + +export type ExchangePollRequest = { + readonly target?: string | undefined + readonly runTasks?: boolean | undefined +} + +export type ExchangePollResult = { + readonly polledAt: string + readonly subscriptions: number + readonly totalItems: number + readonly newItems: number + readonly processedItems: number + readonly failedItems: number +} + export type ApiEventType = | "snapshot" | "project.created" diff --git a/packages/api/src/api/schema.ts b/packages/api/src/api/schema.ts index 8e23efb8..b5b823f8 100644 --- a/packages/api/src/api/schema.ts +++ b/packages/api/src/api/schema.ts @@ -222,6 +222,21 @@ export const CreateFollowRequestSchema = Schema.Struct({ capability: OptionalString }) +export const ExchangeSubscribeRequestSchema = Schema.Struct({ + target: Schema.String, + domain: OptionalString, + actor: OptionalString, + inbox: OptionalString, + projectRepoUrl: OptionalString, + agentProvider: Schema.optional(AgentProviderSchema), + agentCommand: OptionalString +}) + +export const ExchangePollRequestSchema = Schema.Struct({ + target: OptionalString, + runTasks: OptionalBoolean +}) + export const AgentSessionSchema = Schema.Struct({ id: Schema.String, projectId: Schema.String, diff --git a/packages/api/src/http.ts b/packages/api/src/http.ts index 0a051cf4..17ba9281 100644 --- a/packages/api/src/http.ts +++ b/packages/api/src/http.ts @@ -21,6 +21,8 @@ import { CreateAgentRequestSchema, CreateFollowRequestSchema, CreateProjectRequestSchema, + ExchangePollRequestSchema, + ExchangeSubscribeRequestSchema, GithubAuthLoginRequestSchema, GithubAuthLogoutRequestSchema, ProjectDatabaseProfileRequestSchema, @@ -52,7 +54,9 @@ import { readContainerTaskLogs, readContainerTaskSnapshot, stopContainerTask } f import { latestProjectCursor, listProjectEventsSince } from "./services/events.js" import { createFollowSubscription, + ensureExchangeSubscription, ingestFederationInbox, + listExchangeSubscriptions, listFederationIssues, listFollowSubscriptions, makeFederationActorDocument, @@ -60,7 +64,8 @@ import { makeFederationFollowersCollection, makeFederationFollowingCollection, makeFederationLikedCollection, - makeFederationOutboxCollection + makeFederationOutboxCollection, + pollExchangeOutboxes } from "./services/federation.js" import { applyAllProjects, @@ -201,6 +206,9 @@ const textResponse = (data: string, contentType: string, status = 200) => ) ) +const activityJsonResponse = (data: unknown, status: number) => + textResponse(JSON.stringify(data), "application/activity+json; charset=utf-8", status) + const parseQueryInt = (url: string, key: string, fallback: number): number => { const parsed = Number(new URL(url, "http://localhost").searchParams.get(key) ?? "") if (!Number.isFinite(parsed) || parsed <= 0) { @@ -326,6 +334,12 @@ const readStateInitRequest = () => HttpServerRequest.schemaBodyJson(StateInitReq const readStateCommitRequest = () => HttpServerRequest.schemaBodyJson(StateCommitRequestSchema) const readStateSyncRequest = () => HttpServerRequest.schemaBodyJson(StateSyncRequestSchema) const readApplyAllRequest = () => HttpServerRequest.schemaBodyJson(ApplyAllRequestSchema) +const readExchangeSubscribeRequest = () => HttpServerRequest.schemaBodyJson(ExchangeSubscribeRequestSchema) +const emptyExchangePollRequest = {} +const readExchangePollRequest = () => + HttpServerRequest.schemaBodyJson(ExchangePollRequestSchema).pipe( + Effect.catchAll(() => Effect.succeed(emptyExchangePollRequest)) + ) const emptyUpProjectRequest: UpProjectRequestInput = {} const readUpProjectRequest = () => HttpServerRequest.schemaBodyJson(UpProjectRequestSchema).pipe( @@ -577,7 +591,7 @@ export const makeRouter = () => { Effect.gen(function*(_) { const request = yield* _(HttpServerRequest.HttpServerRequest) const context = yield* _(resolveFederationContext(request)) - return yield* _(jsonResponse(makeFederationActorDocument(context), 200)) + return yield* _(activityJsonResponse(makeFederationActorDocument(context), 200)) }).pipe(Effect.catchAll(errorResponse)) ), HttpRouter.get( @@ -585,7 +599,7 @@ export const makeRouter = () => { Effect.gen(function*(_) { const request = yield* _(HttpServerRequest.HttpServerRequest) const context = yield* _(resolveFederationContext(request)) - return yield* _(jsonResponse(makeFederationOutboxCollection(context), 200)) + return yield* _(activityJsonResponse(makeFederationOutboxCollection(context), 200)) }).pipe(Effect.catchAll(errorResponse)) ), HttpRouter.get( @@ -593,7 +607,7 @@ export const makeRouter = () => { Effect.gen(function*(_) { const request = yield* _(HttpServerRequest.HttpServerRequest) const context = yield* _(resolveFederationContext(request)) - return yield* _(jsonResponse(makeFederationFollowersCollection(context), 200)) + return yield* _(activityJsonResponse(makeFederationFollowersCollection(context), 200)) }).pipe(Effect.catchAll(errorResponse)) ), HttpRouter.get( @@ -601,7 +615,7 @@ export const makeRouter = () => { Effect.gen(function*(_) { const request = yield* _(HttpServerRequest.HttpServerRequest) const context = yield* _(resolveFederationContext(request)) - return yield* _(jsonResponse(makeFederationFollowingCollection(context), 200)) + return yield* _(activityJsonResponse(makeFederationFollowingCollection(context), 200)) }).pipe(Effect.catchAll(errorResponse)) ), HttpRouter.get( @@ -609,7 +623,34 @@ export const makeRouter = () => { Effect.gen(function*(_) { const request = yield* _(HttpServerRequest.HttpServerRequest) const context = yield* _(resolveFederationContext(request)) - return yield* _(jsonResponse(makeFederationLikedCollection(context), 200)) + return yield* _(activityJsonResponse(makeFederationLikedCollection(context), 200)) + }).pipe(Effect.catchAll(errorResponse)) + ), + HttpRouter.post( + "/federation/exchange/subscriptions", + Effect.gen(function*(_) { + const requestBody = yield* _(readExchangeSubscribeRequest()) + const request = yield* _(HttpServerRequest.HttpServerRequest) + const context = yield* _(resolveFederationContext(request, requestBody.domain)) + const created = yield* _(ensureExchangeSubscription(requestBody, context)) + return yield* _(jsonResponse(created, 201)) + }).pipe(Effect.catchAll(errorResponse)) + ), + HttpRouter.get( + "/federation/exchange/subscriptions", + Effect.sync(() => ({ subscriptions: listExchangeSubscriptions() })).pipe( + Effect.flatMap((payload) => jsonResponse(payload, 200)), + Effect.catchAll(errorResponse) + ) + ), + HttpRouter.post( + "/federation/exchange/poll", + Effect.gen(function*(_) { + const requestBody = yield* _(readExchangePollRequest()) + const request = yield* _(HttpServerRequest.HttpServerRequest) + const context = yield* _(resolveFederationContext(request)) + const result = yield* _(pollExchangeOutboxes(requestBody, context)) + return yield* _(jsonResponse({ result }, 200)) }).pipe(Effect.catchAll(errorResponse)) ), HttpRouter.post( diff --git a/packages/api/src/program.ts b/packages/api/src/program.ts index 6104e990..6eeaf8d3 100644 --- a/packages/api/src/program.ts +++ b/packages/api/src/program.ts @@ -6,7 +6,7 @@ import { createServer } from "node:http" import { makeRouter } from "./http.js" import { initializeAgentState } from "./services/agents.js" import { attachAuthTerminalWebSocketServer } from "./services/auth-terminal-sessions.js" -import { startOutboxPolling } from "./services/federation.js" +import { initializeFederationState, startOutboxPolling } from "./services/federation.js" import { attachProjectBrowserWebSocketServer } from "./services/project-browser.js" import { attachProjectDatabaseWebSocketServer } from "./services/project-databases.js" import { attachTerminalWebSocketServer } from "./services/terminal-sessions.js" @@ -65,6 +65,7 @@ export const program = (() => { return Effect.scoped( Console.log(`docker-git api boot port=${port}`).pipe( Effect.zipRight(initializeAgentState()), + Effect.zipRight(initializeFederationState()), Effect.zipRight( Console.log(`docker-git outbox polling interval=${pollingInterval}ms`) ), diff --git a/packages/api/src/services/federation.ts b/packages/api/src/services/federation.ts index 8984cff3..202742ad 100644 --- a/packages/api/src/services/federation.ts +++ b/packages/api/src/services/federation.ts @@ -1,22 +1,78 @@ +import { defaultProjectsRoot } from "@effect-template/lib/usecases/path-helpers" +import { NodeContext } from "@effect/platform-node" import { Duration, Effect } from "effect" -import { randomUUID } from "node:crypto" +import { + createHash, + generateKeyPairSync, + randomUUID, + sign as signWithPrivateKey +} from "node:crypto" +import { promises as fs } from "node:fs" +import { dirname, join } from "node:path" import type { ActivityPubFollowActivity, ActivityPubOrderedCollection, ActivityPubPerson, + ActivityPubPublicKey, + AgentProvider, + AgentSession, CreateFollowRequest, + CreateProjectAccepted, + CreateProjectRequest, + ExchangePollRequest, + ExchangeSubscribeRequest, FederationInboxResult, FederationIssueRecord, FollowStatus, FollowSubscription, FollowSubscriptionCreated, - ForgeFedTicket + ForgeFedTicket, + ForgeFedTicketSource, + ProjectDetails } from "../api/contracts.js" import { ApiBadRequestError, ApiConflictError, ApiNotFoundError } from "../api/errors.js" +import { getAgent, readAgentLogs, startAgent } from "./agents.js" +import { createProjectFromRequest } from "./projects.js" type JsonRecord = { readonly [key: string]: unknown } +type LocalActorKeys = { + readonly publicKeyPem: string + readonly privateKeyPem: string +} + +type StoredFederationState = { + readonly version: 1 + readonly issues: ReadonlyArray + readonly follows: ReadonlyArray + readonly processedOutboxItems: ReadonlyArray + readonly localActorKeys?: LocalActorKeys | undefined +} + +type RemoteActorDocument = { + readonly id: string + readonly inbox?: string | undefined + readonly outbox: string + readonly followers?: string | undefined + readonly sharedInbox?: string | undefined + readonly publicKeyId?: string | undefined + readonly publicKeyPem?: string | undefined +} + +type ExchangeTarget = { + readonly name: string + readonly remoteActor: string + readonly candidateActors: ReadonlyArray + readonly queue: string +} + +type IngestOptions = { + readonly scheduleTask?: boolean | undefined + readonly context?: FederationContext | undefined + readonly subscription?: FollowSubscription | undefined +} + export type FederationContextInput = { readonly publicOrigin: string readonly actorUsername?: string | undefined @@ -32,15 +88,23 @@ export type FederationContext = { readonly following: string readonly liked: string readonly followsActivityPrefix: string + readonly exchangeActivityPrefix: string } const defaultActorUsername = "docker-git" +const activityJsonContentType = "application/activity+json" +const jsonLdContentType = "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"" +const activityAcceptHeader = `${jsonLdContentType}, ${activityJsonContentType}, application/json` +const defaultExchangeQueue = "code" +const stateVersion = 1 as const const issueStore: Map = new Map() const followStore: Map = new Map() const followByActivityId: Map = new Map() const followByActorObject: Map = new Map() const processedOutboxItems: Set = new Set() +let localActorKeys: LocalActorKeys | null = null +let stateLoaded = false const nowIso = (): string => new Date().toISOString() @@ -104,42 +168,8 @@ const readObjectRecord = ( ) } -const parseTicket = ( - payload: JsonRecord -): Effect.Effect => - Effect.gen(function*(_) { - if (!hasType(payload, "Ticket")) { - return yield* _( - Effect.fail( - new ApiBadRequestError({ - message: "ForgeFed ticket payload must include type=\"Ticket\"." - }) - ) - ) - } - - const attributedTo = yield* _(readRequiredString(payload, "attributedTo", "ForgeFed ticket")) - const summary = yield* _(readRequiredString(payload, "summary", "ForgeFed ticket")) - const content = yield* _(readRequiredString(payload, "content", "ForgeFed ticket")) - const id = readOptionalString(payload, "id") ?? `urn:docker-git:forgefed:ticket:${randomUUID()}` - - return { - id, - attributedTo, - summary, - content, - mediaType: readOptionalString(payload, "mediaType"), - source: readOptionalString(payload, "source"), - published: readOptionalString(payload, "published"), - updated: readOptionalString(payload, "updated"), - url: readOptionalString(payload, "url") - } - }) - -const upsertIssue = (issue: FederationIssueRecord): FederationIssueRecord => { - issueStore.set(issue.issueId, issue) - return issue -} +const stateFilePath = (): string => + join(defaultProjectsRoot(process.cwd()), ".orch", "state", "federation.json") const followKey = (actor: string, object: string): string => `${actor}\u0000${object}` @@ -153,6 +183,19 @@ const cleanToRecipients = ( const looksLikeAbsoluteUrl = (value: string): boolean => /^[a-zA-Z][a-zA-Z\d+.-]*:\/\//.test(value) +const uniqueStrings = (values: ReadonlyArray): ReadonlyArray => { + const seen: Set = new Set() + return values.filter((value) => { + if (seen.has(value)) { + return false + } + seen.add(value) + return true + }) +} + +const shellEscape = (value: string): string => `'${value.replaceAll("'", "'\\''")}'` + const normalizeOrigin = ( raw: string ): Effect.Effect => @@ -254,6 +297,126 @@ const normalizeHttpUrl = ( ) }) +const serializeState = (): StoredFederationState => ({ + version: stateVersion, + issues: [...issueStore.values()], + follows: [...followStore.values()], + processedOutboxItems: [...processedOutboxItems], + ...(localActorKeys === null ? {} : { localActorKeys }) +}) + +const persistFederationState = (): Effect.Effect => { + const filePath = stateFilePath() + const serialized = `${JSON.stringify(serializeState(), null, 2)}\n` + return Effect.tryPromise({ + try: () => fs.mkdir(dirname(filePath), { recursive: true }), + catch: (cause) => cause + }).pipe( + Effect.flatMap(() => + Effect.tryPromise({ + try: () => fs.writeFile(filePath, serialized, "utf8"), + catch: (cause) => cause + }) + ), + Effect.asVoid + ) +} + +const persistFederationStateBestEffort = (): void => { + Effect.runFork(persistFederationState().pipe(Effect.ignore)) +} + +const indexFollow = (subscription: FollowSubscription): void => { + followStore.set(subscription.id, subscription) + followByActivityId.set(subscription.activityId, subscription.id) + followByActorObject.set(followKey(subscription.actor, subscription.object), subscription.id) +} + +const upsertIssue = (issue: FederationIssueRecord): FederationIssueRecord => { + issueStore.set(issue.issueId, issue) + persistFederationStateBestEffort() + return issue +} + +const updateIssue = ( + issue: FederationIssueRecord, + patch: Partial +): FederationIssueRecord => + upsertIssue({ + ...issue, + ...patch, + updatedAt: nowIso() + }) + +const ensureLocalActorKeys = (): LocalActorKeys => { + if (localActorKeys !== null) { + return localActorKeys + } + const generated = generateKeyPairSync("rsa", { + modulusLength: 2048, + publicKeyEncoding: { + type: "pkcs1", + format: "pem" + }, + privateKeyEncoding: { + type: "pkcs1", + format: "pem" + } + }) + localActorKeys = { + publicKeyPem: generated.publicKey, + privateKeyPem: generated.privateKey + } + persistFederationStateBestEffort() + return localActorKeys +} + +const hydrateState = (state: StoredFederationState): void => { + issueStore.clear() + followStore.clear() + followByActivityId.clear() + followByActorObject.clear() + processedOutboxItems.clear() + + for (const issue of state.issues ?? []) { + issueStore.set(issue.issueId, issue) + } + for (const follow of state.follows ?? []) { + indexFollow(follow) + } + for (const item of state.processedOutboxItems ?? []) { + processedOutboxItems.add(item) + } + localActorKeys = state.localActorKeys ?? null +} + +export const initializeFederationState = () => + Effect.tryPromise({ + try: () => fs.readFile(stateFilePath(), "utf8"), + catch: () => new Error("Federation state not found or invalid.") + }).pipe( + Effect.flatMap((raw) => + Effect.try({ + try: () => { + const parsed = JSON.parse(raw) as StoredFederationState + hydrateState(parsed) + stateLoaded = true + }, + catch: () => new Error("Federation state not found or invalid.") + }) + ), + Effect.catchAll(() => + Effect.sync(() => { + stateLoaded = true + ensureLocalActorKeys() + }) + ), + Effect.asVoid + ) + +const ensureStateLoaded = () => + stateLoaded ? Effect.void : initializeFederationState() + export const makeFederationContext = ( input: FederationContextInput ): Effect.Effect => @@ -270,10 +433,26 @@ export const makeFederationContext = ( followers: `${publicOrigin}/federation/followers`, following: `${publicOrigin}/federation/following`, liked: `${publicOrigin}/federation/liked`, - followsActivityPrefix: `${publicOrigin}/federation/activities/follows` + followsActivityPrefix: `${publicOrigin}/federation/activities/follows`, + exchangeActivityPrefix: `${publicOrigin}/federation/activities/exchange` } }) +const defaultFederationContext = () => + makeFederationContext({ + publicOrigin: + process.env["DOCKER_GIT_FEDERATION_PUBLIC_ORIGIN"] ?? + process.env["DOCKER_GIT_API_PUBLIC_URL"] ?? + "http://localhost:3334", + actorUsername: process.env["DOCKER_GIT_FEDERATION_ACTOR"] ?? defaultActorUsername + }) + +const publicKeyForContext = (context: FederationContext): ActivityPubPublicKey => ({ + id: `${context.actorId}#main-key`, + owner: context.actorId, + publicKeyPem: ensureLocalActorKeys().publicKeyPem +}) + export const makeFederationActorDocument = ( context: FederationContext ): ActivityPubPerson => ({ @@ -287,7 +466,11 @@ export const makeFederationActorDocument = ( outbox: context.outbox, followers: context.followers, following: context.following, - liked: context.liked + liked: context.liked, + publicKey: publicKeyForContext(context), + endpoints: { + sharedInbox: context.inbox + } }) export const makeFederationOutboxCollection = ( @@ -339,6 +522,61 @@ export const makeFederationLikedCollection = ( orderedItems: [] }) +const readTicketSource = (payload: JsonRecord): string | ForgeFedTicketSource | undefined => { + const raw = payload["source"] + if (typeof raw === "string" && raw.trim().length > 0) { + return raw.trim() + } + const record = asRecord(raw) + if (record === null) { + return undefined + } + const content = readOptionalString(record, "content") + const mediaType = readOptionalString(record, "mediaType") + return content === undefined && mediaType === undefined ? undefined : { content, mediaType } +} + +const readTicketAttachment = (payload: JsonRecord): ReadonlyArray | undefined => { + const raw = payload["attachment"] + return Array.isArray(raw) ? raw : undefined +} + +const parseTicket = ( + payload: JsonRecord +): Effect.Effect => + Effect.gen(function*(_) { + if (!hasType(payload, "Ticket")) { + return yield* _( + Effect.fail( + new ApiBadRequestError({ + message: "ForgeFed ticket payload must include type=\"Ticket\"." + }) + ) + ) + } + + const attributedTo = yield* _(readRequiredString(payload, "attributedTo", "ForgeFed ticket")) + const summary = yield* _(readRequiredString(payload, "summary", "ForgeFed ticket")) + const content = yield* _(readRequiredString(payload, "content", "ForgeFed ticket")) + const id = readOptionalString(payload, "id") ?? `urn:docker-git:forgefed:ticket:${randomUUID()}` + + return { + id, + attributedTo, + summary, + content, + mediaType: readOptionalString(payload, "mediaType"), + source: readTicketSource(payload), + published: readOptionalString(payload, "published"), + updated: readOptionalString(payload, "updated"), + url: readOptionalString(payload, "url"), + context: readOptionalString(payload, "context"), + workType: readOptionalString(payload, "workType"), + attachment: readTicketAttachment(payload), + raw: payload + } + }) + const lookupFollowByReference = ( reference: string ): Effect.Effect => { @@ -371,9 +609,8 @@ const updateFollowStatus = ( status, updatedAt: nowIso() } - followStore.set(updated.id, updated) - followByActivityId.set(updated.activityId, updated.id) - followByActorObject.set(followKey(updated.actor, updated.object), updated.id) + indexFollow(updated) + persistFederationStateBestEffort() return updated } @@ -444,125 +681,17 @@ const ingestOfferTicket = ( } const ticket = yield* _(parseTicket(objectPayload)) - const issueId = ticket.id - const issue = upsertIssue({ - issueId, + return upsertIssue({ + issueId: ticket.id, offerId: readOptionalString(payload, "id"), + activityId: readOptionalString(payload, "id"), + actor: readOptionalString(payload, "actor"), tracker: readOptionalString(payload, "target"), status: "offered", receivedAt: nowIso(), + updatedAt: nowIso(), ticket }) - - // Пересылаем задачу в OrderQue через JSON-LD - const orderQueUrl = process.env["DOCKER_GIT_ORDERQUE_URL"] ?? "http://localhost:7277" - const apiPublicUrl = process.env["DOCKER_GIT_API_PUBLIC_URL"] ?? "http://localhost:3334" - - yield* _(Effect.tryPromise({ - try: async () => { - console.log("[ActivityPub] Пересылка Offer(Ticket) в OrderQue:", orderQueUrl) - - // JSON-LD формат для ActivityPub Offer с Ticket - const orderQuePayload = { - "@context": [ - "https://www.w3.org/ns/activitystreams", - "https://forgefed.org/ns" - ], - type: "Offer", - id: issueId, - actor: `${apiPublicUrl}/federation/actor`, - object: { - "@context": [ - "https://www.w3.org/ns/activitystreams", - "https://forgefed.org/ns" - ], - type: "Ticket", - id: issueId, - attributedTo: `${apiPublicUrl}/federation/actor`, - summary: ticket.summary, - content: ticket.content, - workType: "standard" - } - } - - console.log("[ActivityPub] JSON-LD:", JSON.stringify(orderQuePayload, null, 2)) - - const response = await fetch(`${orderQueUrl}/federation/inbox`, { - method: "POST", - headers: { - "Content-Type": "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"" - }, - body: JSON.stringify(orderQuePayload) - }) - - const text = await response.text() - let json: unknown - try { - json = text ? JSON.parse(text) : { raw: text } - } catch { - json = { raw: text } - } - - console.log("[ActivityPub] OrderQue ответ:", response.status, json) - - if (!response.ok) { - console.warn("[ActivityPub] OrderQue вернул ошибку:", response.status, json) - return - } - - const orderId = (json as Record)["id"] as number | undefined - if (orderId) { - console.log("[ActivityPub] Отправка результата для заказа #", orderId) - - // JSON-LD для результата - const resultPayload = { - "@context": [ - "https://www.w3.org/ns/activitystreams", - "https://forgefed.org/ns" - ], - type: "Update", - id: `${apiPublicUrl}/orders/${orderId}/result`, - actor: `${apiPublicUrl}/federation/actor`, - object: { - type: "Order", - id: `${orderQueUrl}/orders/${orderId}`, - status: "completed", - chatResponse: "complit task", - agentId: 1 - } - } - - console.log("[ActivityPub] Результат JSON-LD:", JSON.stringify(resultPayload, null, 2)) - - const resultResponse = await fetch(`${orderQueUrl}/orders/${orderId}`, { - method: "PUT", - headers: { - "Content-Type": "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"" - }, - body: JSON.stringify({ - status: "completed", - chatResponse: "complit task", - agentId: 1 - }) - }) - - const resultText = await resultResponse.text() - let resultJson: unknown - try { - resultJson = resultText ? JSON.parse(resultText) : { raw: resultText } - } catch { - resultJson = { raw: resultText } - } - - console.log("[ActivityPub] Результат отправлен:", resultResponse.status, resultJson) - } - }, - catch: (error) => { - console.warn("[ActivityPub] Ошибка пересылки в OrderQue:", error) - } - })).pipe(Effect.ignore) - - return issue }) const ingestDirectTicket = ( @@ -573,21 +702,57 @@ const ingestDirectTicket = ( issueId: ticket.id, status: "accepted", receivedAt: nowIso(), + updatedAt: nowIso(), ticket })) +const ingestCreateTicket = ( + payload: JsonRecord, + options: IngestOptions +): Effect.Effect => + Effect.gen(function*(_) { + const objectPayload = yield* _(readObjectRecord(payload, "object", "ActivityPub Create")) + if (!hasType(objectPayload, "Ticket")) { + return yield* _( + Effect.fail( + new ApiBadRequestError({ + message: "ActivityPub Create currently supports object.type=\"Ticket\" only." + }) + ) + ) + } + + const ticket = yield* _(parseTicket(objectPayload)) + const subscription = options.subscription + const issue = upsertIssue({ + issueId: ticket.id, + activityId: readOptionalString(payload, "id"), + actor: readOptionalString(payload, "actor"), + tracker: readOptionalString(objectPayload, "context"), + status: "accepted", + receivedAt: nowIso(), + updatedAt: nowIso(), + ticket, + remoteInbox: subscription?.remoteInbox ?? subscription?.remoteSharedInbox ?? subscription?.inbox, + remoteOutbox: subscription?.remoteOutbox + }) + + return issue + }) + // CHANGE: support ForgeFed issue inputs and ActivityPub inbox transitions in API mode. -// WHY: Konrad requested ForgeFed Issue intake and Follow workflow support in PR discussion. -// QUOTE(ТЗ): "А сможешь на вход поддержать ... #issues" + "добавить поддержку follow" -// REF: pr-88-konrad-request -// SOURCE: n/a -// FORMAT THEOREM: ∀m: validInbox(m) → handled(m) ∈ {issue.offer, issue.ticket, follow.accept, follow.reject} +// WHY: issue #233 requires ForgeFed/ActivityPub subscription and task intake. +// QUOTE(ТЗ): "Осталось forgefed допподержать" + "Законнектишь к exchange" +// REF: issue-233 +// SOURCE: https://github.com/ProverCoderAI/docker-git/issues/233 +// FORMAT THEOREM: ∀m: validInbox(m) → handled(m) ∈ {issue.offer, issue.ticket, issue.create, follow.accept, follow.reject} // PURITY: SHELL // EFFECT: Effect // INVARIANT: state transitions are deterministic for identical references // COMPLEXITY: O(1) export const ingestFederationInbox = ( - payload: unknown + payload: unknown, + options: IngestOptions = {} ): Effect.Effect => Effect.gen(function*(_) { const record = asRecord(payload) @@ -606,6 +771,11 @@ export const ingestFederationInbox = ( return { kind: "issue.offer", issue } } + if (hasType(record, "Create")) { + const issue = yield* _(ingestCreateTicket(record, options)) + return { kind: "issue.create", issue } + } + if (hasType(record, "Ticket")) { const issue = yield* _(ingestDirectTicket(record)) return { kind: "issue.ticket", issue } @@ -623,17 +793,79 @@ export const ingestFederationInbox = ( return yield* _( Effect.fail( new ApiBadRequestError({ - message: "Unsupported inbox payload type. Expected Offer(Ticket), Ticket, Accept, or Reject." + message: "Unsupported inbox payload type. Expected Offer(Ticket), Create(Ticket), Ticket, Accept, or Reject." }) ) ) }) +const signRequestHeaders = ( + context: FederationContext, + method: string, + endpoint: string, + body: string +): Record => { + const parsed = new URL(endpoint) + const date = new Date().toUTCString() + const digest = `SHA-256=${createHash("sha256").update(body).digest("base64")}` + const target = `${parsed.pathname}${parsed.search}` + const signingString = [ + `(request-target): ${method.toLowerCase()} ${target.length === 0 ? "/" : target}`, + `host: ${parsed.host}`, + `date: ${date}`, + `digest: ${digest}` + ].join("\n") + const signature = signWithPrivateKey( + "RSA-SHA256", + Buffer.from(signingString), + ensureLocalActorKeys().privateKeyPem + ).toString("base64") + + return { + accept: activityAcceptHeader, + "content-type": activityJsonContentType, + date, + digest, + signature: [ + `keyId="${context.actorId}#main-key"`, + `algorithm="rsa-sha256"`, + `headers="(request-target) host date digest"`, + `signature="${signature}"` + ].join(",") + } +} + +const sendJsonLd = ( + context: FederationContext, + endpoint: string, + payload: unknown +): Effect.Effect => { + const body = JSON.stringify(payload) + return Effect.tryPromise({ + try: () => + fetch(endpoint, { + method: "POST", + headers: signRequestHeaders(context, "POST", endpoint, body), + body + }), + catch: (cause) => + new ApiBadRequestError({ + message: cause instanceof Error ? cause.message : String(cause) + }) + }).pipe( + Effect.flatMap((response) => + response.ok + ? Effect.void + : Effect.fail(new ApiBadRequestError({ message: `HTTP ${response.status} POST ${endpoint}` })) + ) + ) +} + // CHANGE: build outgoing ActivityPub Follow subscriptions for task feeds. -// WHY: requested to subscribe to issue/task distribution via ActivityPub Follow. -// QUOTE(ТЗ): "добавить поддержку follow, чтобы можно было подписатся на отдачу задач" -// REF: pr-88-konrad-request -// SOURCE: n/a +// WHY: issue #233 requires subscribing to exchange queues before waiting for tasks. +// QUOTE(ТЗ): "для этого просто надо на него подписатся" +// REF: issue-233 +// SOURCE: https://github.com/ProverCoderAI/docker-git/issues/233 // FORMAT THEOREM: ∀r: valid(r) → ∃s: s.status = pending ∧ s.object = r.object // PURITY: SHELL // EFFECT: Effect @@ -644,6 +876,7 @@ export const createFollowSubscription = ( context: FederationContext ): Effect.Effect => Effect.gen(function*(_) { + yield* _(ensureStateLoaded()) const actor = request.actor?.trim() ? yield* _(normalizeHttpUrl(request.actor, context, "Follow actor")) : context.actorId @@ -676,7 +909,6 @@ export const createFollowSubscription = ( const activityId = `${context.followsActivityPrefix}/${id}` const createdAt = nowIso() - // JSON-LD формат для ActivityPub Follow const activity: ActivityPubFollowActivity = { "@context": [ "https://www.w3.org/ns/activitystreams", @@ -704,47 +936,311 @@ export const createFollowSubscription = ( activity } - followStore.set(id, subscription) - followByActivityId.set(activityId, id) - followByActorObject.set(key, id) + indexFollow(subscription) + persistFederationStateBestEffort() - // Отправляем Follow активность в target inbox (OrderQue) if (normalizedInbox) { - yield* _(Effect.tryPromise({ - try: async () => { - console.log("[ActivityPub] Отправка Follow в inbox:", normalizedInbox) - console.log("[ActivityPub] JSON-LD:", JSON.stringify(activity, null, 2)) - - const response = await fetch(normalizedInbox, { - method: "POST", - headers: { - "Content-Type": "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"" - }, - body: JSON.stringify(activity) - }) - - const text = await response.text() - let json: unknown - try { - json = text ? JSON.parse(text) : { raw: text } - } catch { - json = { raw: text } - } - - console.log("[ActivityPub] Ответ:", response.status, json) - - // Если получили Accept автоматически - обновляем статус - if (response.ok) { - subscription.status = "accepted" - subscription.updatedAt = nowIso() - followStore.set(id, subscription) - console.log("[ActivityPub] Подписка подтверждена (status=accepted)") - } - }, - catch: (error) => { - console.warn("[ActivityPub] Ошибка отправки Follow:", error) + yield* _(sendJsonLd(context, normalizedInbox, activity).pipe(Effect.ignore)) + } + + return { subscription, activity } + }) + +const parseExchangeTarget = ( + raw: string +): Effect.Effect => + Effect.gen(function*(_) { + const normalized = raw.trim() + if (normalized.length === 0) { + return yield* _(Effect.fail(new ApiBadRequestError({ message: "Exchange target is required." }))) + } + + if (looksLikeAbsoluteUrl(normalized)) { + const parsed = yield* _( + Effect.try({ + try: () => new URL(normalized), + catch: (cause) => + new ApiBadRequestError({ + message: cause instanceof Error ? cause.message : String(cause) + }) + }) + ) + if (parsed.protocol !== "http:" && parsed.protocol !== "https:") { + return yield* _(Effect.fail(new ApiBadRequestError({ message: "Exchange target URL must use http:// or https://." }))) + } + const pathParts = parsed.pathname.split("/").filter((part) => part.length > 0) + const isRootActorCollection = pathParts.length === 1 && (pathParts[0] === "actor" || pathParts[0] === "actors") + const queue = pathParts.length === 0 || isRootActorCollection + ? defaultExchangeQueue + : inferQueueFromActor(normalized) + const origin = `${parsed.protocol}//${parsed.host}` + const candidateActors = pathParts.length === 0 || isRootActorCollection + ? [`${origin}/actor/${queue}`, `${origin}/actors/${queue}`, `${origin}/actor`] + : [ + normalized, + `${origin}/actor/${queue}`, + `${origin}/actors/${queue}`, + `${origin}/actor` + ] + return { + name: normalized, + remoteActor: candidateActors[0] ?? normalized, + candidateActors: uniqueStrings(candidateActors), + queue + } + } + + const value = normalized.startsWith("@") ? normalized.slice(1) : normalized + const separator = value.indexOf("@") + const actor = separator > 0 ? value.slice(0, separator).trim() : defaultExchangeQueue + const domain = separator > 0 ? value.slice(separator + 1).trim() : value.trim() + if ( + actor.length === 0 || + domain.length === 0 || + domain.includes("@") || + domain.includes("/") || + domain.startsWith(".") || + domain.endsWith(".") + ) { + return yield* _(Effect.fail(new ApiBadRequestError({ message: `Invalid exchange target: ${raw}` }))) + } + + const candidateActors = [ + `https://${domain}/actor/${actor}`, + `https://${domain}/actors/${actor}`, + `https://${domain}/actor` + ] + return { + name: normalized, + remoteActor: candidateActors[0] ?? `https://${domain}/actor/${actor}`, + candidateActors: uniqueStrings(candidateActors), + queue: actor + } + }) + +const inferQueueFromActor = (remoteActor: string): string => { + try { + const parsed = new URL(remoteActor) + const parts = parsed.pathname.split("/").filter((part) => part.length > 0) + return parts.at(-1) ?? "exchange" + } catch { + return "exchange" + } +} + +const fetchJson = ( + url: string, + label: string +): Effect.Effect => + Effect.tryPromise({ + try: () => + fetch(url, { + method: "GET", + headers: { + accept: activityAcceptHeader } - })).pipe(Effect.ignore) + }), + catch: (cause) => + new ApiBadRequestError({ + message: cause instanceof Error ? cause.message : String(cause) + }) + }).pipe( + Effect.flatMap((response) => + response.ok + ? Effect.succeed(response) + : Effect.fail(new ApiBadRequestError({ message: `HTTP ${response.status} GET ${url}` })) + ), + Effect.flatMap((response) => + Effect.tryPromise({ + try: (): Promise => response.json(), + catch: (cause) => + new ApiBadRequestError({ + message: cause instanceof Error ? cause.message : String(cause) + }) + }) + ), + Effect.flatMap((json) => { + const record = asRecord(json) + return record === null + ? Effect.fail(new ApiBadRequestError({ message: `${label} returned a non-object JSON payload.` })) + : Effect.succeed(record) + }) + ) + +const parseRemoteActorDocument = ( + payload: JsonRecord, + fallbackActor: string +): Effect.Effect => + Effect.gen(function*(_) { + const id = readOptionalString(payload, "id") ?? fallbackActor + const outbox = readOptionalString(payload, "outbox") + if (outbox === undefined) { + return yield* _( + Effect.fail( + new ApiBadRequestError({ + message: `Remote actor missing outbox: ${id}` + }) + ) + ) + } + + const publicKey = asRecord(payload["publicKey"]) + const endpoints = asRecord(payload["endpoints"]) + + return { + id, + inbox: readOptionalString(payload, "inbox"), + outbox, + followers: readOptionalString(payload, "followers"), + sharedInbox: endpoints === null ? undefined : readOptionalString(endpoints, "sharedInbox"), + publicKeyId: publicKey === null ? undefined : readOptionalString(publicKey, "id"), + publicKeyPem: publicKey === null ? undefined : readOptionalString(publicKey, "publicKeyPem") + } + }) + +const collectionActorItems = (payload: JsonRecord): ReadonlyArray => { + const rawItems = Array.isArray(payload["items"]) + ? payload["items"] + : Array.isArray(payload["orderedItems"]) + ? payload["orderedItems"] + : [] + return rawItems + .map((item) => asRecord(item)) + .filter((item): item is JsonRecord => item !== null) +} + +const actorItemMatchesQueue = (item: JsonRecord, queue: string): boolean => { + const preferredUsername = readOptionalString(item, "preferredUsername") + const category = readOptionalString(item, "category") + const id = readOptionalString(item, "id") + return preferredUsername === queue || category === queue || (id !== undefined && inferQueueFromActor(id) === queue) +} + +const parseExchangeActorPayload = ( + target: ExchangeTarget, + candidateActor: string, + payload: JsonRecord +): Effect.Effect => { + const collectionItems = collectionActorItems(payload) + const outbox = readOptionalString(payload, "outbox") + if (outbox === undefined && collectionItems.length > 0) { + const selected = collectionItems.find((item) => actorItemMatchesQueue(item, target.queue)) + return selected === undefined + ? Effect.fail( + new ApiBadRequestError({ + message: `Exchange actor collection did not include queue "${target.queue}".` + }) + ) + : parseRemoteActorDocument(selected, readOptionalString(selected, "id") ?? target.remoteActor) + } + return parseRemoteActorDocument(payload, candidateActor) +} + +const fetchExchangeActorDocument = ( + target: ExchangeTarget +): Effect.Effect => + Effect.gen(function*(_) { + let lastError: ApiBadRequestError | undefined + for (const candidateActor of target.candidateActors) { + const result = yield* _( + fetchJson(candidateActor, "Exchange actor").pipe( + Effect.flatMap((payload) => parseExchangeActorPayload(target, candidateActor, payload)), + Effect.either + ) + ) + if (result._tag === "Right") { + return result.right + } + lastError = result.left + } + return yield* _( + Effect.fail( + lastError ?? + new ApiBadRequestError({ + message: `No exchange actor candidates were available for target "${target.name}".` + }) + ) + ) + }) + +const buildFollowActivity = ( + context: FederationContext, + actor: string, + object: string, + to: ReadonlyArray, + capability: string | undefined +): ActivityPubFollowActivity => ({ + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://forgefed.org/ns" + ], + id: `${context.followsActivityPrefix}/${randomUUID()}`, + type: "Follow", + actor, + object, + ...(to.length === 0 ? {} : { to }), + ...(capability === undefined ? {} : { capability }) +}) + +export const ensureExchangeSubscription = ( + request: ExchangeSubscribeRequest, + context: FederationContext +): Effect.Effect => + Effect.gen(function*(_) { + yield* _(ensureStateLoaded()) + const target = yield* _(parseExchangeTarget(request.target)) + const document = yield* _(fetchExchangeActorDocument(target)) + + const actor = request.actor?.trim() + ? yield* _(normalizeHttpUrl(request.actor, context, "Follow actor")) + : context.actorId + const object = document.id + const key = followKey(actor, object) + const existingId = followByActorObject.get(key) + if (existingId) { + const existing = followStore.get(existingId) + if (existing && existing.status !== "rejected") { + return { subscription: existing, activity: existing.activity } + } + } + + const inbox = request.inbox?.trim() + ? yield* _(normalizeHttpUrl(request.inbox, context, "Follow inbox")) + : document.inbox + const to = document.followers === undefined ? [] : [document.followers] + const activity = buildFollowActivity(context, actor, object, to, "ticket") + const createdAt = nowIso() + const subscription: FollowSubscription = { + id: randomUUID(), + activityId: activity.id, + actor, + object, + inbox, + remoteActor: document.id, + remoteInbox: document.inbox, + remoteOutbox: document.outbox, + remoteFollowers: document.followers, + remoteSharedInbox: document.sharedInbox, + remotePublicKeyId: document.publicKeyId, + remotePublicKeyPem: document.publicKeyPem, + subscriptionName: target.name, + queue: target.queue, + projectRepoUrl: request.projectRepoUrl?.trim() || undefined, + agentProvider: request.agentProvider, + agentCommand: request.agentCommand?.trim() || undefined, + to, + capability: "ticket", + status: "pending", + createdAt, + updatedAt: createdAt, + activity + } + + indexFollow(subscription) + persistFederationStateBestEffort() + + if (inbox !== undefined) { + yield* _(sendJsonLd(context, inbox, activity).pipe(Effect.ignore)) } return { subscription, activity } @@ -756,103 +1252,455 @@ export const listFederationIssues = (): ReadonlyArray => export const listFollowSubscriptions = (): ReadonlyArray => [...followStore.values()].sort((left, right) => right.createdAt.localeCompare(left.createdAt)) +export const listExchangeSubscriptions = (): ReadonlyArray => + listFollowSubscriptions().filter((subscription) => subscription.remoteOutbox !== undefined) + export const clearFederationState = (): void => { issueStore.clear() followStore.clear() followByActivityId.clear() followByActorObject.clear() processedOutboxItems.clear() + localActorKeys = null + stateLoaded = true } -/** - * Polling outbox из OrderQue для получения новых задач - * Запускается как фоновый процесс - * URL определяется из активных Follow подписок - */ -export const startOutboxPolling = ( - intervalMs: number = 5000 -): Effect.Effect => +const configuredExchangeTargets = (): ReadonlyArray => + (process.env["DOCKER_GIT_EXCHANGE_TARGETS"] ?? "") + .split(",") + .map((entry) => entry.trim()) + .filter((entry) => entry.length > 0) + +const ensureConfiguredExchangeSubscriptions = ( + context: FederationContext +): Effect.Effect => + Effect.forEach( + configuredExchangeTargets(), + (target) => + ensureExchangeSubscription({ target }, context).pipe( + Effect.tapError((error) => + Effect.sync(() => { + console.warn("[ActivityPub] Failed to subscribe to exchange target:", target, error) + }) + ), + Effect.ignore + ), + { discard: true } + ) + +const outboxItemId = (item: unknown, subscription: FollowSubscription): string => { + const record = asRecord(item) + const id = record === null ? undefined : readOptionalString(record, "id") + if (id !== undefined) { + return id + } + return `${subscription.id}:${createHash("sha256").update(JSON.stringify(item)).digest("hex")}` +} + +const fetchOutbox = ( + url: string +): Effect.Effect => + fetchJson(url, "Exchange outbox").pipe( + Effect.map((record) => ({ + "@context": Array.isArray(record["@context"]) + ? record["@context"].filter((item): item is string => typeof item === "string") + : "https://www.w3.org/ns/activitystreams", + type: "OrderedCollection" as const, + id: readOptionalString(record, "id") ?? url, + totalItems: typeof record["totalItems"] === "number" ? record["totalItems"] : 0, + orderedItems: Array.isArray(record["orderedItems"]) ? record["orderedItems"] : [] + })) + ) + +const matchesPollRequest = (subscription: FollowSubscription, request: ExchangePollRequest): boolean => { + const target = request.target?.trim() + return !target || + subscription.subscriptionName === target || + subscription.remoteActor === target || + subscription.object === target || + subscription.queue === target +} + +export const pollExchangeOutboxes = ( + request: ExchangePollRequest = {}, + contextInput?: FederationContext | undefined +) => Effect.gen(function*(_) { - console.log("[ActivityPub Polling] Запуск polling outbox") - console.log("[ActivityPub Polling] Интервал:", intervalMs, "мс") - - const poll = Effect.gen(function*(_) { - // Получаем активные подписки - const subscriptions = listFollowSubscriptions().filter(s => s.status === "accepted") - - if (subscriptions.length === 0) { - console.log("[ActivityPub Polling] Нет активных подписок, ждем...") - return 0 + yield* _(ensureStateLoaded()) + const context = contextInput ?? (yield* _(defaultFederationContext())) + const subscriptions = listExchangeSubscriptions() + .filter((subscription) => subscription.status !== "rejected") + .filter((subscription) => matchesPollRequest(subscription, request)) + + let totalItems = 0 + let newItems = 0 + let processedItems = 0 + let failedItems = 0 + + for (const subscription of subscriptions) { + const remoteOutbox = subscription.remoteOutbox + if (remoteOutbox === undefined) { + continue } - - let totalItems = 0 - - for (const sub of subscriptions) { - // Извлекаем OrderQue URL из object (followers URL) - const orderQueUrl = sub.object.replace("/federation/followers", "") - const outboxUrl = `${orderQueUrl}/federation/outbox` - - console.log("[ActivityPub Polling] Запрос outbox:", outboxUrl, "подписка:", sub.id) - - const result = yield* _( - Effect.tryPromise({ - try: async () => { - const response = await fetch(outboxUrl, { - method: "GET", - headers: { - "Accept": "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"" - } - }) - - if (!response.ok) { - throw new Error(`OrderQue вернул ${response.status}`) - } - - const json = await response.json() - return json as ActivityPubOrderedCollection - }, - catch: (error) => - new ApiBadRequestError({ - message: `Ошибка polling outbox: ${error instanceof Error ? error.message : String(error)}` - }) - }) + const collection = yield* _(fetchOutbox(remoteOutbox)) + totalItems += collection.orderedItems.length + + for (const item of collection.orderedItems) { + const itemId = outboxItemId(item, subscription) + if (processedOutboxItems.has(itemId)) { + continue + } + newItems += 1 + const handled = yield* _( + ingestFederationInbox(item, { + scheduleTask: true, + context, + subscription + }).pipe(Effect.either) ) - - const orderedItems = Array.isArray(result.orderedItems) ? result.orderedItems : [] - console.log("[ActivityPub Polling] Получено задач из", outboxUrl, ":", orderedItems.length) - totalItems += orderedItems.length - - for (const item of orderedItems) { - const itemId = (item as Record)["id"] as string | undefined - - if (itemId && !processedOutboxItems.has(itemId)) { - console.log("[ActivityPub Polling] Обработка задачи:", itemId) - console.log("[ActivityPub Polling] JSON-LD:", JSON.stringify(item, null, 2)) - - // Отправляем задачу в свой inbox для обработки + processedOutboxItems.add(itemId) + persistFederationStateBestEffort() + + if (handled._tag === "Left") { + failedItems += 1 + } else { + if (handled.right.kind === "issue.create" && request.runTasks !== false) { yield* _( - ingestFederationInbox(item).pipe( - Effect.tapBoth({ - onSuccess: (result) => - Effect.sync(() => { - console.log("[ActivityPub Polling] Задача обработана:", result.kind) - processedOutboxItems.add(itemId) - }), - onFailure: (error) => - Effect.sync(() => { - console.warn("[ActivityPub Polling] Ошибка обработки задачи:", error) - }) - }) - ) + scheduleExchangeTask(handled.right.issue, { + context, + subscription + }) ) } + processedItems += 1 } } - - return totalItems - }).pipe(Effect.ignore) - - // Бесконечный цикл с интервалом + } + + return { + polledAt: nowIso(), + subscriptions: subscriptions.length, + totalItems, + newItems, + processedItems, + failedItems + } + }) + +const sourceContent = (source: ForgeFedTicket["source"]): string | undefined => + typeof source === "string" ? source : source?.content + +const firstGithubUrl = (text: string | undefined): string | undefined => { + if (text === undefined) { + return undefined + } + const match = text.match(/https:\/\/github\.com\/[^\s"'<>]+/u) + return match?.[0] +} + +const resolveTaskRepoUrl = ( + issue: FederationIssueRecord, + subscription: FollowSubscription | undefined +): string | undefined => + firstGithubUrl(issue.ticket.url) ?? + firstGithubUrl(sourceContent(issue.ticket.source)) ?? + firstGithubUrl(issue.ticket.content) ?? + firstGithubUrl(issue.ticket.summary) ?? + subscription?.projectRepoUrl ?? + process.env["DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL"]?.trim() + +const slugify = (value: string): string => { + const normalized = value + .trim() + .toLowerCase() + .replaceAll(/[^a-z0-9_-]+/g, "-") + .replaceAll(/-+/g, "-") + .replace(/^-+|-+$/g, "") + return normalized.length > 0 ? normalized.slice(0, 48) : randomUUID() +} + +const issueSlug = (issue: FederationIssueRecord): string => + slugify(issue.issueId) + +const isCreateProjectAccepted = ( + value: ProjectDetails | CreateProjectAccepted +): value is CreateProjectAccepted => + "accepted" in value + +const resolveAgentProvider = ( + subscription: FollowSubscription | undefined +): AgentProvider => { + const raw = subscription?.agentProvider ?? process.env["DOCKER_GIT_EXCHANGE_AGENT_PROVIDER"] + return raw === "claude" || raw === "opencode" || raw === "custom" ? raw : "codex" +} + +const buildTaskPrompt = (issue: FederationIssueRecord): string => { + const source = sourceContent(issue.ticket.source) + const parts = [ + `ForgeFed task: ${issue.ticket.summary}`, + "", + issue.ticket.content, + source === undefined || source === issue.ticket.content ? "" : `Source:\n${source}`, + "", + `Ticket: ${issue.ticket.id}`, + issue.activityId === undefined ? "" : `Activity: ${issue.activityId}`, + "Implement the requested work in this repository. Commit the changes and provide a concise final summary." + ].filter((part) => part.trim().length > 0) + return parts.join("\n") +} + +const buildAgentCommand = ( + provider: AgentProvider, + prompt: string, + subscription: FollowSubscription | undefined +): string => { + const override = subscription?.agentCommand?.trim() ?? process.env["DOCKER_GIT_EXCHANGE_AGENT_COMMAND"]?.trim() + if (override && override.length > 0) { + return override.includes("{{prompt}}") + ? override.replaceAll("{{prompt}}", shellEscape(prompt)) + : `${override} ${shellEscape(prompt)}` + } + if (provider === "claude") { + return `MCP_PLAYWRIGHT_ISOLATED=1 claude --dangerously-skip-permissions -p ${shellEscape(prompt)}` + } + if (provider === "opencode") { + return `opencode run ${shellEscape(prompt)}` + } + if (provider === "custom") { + return `sh -lc ${shellEscape(`printf '%s\n' ${shellEscape(prompt)}`)}` + } + return `MCP_PLAYWRIGHT_ISOLATED=1 codex exec ${shellEscape(prompt)}` +} + +const buildCreateProjectRequest = ( + issue: FederationIssueRecord, + repoUrl: string +): CreateProjectRequest => { + const slug = issueSlug(issue) + return { + repoUrl, + outDir: `.docker-git/exchange/${slug}`, + containerName: `dg-ex-${slug}`, + serviceName: `dg-ex-${slug}`, + volumeName: `dg-ex-${slug}-home`, + up: true, + waitForClone: true, + openSsh: false + } +} + +const buildIssueUpdateActivity = ( + context: FederationContext, + issue: FederationIssueRecord, + status: FederationIssueRecord["status"], + message: string +) => ({ + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://forgefed.org/ns" + ], + id: `${context.exchangeActivityPrefix}/${issueSlug(issue)}/${status}/${randomUUID()}`, + type: "Update", + actor: context.actorId, + object: { + type: "Order", + id: issue.issueId, + status, + chatResponse: message, + projectId: issue.projectId, + agentId: issue.agentId + } +}) + +const deliverIssueUpdate = ( + context: FederationContext, + issue: FederationIssueRecord, + subscription: FollowSubscription | undefined, + status: FederationIssueRecord["status"], + message: string +) => { + const endpoint = issue.remoteInbox ?? subscription?.remoteInbox ?? subscription?.remoteSharedInbox ?? subscription?.inbox + if (endpoint === undefined) { + return Effect.void + } + return sendJsonLd(context, endpoint, buildIssueUpdateActivity(context, issue, status, message)).pipe(Effect.ignore) +} + +const waitForAgentCompletion = ( + session: AgentSession, + timeoutMs: number +): Effect.Effect => { + const startedAt = Date.now() + const loop = (): Effect.Effect => + Effect.gen(function*(_) { + const current = yield* _(getAgent(session.projectId, session.id)) + if (current.status === "exited" || current.status === "failed" || current.status === "stopped") { + return current + } + if (Date.now() - startedAt > timeoutMs) { + return yield* _(Effect.fail(new ApiConflictError({ message: `Exchange agent timed out after ${timeoutMs}ms.` }))) + } + yield* _(Effect.sleep(Duration.millis(2_000))) + return yield* _(loop()) + }) + return loop() +} + +const runExchangeTask = ( + issue: FederationIssueRecord, + subscription: FollowSubscription | undefined, + context: FederationContext +) => + Effect.gen(function*(_) { + const repoUrl = resolveTaskRepoUrl(issue, subscription) + if (repoUrl === undefined || repoUrl.length === 0) { + const failed = updateIssue(issue, { + status: "failed", + error: "Exchange task has no GitHub URL and DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL is not configured." + }) + yield* _(deliverIssueUpdate(context, failed, subscription, "failed", failed.error ?? "Missing repository.")) + return + } + + const queued = updateIssue(issue, { status: "queued" }) + yield* _(deliverIssueUpdate(context, queued, subscription, "queued", "Task accepted by docker-git.")) + + const result = yield* _(createProjectFromRequest(buildCreateProjectRequest(issue, repoUrl)).pipe(Effect.either)) + if (result._tag === "Left") { + const failed = updateIssue(queued, { + status: "failed", + error: String(result.left) + }) + yield* _(deliverIssueUpdate(context, failed, subscription, "failed", failed.error ?? "Project creation failed.")) + return + } + if (isCreateProjectAccepted(result.right)) { + const failed = updateIssue(queued, { + status: "failed", + error: "Exchange project creation unexpectedly returned an async job." + }) + yield* _(deliverIssueUpdate(context, failed, subscription, "failed", failed.error ?? "Project creation failed.")) + return + } + + const project = result.right + const provider = resolveAgentProvider(subscription) + const prompt = buildTaskPrompt(issue) + const session = yield* _( + startAgent(project, { + provider, + command: buildAgentCommand(provider, prompt, subscription), + label: `exchange:${issueSlug(issue)}` + }).pipe(Effect.either) + ) + if (session._tag === "Left") { + const failed = updateIssue(queued, { + status: "failed", + projectId: project.id, + error: String(session.left) + }) + yield* _(deliverIssueUpdate(context, failed, subscription, "failed", failed.error ?? "Agent start failed.")) + return + } + + const running = updateIssue(queued, { + status: "running", + projectId: project.id, + agentId: session.right.id + }) + yield* _(deliverIssueUpdate(context, running, subscription, "running", "Agent started.")) + + const timeoutMs = Number(process.env["DOCKER_GIT_EXCHANGE_AGENT_TIMEOUT_MS"] ?? "3600000") + const finalSession = yield* _( + waitForAgentCompletion(session.right, Number.isFinite(timeoutMs) && timeoutMs > 0 ? timeoutMs : 3_600_000) + .pipe(Effect.either) + ) + if (finalSession._tag === "Left") { + const failed = updateIssue(running, { + status: "failed", + error: finalSession.left.message + }) + yield* _(deliverIssueUpdate(context, failed, subscription, "failed", failed.error ?? "Agent failed.")) + return + } + + const logs = yield* _(readAgentLogs(project.id, session.right.id, 80).pipe(Effect.orElseSucceed(() => []))) + const resultText = logs.map((line) => line.line).join("\n").trim() + const status = finalSession.right.status === "exited" && finalSession.right.exitCode === 0 ? "completed" : "failed" + const finished = updateIssue(running, { + status, + result: resultText.length > 0 ? resultText : `Agent finished with status ${finalSession.right.status}.`, + error: status === "failed" ? `Agent finished with status ${finalSession.right.status}.` : undefined + }) + yield* _( + deliverIssueUpdate( + context, + finished, + subscription, + status, + finished.result ?? finished.error ?? `Agent finished with status ${finalSession.right.status}.` + ) + ) + }).pipe( + Effect.catchAll((error) => + Effect.sync(() => { + const failed = updateIssue(issue, { + status: "failed", + error: String(error) + }) + void failed + }) + ) + ) + +const scheduleExchangeTask = ( + issue: FederationIssueRecord, + options: IngestOptions +) => + Effect.gen(function*(_) { + const context = options.context ?? (yield* _(defaultFederationContext())) + const scheduled = issue.status === "running" || issue.status === "queued" || issue.status === "completed" + ? issue + : updateIssue(issue, { status: "accepted" }) + yield* _( + Effect.sync(() => { + Effect.runFork(runExchangeTask(scheduled, options.subscription, context).pipe(Effect.provide(NodeContext.layer))) + }) + ) + return scheduled + }) + +/** + * Polls configured exchange outboxes for ForgeFed tasks. + */ +export const startOutboxPolling = ( + intervalMs: number = 5000 +) => + Effect.gen(function*(_) { + yield* _(initializeFederationState()) + const context = yield* _(defaultFederationContext().pipe(Effect.orElseSucceed(() => ({ + publicOrigin: "http://localhost:3334", + actorUsername: defaultActorUsername, + actorId: "http://localhost:3334/federation/actor", + inbox: "http://localhost:3334/federation/inbox", + outbox: "http://localhost:3334/federation/outbox", + followers: "http://localhost:3334/federation/followers", + following: "http://localhost:3334/federation/following", + liked: "http://localhost:3334/federation/liked", + followsActivityPrefix: "http://localhost:3334/federation/activities/follows", + exchangeActivityPrefix: "http://localhost:3334/federation/activities/exchange" + } satisfies FederationContext)))) + yield* _(ensureConfiguredExchangeSubscriptions(context)) + + const poll = pollExchangeOutboxes({}, context).pipe( + Effect.tapError((error) => + Effect.sync(() => { + console.warn("[ActivityPub Polling] poll failed:", error) + }) + ), + Effect.ignore + ) + while (true) { yield* _(poll) yield* _(Effect.sleep(Duration.millis(intervalMs))) diff --git a/packages/api/tests/federation.test.ts b/packages/api/tests/federation.test.ts index cc6c2d92..c525785d 100644 --- a/packages/api/tests/federation.test.ts +++ b/packages/api/tests/federation.test.ts @@ -1,15 +1,19 @@ import { describe, expect, it } from "@effect/vitest" import { Effect } from "effect" +import { vi } from "vitest" import { clearFederationState, createFollowSubscription, + ensureExchangeSubscription, ingestFederationInbox, listFederationIssues, + listExchangeSubscriptions, listFollowSubscriptions, makeFederationActorDocument, makeFederationContext, - makeFederationFollowingCollection + makeFederationFollowingCollection, + pollExchangeOutboxes } from "../src/services/federation.js" describe("federation service", () => { @@ -179,4 +183,139 @@ describe("federation service", () => { expect(duplicateError._tag).toBe("ApiConflictError") })) + + it.effect("ingests ActivityPub Create with ForgeFed Ticket payload", () => + Effect.gen(function*(_) { + clearFederationState() + + const result = yield* _( + ingestFederationInbox({ + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://forgefed.org/ns" + ], + id: "https://exchange.lefine.pro/outbox/code/111", + type: "Create", + actor: "https://exchange.lefine.pro/actor/code", + object: { + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://forgefed.org/ns" + ], + type: "Ticket", + id: "https://exchange.lefine.pro/orders/111", + attributedTo: "https://exchange.lefine.pro/actor/code", + summary: "Calculate 2+2 via remote cogni", + content: "

Calculate 2+2

", + mediaType: "text/html", + source: { + content: "Calculate 2+2", + mediaType: "text/plain" + }, + workType: "standard" + } + }) + ) + + expect(result.kind).toBe("issue.create") + if (result.kind === "issue.create") { + expect(result.issue.issueId).toBe("https://exchange.lefine.pro/orders/111") + expect(result.issue.status).toBe("accepted") + expect(result.issue.ticket.source).toEqual({ + content: "Calculate 2+2", + mediaType: "text/plain" + }) + } + })) + + it.effect("discovers exchange root target and deduplicates polled Create tasks", () => + Effect.gen(function*(_) { + clearFederationState() + + const previousFetch = globalThis.fetch + const fetchMock = vi.fn((input: Parameters[0], init?: Parameters[1]) => { + const url = typeof input === "string" + ? input + : input instanceof URL + ? input.toString() + : input.url + const method = init?.method ?? "GET" + + if (method === "GET" && url === "https://exchange.lefine.pro/actor/code") { + return Promise.resolve(new Response(JSON.stringify({ + "@context": ["https://www.w3.org/ns/activitystreams", "https://forgefed.org/ns"], + id: "https://exchange.lefine.pro/actor/code", + type: "Service", + inbox: "https://exchange.lefine.pro/inbox/code", + outbox: "https://exchange.lefine.pro/outbox/code", + followers: "https://exchange.lefine.pro/actors/code/followers", + preferredUsername: "code", + publicKey: { + id: "https://exchange.lefine.pro/actor/code#main-key", + owner: "https://exchange.lefine.pro/actor/code", + publicKeyPem: "pem" + } + }), { status: 200 })) + } + + if (method === "GET" && url === "https://exchange.lefine.pro/outbox/code") { + return Promise.resolve(new Response(JSON.stringify({ + "@context": ["https://www.w3.org/ns/activitystreams", "https://forgefed.org/ns"], + id: "https://exchange.lefine.pro/outbox/code", + type: "OrderedCollection", + totalItems: 1, + orderedItems: [ + { + "@context": ["https://www.w3.org/ns/activitystreams", "https://forgefed.org/ns"], + id: "https://exchange.lefine.pro/outbox/code/111", + type: "Create", + actor: "https://exchange.lefine.pro/actor/code", + object: { + type: "Ticket", + id: "https://exchange.lefine.pro/orders/111", + attributedTo: "https://exchange.lefine.pro/actor/code", + summary: "Calculate 2+2", + content: "

Calculate 2+2

", + source: { + content: "Calculate 2+2", + mediaType: "text/plain" + } + } + } + ] + }), { status: 200 })) + } + + return Promise.resolve(new Response("{}", { status: 202 })) + }) + + globalThis.fetch = fetchMock as typeof fetch + + try { + const context = yield* _( + makeFederationContext({ + publicOrigin: "https://docker-git.example", + actorUsername: "docker-git" + }) + ) + + const created = yield* _(ensureExchangeSubscription({ target: "https://exchange.lefine.pro" }, context)) + expect(created.subscription.remoteOutbox).toBe("https://exchange.lefine.pro/outbox/code") + expect(created.subscription.queue).toBe("code") + expect(listExchangeSubscriptions()).toHaveLength(1) + + const firstPoll = yield* _(pollExchangeOutboxes({ runTasks: false }, context)) + expect(firstPoll.newItems).toBe(1) + expect(firstPoll.processedItems).toBe(1) + + const issues = listFederationIssues() + expect(issues).toHaveLength(1) + expect(issues[0]?.issueId).toBe("https://exchange.lefine.pro/orders/111") + + const secondPoll = yield* _(pollExchangeOutboxes({ runTasks: false }, context)) + expect(secondPoll.newItems).toBe(0) + } finally { + globalThis.fetch = previousFetch + } + })) }) diff --git a/packages/api/tests/schema.test.ts b/packages/api/tests/schema.test.ts index 4185082f..627bd111 100644 --- a/packages/api/tests/schema.test.ts +++ b/packages/api/tests/schema.test.ts @@ -7,6 +7,8 @@ import { CodexAuthLoginRequestSchema, CodexAuthLogoutRequestSchema, CreateAgentRequestSchema, + ExchangePollRequestSchema, + ExchangeSubscribeRequestSchema, CreateFollowRequestSchema, CreateProjectRequestSchema, GithubAuthLoginRequestSchema, @@ -88,6 +90,45 @@ describe("api schemas", () => { }) })) + it.effect("decodes exchange subscribe payload", () => + Effect.sync(() => { + const result = Schema.decodeUnknownEither(ExchangeSubscribeRequestSchema)({ + target: "code@exchange.lefine.pro", + domain: "https://docker-git.example", + projectRepoUrl: "https://github.com/ProverCoderAI/docker-git", + agentProvider: "codex" + }) + + Either.match(result, { + onLeft: (error) => { + throw new Error(ParseResult.TreeFormatter.formatIssueSync(error.issue)) + }, + onRight: (value) => { + expect(value.target).toBe("code@exchange.lefine.pro") + expect(value.agentProvider).toBe("codex") + expect(value.projectRepoUrl).toBe("https://github.com/ProverCoderAI/docker-git") + } + }) + })) + + it.effect("decodes exchange poll payload", () => + Effect.sync(() => { + const result = Schema.decodeUnknownEither(ExchangePollRequestSchema)({ + target: "code", + runTasks: false + }) + + Either.match(result, { + onLeft: (error) => { + throw new Error(ParseResult.TreeFormatter.formatIssueSync(error.issue)) + }, + onRight: (value) => { + expect(value.target).toBe("code") + expect(value.runTasks).toBe(false) + } + }) + })) + it.effect("decodes auth login payload", () => Effect.sync(() => { const result = Schema.decodeUnknownEither(GithubAuthLoginRequestSchema)({