Skip to content

Commit

Permalink
Merge branch 'graph-attributes' into singleton-flags-and-attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Jan 8, 2025
2 parents 09ba009 + dba8b05 commit e742a18
Show file tree
Hide file tree
Showing 11 changed files with 235 additions and 56 deletions.
100 changes: 100 additions & 0 deletions service/internal/graph/attribute/attribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package attribute // import "go.opentelemetry.io/collector/service/internal/graph/attribute"

import (
"fmt"
"hash/fnv"

"go.opentelemetry.io/otel/attribute"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pipeline"
)

const (
componentKindKey = "otelcol.component.kind"
componentIDKey = "otelcol.component.id"
pipelineIDKey = "otelcol.pipeline.id"
signalKey = "otelcol.signal"
signalOutputKey = "otelcol.signal.output"

receiverKind = "receiver"
processorKind = "processor"
exporterKind = "exporter"
connectorKind = "connector"
capabiltiesKind = "capabilities"
fanoutKind = "fanout"
)

type Attributes struct {
set attribute.Set
id int64
}

func newAttributes(attrs ...attribute.KeyValue) *Attributes {
h := fnv.New64a()
for _, kv := range attrs {
h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")"))
}
return &Attributes{
set: attribute.NewSet(attrs...),
id: int64(h.Sum64()), // #nosec G115
}
}

func (a Attributes) Attributes() *attribute.Set {
return &a.set
}

func (a Attributes) ID() int64 {
return a.id
}

func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, receiverKind),
attribute.String(signalKey, pipelineType.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Processor(pipelineID pipeline.ID, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, processorKind),
attribute.String(signalKey, pipelineID.Signal().String()),
attribute.String(pipelineIDKey, pipelineID.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, exporterKind),
attribute.String(signalKey, pipelineType.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, connectorKind),
attribute.String(signalKey, fmt.Sprintf("%s_to_%s", exprPipelineType.String(), rcvrPipelineType.String())),
attribute.String(componentIDKey, id.String()),
)
}

func Capabilities(pipelineID pipeline.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, capabiltiesKind),
attribute.String(pipelineIDKey, pipelineID.String()),
)
}

func Fanout(pipelineID pipeline.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, fanoutKind),
attribute.String(pipelineIDKey, pipelineID.String()),
)
}
104 changes: 104 additions & 0 deletions service/internal/graph/attribute/attribute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package attribute

import (
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/pipeline/pipelineprofiles"
)

var (
signals = []pipeline.Signal{
pipeline.SignalTraces,
pipeline.SignalMetrics,
pipeline.SignalLogs,
pipelineprofiles.SignalProfiles,
}

cIDs = []component.ID{
component.MustNewID("foo"),
component.MustNewID("foo2"),
component.MustNewID("bar"),
}

pIDs = []pipeline.ID{
pipeline.MustNewID("traces"),
pipeline.MustNewIDWithName("traces", "2"),
pipeline.MustNewID("metrics"),
pipeline.MustNewIDWithName("metrics", "2"),
pipeline.MustNewID("logs"),
pipeline.MustNewIDWithName("logs", "2"),
pipeline.MustNewID("profiles"),
pipeline.MustNewIDWithName("profiles", "2"),
}
)

func TestAttributes(t *testing.T) {
// The sets are created independently but should be exactly equivalent.
// We will ensure that corresponding elements are equal and that
// non-corresponding elements are not equal.
setI, setJ := createExampleSets(), createExampleSets()
for i, ei := range setI {
for j, ej := range setJ {
if i == j {
require.Equal(t, ei.ID(), ej.ID())
require.True(t, ei.Attributes().Equals(ej.Attributes()))
} else {
require.NotEqual(t, ei.ID(), ej.ID())
require.False(t, ei.Attributes().Equals(ej.Attributes()))
}
}
}
}

func createExampleSets() []*Attributes {
sets := []*Attributes{}

// Receiver examples.
for _, sig := range signals {
for _, id := range cIDs {
sets = append(sets, Receiver(sig, id))
}
}

// Processor examples.
for _, pID := range pIDs {
for _, cID := range cIDs {
sets = append(sets, Processor(pID, cID))
}
}

// Exporter examples.
for _, sig := range signals {
for _, id := range cIDs {
sets = append(sets, Exporter(sig, id))
}
}

// Connector examples.
for _, exprSig := range signals {
for _, rcvrSig := range signals {
for _, id := range cIDs {
sets = append(sets, Connector(exprSig, rcvrSig, id))
}
}
}

// Capabilities examples.
for _, pID := range pIDs {
sets = append(sets, Capabilities(pID))
}

// Fanout examples.
for _, pID := range pIDs {
sets = append(sets, Fanout(pID))
}

return sets
}
7 changes: 3 additions & 4 deletions service/internal/graph/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/service/internal/graph/attribute"
)

const capabilitiesSeed = "capabilities"

var _ consumerNode = (*capabilitiesNode)(nil)

