From 960211a04e9191f9b27981d467059b6ef589f265 Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Wed, 27 May 2026 05:11:02 +0000 Subject: [PATCH] fix(storage): Add telemetry tracing support for async stream Close() --- .../async/writer_connection_tracing.cc | 17 +++++++ .../async/writer_connection_tracing_test.cc | 46 +++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/google/cloud/storage/internal/async/writer_connection_tracing.cc b/google/cloud/storage/internal/async/writer_connection_tracing.cc index a2cd9f514f1dd..96ad896a0369f 100644 --- a/google/cloud/storage/internal/async/writer_connection_tracing.cc +++ b/google/cloud/storage/internal/async/writer_connection_tracing.cc @@ -116,6 +116,23 @@ class AsyncWriterConnectionTracing : public storage::AsyncWriterConnection { }); } + future Close(storage::WritePayload p) override { + internal::OTelScope scope(span_); + auto size = static_cast(p.size()); + return impl_->Close(std::move(p)) + .then([count = ++sent_count_, span = span_, size](auto f) { + span->AddEvent( + "gl-cpp.close", + { + {/*sc::kRpcMessageType=*/"rpc.message.type", "SENT"}, + {/*sc::kRpcMessageId=*/"rpc.message.id", count}, + {sc::thread::kThreadId, internal::CurrentThreadId()}, + {"gl-cpp.size", size}, + }); + return internal::EndSpan(*span, f.get()); + }); + } + future> Query() override { internal::OTelScope scope(span_); return impl_->Query().then([count = ++recv_count_, span = span_](auto f) { diff --git a/google/cloud/storage/internal/async/writer_connection_tracing_test.cc b/google/cloud/storage/internal/async/writer_connection_tracing_test.cc index 46c5dd1caf736..b1fbd2bd45f9f 100644 --- a/google/cloud/storage/internal/async/writer_connection_tracing_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_tracing_test.cc @@ -68,6 +68,10 @@ auto ExpectFlush(std::int64_t id, std::uint64_t size) { return AllOf(EventNamed("gl-cpp.flush"), ExpectSent(id, size)); } +auto ExpectClose(std::int64_t id, std::uint64_t size) { + return AllOf(EventNamed("gl-cpp.close"), ExpectSent(id, size)); +} + auto ExpectQuery(std::int64_t id) { namespace sc = ::opentelemetry::semconv; return AllOf(EventNamed("gl-cpp.query"), @@ -248,6 +252,48 @@ TEST(WriterConnectionTracing, Cancel) { sc::thread::kThreadId, _))))))); } +TEST(WriterConnectionTracing, Close) { + auto span_catcher = InstallSpanCatcher(); + + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Close).WillOnce([] { + return make_ready_future(Status{}); + }); + auto actual = MakeTracingWriterConnection( + internal::MakeSpan("test-span-name"), std::move(mock)); + auto status = actual->Close(WritePayload{std::string(1024, 'A')}).get(); + EXPECT_STATUS_OK(status); + + auto spans = span_catcher->GetSpans(); + EXPECT_THAT(spans, ElementsAre(AllOf( + SpanNamed("test-span-name"), + SpanWithStatus(opentelemetry::trace::StatusCode::kOk), + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanHasEvents(ExpectClose(1, 1024))))); +} + +TEST(WriterConnectionTracing, CloseError) { + auto span_catcher = InstallSpanCatcher(); + + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Close).WillOnce([] { + return make_ready_future(PermanentError()); + }); + auto actual = MakeTracingWriterConnection( + internal::MakeSpan("test-span-name"), std::move(mock)); + auto status = actual->Close(WritePayload{std::string(1024, 'A')}).get(); + EXPECT_THAT(status, StatusIs(PermanentError().code())); + + auto spans = span_catcher->GetSpans(); + EXPECT_THAT( + spans, + ElementsAre(AllOf(SpanNamed("test-span-name"), + SpanWithStatus(opentelemetry::trace::StatusCode::kError, + PermanentError().message()), + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanHasEvents(ExpectClose(1, 1024))))); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal