From e6a5aeed7b12760f57ccac66e02f0828956f6517 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 9 Jan 2025 19:12:15 -0800 Subject: [PATCH] Change persistent queue to not use sized channel, improve memory usage and simplify sized_channel. (#12060) Signed-off-by: Bogdan Drutu --- .chloggen/change-persistent-queue.yaml | 25 +++ .../exporterqueue/bounded_memory_queue.go | 16 +- .../bounded_memory_queue_test.go | 33 ++- exporter/exporterqueue/persistent_queue.go | 206 ++++++++---------- .../exporterqueue/persistent_queue_test.go | 164 +++++++------- exporter/exporterqueue/sized_channel.go | 60 ++--- exporter/exporterqueue/sized_channel_test.go | 31 +-- 7 files changed, 252 insertions(+), 283 deletions(-) create mode 100644 .chloggen/change-persistent-queue.yaml diff --git a/.chloggen/change-persistent-queue.yaml b/.chloggen/change-persistent-queue.yaml new file mode 100644 index 00000000000..39e06ca0b89 --- /dev/null +++ b/.chloggen/change-persistent-queue.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Change persistent queue to not use sized channel, improve memory usage and simplify sized_channel. + +# One or more tracking issues or pull requests related to the change +issues: [12060] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterqueue/bounded_memory_queue.go b/exporter/exporterqueue/bounded_memory_queue.go index ecf5ec1649f..7cb48ec77c4 100644 --- a/exporter/exporterqueue/bounded_memory_queue.go +++ b/exporter/exporterqueue/bounded_memory_queue.go @@ -17,7 +17,6 @@ import ( type boundedMemoryQueue[T any] struct { component.StartFunc *sizedChannel[memQueueEl[T]] - sizer sizer[T] } // memoryQueueSettings defines internal parameters for boundedMemoryQueue creation. @@ -30,18 +29,17 @@ type memoryQueueSettings[T any] struct { // callback for dropped items (e.g. useful to emit metrics). func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] { return &boundedMemoryQueue[T]{ - sizedChannel: newSizedChannel[memQueueEl[T]](set.capacity, nil, 0), - sizer: set.sizer, + sizedChannel: newSizedChannel[memQueueEl[T]](set.capacity, memQueueElSizer[T]{sizer: set.sizer}), } } // Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic. 
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { - return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil) + return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}) } func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) { - item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) + item, ok := q.sizedChannel.pop() return 0, item.ctx, item.req, ok } @@ -60,3 +58,11 @@ type memQueueEl[T any] struct { req T ctx context.Context } + +type memQueueElSizer[T any] struct { + sizer sizer[T] +} + +func (mqes memQueueElSizer[T]) Sizeof(el memQueueEl[T]) int64 { + return mqes.sizer.Sizeof(el.req) +} diff --git a/exporter/exporterqueue/bounded_memory_queue_test.go b/exporter/exporterqueue/bounded_memory_queue_test.go index b56d5a57766..578733bc6eb 100644 --- a/exporter/exporterqueue/bounded_memory_queue_test.go +++ b/exporter/exporterqueue/bounded_memory_queue_test.go @@ -16,6 +16,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/testdata" ) // In this test we run a queue with capacity 1 and a single consumer. @@ -98,49 +100,50 @@ func TestShutdownWhileNotEmpty(t *testing.T) { } func Benchmark_QueueUsage_1000_requests(b *testing.B) { - benchmarkQueueUsage(b, &requestSizer[fakeReq]{}, 1000) + benchmarkQueueUsage(b, &requestSizer[ptrace.Traces]{}, 1000) } func Benchmark_QueueUsage_100000_requests(b *testing.B) { - benchmarkQueueUsage(b, &requestSizer[fakeReq]{}, 100000) + benchmarkQueueUsage(b, &requestSizer[ptrace.Traces]{}, 100000) } func Benchmark_QueueUsage_10000_items(b *testing.B) { // each request has 10 items: 1000 requests = 10000 items - benchmarkQueueUsage(b, &itemsSizer[fakeReq]{}, 1000) + benchmarkQueueUsage(b, &itemsSizer{}, 1000) } func Benchmark_QueueUsage_1M_items(b *testing.B) { // each request has 10 items: 100000 requests = 1M items - benchmarkQueueUsage(b, &itemsSizer[fakeReq]{}, 100000) + benchmarkQueueUsage(b, &itemsSizer{}, 100000) } func TestQueueUsage(t *testing.T) { t.Run("requests_based", func(t *testing.T) { - queueUsage(t, &requestSizer[fakeReq]{}, 10) + queueUsage(t, &requestSizer[ptrace.Traces]{}, 10) }) t.Run("items_based", func(t *testing.T) { - queueUsage(t, &itemsSizer[fakeReq]{}, 10) + queueUsage(t, &itemsSizer{}, 10) }) } -func benchmarkQueueUsage(b *testing.B, sizer sizer[fakeReq], requestsCount int) { +func benchmarkQueueUsage(b *testing.B, sizer sizer[ptrace.Traces], requestsCount int) { b.ReportAllocs() for i := 0; i < b.N; i++ { queueUsage(b, sizer, requestsCount) } } -func queueUsage(tb testing.TB, sizer sizer[fakeReq], requestsCount int) { - q := newBoundedMemoryQueue[fakeReq](memoryQueueSettings[fakeReq]{sizer: sizer, capacity: int64(10 * requestsCount)}) +func queueUsage(tb testing.TB, sizer sizer[ptrace.Traces], requestsCount int) { + q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: sizer, capacity: int64(10 * requestsCount)}) consumed := &atomic.Int64{} require.NoError(tb, q.Start(context.Background(), componenttest.NewNopHost())) - ac := newAsyncConsumer(q, 1, func(context.Context, fakeReq) error { + ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error { consumed.Add(1) return nil }) + td := testdata.GenerateTraces(10) for j := 0; j < requestsCount; j++ { - require.NoError(tb, q.Offer(context.Background(), fakeReq{10})) + 
require.NoError(tb, q.Offer(context.Background(), td)) } assert.NoError(tb, q.Shutdown(context.Background())) assert.NoError(tb, ac.Shutdown(context.Background())) @@ -158,14 +161,6 @@ func TestZeroSizeNoConsumers(t *testing.T) { assert.NoError(t, q.Shutdown(context.Background())) } -type fakeReq struct { - itemsCount int -} - -func (r fakeReq) ItemsCount() int { - return r.itemsCount -} - func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool { index, ctx, req, ok := q.Read(context.Background()) if !ok { diff --git a/exporter/exporterqueue/persistent_queue.go b/exporter/exporterqueue/persistent_queue.go index d930df5fabf..586a5139630 100644 --- a/exporter/exporterqueue/persistent_queue.go +++ b/exporter/exporterqueue/persistent_queue.go @@ -21,6 +21,34 @@ import ( "go.opentelemetry.io/collector/pipeline" ) +const ( + zapKey = "key" + zapErrorCount = "errorCount" + zapNumberOfItems = "numberOfItems" + + readIndexKey = "ri" + writeIndexKey = "wi" + currentlyDispatchedItemsKey = "di" + queueSizeKey = "si" +) + +var ( + errValueNotSet = errors.New("value not set") + errInvalidValue = errors.New("invalid value") + errNoStorageClient = errors.New("no storage client extension found") + errWrongExtensionType = errors.New("requested extension is not a storage extension") +) + +type persistentQueueSettings[T any] struct { + sizer sizer[T] + capacity int64 + signal pipeline.Signal + storageID component.ID + marshaler Marshaler[T] + unmarshaler Unmarshaler[T] + set exporter.Settings +} + // persistentQueue provides a persistent queue implementation backed by file storage extension // // Write index describes the position at which next item is going to be stored. @@ -44,11 +72,6 @@ import ( // index index x // xxxx deleted type persistentQueue[T any] struct { - // sizedChannel is used by the persistent queue for two purposes: - // 1. a communication channel notifying the consumer that a new item is available. - // 2. capacity control based on the size of the items. - *sizedChannel[permanentQueueEl] - set persistentQueueSettings[T] logger *zap.Logger client storage.Client @@ -58,49 +81,25 @@ type persistentQueue[T any] struct { // mu guards everything declared below. 
mu sync.Mutex + hasElements *sync.Cond readIndex uint64 writeIndex uint64 currentlyDispatchedItems []uint64 + queueSize int64 refClient int64 stopped bool } -const ( - zapKey = "key" - zapErrorCount = "errorCount" - zapNumberOfItems = "numberOfItems" - - readIndexKey = "ri" - writeIndexKey = "wi" - currentlyDispatchedItemsKey = "di" - queueSizeKey = "si" -) - -var ( - errValueNotSet = errors.New("value not set") - errInvalidValue = errors.New("invalid value") - errNoStorageClient = errors.New("no storage client extension found") - errWrongExtensionType = errors.New("requested extension is not a storage extension") -) - -type persistentQueueSettings[T any] struct { - sizer sizer[T] - capacity int64 - signal pipeline.Signal - storageID component.ID - marshaler Marshaler[T] - unmarshaler Unmarshaler[T] - set exporter.Settings -} - // newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage func newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] { _, isRequestSized := set.sizer.(*requestSizer[T]) - return &persistentQueue[T]{ + pq := &persistentQueue[T]{ set: set, logger: set.set.Logger, isRequestSized: isRequestSized, } + pq.hasElements = sync.NewCond(&pq.mu) + return pq } // Start starts the persistentQueue with the given number of consumers. @@ -113,6 +112,16 @@ func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) er return nil } +func (pq *persistentQueue[T]) Size() int { + pq.mu.Lock() + defer pq.mu.Unlock() + return int(pq.queueSize) +} + +func (pq *persistentQueue[T]) Capacity() int { + return int(pq.set.capacity) +} + func (pq *persistentQueue[T]) initClient(ctx context.Context, client storage.Client) { pq.client = client // Start with a reference 1 which is the reference we use for the producer goroutines and initialization. @@ -145,34 +154,18 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex pq.writeIndex = 0 } - initIndexSize := pq.writeIndex - pq.readIndex + queueSize := pq.writeIndex - pq.readIndex - var ( - initEls []permanentQueueEl - initQueueSize uint64 - ) - - // Pre-allocate the communication channel with the size of the restored queue. - if initIndexSize > 0 { - initQueueSize = initIndexSize - // If the queue is sized by the number of requests, no need to read the queue size from storage. - if !pq.isRequestSized { - if restoredQueueSize, err := pq.restoreQueueSizeFromStorage(ctx); err == nil { - initQueueSize = restoredQueueSize - } + // If the queue is sized by the number of requests, no need to read the queue size from storage. + if queueSize > 0 && !pq.isRequestSized { + if restoredQueueSize, err := pq.restoreQueueSizeFromStorage(ctx); err == nil { + queueSize = restoredQueueSize } - - // Ensure the communication channel filled with evenly sized elements up to the total restored queue size. - initEls = make([]permanentQueueEl, initIndexSize) } - // nolint: gosec - pq.sizedChannel = newSizedChannel[permanentQueueEl](pq.set.capacity, initEls, int64(initQueueSize)) + pq.queueSize = int64(queueSize) } -// permanentQueueEl is the type of the elements passed to the sizedChannel by the persistentQueue. -type permanentQueueEl struct{} - // restoreQueueSizeFromStorage restores the queue size from storage. 
func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (uint64, error) { val, err := pq.client.Get(ctx, queueSizeKey) @@ -199,9 +192,9 @@ func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error { pq.mu.Lock() defer pq.mu.Unlock() backupErr := pq.backupQueueSize(ctx) - pq.sizedChannel.shutdown() // Mark this queue as stopped, so consumer don't start any more work. pq.stopped = true + pq.hasElements.Broadcast() return multierr.Combine(backupErr, pq.unrefClient(ctx)) } @@ -215,7 +208,7 @@ func (pq *persistentQueue[T]) backupQueueSize(ctx context.Context) error { } // nolint: gosec - return pq.client.Set(ctx, queueSizeKey, itemIndexToBytes(uint64(pq.Size()))) + return pq.client.Set(ctx, queueSizeKey, itemIndexToBytes(uint64(pq.queueSize))) } // unrefClient unrefs the client, and closes if no more references. Callers MUST hold the mutex. @@ -239,31 +232,29 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error { // putInternal is the internal version that requires caller to hold the mutex lock. func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { - err := pq.sizedChannel.push(permanentQueueEl{}, pq.set.sizer.Sizeof(req), func() error { - itemKey := getItemKey(pq.writeIndex) - newIndex := pq.writeIndex + 1 - - reqBuf, err := pq.set.marshaler(req) - if err != nil { - return err - } - - // Carry out a transaction where we both add the item and update the write index - ops := []*storage.Operation{ - storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)), - storage.SetOperation(itemKey, reqBuf), - } - if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil { - return storageErr - } + reqSize := pq.set.sizer.Sizeof(req) + if pq.queueSize+reqSize > pq.set.capacity { + return ErrQueueIsFull + } - pq.writeIndex = newIndex - return nil - }) + reqBuf, err := pq.set.marshaler(req) if err != nil { return err } + // Carry out a transaction where we both add the item and update the write index + ops := []*storage.Operation{ + storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex+1)), + storage.SetOperation(getItemKey(pq.writeIndex), reqBuf), + } + if err = pq.client.Batch(ctx, ops...); err != nil { + return err + } + + pq.writeIndex++ + pq.queueSize += reqSize + pq.hasElements.Signal() + // Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size // in case if the collector is killed. The recovered queue size is allowed to be inaccurate. if (pq.writeIndex % 10) == 5 { @@ -276,29 +267,34 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { } func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) { + pq.mu.Lock() + defer pq.mu.Unlock() for { - var ( - index uint64 - req T - consumed bool - ) - _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 { - size := int64(0) - index, req, consumed = pq.getNextItem(ctx) - if consumed { - size = pq.set.sizer.Sizeof(req) - } - return size - }) - if !ok { - return 0, nil, req, false + if pq.stopped { + var req T + return 0, context.Background(), req, false } + + // If queue is empty, wait until more elements and restart. 
+ if pq.readIndex == pq.writeIndex { + pq.hasElements.Wait() + continue + } + + index, req, consumed := pq.getNextItem(ctx) if consumed { - return index, context.TODO(), req, true + pq.queueSize -= pq.set.sizer.Sizeof(req) + // The size might be not in sync with the queue in case it's restored from the disk + // because we don't flush the current queue size on the disk on every read/write. + // In that case we need to make sure it doesn't go below 0. + if pq.queueSize < 0 { + pq.queueSize = 0 + } + + return index, context.Background(), req, true } - // If ok && !consumed, it means we are stopped. In this case, we still process all the other events - // in the channel before, so we will free the channel fast and get to the stop. + // If we did not consume any element retry from the beginning. } } @@ -306,19 +302,6 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context // finished, the index should be called with OnProcessingFinished to clean up the storage. If no new item is available, // returns false. func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) { - pq.mu.Lock() - defer pq.mu.Unlock() - - var request T - - if pq.stopped { - return 0, request, false - } - - if pq.readIndex == pq.writeIndex { - return 0, request, false - } - index := pq.readIndex // Increase here, so even if errors happen below, it always iterates pq.readIndex++ @@ -329,6 +312,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems)), getOp) + var request T if err == nil { request, err = pq.set.unmarshaler(getOp.Value) } @@ -381,7 +365,9 @@ func (pq *persistentQueue[T]) OnProcessingFinished(index uint64, consumeErr erro } // Ensure the used size and the channel size are in sync. - pq.sizedChannel.syncSize() + if pq.readIndex == pq.writeIndex { + pq.queueSize = 0 + } } // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage diff --git a/exporter/exporterqueue/persistent_queue_test.go b/exporter/exporterqueue/persistent_queue_test.go index 18c85e92b80..6866573efdb 100644 --- a/exporter/exporterqueue/persistent_queue_test.go +++ b/exporter/exporterqueue/persistent_queue_test.go @@ -28,34 +28,21 @@ import ( "go.opentelemetry.io/collector/pipeline" ) -type itemsCounter interface { - ItemsCount() int -} - // itemsSizer is a sizer implementation that returns the size of a queue element as the number of items it contains. 
-type itemsSizer[T itemsCounter] struct{} - -func (is *itemsSizer[T]) Sizeof(el T) int64 { - return int64(el.ItemsCount()) -} +type itemsSizer struct{} -type tracesRequest struct { - traces ptrace.Traces +func (is *itemsSizer) Sizeof(el ptrace.Traces) int64 { + return int64(el.SpanCount()) } -func (tr tracesRequest) ItemsCount() int { - return tr.traces.SpanCount() -} - -func marshalTracesRequest(tr tracesRequest) ([]byte, error) { +func marshalTracesRequest(td ptrace.Traces) ([]byte, error) { marshaler := &ptrace.ProtoMarshaler{} - return marshaler.MarshalTraces(tr.traces) + return marshaler.MarshalTraces(td) } -func unmarshalTracesRequest(bytes []byte) (tracesRequest, error) { +func unmarshalTracesRequest(bytes []byte) (ptrace.Traces, error) { unmarshaler := &ptrace.ProtoUnmarshaler{} - traces, err := unmarshaler.UnmarshalTraces(bytes) - return tracesRequest{traces: traces}, err + return unmarshaler.UnmarshalTraces(bytes) } type mockHost struct { @@ -226,10 +213,10 @@ func (m *fakeStorageClientWithErrors) Reset() { } // createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers. -func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[tracesRequest], capacity int64, numConsumers int, - consumeFunc func(_ context.Context, item tracesRequest) error, -) Queue[tracesRequest] { - pq := newPersistentQueue[tracesRequest](persistentQueueSettings[tracesRequest]{ +func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[ptrace.Traces], capacity int64, numConsumers int, + consumeFunc func(_ context.Context, item ptrace.Traces) error, +) Queue[ptrace.Traces] { + pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{ sizer: sizer, capacity: capacity, signal: pipeline.SignalTraces, @@ -250,32 +237,32 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[tracesRequest], return pq } -func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue[tracesRequest] { - pq := newPersistentQueue[tracesRequest](persistentQueueSettings[tracesRequest]{ - sizer: &requestSizer[tracesRequest]{}, +func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue[ptrace.Traces] { + pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{ + sizer: &requestSizer[ptrace.Traces]{}, capacity: 1000, signal: pipeline.SignalTraces, storageID: component.ID{}, marshaler: marshalTracesRequest, unmarshaler: unmarshalTracesRequest, set: exportertest.NewNopSettings(), - }).(*persistentQueue[tracesRequest]) + }).(*persistentQueue[ptrace.Traces]) pq.initClient(context.Background(), client) return pq } -func createTestPersistentQueueWithRequestsCapacity(tb testing.TB, ext storage.Extension, capacity int64) *persistentQueue[tracesRequest] { - return createTestPersistentQueueWithCapacityLimiter(tb, ext, &requestSizer[tracesRequest]{}, capacity) +func createTestPersistentQueueWithRequestsCapacity(tb testing.TB, ext storage.Extension, capacity int64) *persistentQueue[ptrace.Traces] { + return createTestPersistentQueueWithCapacityLimiter(tb, ext, &requestSizer[ptrace.Traces]{}, capacity) } -func createTestPersistentQueueWithItemsCapacity(tb testing.TB, ext storage.Extension, capacity int64) *persistentQueue[tracesRequest] { - return createTestPersistentQueueWithCapacityLimiter(tb, ext, &itemsSizer[tracesRequest]{}, capacity) +func createTestPersistentQueueWithItemsCapacity(tb testing.TB, ext storage.Extension, capacity int64) *persistentQueue[ptrace.Traces] { + return 
createTestPersistentQueueWithCapacityLimiter(tb, ext, &itemsSizer{}, capacity) } -func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Extension, sizer sizer[tracesRequest], +func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Extension, sizer sizer[ptrace.Traces], capacity int64, -) *persistentQueue[tracesRequest] { - pq := newPersistentQueue[tracesRequest](persistentQueueSettings[tracesRequest]{ +) *persistentQueue[ptrace.Traces] { + pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{ sizer: sizer, capacity: capacity, signal: pipeline.SignalTraces, @@ -283,7 +270,7 @@ func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Ext marshaler: marshalTracesRequest, unmarshaler: unmarshalTracesRequest, set: exportertest.NewNopSettings(), - }).(*persistentQueue[tracesRequest]) + }).(*persistentQueue[ptrace.Traces]) require.NoError(tb, pq.Start(context.Background(), &mockHost{ext: map[component.ID]component.Component{{}: ext}})) return pq } @@ -291,29 +278,27 @@ func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Ext func TestPersistentQueue_FullCapacity(t *testing.T) { tests := []struct { name string - sizer sizer[tracesRequest] + sizer sizer[ptrace.Traces] capacity int64 sizeMultiplier int }{ { name: "requests_capacity", - sizer: &requestSizer[tracesRequest]{}, + sizer: &requestSizer[ptrace.Traces]{}, capacity: 5, sizeMultiplier: 1, }, { name: "items_capacity", - sizer: &itemsSizer[tracesRequest]{}, + sizer: &itemsSizer{}, capacity: 55, sizeMultiplier: 10, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - start := make(chan struct{}) done := make(chan struct{}) - pq := createAndStartTestPersistentQueue(t, tt.sizer, tt.capacity, 1, func(context.Context, tracesRequest) error { - <-start + pq := createAndStartTestPersistentQueue(t, tt.sizer, tt.capacity, 1, func(context.Context, ptrace.Traces) error { <-done return nil }) @@ -323,8 +308,9 @@ func TestPersistentQueue_FullCapacity(t *testing.T) { // First request is picked by the consumer. Wait until the consumer is blocked on done. 
require.NoError(t, pq.Offer(context.Background(), req)) - start <- struct{}{} - close(start) + assert.Eventually(t, func() bool { + return pq.Size() == 0 + }, 2*time.Second, 10*time.Millisecond) for i := 0; i < 10; i++ { result := pq.Offer(context.Background(), newTracesRequest(1, 10)) @@ -341,8 +327,8 @@ func TestPersistentQueue_FullCapacity(t *testing.T) { } func TestPersistentQueue_Shutdown(t *testing.T) { - pq := createAndStartTestPersistentQueue(t, &requestSizer[tracesRequest]{}, 1001, 100, func(context.Context, - tracesRequest, + pq := createAndStartTestPersistentQueue(t, &requestSizer[ptrace.Traces]{}, 1001, 100, func(context.Context, + ptrace.Traces, ) error { return nil }) @@ -385,9 +371,9 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { req := newTracesRequest(1, 10) numMessagesConsumed := &atomic.Int32{} - pq := createAndStartTestPersistentQueue(t, &requestSizer[tracesRequest]{}, 1000, c.numConsumers, + pq := createAndStartTestPersistentQueue(t, &requestSizer[ptrace.Traces]{}, 1000, c.numConsumers, func(context.Context, - tracesRequest, + ptrace.Traces, ) error { numMessagesConsumed.Add(int32(1)) return nil @@ -404,7 +390,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { } } -func newTracesRequest(numTraces int, numSpans int) tracesRequest { +func newTracesRequest(numTraces int, numSpans int) ptrace.Traces { traces := ptrace.NewTraces() batch := traces.ResourceSpans().AppendEmpty() batch.Resource().Attributes().PutStr("resource-attr", "some-resource") @@ -428,7 +414,7 @@ func newTracesRequest(numTraces int, numSpans int) tracesRequest { } } - return tracesRequest{traces: traces} + return traces } func TestToStorageClient(t *testing.T) { @@ -518,7 +504,7 @@ func TestInvalidStorageExtensionType(t *testing.T) { } func TestPersistentQueue_StopAfterBadStart(t *testing.T) { - pq := newPersistentQueue[tracesRequest](persistentQueueSettings[tracesRequest]{}) + pq := newPersistentQueue[ptrace.Traces](persistentQueueSettings[ptrace.Traces]{}) // verify that stopping a un-start/started w/error queue does not panic assert.NoError(t, pq.Shutdown(context.Background())) } @@ -587,7 +573,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) { require.NoError(t, err) } assert.Equal(t, 3, ps.Size()) - require.True(t, consume(ps, func(context.Context, tracesRequest) error { + require.True(t, consume(ps, func(context.Context, ptrace.Traces) error { return experr.NewShutdownErr(nil) })) assert.Equal(t, 2, ps.Size()) @@ -637,13 +623,13 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{}) // Takes index 0 in process. - _, readReq, found := ps.getNextItem(context.Background()) + _, _, readReq, found := ps.Read(context.Background()) require.True(t, found) assert.Equal(t, req, readReq) requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0}) // This takes item 1 to process. 
- secondIndex, secondReadReq, found := ps.getNextItem(context.Background()) + secondIndex, _, secondReadReq, found := ps.Read(context.Background()) require.True(t, found) assert.Equal(t, req, secondReadReq) requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1}) @@ -660,7 +646,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) { // We should be able to pull all remaining items now for i := 0; i < 4; i++ { - consume(newPs, func(_ context.Context, traces tracesRequest) error { + consume(newPs, func(_ context.Context, traces ptrace.Traces) error { assert.Equal(t, req, traces) return nil }) @@ -694,7 +680,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { require.NoError(t, err) } - require.True(t, consume(ps, func(context.Context, tracesRequest) error { + require.True(t, consume(ps, func(context.Context, ptrace.Traces) error { // put one more item in require.NoError(t, ps.Offer(context.Background(), req)) require.Equal(t, 5, ps.Size()) @@ -738,7 +724,7 @@ func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { go func() { defer conWg.Done() for i := 0; i < 10; i++ { - assert.True(t, consume(pq, func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(pq, func(context.Context, ptrace.Traces) error { return nil })) } }() } @@ -768,7 +754,7 @@ func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { case <-doneCtx.Done(): assert.Fail(t, "timed out waiting for producers to complete") } - assert.Zero(t, pq.sizedChannel.Size()) + assert.Zero(t, pq.Size()) } func TestPersistentQueue_PutCloseReadClose(t *testing.T) { @@ -782,20 +768,20 @@ func TestPersistentQueue_PutCloseReadClose(t *testing.T) { assert.NoError(t, ps.Offer(context.Background(), req)) assert.Equal(t, 2, ps.Size()) // TODO: Remove this, after the initialization writes the readIndex. 
- _, _, _ = ps.getNextItem(context.Background()) + _, _, _, _ = ps.Read(context.Background()) require.NoError(t, ps.Shutdown(context.Background())) newPs := createTestPersistentQueueWithRequestsCapacity(t, ext, 1000) require.Equal(t, 2, newPs.Size()) // Let's read both of the elements we put - consume(newPs, func(_ context.Context, traces tracesRequest) error { + consume(newPs, func(_ context.Context, traces ptrace.Traces) error { require.Equal(t, req, traces) return nil }) assert.Equal(t, 1, newPs.Size()) - consume(newPs, func(_ context.Context, traces tracesRequest) error { + consume(newPs, func(_ context.Context, traces ptrace.Traces) error { require.Equal(t, req, traces) return nil }) @@ -837,7 +823,7 @@ func BenchmarkPersistentQueue_TraceSpans(b *testing.B) { } for i := 0; i < bb.N; i++ { - require.True(bb, consume(ps, func(context.Context, tracesRequest) error { return nil })) + require.True(bb, consume(ps, func(context.Context, ptrace.Traces) error { return nil })) } require.NoError(b, ext.Shutdown(context.Background())) }) @@ -910,7 +896,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) { require.NoError(t, ps.Offer(context.Background(), newTracesRequest(5, 10))) - index, _, ok := ps.getNextItem(context.Background()) + index, _, _, ok := ps.Read(context.Background()) require.True(t, ok) assert.False(t, ps.client.(*storagetest.MockStorageClient).IsClosed()) require.NoError(t, ps.Shutdown(context.Background())) @@ -955,7 +941,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) { // Subsequent items succeed, as deleting the first item frees enough space for the state update reqCount-- for i := reqCount; i > 0; i-- { - require.True(t, consume(ps, func(context.Context, tracesRequest) error { return nil })) + require.True(t, consume(ps, func(context.Context, ptrace.Traces) error { return nil })) } // We should be able to put a new item in @@ -1030,8 +1016,8 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { require.ErrorIs(t, pq.Offer(context.Background(), newTracesRequest(5, 5)), ErrQueueIsFull) assert.Equal(t, 100, pq.Size()) - assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error { - assert.Equal(t, 40, traces.traces.SpanCount()) + assert.True(t, consume(pq, func(_ context.Context, traces ptrace.Traces) error { + assert.Equal(t, 40, traces.SpanCount()) return nil })) assert.Equal(t, 60, pq.Size()) @@ -1048,14 +1034,14 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { // Check the combined queue size. 
assert.Equal(t, 70, newPQ.Size()) - assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { - assert.Equal(t, 40, traces.traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { + assert.Equal(t, 40, traces.SpanCount()) return nil })) assert.Equal(t, 30, newPQ.Size()) - assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { - assert.Equal(t, 20, traces.traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { + assert.Equal(t, 20, traces.SpanCount()) return nil })) assert.Equal(t, 10, newPQ.Size()) @@ -1075,8 +1061,8 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5))) assert.Equal(t, 3, pq.Size()) - assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error { - assert.Equal(t, 40, traces.traces.SpanCount()) + assert.True(t, consume(pq, func(_ context.Context, traces ptrace.Traces) error { + assert.Equal(t, 40, traces.SpanCount()) return nil })) assert.Equal(t, 2, pq.Size()) @@ -1094,15 +1080,15 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { assert.Equal(t, 12, newPQ.Size()) // Consuming a restored request should reduce the restored size by 20 but it should not go to below zero - assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { - assert.Equal(t, 20, traces.traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { + assert.Equal(t, 20, traces.SpanCount()) return nil })) assert.Equal(t, 0, newPQ.Size()) // Consuming another restored request should not affect the restored size since it's already dropped to 0. - assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { - assert.Equal(t, 25, traces.traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { + assert.Equal(t, 25, traces.SpanCount()) return nil })) assert.Equal(t, 0, newPQ.Size()) @@ -1111,8 +1097,8 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { require.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(5, 5))) assert.Equal(t, 25, newPQ.Size()) - assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { - assert.Equal(t, 10, traces.traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { + assert.Equal(t, 10, traces.SpanCount()) return nil })) assert.Equal(t, 15, newPQ.Size()) @@ -1135,8 +1121,8 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { // Read the first request just to populate the read index in the storage. // Otherwise, the write index won't be restored either. 
- assert.True(t, consume(pq, func(_ context.Context, traces tracesRequest) error { - assert.Equal(t, 40, traces.traces.SpanCount()) + assert.True(t, consume(pq, func(_ context.Context, traces ptrace.Traces) error { + assert.Equal(t, 40, traces.SpanCount()) return nil })) assert.Equal(t, 3, pq.Size()) @@ -1153,8 +1139,8 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { // Queue is full require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) - assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { - assert.Equal(t, 20, traces.traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { + assert.Equal(t, 20, traces.SpanCount()) return nil })) assert.Equal(t, 2, newPQ.Size()) @@ -1162,8 +1148,8 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { // Still full require.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5))) - assert.True(t, consume(newPQ, func(_ context.Context, traces tracesRequest) error { - assert.Equal(t, 25, traces.traces.SpanCount()) + assert.True(t, consume(newPQ, func(_ context.Context, traces ptrace.Traces) error { + assert.Equal(t, 25, traces.SpanCount()) return nil })) assert.Equal(t, 1, newPQ.Size()) @@ -1189,7 +1175,7 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { // Consume 30 items for i := 0; i < 3; i++ { - assert.True(t, consume(pq, func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(pq, func(context.Context, ptrace.Traces) error { return nil })) } // The used size is now 30, but the snapshot should have 50, because it's taken every 5 read/writes. assert.Equal(t, 30, pq.Size()) @@ -1201,19 +1187,19 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { // In reality the size should be 30. Once the queue is drained, it will be updated to the correct size. assert.Equal(t, 50, newPQ.Size()) - assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil })) - assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, ptrace.Traces) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, ptrace.Traces) error { return nil })) assert.Equal(t, 30, newPQ.Size()) // Now the size must be correctly reflected - assert.True(t, consume(newPQ, func(context.Context, tracesRequest) error { return nil })) + assert.True(t, consume(newPQ, func(context.Context, ptrace.Traces) error { return nil })) assert.Equal(t, 0, newPQ.Size()) assert.NoError(t, newPQ.Shutdown(context.Background())) assert.NoError(t, pq.Shutdown(context.Background())) } -func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[tracesRequest], compare []uint64) { +func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[ptrace.Traces], compare []uint64) { pq.mu.Lock() defer pq.mu.Unlock() assert.ElementsMatch(t, compare, pq.currentlyDispatchedItems) diff --git a/exporter/exporterqueue/sized_channel.go b/exporter/exporterqueue/sized_channel.go index 3a343769e1b..aeae9e32e28 100644 --- a/exporter/exporterqueue/sized_channel.go +++ b/exporter/exporterqueue/sized_channel.go @@ -8,7 +8,8 @@ import "sync/atomic" // sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements. 
// The channel will accept elements until the total size of the elements reaches the capacity. type sizedChannel[T any] struct { - used *atomic.Int64 + sizer sizer[T] + used *atomic.Int64 // We need to store the capacity in a separate field because the capacity of the channel can be higher. // It happens when we restore a persistent queue from a disk that is bigger than the pre-configured capacity. @@ -19,24 +20,15 @@ type sizedChannel[T any] struct { // newSizedChannel creates a sized elements channel. Each element is assigned a size by the provided sizer. // chanCapacity is the capacity of the underlying channel which usually should be equal to the capacity of the queue to // avoid blocking the producer. Optionally, the channel can be preloaded with the elements and their total size. -func newSizedChannel[T any](capacity int64, els []T, totalSize int64) *sizedChannel[T] { +func newSizedChannel[T any](capacity int64, sizer sizer[T]) *sizedChannel[T] { used := &atomic.Int64{} - used.Store(totalSize) - - chCap := capacity - if chCap < int64(len(els)) { - chCap = int64(len(els)) - } - - ch := make(chan T, chCap) - for _, el := range els { - ch <- el - } + ch := make(chan T, capacity) return &sizedChannel[T]{ - used: used, - cap: capacity, - ch: ch, + sizer: sizer, + used: used, + cap: capacity, + ch: ch, } } @@ -44,17 +36,12 @@ func newSizedChannel[T any](capacity int64, els []T, totalSize int64) *sizedChan // Returns an error if the queue is full. The callback is called before the element is committed to the queue. // If the callback returns an error, the element is not put into the queue and the error is returned. // The size is the size of the element MUST be positive. -func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error { - if vcq.used.Add(size) > vcq.cap { - vcq.used.Add(-size) +func (vcq *sizedChannel[T]) push(el T) error { + elSize := vcq.sizer.Sizeof(el) + if vcq.used.Add(elSize) > vcq.cap { + vcq.used.Add(-elSize) return ErrQueueIsFull } - if callback != nil { - if err := callback(); err != nil { - vcq.used.Add(-size) - return err - } - } select { // for persistent queue implementation, channel len can be out of sync with used size. Attempt to put it @@ -62,7 +49,7 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error case vcq.ch <- el: return nil default: - vcq.used.Add(-size) + vcq.used.Add(-elSize) return ErrQueueIsFull } } @@ -71,33 +58,16 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. // The callback is called before the element is removed from the queue. It must return the size of the element. -func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) { +func (vcq *sizedChannel[T]) pop() (T, bool) { el, ok := <-vcq.ch if !ok { return el, false } - size := callback(el) - - // The used size and the channel size might be not in sync with the queue in case it's restored from the disk - // because we don't flush the current queue size on the disk on every read/write. - // In that case we need to make sure it doesn't go below 0. - if vcq.used.Add(-size) < 0 { - vcq.used.Store(0) - } + vcq.used.Add(-vcq.sizer.Sizeof(el)) return el, true } -// syncSize updates the used size to 0 if the queue is empty. -// The caller must ensure that this call is not called concurrently with push. 
-// It's used by the persistent queue to ensure the used value correctly reflects the reality which may not be always -// the case in case if the queue size is restored from the disk after a crash. -func (vcq *sizedChannel[T]) syncSize() { - if len(vcq.ch) == 0 { - vcq.used.Store(0) - } -} - // shutdown closes the queue channel to initiate draining of the queue. func (vcq *sizedChannel[T]) shutdown() { close(vcq.ch) diff --git a/exporter/exporterqueue/sized_channel_test.go b/exporter/exporterqueue/sized_channel_test.go index 24aadb8aba0..9c42f4eaf4b 100644 --- a/exporter/exporterqueue/sized_channel_test.go +++ b/exporter/exporterqueue/sized_channel_test.go @@ -4,52 +4,53 @@ package exporterqueue import ( - "errors" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +type sizerInt struct{} + +func (s sizerInt) Sizeof(el int) int64 { + return int64(el) +} + func TestSizedCapacityChannel(t *testing.T) { - q := newSizedChannel[int](7, nil, 0) - require.NoError(t, q.push(1, 1, nil)) + q := newSizedChannel[int](7, sizerInt{}) + require.NoError(t, q.push(1)) assert.Equal(t, 1, q.Size()) assert.Equal(t, 7, q.Capacity()) - // failed callback should not allow the element to be added - require.Error(t, q.push(2, 2, func() error { return errors.New("failed") })) - assert.Equal(t, 1, q.Size()) - - require.NoError(t, q.push(3, 3, nil)) + require.NoError(t, q.push(3)) assert.Equal(t, 4, q.Size()) // should not be able to send to the full queue - require.Error(t, q.push(4, 4, nil)) + require.Error(t, q.push(4)) assert.Equal(t, 4, q.Size()) - el, ok := q.pop(func(el int) int64 { return int64(el) }) + el, ok := q.pop() assert.Equal(t, 1, el) assert.True(t, ok) assert.Equal(t, 3, q.Size()) - el, ok = q.pop(func(el int) int64 { return int64(el) }) + el, ok = q.pop() assert.Equal(t, 3, el) assert.True(t, ok) assert.Equal(t, 0, q.Size()) q.shutdown() - el, ok = q.pop(func(el int) int64 { return int64(el) }) + el, ok = q.pop() assert.False(t, ok) assert.Equal(t, 0, el) } func TestSizedCapacityChannel_Offer_sizedNotFullButChannelFull(t *testing.T) { - q := newSizedChannel[int](1, nil, 0) - require.NoError(t, q.push(1, 1, nil)) + q := newSizedChannel[int](1, sizerInt{}) + require.NoError(t, q.push(1)) q.used.Store(0) - err := q.push(1, 1, nil) + err := q.push(1) require.Error(t, err) assert.Equal(t, ErrQueueIsFull, err) }
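For reference, below is a minimal sketch (illustrative only, not code from this patch) of the mutex + sync.Cond pattern the persistent queue adopts above: Offer rejects writes that would exceed the configured capacity, Read blocks on the condition variable while the queue is empty, and Shutdown sets a stopped flag and broadcasts so blocked readers can exit. All names and the in-memory slice backing are hypothetical simplifications; the real queue keeps its elements in the storage client and only tracks indexes and the tracked size under the mutex.

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errQueueIsFull = errors.New("sending queue is full")

// condQueue is a simplified, in-memory stand-in for the persistent queue's
// concurrency structure: one mutex guards all state, and a sync.Cond wakes readers.
type condQueue[T any] struct {
	mu          sync.Mutex
	hasElements *sync.Cond
	items       []T
	size        int64
	capacity    int64
	sizeOf      func(T) int64
	stopped     bool
}

func newCondQueue[T any](capacity int64, sizeOf func(T) int64) *condQueue[T] {
	q := &condQueue[T]{capacity: capacity, sizeOf: sizeOf}
	q.hasElements = sync.NewCond(&q.mu)
	return q
}

// Offer rejects the element if accepting it would exceed capacity; otherwise it
// stores the element, updates the tracked size, and wakes one blocked reader.
func (q *condQueue[T]) Offer(el T) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	elSize := q.sizeOf(el)
	if q.size+elSize > q.capacity {
		return errQueueIsFull
	}
	q.items = append(q.items, el)
	q.size += elSize
	q.hasElements.Signal()
	return nil
}

// Read blocks until an element is available or the queue is shut down.
func (q *condQueue[T]) Read() (T, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for {
		if q.stopped {
			var zero T
			return zero, false
		}
		if len(q.items) == 0 {
			// Wait releases the mutex until Signal/Broadcast, then reacquires it.
			q.hasElements.Wait()
			continue
		}
		el := q.items[0]
		q.items = q.items[1:]
		q.size -= q.sizeOf(el)
		return el, true
	}
}

// Shutdown marks the queue stopped and wakes all blocked readers.
func (q *condQueue[T]) Shutdown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.stopped = true
	q.hasElements.Broadcast()
}

func main() {
	q := newCondQueue[string](10, func(string) int64 { return 1 })
	_ = q.Offer("hello")
	if el, ok := q.Read(); ok {
		fmt.Println(el)
	}
	q.Shutdown()
}

Compared with the previous approach, this removes the need to keep a parallel sized channel of placeholder elements in sync with the on-disk queue, which is presumably where the memory-usage improvement noted in the changelog entry comes from.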