Skip to content

Commit

Permalink
Add capability for memory and persistent queue to block when add items
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jan 11, 2025
1 parent 7f4664e commit e5ca0d6
Show file tree
Hide file tree
Showing 8 changed files with 406 additions and 253 deletions.
25 changes: 25 additions & 0 deletions .chloggen/add-blocking.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add capability for memory and persistent queue to block when add items

# One or more tracking issues or pull requests related to the change
issues: [12074]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
5 changes: 3 additions & 2 deletions exporter/exporterqueue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@ type boundedMemoryQueue[T any] struct {
type memoryQueueSettings[T any] struct {
sizer sizer[T]
capacity int64
blocking bool
}

// newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
return &boundedMemoryQueue[T]{
sizedQueue: newSizedQueue[T](set.capacity, set.sizer),
sizedQueue: newSizedQueue[T](set.capacity, set.sizer, set.blocking),
}
}

func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
func (q *boundedMemoryQueue[T]) Read(context.Context) (uint64, context.Context, T, bool) {
ctx, req, ok := q.sizedQueue.pop()
return 0, ctx, req, ok
}
Expand Down
71 changes: 53 additions & 18 deletions exporter/exporterqueue/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/testdata"
)

// In this test we run a queue with capacity 1 and a single consumer.
Expand Down Expand Up @@ -102,11 +100,11 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
func TestQueueUsage(t *testing.T) {
tests := []struct {
name string
sizer sizer[ptrace.Traces]
sizer sizer[uint64]
}{
{
name: "requests_based",
sizer: &requestSizer[ptrace.Traces]{},
sizer: &requestSizer[uint64]{},
},
{
name: "items_based",
Expand All @@ -115,16 +113,15 @@ func TestQueueUsage(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: tt.sizer, capacity: int64(100)})
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100)})
consumed := &atomic.Int64{}
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error {
ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error {
consumed.Add(1)
return nil
})
td := testdata.GenerateTraces(10)
for j := 0; j < 10; j++ {
require.NoError(t, q.Offer(context.Background(), td))
require.NoError(t, q.Offer(context.Background(), uint64(10)))
}
assert.NoError(t, q.Shutdown(context.Background()))
assert.NoError(t, ac.Shutdown(context.Background()))
Expand All @@ -133,6 +130,47 @@ func TestQueueUsage(t *testing.T) {
}
}

func TestBlockingQueueUsage(t *testing.T) {
tests := []struct {
name string
sizer sizer[uint64]
}{
{
name: "requests_based",
sizer: &requestSizer[uint64]{},
},
{
name: "items_based",
sizer: &itemsSizer{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100), blocking: true})
consumed := &atomic.Int64{}
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 10, func(context.Context, uint64) error {
consumed.Add(1)
return nil
})
wg := &sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100_000; j++ {
assert.NoError(t, q.Offer(context.Background(), uint64(10)))
}
}()
}
wg.Wait()
assert.NoError(t, q.Shutdown(context.Background()))
assert.NoError(t, ac.Shutdown(context.Background()))
assert.Equal(t, int64(1_000_000), consumed.Load())
})
}
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0})

Expand All @@ -149,8 +187,7 @@ func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool
if !ok {
return false
}
consumeErr := consumeFunc(ctx, req)
q.OnProcessingFinished(index, consumeErr)
q.OnProcessingFinished(index, consumeFunc(ctx, req))
return true
}

Expand All @@ -170,8 +207,7 @@ func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(cont
if !ok {
return
}
consumeErr := consumeFunc(ctx, req)
q.OnProcessingFinished(index, consumeErr)
q.OnProcessingFinished(index, consumeFunc(ctx, req))
}
}()
}
Expand All @@ -187,11 +223,11 @@ func (qc *asyncConsumer) Shutdown(_ context.Context) error {
func BenchmarkOffer(b *testing.B) {
tests := []struct {
name string
sizer sizer[ptrace.Traces]
sizer sizer[uint64]
}{
{
name: "requests_based",
sizer: &requestSizer[ptrace.Traces]{},
sizer: &requestSizer[uint64]{},
},
{
name: "items_based",
Expand All @@ -200,18 +236,17 @@ func BenchmarkOffer(b *testing.B) {
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: &requestSizer[ptrace.Traces]{}, capacity: int64(10 * b.N)})
q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: &requestSizer[uint64]{}, capacity: int64(10 * b.N)})
consumed := &atomic.Int64{}
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error {
ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error {
consumed.Add(1)
return nil
})
td := testdata.GenerateTraces(10)
b.ResetTimer()
b.ReportAllocs()
for j := 0; j < b.N; j++ {
require.NoError(b, q.Offer(context.Background(), td))
require.NoError(b, q.Offer(context.Background(), uint64(10)))
}
assert.NoError(b, q.Shutdown(context.Background()))
assert.NoError(b, ac.Shutdown(context.Background()))
Expand Down
62 changes: 62 additions & 0 deletions exporter/exporterqueue/cond.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"

import (
"context"
"sync"
)

// cond is equivalent with sync.Cond, but context.Context aware. Which means Wait() will return if context is done.
// Also, it requires the caller to hold the c.L during all calls.
type cond struct {
L sync.Locker
ch chan struct{}
waiting int64
}

func newCond(l sync.Locker) *cond {
return &cond{L: l, ch: make(chan struct{}, 1)}
}

// Signal wakes one goroutine waiting on c, if there is any.
// It requires for the caller to hold c.L during the call.
func (c *cond) Signal() {
if c.waiting == 0 {
return
}
c.waiting--
c.ch <- struct{}{}
}

// Broadcast wakes all goroutines waiting on c.
// It requires for the caller to hold c.L during the call.
func (c *cond) Broadcast() {
for ; c.waiting > 0; c.waiting-- {
c.ch <- struct{}{}
}
}

// Wait atomically unlocks c.L and suspends execution of the calling goroutine. After later resuming execution, Wait locks c.L before returning.
func (c *cond) Wait(ctx context.Context) error {
c.waiting++
c.L.Unlock()
select {
case <-ctx.Done():
c.L.Lock()
if c.waiting == 0 {
// If waiting is 0, it means that there was a signal sent and nobody else waits for it.
// Consume it, so that we don't unblock other consumer unnecessary,
// or we don't block the producer because the channel buffer is full.
<-c.ch
} else {
// Decrease the number of waiting routines.
c.waiting--
}
return ctx.Err()

Check warning on line 57 in exporter/exporterqueue/cond.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterqueue/cond.go#L46-L57

Added lines #L46 - L57 were not covered by tests
case <-c.ch:
c.L.Lock()
return nil
}
}
66 changes: 37 additions & 29 deletions exporter/exporterqueue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strconv"
"sync"

"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -42,6 +41,7 @@ var (
type persistentQueueSettings[T any] struct {
sizer sizer[T]
capacity int64
blocking bool
signal pipeline.Signal
storageID component.ID
marshaler Marshaler[T]
Expand Down Expand Up @@ -81,7 +81,8 @@ type persistentQueue[T any] struct {

// mu guards everything declared below.
mu sync.Mutex
hasElements *sync.Cond
hasMoreElements *cond
hasMoreSpace *cond
readIndex uint64
writeIndex uint64
currentlyDispatchedItems []uint64
Expand All @@ -98,7 +99,8 @@ func newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] {
logger: set.set.Logger,
isRequestSized: isRequestSized,
}
pq.hasElements = sync.NewCond(&pq.mu)
pq.hasMoreElements = newCond(&pq.mu)
pq.hasMoreSpace = newCond(&pq.mu)
return pq
}

Expand Down Expand Up @@ -194,8 +196,8 @@ func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
backupErr := pq.backupQueueSize(ctx)
// Mark this queue as stopped, so consumer don't start any more work.
pq.stopped = true
pq.hasElements.Broadcast()
return multierr.Combine(backupErr, pq.unrefClient(ctx))
pq.hasMoreElements.Broadcast()
return errors.Join(backupErr, pq.unrefClient(ctx))
}

// backupQueueSize writes the current queue size to storage. The value is used to recover the queue size
Expand Down Expand Up @@ -233,8 +235,13 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
// putInternal is the internal version that requires caller to hold the mutex lock.
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
reqSize := pq.set.sizer.Sizeof(req)
if pq.queueSize+reqSize > pq.set.capacity {
return ErrQueueIsFull
for pq.queueSize+reqSize > pq.set.capacity {
if !pq.set.blocking {
return ErrQueueIsFull
}
if err := pq.hasMoreSpace.Wait(ctx); err != nil {
return err
}

Check warning on line 244 in exporter/exporterqueue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterqueue/persistent_queue.go#L243-L244

Added lines #L243 - L244 were not covered by tests
}

reqBuf, err := pq.set.marshaler(req)
Expand All @@ -253,7 +260,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {

pq.writeIndex++
pq.queueSize += reqSize
pq.hasElements.Signal()
pq.hasMoreElements.Signal()

// Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
Expand All @@ -269,32 +276,38 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) {
pq.mu.Lock()
defer pq.mu.Unlock()

for {
if pq.stopped {
var req T
return 0, context.Background(), req, false
}

// If queue is empty, wait until more elements and restart.
if pq.readIndex == pq.writeIndex {
pq.hasElements.Wait()
continue
}

index, req, consumed := pq.getNextItem(ctx)
if consumed {
pq.queueSize -= pq.set.sizer.Sizeof(req)
// The size might be not in sync with the queue in case it's restored from the disk
// because we don't flush the current queue size on the disk on every read/write.
// In that case we need to make sure it doesn't go below 0.
if pq.queueSize < 0 {
// Read until either a successful retrieved element or no more elements in the storage.
for pq.readIndex != pq.writeIndex {
index, req, consumed := pq.getNextItem(ctx)
// Ensure the used size and the channel size are in sync.
if pq.readIndex == pq.writeIndex {
pq.queueSize = 0
pq.hasMoreSpace.Signal()
}
if consumed {
pq.queueSize -= pq.set.sizer.Sizeof(req)
// The size might be not in sync with the queue in case it's restored from the disk
// because we don't flush the current queue size on the disk on every read/write.
// In that case we need to make sure it doesn't go below 0.
if pq.queueSize < 0 {
pq.queueSize = 0
}
pq.hasMoreSpace.Signal()

return index, context.Background(), req, true
}

return index, context.Background(), req, true
}

// If we did not consume any element retry from the beginning.
// TODO: Change the Queue interface to return an error to allow distinguish between shutdown and context canceled.
// Ok to ignore the error, since the context.Background() will never be done.
_ = pq.hasMoreElements.Wait(context.Background())
}
}

Expand Down Expand Up @@ -363,11 +376,6 @@ func (pq *persistentQueue[T]) OnProcessingFinished(index uint64, consumeErr erro
pq.logger.Error("Error writing queue size to storage", zap.Error(qsErr))
}
}

// Ensure the used size and the channel size are in sync.
if pq.readIndex == pq.writeIndex {
pq.queueSize = 0
}
}

// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
Expand Down
Loading

0 comments on commit e5ca0d6

Please sign in to comment.