Skip to content

Commit

Permalink
add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Eliott Bouhana <[email protected]>
  • Loading branch information
eliottness committed Jan 22, 2025
1 parent 8f26647 commit 6dad2a3
Show file tree
Hide file tree
Showing 10 changed files with 310 additions and 43 deletions.
5 changes: 3 additions & 2 deletions internal/newtelemetry/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type Integration struct {
Name string
// Version is the version of the integration/dependency that is being loaded.
Version string
// Error is the error that occurred while loading the integration.
// Error is the error that occurred while loading the integration. If this field is specified, the integration is
// considered to be having been forcefully disabled because of the error.
Error string
}

Expand Down Expand Up @@ -101,6 +102,6 @@ type Client interface {
// appStart sends the telemetry necessary to signal that the app is starting.
appStart()

// appStop sends the telemetry necessary to signal that the app is stopping and calls Close()
// appStop sends the telemetry necessary to signal that the app is stopping.
appStop()
}
16 changes: 7 additions & 9 deletions internal/newtelemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (c *client) flush(payloads []transport.Payload) (int, error) {
}

nbBytesOfPayload, err := c.writer.Flush(payload)
if nbBytes > 0 {
if nbBytes > 0 && err != nil {
nonFatalErros = append(nonFatalErros, err)
err = nil
}
Expand All @@ -208,7 +208,11 @@ func (c *client) flush(payloads []transport.Payload) (int, error) {
}

if len(nonFatalErros) > 0 {
log.Debug("non-fatal error while flushing telemetry data: %v", errors.Join(nonFatalErros...))
err := "error"
if len(nonFatalErros) > 1 {
err = "errors"
}
log.Debug("non-fatal %s while flushing telemetry data: %v", err, errors.Join(nonFatalErros...))
}

return nbBytes, nil
Expand All @@ -217,19 +221,13 @@ func (c *client) flush(payloads []transport.Payload) (int, error) {
func (c *client) appStart() {
c.flushMapperMu.Lock()
defer c.flushMapperMu.Unlock()

// Wrap the current flushMapper with the AppStartedMapper so we can add the app-started event to the payloads using available payloads at the time of the call one minute later
c.flushMapper = mapper.NewAppStartedMapper(c.flushMapper)
}

func (c *client) appStop() {
c.flushMapperMu.Lock()
defer c.flushMapperMu.Unlock()
c.flushMapper = mapper.NewAppClosingMapper(c.flushMapper)
c.flushMapperMu.Unlock()

// Flush locks the flushMapperMu mutex, so we need to call it outside the lock
c.Flush()
c.Close()
}

func (c *client) Close() error {
Expand Down
File renamed without changes.
263 changes: 262 additions & 1 deletion internal/newtelemetry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
package newtelemetry

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal/transport"
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/types"
)

func TestNewClient(t *testing.T) {
Expand Down Expand Up @@ -75,7 +78,7 @@ func (w *testWriter) Flush(payloads transport.Payload) (int, error) {
return 1, nil
}

func TestClient(t *testing.T) {
func TestClientFlush(t *testing.T) {
tracerConfig := internal.TracerConfig{
Service: "test-service",
Env: "test-env",
Expand All @@ -93,9 +96,267 @@ func TestClient(t *testing.T) {
HeartbeatInterval: time.Nanosecond,
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppHeartbeat{}, payload)
assert.Equal(t, payload.RequestType(), transport.RequestTypeAppHeartbeat)
},
},
{
name: "extended-heartbeat-config",
clientConfig: ClientConfig{
HeartbeatInterval: time.Nanosecond,
},
when: func(c *client) {
c.AddAppConfig("key", "value", types.OriginDefault)
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppExtendedHeartbeat{}, payload)
heartbeat := payload.(transport.AppExtendedHeartbeat)
assert.Len(t, heartbeat.Configuration, 1)
assert.Equal(t, heartbeat.Configuration[0].Name, "key")
assert.Equal(t, heartbeat.Configuration[0].Value, "value")
assert.Equal(t, heartbeat.Configuration[0].Origin, types.OriginDefault)
},
},
{
name: "extended-heartbeat-integrations",
clientConfig: ClientConfig{
HeartbeatInterval: time.Nanosecond,
},
when: func(c *client) {
c.MarkIntegrationAsLoaded(Integration{Name: "test-integration", Version: "1.0.0"})
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppExtendedHeartbeat{}, payload)
heartbeat := payload.(transport.AppExtendedHeartbeat)
assert.Len(t, heartbeat.Integrations, 1)
assert.Equal(t, heartbeat.Integrations[0].Name, "test-integration")
assert.Equal(t, heartbeat.Integrations[0].Version, "1.0.0")
},
},
{
name: "configuration-default",
when: func(c *client) {
c.AddAppConfig("key", "value", types.OriginDefault)
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppClientConfigurationChange{}, payload)
config := payload.(transport.AppClientConfigurationChange)
assert.Len(t, config.Configuration, 1)
assert.Equal(t, config.Configuration[0].Name, "key")
assert.Equal(t, config.Configuration[0].Value, "value")
assert.Equal(t, config.Configuration[0].Origin, types.OriginDefault)
},
},
{
name: "configuration-default",
when: func(c *client) {
c.AddAppConfig("key", "value", types.OriginDefault)
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppClientConfigurationChange{}, payload)
config := payload.(transport.AppClientConfigurationChange)
assert.Len(t, config.Configuration, 1)
assert.Equal(t, config.Configuration[0].Name, "key")
assert.Equal(t, config.Configuration[0].Value, "value")
assert.Equal(t, config.Configuration[0].Origin, types.OriginDefault)
},
},
{
name: "product-start",
when: func(c *client) {
c.ProductStarted("test-product")
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppProductChange{}, payload)
productChange := payload.(transport.AppProductChange)
assert.Len(t, productChange.Products, 1)
assert.True(t, productChange.Products[types.Namespace("test-product")].Enabled)
},
},
{
name: "product-start-error",
when: func(c *client) {
c.ProductStartError("test-product", errors.New("test-error"))
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppProductChange{}, payload)
productChange := payload.(transport.AppProductChange)
assert.Len(t, productChange.Products, 1)
assert.False(t, productChange.Products[types.Namespace("test-product")].Enabled)
assert.Equal(t, "test-error", productChange.Products[types.Namespace("test-product")].Error.Message)
},
},
{
name: "product-stop",
when: func(c *client) {
c.ProductStopped("test-product")
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppProductChange{}, payload)
productChange := payload.(transport.AppProductChange)
assert.Len(t, productChange.Products, 1)
assert.False(t, productChange.Products[types.Namespace("test-product")].Enabled)
},
},
{
name: "integration",
when: func(c *client) {
c.MarkIntegrationAsLoaded(Integration{Name: "test-integration", Version: "1.0.0"})
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppIntegrationChange{}, payload)
integrationChange := payload.(transport.AppIntegrationChange)
assert.Len(t, integrationChange.Integrations, 1)
assert.Equal(t, integrationChange.Integrations[0].Name, "test-integration")
assert.Equal(t, integrationChange.Integrations[0].Version, "1.0.0")
assert.True(t, integrationChange.Integrations[0].Enabled)
},
},
{
name: "integration-error",
when: func(c *client) {
c.MarkIntegrationAsLoaded(Integration{Name: "test-integration", Version: "1.0.0", Error: "test-error"})
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppIntegrationChange{}, payload)
integrationChange := payload.(transport.AppIntegrationChange)
assert.Len(t, integrationChange.Integrations, 1)
assert.Equal(t, integrationChange.Integrations[0].Name, "test-integration")
assert.Equal(t, integrationChange.Integrations[0].Version, "1.0.0")
assert.False(t, integrationChange.Integrations[0].Enabled)
assert.Equal(t, integrationChange.Integrations[0].Error, "test-error")
},
},
{
name: "product+integration",
when: func(c *client) {
c.ProductStarted("test-product")
c.MarkIntegrationAsLoaded(Integration{Name: "test-integration", Version: "1.0.0"})
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.MessageBatch{}, payload)
batch := payload.(transport.MessageBatch)
for _, payload := range batch.Payload {
switch p := payload.Payload.(type) {
case transport.AppProductChange:
assert.Equal(t, transport.RequestTypeAppProductChange, payload.RequestType)
assert.Len(t, p.Products, 1)
assert.True(t, p.Products[types.Namespace("test-product")].Enabled)
case transport.AppIntegrationChange:
assert.Equal(t, transport.RequestTypeAppIntegrationsChange, payload.RequestType)
assert.Len(t, p.Integrations, 1)
assert.Equal(t, p.Integrations[0].Name, "test-integration")
assert.Equal(t, p.Integrations[0].Version, "1.0.0")
assert.True(t, p.Integrations[0].Enabled)
default:
t.Fatalf("unexpected payload type: %T", p)
}
}
},
},
{
name: "app-started",
when: func(c *client) {
c.appStart()
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppStarted{}, payload)
appStart := payload.(transport.AppStarted)
assert.Equal(t, appStart.InstallSignature.InstallID, globalconfig.InstrumentationInstallID())
assert.Equal(t, appStart.InstallSignature.InstallType, globalconfig.InstrumentationInstallType())
assert.Equal(t, appStart.InstallSignature.InstallTime, globalconfig.InstrumentationInstallTime())
},
},
{
name: "app-started-with-product",
when: func(c *client) {
c.appStart()
c.ProductStarted("test-product")
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppStarted{}, payload)
appStart := payload.(transport.AppStarted)
assert.Equal(t, appStart.Products[types.Namespace("test-product")].Enabled, true)
},
},
{
name: "app-started-with-configuration",
when: func(c *client) {
c.appStart()
c.AddAppConfig("key", "value", types.OriginDefault)
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppStarted{}, payload)
appStart := payload.(transport.AppStarted)
require.Len(t, appStart.Configuration, 1)
assert.Equal(t, appStart.Configuration[0].Name, "key")
assert.Equal(t, appStart.Configuration[0].Value, "value")
},
},
{
name: "app-started+integrations",
when: func(c *client) {
c.appStart()
c.MarkIntegrationAsLoaded(Integration{Name: "test-integration", Version: "1.0.0"})
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.MessageBatch{}, payload)
batch := payload.(transport.MessageBatch)

// Check AppStarted is the first payload in MessageBatch
assert.IsType(t, transport.AppStarted{}, batch.Payload[0].Payload)

for _, payload := range batch.Payload {
switch p := payload.Payload.(type) {
case transport.AppStarted:
assert.Equal(t, transport.RequestTypeAppStarted, payload.RequestType)
case transport.AppIntegrationChange:
assert.Equal(t, transport.RequestTypeAppIntegrationsChange, payload.RequestType)
assert.Len(t, p.Integrations, 1)
assert.Equal(t, p.Integrations[0].Name, "test-integration")
assert.Equal(t, p.Integrations[0].Version, "1.0.0")
default:
t.Fatalf("unexpected payload type: %T", p)
}
}
},
},
{
name: "app-started+heartbeat",
clientConfig: ClientConfig{
HeartbeatInterval: time.Nanosecond,
},
when: func(c *client) {
c.appStart()
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.MessageBatch{}, payload)
batch := payload.(transport.MessageBatch)

// Check AppStarted is the first payload in MessageBatch
assert.IsType(t, transport.AppStarted{}, batch.Payload[0].Payload)

for _, payload := range batch.Payload {
switch p := payload.Payload.(type) {
case transport.AppStarted:
assert.Equal(t, transport.RequestTypeAppStarted, payload.RequestType)
case transport.AppHeartbeat:
assert.Equal(t, transport.RequestTypeAppHeartbeat, payload.RequestType)
default:
t.Fatalf("unexpected payload type: %T", p)
}
}
},
},
{
name: "app-stopped",
when: func(c *client) {
c.appStop()
},
expect: func(t *testing.T, payload transport.Payload) {
require.IsType(t, transport.AppClosing{}, payload)
},
},
} {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
Expand Down
11 changes: 6 additions & 5 deletions internal/newtelemetry/globalclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ var (
)

// StartApp starts the telemetry client with the given client send the app-started telemetry and sets it as the global (*client).
func StartApp(client Client) error {
if Disabled() {
return nil
func StartApp(client Client) {
if Disabled() || globalClient.Load() != nil {
return
}

client.appStart()
SwapClient(client)
return nil
}

// SwapClient swaps the global client with the given client and Flush the old (*client).
Expand All @@ -40,12 +39,14 @@ func SwapClient(client Client) {

// StopApp creates the app-stopped telemetry, adding to the queue and Flush all the queue before stopping the (*client).
func StopApp() {
if Disabled() {
if Disabled() || globalClient.Load() == nil {
return
}

if client := globalClient.Swap(nil); client != nil && *client != nil {
(*client).appStop()
(*client).Flush()
(*client).Close()
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/newtelemetry/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (i *integrations) Payload() transport.Payload {
integrations[idx] = transport.Integration{
Name: integration.Name,
Version: integration.Version,
Enabled: true,
Enabled: integration.Error == "", // no error means the integration was enabled successfully
Error: integration.Error,
}
}
Expand Down
Loading

0 comments on commit 6dad2a3

Please sign in to comment.