Skip to content

Commit

Permalink
end to end support for integration, product, client configuration cha…
Browse files Browse the repository at this point in the history
…nge payloads

Signed-off-by: Eliott Bouhana <[email protected]>
  • Loading branch information
eliottness committed Jan 15, 2025
1 parent 1ee7e85 commit 54a315e
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 60 deletions.
35 changes: 24 additions & 11 deletions internal/newtelemetry/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,42 @@ package newtelemetry

import (
"net/http"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/types"
)

type ClientConfig struct {
// AgentlessURL is the full URL to the agentless telemetry endpoint.
// AgentlessURL is the full URL to the agentless telemetry endpoint. (Either AgentlessURL or AgentURL must be set or both)
// Defaults to https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry
AgentlessURL string

// AgentURL is the url to the agent without the path,
// AgentURL is the url of the agent to send telemetry to. (Either AgentlessURL or AgentURL must be set or both)
AgentURL string

// APIKey is the API key to use for sending telemetry, defaults to the env var DD_API_KEY.
APIKey string

// HTTPClient is the http client to use for sending telemetry, defaults to a http.DefaultClient copy.
HTTPClient http.RoundTripper

// HeartbeatInterval is the interval at which to send a heartbeat payload, defaults to 60s.
// The maximum value is 60s.
HeartbeatInterval time.Duration

// FlushIntervalRange is the interval at which the client flushes the data.
// By default, the client will start to flush at 60s intervals and will reduce the interval based on the load till it hit 15s
// Both values cannot be higher than 60s because the heartbeat need to be sent at least every 60s.
FlushIntervalRange struct {
Min time.Duration
Max time.Duration
}
}