// Every pipeline has a "virtual" capabilities node immediately after the receiver(s).
Expand All @@ -19,7 +18,7 @@ var _ consumerNode = (*capabilitiesNode)(nil)
// 2. Present a consistent "first consumer" for each pipeline.
// The nodeID is derived from "pipeline ID".
type capabilitiesNode struct {
nodeID
*attribute.Attributes
pipelineID pipeline.ID
baseConsumer
consumer.ConsumeTracesFunc
Expand All @@ -30,7 +29,7 @@ type capabilitiesNode struct {

func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode {
return &capabilitiesNode{
nodeID: newNodeID(capabilitiesSeed, pipelineID.String()),
Attributes: attribute.Capabilities(pipelineID),
pipelineID: pipelineID,
}
}
Expand Down
7 changes: 3 additions & 4 deletions service/internal/graph/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ import (
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/graph/attribute"
)

const connectorSeed = "connector"

var _ consumerNode = (*connectorNode)(nil)

// A connector instance connects one pipeline type to one other pipeline type.
// Therefore, nodeID is derived from "exporter pipeline type", "receiver pipeline type", and "component ID".
type connectorNode struct {
nodeID
*attribute.Attributes
componentID component.ID
exprPipelineType pipeline.Signal
rcvrPipelineType pipeline.Signal
Expand All @@ -34,7 +33,7 @@ type connectorNode struct {

func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID) *connectorNode {
return &connectorNode{
nodeID: newNodeID(connectorSeed, connID.String(), exprPipelineType.String(), rcvrPipelineType.String()),
Attributes: attribute.Connector(exprPipelineType, rcvrPipelineType, connID),
componentID: connID,
exprPipelineType: exprPipelineType,
rcvrPipelineType: rcvrPipelineType,
Expand Down
7 changes: 3 additions & 4 deletions service/internal/graph/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,23 @@ import (
"go.opentelemetry.io/collector/pipeline/xpipeline"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/graph/attribute"
)

const exporterSeed = "exporter"

var _ consumerNode = (*exporterNode)(nil)

// An exporter instance can be shared by multiple pipelines of the same type.
// Therefore, nodeID is derived from "pipeline type" and "component ID".
type exporterNode struct {
nodeID
*attribute.Attributes
componentID component.ID
pipelineType pipeline.Signal
component.Component
}

func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode {
return &exporterNode{
nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()),
Attributes: attribute.Exporter(pipelineType, exprID),
componentID: exprID,
pipelineType: pipelineType,
}
Expand Down
7 changes: 3 additions & 4 deletions service/internal/graph/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/service/internal/graph/attribute"
)

const fanOutToExporters = "fanout_to_exporters"

var _ consumerNode = (*fanOutNode)(nil)

// Each pipeline has one fan-out node before exporters.
// Therefore, nodeID is derived from "pipeline ID".
type fanOutNode struct {
nodeID
*attribute.Attributes
pipelineID pipeline.ID
baseConsumer
}

func newFanOutNode(pipelineID pipeline.ID) *fanOutNode {
return &fanOutNode{
nodeID: newNodeID(fanOutToExporters, pipelineID.String()),
Attributes: attribute.Fanout(pipelineID),
pipelineID: pipelineID,
}
}
Expand Down
18 changes: 9 additions & 9 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2067,11 +2067,11 @@ func TestGraphBuildErrors(t *testing.T) {
},
},
expected: `cycle detected: ` +
`connector "nop/conn1" (traces to traces) -> ` +
`processor "nop" in pipeline "traces/2" -> ` +
`connector "nop/conn" (traces to traces) -> ` +
`processor "nop" in pipeline "traces/1" -> ` +
`connector "nop/conn1" (traces to traces)`,
`connector "nop/conn1" (traces to traces) -> ` +
`processor "nop" in pipeline "traces/2" -> ` +
`connector "nop/conn" (traces to traces)`,
},
{
name: "not_allowed_deep_cycle_metrics.yaml",
Expand Down Expand Up @@ -2157,11 +2157,11 @@ func TestGraphBuildErrors(t *testing.T) {
},
},
expected: `cycle detected: ` +
`connector "nop/conn1" (logs to logs) -> ` +
`processor "nop" in pipeline "logs/2" -> ` +
`connector "nop/conn" (logs to logs) -> ` +
`processor "nop" in pipeline "logs/1" -> ` +
`connector "nop/conn1" (logs to logs)`,
`connector "nop/conn1" (logs to logs) -> ` +
`processor "nop" in pipeline "logs/2" -> ` +
`connector "nop/conn" (logs to logs)`,
},
{
name: "not_allowed_deep_cycle_profiles.yaml",
Expand Down Expand Up @@ -2263,13 +2263,13 @@ func TestGraphBuildErrors(t *testing.T) {
},
},
expected: `cycle detected: ` +
`connector "nop/forkagain" (traces to traces) -> ` +
`processor "nop" in pipeline "traces/copy2b" -> ` +
`connector "nop/rawlog" (traces to logs) -> ` +
`processor "nop" in pipeline "logs/raw" -> ` +
`connector "nop/fork" (logs to traces) -> ` +
`processor "nop" in pipeline "traces/copy2" -> ` +
`connector "nop/forkagain" (traces to traces) -> ` +
`processor "nop" in pipeline "traces/copy2b" -> ` +
`connector "nop/rawlog" (traces to logs)`,
`connector "nop/forkagain" (traces to traces)`,
},
{
name: "unknown_exporter_config",
Expand Down
22 changes: 0 additions & 22 deletions service/internal/graph/node.go

This file was deleted.

Loading

0 comments on commit e742a18

Please sign in to comment.