diff --git a/client/client.go b/client/client.go index afc08315..78f4189b 100644 --- a/client/client.go +++ b/client/client.go @@ -134,4 +134,14 @@ type OpAMPClient interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) + + // SetAvailableComponents modifies the set of components that are available for configuration + // on the agent. + // May be called any time after Start(), including from the OnMessage handler. + // The new components will be sent with the next message to the server. + // If components is nil, errReportsAvailableComponentsNotSet will be returned. + // If components.Hash is nil or an empty []byte, errNoAvailableComponentHash will be returned. + // This method is subject to agent status compression - if components is not + // different from the cached agent state, this method is a no-op. + SetAvailableComponents(components *protobufs.AvailableComponents) error } diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index dea6bd6c..2bab9b5b 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -2305,3 +2305,23 @@ func TestSetFlagsBeforeStart(t *testing.T) { assert.NoError(t, err) }) } + +func generateTestAvailableComponents() *protobufs.AvailableComponents { + return &protobufs.AvailableComponents{ + Hash: []byte("fake-hash"), + Components: map[string]*protobufs.ComponentDetails{ + "receivers": { + Metadata: []*protobufs.KeyValue{ + { + Key: "component", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{ + StringValue: "filereceiver", + }, + }, + }, + }, + }, + }, + } +} diff --git a/client/httpclient.go b/client/httpclient.go index 92259d59..8d32797d 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func (c *httpClient) runUntilStopped(ctx context.Context) { // Start the HTTP sender. This will make request/responses with retries for // failures and will wait with configured polling interval if there is nothing diff --git a/client/httpclient_test.go b/client/httpclient_test.go index fc670411..9c16ac30 100644 --- a/client/httpclient_test.go +++ b/client/httpclient_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "github.com/open-telemetry/opamp-go/client/internal" @@ -311,3 +312,112 @@ func TestRedirectHTTP(t *testing.T) { }) } } + +func TestHTTPReportsAvailableComponents(t *testing.T) { + testCases := []struct { + desc string + availableComponents *protobufs.AvailableComponents + }{ + { + desc: "Does not report AvailableComponents", + availableComponents: nil, + }, + { + desc: "Reports AvailableComponents", + availableComponents: generateTestAvailableComponents(), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + // Start a Server. + srv := internal.StartMockServer(t) + var rcvCounter atomic.Uint64 + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + assert.EqualValues(t, rcvCounter.Load(), msg.SequenceNum) + rcvCounter.Add(1) + time.Sleep(50 * time.Millisecond) + if rcvCounter.Load() == 1 { + resp := &protobufs.ServerToAgent{ + InstanceUid: msg.InstanceUid, + } + + if tc.availableComponents != nil { + // the first message received should contain just the available component hash + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Nil(t, availableComponents.GetComponents()) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + + // add the flag asking for the full available component state to the response + resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents) + } else { + require.Nil(t, msg.GetAvailableComponents()) + } + + return resp + } + + if rcvCounter.Load() == 2 { + if tc.availableComponents != nil { + // the second message received should contain the full component state + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents()) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + } else { + require.Nil(t, msg.GetAvailableComponents()) + } + + return nil + } + + // all subsequent messages should not have any available components + require.Nil(t, msg.GetAvailableComponents()) + return nil + } + + // Start a client. + settings := types.StartSettings{} + settings.OpAMPServerURL = "http://" + srv.Endpoint + if tc.availableComponents != nil { + settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents + settings.AvailableComponents = tc.availableComponents + } + + client := NewHTTP(nil) + prepareClient(t, &settings, client) + + assert.NoError(t, client.Start(context.Background(), settings)) + + // Verify that status report is delivered. + eventually(t, func() bool { + return rcvCounter.Load() == 1 + }) + + if tc.availableComponents != nil { + // Verify that status report is delivered again. Polling should ensure this. + eventually(t, func() bool { + return rcvCounter.Load() == 2 + }) + } else { + // Verify that no second status report is delivered (polling is too infrequent for this to happen in 3 seconds) + require.Never(t, func() bool { + return rcvCounter.Load() == 2 + }, 3*time.Second, 10*time.Millisecond) + } + + // Verify that no third status report is delivered (polling is too infrequent for this to happen in 3 seconds) + require.Never(t, func() bool { + return rcvCounter.Load() == 3 + }, 3*time.Second, 10*time.Millisecond) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + } +} diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 283e3b01..5a7ea4cd 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -20,10 +20,12 @@ var ( ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set") ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set") ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set") + ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil") - errAlreadyStarted = errors.New("already started") - errCannotStopNotStarted = errors.New("cannot stop because not started") - errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set") + errAlreadyStarted = errors.New("already started") + errCannotStopNotStarted = errors.New("cannot stop because not started") + errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set") + errReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set") ) // ClientCommon contains the OpAMP logic that is common between WebSocket and @@ -88,6 +90,16 @@ func (c *ClientCommon) PrepareStart( return ErrHealthMissing } + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { + if settings.AvailableComponents == nil { + return ErrAvailableComponentsMissing + } + + if err := c.ClientSyncedState.SetAvailableComponents(settings.AvailableComponents); err != nil { + return err + } + } + // Prepare remote config status. if settings.RemoteConfigStatus == nil { // RemoteConfigStatus is not provided. Start with empty. @@ -212,6 +224,15 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { return err } + // initially, do not send the full component state - just send the hash. + // full state is available on request from the server using the corresponding ServerToAgent flag + var availableComponents *protobufs.AvailableComponents + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 { + availableComponents = &protobufs.AvailableComponents{ + Hash: c.ClientSyncedState.AvailableComponents().GetHash(), + } + } + c.sender.NextMessage().Update( func(msg *protobufs.AgentToServer) { msg.AgentDescription = c.ClientSyncedState.AgentDescription() @@ -221,6 +242,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { msg.Capabilities = uint64(c.Capabilities) msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities() msg.Flags = c.ClientSyncedState.Flags() + msg.AvailableComponents = availableComponents }, ) return nil @@ -433,3 +455,33 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess return sendingChan, nil } + +// SetAvailableComponents sends a message to the server with the available components for the agent +func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 { + return errReportsAvailableComponentsNotSet + } + + if len(components.Hash) == 0 { + return errNoAvailableComponentHash + } + + // implement agent status compression, don't send the message if it hasn't changed from the previous message + availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components) + + if availableComponentsChanged { + if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil { + return err + } + + c.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.AvailableComponents = c.ClientSyncedState.AvailableComponents() + }, + ) + + c.sender.ScheduleSend() + } + + return nil +} diff --git a/client/internal/clientstate.go b/client/internal/clientstate.go index 93250c9f..fc25866b 100644 --- a/client/internal/clientstate.go +++ b/client/internal/clientstate.go @@ -14,11 +14,13 @@ var ( errPackageStatusesMissing = errors.New("PackageStatuses is not set") errServerProvidedAllPackagesHashNil = errors.New("ServerProvidedAllPackagesHash is nil") errCustomCapabilitiesMissing = errors.New("CustomCapabilities is not set") + errAvailableComponentsMissing = errors.New("AvailableComponents is not set") + errNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty") ) // ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to -// have access to synchronize to the Server. Six messages can be stored in this store: -// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags. +// have access to synchronize to the Server. Seven messages can be stored in this store: +// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities, AvailableComponents and Flags. // // See OpAMP spec for more details on how status reporting works: // https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting @@ -34,12 +36,13 @@ var ( type ClientSyncedState struct { mutex sync.Mutex - agentDescription *protobufs.AgentDescription - health *protobufs.ComponentHealth - remoteConfigStatus *protobufs.RemoteConfigStatus - packageStatuses *protobufs.PackageStatuses - customCapabilities *protobufs.CustomCapabilities - flags protobufs.AgentToServerFlags + agentDescription *protobufs.AgentDescription + health *protobufs.ComponentHealth + remoteConfigStatus *protobufs.RemoteConfigStatus + packageStatuses *protobufs.PackageStatuses + customCapabilities *protobufs.CustomCapabilities + availableComponents *protobufs.AvailableComponents + flags protobufs.AgentToServerFlags } func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription { @@ -72,6 +75,12 @@ func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities { return s.customCapabilities } +func (s *ClientSyncedState) AvailableComponents() *protobufs.AvailableComponents { + defer s.mutex.Unlock() + s.mutex.Lock() + return s.availableComponents +} + func (s *ClientSyncedState) Flags() uint64 { defer s.mutex.Unlock() s.mutex.Lock() @@ -176,6 +185,20 @@ func (s *ClientSyncedState) HasCustomCapability(capability string) bool { return false } +func (s *ClientSyncedState) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if components == nil { + return errAvailableComponentsMissing + } + + clone := proto.Clone(components).(*protobufs.AvailableComponents) + + defer s.mutex.Unlock() + s.mutex.Lock() + s.availableComponents = clone + + return nil +} + // SetFlags sets the flags in the state. func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) { defer s.mutex.Unlock() diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index aee05a81..55f2ee55 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -197,6 +197,7 @@ func (r *receivedProcessor) rcvFlags( msg.PackageStatuses = r.clientSyncedState.PackageStatuses() msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities() msg.Flags = r.clientSyncedState.Flags() + msg.AvailableComponents = r.clientSyncedState.AvailableComponents() // The logic for EffectiveConfig is similar to the previous 6 sub-messages however // the EffectiveConfig is fetched using GetEffectiveConfig instead of @@ -207,6 +208,15 @@ func (r *receivedProcessor) rcvFlags( scheduleSend = true } + if flags&protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents != 0 { + r.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.AvailableComponents = r.clientSyncedState.AvailableComponents() + }, + ) + scheduleSend = true + } + return scheduleSend, nil } diff --git a/client/types/callbacks.go b/client/types/callbacks.go index 48d5f832..99f81ee4 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -45,6 +45,9 @@ type MessageData struct { // CustomMessage contains a custom message sent by the server. CustomMessage *protobufs.CustomMessage + + // Flags contains any flags sent by the server. + Flags protobufs.AgentToServerFlags } // Callbacks contains functions that are executed when the client encounters diff --git a/client/types/startsettings.go b/client/types/startsettings.go index 6184d575..4463e5bb 100644 --- a/client/types/startsettings.go +++ b/client/types/startsettings.go @@ -64,4 +64,9 @@ type StartSettings struct { // // If the ReportsHeartbeat capability is disabled, this option has no effect. HeartbeatInterval *time.Duration + + // Defines the available components of the Agent. + // Required if the ReportsAvailableComponents capability is set. + // If the ReportsAvailableComponents capability is not set, this option has no effect. + AvailableComponents *protobufs.AvailableComponents } diff --git a/client/wsclient.go b/client/wsclient.go index f19d8ab4..fdacdd4b 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -157,6 +157,11 @@ func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageS return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *wsClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func viaReq(resps []*http.Response) []*http.Request { reqs := make([]*http.Request, 0, len(resps)) for _, resp := range resps { diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 436ceb55..8abf7d04 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -257,7 +257,7 @@ func TestVerifyWSCompress(t *testing.T) { remoteCfg := &protobufs.AgentRemoteConfig{ Config: &protobufs.AgentConfigMap{ ConfigMap: map[string]*protobufs.AgentConfigFile{ - "": &protobufs.AgentConfigFile{ + "": { Body: uncompressedCfg, }, }, @@ -727,3 +727,96 @@ func TestHandlesConnectionError(t *testing.T) { err = client.Stop(context.Background()) require.NoError(t, err) } + +func TestWSSenderReportsAvailableComponents(t *testing.T) { + testCases := []struct { + desc string + availableComponents *protobufs.AvailableComponents + }{ + { + desc: "Does not report AvailableComponents", + availableComponents: nil, + }, + { + desc: "Reports AvailableComponents", + availableComponents: generateTestAvailableComponents(), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + srv := internal.StartMockServer(t) + + var firstMsg atomic.Bool + var conn atomic.Value + srv.OnWSConnect = func(c *websocket.Conn) { + conn.Store(c) + firstMsg.Store(true) + } + var msgCount atomic.Int64 + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + if firstMsg.Load() { + msgCount.Add(1) + firstMsg.Store(false) + resp := &protobufs.ServerToAgent{ + InstanceUid: msg.InstanceUid, + } + + if tc.availableComponents != nil { + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Nil(t, availableComponents.GetComponents()) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + + resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents) + } else { + require.Nil(t, msg.GetAvailableComponents()) + } + + return resp + } + msgCount.Add(1) + if tc.availableComponents != nil { + availableComponents := msg.GetAvailableComponents() + require.NotNil(t, availableComponents) + require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash()) + require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents()) + } else { + require.Error(t, errors.New("should not receive a second message when ReportsAvailableComponents is disabled")) + } + + return nil + } + + // Start an OpAMP/WebSocket client. + settings := types.StartSettings{ + OpAMPServerURL: "ws://" + srv.Endpoint, + } + client := NewWebSocket(nil) + + if tc.availableComponents != nil { + settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents + settings.AvailableComponents = tc.availableComponents + } + + startClient(t, settings, client) + + // Wait for connection to be established. + eventually(t, func() bool { return conn.Load() != nil }) + + if tc.availableComponents != nil { + assert.Eventually(t, func() bool { + return msgCount.Load() >= 2 + }, 5*time.Second, 10*time.Millisecond) + } else { + assert.Never(t, func() bool { + return msgCount.Load() >= 2 + }, 3*time.Second, 10*time.Millisecond) + } + + // Stop the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }) + } +}