From e5ca0d600823b4eab1d862253b36472aab0a9379 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 10 Jan 2025 16:33:14 -0800 Subject: [PATCH] Add capability for memory and persistent queue to block when add items Signed-off-by: Bogdan Drutu --- .chloggen/add-blocking.yaml | 25 ++ .../exporterqueue/bounded_memory_queue.go | 5 +- .../bounded_memory_queue_test.go | 71 +++- exporter/exporterqueue/cond.go | 62 +++ exporter/exporterqueue/persistent_queue.go | 66 +-- .../exporterqueue/persistent_queue_test.go | 375 +++++++++--------- exporter/exporterqueue/sized_queue.go | 47 ++- exporter/exporterqueue/sized_queue_test.go | 8 +- 8 files changed, 406 insertions(+), 253 deletions(-) create mode 100644 .chloggen/add-blocking.yaml create mode 100644 exporter/exporterqueue/cond.go diff --git a/.chloggen/add-blocking.yaml b/.chloggen/add-blocking.yaml new file mode 100644 index 000000000000..5b2d3895b7e8 --- /dev/null +++ b/.chloggen/add-blocking.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add capability for memory and persistent queue to block when add items + +# One or more tracking issues or pull requests related to the change +issues: [12074] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterqueue/bounded_memory_queue.go b/exporter/exporterqueue/bounded_memory_queue.go index 7543958b2a1a..e94eb5da1c38 100644 --- a/exporter/exporterqueue/bounded_memory_queue.go +++ b/exporter/exporterqueue/bounded_memory_queue.go @@ -23,17 +23,18 @@ type boundedMemoryQueue[T any] struct { type memoryQueueSettings[T any] struct { sizer sizer[T] capacity int64 + blocking bool } // newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional // callback for dropped items (e.g. useful to emit metrics). 
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] { return &boundedMemoryQueue[T]{ - sizedQueue: newSizedQueue[T](set.capacity, set.sizer), + sizedQueue: newSizedQueue[T](set.capacity, set.sizer, set.blocking), } } -func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) { +func (q *boundedMemoryQueue[T]) Read(context.Context) (uint64, context.Context, T, bool) { ctx, req, ok := q.sizedQueue.pop() return 0, ctx, req, ok } diff --git a/exporter/exporterqueue/bounded_memory_queue_test.go b/exporter/exporterqueue/bounded_memory_queue_test.go index f723f8263f89..ff1666ef6049 100644 --- a/exporter/exporterqueue/bounded_memory_queue_test.go +++ b/exporter/exporterqueue/bounded_memory_queue_test.go @@ -16,8 +16,6 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/pdata/testdata" ) // In this test we run a queue with capacity 1 and a single consumer. @@ -102,11 +100,11 @@ func TestShutdownWhileNotEmpty(t *testing.T) { func TestQueueUsage(t *testing.T) { tests := []struct { name string - sizer sizer[ptrace.Traces] + sizer sizer[uint64] }{ { name: "requests_based", - sizer: &requestSizer[ptrace.Traces]{}, + sizer: &requestSizer[uint64]{}, }, { name: "items_based", @@ -115,16 +113,15 @@ func TestQueueUsage(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: tt.sizer, capacity: int64(100)}) + q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100)}) consumed := &atomic.Int64{} require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) - ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error { + ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error { consumed.Add(1) return nil }) - td := testdata.GenerateTraces(10) for j := 0; j < 10; j++ { - require.NoError(t, q.Offer(context.Background(), td)) + require.NoError(t, q.Offer(context.Background(), uint64(10))) } assert.NoError(t, q.Shutdown(context.Background())) assert.NoError(t, ac.Shutdown(context.Background())) @@ -133,6 +130,47 @@ func TestQueueUsage(t *testing.T) { } } +func TestBlockingQueueUsage(t *testing.T) { + tests := []struct { + name string + sizer sizer[uint64] + }{ + { + name: "requests_based", + sizer: &requestSizer[uint64]{}, + }, + { + name: "items_based", + sizer: &itemsSizer{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100), blocking: true}) + consumed := &atomic.Int64{} + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) + ac := newAsyncConsumer(q, 10, func(context.Context, uint64) error { + consumed.Add(1) + return nil + }) + wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100_000; j++ { + assert.NoError(t, q.Offer(context.Background(), uint64(10))) + } + }() + } + wg.Wait() + assert.NoError(t, q.Shutdown(context.Background())) + assert.NoError(t, ac.Shutdown(context.Background())) + assert.Equal(t, int64(1_000_000), consumed.Load()) + }) + } +} + func TestZeroSizeNoConsumers(t *testing.T) { q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0}) @@ -149,8 +187,7 @@ 
func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool if !ok { return false } - consumeErr := consumeFunc(ctx, req) - q.OnProcessingFinished(index, consumeErr) + q.OnProcessingFinished(index, consumeFunc(ctx, req)) return true } @@ -170,8 +207,7 @@ func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(cont if !ok { return } - consumeErr := consumeFunc(ctx, req) - q.OnProcessingFinished(index, consumeErr) + q.OnProcessingFinished(index, consumeFunc(ctx, req)) } }() } @@ -187,11 +223,11 @@ func (qc *asyncConsumer) Shutdown(_ context.Context) error { func BenchmarkOffer(b *testing.B) { tests := []struct { name string - sizer sizer[ptrace.Traces] + sizer sizer[uint64] }{ { name: "requests_based", - sizer: &requestSizer[ptrace.Traces]{}, + sizer: &requestSizer[uint64]{}, }, { name: "items_based", @@ -200,18 +236,17 @@ func BenchmarkOffer(b *testing.B) { } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: &requestSizer[ptrace.Traces]{}, capacity: int64(10 * b.N)}) + q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: &requestSizer[uint64]{}, capacity: int64(10 * b.N)}) consumed := &atomic.Int64{} require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost())) - ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error { + ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error { consumed.Add(1) return nil }) - td := testdata.GenerateTraces(10) b.ResetTimer() b.ReportAllocs() for j := 0; j < b.N; j++ { - require.NoError(b, q.Offer(context.Background(), td)) + require.NoError(b, q.Offer(context.Background(), uint64(10))) } assert.NoError(b, q.Shutdown(context.Background())) assert.NoError(b, ac.Shutdown(context.Background())) diff --git a/exporter/exporterqueue/cond.go b/exporter/exporterqueue/cond.go new file mode 100644 index 000000000000..c41d3efe2240 --- /dev/null +++ b/exporter/exporterqueue/cond.go @@ -0,0 +1,62 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" + +import ( + "context" + "sync" +) + +// cond is equivalent with sync.Cond, but context.Context aware. Which means Wait() will return if context is done. +// Also, it requires the caller to hold the c.L during all calls. +type cond struct { + L sync.Locker + ch chan struct{} + waiting int64 +} + +func newCond(l sync.Locker) *cond { + return &cond{L: l, ch: make(chan struct{}, 1)} +} + +// Signal wakes one goroutine waiting on c, if there is any. +// It requires for the caller to hold c.L during the call. +func (c *cond) Signal() { + if c.waiting == 0 { + return + } + c.waiting-- + c.ch <- struct{}{} +} + +// Broadcast wakes all goroutines waiting on c. +// It requires for the caller to hold c.L during the call. +func (c *cond) Broadcast() { + for ; c.waiting > 0; c.waiting-- { + c.ch <- struct{}{} + } +} + +// Wait atomically unlocks c.L and suspends execution of the calling goroutine. After later resuming execution, Wait locks c.L before returning. +func (c *cond) Wait(ctx context.Context) error { + c.waiting++ + c.L.Unlock() + select { + case <-ctx.Done(): + c.L.Lock() + if c.waiting == 0 { + // If waiting is 0, it means that there was a signal sent and nobody else waits for it. 
+ // Consume it, so that we don't unblock other consumer unnecessary, + // or we don't block the producer because the channel buffer is full. + <-c.ch + } else { + // Decrease the number of waiting routines. + c.waiting-- + } + return ctx.Err() + case <-c.ch: + c.L.Lock() + return nil + } +} diff --git a/exporter/exporterqueue/persistent_queue.go b/exporter/exporterqueue/persistent_queue.go index 586a51396302..a1b35d4231f0 100644 --- a/exporter/exporterqueue/persistent_queue.go +++ b/exporter/exporterqueue/persistent_queue.go @@ -11,7 +11,6 @@ import ( "strconv" "sync" - "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" @@ -42,6 +41,7 @@ var ( type persistentQueueSettings[T any] struct { sizer sizer[T] capacity int64 + blocking bool signal pipeline.Signal storageID component.ID marshaler Marshaler[T] @@ -81,7 +81,8 @@ type persistentQueue[T any] struct { // mu guards everything declared below. mu sync.Mutex - hasElements *sync.Cond + hasMoreElements *cond + hasMoreSpace *cond readIndex uint64 writeIndex uint64 currentlyDispatchedItems []uint64 @@ -98,7 +99,8 @@ func newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] { logger: set.set.Logger, isRequestSized: isRequestSized, } - pq.hasElements = sync.NewCond(&pq.mu) + pq.hasMoreElements = newCond(&pq.mu) + pq.hasMoreSpace = newCond(&pq.mu) return pq } @@ -194,8 +196,8 @@ func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error { backupErr := pq.backupQueueSize(ctx) // Mark this queue as stopped, so consumer don't start any more work. pq.stopped = true - pq.hasElements.Broadcast() - return multierr.Combine(backupErr, pq.unrefClient(ctx)) + pq.hasMoreElements.Broadcast() + return errors.Join(backupErr, pq.unrefClient(ctx)) } // backupQueueSize writes the current queue size to storage. The value is used to recover the queue size @@ -233,8 +235,13 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error { // putInternal is the internal version that requires caller to hold the mutex lock. func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { reqSize := pq.set.sizer.Sizeof(req) - if pq.queueSize+reqSize > pq.set.capacity { - return ErrQueueIsFull + for pq.queueSize+reqSize > pq.set.capacity { + if !pq.set.blocking { + return ErrQueueIsFull + } + if err := pq.hasMoreSpace.Wait(ctx); err != nil { + return err + } } reqBuf, err := pq.set.marshaler(req) @@ -253,7 +260,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { pq.writeIndex++ pq.queueSize += reqSize - pq.hasElements.Signal() + pq.hasMoreElements.Signal() // Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size // in case if the collector is killed. The recovered queue size is allowed to be inaccurate. @@ -269,32 +276,38 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) { pq.mu.Lock() defer pq.mu.Unlock() + for { if pq.stopped { var req T return 0, context.Background(), req, false } - // If queue is empty, wait until more elements and restart. - if pq.readIndex == pq.writeIndex { - pq.hasElements.Wait() - continue - } - - index, req, consumed := pq.getNextItem(ctx) - if consumed { - pq.queueSize -= pq.set.sizer.Sizeof(req) - // The size might be not in sync with the queue in case it's restored from the disk - // because we don't flush the current queue size on the disk on every read/write. 
- // In that case we need to make sure it doesn't go below 0. - if pq.queueSize < 0 { + // Read until either a successful retrieved element or no more elements in the storage. + for pq.readIndex != pq.writeIndex { + index, req, consumed := pq.getNextItem(ctx) + // Ensure the used size and the channel size are in sync. + if pq.readIndex == pq.writeIndex { pq.queueSize = 0 + pq.hasMoreSpace.Signal() + } + if consumed { + pq.queueSize -= pq.set.sizer.Sizeof(req) + // The size might be not in sync with the queue in case it's restored from the disk + // because we don't flush the current queue size on the disk on every read/write. + // In that case we need to make sure it doesn't go below 0. + if pq.queueSize < 0 { + pq.queueSize = 0 + } + pq.hasMoreSpace.Signal() + + return index, context.Background(), req, true } - - return index, context.Background(), req, true } - // If we did not consume any element retry from the beginning. + // TODO: Change the Queue interface to return an error to allow distinguish between shutdown and context canceled. + // Ok to ignore the error, since the context.Background() will never be done. + _ = pq.hasMoreElements.Wait(context.Background()) } } @@ -363,11 +376,6 @@ func (pq *persistentQueue[T]) OnProcessingFinished(index uint64, consumeErr erro pq.logger.Error("Error writing queue size to storage", zap.Error(qsErr)) } } - - // Ensure the used size and the channel size are in sync. - if pq.readIndex == pq.writeIndex { - pq.queueSize = 0 - } } // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage diff --git a/exporter/exporterqueue/persistent_queue_test.go b/exporter/exporterqueue/persistent_queue_test.go index 6866573efdb7..b4cb71eb9b6b 100644 --- a/exporter/exporterqueue/persistent_queue_test.go +++ b/exporter/exporterqueue/persistent_queue_test.go @@ -5,8 +5,10 @@ package exporterqueue import ( "context" + "encoding/binary" "errors" "fmt" + "math" "strconv" "sync" "sync/atomic" @@ -23,26 +25,28 @@ import ( "go.opentelemetry.io/collector/exporter/internal/storagetest" "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/extension/xextension/storage" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" ) // itemsSizer is a sizer implementation that returns the size of a queue element as the number of items it contains. type itemsSizer struct{} -func (is *itemsSizer) Sizeof(el ptrace.Traces) int64 { - return int64(el.SpanCount()) +func (is *itemsSizer) Sizeof(val uint64) int64 { + if val > math.MaxInt64 { + return math.MaxInt64 + } + return int64(val) } -func marshalTracesRequest(td ptrace.Traces) ([]byte, error) { - marshaler := &ptrace.ProtoMarshaler{} - return marshaler.MarshalTraces(td) +func uint64Marshaler(val uint64) ([]byte, error) { + return binary.LittleEndian.AppendUint64([]byte{}, val), nil } -func unmarshalTracesRequest(bytes []byte) (ptrace.Traces, error) { - unmarshaler := &ptrace.ProtoUnmarshaler{} - return unmarshaler.UnmarshalTraces(bytes) +func uint64Unmarshaler(bytes []byte) (uint64, error) { + if len(bytes) < 8 { + return 0, errInvalidValue + } + return binary.LittleEndian.Uint64(bytes), nil } type mockHost struct { @@ -213,16 +217,15 @@ func (m *fakeStorageClientWithErrors) Reset() { } // createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers. 
-func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[ptrace.Traces], capacity int64, numConsumers int, - consumeFunc func(_ context.Context, item ptrace.Traces) error, -) Queue[ptrace.Traces] { - pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{ +func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[uint64], capacity int64, numConsumers int, + consumeFunc func(_ context.Context, item uint64) error) Queue[uint64] { + pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{ sizer: sizer, capacity: capacity, signal: pipeline.SignalTraces, storageID: component.ID{}, - marshaler: marshalTracesRequest, - unmarshaler: unmarshalTracesRequest, + marshaler: uint64Marshaler, + unmarshaler: uint64Unmarshaler, set: exportertest.NewNopSettings(), }) host := &mockHost{ext: map[component.ID]component.Component{ @@ -237,40 +240,40 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[ptrace.Traces], return pq } -func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue[ptrace.Traces] { - pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{ - sizer: &requestSizer[ptrace.Traces]{}, +func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue[uint64] { + pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{ + sizer: &requestSizer[uint64]{}, capacity: 1000, signal: pipeline.SignalTraces, storageID: component.ID{}, - marshaler: marshalTracesRequest, - unmarshaler: unmarshalTracesRequest, + marshaler: uint64Marshaler, + unmarshaler: uint64Unmarshaler, set: exportertest.NewNopSettings(), - }).(*persistentQueue[ptrace.Traces]) + }).(*persistentQueue[uint64]) pq.initClient(context.Background(), client) return pq } -func createTestPersistentQueueWithRequestsCapacity(tb testing.TB, ext storage.Extension, capacity int64) *persistentQueue[ptrace.Traces] { - return createTestPersistentQueueWithCapacityLimiter(tb, ext, &requestSizer[ptrace.Traces]{}, capacity) +func createTestPersistentQueueWithRequestsCapacity(tb testing.TB, ext storage.Extension, capacity int64) *persistentQueue[uint64] { + return createTestPersistentQueueWithCapacityLimiter(tb, ext, &requestSizer[uint64]{}, capacity) } -func createTestPersistentQueueWithItemsCapacity(tb testing.TB, ext storage.Extension, capacity int64) *persistentQueue[ptrace.Traces] { +func createTestPersistentQueueWithItemsCapacity(tb testing.TB, ext storage.Extension, capacity int64) *persistentQueue[uint64] { return createTestPersistentQueueWithCapacityLimiter(tb, ext, &itemsSizer{}, capacity) } -func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Extension, sizer sizer[ptrace.Traces], +func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Extension, sizer sizer[uint64], capacity int64, -) *persistentQueue[ptrace.Traces] { - pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{ +) *persistentQueue[uint64] { + pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{ sizer: sizer, capacity: capacity, signal: pipeline.SignalTraces, storageID: component.ID{}, - marshaler: marshalTracesRequest, - unmarshaler: unmarshalTracesRequest, + marshaler: uint64Marshaler, + unmarshaler: uint64Unmarshaler, set: exportertest.NewNopSettings(), - }).(*persistentQueue[ptrace.Traces]) + }).(*persistentQueue[uint64]) require.NoError(tb, pq.Start(context.Background(), &mockHost{ext: map[component.ID]component.Component{{}: ext}})) return pq } @@ -278,13 +281,13 @@ 
func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Ext func TestPersistentQueue_FullCapacity(t *testing.T) { tests := []struct { name string - sizer sizer[ptrace.Traces] + sizer sizer[uint64] capacity int64 sizeMultiplier int }{ { name: "requests_capacity", - sizer: &requestSizer[ptrace.Traces]{}, + sizer: &requestSizer[uint64]{}, capacity: 5, sizeMultiplier: 1, }, @@ -298,13 +301,15 @@ func TestPersistentQueue_FullCapacity(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { done := make(chan struct{}) - pq := createAndStartTestPersistentQueue(t, tt.sizer, tt.capacity, 1, func(context.Context, ptrace.Traces) error { - <-done - return nil - }) + pq := createAndStartTestPersistentQueue(t, + tt.sizer, tt.capacity, 1, + func(context.Context, uint64) error { + <-done + return nil + }) assert.Equal(t, 0, pq.Size()) - req := newTracesRequest(1, 10) + req := uint64(10) // First request is picked by the consumer. Wait until the consumer is blocked on done. require.NoError(t, pq.Offer(context.Background(), req)) @@ -313,7 +318,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) { }, 2*time.Second, 10*time.Millisecond) for i := 0; i < 10; i++ { - result := pq.Offer(context.Background(), newTracesRequest(1, 10)) + result := pq.Offer(context.Background(), uint64(10)) if i < 5 { require.NoError(t, result) } else { @@ -327,12 +332,12 @@ func TestPersistentQueue_FullCapacity(t *testing.T) { } func TestPersistentQueue_Shutdown(t *testing.T) { - pq := createAndStartTestPersistentQueue(t, &requestSizer[ptrace.Traces]{}, 1001, 100, func(context.Context, - ptrace.Traces, - ) error { - return nil - }) - req := newTracesRequest(1, 10) + pq := createAndStartTestPersistentQueue(t, + &requestSizer[uint64]{}, 1001, 1, + func(context.Context, uint64) error { + return nil + }) + req := uint64(10) for i := 0; i < 1000; i++ { assert.NoError(t, pq.Offer(context.Background(), req)) @@ -368,14 +373,13 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { for _, c := range cases { t.Run(fmt.Sprintf("#messages: %d #consumers: %d", c.numMessagesProduced, c.numConsumers), func(t *testing.T) { - req := newTracesRequest(1, 10) - - numMessagesConsumed := &atomic.Int32{} - pq := createAndStartTestPersistentQueue(t, &requestSizer[ptrace.Traces]{}, 1000, c.numConsumers, - func(context.Context, - ptrace.Traces, - ) error { - numMessagesConsumed.Add(int32(1)) + req := uint64(10) + + consumed := &atomic.Int64{} + pq := createAndStartTestPersistentQueue(t, + &requestSizer[uint64]{}, 1000, c.numConsumers, + func(context.Context, uint64) error { + consumed.Add(int64(1)) return nil }) @@ -383,38 +387,71 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { require.NoError(t, pq.Offer(context.Background(), req)) } + // Because the persistent queue is not draining after Shutdown, need to wait here for the drain. 
assert.Eventually(t, func() bool { - return c.numMessagesProduced == int(numMessagesConsumed.Load()) + return c.numMessagesProduced == int(consumed.Load()) }, 5*time.Second, 10*time.Millisecond) }) } } -func newTracesRequest(numTraces int, numSpans int) ptrace.Traces { - traces := ptrace.NewTraces() - batch := traces.ResourceSpans().AppendEmpty() - batch.Resource().Attributes().PutStr("resource-attr", "some-resource") - batch.Resource().Attributes().PutInt("num-traces", int64(numTraces)) - batch.Resource().Attributes().PutInt("num-spans", int64(numSpans)) - - for i := 0; i < numTraces; i++ { - traceID := pcommon.TraceID([16]byte{1, 2, 3, byte(i)}) - ils := batch.ScopeSpans().AppendEmpty() - for j := 0; j < numSpans; j++ { - span := ils.Spans().AppendEmpty() - span.SetTraceID(traceID) - span.SetSpanID([8]byte{1, 2, 3, byte(j)}) - span.SetName("should-not-be-changed") - span.Attributes().PutInt("int-attribute", int64(j)) - span.Attributes().PutStr("str-attribute-1", "foobar") - span.Attributes().PutStr("str-attribute-2", "fdslafjasdk12312312jkl") - span.Attributes().PutStr("str-attribute-3", "AbcDefGeKKjkfdsafasdfsdasdf") - span.Attributes().PutStr("str-attribute-4", "xxxxxx") - span.Attributes().PutStr("str-attribute-5", "abcdef") - } +func TestPersistentBlockingQueue(t *testing.T) { + tests := []struct { + name string + sizer sizer[uint64] + }{ + { + name: "requests_based", + sizer: &requestSizer[uint64]{}, + }, + { + name: "items_based", + sizer: &itemsSizer{}, + }, } - return traces + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{ + sizer: tt.sizer, + capacity: 100, + blocking: true, + signal: pipeline.SignalTraces, + storageID: component.ID{}, + marshaler: uint64Marshaler, + unmarshaler: uint64Unmarshaler, + set: exportertest.NewNopSettings(), + }) + host := &mockHost{ext: map[component.ID]component.Component{ + {}: storagetest.NewMockStorageExtension(nil), + }} + require.NoError(t, pq.Start(context.Background(), host)) + consumed := &atomic.Int64{} + ac := newAsyncConsumer(pq, 10, func(context.Context, uint64) error { + consumed.Add(int64(1)) + return nil + }) + + td := uint64(10) + wg := &sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100_000; j++ { + assert.NoError(t, pq.Offer(context.Background(), td)) + } + }() + } + wg.Wait() + // Because the persistent queue is not draining after Shutdown, need to wait here for the drain. + assert.Eventually(t, func() bool { + return 1_000_000 == int(consumed.Load()) + }, 5*time.Second, 10*time.Millisecond) + assert.NoError(t, pq.Shutdown(context.Background())) + assert.NoError(t, ac.Shutdown(context.Background())) + }) + } } func TestToStorageClient(t *testing.T) { @@ -504,14 +541,12 @@ func TestInvalidStorageExtensionType(t *testing.T) { } func TestPersistentQueue_StopAfterBadStart(t *testing.T) { - pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{}) + pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{}) // verify that stopping a un-start/started w/error queue does not panic assert.NoError(t, pq.Shutdown(context.Background())) } func TestPersistentQueue_CorruptedData(t *testing.T) { - req := newTracesRequest(5, 10) - cases := []struct { name string corruptAllData bool @@ -569,11 +604,11 @@ func TestPersistentQueue_CorruptedData(t *testing.T) { // Put some items, make sure they are loaded and shutdown the storage... 
for i := 0; i < 3; i++ { - err := ps.Offer(context.Background(), req) + err := ps.Offer(context.Background(), uint64(50)) require.NoError(t, err) } assert.Equal(t, 3, ps.Size()) - require.True(t, consume(ps, func(context.Context, ptrace.Traces) error { + require.True(t, consume(ps, func(context.Context, uint64) error { return experr.NewShutdownErr(nil) })) assert.Equal(t, 2, ps.Size()) @@ -610,7 +645,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) { } func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { - req := newTracesRequest(5, 10) + req := uint64(50) ext := storagetest.NewMockStorageExtension(nil) ps := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000) @@ -646,8 +681,8 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { // We should be able to pull all remaining items now for i := 0; i < 4; i++ { - consume(newPs, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, req, traces) + consume(newPs, func(_ context.Context, val uint64) error { + assert.Equal(t, req, val) return nil }) } @@ -669,7 +704,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { // this test attempts to check if all the invariants are kept if the queue is recreated while // close to full and with some items dispatched func TestPersistentQueueStartWithNonDispatched(t *testing.T) { - req := newTracesRequest(5, 10) + req := uint64(50) ext := storagetest.NewMockStorageExtension(nil) ps := createTestPersistentQueueWithRequestsCapacity(t, ext, 5) @@ -680,7 +715,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { require.NoError(t, err) } - require.True(t, consume(ps, func(context.Context, ptrace.Traces) error { + require.True(t, consume(ps, func(context.Context, uint64) error { // put one more item in require.NoError(t, ps.Offer(context.Background(), req)) require.Equal(t, 5, ps.Size()) @@ -694,7 +729,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { } func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { - req := newTracesRequest(1, 1) + req := uint64(1) ext := storagetest.NewMockStorageExtensionWithDelay(nil, 20*time.Nanosecond) pq := createTestPersistentQueueWithItemsCapacity(t, ext, 25) @@ -724,7 +759,7 @@ func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { go func() { defer conWg.Done() for i := 0; i < 10; i++ { - assert.True(t, consume(pq, func(context.Context, ptrace.Traces) error { return nil })) + assert.True(t, consume(pq, func(context.Context, uint64) error { return nil })) } }() } @@ -758,7 +793,7 @@ func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { } func TestPersistentQueue_PutCloseReadClose(t *testing.T) { - req := newTracesRequest(5, 10) + req := uint64(50) ext := storagetest.NewMockStorageExtension(nil) ps := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000) assert.Equal(t, 0, ps.Size()) @@ -775,59 +810,37 @@ func TestPersistentQueue_PutCloseReadClose(t *testing.T) { require.Equal(t, 2, newPs.Size()) // Let's read both of the elements we put - consume(newPs, func(_ context.Context, traces ptrace.Traces) error { - require.Equal(t, req, traces) + consume(newPs, func(_ context.Context, val uint64) error { + require.Equal(t, req, val) return nil }) assert.Equal(t, 1, newPs.Size()) - consume(newPs, func(_ context.Context, traces ptrace.Traces) error { - require.Equal(t, req, traces) + consume(newPs, func(_ context.Context, val uint64) error { + require.Equal(t, req, val) return nil }) require.Equal(t, 0, newPs.Size()) 
assert.NoError(t, newPs.Shutdown(context.Background())) } -func BenchmarkPersistentQueue_TraceSpans(b *testing.B) { - cases := []struct { - numTraces int - numSpansPerTrace int - }{ - { - numTraces: 1, - numSpansPerTrace: 1, - }, - { - numTraces: 1, - numSpansPerTrace: 10, - }, - { - numTraces: 10, - numSpansPerTrace: 10, - }, - } - - for _, c := range cases { - b.Run(fmt.Sprintf("#traces: %d #spansPerTrace: %d", c.numTraces, c.numSpansPerTrace), func(bb *testing.B) { - ext := storagetest.NewMockStorageExtension(nil) - ps := createTestPersistentQueueWithRequestsCapacity(b, ext, 10000000) +func BenchmarkPersistentQueue(b *testing.B) { + ext := storagetest.NewMockStorageExtension(nil) + ps := createTestPersistentQueueWithRequestsCapacity(b, ext, 10000000) - req := newTracesRequest(c.numTraces, c.numSpansPerTrace) + req := uint64(100) - bb.ReportAllocs() - bb.ResetTimer() + b.ReportAllocs() + b.ResetTimer() - for i := 0; i < bb.N; i++ { - require.NoError(bb, ps.Offer(context.Background(), req)) - } + for i := 0; i < b.N; i++ { + require.NoError(b, ps.Offer(context.Background(), req)) + } - for i := 0; i < bb.N; i++ { - require.True(bb, consume(ps, func(context.Context, ptrace.Traces) error { return nil })) - } - require.NoError(b, ext.Shutdown(context.Background())) - }) + for i := 0; i < b.N; i++ { + require.True(b, consume(ps, func(context.Context, uint64) error { return nil })) } + require.NoError(b, ext.Shutdown(context.Background())) } func TestItemIndexMarshaling(t *testing.T) { @@ -894,7 +907,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) { assert.Equal(t, 0, ps.Size()) assert.False(t, ps.client.(*storagetest.MockStorageClient).IsClosed()) - require.NoError(t, ps.Offer(context.Background(), newTracesRequest(5, 10))) + require.NoError(t, ps.Offer(context.Background(), uint64(50))) index, _, _, ok := ps.Read(context.Background()) require.True(t, ok) @@ -906,11 +919,9 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) { } func TestPersistentQueue_StorageFull(t *testing.T) { - req := newTracesRequest(5, 10) - marshaled, err := marshalTracesRequest(req) + marshaled, err := uint64Marshaler(uint64(50)) require.NoError(t, err) maxSizeInBytes := len(marshaled) * 5 // arbitrary small number - freeSpaceInBytes := 1 client := newFakeBoundedStorageClient(maxSizeInBytes) ps := createTestPersistentQueueWithClient(client) @@ -918,7 +929,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) { // Put enough items in to fill the underlying storage reqCount := 0 for { - err = ps.Offer(context.Background(), req) + err = ps.Offer(context.Background(), uint64(50)) if errors.Is(err, syscall.ENOSPC) { break } @@ -929,24 +940,22 @@ func TestPersistentQueue_StorageFull(t *testing.T) { // Check that the size is correct require.Equal(t, reqCount, ps.Size(), "Size must be equal to the number of items inserted") - // Manually set the storage to only have a small amount of free space left - newMaxSize := client.GetSizeInBytes() + freeSpaceInBytes + // Manually set the storage to only have a small amount of free space left (needs 24). 
+ newMaxSize := client.GetSizeInBytes() + 23 client.SetMaxSizeInBytes(newMaxSize) - // Try to put an item in, should fail - require.Error(t, ps.Offer(context.Background(), req)) - // Take out all the items // Getting the first item fails, as we can't update the state in storage, so we just delete it without returning it // Subsequent items succeed, as deleting the first item frees enough space for the state update reqCount-- for i := reqCount; i > 0; i-- { - require.True(t, consume(ps, func(context.Context, ptrace.Traces) error { return nil })) + require.True(t, consume(ps, func(context.Context, uint64) error { return nil })) } + require.Equal(t, 0, ps.Size()) // We should be able to put a new item in // However, this will fail if deleting items fails with full storage - require.NoError(t, ps.Offer(context.Background(), req)) + require.NoError(t, ps.Offer(context.Background(), uint64(50))) } func TestPersistentQueue_ItemDispatchingFinish_ErrorHandling(t *testing.T) { @@ -1008,16 +1017,16 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { assert.Equal(t, 0, pq.Size()) // Fill the queue up to the capacity. - assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10))) - assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10))) - assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 10))) + assert.NoError(t, pq.Offer(context.Background(), uint64(40))) + assert.NoError(t, pq.Offer(context.Background(), uint64(40))) + assert.NoError(t, pq.Offer(context.Background(), uint64(20))) assert.Equal(t, 100, pq.Size()) - require.ErrorIs(t, pq.Offer(context.Background(), newTracesRequest(5, 5)), ErrQueueIsFull) + require.ErrorIs(t, pq.Offer(context.Background(), uint64(25)), ErrQueueIsFull) assert.Equal(t, 100, pq.Size()) - assert.True(t, consume(pq, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, 40, traces.SpanCount()) + assert.True(t, consume(pq, func(_ context.Context, val uint64) error { + assert.Equal(t, uint64(40), val) return nil })) assert.Equal(t, 60, pq.Size()) @@ -1029,19 +1038,19 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { // The queue should be restored to the previous size. assert.Equal(t, 60, newPQ.Size()) - require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) + require.NoError(t, newPQ.Offer(context.Background(), uint64(10))) // Check the combined queue size. 
assert.Equal(t, 70, newPQ.Size()) - assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, 40, traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, val uint64) error { + assert.Equal(t, uint64(40), val) return nil })) assert.Equal(t, 30, newPQ.Size()) - assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, 20, traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, val uint64) error { + assert.Equal(t, uint64(20), val) return nil })) assert.Equal(t, 10, newPQ.Size()) @@ -1056,13 +1065,13 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { assert.Equal(t, 0, pq.Size()) - assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10))) - assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 10))) - assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5))) + assert.NoError(t, pq.Offer(context.Background(), uint64(40))) + assert.NoError(t, pq.Offer(context.Background(), uint64(20))) + assert.NoError(t, pq.Offer(context.Background(), uint64(25))) assert.Equal(t, 3, pq.Size()) - assert.True(t, consume(pq, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, 40, traces.SpanCount()) + assert.True(t, consume(pq, func(_ context.Context, val uint64) error { + assert.Equal(t, uint64(40), val) return nil })) assert.Equal(t, 2, pq.Size()) @@ -1074,31 +1083,31 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { // The queue items size cannot be restored, fall back to request-based size assert.Equal(t, 2, newPQ.Size()) - require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) + require.NoError(t, newPQ.Offer(context.Background(), uint64(10))) // Only new items are correctly reflected assert.Equal(t, 12, newPQ.Size()) // Consuming a restored request should reduce the restored size by 20 but it should not go to below zero - assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, 20, traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, val uint64) error { + assert.Equal(t, uint64(20), val) return nil })) assert.Equal(t, 0, newPQ.Size()) // Consuming another restored request should not affect the restored size since it's already dropped to 0. 
- assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, 25, traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, val uint64) error { + assert.Equal(t, uint64(25), val) return nil })) assert.Equal(t, 0, newPQ.Size()) // Adding another batch should update the size accordingly - require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(5, 5))) + require.NoError(t, newPQ.Offer(context.Background(), uint64(25))) assert.Equal(t, 25, newPQ.Size()) - assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, 10, traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, val uint64) error { + assert.Equal(t, uint64(10), val) return nil })) assert.Equal(t, 15, newPQ.Size()) @@ -1114,15 +1123,15 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { assert.Equal(t, 0, pq.Size()) - assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10))) - assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 10))) - assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5))) - assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(1, 5))) + assert.NoError(t, pq.Offer(context.Background(), uint64(40))) + assert.NoError(t, pq.Offer(context.Background(), uint64(20))) + assert.NoError(t, pq.Offer(context.Background(), uint64(25))) + assert.NoError(t, pq.Offer(context.Background(), uint64(5))) // Read the first request just to populate the read index in the storage. // Otherwise, the write index won't be restored either. - assert.True(t, consume(pq, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, 40, traces.SpanCount()) + assert.True(t, consume(pq, func(_ context.Context, val uint64) error { + assert.Equal(t, uint64(40), val) return nil })) assert.Equal(t, 3, pq.Size()) @@ -1137,25 +1146,25 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { assert.Equal(t, 3, newPQ.Size()) // Queue is full - require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) + require.Error(t, newPQ.Offer(context.Background(), uint64(10))) - assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, 20, traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, val uint64) error { + assert.Equal(t, uint64(20), val) return nil })) assert.Equal(t, 2, newPQ.Size()) // Still full - require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) + require.Error(t, newPQ.Offer(context.Background(), uint64(10))) - assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { - assert.Equal(t, 25, traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, val uint64) error { + assert.Equal(t, uint64(25), val) return nil })) assert.Equal(t, 1, newPQ.Size()) // Now it can accept new items - assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) + assert.NoError(t, newPQ.Offer(context.Background(), uint64(10))) assert.NoError(t, newPQ.Shutdown(context.Background())) } @@ -1169,13 +1178,13 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { assert.Equal(t, 0, pq.Size()) for i := 0; i < 6; i++ { - require.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 5))) + require.NoError(t, pq.Offer(context.Background(), uint64(10))) } assert.Equal(t, 60, pq.Size()) // Consume 30 items for i := 0; i < 3; i++ { - 
assert.True(t, consume(pq, func(context.Context, ptrace.Traces) error { return nil })) + assert.True(t, consume(pq, func(context.Context, uint64) error { return nil })) } // The used size is now 30, but the snapshot should have 50, because it's taken every 5 read/writes. assert.Equal(t, 30, pq.Size()) @@ -1187,19 +1196,19 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { // In reality the size should be 30. Once the queue is drained, it will be updated to the correct size. assert.Equal(t, 50, newPQ.Size()) - assert.True(t, consume(newPQ, func(context.Context, ptrace.Traces) error { return nil })) - assert.True(t, consume(newPQ, func(context.Context, ptrace.Traces) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, uint64) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, uint64) error { return nil })) assert.Equal(t, 30, newPQ.Size()) // Now the size must be correctly reflected - assert.True(t, consume(newPQ, func(context.Context, ptrace.Traces) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, uint64) error { return nil })) assert.Equal(t, 0, newPQ.Size()) assert.NoError(t, newPQ.Shutdown(context.Background())) assert.NoError(t, pq.Shutdown(context.Background())) } -func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[ptrace.Traces], compare []uint64) { +func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[uint64], compare []uint64) { pq.mu.Lock() defer pq.mu.Unlock() assert.ElementsMatch(t, compare, pq.currentlyDispatchedItems) diff --git a/exporter/exporterqueue/sized_queue.go b/exporter/exporterqueue/sized_queue.go index a9b58241c28d..d5eeb93903c3 100644 --- a/exporter/exporterqueue/sized_queue.go +++ b/exporter/exporterqueue/sized_queue.go @@ -50,22 +50,26 @@ type sizedQueue[T any] struct { sizer sizer[T] cap int64 - mu sync.Mutex - hasElements *sync.Cond - items *linkedQueue[T] - size int64 - stopped bool + mu sync.Mutex + hasMoreElements *cond + hasMoreSpace *cond + items *linkedQueue[T] + size int64 + stopped bool + blocking bool } // newSizedQueue creates a sized elements channel. Each element is assigned a size by the provided sizer. // capacity is the capacity of the queue. -func newSizedQueue[T any](capacity int64, sizer sizer[T]) *sizedQueue[T] { +func newSizedQueue[T any](capacity int64, sizer sizer[T], blocking bool) *sizedQueue[T] { sq := &sizedQueue[T]{ - sizer: sizer, - cap: capacity, - items: &linkedQueue[T]{}, + sizer: sizer, + cap: capacity, + items: &linkedQueue[T]{}, + blocking: blocking, } - sq.hasElements = sync.NewCond(&sq.mu) + sq.hasMoreElements = newCond(&sq.mu) + sq.hasMoreSpace = newCond(&sq.mu) return sq } @@ -84,14 +88,20 @@ func (sq *sizedQueue[T]) Offer(ctx context.Context, el T) error { sq.mu.Lock() defer sq.mu.Unlock() - if sq.size+elSize > sq.cap { - return ErrQueueIsFull + for sq.size+elSize > sq.cap { + if !sq.blocking { + return ErrQueueIsFull + } + // Wait for more space or before the ctx is Done. + if err := sq.hasMoreSpace.Wait(ctx); err != nil { + return err + } } sq.size += elSize sq.items.push(ctx, el, elSize) // Signal one consumer if any. 
- sq.hasElements.Signal() + sq.hasMoreElements.Signal() return nil } @@ -104,9 +114,10 @@ func (sq *sizedQueue[T]) pop() (context.Context, T, bool) { for { if sq.size > 0 { - ctx, el, elSize := sq.items.pop() + elCtx, el, elSize := sq.items.pop() sq.size -= elSize - return ctx, el, true + sq.hasMoreSpace.Signal() + return elCtx, el, true } if sq.stopped { @@ -114,7 +125,9 @@ func (sq *sizedQueue[T]) pop() (context.Context, T, bool) { return context.Background(), el, false } - sq.hasElements.Wait() + // TODO: Change the Queue interface to return an error to allow distinguish between shutdown and context canceled. + // Ok to ignore the error, since the context.Background() will never be done. + _ = sq.hasMoreElements.Wait(context.Background()) } } @@ -123,7 +136,7 @@ func (sq *sizedQueue[T]) Shutdown(context.Context) error { sq.mu.Lock() defer sq.mu.Unlock() sq.stopped = true - sq.hasElements.Broadcast() + sq.hasMoreElements.Broadcast() return nil } diff --git a/exporter/exporterqueue/sized_queue_test.go b/exporter/exporterqueue/sized_queue_test.go index 4fa5e81dee8a..66632acdb743 100644 --- a/exporter/exporterqueue/sized_queue_test.go +++ b/exporter/exporterqueue/sized_queue_test.go @@ -18,7 +18,7 @@ func (s sizerInt) Sizeof(el int) int64 { } func TestSizedQueue(t *testing.T) { - q := newSizedQueue[int](7, sizerInt{}) + q := newSizedQueue[int](7, sizerInt{}, false) require.NoError(t, q.Offer(context.Background(), 1)) assert.Equal(t, 1, q.Size()) assert.Equal(t, 7, q.Capacity()) @@ -47,7 +47,7 @@ func TestSizedQueue(t *testing.T) { } func TestSizedQueue_DrainAllElements(t *testing.T) { - q := newSizedQueue[int](7, sizerInt{}) + q := newSizedQueue[int](7, sizerInt{}, false) require.NoError(t, q.Offer(context.Background(), 1)) require.NoError(t, q.Offer(context.Background(), 3)) @@ -68,12 +68,12 @@ func TestSizedQueue_DrainAllElements(t *testing.T) { } func TestSizedChannel_OfferInvalidSize(t *testing.T) { - q := newSizedQueue[int](1, sizerInt{}) + q := newSizedQueue[int](1, sizerInt{}, false) require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize) } func TestSizedChannel_OfferZeroSize(t *testing.T) { - q := newSizedQueue[int](1, sizerInt{}) + q := newSizedQueue[int](1, sizerInt{}, false) require.NoError(t, q.Offer(context.Background(), 0)) require.NoError(t, q.Shutdown(context.Background())) // Because the size 0 is ignored, nothing to drain.
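
Reviewer note (not part of the patch): the core of this change is the context-aware condition variable introduced in exporter/exporterqueue/cond.go together with the `for size+elSize > cap { Wait(ctx) }` loops added to sizedQueue.Offer and persistentQueue.putInternal. The following self-contained Go sketch mirrors that pattern outside the package to show the intended behavior when `blocking` is enabled: an over-capacity write waits until a consumer frees space, and returns the caller's context error if the context is canceled first. The cond type is reproduced in spirit from the diff above; the boundedSize, Add, and Remove names are illustrative stand-ins for the queue-size bookkeeping, not part of the actual API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cond mirrors the context-aware condition variable added in
// exporter/exporterqueue/cond.go: like sync.Cond, but Wait returns early
// when the supplied context is done. Callers must hold L around all calls.
type cond struct {
	L       sync.Locker
	ch      chan struct{}
	waiting int64
}

func newCond(l sync.Locker) *cond { return &cond{L: l, ch: make(chan struct{}, 1)} }

// Signal wakes one waiter, if any.
func (c *cond) Signal() {
	if c.waiting == 0 {
		return
	}
	c.waiting--
	c.ch <- struct{}{}
}

// Wait unlocks L, blocks until Signal or ctx is done, then re-locks L.
func (c *cond) Wait(ctx context.Context) error {
	c.waiting++
	c.L.Unlock()
	select {
	case <-ctx.Done():
		c.L.Lock()
		if c.waiting == 0 {
			// A signal was already sent for this waiter; consume it so no
			// other waiter is woken spuriously.
			<-c.ch
		} else {
			c.waiting--
		}
		return ctx.Err()
	case <-c.ch:
		c.L.Lock()
		return nil
	}
}

// boundedSize stands in for the queue-size bookkeeping: Add blocks while
// size+n would exceed capacity, mirroring the blocking loop in
// sizedQueue.Offer and persistentQueue.putInternal.
type boundedSize struct {
	mu           sync.Mutex
	hasMoreSpace *cond
	size, cap    int64
}

func newBoundedSize(capacity int64) *boundedSize {
	b := &boundedSize{cap: capacity}
	b.hasMoreSpace = newCond(&b.mu)
	return b
}

func (b *boundedSize) Add(ctx context.Context, n int64) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.size+n > b.cap {
		if err := b.hasMoreSpace.Wait(ctx); err != nil {
			return err // context canceled or deadline exceeded
		}
	}
	b.size += n
	return nil
}

func (b *boundedSize) Remove(n int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.size -= n
	b.hasMoreSpace.Signal()
}

func main() {
	b := newBoundedSize(1)
	_ = b.Add(context.Background(), 1) // fits immediately

	// A consumer frees space after 100ms; this Add blocks until then.
	go func() { time.Sleep(100 * time.Millisecond); b.Remove(1) }()
	fmt.Println("blocked add:", b.Add(context.Background(), 1)) // <nil>

	// With a short deadline and no consumer, the blocked Add gives up.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println("canceled add:", b.Add(ctx, 1)) // context deadline exceeded
}

The channel-based cond is used instead of sync.Cond because sync.Cond.Wait cannot be aborted when a context is canceled; the waiting counter plus the drain-on-cancel branch keeps a concurrently sent signal from being lost or from waking the wrong waiter.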