feat: transaction grpc server#531
Conversation
485fb12 to
d04c67e
Compare
There was a problem hiding this comment.
Pull request overview
Adds a standalone gRPC server and supporting domain/service layers to stream settled ledger transactions from Blink Core, with cursor-based replay and MongoDB change-stream live delivery. This fits alongside existing Core API services by exposing a new stream-oriented contract (transactions.proto) and wiring it into local dev + Bats e2e.
Changes:
- Introduces a transactions stream domain + service, including mapping from ledger transactions to
TransactionEventand a gRPC server implementation with backpressure/cancellation handling. - Adds a ledger-side settled transaction streamer that replays after
after_transaction_idand then tails MongoDB change streams with overlap de-dupe. - Wires the new service into build/dev/CI/e2e: Buck/Tilt targets, Bats helpers/tests, and local Mongo replica-set support for change streams.
Reviewed changes
Copilot reviewed 31 out of 36 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| pnpm-lock.yaml | Updates protobufjs override/resolution in the lockfile. |
| package.json | Updates root resolutions for protobufjs. |
| flake.nix | Adds Darwin-friendly buf/grpcurl selection, PATH shellHook additions, and a new build derivation. |
| dev/Tiltfile | Adds a Tilt local_resource for api-transactions-grpc-stream with readiness probe + Mongo dependency. |
| dev/docker-compose.deps.yml | Runs MongoDB as a replica set and adds a healthcheck to support change streams. |
| core/api/test/unit/services/transactions-stream/index.spec.ts | Unit tests for TransactionsStreamService subscription behavior and errors. |
| core/api/test/unit/services/transactions-stream/helpers.spec.ts | Unit tests for account/preimage resolvers and event mapper helpers. |
| core/api/test/unit/services/transactions-stream/grpc-server.spec.ts | Unit tests for gRPC server mapping, invalid cursors, cancellation, and backpressure. |
| core/api/test/unit/services/ledger/stream-settled-transactions.spec.ts | Unit tests for replay + change-stream delivery and error surfacing. |
| core/api/src/services/transactions-stream/proto/transactions.proto | Defines the public gRPC contract (SubscribeTransactions / TransactionEvent). |
| core/api/src/services/transactions-stream/proto/transactions_pb.js | Generated protobuf JS bindings for the new contract. |
| core/api/src/services/transactions-stream/proto/transactions_pb.d.ts | Generated protobuf TS typings for the new contract. |
| core/api/src/services/transactions-stream/proto/transactions_grpc_pb.js | Generated gRPC service definitions (JS). |
| core/api/src/services/transactions-stream/proto/transactions_grpc_pb.d.ts | Generated gRPC service definitions (TS). |
| core/api/src/services/transactions-stream/proto/buf.gen.yaml | Buf generation config for JS/gRPC/TS outputs. |
| core/api/src/services/transactions-stream/index.ts | Implements TransactionsStreamService subscribe logic with cursor parsing and abort cleanup. |
| core/api/src/services/transactions-stream/helpers.ts | Adds mapping helpers + account/preimage loading and caching. |
| core/api/src/services/transactions-stream/grpc-server.ts | Implements server-streaming gRPC handler and backpressure handling. |
| core/api/src/services/transactions-stream/convert.ts | Converts domain TransactionStreamEvent into protobuf TransactionEvent. |
| core/api/src/services/ledger/stream-settled-transactions.ts | Streams settled transactions via replay query + Mongo change stream with dedupe. |
| core/api/src/services/ledger/index.ts | Exposes streamSettledTransactions on LedgerService. |
| core/api/src/servers/transactions-grpc-stream-server.ts | New standalone server process that starts gRPC + health endpoints. |
| core/api/src/domain/transactions-stream/index.types.d.ts | Declares domain types for transaction stream events and enums. |
| core/api/src/domain/transactions-stream/index.ts | Implements transaction stream enums and ledger-type mapping helpers. |
| core/api/src/domain/ledger/index.types.d.ts | Adds streamSettledTransactions to the ledger service interface. |
| core/api/src/config/index.ts | Exposes new env-configured ports for the stream server + health endpoint. |
| core/api/src/config/env.ts | Adds env vars and defaults for TRANSACTIONS_GRPC_STREAM_PORT and health port. |
| core/api/package.json | Ensures proto artifacts are copied on build; adds dev + codegen scripts. |
| core/api/BUCK | Adds a Buck dev task binary for the new stream server. |
| core/api-transactions-grpc-stream/Dockerfile | Adds container build for the standalone stream server artifact. |
| core/api-transactions-grpc-stream/BUCK | Adds Buck targets/aliases for prod/dev builds of the new service. |
| bats/helpers/transactions-grpc-stream.bash | Adds grpcurl wrapper helpers for the new stream endpoint. |
| bats/helpers/_common.bash | Extends grpcurl_request to forward extra grpcurl args. |
| bats/core/api/transactions-grpc-stream.bats | Adds e2e tests for invalid cursors and replay behavior. |
| bats/ci_setup_suite.bash | Waits for the new stream service health endpoint in CI setup. |
| bats/ci_run.sh | Adds the new service to CI Buck build set. |
Files not reviewed (1)
- pnpm-lock.yaml: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
31caf68 to
bd165b1
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 36 out of 43 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
core/api/src/servers/transactions-grpc-stream/grpc-server.ts:92
- There is a race condition during stream setup.
call.on("cancelled", cleanup)andcall.on("error", cleanup)are only registered afterawait transactionsStream.subscribeToTransactions(...)resolves. If the client cancels (or the call errors) while that await is in flight, the cancelled/error event will fire before the listeners are attached, and the resultingresult.close()will never be called, leaking the underlying ledger stream and Mongo change stream. Consider attaching thecancelled/errorlisteners synchronously (before the await) and have them also set a flag that preventssubscriptionRef.currentfrom being used post-cancellation, or close the subscription immediately ifisClosedis already true when the subscribe call returns.
const result = await transactionsStream.subscribeToTransactions({
afterTransactionId: requestAfterTransactionId(call.request),
onTransaction: async (event) => {
if (isClosed) return
const canContinue = call.write(
transactionStreamEventToGrpcTransactionEvent(event),
)
if (!canContinue && !isClosed) await waitForDrainOrTerminalEvent()
},
onError: (err) => {
serviceLogger.error({ err }, "Failed to stream transactions")
cleanup()
call.destroy(err)
},
})
if (result instanceof Error) {
call.destroy(
toServiceError({
code: status.INVALID_ARGUMENT,
message: "Invalid after_transaction_id",
details: "after_transaction_id must be a valid Mongo ObjectId",
}),
)
return
}
subscriptionRef.current = result
call.on("cancelled", cleanup)
call.on("error", cleanup)
| healthcheck: | ||
| test: | ||
| [ | ||
| "CMD-SHELL", | ||
| "mongosh --quiet --eval 'try { rs.status().ok } catch (e) { rs.initiate({ _id: \"rs0\", members: [{ _id: 0, host: \"localhost:27017\" }] }).ok }' | grep 1", | ||
| ] | ||
| interval: 5s | ||
| timeout: 30s | ||
| retries: 30 |
There was a problem hiding this comment.
I checked this against the current dev/Tilt setup.
For dev/docker-compose.deps.yml, MongoDB is started by compose, but the core services are run from the host through the dev bundle. The dev env files currently use MONGODB_CON=mongodb://localhost:27017/galoy, so advertising the replica set member as localhost:27017 matches the existing host-side connection pattern.
Changing the replica set member to mongodb:27017 would require a broader dev topology change, because host-side clients would no longer be able to follow the advertised replica set address without also changing the connection strings or adding directConnection=true where appropriate.
So I’m leaving this as-is for this PR. If we want to support both host-side and compose-network Mongo clients against the same dev replica set, we should handle that separately as a dev infra cleanup.
@dolcalmi please also look at it
c4d5438 to
bd165b1
Compare
Summary
after_transaction_id, then continues with Mongo change-stream live delivery.
TransactionEventfields including direction, wallet/accountIDs, payment hash, preimage, amounts, currency, settlement type, and timestamp.
cancellation, and service-side stream errors.
local Mongo replica-set support for change streams.
Story Coverage
E4-S1 Blink Core items addressed:
transactions-grpc-stream-serverprocess.transactions.protoas the Blink Core transaction stream contract.after_transaction_id.SENTandRECEIVEDtransaction events from Blink Core.Testing
Passed:
nix develop -c buck2 test //core/api:check-lint //core/api:check-type //core/api:check- yaml //core/api:check-circular-dependencies //core/api:unit-testsnix develop -c buck2 build //core/api:prod_build //core/api-transactions-grpc-stream:api- transactions-grpc-streamgit diff --checkNote: full
//core/api:testincludesaudit, which required the update ofprotobufjsadvisory.