From d2c178b75d0ba02c48f6e8c7d3ecbdd5d8c8e3b5 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 7 Jan 2025 09:33:23 -0800 Subject: [PATCH] [chore] Move exporterhelper queue code to exporterqueue (#12033) Signed-off-by: Bogdan Drutu --- exporter/exporterhelper/exporterhelper.go | 1 + .../internal/queue_sender_test.go | 18 ++- exporter/exporterhelper/logs.go | 3 +- exporter/exporterhelper/logs_test.go | 4 +- exporter/exporterhelper/metrics.go | 3 +- exporter/exporterhelper/metrics_test.go | 4 +- exporter/exporterhelper/traces.go | 3 +- exporter/exporterhelper/traces_test.go | 4 +- .../xexporterhelper/profiles_test.go | 4 +- .../bounded_memory_queue.go | 20 ++-- .../bounded_memory_queue_test.go | 75 +++++++++--- .../persistent_queue.go | 42 +++---- .../persistent_queue_test.go | 108 +++++++++--------- exporter/exporterqueue/queue.go | 49 +++++--- .../queue => exporterqueue}/sized_channel.go | 2 +- .../sized_channel_test.go | 2 +- exporter/exporterqueue/sizer.go | 16 +++ exporter/internal/queue/batcher.go | 7 +- exporter/internal/queue/consumers.go | 6 +- .../internal/queue/default_batcher_test.go | 63 ++++++---- .../internal/queue/disabled_batcher_test.go | 15 ++- exporter/internal/queue/queue.go | 49 -------- exporter/internal/queue/queue_test.go | 18 --- .../{queue => storagetest}/mock_storage.go | 20 ++-- 24 files changed, 282 insertions(+), 254 deletions(-) rename exporter/{internal/queue => exporterqueue}/bounded_memory_queue.go (78%) rename exporter/{internal/queue => exporterqueue}/bounded_memory_queue_test.go (68%) rename exporter/{internal/queue => exporterqueue}/persistent_queue.go (94%) rename exporter/{internal/queue => exporterqueue}/persistent_queue_test.go (92%) rename exporter/{internal/queue => exporterqueue}/sized_channel.go (97%) rename exporter/{internal/queue => exporterqueue}/sized_channel_test.go (98%) create mode 100644 exporter/exporterqueue/sizer.go delete mode 100644 exporter/internal/queue/queue.go delete mode 100644 
exporter/internal/queue/queue_test.go rename exporter/internal/{queue => storagetest}/mock_storage.go (76%) diff --git a/exporter/exporterhelper/exporterhelper.go b/exporter/exporterhelper/exporterhelper.go index d9e90d821d9..cb2c10931fa 100644 --- a/exporter/exporterhelper/exporterhelper.go +++ b/exporter/exporterhelper/exporterhelper.go @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + import "go.opentelemetry.io/collector/exporter/internal" // Request represents a single request that can be sent to an external endpoint. diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 138dc4ebabe..8acf7b37131 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -23,7 +23,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/internal" - "go.opentelemetry.io/collector/exporter/internal/queue" + "go.opentelemetry.io/collector/exporter/internal/storagetest" "go.opentelemetry.io/collector/pipeline" ) @@ -435,7 +435,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { require.NoError(t, err) extensions := map[component.ID]component.Component{ - storageID: queue.NewMockStorageExtension(nil), + storageID: storagetest.NewMockStorageExtension(nil), } host := &MockHost{Ext: extensions} @@ -468,7 +468,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { require.NoError(t, err) extensions := map[component.ID]component.Component{ - storageID: queue.NewMockStorageExtension(storageError), + storageID: storagetest.NewMockStorageExtension(storageError), } host := &MockHost{Ext: extensions} @@ -500,7 +500,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { require.NoError(t, err) 
extensions := map[component.ID]component.Component{ - storageID: queue.NewMockStorageExtension(nil), + storageID: storagetest.NewMockStorageExtension(nil), } host := &MockHost{Ext: extensions} @@ -540,11 +540,17 @@ func TestQueueSenderNoStartShutdown(t *testing.T) { runTest := func(testName string, enableQueueBatcher bool) { t.Run(testName, func(t *testing.T) { defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - queue := queue.NewBoundedMemoryQueue[internal.Request](queue.MemoryQueueSettings[internal.Request]{}) set := exportertest.NewNopSettings() + queue := exporterqueue.NewMemoryQueueFactory[internal.Request]()( + context.Background(), + exporterqueue.Settings{ + Signal: pipeline.SignalTraces, + ExporterSettings: set, + }, + exporterqueue.NewDefaultConfig()) obsrep, err := NewExporter(ObsReportSettings{ ExporterID: exporterID, - ExporterCreateSettings: exportertest.NewNopSettings(), + ExporterCreateSettings: set, }) require.NoError(t, err) qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig()) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index b1acd7fda45..74a658b98fe 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exporterqueue" - "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pipeline" ) @@ -135,7 +134,7 @@ func NewLogsRequest( return consumererror.NewPermanent(cErr) } sErr := be.Send(ctx, req) - if errors.Is(sErr, queue.ErrQueueIsFull) { + if errors.Is(sErr, exporterqueue.ErrQueueIsFull) { be.Obsrep.RecordEnqueueFailure(ctx, pipeline.SignalLogs, int64(req.ItemsCount())) } return sErr diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go 
index 71a9035393f..6f29010351a 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -27,7 +27,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/exporter/internal/queue" + "go.opentelemetry.io/collector/exporter/internal/storagetest" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/testdata" ) @@ -169,7 +169,7 @@ func TestLogs_WithPersistentQueue(t *testing.T) { require.NoError(t, err) host := &internal.MockHost{Ext: map[component.ID]component.Component{ - storageID: queue.NewMockStorageExtension(nil), + storageID: storagetest.NewMockStorageExtension(nil), }} require.NoError(t, te.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 6488250d247..e3d4ccbd2ae 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exporterqueue" - "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pipeline" ) @@ -135,7 +134,7 @@ func NewMetricsRequest( return consumererror.NewPermanent(cErr) } sErr := be.Send(ctx, req) - if errors.Is(sErr, queue.ErrQueueIsFull) { + if errors.Is(sErr, exporterqueue.ErrQueueIsFull) { be.Obsrep.RecordEnqueueFailure(ctx, pipeline.SignalMetrics, int64(req.ItemsCount())) } return sErr diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 34c108e4dd4..d1ba38e4f80 100644 --- a/exporter/exporterhelper/metrics_test.go +++ 
b/exporter/exporterhelper/metrics_test.go @@ -27,7 +27,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/exporter/internal/queue" + "go.opentelemetry.io/collector/exporter/internal/storagetest" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/testdata" ) @@ -170,7 +170,7 @@ func TestMetrics_WithPersistentQueue(t *testing.T) { require.NoError(t, err) host := &internal.MockHost{Ext: map[component.ID]component.Component{ - storageID: queue.NewMockStorageExtension(nil), + storageID: storagetest.NewMockStorageExtension(nil), }} require.NoError(t, te.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 24a13676d5d..f8387d5a3b8 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exporterqueue" - "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" ) @@ -135,7 +134,7 @@ func NewTracesRequest( return consumererror.NewPermanent(cErr) } sErr := be.Send(ctx, req) - if errors.Is(sErr, queue.ErrQueueIsFull) { + if errors.Is(sErr, exporterqueue.ErrQueueIsFull) { be.Obsrep.RecordEnqueueFailure(ctx, pipeline.SignalTraces, int64(req.ItemsCount())) } return sErr diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 563b49db5a9..9a6e63ae80e 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -27,7 +27,7 @@ import ( "go.opentelemetry.io/collector/exporter" 
"go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/exporter/internal/queue" + "go.opentelemetry.io/collector/exporter/internal/storagetest" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/testdata" ) @@ -167,7 +167,7 @@ func TestTraces_WithPersistentQueue(t *testing.T) { require.NoError(t, err) host := &internal.MockHost{Ext: map[component.ID]component.Component{ - storageID: queue.NewMockStorageExtension(nil), + storageID: storagetest.NewMockStorageExtension(nil), }} require.NoError(t, te.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/xexporterhelper/profiles_test.go b/exporter/exporterhelper/xexporterhelper/profiles_test.go index ee02744165d..195c4326b3d 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_test.go @@ -30,7 +30,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/exporter/internal/queue" + "go.opentelemetry.io/collector/exporter/internal/storagetest" "go.opentelemetry.io/collector/exporter/xexporter" "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/testdata" @@ -170,7 +170,7 @@ func TestProfilesExporter_WithPersistentQueue(t *testing.T) { require.NoError(t, err) host := &internal.MockHost{Ext: map[component.ID]component.Component{ - storageID: queue.NewMockStorageExtension(nil), + storageID: storagetest.NewMockStorageExtension(nil), }} require.NoError(t, te.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) diff --git a/exporter/internal/queue/bounded_memory_queue.go 
b/exporter/exporterqueue/bounded_memory_queue.go similarity index 78% rename from exporter/internal/queue/bounded_memory_queue.go rename to exporter/exporterqueue/bounded_memory_queue.go index 015c94473df..ecf5ec1649f 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/exporterqueue/bounded_memory_queue.go @@ -3,7 +3,7 @@ // Copyright (c) 2017 Uber Technologies, Inc. // SPDX-License-Identifier: Apache-2.0 -package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" import ( "context" @@ -17,21 +17,21 @@ import ( type boundedMemoryQueue[T any] struct { component.StartFunc *sizedChannel[memQueueEl[T]] - sizer Sizer[T] + sizer sizer[T] } -// MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation. -type MemoryQueueSettings[T any] struct { - Sizer Sizer[T] - Capacity int64 +// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation. +type memoryQueueSettings[T any] struct { + sizer sizer[T] + capacity int64 } -// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional +// newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional // callback for dropped items (e.g. useful to emit metrics). 
-func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] { +func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] { return &boundedMemoryQueue[T]{ - sizedChannel: newSizedChannel[memQueueEl[T]](set.Capacity, nil, 0), - sizer: set.Sizer, + sizedChannel: newSizedChannel[memQueueEl[T]](set.capacity, nil, 0), + sizer: set.sizer, } } diff --git a/exporter/internal/queue/bounded_memory_queue_test.go b/exporter/exporterqueue/bounded_memory_queue_test.go similarity index 68% rename from exporter/internal/queue/bounded_memory_queue_test.go rename to exporter/exporterqueue/bounded_memory_queue_test.go index 20816239c9c..b56d5a57766 100644 --- a/exporter/internal/queue/bounded_memory_queue_test.go +++ b/exporter/exporterqueue/bounded_memory_queue_test.go @@ -3,12 +3,13 @@ // Copyright (c) 2017 Uber Technologies, Inc. // SPDX-License-Identifier: Apache-2.0 -package queue +package exporterqueue import ( "context" "strconv" "sync" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -21,7 +22,7 @@ import ( // We want to test the overflow behavior, so we block the consumer // by holding a startLock before submitting items to the queue. func TestBoundedQueue(t *testing.T) { - q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1}) + q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1}) require.NoError(t, q.Offer(context.Background(), "a")) @@ -71,7 +72,7 @@ func TestBoundedQueue(t *testing.T) { // only after Stop will mean the consumers are still locked while // trying to perform the final consumptions. 
func TestShutdownWhileNotEmpty(t *testing.T) { - q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1000}) + q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1000}) assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) for i := 0; i < 10; i++ { @@ -97,11 +98,11 @@ func TestShutdownWhileNotEmpty(t *testing.T) { } func Benchmark_QueueUsage_1000_requests(b *testing.B) { - benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 1000) + benchmarkQueueUsage(b, &requestSizer[fakeReq]{}, 1000) } func Benchmark_QueueUsage_100000_requests(b *testing.B) { - benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 100000) + benchmarkQueueUsage(b, &requestSizer[fakeReq]{}, 100000) } func Benchmark_QueueUsage_10000_items(b *testing.B) { @@ -116,40 +117,38 @@ func Benchmark_QueueUsage_1M_items(b *testing.B) { func TestQueueUsage(t *testing.T) { t.Run("requests_based", func(t *testing.T) { - queueUsage(t, &RequestSizer[fakeReq]{}, 10) + queueUsage(t, &requestSizer[fakeReq]{}, 10) }) t.Run("items_based", func(t *testing.T) { queueUsage(t, &itemsSizer[fakeReq]{}, 10) }) } -func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], requestsCount int) { +func benchmarkQueueUsage(b *testing.B, sizer sizer[fakeReq], requestsCount int) { b.ReportAllocs() for i := 0; i < b.N; i++ { queueUsage(b, sizer, requestsCount) } } -func queueUsage(tb testing.TB, sizer Sizer[fakeReq], requestsCount int) { - var wg sync.WaitGroup - wg.Add(requestsCount) - q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: int64(10 * requestsCount)}) - consumers := NewQueueConsumers(q, 1, func(context.Context, fakeReq) error { - wg.Done() +func queueUsage(tb testing.TB, sizer sizer[fakeReq], requestsCount int) { + q := newBoundedMemoryQueue[fakeReq](memoryQueueSettings[fakeReq]{sizer: sizer, capacity: int64(10 * requestsCount)}) + consumed := &atomic.Int64{} + 
require.NoError(tb, q.Start(context.Background(), componenttest.NewNopHost())) + ac := newAsyncConsumer(q, 1, func(context.Context, fakeReq) error { + consumed.Add(1) return nil }) - require.NoError(tb, q.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(tb, consumers.Start(context.Background(), componenttest.NewNopHost())) for j := 0; j < requestsCount; j++ { require.NoError(tb, q.Offer(context.Background(), fakeReq{10})) } assert.NoError(tb, q.Shutdown(context.Background())) - assert.NoError(tb, consumers.Shutdown(context.Background())) - wg.Wait() + assert.NoError(tb, ac.Shutdown(context.Background())) + assert.Equal(tb, int64(requestsCount), consumed.Load()) } func TestZeroSizeNoConsumers(t *testing.T) { - q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 0}) + q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0}) err := q.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -166,3 +165,43 @@ type fakeReq struct { func (r fakeReq) ItemsCount() int { return r.itemsCount } + +func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool { + index, ctx, req, ok := q.Read(context.Background()) + if !ok { + return false + } + consumeErr := consumeFunc(ctx, req) + q.OnProcessingFinished(index, consumeErr) + return true +} + +type asyncConsumer struct { + stopWG sync.WaitGroup +} + +func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *asyncConsumer { + ac := &asyncConsumer{} + + ac.stopWG.Add(numConsumers) + for i := 0; i < numConsumers; i++ { + go func() { + defer ac.stopWG.Done() + for { + index, ctx, req, ok := q.Read(context.Background()) + if !ok { + return + } + consumeErr := consumeFunc(ctx, req) + q.OnProcessingFinished(index, consumeErr) + } + }() + } + return ac +} + +// Shutdown ensures that queue and all consumers are 
stopped. +func (qc *asyncConsumer) Shutdown(_ context.Context) error { + qc.stopWG.Wait() + return nil +} diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/exporterqueue/persistent_queue.go similarity index 94% rename from exporter/internal/queue/persistent_queue.go rename to exporter/exporterqueue/persistent_queue.go index f3b529bf593..d930df5fabf 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/exporterqueue/persistent_queue.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" import ( "context" @@ -49,7 +49,7 @@ type persistentQueue[T any] struct { // 2. capacity control based on the size of the items. *sizedChannel[permanentQueueEl] - set PersistentQueueSettings[T] + set persistentQueueSettings[T] logger *zap.Logger client storage.Client @@ -83,29 +83,29 @@ var ( errWrongExtensionType = errors.New("requested extension is not a storage extension") ) -type PersistentQueueSettings[T any] struct { - Sizer Sizer[T] - Capacity int64 - Signal pipeline.Signal - StorageID component.ID - Marshaler func(req T) ([]byte, error) - Unmarshaler func([]byte) (T, error) - ExporterSettings exporter.Settings +type persistentQueueSettings[T any] struct { + sizer sizer[T] + capacity int64 + signal pipeline.Signal + storageID component.ID + marshaler Marshaler[T] + unmarshaler Unmarshaler[T] + set exporter.Settings } -// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue[T any](set PersistentQueueSettings[T]) Queue[T] { - _, isRequestSized := set.Sizer.(*RequestSizer[T]) +// newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage +func 
newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] { + _, isRequestSized := set.sizer.(*requestSizer[T]) return &persistentQueue[T]{ set: set, - logger: set.ExporterSettings.Logger, + logger: set.set.Logger, isRequestSized: isRequestSized, } } // Start starts the persistentQueue with the given number of consumers. func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) error { - storageClient, err := toStorageClient(ctx, pq.set.StorageID, host, pq.set.ExporterSettings.ID, pq.set.Signal) + storageClient, err := toStorageClient(ctx, pq.set.storageID, host, pq.set.set.ID, pq.set.signal) if err != nil { return err } @@ -167,7 +167,7 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex } // nolint: gosec - pq.sizedChannel = newSizedChannel[permanentQueueEl](pq.set.Capacity, initEls, int64(initQueueSize)) + pq.sizedChannel = newSizedChannel[permanentQueueEl](pq.set.capacity, initEls, int64(initQueueSize)) } // permanentQueueEl is the type of the elements passed to the sizedChannel by the persistentQueue. @@ -239,11 +239,11 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error { // putInternal is the internal version that requires caller to hold the mutex lock. 
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { - err := pq.sizedChannel.push(permanentQueueEl{}, pq.set.Sizer.Sizeof(req), func() error { + err := pq.sizedChannel.push(permanentQueueEl{}, pq.set.sizer.Sizeof(req), func() error { itemKey := getItemKey(pq.writeIndex) newIndex := pq.writeIndex + 1 - reqBuf, err := pq.set.Marshaler(req) + reqBuf, err := pq.set.marshaler(req) if err != nil { return err } @@ -286,7 +286,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context size := int64(0) index, req, consumed = pq.getNextItem(ctx) if consumed { - size = pq.set.Sizer.Sizeof(req) + size = pq.set.sizer.Sizeof(req) } return size }) @@ -330,7 +330,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) getOp) if err == nil { - request, err = pq.set.Unmarshaler(getOp.Value) + request, err = pq.set.unmarshaler(getOp.Value) } if err != nil { @@ -433,7 +433,7 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet)) continue } - req, err := pq.set.Unmarshaler(op.Value) + req, err := pq.set.unmarshaler(op.Value) // If error happened or item is nil, it will be efficiently ignored if err != nil { pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err)) diff --git a/exporter/internal/queue/persistent_queue_test.go b/exporter/exporterqueue/persistent_queue_test.go similarity index 92% rename from exporter/internal/queue/persistent_queue_test.go rename to exporter/exporterqueue/persistent_queue_test.go index 3bae63f0191..18c85e92b80 100644 --- a/exporter/internal/queue/persistent_queue_test.go +++ b/exporter/exporterqueue/persistent_queue_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package queue +package exporterqueue import ( "context" @@ -20,6 +20,7 @@ import ( 
"go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/internal/experr" + "go.opentelemetry.io/collector/exporter/internal/storagetest" "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/extension/xextension/storage" "go.opentelemetry.io/collector/pdata/pcommon" @@ -31,7 +32,7 @@ type itemsCounter interface { ItemsCount() int } -// itemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains. +// itemsSizer is a sizer implementation that returns the size of a queue element as the number of items it contains. type itemsSizer[T itemsCounter] struct{} func (is *itemsSizer[T]) Sizeof(el T) int64 { @@ -225,64 +226,63 @@ func (m *fakeStorageClientWithErrors) Reset() { } // createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers. -func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], capacity int64, numConsumers int, +func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[tracesRequest], capacity int64, numConsumers int, consumeFunc func(_ context.Context, item tracesRequest) error, ) Queue[tracesRequest] { - pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{ - Sizer: sizer, - Capacity: capacity, - Signal: pipeline.SignalTraces, - StorageID: component.ID{}, - Marshaler: marshalTracesRequest, - Unmarshaler: unmarshalTracesRequest, - ExporterSettings: exportertest.NewNopSettings(), + pq := newPersistentQueue[tracesRequest](persistentQueueSettings[tracesRequest]{ + sizer: sizer, + capacity: capacity, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + marshaler: marshalTracesRequest, + unmarshaler: unmarshalTracesRequest, + set: exportertest.NewNopSettings(), }) host := &mockHost{ext: map[component.ID]component.Component{ - {}: NewMockStorageExtension(nil), + {}: 
storagetest.NewMockStorageExtension(nil), }} - consumers := NewQueueConsumers(pq, numConsumers, consumeFunc) require.NoError(t, pq.Start(context.Background(), host)) - require.NoError(t, consumers.Start(context.Background(), host)) + ac := newAsyncConsumer(pq, numConsumers, consumeFunc) t.Cleanup(func() { require.NoError(t, pq.Shutdown(context.Background())) - assert.NoError(t, consumers.Shutdown(context.Background())) + assert.NoError(t, ac.Shutdown(context.Background())) }) return pq } func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue[tracesRequest] { - pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{ - Sizer: &RequestSizer[tracesRequest]{}, - Capacity: 1000, - Signal: pipeline.SignalTraces, - StorageID: component.ID{}, - Marshaler: marshalTracesRequest, - Unmarshaler: unmarshalTracesRequest, - ExporterSettings: exportertest.NewNopSettings(), + pq := newPersistentQueue[tracesRequest](persistentQueueSettings[tracesRequest]{ + sizer: &requestSizer[tracesRequest]{}, + capacity: 1000, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + marshaler: marshalTracesRequest, + unmarshaler: unmarshalTracesRequest, + set: exportertest.NewNopSettings(), }).(*persistentQueue[tracesRequest]) pq.initClient(context.Background(), client) return pq } func createTestPersistentQueueWithRequestsCapacity(tb testing.TB, ext storage.Extension, capacity int64) *persistentQueue[tracesRequest] { - return createTestPersistentQueueWithCapacityLimiter(tb, ext, &RequestSizer[tracesRequest]{}, capacity) + return createTestPersistentQueueWithCapacityLimiter(tb, ext, &requestSizer[tracesRequest]{}, capacity) } func createTestPersistentQueueWithItemsCapacity(tb testing.TB, ext storage.Extension, capacity int64) *persistentQueue[tracesRequest] { return createTestPersistentQueueWithCapacityLimiter(tb, ext, &itemsSizer[tracesRequest]{}, capacity) } -func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext 
storage.Extension, sizer Sizer[tracesRequest], +func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Extension, sizer sizer[tracesRequest], capacity int64, ) *persistentQueue[tracesRequest] { - pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{ - Sizer: sizer, - Capacity: capacity, - Signal: pipeline.SignalTraces, - StorageID: component.ID{}, - Marshaler: marshalTracesRequest, - Unmarshaler: unmarshalTracesRequest, - ExporterSettings: exportertest.NewNopSettings(), + pq := newPersistentQueue[tracesRequest](persistentQueueSettings[tracesRequest]{ + sizer: sizer, + capacity: capacity, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + marshaler: marshalTracesRequest, + unmarshaler: unmarshalTracesRequest, + set: exportertest.NewNopSettings(), }).(*persistentQueue[tracesRequest]) require.NoError(tb, pq.Start(context.Background(), &mockHost{ext: map[component.ID]component.Component{{}: ext}})) return pq @@ -291,13 +291,13 @@ func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Ext func TestPersistentQueue_FullCapacity(t *testing.T) { tests := []struct { name string - sizer Sizer[tracesRequest] + sizer sizer[tracesRequest] capacity int64 sizeMultiplier int }{ { name: "requests_capacity", - sizer: &RequestSizer[tracesRequest]{}, + sizer: &requestSizer[tracesRequest]{}, capacity: 5, sizeMultiplier: 1, }, @@ -341,7 +341,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) { } func TestPersistentQueue_Shutdown(t *testing.T) { - pq := createAndStartTestPersistentQueue(t, &RequestSizer[tracesRequest]{}, 1001, 100, func(context.Context, + pq := createAndStartTestPersistentQueue(t, &requestSizer[tracesRequest]{}, 1001, 100, func(context.Context, tracesRequest, ) error { return nil @@ -385,7 +385,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { req := newTracesRequest(1, 10) numMessagesConsumed := &atomic.Int32{} - pq := createAndStartTestPersistentQueue(t, 
&RequestSizer[tracesRequest]{}, 1000, c.numConsumers, + pq := createAndStartTestPersistentQueue(t, &requestSizer[tracesRequest]{}, 1000, c.numConsumers, func(context.Context, tracesRequest, ) error { @@ -474,7 +474,7 @@ func TestToStorageClient(t *testing.T) { extensions := map[component.ID]component.Component{} for i := 0; i < tt.numStorages; i++ { - extensions[component.MustNewIDWithName("file_storage", strconv.Itoa(i))] = NewMockStorageExtension(tt.getClientError) + extensions[component.MustNewIDWithName("file_storage", strconv.Itoa(i))] = storagetest.NewMockStorageExtension(tt.getClientError) } host := &mockHost{ext: extensions} ownerID := component.MustNewID("foo_exporter") @@ -518,7 +518,7 @@ func TestInvalidStorageExtensionType(t *testing.T) { } func TestPersistentQueue_StopAfterBadStart(t *testing.T) { - pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{}) + pq := newPersistentQueue[tracesRequest](persistentQueueSettings[tracesRequest]{}) // verify that stopping a un-start/started w/error queue does not panic assert.NoError(t, pq.Shutdown(context.Background())) } @@ -578,7 +578,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - ext := NewMockStorageExtension(nil) + ext := storagetest.NewMockStorageExtension(nil) ps := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000) // Put some items, make sure they are loaded and shutdown the storage... 
@@ -626,7 +626,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) { func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { req := newTracesRequest(5, 10) - ext := NewMockStorageExtension(nil) + ext := storagetest.NewMockStorageExtension(nil) ps := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000) for i := 0; i < 5; i++ { @@ -685,7 +685,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { func TestPersistentQueueStartWithNonDispatched(t *testing.T) { req := newTracesRequest(5, 10) - ext := NewMockStorageExtension(nil) + ext := storagetest.NewMockStorageExtension(nil) ps := createTestPersistentQueueWithRequestsCapacity(t, ext, 5) // Put in items up to capacity @@ -710,7 +710,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { req := newTracesRequest(1, 1) - ext := NewMockStorageExtensionWithDelay(nil, 20*time.Nanosecond) + ext := storagetest.NewMockStorageExtensionWithDelay(nil, 20*time.Nanosecond) pq := createTestPersistentQueueWithItemsCapacity(t, ext, 25) proWg := sync.WaitGroup{} @@ -773,7 +773,7 @@ func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { func TestPersistentQueue_PutCloseReadClose(t *testing.T) { req := newTracesRequest(5, 10) - ext := NewMockStorageExtension(nil) + ext := storagetest.NewMockStorageExtension(nil) ps := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000) assert.Equal(t, 0, ps.Size()) @@ -824,7 +824,7 @@ func BenchmarkPersistentQueue_TraceSpans(b *testing.B) { for _, c := range cases { b.Run(fmt.Sprintf("#traces: %d #spansPerTrace: %d", c.numTraces, c.numSpansPerTrace), func(bb *testing.B) { - ext := NewMockStorageExtension(nil) + ext := storagetest.NewMockStorageExtension(nil) ps := createTestPersistentQueueWithRequestsCapacity(b, ext, 10000000) req := newTracesRequest(c.numTraces, c.numSpansPerTrace) @@ -903,20 +903,20 @@ func TestItemIndexArrayMarshaling(t *testing.T) { } 
func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) { - ps := createTestPersistentQueueWithRequestsCapacity(t, NewMockStorageExtension(nil), 1000) + ps := createTestPersistentQueueWithRequestsCapacity(t, storagetest.NewMockStorageExtension(nil), 1000) assert.Equal(t, 0, ps.Size()) - assert.False(t, ps.client.(*mockStorageClient).isClosed()) + assert.False(t, ps.client.(*storagetest.MockStorageClient).IsClosed()) require.NoError(t, ps.Offer(context.Background(), newTracesRequest(5, 10))) index, _, ok := ps.getNextItem(context.Background()) require.True(t, ok) - assert.False(t, ps.client.(*mockStorageClient).isClosed()) + assert.False(t, ps.client.(*storagetest.MockStorageClient).IsClosed()) require.NoError(t, ps.Shutdown(context.Background())) - assert.False(t, ps.client.(*mockStorageClient).isClosed()) + assert.False(t, ps.client.(*storagetest.MockStorageClient).IsClosed()) ps.OnProcessingFinished(index, nil) - assert.True(t, ps.client.(*mockStorageClient).isClosed()) + assert.True(t, ps.client.(*storagetest.MockStorageClient).IsClosed()) } func TestPersistentQueue_StorageFull(t *testing.T) { @@ -1016,7 +1016,7 @@ func TestPersistentQueue_ItemDispatchingFinish_ErrorHandling(t *testing.T) { } func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { - ext := NewMockStorageExtension(nil) + ext := storagetest.NewMockStorageExtension(nil) pq := createTestPersistentQueueWithItemsCapacity(t, ext, 100) assert.Equal(t, 0, pq.Size()) @@ -1065,7 +1065,7 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { // This test covers the case when the items capacity queue is enabled for the first time. 
func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { - ext := NewMockStorageExtension(nil) + ext := storagetest.NewMockStorageExtension(nil) pq := createTestPersistentQueueWithRequestsCapacity(t, ext, 100) assert.Equal(t, 0, pq.Size()) @@ -1123,7 +1123,7 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { // This test covers the case when the queue is restarted with the less capacity than needed to restore the queued items. // In that case, the queue has to be restored anyway even if it exceeds the capacity limit. func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { - ext := NewMockStorageExtension(nil) + ext := storagetest.NewMockStorageExtension(nil) pq := createTestPersistentQueueWithRequestsCapacity(t, ext, 100) assert.Equal(t, 0, pq.Size()) @@ -1177,7 +1177,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { // This test covers the case when the persistent storage is recovered from a snapshot which has // bigger value for the used size than the size of the actual items in the storage. func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { - ext := NewMockStorageExtension(nil) + ext := storagetest.NewMockStorageExtension(nil) pq := createTestPersistentQueueWithItemsCapacity(t, ext, 1000) assert.Equal(t, 0, pq.Size()) diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go index 724cc23e0ae..cb2a6b82de1 100644 --- a/exporter/exporterqueue/queue.go +++ b/exporter/exporterqueue/queue.go @@ -5,23 +5,40 @@ package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporter import ( "context" + "errors" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/pipeline" ) -// ErrQueueIsFull is the error that Queue returns when full. 
+// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -var ErrQueueIsFull = queue.ErrQueueIsFull +var ErrQueueIsFull = errors.New("sending queue is full") // Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue // (boundedMemoryQueue) or via a disk-based queue (persistentQueue) // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type Queue[T any] queue.Queue[T] +type Queue[T any] interface { + component.Component + // Offer inserts the specified element into this queue if it is possible to do so immediately + // without violating capacity restrictions. If success returns no error. + // It returns ErrQueueIsFull if no space is currently available. + Offer(ctx context.Context, item T) error + // Size returns the current Size of the queue + Size() int + // Capacity returns the capacity of the queue. + Capacity() int + // Read pulls the next available item from the queue along with its index. Once processing is + // finished, the index should be called with OnProcessingFinished to clean up the storage. + // The function blocks until an item is available or if the queue is stopped. + // Returns false if reading failed or if the queue is stopped. + Read(context.Context) (uint64, context.Context, T, bool) + // OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished. + OnProcessingFinished(index uint64, consumeErr error) +} // Settings defines settings for creating a queue. 
type Settings struct { @@ -49,9 +66,9 @@ type Factory[T any] func(context.Context, Settings, Config) Queue[T] // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func NewMemoryQueueFactory[T any]() Factory[T] { return func(_ context.Context, _ Settings, cfg Config) Queue[T] { - return queue.NewBoundedMemoryQueue[T](queue.MemoryQueueSettings[T]{ - Sizer: &queue.RequestSizer[T]{}, - Capacity: int64(cfg.QueueSize), + return newBoundedMemoryQueue[T](memoryQueueSettings[T]{ + sizer: &requestSizer[T]{}, + capacity: int64(cfg.QueueSize), }) } } @@ -67,7 +84,7 @@ type PersistentQueueSettings[T any] struct { } // NewPersistentQueueFactory returns a factory to create a new persistent queue. -// If cfg.StorageID is nil then it falls back to memory queue. +// If storageID is nil then it falls back to memory queue. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. 
func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings PersistentQueueSettings[T]) Factory[T] { @@ -75,14 +92,14 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P return NewMemoryQueueFactory[T]() } return func(_ context.Context, set Settings, cfg Config) Queue[T] { - return queue.NewPersistentQueue[T](queue.PersistentQueueSettings[T]{ - Sizer: &queue.RequestSizer[T]{}, - Capacity: int64(cfg.QueueSize), - Signal: set.Signal, - StorageID: *storageID, - Marshaler: factorySettings.Marshaler, - Unmarshaler: factorySettings.Unmarshaler, - ExporterSettings: set.ExporterSettings, + return newPersistentQueue[T](persistentQueueSettings[T]{ + sizer: &requestSizer[T]{}, + capacity: int64(cfg.QueueSize), + signal: set.Signal, + storageID: *storageID, + marshaler: factorySettings.Marshaler, + unmarshaler: factorySettings.Unmarshaler, + set: set.ExporterSettings, }) } } diff --git a/exporter/internal/queue/sized_channel.go b/exporter/exporterqueue/sized_channel.go similarity index 97% rename from exporter/internal/queue/sized_channel.go rename to exporter/exporterqueue/sized_channel.go index f322e58c01c..3a343769e1b 100644 --- a/exporter/internal/queue/sized_channel.go +++ b/exporter/exporterqueue/sized_channel.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" import "sync/atomic" diff --git a/exporter/internal/queue/sized_channel_test.go b/exporter/exporterqueue/sized_channel_test.go similarity index 98% rename from exporter/internal/queue/sized_channel_test.go rename to exporter/exporterqueue/sized_channel_test.go index 3aff98bff99..24aadb8aba0 100644 --- a/exporter/internal/queue/sized_channel_test.go +++ b/exporter/exporterqueue/sized_channel_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors 
// SPDX-License-Identifier: Apache-2.0 -package queue +package exporterqueue import ( "errors" diff --git a/exporter/exporterqueue/sizer.go b/exporter/exporterqueue/sizer.go new file mode 100644 index 00000000000..dad7343dab8 --- /dev/null +++ b/exporter/exporterqueue/sizer.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" + +// sizer is an interface that returns the size of the given element. +type sizer[T any] interface { + Sizeof(T) int64 +} + +// requestSizer is a sizer implementation that returns the size of a queue element as one request. +type requestSizer[T any] struct{} + +func (rs *requestSizer[T]) Sizeof(T) int64 { + return 1 +} diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go index d320d2371e0..df45a7b97c9 100644 --- a/exporter/internal/queue/batcher.go +++ b/exporter/internal/queue/batcher.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/internal" ) @@ -25,7 +26,7 @@ type Batcher interface { type BaseBatcher struct { batchCfg exporterbatcher.Config - queue Queue[internal.Request] + queue exporterqueue.Queue[internal.Request] // TODO: Remove when the -1 hack for testing is removed. 
maxWorkers int workerPool chan bool @@ -34,7 +35,7 @@ type BaseBatcher struct { } func NewBatcher(batchCfg exporterbatcher.Config, - queue Queue[internal.Request], + queue exporterqueue.Queue[internal.Request], exportFunc func(ctx context.Context, req internal.Request) error, maxWorkers int, ) (Batcher, error) { @@ -45,7 +46,7 @@ func NewBatcher(batchCfg exporterbatcher.Config, } func newBaseBatcher(batchCfg exporterbatcher.Config, - queue Queue[internal.Request], + queue exporterqueue.Queue[internal.Request], exportFunc func(ctx context.Context, req internal.Request) error, maxWorkers int, ) BaseBatcher { diff --git a/exporter/internal/queue/consumers.go b/exporter/internal/queue/consumers.go index b8e07c3a79f..4db4b830e3c 100644 --- a/exporter/internal/queue/consumers.go +++ b/exporter/internal/queue/consumers.go @@ -8,21 +8,21 @@ import ( "sync" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterqueue" ) type Consumers[T any] struct { - queue Queue[T] + queue exporterqueue.Queue[T] numConsumers int consumeFunc func(context.Context, T) error stopWG sync.WaitGroup } -func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *Consumers[T] { +func NewQueueConsumers[T any](q exporterqueue.Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *Consumers[T] { return &Consumers[T]{ queue: q, numConsumers: numConsumers, consumeFunc: consumeFunc, - stopWG: sync.WaitGroup{}, } } diff --git a/exporter/internal/queue/default_batcher_test.go b/exporter/internal/queue/default_batcher_test.go index e93746f83f3..3f2c6253ee9 100644 --- a/exporter/internal/queue/default_batcher_test.go +++ b/exporter/internal/queue/default_batcher_test.go @@ -15,7 +15,10 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/exporterqueue" + 
"go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/internal" + "go.opentelemetry.io/collector/pipeline" ) func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) { @@ -45,11 +48,13 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) { MinSizeItems: 0, } - q := NewBoundedMemoryQueue[internal.Request]( - MemoryQueueSettings[internal.Request]{ - Sizer: &RequestSizer[internal.Request]{}, - Capacity: 10, - }) + q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( + context.Background(), + exporterqueue.Settings{ + Signal: pipeline.SignalTraces, + ExporterSettings: exportertest.NewNopSettings(), + }, + exporterqueue.NewDefaultConfig()) ba, err := NewBatcher(cfg, q, func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, @@ -105,11 +110,13 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) { MinSizeItems: 10, } - q := NewBoundedMemoryQueue[internal.Request]( - MemoryQueueSettings[internal.Request]{ - Sizer: &RequestSizer[internal.Request]{}, - Capacity: 10, - }) + q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( + context.Background(), + exporterqueue.Settings{ + Signal: pipeline.SignalTraces, + ExporterSettings: exportertest.NewNopSettings(), + }, + exporterqueue.NewDefaultConfig()) ba, err := NewBatcher(cfg, q, func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, @@ -175,11 +182,13 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) { MinSizeItems: 100, } - q := NewBoundedMemoryQueue[internal.Request]( - MemoryQueueSettings[internal.Request]{ - Sizer: &RequestSizer[internal.Request]{}, - Capacity: 10, - }) + q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( + context.Background(), + exporterqueue.Settings{ + Signal: pipeline.SignalTraces, + ExporterSettings: exportertest.NewNopSettings(), + }, + exporterqueue.NewDefaultConfig()) ba, err := NewBatcher(cfg, q, 
func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, @@ -245,11 +254,13 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) { MaxSizeItems: 100, } - q := NewBoundedMemoryQueue[internal.Request]( - MemoryQueueSettings[internal.Request]{ - Sizer: &RequestSizer[internal.Request]{}, - Capacity: 10, - }) + q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( + context.Background(), + exporterqueue.Settings{ + Signal: pipeline.SignalTraces, + ExporterSettings: exportertest.NewNopSettings(), + }, + exporterqueue.NewDefaultConfig()) ba, err := NewBatcher(cfg, q, func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, @@ -292,11 +303,13 @@ func TestDefaultBatcher_Shutdown(t *testing.T) { batchCfg.MinSizeItems = 10 batchCfg.FlushTimeout = 100 * time.Second - q := NewBoundedMemoryQueue[internal.Request]( - MemoryQueueSettings[internal.Request]{ - Sizer: &RequestSizer[internal.Request]{}, - Capacity: 10, - }) + q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( + context.Background(), + exporterqueue.Settings{ + Signal: pipeline.SignalTraces, + ExporterSettings: exportertest.NewNopSettings(), + }, + exporterqueue.NewDefaultConfig()) ba, err := NewBatcher(batchCfg, q, func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, diff --git a/exporter/internal/queue/disabled_batcher_test.go b/exporter/internal/queue/disabled_batcher_test.go index ca612d82de5..0d9fe7e3bbb 100644 --- a/exporter/internal/queue/disabled_batcher_test.go +++ b/exporter/internal/queue/disabled_batcher_test.go @@ -14,7 +14,10 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/exporterqueue" + "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/internal" + "go.opentelemetry.io/collector/pipeline" ) func TestDisabledBatcher_Basic(t *testing.T) { 
@@ -40,11 +43,13 @@ func TestDisabledBatcher_Basic(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.Enabled = false - q := NewBoundedMemoryQueue[internal.Request]( - MemoryQueueSettings[internal.Request]{ - Sizer: &RequestSizer[internal.Request]{}, - Capacity: 10, - }) + q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( + context.Background(), + exporterqueue.Settings{ + Signal: pipeline.SignalTraces, + ExporterSettings: exportertest.NewNopSettings(), + }, + exporterqueue.NewDefaultConfig()) ba, err := NewBatcher(cfg, q, func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go deleted file mode 100644 index 77cac737f7e..00000000000 --- a/exporter/internal/queue/queue.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright The OpenTelemetry Authors -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. -// SPDX-License-Identifier: Apache-2.0 - -package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" - -import ( - "context" - "errors" - - "go.opentelemetry.io/collector/component" -) - -// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full. -var ErrQueueIsFull = errors.New("sending queue is full") - -// Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue -// (boundedMemoryQueue) or via a disk-based queue (persistentQueue) -type Queue[T any] interface { - component.Component - // Offer inserts the specified element into this queue if it is possible to do so immediately - // without violating capacity restrictions. If success returns no error. - // It returns ErrQueueIsFull if no space is currently available. - Offer(ctx context.Context, item T) error - // Size returns the current Size of the queue - Size() int - // Capacity returns the capacity of the queue. 
- Capacity() int - // Read pulls the next available item from the queue along with its index. Once processing is - // finished, the index should be called with OnProcessingFinished to clean up the storage. - // The function blocks until an item is available or if the queue is stopped. - // Returns false if reading failed or if the queue is stopped. - Read(context.Context) (uint64, context.Context, T, bool) - // OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished. - OnProcessingFinished(index uint64, consumeErr error) -} - -// Sizer is an interface that returns the size of the given element. -type Sizer[T any] interface { - Sizeof(T) int64 -} - -// RequestSizer is a Sizer implementation that returns the size of a queue element as one request. -type RequestSizer[T any] struct{} - -func (rs *RequestSizer[T]) Sizeof(T) int64 { - return 1 -} diff --git a/exporter/internal/queue/queue_test.go b/exporter/internal/queue/queue_test.go deleted file mode 100644 index 88276a82db9..00000000000 --- a/exporter/internal/queue/queue_test.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package queue - -import ( - "context" -) - -func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool { - index, ctx, req, ok := q.Read(context.Background()) - if !ok { - return false - } - consumeErr := consumeFunc(ctx, req) - q.OnProcessingFinished(index, consumeErr) - return true -} diff --git a/exporter/internal/queue/mock_storage.go b/exporter/internal/storagetest/mock_storage.go similarity index 76% rename from exporter/internal/queue/mock_storage.go rename to exporter/internal/storagetest/mock_storage.go index d721e4e8ed9..35024d35ee6 100644 --- a/exporter/internal/queue/mock_storage.go +++ b/exporter/internal/storagetest/mock_storage.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package 
queue // import "go.opentelemetry.io/collector/exporter/internal/queue" +package storagetest // import "go.opentelemetry.io/collector/exporter/internal/storagetest" import ( "context" @@ -26,7 +26,7 @@ func (m *mockStorageExtension) GetClient(context.Context, component.Kind, compon if m.getClientError != nil { return nil, m.getClientError } - return &mockStorageClient{st: &m.st, closed: &atomic.Bool{}, executionDelay: m.executionDelay}, nil + return &MockStorageClient{st: &m.st, closed: &atomic.Bool{}, executionDelay: m.executionDelay}, nil } func NewMockStorageExtension(getClientError error) storage.Extension { @@ -40,33 +40,33 @@ func NewMockStorageExtensionWithDelay(getClientError error, executionDelay time. } } -type mockStorageClient struct { +type MockStorageClient struct { st *sync.Map closed *atomic.Bool executionDelay time.Duration // simulate real storage client delay } -func (m *mockStorageClient) Get(ctx context.Context, s string) ([]byte, error) { +func (m *MockStorageClient) Get(ctx context.Context, s string) ([]byte, error) { getOp := storage.GetOperation(s) err := m.Batch(ctx, getOp) return getOp.Value, err } -func (m *mockStorageClient) Set(ctx context.Context, s string, bytes []byte) error { +func (m *MockStorageClient) Set(ctx context.Context, s string, bytes []byte) error { return m.Batch(ctx, storage.SetOperation(s, bytes)) } -func (m *mockStorageClient) Delete(ctx context.Context, s string) error { +func (m *MockStorageClient) Delete(ctx context.Context, s string) error { return m.Batch(ctx, storage.DeleteOperation(s)) } -func (m *mockStorageClient) Close(context.Context) error { +func (m *MockStorageClient) Close(context.Context) error { m.closed.Store(true) return nil } -func (m *mockStorageClient) Batch(_ context.Context, ops ...*storage.Operation) error { - if m.isClosed() { +func (m *MockStorageClient) Batch(_ context.Context, ops ...*storage.Operation) error { + if m.IsClosed() { panic("client already closed") } if m.executionDelay != 0 
{ @@ -92,6 +92,6 @@ func (m *mockStorageClient) Batch(_ context.Context, ops ...*storage.Operation) return nil } -func (m *mockStorageClient) isClosed() bool { +func (m *MockStorageClient) IsClosed() bool { return m.closed.Load() }