// MetricHandle can be used to submit different values for the same metric.
// MetricHandle is used to reduce lock contention when submitting metrics.
// This can also be used ephemerally to submit a single metric value like this:
//
// telemetry.Metric(telemetry.Appsec, "my-count", map[string]string{"tag1": "true", "tag2": "1.0"}).Submit(1.0)
// ```go
// telemetry.Metric(telemetry.Appsec, "my-count", map[string]string{"tag1": "true", "tag2": "1.0"}).Submit(1.0)
// ```
type MetricHandle interface {
Submit(value float64)

Expand Down Expand Up @@ -59,11 +71,12 @@ type TelemetryLogger interface {

// Integration is an integration that is configured to be traced.
type Integration struct {
Name string
Version string
AutoEnabled bool
Compatible bool
Error string
// Name is an arbitrary string that must stay constant for the integration.
Name string
// Version is the version of the integration/dependency that is being loaded.
Version string
// Error is the error that occurred while loading the integration.
Error string
}

// Client constitutes all the functions available concurrently for the telemetry users. All methods are thread-safe
Expand Down
58 changes: 32 additions & 26 deletions internal/newtelemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package newtelemetry

import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal/transport"
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/types"
)

Expand All @@ -15,74 +17,78 @@ func NewClient(service, env, version string, config ClientConfig) (Client, error
}

type client struct {
tracerConfig internal.TracerConfig
writer internal.Writer
payloadQueue internal.RingQueue[transport.Payload]

// Data sources
integrations integrations
products products
configuration configuration
}

func (c client) MarkIntegrationAsLoaded(integration Integration) {
//TODO implement me
panic("implement me")
func (c *client) MarkIntegrationAsLoaded(integration Integration) {
c.integrations.Add(integration)
}

func (c client) Count(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
func (c *client) Count(_ types.Namespace, _ string, _ map[string]string) MetricHandle {
//TODO implement me
panic("implement me")
}

func (c client) Rate(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
func (c *client) Rate(_ types.Namespace, _ string, _ map[string]string) MetricHandle {
//TODO implement me
panic("implement me")
}

func (c client) Gauge(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
func (c *client) Gauge(_ types.Namespace, _ string, _ map[string]string) MetricHandle {
//TODO implement me
panic("implement me")
}

func (c client) Distribution(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
func (c *client) Distribution(_ types.Namespace, _ string, _ map[string]string) MetricHandle {
//TODO implement me
panic("implement me")
}

func (c client) Logger() TelemetryLogger {
func (c *client) Logger() TelemetryLogger {
//TODO implement me
panic("implement me")
}

func (c client) ProductStarted(product types.Namespace) {
//TODO implement me
panic("implement me")
func (c *client) ProductStarted(product types.Namespace) {
c.products.Add(product, true, nil)
}

func (c client) ProductStopped(product types.Namespace) {
//TODO implement me
panic("implement me")
func (c *client) ProductStopped(product types.Namespace) {
c.products.Add(product, false, nil)
}

func (c client) ProductStartError(product types.Namespace, err error) {
//TODO implement me
panic("implement me")
func (c *client) ProductStartError(product types.Namespace, err error) {
c.products.Add(product, false, err)
}

func (c client) AddAppConfig(key string, value any, origin types.Origin) {
//TODO implement me
panic("implement me")
func (c *client) AddAppConfig(key string, value any, origin types.Origin) {
c.configuration.Add(key, value, origin)
}

func (c client) AddBulkAppConfig(kvs map[string]any, origin types.Origin) {
//TODO implement me
panic("implement me")
func (c *client) AddBulkAppConfig(kvs map[string]any, origin types.Origin) {
for key, value := range kvs {
c.configuration.Add(key, value, origin)
}
}

func (c client) flush() {
func (c *client) flush() {
//TODO implement me
panic("implement me")
}

func (c client) appStart() error {
func (c *client) appStart() error {
//TODO implement me
panic("implement me")
}

func (c client) appStop() {
func (c *client) appStop() {
//TODO implement me
panic("implement me")
}
Expand Down
55 changes: 55 additions & 0 deletions internal/newtelemetry/configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025 Datadog, Inc.

package newtelemetry

import (
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal/transport"
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/types"
)

type configuration struct {
mu sync.Mutex
config map[string]transport.ConfKeyValue
seqID uint64
}

func (c *configuration) Add(key string, value any, origin types.Origin) {
c.mu.Lock()
defer c.mu.Unlock()

if c.config == nil {
c.config = make(map[string]transport.ConfKeyValue)
}

c.config[key] = transport.ConfKeyValue{
Name: key,
Value: value,
Origin: origin,
}
}

func (c *configuration) Payload() transport.Payload {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.config) == 0 {
return nil
}

configs := make([]transport.ConfKeyValue, len(c.config))
idx := 0
for _, conf := range c.config {
conf.SeqID = c.seqID
configs[idx] = conf
idx++
c.seqID++
}
c.config = nil
return transport.AppClientConfigurationChange{
Configuration: configs,
}
}
62 changes: 39 additions & 23 deletions internal/newtelemetry/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,52 @@
package newtelemetry

import (
"net"
"net/http"
"time"
)

var (
// We copy the transport to avoid using the default one, as it might be
// augmented with tracing and we don't want these calls to be recorded.
// See https://golang.org/pkg/net/http/#DefaultTransport .
//orchestrion:ignore
defaultHTTPClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: 5 * time.Second,
}

// agentlessURL is the endpoint used to send telemetry in an agentless environment. It is
// also the default URL in case connecting to the agent URL fails.
agentlessURL = "https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry"

// defaultHeartbeatInterval is the default interval at which the agent sends a heartbeat.
defaultHeartbeatInterval = 60.0
defaultHeartbeatInterval = 60.0 * time.Second

// defaultMinFlushInterval is the default interval at which the client flushes the data.
defaultMinFlushInterval = 15.0 * time.Second

// defaultMaxFlushInterval is the default interval at which the client flushes the data.
defaultMaxFlushInterval = 60.0 * time.Second
)

// clamp squeezes a value between a minimum and maximum value.
func clamp[T ~int64](value, minVal, maxVal T) T {
return max(min(maxVal, value), minVal)
}

// defaultConfig returns a ClientConfig with default values set.
func defaultConfig(config ClientConfig) ClientConfig {
if config.AgentlessURL == "" {
config.AgentlessURL = agentlessURL
}

if config.HeartbeatInterval == 0 {
config.HeartbeatInterval = defaultHeartbeatInterval
} else {
config.HeartbeatInterval = clamp(config.HeartbeatInterval, time.Microsecond, 60*time.Second)
}

if config.FlushIntervalRange.Min == 0 {
config.FlushIntervalRange.Min = defaultMinFlushInterval
} else {
config.FlushIntervalRange.Min = clamp(config.FlushIntervalRange.Min, time.Microsecond, 60*time.Second)
}

if config.FlushIntervalRange.Max == 0 {
config.FlushIntervalRange.Max = defaultMaxFlushInterval
} else {
config.FlushIntervalRange.Max = clamp(config.FlushIntervalRange.Max, time.Microsecond, 60*time.Second)
}

return config
}
44 changes: 44 additions & 0 deletions internal/newtelemetry/integration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025 Datadog, Inc.

package newtelemetry

import (
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/internal/transport"
)

type integrations struct {
mu sync.Mutex
integrations []Integration
}

func (i *integrations) Add(integration Integration) {
i.mu.Lock()
defer i.mu.Unlock()
i.integrations = append(i.integrations, integration)
}

func (i *integrations) Payload() transport.Payload {
i.mu.Lock()
defer i.mu.Unlock()
if len(i.integrations) == 0 {
return nil
}

integrations := make([]transport.Integration, len(i.integrations))
for idx, integration := range i.integrations {
integrations[idx] = transport.Integration{
Name: integration.Name,
Version: integration.Version,
Enabled: true,
Error: integration.Error,
}
}
return transport.AppIntegrationChange{
Integrations: integrations,
}
}
Loading

0 comments on commit 54a315e

Please sign in to comment.