Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add implementation of new AvailableComponents message #340

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,14 @@ type OpAMPClient interface {
// If no error is returned, the channel returned will be closed after the specified
// message is sent.
SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)

// SetAvailableComponents modifies the set of components that are available for configuration
// on the agent.
// May be called any time after Start(), including from the OnMessage handler.
// The new components will be sent with the next message to the server.
// If components is nil, errReportsAvailableComponentsNotSet will be returned.
// If components.Hash is nil or an empty []byte, errNoAvailableComponentHash will be returned.
// This method is subject to agent status compression - if components is not
// different from the cached agent state, this method is a no-op.
SetAvailableComponents(components *protobufs.AvailableComponents) error
}
20 changes: 20 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2305,3 +2305,23 @@ func TestSetFlagsBeforeStart(t *testing.T) {
assert.NoError(t, err)
})
}

func generateTestAvailableComponents() *protobufs.AvailableComponents {
return &protobufs.AvailableComponents{
Hash: []byte("fake-hash"),
Components: map[string]*protobufs.ComponentDetails{
"receivers": {
Metadata: []*protobufs.KeyValue{
{
Key: "component",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: "filereceiver",
},
},
},
},
},
},
}
}
5 changes: 5 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@
return c.common.SendCustomMessage(message)
}

// SetAvailableComponents implements OpAMPClient.SetAvailableComponents
func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error {
return c.common.SetAvailableComponents(components)

Check warning on line 125 in client/httpclient.go

View check run for this annotation

Codecov / codecov/patch

client/httpclient.go#L124-L125

Added lines #L124 - L125 were not covered by tests
}

func (c *httpClient) runUntilStopped(ctx context.Context) {
// Start the HTTP sender. This will make request/responses with retries for
// failures and will wait with configured polling interval if there is nothing
Expand Down
110 changes: 110 additions & 0 deletions client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opamp-go/client/internal"
Expand Down Expand Up @@ -311,3 +312,112 @@ func TestRedirectHTTP(t *testing.T) {
})
}
}

func TestHTTPReportsAvailableComponents(t *testing.T) {
testCases := []struct {
desc string
availableComponents *protobufs.AvailableComponents
}{
{
desc: "Does not report AvailableComponents",
availableComponents: nil,
},
{
desc: "Reports AvailableComponents",
availableComponents: generateTestAvailableComponents(),
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
var rcvCounter atomic.Uint64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, rcvCounter.Load(), msg.SequenceNum)
rcvCounter.Add(1)
time.Sleep(50 * time.Millisecond)
if rcvCounter.Load() == 1 {
resp := &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
}

if tc.availableComponents != nil {
// the first message received should contain just the available component hash
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Nil(t, availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())

// add the flag asking for the full available component state to the response
resp.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return resp
}

if rcvCounter.Load() == 2 {
if tc.availableComponents != nil {
// the second message received should contain the full component state
availableComponents := msg.GetAvailableComponents()
require.NotNil(t, availableComponents)
require.Equal(t, tc.availableComponents.GetComponents(), availableComponents.GetComponents())
require.Equal(t, tc.availableComponents.GetHash(), availableComponents.GetHash())
} else {
require.Nil(t, msg.GetAvailableComponents())
}

return nil
}

// all subsequent messages should not have any available components
require.Nil(t, msg.GetAvailableComponents())
return nil
}

// Start a client.
settings := types.StartSettings{}
settings.OpAMPServerURL = "http://" + srv.Endpoint
if tc.availableComponents != nil {
settings.Capabilities = protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents
settings.AvailableComponents = tc.availableComponents
}

client := NewHTTP(nil)
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))

// Verify that status report is delivered.
eventually(t, func() bool {
return rcvCounter.Load() == 1
})

if tc.availableComponents != nil {
// Verify that status report is delivered again. Polling should ensure this.
eventually(t, func() bool {
return rcvCounter.Load() == 2
})
} else {
// Verify that no second status report is delivered (polling is too infrequent for this to happen in 3 seconds)
require.Never(t, func() bool {
return rcvCounter.Load() == 2
}, 3*time.Second, 10*time.Millisecond)
}

// Verify that no third status report is delivered (polling is too infrequent for this to happen in 3 seconds)
require.Never(t, func() bool {
return rcvCounter.Load() == 3
}, 3*time.Second, 10*time.Millisecond)

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}
}
58 changes: 55 additions & 3 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set")
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set")
errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set")
errReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set")
)

// ClientCommon contains the OpAMP logic that is common between WebSocket and
Expand Down Expand Up @@ -88,6 +90,16 @@
return ErrHealthMissing
}

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
if settings.AvailableComponents == nil {
return ErrAvailableComponentsMissing
}

Check warning on line 96 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L95-L96

Added lines #L95 - L96 were not covered by tests

if err := c.ClientSyncedState.SetAvailableComponents(settings.AvailableComponents); err != nil {
return err
}

Check warning on line 100 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}

// Prepare remote config status.
if settings.RemoteConfigStatus == nil {
// RemoteConfigStatus is not provided. Start with empty.
Expand Down Expand Up @@ -212,6 +224,15 @@
return err
}

// initially, do not send the full component state - just send the hash.
// full state is available on request from the server using the corresponding ServerToAgent flag
var availableComponents *protobufs.AvailableComponents
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 {
availableComponents = &protobufs.AvailableComponents{
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
}
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AgentDescription = c.ClientSyncedState.AgentDescription()
Expand All @@ -221,6 +242,7 @@
msg.Capabilities = uint64(c.Capabilities)
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
msg.Flags = c.ClientSyncedState.Flags()
msg.AvailableComponents = availableComponents
},
)
return nil
Expand Down Expand Up @@ -433,3 +455,33 @@

