diff --git a/connector/connector.go b/connector/connector.go index 64a038d2a95..2ae78f26a6b 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -112,17 +112,9 @@ type Factory interface { LogsToMetricsStability() component.StabilityLevel LogsToLogsStability() component.StabilityLevel - // Metadata returns the metadata describing the receiver. - Metadata() Metadata - unexportedFactoryFunc() } -// Metadata contains metadata describing the component that is created by the factory. -type Metadata struct { - SingletonInstance bool -} - // FactoryOption applies changes to Factory. type FactoryOption interface { // apply applies the option. @@ -309,13 +301,6 @@ func WithLogsToLogs(createLogsToLogs CreateLogsToLogsFunc, sl component.Stabilit }) } -// AsSingletonInstance indicates that the factory always returns the same instance of the component. -func AsSingletonInstance() FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.metadata.SingletonInstance = true - }) -} - // factory implements the Factory interface. type factory struct { cfgType component.Type @@ -344,8 +329,6 @@ type factory struct { logsToTracesStabilityLevel component.StabilityLevel logsToMetricsStabilityLevel component.StabilityLevel logsToLogsStabilityLevel component.StabilityLevel - - metadata Metadata } // Type returns the type of component. @@ -391,10 +374,6 @@ func (f *factory) LogsToLogsStability() component.StabilityLevel { return f.logsToLogsStabilityLevel } -func (f *factory) Metadata() Metadata { - return f.metadata -} - // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { f := &factory{ diff --git a/connector/connector_test.go b/connector/connector_test.go index 9a31f829054..e190f5a4c31 100644 --- a/connector/connector_test.go +++ b/connector/connector_test.go @@ -48,8 +48,6 @@ func TestNewFactoryNoOptions(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalMetrics)) _, err = factory.CreateLogsToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalLogs)) - - assert.False(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithSameTypes(t *testing.T) { @@ -57,8 +55,7 @@ func TestNewFactoryWithSameTypes(t *testing.T) { factory := NewFactory(testType, func() component.Config { return &defaultCfg }, WithTracesToTraces(createTracesToTraces, component.StabilityLevelAlpha), WithMetricsToMetrics(createMetricsToMetrics, component.StabilityLevelBeta), - WithLogsToLogs(createLogsToLogs, component.StabilityLevelUnmaintained), - AsSingletonInstance()) + WithLogsToLogs(createLogsToLogs, component.StabilityLevelUnmaintained)) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) @@ -88,8 +85,6 @@ func TestNewFactoryWithSameTypes(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalTraces)) _, err = factory.CreateLogsToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalMetrics)) - - assert.True(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithTranslateTypes(t *testing.T) { diff --git a/connector/xconnector/connector.go b/connector/xconnector/connector.go index 412e361a3d0..d697e06a8e4 100644 --- a/connector/xconnector/connector.go +++ b/connector/xconnector/connector.go @@ -269,13 +269,6 @@ func WithProfilesToLogs(createProfilesToLogs CreateProfilesToLogsFunc, sl compon }) } -// AsSingletonInstance sets the connector as a singleton instance. -func AsSingletonInstance() FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.AsSingletonInstance()) - }) -} - // factory implements the Factory interface. type factory struct { connector.Factory diff --git a/connector/xconnector/connector_test.go b/connector/xconnector/connector_test.go index 6c096557c0d..636e76450ed 100644 --- a/connector/xconnector/connector_test.go +++ b/connector/xconnector/connector_test.go @@ -44,15 +44,12 @@ func TestNewFactoryNoOptions(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalMetrics)) _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalLogs)) - - assert.False(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithSameTypes(t *testing.T) { defaultCfg := struct{}{} factory := NewFactory(testType, func() component.Config { return &defaultCfg }, WithProfilesToProfiles(createProfilesToProfiles, component.StabilityLevelAlpha), - AsSingletonInstance(), ) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) @@ -67,8 +64,6 @@ func TestNewFactoryWithSameTypes(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalMetrics)) _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalLogs)) - - assert.True(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithTranslateTypes(t *testing.T) { diff --git a/exporter/exporter_test.go b/exporter/exporter_test.go index a9dc24117ad..fcc5f5a0b68 100644 --- a/exporter/exporter_test.go +++ b/exporter/exporter_test.go @@ -55,7 +55,7 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.Equal(t, component.StabilityLevelDeprecated, f.LogsStability()) _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, f.Metadata().SingletonInstance) } diff --git a/exporter/xexporter/exporter_test.go b/exporter/xexporter/exporter_test.go index d1929c4c22e..f7287d2807b 100644 --- a/exporter/xexporter/exporter_test.go +++ b/exporter/xexporter/exporter_test.go @@ -21,6 +21,7 @@ func TestNewFactoryWithProfiles(t *testing.T) { testType, func() component.Config { return &defaultCfg }, WithProfiles(createProfiles, component.StabilityLevelDevelopment), + AsSingletonInstance(), ) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) diff --git a/receiver/receiver_test.go b/receiver/receiver_test.go index 3ce1635b59d..d685909ea51 100644 --- a/receiver/receiver_test.go +++ b/receiver/receiver_test.go @@ -56,7 +56,7 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.Equal(t, component.StabilityLevelStable, f.LogsStability()) _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg, nil) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, f.Metadata().SingletonInstance) } diff --git a/receiver/xreceiver/receiver_test.go b/receiver/xreceiver/receiver_test.go index 06ff1f592f9..8229174e416 100644 --- a/receiver/xreceiver/receiver_test.go +++ b/receiver/xreceiver/receiver_test.go @@ -22,6 +22,7 @@ func TestNewFactoryWithProfiles(t *testing.T) { testType, func() component.Config { return &defaultCfg }, WithProfiles(createProfiles, component.StabilityLevelAlpha), + AsSingletonInstance(), ) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index fcefcb33f20..dd5204ac182 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -17,8 +17,8 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensioncapabilities" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -38,7 +38,7 @@ type Extensions struct { func (bes *Extensions) Start(ctx context.Context, host component.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for _, extID := range bes.extensionIDs { - extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID) + extLogger := attribute.Extension(extID).Logger(bes.telemetry.Logger) extLogger.Info("Extension is starting...") instanceID := bes.instanceIDs[extID] ext := bes.extMap[extID] @@ -216,7 +216,7 @@ func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Ext BuildInfo: set.BuildInfo, ModuleInfo: set.ModuleInfo, } - extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID) + extSet.TelemetrySettings.Logger = attribute.Extension(extID).Logger(set.Telemetry.Logger) ext, err := set.Extensions.Create(ctx, extSet) if err != nil { diff --git a/service/internal/graph/attribute/attribute.go b/service/internal/attribute/attribute.go similarity index 63% rename from service/internal/graph/attribute/attribute.go rename to service/internal/attribute/attribute.go index e81b1f9be23..bd38093e4cd 100644 --- a/service/internal/graph/attribute/attribute.go +++ b/service/internal/attribute/attribute.go @@ -1,13 +1,13 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package attribute // import "go.opentelemetry.io/collector/service/internal/graph/attribute" +package attribute // import "go.opentelemetry.io/collector/service/internal/attribute" import ( - "fmt" "hash/fnv" "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pipeline" @@ -20,10 +20,6 @@ const ( signalKey = "otelcol.signal" signalOutputKey = "otelcol.signal.output" - receiverKind = "receiver" - processorKind = "processor" - exporterKind = "exporter" - connectorKind = "connector" capabiltiesKind = "capabilities" fanoutKind = "fanout" ) @@ -52,17 +48,32 @@ func (a Attributes) ID() int64 { return a.id } +func (a Attributes) Logger(logger *zap.Logger) *zap.Logger { + fields := make([]zap.Field, 0, a.set.Len()) + for _, kv := range a.set.ToSlice() { + fields = append(fields, zap.String(string(kv.Key), kv.Value.AsString())) + } + return logger.With(fields...) +} + func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes { return newAttributes( - attribute.String(componentKindKey, receiverKind), + attribute.String(componentKindKey, component.KindReceiver.String()), attribute.String(signalKey, pipelineType.String()), attribute.String(componentIDKey, id.String()), ) } +func ReceiverSingleton(id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindReceiver.String()), + attribute.String(componentIDKey, id.String()), + ) +} + func Processor(pipelineID pipeline.ID, id component.ID) *Attributes { return newAttributes( - attribute.String(componentKindKey, processorKind), + attribute.String(componentKindKey, component.KindProcessor.String()), attribute.String(signalKey, pipelineID.Signal().String()), attribute.String(pipelineIDKey, pipelineID.String()), attribute.String(componentIDKey, id.String()), @@ -71,16 +82,24 @@ func Processor(pipelineID pipeline.ID, id component.ID) *Attributes { func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes { return newAttributes( - attribute.String(componentKindKey, exporterKind), + attribute.String(componentKindKey, component.KindExporter.String()), attribute.String(signalKey, pipelineType.String()), attribute.String(componentIDKey, id.String()), ) } +func ExporterSingleton(id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindExporter.String()), + attribute.String(componentIDKey, id.String()), + ) +} + func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes { return newAttributes( - attribute.String(componentKindKey, connectorKind), - attribute.String(signalKey, fmt.Sprintf("%s_to_%s", exprPipelineType.String(), rcvrPipelineType.String())), + attribute.String(componentKindKey, component.KindConnector.String()), + attribute.String(signalKey, exprPipelineType.String()), + attribute.String(signalOutputKey, rcvrPipelineType.String()), attribute.String(componentIDKey, id.String()), ) } @@ -98,3 +117,10 @@ func Fanout(pipelineID pipeline.ID) *Attributes { attribute.String(pipelineIDKey, pipelineID.String()), ) } + +func Extension(id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindExtension.String()), + attribute.String(componentIDKey, id.String()), + ) +} diff --git a/service/internal/attribute/attribute_test.go b/service/internal/attribute/attribute_test.go new file mode 100644 index 00000000000..eb554953d67 --- /dev/null +++ b/service/internal/attribute/attribute_test.go @@ -0,0 +1,225 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package attribute + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/xpipeline" +) + +var ( + signals = []pipeline.Signal{ + pipeline.SignalTraces, + pipeline.SignalMetrics, + pipeline.SignalLogs, + xpipeline.SignalProfiles, + } + + cIDs = []component.ID{ + component.MustNewID("foo"), + component.MustNewID("foo2"), + component.MustNewID("bar"), + } + + pIDs = []pipeline.ID{ + pipeline.MustNewID("traces"), + pipeline.MustNewIDWithName("traces", "2"), + pipeline.MustNewID("metrics"), + pipeline.MustNewIDWithName("metrics", "2"), + pipeline.MustNewID("logs"), + pipeline.MustNewIDWithName("logs", "2"), + pipeline.MustNewID("profiles"), + pipeline.MustNewIDWithName("profiles", "2"), + } +) + +func TestReceiver(t *testing.T) { + for _, sig := range signals { + for _, id := range cIDs { + r := Receiver(sig, id) + componentKind, ok := r.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindReceiver.String(), componentKind.AsString()) + + signal, ok := r.Attributes().Value(signalKey) + require.True(t, ok) + require.Equal(t, sig.String(), signal.AsString()) + + componentID, ok := r.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } +} + +func TestReceiverSingleton(t *testing.T) { + for _, id := range cIDs { + r := ReceiverSingleton(id) + componentKind, ok := r.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindReceiver.String(), componentKind.AsString()) + + componentID, ok := r.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } +} + +func TestProcessor(t *testing.T) { + for _, pID := range pIDs { + for _, id := range cIDs { + p := Processor(pID, id) + componentKind, ok := p.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindProcessor.String(), componentKind.AsString()) + + pipelineID, ok := p.Attributes().Value(pipelineIDKey) + require.True(t, ok) + require.Equal(t, pID.String(), pipelineID.AsString()) + + componentID, ok := p.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } +} + +func TestExporter(t *testing.T) { + for _, sig := range signals { + for _, id := range cIDs { + e := Exporter(sig, id) + componentKind, ok := e.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindExporter.String(), componentKind.AsString()) + + signal, ok := e.Attributes().Value(signalKey) + require.True(t, ok) + require.Equal(t, sig.String(), signal.AsString()) + + componentID, ok := e.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } +} + +func TestExporterSingleton(t *testing.T) { + for _, id := range cIDs { + e := ExporterSingleton(id) + componentKind, ok := e.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindExporter.String(), componentKind.AsString()) + + componentID, ok := e.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } +} + +func TestConnector(t *testing.T) { + for _, exprSig := range signals { + for _, rcvrSig := range signals { + for _, id := range cIDs { + c := Connector(exprSig, rcvrSig, id) + componentKind, ok := c.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindConnector.String(), componentKind.AsString()) + + signal, ok := c.Attributes().Value(signalKey) + require.True(t, ok) + require.Equal(t, exprSig.String(), signal.AsString()) + + signalOutput, ok := c.Attributes().Value(signalOutputKey) + require.True(t, ok) + require.Equal(t, rcvrSig.String(), signalOutput.AsString()) + + componentID, ok := c.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } + } +} + +func TestExtension(t *testing.T) { + e := Extension(component.MustNewID("foo")) + componentKind, ok := e.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindExtension.String(), componentKind.AsString()) +} + +func TestSetEquality(t *testing.T) { + // The sets are created independently but should be exactly equivalent. + // We will ensure that corresponding elements are equal and that + // non-corresponding elements are not equal. + setI, setJ := createExampleSets(), createExampleSets() + for i, ei := range setI { + for j, ej := range setJ { + if i == j { + require.Equal(t, ei.ID(), ej.ID()) + require.True(t, ei.Attributes().Equals(ej.Attributes())) + } else { + require.NotEqual(t, ei.ID(), ej.ID()) + require.False(t, ei.Attributes().Equals(ej.Attributes())) + } + } + } +} + +func createExampleSets() []*Attributes { + sets := []*Attributes{} + + // Receiver examples. + for _, sig := range signals { + for _, id := range cIDs { + sets = append(sets, Receiver(sig, id)) + } + } + for _, id := range cIDs { + sets = append(sets, ReceiverSingleton(id)) + } + + // Processor examples. + for _, pID := range pIDs { + for _, cID := range cIDs { + sets = append(sets, Processor(pID, cID)) + } + } + + // Exporter examples. + for _, sig := range signals { + for _, id := range cIDs { + sets = append(sets, Exporter(sig, id)) + } + } + for _, id := range cIDs { + sets = append(sets, ExporterSingleton(id)) + } + + // Connector examples. + for _, exprSig := range signals { + for _, rcvrSig := range signals { + for _, id := range cIDs { + sets = append(sets, Connector(exprSig, rcvrSig, id)) + } + } + } + + // Capabilities examples. + for _, pID := range pIDs { + sets = append(sets, Capabilities(pID)) + } + + // Fanout examples. + for _, pID := range pIDs { + sets = append(sets, Fanout(pID)) + } + + return sets +} diff --git a/service/internal/builders/connector.go b/service/internal/builders/connector.go index b157d2dd64a..3bc8f36f762 100644 --- a/service/internal/builders/connector.go +++ b/service/internal/builders/connector.go @@ -376,7 +376,7 @@ func (b *ConnectorBuilder) IsConfigured(componentID component.ID) bool { return ok } -func (b *ConnectorBuilder) Factory(componentType component.Type) component.Factory { +func (b *ConnectorBuilder) Factory(componentType component.Type) connector.Factory { return b.factories[componentType] } diff --git a/service/internal/builders/exporter.go b/service/internal/builders/exporter.go index 6570828b76f..5b881cc9a58 100644 --- a/service/internal/builders/exporter.go +++ b/service/internal/builders/exporter.go @@ -94,7 +94,7 @@ func (b *ExporterBuilder) CreateProfiles(ctx context.Context, set exporter.Setti return f.CreateProfiles(ctx, set, cfg) } -func (b *ExporterBuilder) Factory(componentType component.Type) component.Factory { +func (b *ExporterBuilder) Factory(componentType component.Type) exporter.Factory { return b.factories[componentType] } diff --git a/service/internal/builders/processor.go b/service/internal/builders/processor.go index c0df0f3b575..2394a815c0e 100644 --- a/service/internal/builders/processor.go +++ b/service/internal/builders/processor.go @@ -108,7 +108,7 @@ func (b *ProcessorBuilder) CreateProfiles(ctx context.Context, set processor.Set return f.CreateProfiles(ctx, set, cfg, next) } -func (b *ProcessorBuilder) Factory(componentType component.Type) component.Factory { +func (b *ProcessorBuilder) Factory(componentType component.Type) processor.Factory { return b.factories[componentType] } diff --git a/service/internal/builders/receiver.go b/service/internal/builders/receiver.go index 007d9be2187..bf715fd3250 100644 --- a/service/internal/builders/receiver.go +++ b/service/internal/builders/receiver.go @@ -110,7 +110,7 @@ func (b *ReceiverBuilder) CreateProfiles(ctx context.Context, set receiver.Setti return f.CreateProfiles(ctx, set, cfg, next) } -func (b *ReceiverBuilder) Factory(componentType component.Type) component.Factory { +func (b *ReceiverBuilder) Factory(componentType component.Type) receiver.Factory { return b.factories[componentType] } diff --git a/service/internal/components/loggers.go b/service/internal/components/loggers.go deleted file mode 100644 index f02d19fb082..00000000000 --- a/service/internal/components/loggers.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package components // import "go.opentelemetry.io/collector/service/internal/components" - -import ( - "strings" - - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pipeline" -) - -const ( - zapKindKey = "kind" - zapNameKey = "name" - zapDataTypeKey = "data_type" - zapStabilityKey = "stability" - zapPipelineKey = "pipeline" - zapExporterInPipeline = "exporter_in_pipeline" - zapReceiverInPipeline = "receiver_in_pipeline" -) - -func ReceiverLogger(logger *zap.Logger, id component.ID, dt pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindReceiver.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapDataTypeKey, dt.String())) -} - -func ProcessorLogger(logger *zap.Logger, id component.ID, pipelineID pipeline.ID) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindProcessor.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapPipelineKey, pipelineID.String())) -} - -func ExporterLogger(logger *zap.Logger, id component.ID, dt pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindExporter.String())), - zap.String(zapDataTypeKey, dt.String()), - zap.String(zapNameKey, id.String())) -} - -func ExtensionLogger(logger *zap.Logger, id component.ID) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindExtension.String())), - zap.String(zapNameKey, id.String())) -} - -func ConnectorLogger(logger *zap.Logger, id component.ID, expDT, rcvDT pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindConnector.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapExporterInPipeline, expDT.String()), - zap.String(zapReceiverInPipeline, rcvDT.String())) -} diff --git a/service/internal/components/package_test.go b/service/internal/components/package_test.go deleted file mode 100644 index 30b5a82311a..00000000000 --- a/service/internal/components/package_test.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package components - -import ( - "testing" - - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} diff --git a/service/internal/graph/attribute/attribute_test.go b/service/internal/graph/attribute/attribute_test.go deleted file mode 100644 index db2b32dc197..00000000000 --- a/service/internal/graph/attribute/attribute_test.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package attribute - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" -) - -var ( - signals = []pipeline.Signal{ - pipeline.SignalTraces, - pipeline.SignalMetrics, - pipeline.SignalLogs, - pipelineprofiles.SignalProfiles, - } - - cIDs = []component.ID{ - component.MustNewID("foo"), - component.MustNewID("foo2"), - component.MustNewID("bar"), - } - - pIDs = []pipeline.ID{ - pipeline.MustNewID("traces"), - pipeline.MustNewIDWithName("traces", "2"), - pipeline.MustNewID("metrics"), - pipeline.MustNewIDWithName("metrics", "2"), - pipeline.MustNewID("logs"), - pipeline.MustNewIDWithName("logs", "2"), - pipeline.MustNewID("profiles"), - pipeline.MustNewIDWithName("profiles", "2"), - } -) - -func TestAttributes(t *testing.T) { - // The sets are created independently but should be exactly equivalent. - // We will ensure that corresponding elements are equal and that - // non-corresponding elements are not equal. - setI, setJ := createExampleSets(), createExampleSets() - for i, ei := range setI { - for j, ej := range setJ { - if i == j { - require.Equal(t, ei.ID(), ej.ID()) - require.True(t, ei.Attributes().Equals(ej.Attributes())) - } else { - require.NotEqual(t, ei.ID(), ej.ID()) - require.False(t, ei.Attributes().Equals(ej.Attributes())) - } - } - } -} - -func createExampleSets() []*Attributes { - sets := []*Attributes{} - - // Receiver examples. - for _, sig := range signals { - for _, id := range cIDs { - sets = append(sets, Receiver(sig, id)) - } - } - - // Processor examples. - for _, pID := range pIDs { - for _, cID := range cIDs { - sets = append(sets, Processor(pID, cID)) - } - } - - // Exporter examples. - for _, sig := range signals { - for _, id := range cIDs { - sets = append(sets, Exporter(sig, id)) - } - } - - // Connector examples. - for _, exprSig := range signals { - for _, rcvrSig := range signals { - for _, id := range cIDs { - sets = append(sets, Connector(exprSig, rcvrSig, id)) - } - } - } - - // Capabilities examples. - for _, pID := range pIDs { - sets = append(sets, Capabilities(pID)) - } - - // Fanout examples. - for _, pID := range pIDs { - sets = append(sets, Fanout(pID)) - } - - return sets -} diff --git a/service/internal/graph/capabilities.go b/service/internal/graph/capabilities.go index ec8e2a576c0..fa9ed5ea142 100644 --- a/service/internal/graph/capabilities.go +++ b/service/internal/graph/capabilities.go @@ -7,7 +7,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/service/internal/graph/attribute" + "go.opentelemetry.io/collector/service/internal/attribute" ) var _ consumerNode = (*capabilitiesNode)(nil) diff --git a/service/internal/graph/connector.go b/service/internal/graph/connector.go index df780a20b4b..73ab6ede49e 100644 --- a/service/internal/graph/connector.go +++ b/service/internal/graph/connector.go @@ -13,10 +13,9 @@ import ( "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) var _ consumerNode = (*connectorNode)(nil) @@ -51,7 +50,7 @@ func (n *connectorNode) buildComponent( builder *builders.ConnectorBuilder, nexts []baseConsumer, ) error { - tel.Logger = components.ConnectorLogger(tel.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType) + tel.Logger = n.Attributes.Logger(tel.Logger) set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} switch n.rcvrPipelineType { case pipeline.SignalTraces: diff --git a/service/internal/graph/exporter.go b/service/internal/graph/exporter.go index 5598121c6cc..5b852f6b64d 100644 --- a/service/internal/graph/exporter.go +++ b/service/internal/graph/exporter.go @@ -11,9 +11,8 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) var _ consumerNode = (*exporterNode)(nil) @@ -22,16 +21,24 @@ var _ consumerNode = (*exporterNode)(nil) // Therefore, nodeID is derived from "pipeline type" and "component ID". type exporterNode struct { *attribute.Attributes - componentID component.ID - pipelineType pipeline.Signal + componentID component.ID + pipelineTypes map[pipeline.Signal]struct{} component.Component } func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode { return &exporterNode{ - Attributes: attribute.Exporter(pipelineType, exprID), - componentID: exprID, - pipelineType: pipelineType, + Attributes: attribute.Exporter(pipelineType, exprID), + componentID: exprID, + pipelineTypes: map[pipeline.Signal]struct{}{pipelineType: {}}, + } +} + +func newExporterSingletonNode(exprID component.ID) *exporterNode { + return &exporterNode{ + Attributes: attribute.ExporterSingleton(exprID), + componentID: exprID, + pipelineTypes: map[pipeline.Signal]struct{}{}, } } @@ -39,29 +46,35 @@ func (n *exporterNode) getConsumer() baseConsumer { return n.Component.(baseConsumer) } +func (n *exporterNode) withSignalType(pipelineType pipeline.Signal) { + n.pipelineTypes[pipelineType] = struct{}{} +} + func (n *exporterNode) buildComponent( ctx context.Context, tel component.TelemetrySettings, info component.BuildInfo, builder *builders.ExporterBuilder, ) error { - tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType) + tel.Logger = n.Attributes.Logger(tel.Logger) set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error - switch n.pipelineType { - case pipeline.SignalTraces: - n.Component, err = builder.CreateTraces(ctx, set) - case pipeline.SignalMetrics: - n.Component, err = builder.CreateMetrics(ctx, set) - case pipeline.SignalLogs: - n.Component, err = builder.CreateLogs(ctx, set) - case xpipeline.SignalProfiles: - n.Component, err = builder.CreateProfiles(ctx, set) - default: - return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType) - } - if err != nil { - return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err) + for signal := range n.pipelineTypes { + switch signal { + case pipeline.SignalTraces: + n.Component, err = builder.CreateTraces(ctx, set) + case pipeline.SignalMetrics: + n.Component, err = builder.CreateMetrics(ctx, set) + case pipeline.SignalLogs: + n.Component, err = builder.CreateLogs(ctx, set) + case xpipeline.SignalProfiles: + n.Component, err = builder.CreateProfiles(ctx, set) + default: + return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, signal) + } + if err != nil { + return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, signal, err) + } } return nil } diff --git a/service/internal/graph/fanout.go b/service/internal/graph/fanout.go index 7d1a76f086f..40aa7c01d29 100644 --- a/service/internal/graph/fanout.go +++ b/service/internal/graph/fanout.go @@ -5,7 +5,7 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph" import ( "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/service/internal/graph/attribute" + "go.opentelemetry.io/collector/service/internal/attribute" ) var _ consumerNode = (*fanOutNode)(nil) diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 2bc4e163d7e..0fff7405013 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -111,13 +111,28 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) continue } - rcvrNode := g.createReceiver(pipelineID, recvID) + factory := set.ReceiverBuilder.Factory(recvID.Type()) + if factory == nil { + return fmt.Errorf("receiver factory not available for: %q", recvID.Type()) + } + var rcvrNode *receiverNode + if factory.Metadata().SingletonInstance { + rcvrNode = g.singletonReceiver(recvID) + rcvrNode.withSignalType(pipelineID.Signal()) + } else { + rcvrNode = g.createReceiver(pipelineID, recvID) + } pipe.receivers[rcvrNode.ID()] = rcvrNode } pipe.capabilitiesNode = newCapabilitiesNode(pipelineID) for _, procID := range pipelineCfg.Processors { + factory := set.ProcessorBuilder.Factory(procID.Type()) + if factory == nil { + return fmt.Errorf("processor factory not available for: %q", procID.Type()) + } + procNode := g.createProcessor(pipelineID, procID) pipe.processors = append(pipe.processors, procNode) } @@ -130,8 +145,19 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID) continue } - expNode := g.createExporter(pipelineID, exprID) - pipe.exporters[expNode.ID()] = expNode + factory := set.ExporterBuilder.Factory(exprID.Type()) + if factory == nil { + return fmt.Errorf("exporter factory not available for: %q", exprID.Type()) + } + + var exprNode *exporterNode + if factory.Metadata().SingletonInstance { + exprNode = g.singletonExporter(exprID) + exprNode.withSignalType(pipelineID.Signal()) + } else { + exprNode = g.createExporter(pipelineID, exprID) + } + pipe.exporters[exprNode.ID()] = exprNode } } @@ -140,7 +166,6 @@ func (g *Graph) createNodes(set Settings) error { if factory == nil { return fmt.Errorf("connector factory not available for: %q", connID.Type()) } - connFactory := factory.(connector.Factory) expTypes := make(map[pipeline.Signal]bool) for _, pipelineID := range connectorsAsExporter[connID] { @@ -160,7 +185,7 @@ func (g *Graph) createNodes(set Settings) error { for expType := range expTypes { for recType := range recTypes { // Typechecks the connector's receiving and exporting datatypes. - if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined { + if connectorStability(factory, expType, recType) != component.StabilityLevelUndefined { expTypes[expType] = true recTypes[recType] = true } @@ -182,12 +207,12 @@ func (g *Graph) createNodes(set Settings) error { for _, eID := range connectorsAsExporter[connID] { for _, rID := range connectorsAsReceiver[connID] { - if connectorStability(connFactory, eID.Signal(), rID.Signal()) == component.StabilityLevelUndefined { + if connectorStability(factory, eID.Signal(), rID.Signal()) == component.StabilityLevelUndefined { // Connector is not supported for this combination, but we know it is used correctly elsewhere continue } - connNode := g.createConnector(eID, rID, connID) + connNode := g.createConnector(eID, rID, connID) g.pipelines[eID].exporters[connNode.ID()] = connNode g.pipelines[rID].receivers[connNode.ID()] = connNode } @@ -196,6 +221,16 @@ func (g *Graph) createNodes(set Settings) error { return nil } +func (g *Graph) singletonReceiver(recvID component.ID) *receiverNode { + // TODO create ID for the node before creating entire node + rcvrNode := newReceiverSingletonNode(recvID) + if node := g.componentGraph.Node(rcvrNode.ID()); node != nil { + return g.componentGraph.Node(rcvrNode.ID()).(*receiverNode) + } + g.componentGraph.AddNode(rcvrNode) + return rcvrNode +} + func (g *Graph) createReceiver(pipelineID pipeline.ID, recvID component.ID) *receiverNode { rcvrNode := newReceiverNode(pipelineID.Signal(), recvID) if node := g.componentGraph.Node(rcvrNode.ID()); node != nil { @@ -219,6 +254,16 @@ func (g *Graph) createProcessor(pipelineID pipeline.ID, procID component.ID) *pr return procNode } +func (g *Graph) singletonExporter(exprID component.ID) *exporterNode { + // TODO create ID for the node before creating entire node + expNode := newExporterSingletonNode(exprID) + if node := g.componentGraph.Node(expNode.ID()); node != nil { + return g.componentGraph.Node(expNode.ID()).(*exporterNode) + } + g.componentGraph.AddNode(expNode) + return expNode +} + func (g *Graph) createExporter(pipelineID pipeline.ID, exprID component.ID) *exporterNode { expNode := newExporterNode(pipelineID.Signal(), exprID) if node := g.componentGraph.Node(expNode.ID()); node != nil { @@ -302,7 +347,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { MutatesData: g.pipelines[n.pipelineID].fanOutNode.getConsumer().Capabilities().MutatesData, } for _, proc := range g.pipelines[n.pipelineID].processors { - capability.MutatesData = capability.MutatesData || proc.getConsumer().Capabilities().MutatesData + capability.MutatesData = capability.MutatesData || proc.(*processorNode).getConsumer().Capabilities().MutatesData } next := g.nextConsumers(n.ID())[0] switch n.pipelineID.Signal() { @@ -379,7 +424,7 @@ type pipelineNodes struct { *capabilitiesNode // The order of processors is very important. Therefore use a slice for processors. - processors []*processorNode + processors []graph.Node // Emits to exporters. *fanOutNode @@ -490,7 +535,9 @@ func (g *Graph) GetExporters() map[pipeline.Signal]map[component.ID]component.Co for _, expNode := range pg.exporters { // Skip connectors, otherwise individual components can introduce cycles if expNode, ok := g.componentGraph.Node(expNode.ID()).(*exporterNode); ok { - exportersMap[expNode.pipelineType][expNode.componentID] = expNode.Component + for signal := range expNode.pipelineTypes { + exportersMap[signal][expNode.componentID] = expNode.Component + } } } } @@ -529,7 +576,8 @@ func cycleErr(err error, cycles [][]graph.Node) error { case *processorNode: componentDetails = append(componentDetails, fmt.Sprintf("processor %q in pipeline %q", n.componentID, n.pipelineID.String())) case *connectorNode: - componentDetails = append(componentDetails, fmt.Sprintf("connector %q (%s to %s)", n.componentID, n.exprPipelineType, n.rcvrPipelineType)) + componentDetails = append(componentDetails, fmt.Sprintf("connector %q (%s to %s)", n.componentID, n.exprPipelineType, + n.rcvrPipelineType)) default: continue // skip capabilities/fanout nodes } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index a1025e14e33..a96632ab096 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -840,7 +840,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { } for _, n := range pipeline.processors { - require.True(t, n.Component.(*testcomponents.ExampleProcessor).Started()) + require.True(t, n.(*processorNode).Component.(*testcomponents.ExampleProcessor).Started()) } for _, n := range pipeline.receivers { @@ -929,7 +929,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { } for _, n := range pipeline.processors { - require.True(t, n.Component.(*testcomponents.ExampleProcessor).Stopped()) + require.True(t, n.(*processorNode).Component.(*testcomponents.ExampleProcessor).Stopped()) } for _, n := range pipeline.exporters { @@ -1010,6 +1010,376 @@ func TestConnectorPipelinesGraph(t *testing.T) { } } +func TestInstances(t *testing.T) { + tests := []struct { + name string + pipelineConfigs pipelines.Config + expectInstances map[component.ID]int + }{ + { + name: "one_pipeline_each_signal", + pipelineConfigs: pipelines.Config{ + pipeline.NewID(pipeline.SignalTraces): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewID(pipeline.SignalMetrics): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewID(pipeline.SignalLogs): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewID(xpipeline.SignalProfiles): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + }, + expectInstances: map[component.ID]int{ + component.MustNewID("examplereceiver"): 4, // one per signal + component.MustNewID("exampleprocessor"): 4, // one per pipeline + component.MustNewID("exampleexporter"): 4, // one per signal + }, + }, + { + name: "shared_by_signals", + pipelineConfigs: pipelines.Config{ + pipeline.NewIDWithName(pipeline.SignalTraces, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalTraces, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + }, + expectInstances: map[component.ID]int{ + component.MustNewID("examplereceiver"): 4, // one per signal + component.MustNewID("exampleprocessor"): 8, // one per pipeline + component.MustNewID("exampleexporter"): 4, // one per signal + }, + }, + { + name: "singleton_receiver", + pipelineConfigs: pipelines.Config{ + pipeline.NewIDWithName(pipeline.SignalTraces, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalTraces, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + }, + expectInstances: map[component.ID]int{ + component.MustNewID("singletonreceiver"): 1, // singleton + component.MustNewID("exampleprocessor"): 8, // one per pipeline + component.MustNewID("exampleexporter"): 4, // each data type has an instance + }, + }, + { + name: "singleton_exporter", + pipelineConfigs: pipelines.Config{ + pipeline.NewIDWithName(pipeline.SignalTraces, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalTraces, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + }, + expectInstances: map[component.ID]int{ + component.MustNewID("examplereceiver"): 4, // each data type has an instance + component.MustNewID("exampleprocessor"): 8, // one per pipeline + component.MustNewID("singletonexporter"): 1, // singleton + }, + }, + { + name: "singleton_receiver_and_exporter", + pipelineConfigs: pipelines.Config{ + pipeline.NewIDWithName(pipeline.SignalTraces, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalTraces, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + }, + expectInstances: map[component.ID]int{ + component.MustNewID("singletonreceiver"): 1, // singleton + component.MustNewID("exampleprocessor"): 8, // one per pipeline + component.MustNewID("singletonexporter"): 1, // singleton + }, + }, + { + name: "mixed", + pipelineConfigs: pipelines.Config{ + pipeline.NewIDWithName(pipeline.SignalTraces, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalTraces, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + }, + expectInstances: map[component.ID]int{ + component.MustNewID("singletonreceiver"): 1, // singleton + component.MustNewID("examplereceiver"): 4, // one per signal + component.MustNewID("exampleprocessor"): 8, // one per pipeline + component.MustNewID("singletonexporter"): 1, // singleton + component.MustNewID("exampleexporter"): 4, // one per signal + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + set := Settings{ + Telemetry: componenttest.NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + ReceiverBuilder: builders.NewReceiver( + map[component.ID]component.Config{ + component.MustNewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), + component.MustNewID("singletonreceiver"): testcomponents.SingletonReceiverFactory.CreateDefaultConfig(), + }, + map[component.Type]receiver.Factory{ + testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory, + testcomponents.SingletonReceiverFactory.Type(): testcomponents.SingletonReceiverFactory, + }, + ), + ProcessorBuilder: builders.NewProcessor( + map[component.ID]component.Config{ + component.MustNewID("exampleprocessor"): testcomponents.ExampleProcessorFactory.CreateDefaultConfig(), + }, + map[component.Type]processor.Factory{ + testcomponents.ExampleProcessorFactory.Type(): testcomponents.ExampleProcessorFactory, + }, + ), + ExporterBuilder: builders.NewExporter( + map[component.ID]component.Config{ + component.MustNewID("exampleexporter"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(), + component.MustNewID("singletonexporter"): testcomponents.SingletonExporterFactory.CreateDefaultConfig(), + }, + map[component.Type]exporter.Factory{ + testcomponents.ExampleExporterFactory.Type(): testcomponents.ExampleExporterFactory, + testcomponents.SingletonExporterFactory.Type(): testcomponents.SingletonExporterFactory, + }, + ), + ConnectorBuilder: builders.NewConnector(map[component.ID]component.Config{}, map[component.Type]connector.Factory{}), + PipelineConfigs: tt.pipelineConfigs, + } + + pg, err := Build(context.Background(), set) + require.NoError(t, err) + + require.Equal(t, len(set.PipelineConfigs), len(pg.pipelines)) + + // For each component id, build a map of the instances of that component. + // Use graph.Node.ID() as the key to determine uniqueness of instances. + componentInstances := map[component.ID]map[int64]struct{}{} + for _, pipeline := range pg.pipelines { + for _, n := range pipeline.receivers { + r := n.(*receiverNode) + if _, ok := componentInstances[r.componentID]; !ok { + componentInstances[r.componentID] = map[int64]struct{}{} + } + componentInstances[r.componentID][n.ID()] = struct{}{} + } + for _, n := range pipeline.processors { + p := n.(*processorNode) + if _, ok := componentInstances[p.componentID]; !ok { + componentInstances[p.componentID] = map[int64]struct{}{} + } + componentInstances[p.componentID][n.ID()] = struct{}{} + } + for _, n := range pipeline.exporters { + e := n.(*exporterNode) + if _, ok := componentInstances[e.componentID]; !ok { + componentInstances[e.componentID] = map[int64]struct{}{} + } + componentInstances[e.componentID][n.ID()] = struct{}{} + } + } + + var totalExpected int + for id, instances := range componentInstances { + totalExpected += tt.expectInstances[id] + require.Len(t, instances, tt.expectInstances[id], id.String()) + } + totalExpected += len(tt.pipelineConfigs) * 2 // one fanout & one capabilities node per pipeline + require.Equal(t, totalExpected, pg.componentGraph.Nodes().Len()) + }) + } +} + func TestConnectorRouter(t *testing.T) { rcvrID := component.MustNewID("examplereceiver") routeTracesID := component.MustNewIDWithName("examplerouter", "traces") @@ -2157,11 +2527,11 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/conn" (logs to logs) -> ` + - `processor "nop" in pipeline "logs/1" -> ` + `connector "nop/conn1" (logs to logs) -> ` + `processor "nop" in pipeline "logs/2" -> ` + - `connector "nop/conn" (logs to logs)`, + `connector "nop/conn" (logs to logs) -> ` + + `processor "nop" in pipeline "logs/1" -> ` + + `connector "nop/conn1" (logs to logs)`, }, { name: "not_allowed_deep_cycle_profiles.yaml", @@ -2263,13 +2633,13 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/forkagain" (traces to traces) -> ` + - `processor "nop" in pipeline "traces/copy2b" -> ` + `connector "nop/rawlog" (traces to logs) -> ` + `processor "nop" in pipeline "logs/raw" -> ` + `connector "nop/fork" (logs to traces) -> ` + `processor "nop" in pipeline "traces/copy2" -> ` + - `connector "nop/forkagain" (traces to traces)`, + `connector "nop/forkagain" (traces to traces) -> ` + + `processor "nop" in pipeline "traces/copy2b" -> ` + + `connector "nop/rawlog" (traces to logs)`, }, { name: "unknown_exporter_config", @@ -2301,7 +2671,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.MustNewID("unknown")}, }, }, - expected: "failed to create \"unknown\" exporter for data type \"traces\": exporter factory not available for: \"unknown\"", + expected: "exporter factory not available for: \"unknown\"", }, { name: "unknown_processor_config", @@ -2341,7 +2711,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.MustNewID("nop")}, }, }, - expected: "failed to create \"unknown\" processor, in pipeline \"metrics\": processor factory not available for: \"unknown\"", + expected: "processor factory not available for: \"unknown\"", }, { name: "unknown_receiver_config", @@ -2373,7 +2743,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.MustNewID("nop")}, }, }, - expected: "failed to create \"unknown\" receiver for data type \"logs\": receiver factory not available for: \"unknown\"", + expected: "receiver factory not available for: \"unknown\"", }, { name: "unknown_connector_factory", diff --git a/service/internal/graph/processor.go b/service/internal/graph/processor.go index cfe36cb461c..77af19e8d6c 100644 --- a/service/internal/graph/processor.go +++ b/service/internal/graph/processor.go @@ -13,9 +13,8 @@ import ( "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) var _ consumerNode = (*processorNode)(nil) @@ -47,7 +46,7 @@ func (n *processorNode) buildComponent(ctx context.Context, builder *builders.ProcessorBuilder, next baseConsumer, ) error { - tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID) + tel.Logger = n.Attributes.Logger(tel.Logger) set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error switch n.pipelineID.Signal() { diff --git a/service/internal/graph/receiver.go b/service/internal/graph/receiver.go index 57dfcded117..19a40e52b07 100644 --- a/service/internal/graph/receiver.go +++ b/service/internal/graph/receiver.go @@ -14,67 +14,80 @@ import ( "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) // A receiver instance can be shared by multiple pipelines of the same type. // Therefore, nodeID is derived from "pipeline type" and "component ID". type receiverNode struct { *attribute.Attributes - componentID component.ID - pipelineType pipeline.Signal + componentID component.ID + pipelineTypes map[pipeline.Signal]struct{} component.Component } func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode { return &receiverNode{ - Attributes: attribute.Receiver(pipelineType, recvID), - componentID: recvID, - pipelineType: pipelineType, + Attributes: attribute.Receiver(pipelineType, recvID), + componentID: recvID, + pipelineTypes: map[pipeline.Signal]struct{}{pipelineType: {}}, } } +func newReceiverSingletonNode(recvID component.ID) *receiverNode { + return &receiverNode{ + Attributes: attribute.ReceiverSingleton(recvID), + componentID: recvID, + pipelineTypes: map[pipeline.Signal]struct{}{}, + } +} + +func (n *receiverNode) withSignalType(pipelineType pipeline.Signal) { + n.pipelineTypes[pipelineType] = struct{}{} +} + func (n *receiverNode) buildComponent(ctx context.Context, tel component.TelemetrySettings, info component.BuildInfo, builder *builders.ReceiverBuilder, nexts []baseConsumer, ) error { - tel.Logger = components.ReceiverLogger(tel.Logger, n.componentID, n.pipelineType) + tel.Logger = n.Attributes.Logger(tel.Logger) set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error - switch n.pipelineType { - case pipeline.SignalTraces: - var consumers []consumer.Traces - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Traces)) - } - n.Component, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers)) - case pipeline.SignalMetrics: - var consumers []consumer.Metrics - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Metrics)) + for signal := range n.pipelineTypes { + switch signal { + case pipeline.SignalTraces: + var consumers []consumer.Traces + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Traces)) + } + n.Component, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers)) + case pipeline.SignalMetrics: + var consumers []consumer.Metrics + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Metrics)) + } + n.Component, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers)) + case pipeline.SignalLogs: + var consumers []consumer.Logs + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Logs)) + } + n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers)) + case xpipeline.SignalProfiles: + var consumers []xconsumer.Profiles + for _, next := range nexts { + consumers = append(consumers, next.(xconsumer.Profiles)) + } + n.Component, err = builder.CreateProfiles(ctx, set, fanoutconsumer.NewProfiles(consumers)) + default: + return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, signal) } - n.Component, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers)) - case pipeline.SignalLogs: - var consumers []consumer.Logs - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Logs)) + if err != nil { + return fmt.Errorf("failed to create %q receiver for data type %q: %w", set.ID, signal, err) } - n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers)) - case xpipeline.SignalProfiles: - var consumers []xconsumer.Profiles - for _, next := range nexts { - consumers = append(consumers, next.(xconsumer.Profiles)) - } - n.Component, err = builder.CreateProfiles(ctx, set, fanoutconsumer.NewProfiles(consumers)) - default: - return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, n.pipelineType) - } - if err != nil { - return fmt.Errorf("failed to create %q receiver for data type %q: %w", set.ID, n.pipelineType, err) } return nil } diff --git a/service/internal/graph/util_test.go b/service/internal/graph/util_test.go index 5e27bf6f340..84d8abc70e7 100644 --- a/service/internal/graph/util_test.go +++ b/service/internal/graph/util_test.go @@ -91,7 +91,9 @@ func (g *Graph) getReceivers() map[pipeline.Signal]map[component.ID]component.Co if !ok { continue } - receiversMap[rcvrNode.pipelineType][rcvrNode.componentID] = rcvrNode.Component + for signal := range rcvrNode.pipelineTypes { + receiversMap[signal][rcvrNode.componentID] = rcvrNode.Component + } } } return receiversMap diff --git a/service/internal/graph/zpages.go b/service/internal/graph/zpages.go index 4b7f2c5ae6d..e4b9b5869a5 100644 --- a/service/internal/graph/zpages.go +++ b/service/internal/graph/zpages.go @@ -41,7 +41,7 @@ func (g *Graph) HandleZPages(w http.ResponseWriter, r *http.Request) { } procIDs := make([]string, 0, len(p.processors)) for _, c := range p.processors { - procIDs = append(procIDs, c.componentID.String()) + procIDs = append(procIDs, c.(*processorNode).componentID.String()) } exprIDs := make([]string, 0, len(p.exporters)) for _, c := range p.exporters { diff --git a/service/internal/testcomponents/example_exporter.go b/service/internal/testcomponents/example_exporter.go index 10a269b07e4..95191e0cfd5 100644 --- a/service/internal/testcomponents/example_exporter.go +++ b/service/internal/testcomponents/example_exporter.go @@ -16,18 +16,16 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -var testType = component.MustNewType("exampleexporter") - -const stability = component.StabilityLevelDevelopment +var exporterType = component.MustNewType("exampleexporter") // ExampleExporterFactory is factory for ExampleExporter. var ExampleExporterFactory = xexporter.NewFactory( - testType, + exporterType, createExporterDefaultConfig, - xexporter.WithTraces(createTracesExporter, stability), - xexporter.WithMetrics(createMetricsExporter, stability), - xexporter.WithLogs(createLogsExporter, stability), - xexporter.WithProfiles(createProfilesExporter, stability), + xexporter.WithTraces(createTracesExporter, component.StabilityLevelDevelopment), + xexporter.WithMetrics(createMetricsExporter, component.StabilityLevelDevelopment), + xexporter.WithLogs(createLogsExporter, component.StabilityLevelDevelopment), + xexporter.WithProfiles(createProfilesExporter, component.StabilityLevelDevelopment), ) func createExporterDefaultConfig() component.Config { diff --git a/service/internal/testcomponents/example_receiver.go b/service/internal/testcomponents/example_receiver.go index 87d65f0397d..7f7f0bad4f3 100644 --- a/service/internal/testcomponents/example_receiver.go +++ b/service/internal/testcomponents/example_receiver.go @@ -19,77 +19,53 @@ var receiverType = component.MustNewType("examplereceiver") var ExampleReceiverFactory = xreceiver.NewFactory( receiverType, createReceiverDefaultConfig, - xreceiver.WithTraces(createTraces, component.StabilityLevelDevelopment), - xreceiver.WithMetrics(createMetrics, component.StabilityLevelDevelopment), - xreceiver.WithLogs(createLogs, component.StabilityLevelDevelopment), - xreceiver.WithProfiles(createProfiles, component.StabilityLevelDevelopment), + xreceiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment), + xreceiver.WithMetrics(createMetricsReceiver, component.StabilityLevelDevelopment), + xreceiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment), + xreceiver.WithProfiles(createProfilesReceiver, component.StabilityLevelDevelopment), ) func createReceiverDefaultConfig() component.Config { return &struct{}{} } -// createTraces creates a receiver.Traces based on this config. -func createTraces( +func createTracesReceiver( _ context.Context, _ receiver.Settings, - cfg component.Config, + _ component.Config, nextConsumer consumer.Traces, ) (receiver.Traces, error) { - tr := createReceiver(cfg) - tr.ConsumeTracesFunc = nextConsumer.ConsumeTraces - return tr, nil + return &ExampleReceiver{ConsumeTracesFunc: nextConsumer.ConsumeTraces}, nil } // createMetrics creates a receiver.Metrics based on this config. -func createMetrics( +func createMetricsReceiver( _ context.Context, _ receiver.Settings, - cfg component.Config, + _ component.Config, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { - mr := createReceiver(cfg) - mr.ConsumeMetricsFunc = nextConsumer.ConsumeMetrics - return mr, nil + return &ExampleReceiver{ConsumeMetricsFunc: nextConsumer.ConsumeMetrics}, nil } // createLogs creates a receiver.Logs based on this config. -func createLogs( +func createLogsReceiver( _ context.Context, _ receiver.Settings, - cfg component.Config, + _ component.Config, nextConsumer consumer.Logs, ) (receiver.Logs, error) { - lr := createReceiver(cfg) - lr.ConsumeLogsFunc = nextConsumer.ConsumeLogs - return lr, nil + return &ExampleReceiver{ConsumeLogsFunc: nextConsumer.ConsumeLogs}, nil } // createProfiles creates a receiver.Profiles based on this config. -func createProfiles( +func createProfilesReceiver( _ context.Context, _ receiver.Settings, - cfg component.Config, + _ component.Config, nextConsumer xconsumer.Profiles, ) (xreceiver.Profiles, error) { - tr := createReceiver(cfg) - tr.ConsumeProfilesFunc = nextConsumer.ConsumeProfiles - return tr, nil -} - -func createReceiver(cfg component.Config) *ExampleReceiver { - // There must be one receiver for all data types. We maintain a map of - // receivers per config. - - // Check to see if there is already a receiver for this config. - er, ok := exampleReceivers[cfg] - if !ok { - er = &ExampleReceiver{} - // Remember the receiver in the map - exampleReceivers[cfg] = er - } - - return er + return &ExampleReceiver{ConsumeProfilesFunc: nextConsumer.ConsumeProfiles}, nil } // ExampleReceiver allows producing traces, metrics, logs and profiles for testing purposes. @@ -100,9 +76,3 @@ type ExampleReceiver struct { consumer.ConsumeLogsFunc xconsumer.ConsumeProfilesFunc } - -// This is the map of already created example receivers for particular configurations. -// We maintain this map because the receiver.Factory is asked trace and metric receivers separately -// when it gets CreateTraces() and CreateMetrics() but they must not -// create separate objects, they must use one Receiver object per configuration. -var exampleReceivers = map[component.Config]*ExampleReceiver{} diff --git a/service/internal/testcomponents/singleton_exporter.go b/service/internal/testcomponents/singleton_exporter.go new file mode 100644 index 00000000000..0c98a58c0fc --- /dev/null +++ b/service/internal/testcomponents/singleton_exporter.go @@ -0,0 +1,124 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testcomponents // import "go.opentelemetry.io/collector/service/internal/testcomponents" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/xexporter" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +var singletonExporterType = component.MustNewType("singletonexporter") + +// SingletonExporterFactory is factory for SingletonExporter. +var SingletonExporterFactory = xexporter.NewFactory( + singletonExporterType, + createSingletonExporterDefaultConfig, + xexporter.WithTraces(createSingletonTracesExporter, component.StabilityLevelDevelopment), + xexporter.WithMetrics(createSingletonMetricsExporter, component.StabilityLevelDevelopment), + xexporter.WithLogs(createSingletonLogsExporter, component.StabilityLevelDevelopment), + xexporter.WithProfiles(createSingletonProfilesExporter, component.StabilityLevelDevelopment), + xexporter.AsSingletonInstance(), +) + +func createSingletonExporterDefaultConfig() component.Config { + return &struct{}{} +} + +func createSingletonTracesExporter( + _ context.Context, + _ exporter.Settings, + cfg component.Config, +) (exporter.Traces, error) { + return createSingletonExporter(cfg), nil +} + +func createSingletonMetricsExporter( + _ context.Context, + _ exporter.Settings, + cfg component.Config, +) (exporter.Metrics, error) { + return createSingletonExporter(cfg), nil +} + +func createSingletonLogsExporter( + _ context.Context, + _ exporter.Settings, + cfg component.Config, +) (exporter.Logs, error) { + return createSingletonExporter(cfg), nil +} + +func createSingletonProfilesExporter( + _ context.Context, + _ exporter.Settings, + cfg component.Config, +) (xexporter.Profiles, error) { + return createSingletonExporter(cfg), nil +} + +func createSingletonExporter(cfg component.Config) *SingletonExporter { + // There must be one receiver for all data types. We maintain a map of + // receivers per config. + + // Check to see if there is already a receiver for this config. + sr, ok := singletonExporters[cfg] + if !ok { + sr = &SingletonExporter{} + // Remember the receiver in the map + singletonExporters[cfg] = sr + } + + return sr +} + +// SingletonExporter stores consumed traces, metrics, logs and profiles for testing purposes. +type SingletonExporter struct { + componentState + Traces []ptrace.Traces + Metrics []pmetric.Metrics + Logs []plog.Logs + Profiles []pprofile.Profiles +} + +// ConsumeTraces receives ptrace.Traces for processing by the consumer.Traces. +func (exp *SingletonExporter) ConsumeTraces(_ context.Context, td ptrace.Traces) error { + exp.Traces = append(exp.Traces, td) + return nil +} + +// ConsumeMetrics receives pmetric.Metrics for processing by the Metrics. +func (exp *SingletonExporter) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error { + exp.Metrics = append(exp.Metrics, md) + return nil +} + +// ConsumeLogs receives plog.Logs for processing by the Logs. +func (exp *SingletonExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error { + exp.Logs = append(exp.Logs, ld) + return nil +} + +// ConsumeProfiles receives pprofile.Profiles for processing by the xconsumer.Profiles. +func (exp *SingletonExporter) ConsumeProfiles(_ context.Context, td pprofile.Profiles) error { + exp.Profiles = append(exp.Profiles, td) + return nil +} + +func (exp *SingletonExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// This is the map of already created singleton exporters for particular configurations. +// We maintain this map because the receiver.Factory is asked trace and metric exporters separately +// when it gets CreateTraces() and CreateMetrics() but they must not +// create separate objects, they must use one Receiver object per configuration. +var singletonExporters = map[component.Config]*SingletonExporter{} diff --git a/service/internal/testcomponents/singleton_exporter_test.go b/service/internal/testcomponents/singleton_exporter_test.go new file mode 100644 index 00000000000..e192d6d0654 --- /dev/null +++ b/service/internal/testcomponents/singleton_exporter_test.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testcomponents + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestSingletonExporter(t *testing.T) { + exp := &SingletonExporter{} + host := componenttest.NewNopHost() + assert.False(t, exp.Started()) + require.NoError(t, exp.Start(context.Background(), host)) + assert.True(t, exp.Started()) + + assert.Empty(t, exp.Traces) + require.NoError(t, exp.ConsumeTraces(context.Background(), ptrace.Traces{})) + assert.Len(t, exp.Traces, 1) + + assert.Empty(t, exp.Metrics) + require.NoError(t, exp.ConsumeMetrics(context.Background(), pmetric.Metrics{})) + assert.Len(t, exp.Metrics, 1) + + assert.Empty(t, exp.Logs) + require.NoError(t, exp.ConsumeLogs(context.Background(), plog.Logs{})) + assert.Len(t, exp.Logs, 1) + + assert.Empty(t, exp.Profiles) + require.NoError(t, exp.ConsumeProfiles(context.Background(), pprofile.Profiles{})) + assert.Len(t, exp.Profiles, 1) + + assert.False(t, exp.Stopped()) + require.NoError(t, exp.Shutdown(context.Background())) + assert.True(t, exp.Stopped()) +} diff --git a/service/internal/testcomponents/singleton_receiver.go b/service/internal/testcomponents/singleton_receiver.go new file mode 100644 index 00000000000..f8f1afcb561 --- /dev/null +++ b/service/internal/testcomponents/singleton_receiver.go @@ -0,0 +1,105 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testcomponents // import "go.opentelemetry.io/collector/service/internal/testcomponents" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/xreceiver" +) + +var singletonReceiverType = component.MustNewType("singletonreceiver") + +// SingletonReceiverFactory is factory for SingletonReceiver. +var SingletonReceiverFactory = xreceiver.NewFactory( + singletonReceiverType, + createSingletonReceiverDefaultConfig, + xreceiver.WithTraces(createSingletonTracesReceiver, component.StabilityLevelDevelopment), + xreceiver.WithMetrics(createSingletonMetricsReceiver, component.StabilityLevelDevelopment), + xreceiver.WithLogs(createSingletonLogsReceiver, component.StabilityLevelDevelopment), + xreceiver.WithProfiles(createSingletonProfilesReceiver, component.StabilityLevelDevelopment), + xreceiver.AsSingletonInstance(), +) + +func createSingletonReceiverDefaultConfig() component.Config { + return &struct{}{} +} + +func createSingletonTracesReceiver( + _ context.Context, + _ receiver.Settings, + cfg component.Config, + nextConsumer consumer.Traces, +) (receiver.Traces, error) { + r := createSingletonReceiver(cfg) + r.ConsumeTracesFunc = nextConsumer.ConsumeTraces + return r, nil +} + +func createSingletonMetricsReceiver( + _ context.Context, + _ receiver.Settings, + cfg component.Config, + nextConsumer consumer.Metrics, +) (receiver.Metrics, error) { + r := createSingletonReceiver(cfg) + r.ConsumeMetricsFunc = nextConsumer.ConsumeMetrics + return r, nil +} + +func createSingletonLogsReceiver( + _ context.Context, + _ receiver.Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (receiver.Logs, error) { + r := createSingletonReceiver(cfg) + r.ConsumeLogsFunc = nextConsumer.ConsumeLogs + return r, nil +} + +func createSingletonProfilesReceiver( + _ context.Context, + _ receiver.Settings, + cfg component.Config, + nextConsumer xconsumer.Profiles, +) (xreceiver.Profiles, error) { + r := createSingletonReceiver(cfg) + r.ConsumeProfilesFunc = nextConsumer.ConsumeProfiles + return r, nil +} + +func createSingletonReceiver(cfg component.Config) *SingletonReceiver { + // There must be one receiver for all data types. We maintain a map of + // receivers per config. + + // Check to see if there is already a receiver for this config. + sr, ok := singletonReceivers[cfg] + if !ok { + sr = &SingletonReceiver{} + // Remember the receiver in the map + singletonReceivers[cfg] = sr + } + + return sr +} + +// SingletonReceiver allows producing traces, metrics, logs and profiles for testing purposes. +type SingletonReceiver struct { + componentState + consumer.ConsumeTracesFunc + consumer.ConsumeMetricsFunc + consumer.ConsumeLogsFunc + xconsumer.ConsumeProfilesFunc +} + +// This is the map of already created singleton receivers for particular configurations. +// We maintain this map because the receiver.Factory is asked trace and metric receivers separately +// when it gets CreateTraces() and CreateMetrics() but they must not +// create separate objects, they must use one Receiver object per configuration. +var singletonReceivers = map[component.Config]*SingletonReceiver{} diff --git a/service/internal/testcomponents/singleton_receiver_test.go b/service/internal/testcomponents/singleton_receiver_test.go new file mode 100644 index 00000000000..38f3366a8e0 --- /dev/null +++ b/service/internal/testcomponents/singleton_receiver_test.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testcomponents + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestSingletonReceiver(t *testing.T) { + rcv := &SingletonReceiver{} + host := componenttest.NewNopHost() + assert.False(t, rcv.Started()) + require.NoError(t, rcv.Start(context.Background(), host)) + assert.True(t, rcv.Started()) + + assert.False(t, rcv.Stopped()) + require.NoError(t, rcv.Shutdown(context.Background())) + assert.True(t, rcv.Stopped()) +}