Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] sdk/log: Record is concurrency-safe #5478

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {

ticker := time.NewTicker(interval)
// TODO: investigate using a sync.Pool instead of cloning.
buf := make([]Record, b.batchSize)
buf := make([]*Record, b.batchSize)
go func() {
defer close(done)
defer ticker.Stop()
Expand All @@ -154,7 +154,7 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
global.Warn("dropped log records", "dropped", d)
}

qLen := b.q.TryDequeue(buf, func(r []Record) bool {
qLen := b.q.TryDequeue(buf, func(r []*Record) bool {
ok := b.exporter.EnqueueExport(r)
if ok {
buf = slices.Clone(buf)
Expand All @@ -176,7 +176,7 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
}

// OnEmit batches provided log record.
func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
if b.stopped.Load() || b.q == nil {
return nil
}
Expand All @@ -193,7 +193,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
}

// Enabled returns if b is enabled.
func (b *BatchProcessor) Enabled(context.Context, Record) bool {
func (b *BatchProcessor) Enabled(context.Context, *Record) bool {
return !b.stopped.Load() && b.q != nil
}

Expand Down Expand Up @@ -230,10 +230,10 @@ func (b *BatchProcessor) ForceFlush(ctx context.Context) error {
return nil
}

buf := make([]Record, b.q.cap)
buf := make([]*Record, b.q.cap)
notFlushed := func() bool {
var flushed bool
_ = b.q.TryDequeue(buf, func(r []Record) bool {
_ = b.q.TryDequeue(buf, func(r []*Record) bool {
flushed = b.exporter.EnqueueExport(r)
return flushed
})
Expand Down Expand Up @@ -284,7 +284,7 @@ func (q *queue) Dropped() uint64 {
//
// If enqueueing r will exceed the capacity of q, the oldest Record held in q
// will be dropped and r retained.
func (q *queue) Enqueue(r Record) int {
func (q *queue) Enqueue(r *Record) int {
q.Lock()
defer q.Unlock()

Expand All @@ -309,7 +309,7 @@ func (q *queue) Enqueue(r Record) int {
//
// When write is called the lock of q is held. The write function must not call
// other methods of this q that acquire the lock.
func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {
func (q *queue) TryDequeue(buf []*Record, write func([]*Record) bool) int {
q.Lock()
defer q.Unlock()

Expand All @@ -331,11 +331,11 @@ func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {

// Flush returns all the Records held in the queue and resets it to be
// empty.
func (q *queue) Flush() []Record {
func (q *queue) Flush() []*Record {
q.Lock()
defer q.Unlock()

out := make([]Record, q.len)
out := make([]*Record, q.len)
for i := range out {
out[i] = q.read.Value
q.read = q.read.Next()
Expand Down
65 changes: 33 additions & 32 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestEmptyBatchConfig(t *testing.T) {
assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
var record Record
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
Expand Down Expand Up @@ -197,10 +197,10 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Nanosecond),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, size) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < size; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
var got []Record
var got []*Record
assert.Eventually(t, func() bool {
for _, r := range e.Records() {
got = append(got, r...)
Expand All @@ -220,8 +220,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 10*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < 10*batch; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 1
Expand All @@ -243,8 +243,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 2*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < 2*batch; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}

var n int
Expand All @@ -255,7 +255,7 @@ func TestBatchProcessor(t *testing.T) {

var err error
require.Eventually(t, func() bool {
err = b.OnEmit(ctx, Record{})
err = b.OnEmit(ctx, &Record{})
return true
}, time.Second, time.Microsecond, "OnEmit blocked")
assert.NoError(t, err)
Expand All @@ -272,10 +272,10 @@ func TestBatchProcessor(t *testing.T) {

t.Run("Enabled", func(t *testing.T) {
b := NewBatchProcessor(defaultNoopExporter)
assert.True(t, b.Enabled(ctx, Record{}))
assert.True(t, b.Enabled(ctx, &Record{}))

_ = b.Shutdown(ctx)
assert.False(t, b.Enabled(ctx, Record{}))
assert.False(t, b.Enabled(ctx, &Record{}))
})

t.Run("Shutdown", func(t *testing.T) {
Expand Down Expand Up @@ -303,15 +303,15 @@ func TestBatchProcessor(t *testing.T) {
assert.NoError(t, b.Shutdown(ctx))

want := e.ExportN()
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, &Record{}))
assert.Equal(t, want, e.ExportN(), "Export called after shutdown")
})

t.Run("ForceFlush", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchProcessor(e)

assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, &Record{}))
assert.NoError(t, b.Shutdown(ctx))

assert.NoError(t, b.ForceFlush(ctx))
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestBatchProcessor(t *testing.T) {
)
t.Cleanup(func() { _ = b.Shutdown(ctx) })

var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))
require.NoError(t, b.OnEmit(ctx, r))

Expand Down Expand Up @@ -381,7 +381,7 @@ func TestBatchProcessor(t *testing.T) {

// Enqueue 10 x "batch size" amount of records.
for i := 0; i < 10*batch; i++ {
require.NoError(t, b.OnEmit(ctx, Record{}))
require.NoError(t, b.OnEmit(ctx, &Record{}))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input)
Expand Down Expand Up @@ -423,7 +423,7 @@ func TestBatchProcessor(t *testing.T) {
b := NewBatchProcessor(e)
t.Cleanup(func() { _ = b.Shutdown(ctx) })

var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))
_ = b.OnEmit(ctx, r)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
Expand Down Expand Up @@ -453,7 +453,7 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
var r Record
r := new(Record)
// First record will be blocked by testExporter.Export
assert.NoError(t, b.OnEmit(ctx, r), "exported record")
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -497,7 +497,7 @@ func TestBatchProcessor(t *testing.T) {
case <-ctx.Done():
return
default:
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, &Record{}))
// Ignore partial flush errors.
_ = b.ForceFlush(ctx)
}
Expand All @@ -521,7 +521,7 @@ func TestBatchProcessor(t *testing.T) {
}

func TestQueue(t *testing.T) {
var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))

t.Run("newQueue", func(t *testing.T) {
Expand All @@ -537,7 +537,7 @@ func TestQueue(t *testing.T) {
const size = 2
q := newQueue(size)

var notR Record
notR := new(Record)
notR.SetBody(log.IntValue(10))

assert.Equal(t, 1, q.Enqueue(notR), "incomplete batch")
Expand All @@ -552,7 +552,7 @@ func TestQueue(t *testing.T) {
assert.Equal(t, 2, q.len, "length")
assert.Equal(t, size, q.cap, "capacity")

assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records")
assert.Equal(t, []*Record{r, r}, q.Flush(), "flushed Records")
})

t.Run("Dropped", func(t *testing.T) {
Expand All @@ -574,7 +574,7 @@ func TestQueue(t *testing.T) {
q.write = q.write.Next()
q.len = 1

assert.Equal(t, []Record{r}, q.Flush(), "flushed")
assert.Equal(t, []*Record{r}, q.Flush(), "flushed")
})

t.Run("TryFlush", func(t *testing.T) {
Expand All @@ -586,33 +586,34 @@ func TestQueue(t *testing.T) {
q.len++
}

buf := make([]Record, 1)
f := func([]Record) bool { return false }
buf := make([]*Record, 1)
buf[0] = new(Record)
f := func([]*Record) bool { return false }
assert.Equal(t, size-1, q.TryDequeue(buf, f), "not flushed")
require.Equal(t, size-1, q.len, "length")
require.NotSame(t, q.read, q.write, "read ring advanced")

var flushed []Record
f = func(r []Record) bool {
var flushed []*Record
f = func(r []*Record) bool {
flushed = append(flushed, r...)
return true
}
if assert.Equal(t, size-2, q.TryDequeue(buf, f), "did not flush len(buf)") {
assert.Equal(t, []Record{r}, flushed, "Records")
assert.Equal(t, []*Record{r}, flushed, "Records")
}

buf = slices.Grow(buf, size)
flushed = flushed[:0]
if assert.Equal(t, 0, q.TryDequeue(buf, f), "did not flush len(queue)") {
assert.Equal(t, []Record{r}, flushed, "Records")
assert.Equal(t, []*Record{r}, flushed, "Records")
}
})

t.Run("ConcurrentSafe", func(t *testing.T) {
const goRoutines = 10

flushed := make(chan []Record, goRoutines)
out := make([]Record, 0, goRoutines)
flushed := make(chan []*Record, goRoutines)
out := make([]*Record, 0, goRoutines)
done := make(chan struct{})
go func() {
defer close(done)
Expand All @@ -628,7 +629,7 @@ func TestQueue(t *testing.T) {
for i := 0; i < goRoutines; i++ {
go func() {
defer wg.Done()
b.Enqueue(Record{})
b.Enqueue(&Record{})
flushed <- b.Flush()
}()
}
Expand All @@ -642,7 +643,7 @@ func TestQueue(t *testing.T) {
}

func BenchmarkBatchProcessorOnEmit(b *testing.B) {
var r Record
r := new(Record)
body := log.BoolValue(true)
r.SetBody(body)

Expand Down
4 changes: 2 additions & 2 deletions sdk/log/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type timestampDecorator struct {
Processor
}

func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error {
func (e timestampDecorator) OnEmit(ctx context.Context, r *Record) error {
r = r.Clone()
r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC))
return e.Processor.OnEmit(ctx, r)
Expand All @@ -93,7 +93,7 @@ type attrDecorator struct {
Processor
}

func (e attrDecorator) OnEmit(ctx context.Context, r Record) error {
func (e attrDecorator) OnEmit(ctx context.Context, r *Record) error {
r = r.Clone()
r.SetAttributes(log.String("replace", "me"))
return e.Processor.OnEmit(ctx, r)
Expand Down
Loading
Loading