return sendingChan, nil
}

// SetAvailableComponents sends a message to the server with the available components for the agent
func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error {
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 {
return errReportsAvailableComponentsNotSet
}

Check warning on line 463 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L460-L463

Added lines #L460 - L463 were not covered by tests

if len(components.Hash) == 0 {
return errNoAvailableComponentHash
}

Check warning on line 467 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L465-L467

Added lines #L465 - L467 were not covered by tests

// implement agent status compression, don't send the message if it hasn't changed from the previous message
availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components)

if availableComponentsChanged {
if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil {
return err
}

Check warning on line 475 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L470-L475

Added lines #L470 - L475 were not covered by tests

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AvailableComponents = c.ClientSyncedState.AvailableComponents()
},

Check warning on line 480 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L477-L480

Added lines #L477 - L480 were not covered by tests
)

c.sender.ScheduleSend()

Check warning on line 483 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L483

Added line #L483 was not covered by tests
}

return nil

Check warning on line 486 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L486

Added line #L486 was not covered by tests
}
39 changes: 31 additions & 8 deletions client/internal/clientstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
errPackageStatusesMissing = errors.New("PackageStatuses is not set")
errServerProvidedAllPackagesHashNil = errors.New("ServerProvidedAllPackagesHash is nil")
errCustomCapabilitiesMissing = errors.New("CustomCapabilities is not set")
errAvailableComponentsMissing = errors.New("AvailableComponents is not set")
errNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty")
)

// ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to
// have access to synchronize to the Server. Six messages can be stored in this store:
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags.
// have access to synchronize to the Server. Seven messages can be stored in this store:
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities, AvailableComponents and Flags.
//
// See OpAMP spec for more details on how status reporting works:
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting
Expand All @@ -34,12 +36,13 @@
type ClientSyncedState struct {
mutex sync.Mutex

agentDescription *protobufs.AgentDescription
health *protobufs.ComponentHealth
remoteConfigStatus *protobufs.RemoteConfigStatus
packageStatuses *protobufs.PackageStatuses
customCapabilities *protobufs.CustomCapabilities
flags protobufs.AgentToServerFlags
agentDescription *protobufs.AgentDescription
health *protobufs.ComponentHealth
remoteConfigStatus *protobufs.RemoteConfigStatus
packageStatuses *protobufs.PackageStatuses
customCapabilities *protobufs.CustomCapabilities
availableComponents *protobufs.AvailableComponents
flags protobufs.AgentToServerFlags
}

func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription {
Expand Down Expand Up @@ -72,6 +75,12 @@
return s.customCapabilities
}

func (s *ClientSyncedState) AvailableComponents() *protobufs.AvailableComponents {
defer s.mutex.Unlock()
s.mutex.Lock()
return s.availableComponents
}

func (s *ClientSyncedState) Flags() uint64 {
defer s.mutex.Unlock()
s.mutex.Lock()
Expand Down Expand Up @@ -176,6 +185,20 @@
return false
}

func (s *ClientSyncedState) SetAvailableComponents(components *protobufs.AvailableComponents) error {
if components == nil {
return errAvailableComponentsMissing
}

Check warning on line 191 in client/internal/clientstate.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientstate.go#L190-L191

Added lines #L190 - L191 were not covered by tests

clone := proto.Clone(components).(*protobufs.AvailableComponents)

defer s.mutex.Unlock()
s.mutex.Lock()
s.availableComponents = clone

return nil
}

// SetFlags sets the flags in the state.
func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) {
defer s.mutex.Unlock()
Expand Down
10 changes: 10 additions & 0 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (r *receivedProcessor) rcvFlags(
msg.PackageStatuses = r.clientSyncedState.PackageStatuses()
msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities()
msg.Flags = r.clientSyncedState.Flags()
msg.AvailableComponents = r.clientSyncedState.AvailableComponents()

// The logic for EffectiveConfig is similar to the previous 6 sub-messages however
// the EffectiveConfig is fetched using GetEffectiveConfig instead of
Expand All @@ -207,6 +208,15 @@ func (r *receivedProcessor) rcvFlags(
scheduleSend = true
}

if flags&protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents != 0 {
r.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AvailableComponents = r.clientSyncedState.AvailableComponents()
},
)
scheduleSend = true
}

return scheduleSend, nil
}

Expand Down
3 changes: 3 additions & 0 deletions client/types/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type MessageData struct {

// CustomMessage contains a custom message sent by the server.
CustomMessage *protobufs.CustomMessage

// Flags contains any flags sent by the server.
Flags protobufs.AgentToServerFlags
}

// Callbacks contains functions that are executed when the client encounters
Expand Down
5 changes: 5 additions & 0 deletions client/types/startsettings.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ type StartSettings struct {
//
// If the ReportsHeartbeat capability is disabled, this option has no effect.
HeartbeatInterval *time.Duration

// Defines the available components of the Agent.
// Required if the ReportsAvailableComponents capability is set.
// If the ReportsAvailableComponents capability is not set, this option has no effect.
AvailableComponents *protobufs.AvailableComponents
}
5 changes: 5 additions & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@
return c.common.SendCustomMessage(message)
}

// SetAvailableComponents implements OpAMPClient.SetAvailableComponents
func (c *wsClient) SetAvailableComponents(components *protobufs.AvailableComponents) error {
return c.common.SetAvailableComponents(components)

Check warning on line 162 in client/wsclient.go

View check run for this annotation

Codecov / codecov/patch

client/wsclient.go#L161-L162

Added lines #L161 - L162 were not covered by tests
}

func viaReq(resps []*http.Response) []*http.Request {
reqs := make([]*http.Request, 0, len(resps))
for _, resp := range resps {
Expand Down
Loading
Loading