From 5bb65642e3b5b88e57a62d2bc5512c62b7c8e4b3 Mon Sep 17 00:00:00 2001 From: Vicky Zhang Date: Thu, 21 May 2026 18:58:04 -0700 Subject: [PATCH 1/2] Handle nil keyring gracefully and fix gRPC v3 broker startup race Return a typed errKeychainUnavailable error when config.KeyRing is nil instead of panicking on a nil pointer dereference. Refactor the gRPC v3 plugin broker to start the CoreCLIHelper server with a short publish delay so non-Go plugins can register pending dials before the connection info arrives. Co-Authored-By: Claude Opus 4.6 (1M context) Committed-By-Agent: claude --- pkg/plugins/core_cli_helper.go | 40 +++- pkg/plugins/core_cli_helper_test.go | 26 +++ pkg/plugins/interface_grpc_3.go | 88 ++++++-- pkg/plugins/interface_grpc_3_test.go | 309 +++++++++++++++++++++++++++ 4 files changed, 442 insertions(+), 21 deletions(-) create mode 100644 pkg/plugins/interface_grpc_3_test.go diff --git a/pkg/plugins/core_cli_helper.go b/pkg/plugins/core_cli_helper.go index 82bf3bc5e..34d743133 100644 --- a/pkg/plugins/core_cli_helper.go +++ b/pkg/plugins/core_cli_helper.go @@ -2,6 +2,7 @@ package plugins import ( "context" + "errors" "fmt" "runtime" "sync" @@ -163,6 +164,7 @@ type pendingKeychainValue struct { } var ( + errKeychainUnavailable = errors.New("system keychain is unavailable") keychainVisibilityRetryTimeout = 1500 * time.Millisecond keychainVisibilityRetryEnabled = runtime.GOOS == "darwin" keychainVisibilityNow = time.Now @@ -170,8 +172,21 @@ var ( keychainVisibilityPendingValues = map[string]pendingKeychainValue{} ) +func keyRing() (keyring.Keyring, error) { + if config.KeyRing == nil { + return nil, errKeychainUnavailable + } + + return config.KeyRing, nil +} + func readKeychainPassword(key string) (string, bool, error) { - item, err := config.KeyRing.Get(key) + ring, err := keyRing() + if err != nil { + return "", false, err + } + + item, err := ring.Get(key) if err == nil { return string(item.Data), true, nil } @@ -267,7 +282,12 @@ func (h *coreCLIHelper) KeychainGetPassword(key string) (string, bool, error) { // KeychainSetPassword stores a password in the system keychain. func (h *coreCLIHelper) KeychainSetPassword(key string, value string) error { - if err := config.KeyRing.Set(keyring.Item{ + ring, err := keyRing() + if err != nil { + return err + } + + if err := ring.Set(keyring.Item{ Key: key, Data: []byte(value), Label: key, @@ -283,13 +303,18 @@ func (h *coreCLIHelper) KeychainSetPassword(key string, value string) error { func (h *coreCLIHelper) KeychainDeletePassword(key string) (bool, error) { clearPendingKeychainValue(key) - existingKeys, err := config.KeyRing.Keys() + ring, err := keyRing() + if err != nil { + return false, err + } + + existingKeys, err := ring.Keys() if err != nil { return false, err } for _, k := range existingKeys { if k == key { - if err := config.KeyRing.Remove(key); err != nil { + if err := ring.Remove(key); err != nil { return false, err } return true, nil @@ -300,7 +325,12 @@ func (h *coreCLIHelper) KeychainDeletePassword(key string) (bool, error) { // KeychainFindCredentials lists all keys stored in the keychain for this service. func (h *coreCLIHelper) KeychainFindCredentials() ([]string, error) { - return config.KeyRing.Keys() + ring, err := keyRing() + if err != nil { + return nil, err + } + + return ring.Keys() } // RunPeerPlugin looks up and runs the named plugin with the given arguments. diff --git a/pkg/plugins/core_cli_helper_test.go b/pkg/plugins/core_cli_helper_test.go index c6de63a14..e7772cf35 100644 --- a/pkg/plugins/core_cli_helper_test.go +++ b/pkg/plugins/core_cli_helper_test.go @@ -146,6 +146,20 @@ func TestKeychainGetPasswordReturnsNotFoundWithoutRetry(t *testing.T) { require.Equal(t, 1, ring.getCalls) } +func TestKeychainGetPasswordReturnsErrorWhenKeyringUnavailable(t *testing.T) { + originalKeyRing := config.KeyRing + config.KeyRing = nil + t.Cleanup(func() { + config.KeyRing = originalKeyRing + }) + + coreCLIHelper := NewCoreCLIHelper(context.Background(), nil, afero.NewMemMapFs()) + value, found, err := coreCLIHelper.KeychainGetPassword("missing.key") + require.ErrorIs(t, err, errKeychainUnavailable) + require.False(t, found) + require.Empty(t, value) +} + func TestKeychainGetPasswordReturnsUnexpectedError(t *testing.T) { originalKeyRing := config.KeyRing originalEnabled := keychainVisibilityRetryEnabled @@ -210,6 +224,18 @@ func TestKeychainSetPasswordMakesRecentWriteVisibleWhenKeychainHasNotCaughtUp(t require.Equal(t, 1, ring.getCalls) } +func TestKeychainSetPasswordReturnsErrorWhenKeyringUnavailable(t *testing.T) { + originalKeyRing := config.KeyRing + config.KeyRing = nil + t.Cleanup(func() { + config.KeyRing = originalKeyRing + }) + + coreCLIHelper := NewCoreCLIHelper(context.Background(), nil, afero.NewMemMapFs()) + err := coreCLIHelper.KeychainSetPassword("test.key", "sk_test_123") + require.ErrorIs(t, err, errKeychainUnavailable) +} + func TestKeychainGetPasswordPrefersRecentWriteOverStaleVisibleValue(t *testing.T) { originalKeyRing := config.KeyRing originalEnabled := keychainVisibilityRetryEnabled diff --git a/pkg/plugins/interface_grpc_3.go b/pkg/plugins/interface_grpc_3.go index 19ff98b17..9b791ef3e 100644 --- a/pkg/plugins/interface_grpc_3.go +++ b/pkg/plugins/interface_grpc_3.go @@ -2,6 +2,9 @@ package plugins import ( "context" + "errors" + "sync" + "time" hcplugin "github.com/hashicorp/go-plugin" "google.golang.org/grpc" @@ -40,33 +43,86 @@ func (p *CLIPluginV3) GRPCClient(ctx context.Context, broker *hcplugin.GRPCBroke // GRPCClientV3 is an implementation of the gRPC client that talks over gRPC. type GRPCClientV3 struct { client proto.MainClient - broker *hcplugin.GRPCBroker + broker grpcBrokerClient } -// RunCommand calls the RPC. -func (m *GRPCClientV3) RunCommand(additionalInfo *proto.AdditionalInfo, args []string, coreCLIHelper CoreCLIHelper) error { - coreCLIHelperServer := &CoreCLIHelperServer{Impl: coreCLIHelper} +type grpcBrokerServer interface { + AcceptAndServe(id uint32, newGRPCServer func([]grpc.ServerOption) *grpc.Server) +} + +type grpcBrokerClient interface { + grpcBrokerServer + NextId() uint32 +} + +var errCoreCLIHelperBrokerServerStart = errors.New("failed to start CoreCLIHelper broker server") +var coreCLIHelperBrokerPublishDelay = 25 * time.Millisecond +var coreCLIHelperBrokerServerStartTimeout = 5 * time.Second + +func startCoreCLIHelperBrokerServer(broker grpcBrokerServer, brokerID uint32, coreCLIHelper CoreCLIHelper) (func(), error) { + startedCh := make(chan struct{}) + doneCh := make(chan struct{}) + + var server *grpc.Server + go func() { + defer close(doneCh) + + broker.AcceptAndServe(brokerID, func(opts []grpc.ServerOption) *grpc.Server { + server = grpc.NewServer(opts...) + proto.RegisterCoreCLIHelperServer(server, &CoreCLIHelperServer{Impl: coreCLIHelper}) + close(startedCh) + return server + }) + }() + + select { + case <-startedCh: + case <-doneCh: + return nil, errCoreCLIHelperBrokerServerStart + case <-time.After(coreCLIHelperBrokerServerStartTimeout): + return nil, errCoreCLIHelperBrokerServerStart + } - var s *grpc.Server - serverFunc := func(opts []grpc.ServerOption) *grpc.Server { - s = grpc.NewServer(opts...) - proto.RegisterCoreCLIHelperServer(s, coreCLIHelperServer) - return s + var cleanupOnce sync.Once + cleanup := func() { + cleanupOnce.Do(func() { + if server != nil { + server.Stop() + } + }) } + return cleanup, nil +} + +// RunCommand calls the RPC. +func (m *GRPCClientV3) RunCommand(additionalInfo *proto.AdditionalInfo, args []string, coreCLIHelper CoreCLIHelper) error { brokerID := m.broker.NextId() - go m.broker.AcceptAndServe(brokerID, serverFunc) + errCh := make(chan error, 1) + go func() { + _, err := m.client.RunCommand(context.Background(), &proto.RunCommandRequest{ + AdditionalInfo: additionalInfo, + Args: args, + CoreCliHelperId: brokerID, + }) + errCh <- err + }() + + // Non-Go plugins can drop unsolicited broker connection info if it arrives + // before they register a pending dial for this service ID. + time.Sleep(coreCLIHelperBrokerPublishDelay) + + cleanup, err := startCoreCLIHelperBrokerServer(m.broker, brokerID, coreCLIHelper) + if err != nil { + return err + } + defer cleanup() - _, err := m.client.RunCommand(context.Background(), &proto.RunCommandRequest{ - AdditionalInfo: additionalInfo, - Args: args, - CoreCliHelperId: brokerID, - }) + err = <-errCh if err != nil { return err } - s.Stop() return nil } diff --git a/pkg/plugins/interface_grpc_3_test.go b/pkg/plugins/interface_grpc_3_test.go new file mode 100644 index 000000000..1c2c083b3 --- /dev/null +++ b/pkg/plugins/interface_grpc_3_test.go @@ -0,0 +1,309 @@ +package plugins + +import ( + "context" + "errors" + "net" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" + + "github.com/stripe/stripe-cli/pkg/plugins/proto" +) + +type stubGRPCBrokerAccepter struct { + listener net.Listener + err error + acceptedIDs []uint32 +} + +func (s *stubGRPCBrokerAccepter) NextId() uint32 { + return uint32(len(s.acceptedIDs) + 1) +} + +func (s *stubGRPCBrokerAccepter) AcceptAndServe(id uint32, newGRPCServer func([]grpc.ServerOption) *grpc.Server) { + s.acceptedIDs = append(s.acceptedIDs, id) + if s.err != nil { + return + } + + server := newGRPCServer(nil) + _ = server.Serve(s.listener) +} + +type stubCoreCLIHelper struct{} + +func (s *stubCoreCLIHelper) Echo(input string) (string, error) { + return "echo:" + input, nil +} + +func (s *stubCoreCLIHelper) SendAnalytics(eventName string, eventValue string) error { + return nil +} + +func (s *stubCoreCLIHelper) KeychainGetPassword(key string) (string, bool, error) { + return "", false, nil +} + +func (s *stubCoreCLIHelper) KeychainSetPassword(key string, value string) error { + return nil +} + +func (s *stubCoreCLIHelper) KeychainDeletePassword(key string) (bool, error) { + return false, nil +} + +func (s *stubCoreCLIHelper) KeychainFindCredentials() ([]string, error) { + return nil, nil +} + +func (s *stubCoreCLIHelper) RunPeerPlugin(pluginName string, args []string, cwd string) error { + return nil +} + +func TestStartCoreCLIHelperBrokerServerServesRPCs(t *testing.T) { + listener := bufconn.Listen(1024 * 1024) + broker := &stubGRPCBrokerAccepter{listener: listener} + + cleanup, err := startCoreCLIHelperBrokerServer(broker, 42, &stubCoreCLIHelper{}) + require.NoError(t, err) + t.Cleanup(cleanup) + + require.Equal(t, []uint32{42}, broker.acceptedIDs) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + conn, err := grpc.DialContext( + ctx, + "bufnet", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return listener.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + t.Cleanup(func() { + _ = conn.Close() + }) + + client := proto.NewCoreCLIHelperClient(conn) + resp, err := client.Echo(ctx, &proto.EchoRequest{Input: "hello"}) + require.NoError(t, err) + require.Equal(t, "echo:hello", resp.Output) +} + +func TestStartCoreCLIHelperBrokerServerReturnsAcceptError(t *testing.T) { + expectedErr := errors.New("boom") + broker := &stubGRPCBrokerAccepter{err: expectedErr} + + cleanup, err := startCoreCLIHelperBrokerServer(broker, 7, &stubCoreCLIHelper{}) + require.ErrorIs(t, err, errCoreCLIHelperBrokerServerStart) + require.Nil(t, cleanup) + require.Equal(t, []uint32{7}, broker.acceptedIDs) +} + +type sequencingGRPCBroker struct { + nextID uint32 + acceptedIDs []uint32 + published chan struct{} + publishHelper chan struct{} + returnServe chan struct{} +} + +func (s *sequencingGRPCBroker) NextId() uint32 { + return atomic.AddUint32(&s.nextID, 1) +} + +func (s *sequencingGRPCBroker) AcceptAndServe(id uint32, newGRPCServer func([]grpc.ServerOption) *grpc.Server) { + s.acceptedIDs = append(s.acceptedIDs, id) + <-s.publishHelper + close(s.published) + _ = newGRPCServer(nil) + <-s.returnServe +} + +type recordingMainClient struct { + called chan *proto.RunCommandRequest +} + +func (c *recordingMainClient) RunCommand(ctx context.Context, in *proto.RunCommandRequest, opts ...grpc.CallOption) (*proto.RunCommandResponse, error) { + c.called <- in + return &proto.RunCommandResponse{}, nil +} + +func TestGRPCClientV3RunCommandPublishesHelperAfterPluginRunCommandStarts(t *testing.T) { + originalDelay := coreCLIHelperBrokerPublishDelay + coreCLIHelperBrokerPublishDelay = 25 * time.Millisecond + t.Cleanup(func() { + coreCLIHelperBrokerPublishDelay = originalDelay + }) + + broker := &sequencingGRPCBroker{ + published: make(chan struct{}), + publishHelper: make(chan struct{}), + returnServe: make(chan struct{}), + } + mainClient := &recordingMainClient{ + called: make(chan *proto.RunCommandRequest, 1), + } + grpcClient := &GRPCClientV3{ + client: mainClient, + broker: broker, + } + + errCh := make(chan error, 1) + go func() { + errCh <- grpcClient.RunCommand(nil, []string{"search", "magazine"}, &stubCoreCLIHelper{}) + }() + + select { + case req := <-mainClient.called: + require.Equal(t, uint32(1), req.CoreCliHelperId) + require.Equal(t, []string{"search", "magazine"}, req.Args) + case <-time.After(time.Second): + t.Fatal("timed out waiting for plugin RunCommand before helper broker publication") + } + + select { + case <-broker.published: + t.Fatal("helper broker server published before plugin RunCommand started") + case <-time.After(10 * time.Millisecond): + } + + close(broker.publishHelper) + + select { + case <-broker.published: + case <-time.After(time.Second): + t.Fatal("timed out waiting for helper broker publication") + } + + require.Equal(t, []uint32{1}, broker.acceptedIDs) + + close(broker.returnServe) + require.NoError(t, <-errCh) +} + +type raceAwareGRPCBroker struct { + nextID uint32 + acceptedIDs []uint32 + pendingDialRegistered <-chan struct{} + publishedAfterPending chan struct{} + returnServe chan struct{} +} + +func (s *raceAwareGRPCBroker) NextId() uint32 { + return atomic.AddUint32(&s.nextID, 1) +} + +func (s *raceAwareGRPCBroker) AcceptAndServe(id uint32, newGRPCServer func([]grpc.ServerOption) *grpc.Server) { + s.acceptedIDs = append(s.acceptedIDs, id) + + select { + case <-s.pendingDialRegistered: + close(s.publishedAfterPending) + default: + } + + _ = newGRPCServer(nil) + <-s.returnServe +} + +type pendingDialMainClient struct { + pendingDialRegistered chan struct{} + publishedAfterPending <-chan struct{} +} + +func (c *pendingDialMainClient) RunCommand(ctx context.Context, in *proto.RunCommandRequest, opts ...grpc.CallOption) (*proto.RunCommandResponse, error) { + close(c.pendingDialRegistered) + + select { + case <-c.publishedAfterPending: + return &proto.RunCommandResponse{}, nil + case <-time.After(250 * time.Millisecond): + return nil, errors.New("broker publication dropped before pending dial was registered") + } +} + +func runCommandWithHelperPublishedFirst(client proto.MainClient, broker grpcBrokerClient, additionalInfo *proto.AdditionalInfo, args []string, coreCLIHelper CoreCLIHelper) error { + brokerID := broker.NextId() + + cleanup, err := startCoreCLIHelperBrokerServer(broker, brokerID, coreCLIHelper) + if err != nil { + return err + } + defer cleanup() + + _, err = client.RunCommand(context.Background(), &proto.RunCommandRequest{ + AdditionalInfo: additionalInfo, + Args: args, + CoreCliHelperId: brokerID, + }) + return err +} + +func TestLegacyHelperBrokerPublicationOrderCanDropConnectionInfo(t *testing.T) { + pendingDialRegistered := make(chan struct{}) + publishedAfterPending := make(chan struct{}) + broker := &raceAwareGRPCBroker{ + pendingDialRegistered: pendingDialRegistered, + publishedAfterPending: publishedAfterPending, + returnServe: make(chan struct{}), + } + mainClient := &pendingDialMainClient{ + pendingDialRegistered: pendingDialRegistered, + publishedAfterPending: publishedAfterPending, + } + + err := runCommandWithHelperPublishedFirst(mainClient, broker, nil, []string{"search", "myproject"}, &stubCoreCLIHelper{}) + close(broker.returnServe) + + require.EqualError(t, err, "broker publication dropped before pending dial was registered") + require.Equal(t, []uint32{1}, broker.acceptedIDs) +} + +func TestGRPCClientV3RunCommandAvoidsDroppedHelperBrokerPublicationRace(t *testing.T) { + originalDelay := coreCLIHelperBrokerPublishDelay + coreCLIHelperBrokerPublishDelay = 25 * time.Millisecond + t.Cleanup(func() { + coreCLIHelperBrokerPublishDelay = originalDelay + }) + + pendingDialRegistered := make(chan struct{}) + publishedAfterPending := make(chan struct{}) + broker := &raceAwareGRPCBroker{ + pendingDialRegistered: pendingDialRegistered, + publishedAfterPending: publishedAfterPending, + returnServe: make(chan struct{}), + } + mainClient := &pendingDialMainClient{ + pendingDialRegistered: pendingDialRegistered, + publishedAfterPending: publishedAfterPending, + } + grpcClient := &GRPCClientV3{ + client: mainClient, + broker: broker, + } + + errCh := make(chan error, 1) + go func() { + errCh <- grpcClient.RunCommand(nil, []string{"search", "myproject"}, &stubCoreCLIHelper{}) + }() + + select { + case <-publishedAfterPending: + case <-time.After(time.Second): + t.Fatal("timed out waiting for helper broker publication after pending dial registration") + } + + require.Equal(t, []uint32{1}, broker.acceptedIDs) + + close(broker.returnServe) + require.NoError(t, <-errCh) +} From d481ade8d9f2abc3eec56b2231d02c2673cba40c Mon Sep 17 00:00:00 2001 From: Vicky Zhang Date: Thu, 21 May 2026 19:03:26 -0700 Subject: [PATCH 2/2] lint --- pkg/plugins/interface_grpc_3.go | 18 +++++++++++++++--- pkg/plugins/interface_grpc_3_test.go | 21 ++++++++++----------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pkg/plugins/interface_grpc_3.go b/pkg/plugins/interface_grpc_3.go index 9b791ef3e..cefd69cf5 100644 --- a/pkg/plugins/interface_grpc_3.go +++ b/pkg/plugins/interface_grpc_3.go @@ -36,7 +36,7 @@ func (p *CLIPluginV3) GRPCServer(broker *hcplugin.GRPCBroker, s *grpc.Server) er func (p *CLIPluginV3) GRPCClient(ctx context.Context, broker *hcplugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { return &GRPCClientV3{ client: proto.NewMainClient(c), - broker: broker, + broker: grpcBrokerAdapter{broker: broker}, }, nil } @@ -52,7 +52,19 @@ type grpcBrokerServer interface { type grpcBrokerClient interface { grpcBrokerServer - NextId() uint32 + nextID() uint32 +} + +type grpcBrokerAdapter struct { + broker *hcplugin.GRPCBroker +} + +func (b grpcBrokerAdapter) nextID() uint32 { + return b.broker.NextId() +} + +func (b grpcBrokerAdapter) AcceptAndServe(id uint32, newGRPCServer func([]grpc.ServerOption) *grpc.Server) { + b.broker.AcceptAndServe(id, newGRPCServer) } var errCoreCLIHelperBrokerServerStart = errors.New("failed to start CoreCLIHelper broker server") @@ -97,7 +109,7 @@ func startCoreCLIHelperBrokerServer(broker grpcBrokerServer, brokerID uint32, co // RunCommand calls the RPC. func (m *GRPCClientV3) RunCommand(additionalInfo *proto.AdditionalInfo, args []string, coreCLIHelper CoreCLIHelper) error { - brokerID := m.broker.NextId() + brokerID := m.broker.nextID() errCh := make(chan error, 1) go func() { _, err := m.client.RunCommand(context.Background(), &proto.RunCommandRequest{ diff --git a/pkg/plugins/interface_grpc_3_test.go b/pkg/plugins/interface_grpc_3_test.go index 1c2c083b3..5f907e1c2 100644 --- a/pkg/plugins/interface_grpc_3_test.go +++ b/pkg/plugins/interface_grpc_3_test.go @@ -22,7 +22,7 @@ type stubGRPCBrokerAccepter struct { acceptedIDs []uint32 } -func (s *stubGRPCBrokerAccepter) NextId() uint32 { +func (s *stubGRPCBrokerAccepter) nextID() uint32 { return uint32(len(s.acceptedIDs) + 1) } @@ -79,9 +79,8 @@ func TestStartCoreCLIHelperBrokerServerServesRPCs(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - conn, err := grpc.DialContext( - ctx, - "bufnet", + conn, err := grpc.NewClient( + "passthrough:///bufnet", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return listener.Dial() }), @@ -109,15 +108,15 @@ func TestStartCoreCLIHelperBrokerServerReturnsAcceptError(t *testing.T) { } type sequencingGRPCBroker struct { - nextID uint32 + nextIDValue uint32 acceptedIDs []uint32 published chan struct{} publishHelper chan struct{} returnServe chan struct{} } -func (s *sequencingGRPCBroker) NextId() uint32 { - return atomic.AddUint32(&s.nextID, 1) +func (s *sequencingGRPCBroker) nextID() uint32 { + return atomic.AddUint32(&s.nextIDValue, 1) } func (s *sequencingGRPCBroker) AcceptAndServe(id uint32, newGRPCServer func([]grpc.ServerOption) *grpc.Server) { @@ -191,15 +190,15 @@ func TestGRPCClientV3RunCommandPublishesHelperAfterPluginRunCommandStarts(t *tes } type raceAwareGRPCBroker struct { - nextID uint32 + nextIDValue uint32 acceptedIDs []uint32 pendingDialRegistered <-chan struct{} publishedAfterPending chan struct{} returnServe chan struct{} } -func (s *raceAwareGRPCBroker) NextId() uint32 { - return atomic.AddUint32(&s.nextID, 1) +func (s *raceAwareGRPCBroker) nextID() uint32 { + return atomic.AddUint32(&s.nextIDValue, 1) } func (s *raceAwareGRPCBroker) AcceptAndServe(id uint32, newGRPCServer func([]grpc.ServerOption) *grpc.Server) { @@ -232,7 +231,7 @@ func (c *pendingDialMainClient) RunCommand(ctx context.Context, in *proto.RunCom } func runCommandWithHelperPublishedFirst(client proto.MainClient, broker grpcBrokerClient, additionalInfo *proto.AdditionalInfo, args []string, coreCLIHelper CoreCLIHelper) error { - brokerID := broker.NextId() + brokerID := broker.nextID() cleanup, err := startCoreCLIHelperBrokerServer(broker, brokerID, coreCLIHelper) if err != nil {