Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/ruby.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,13 @@ jobs:
ruby-version: .ruby-version
bundler-cache: true
- run: bundle exec yard --fail-on-warning

steep:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: ruby/setup-ruby@v1
with:
ruby-version: .ruby-version
bundler-cache: true
- run: bundle exec steep check
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Naming/MethodParameterName:
- h
- id
- kw
- s
- t

# Integration specs are organized by protocol scenario first, not by a single
Expand Down Expand Up @@ -114,6 +115,8 @@ RSpec/DescribeClass:
Exclude:
- 'spec/integration/**/*'
- 'spec/e2e/**/*'
- 'spec/unit/coverage_spec.rb'
- 'spec/unit/coverage_extras_spec.rb'

# Specs are grouped by test layer (`unit`, `integration`, `e2e`) instead of
# mirroring the source tree exactly.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<p align="center">
<a href="https://rubygems.org/gems/arcp"><img alt="gem" src="https://img.shields.io/gem/v/arcp.svg"></a>
<a href="https://github.com/nficano/arpc/actions/workflows/test.yml"><img alt="CI" src="https://github.com/nficano/arpc/actions/workflows/test.yml/badge.svg"></a>
<a href="https://github.com/agentruntimecontrolprotocol/ruby-sdk/actions/workflows/test.yml"><img alt="CI" src="https://github.com/agentruntimecontrolprotocol/ruby-sdk/actions/workflows/test.yml/badge.svg"></a>
<a href="https://github.com/agentruntimecontrolprotocol/spec/blob/main/docs/draft-arcp-1.1.md"><img alt="ARCP" src="https://img.shields.io/badge/ARCP-v1.1%20draft-blue"></a>
<a href="LICENSE"><img alt="License" src="https://img.shields.io/badge/license-Apache--2.0-lightgrey"></a>
<a href="https://coderabbit.ai"><img alt="CodeRabbit" src="https://img.shields.io/coderabbit/prs/github/agentruntimecontrolprotocol/ruby-sdk?utm_source=oss&utm_medium=github&utm_campaign=agentruntimecontrolprotocol/ruby-sdk&labelColor=171717&color=FF570A&label=CodeRabbit+Reviews"></a>
Expand Down
15 changes: 7 additions & 8 deletions Steepfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
# frozen_string_literal: true

# Steep targets the smallest reliable surface today: the version file
# plus the transport base contract. Bringing the rest of the implementation
# back under Steep is tracked as ongoing work — adding files here once
# their sigs are accurate keeps Steep useful instead of drowning in
# pre-existing drift from the Ruby 3.4 `Data.define` rewrite. The runtime
# sigs in `sig/arcp/runtime.rbs` are kept current so downstream consumers
# (and future Steep coverage) can rely on them.
target :lib do
signature 'sig'

check 'lib/arcp/envelope.rb'
check 'lib/arcp/serializer.rb'
check 'lib/arcp/version.rb'
check 'lib/arcp/errors.rb'
check 'lib/arcp/client.rb'
check 'lib/arcp/runtime/runtime.rb'
check 'lib/arcp/session.rb'
check 'lib/arcp/job.rb'
check 'lib/arcp/lease.rb'
check 'lib/arcp/transport/base.rb'

library 'time'
Expand Down
57 changes: 42 additions & 15 deletions lib/arcp/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def initialize(transport:, clock: Arcp::SystemClock.new)
@job_streams = {}
@job_results = {}
@result_waiters = {}
@submitted_jobs = {}
@reader_task = nil
@heartbeat_task = nil
@next_outbound_seq = 0
Expand Down Expand Up @@ -153,6 +154,7 @@ def submit_job(agent:, input: nil, lease_request: nil, lease_constraints: nil,
payload: submit.to_h
)
accepted = Arcp::Job::Accepted.from_h(accepted_env.payload)
@mutex.synchronize { @submitted_jobs[accepted.job_id] = true }
Arcp::Job::Handle.new(
job_id: accepted.job_id, agent: accepted.agent,
submitted_at: accepted.accepted_at,
Expand All @@ -161,11 +163,17 @@ def submit_job(agent:, input: nil, lease_request: nil, lease_constraints: nil,
)
end

# Subscribes to a job's event stream.
# Subscribes to a job's event stream. Sends `job.subscribe` for any job
# this client did not submit (so observer sessions attach to the runtime
# fanout); submitter sessions reuse the stream the runtime opened for
# them at submit time. The `subscribe` feature is required for explicit
# subscriptions regardless of whether `from_event_seq` is supplied.
def subscribe_job(job_id:, from_event_seq: nil, history: false)
already_owned = @mutex.synchronize { @submitted_jobs[job_id] }
queue = @mutex.synchronize { @job_streams[job_id] ||= Async::Queue.new }

if @session.supports?(Arcp::Session::Feature::SUBSCRIBE) && from_event_seq
unless already_owned
require_feature!(Arcp::Session::Feature::SUBSCRIBE)
send_envelope(type: Arcp::MessageTypes::JOB_SUBSCRIBE,
job_id: job_id,
payload: Arcp::Job::Subscribe.new(job_id: job_id, from_event_seq: from_event_seq,
Expand Down Expand Up @@ -197,6 +205,8 @@ def get_result(job_id:)
@mutex.synchronize { @result_waiters[job_id] = queue }
env = queue.dequeue
end
raise Arcp::Errors::ProtocolViolation, 'transport closed before job result' if env.nil?

case env.type
when Arcp::MessageTypes::JOB_RESULT
Arcp::Job::Result.from_h(env.payload)
Expand All @@ -214,16 +224,32 @@ def ack(seq)
payload: Arcp::Session::Ack.new(last_processed_seq: seq).to_h)
end

# Sends an envelope on the current session.
def send_envelope(type:, payload:, job_id: nil)
# Builds an envelope for the current session without sending it.
# Lets callers register pending waiters keyed on the envelope id
# before the peer can reply.
def build_envelope(type:, payload:, job_id: nil)
raise Arcp::Errors::Internal, 'session not open' unless @session
raise IOError, 'client closed' if @closed

env = Arcp::Envelope.build(
Arcp::Envelope.build(
type: type, session_id: @session.id,
trace_id: Arcp::Trace.current.trace_id,
job_id: job_id, payload: payload
)
end

# Sends an envelope on the current session.
def send_envelope(type:, payload:, job_id: nil)
raise IOError, 'client closed' if @closed

env = build_envelope(type: type, payload: payload, job_id: job_id)
@transport.send(env)
env
end

# Sends a pre-built envelope, e.g. after registering a pending waiter.
def send_built_envelope(env)
raise IOError, 'client closed' if @closed

@transport.send(env)
env
end
Expand All @@ -232,13 +258,13 @@ def send_envelope(type:, payload:, job_id: nil)
def close(reason: nil)
return if @closed

@closed = true
begin
send_envelope(type: Arcp::MessageTypes::SESSION_BYE,
payload: Arcp::Session::Bye.new(reason: reason).to_h)
rescue StandardError
nil
end
@closed = true
@heartbeat_task&.stop
@reader_task&.stop
@transport.close(reason: reason)
Expand All @@ -255,9 +281,15 @@ def require_feature!(feature)
end

def request(type:, expect:, payload:)
env = send_envelope(type: type, payload: payload)
env = build_envelope(type: type, payload: payload)
queue = Async::Queue.new
@mutex.synchronize { @pending[env.id] = [expect, queue] }
begin
send_built_envelope(env)
rescue StandardError
@mutex.synchronize { @pending.delete(env.id) }
raise
end
response = queue.dequeue
raise Arcp::Errors::ProtocolViolation, 'transport closed' if response.nil?

Expand Down Expand Up @@ -348,14 +380,9 @@ def feed_result(env)

def feed_pending(env)
reply_to = env.payload.is_a?(Hash) ? env.payload['reply_to'] : nil
key = reply_to || @mutex.synchronize do
@pending.keys.find do |k|
@pending[k].is_a?(Array) && @pending[k][0] == env.type
end
end
return unless key
return unless reply_to

pair = @mutex.synchronize { @pending.delete(key) }
pair = @mutex.synchronize { @pending.delete(reply_to) }
pair&.last&.enqueue(env)
end

Expand Down
61 changes: 57 additions & 4 deletions lib/arcp/lease.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,79 @@
module Arcp
module Lease
# Immutable lease bounds attached to a job request or granted lease.
# `max_budget` is a {CostBudget} expressing the maximum per-currency
# amount that a requested lease budget may declare for this job; it
# accepts the same shape as `cost.budget` (a list of `"CCY:amount"`
# entries) or a pre-parsed {CostBudget}.
LeaseConstraints = Data.define(:expires_at, :max_budget) do
def initialize(expires_at: nil, max_budget: nil)
super(expires_at: expires_at, max_budget: self.class.parse_max_budget(max_budget))
end

def self.from_h(h)
return nil if h.nil?

h = h.transform_keys(&:to_s)
new(expires_at: h['expires_at'], max_budget: h['max_budget'])
end

def self.parse_max_budget(value)
case value
when nil then nil
when CostBudget then value
when Array then CostBudget.parse(value)
when Hash
h = value.transform_keys(&:to_s)
CostBudget.parse(h['cost.budget'] || h.values_at(*h.keys).flatten)
else
raise Arcp::Errors::InvalidRequest,
"max_budget must be a list of 'CCY:amount' entries or a CostBudget"
end
end

def to_h
out = {}
out['expires_at'] = expires_at if expires_at
out['max_budget'] = max_budget if max_budget
out['max_budget'] = max_budget.to_a if max_budget
out
end

def validate!
return if expires_at.nil?
unless expires_at.nil?
t = Time.iso8601(expires_at)
raise Arcp::Errors::InvalidRequest, "expires_at must be UTC (use 'Z'): #{expires_at}" unless t.utc?
end

validate_max_budget!
end

# Raises {Arcp::Errors::LeaseSubsetViolation} if a requested lease
# budget exceeds the per-currency caps declared in `max_budget`.
# A request that omits a currency declared in `max_budget` is allowed.
def enforce_max_budget!(requested_budget)
return if max_budget.nil?
return if requested_budget.nil?

offending = requested_budget.per_currency.filter_map do |ccy, amt|
cap = max_budget.per_currency[ccy]
ccy if cap.nil? || amt > cap
end
return if offending.empty?

raise Arcp::Errors::LeaseSubsetViolation.new(
"lease budget exceeds lease_constraints max_budget for: #{offending.inspect}",
details: { 'currencies' => offending }
)
end

private

def validate_max_budget!
return if max_budget.nil?
return if max_budget.is_a?(CostBudget)

t = Time.iso8601(expires_at)
raise Arcp::Errors::InvalidRequest, "expires_at must be UTC (use 'Z'): #{expires_at}" unless t.utc?
raise Arcp::Errors::InvalidRequest,
'max_budget must be a CostBudget after parsing'
end
end

Expand Down
43 changes: 36 additions & 7 deletions lib/arcp/runtime/event_log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@

module Arcp
module Runtime
# In-memory ring of buffered events keyed by session_id. The runtime
# uses this for the replay window and `session.ack`-driven early
# eviction. The shipped implementation is in-memory; persistence can
# be layered on later without changing the public API.
# In-memory ring of buffered events keyed by session_id, with a
# secondary index by job_id so that `job.subscribe` history replays
# can resolve from the originating job's stream regardless of which
# session emitted the envelopes. The runtime uses this for the
# replay window and `session.ack`-driven early eviction. The shipped
# implementation is in-memory; persistence can be layered on later
# without changing the public API.
class EventLog
def initialize(window_sec: 300, clock: Arcp::SystemClock.new)
@window_sec = window_sec
@clock = clock
@sessions = Hash.new { |h, k| h[k] = [] }
@jobs = Hash.new { |h, k| h[k] = [] }
@floor = Hash.new(0)
@mutex = Mutex.new
end

def append(session_id, envelope)
entry = [envelope, @clock.monotonic]
@mutex.synchronize do
@sessions[session_id] << [envelope, @clock.monotonic]
@sessions[session_id] << entry
@jobs[envelope.job_id] << entry if envelope.job_id
end
envelope
end
Expand All @@ -33,11 +39,29 @@ def evict_up_to(session_id, seq)
end
end

# Replays buffered envelopes for a session in arrival order. Used
# for resume token replay where the session id frames the cursor.
# Terminal envelopes (`job.result`, `job.error`) carry no
# `event_seq` and are always included so a resuming client can
# observe the final job state alongside any missed events.
def replay(session_id, from_event_seq: nil)
@mutex.synchronize do
@sessions[session_id].each_with_object([]) do |(env, _t), out|
next if env.event_seq.nil?
next if from_event_seq && env.event_seq < from_event_seq
next if env.event_seq && from_event_seq && env.event_seq < from_event_seq

out << env
end
end
end

# Replays buffered envelopes for a job's stream regardless of which
# session originally produced them. Used by `job.subscribe`
# history replay so observers see the full job timeline, including
# the terminal `job.result` / `job.error` envelope.
def replay_job(job_id, from_event_seq: nil)
@mutex.synchronize do
@jobs[job_id].each_with_object([]) do |(env, _t), out|
next if env.event_seq && from_event_seq && env.event_seq < from_event_seq

out << env
end
Expand All @@ -51,11 +75,16 @@ def expire!
@sessions.each_value do |buf|
buf.reject! { |(_e, t)| (now - t) > @window_sec }
end
@jobs.each_value do |buf|
buf.reject! { |(_e, t)| (now - t) > @window_sec }
end
end
end

# @api private
def buffer_size(session_id) = @sessions[session_id].size
# @api private
def job_buffer_size(job_id) = @jobs[job_id].size
end
end
end
Loading