diff --git a/cmd/query/app/apiv3/gateway_test.go b/cmd/query/app/apiv3/gateway_test.go index 27e992a6b73..e2b7a4541f4 100644 --- a/cmd/query/app/apiv3/gateway_test.go +++ b/cmd/query/app/apiv3/gateway_test.go @@ -85,7 +85,7 @@ func parseResponse(t *testing.T, body []byte, obj gogoproto.Message) { require.NoError(t, gogojsonpb.Unmarshal(bytes.NewBuffer(body), obj)) } -func makeTestTraceV2() ptrace.Traces { +func makeTestTrace() ptrace.Traces { trace := ptrace.NewTraces() resources := trace.ResourceSpans().AppendEmpty() scopes := resources.ScopeSpans().AppendEmpty() @@ -147,7 +147,7 @@ func (gw *testGateway) runGatewayGetTrace(t *testing.T) { gw.reader. On("GetTraces", matchContext, query). Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{makeTestTraceV2()}, nil) + yield([]ptrace.Traces{makeTestTrace()}, nil) })).Once() gw.getTracesAndVerify(t, "/api/v3/traces/1", traceID) } @@ -156,7 +156,7 @@ func (gw *testGateway) runGatewayFindTraces(t *testing.T) { q, qp := mockFindQueries() gw.reader.On("FindTraces", matchContext, qp). Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{makeTestTraceV2()}, nil) + yield([]ptrace.Traces{makeTestTrace()}, nil) })).Once() gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), traceID) } diff --git a/cmd/query/app/apiv3/grpc_handler_test.go b/cmd/query/app/apiv3/grpc_handler_test.go index f37f0420fad..8f1ebc26eb0 100644 --- a/cmd/query/app/apiv3/grpc_handler_test.go +++ b/cmd/query/app/apiv3/grpc_handler_test.go @@ -109,7 +109,7 @@ func TestGetTrace(t *testing.T) { tsc := newTestServerClient(t) tsc.reader.On("GetTraces", matchContext, tc.expectedQuery). Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{makeTestTraceV2()}, nil) + yield([]ptrace.Traces{makeTestTrace()}, nil) })).Once() getTraceStream, err := tsc.client.GetTrace(context.Background(), &tc.request) @@ -162,7 +162,7 @@ func TestFindTraces(t *testing.T) { tsc := newTestServerClient(t) tsc.reader.On("FindTraces", matchContext, mock.AnythingOfType("tracestore.TraceQueryParams")). Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{makeTestTraceV2()}, nil) + yield([]ptrace.Traces{makeTestTrace()}, nil) })).Once() responseStream, err := tsc.client.FindTraces(context.Background(), &api_v3.FindTracesRequest{ @@ -188,7 +188,7 @@ func TestFindTracesSendError(t *testing.T) { reader := new(tracestoremocks.Reader) reader.On("FindTraces", mock.Anything, mock.AnythingOfType("tracestore.TraceQueryParams")). Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{makeTestTraceV2()}, nil) + yield([]ptrace.Traces{makeTestTrace()}, nil) })).Once() h := &Handler{ QueryService: querysvc.NewQueryService( diff --git a/cmd/query/app/apiv3/http_gateway_test.go b/cmd/query/app/apiv3/http_gateway_test.go index c42e8118d20..3fabd34fe85 100644 --- a/cmd/query/app/apiv3/http_gateway_test.go +++ b/cmd/query/app/apiv3/http_gateway_test.go @@ -129,7 +129,7 @@ func TestHTTPGatewayGetTrace(t *testing.T) { gw.reader. On("GetTraces", matchContext, tc.expectedQuery). Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{makeTestTraceV2()}, nil) + yield([]ptrace.Traces{makeTestTrace()}, nil) })).Once() q := url.Values{} diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 57369669830..4598acb9924 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -63,8 +63,9 @@ type TraceQueryParameters struct { func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService { spanReader, err := v1adapter.GetV1Reader(traceReader) if err != nil { - // TODO: implement a reverse adapter to convert v2 reader to v1 reader - panic(err) + // if the spanstore.Reader is not available, downgrade the native tracestore.Reader to + // a spanstore.Reader + spanReader = v1adapter.NewSpanReader(traceReader) } qsvc := &QueryService{ spanReader: spanReader, diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index 5115d720b99..7a77d6c7016 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -24,6 +24,7 @@ import ( spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/depstore" depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks" "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) @@ -513,9 +514,32 @@ func TestMain(m *testing.M) { testutils.VerifyGoLeaks(m) } -func TestNewQueryService_PanicsForNonV1AdapterReader(t *testing.T) { - reader := &tracestoremocks.Reader{} - dependencyReader := &depsmocks.Reader{} - options := QueryServiceOptions{} - require.PanicsWithError(t, v1adapter.ErrV1ReaderNotAvailable.Error(), func() { NewQueryService(reader, dependencyReader, options) }) +func TestNewQueryService_UsesCorrectTypeForSpanReader(t *testing.T) { + tests := []struct { + name string + reader tracestore.Reader + expectedType spanstore.Reader + }{ + { + name: "wrapped spanstore.Reader gets extracted", + reader: func() tracestore.Reader { + reader := &spanstoremocks.Reader{} + return v1adapter.NewTraceReader(reader) + }(), + expectedType: &spanstoremocks.Reader{}, + }, + { + name: "tracestore.Reader gets downgraded to v1 spanstore.Reader", + reader: &tracestoremocks.Reader{}, + expectedType: &v1adapter.SpanReader{}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dependencyReader := &depsmocks.Reader{} + options := QueryServiceOptions{} + qs := NewQueryService(test.reader, dependencyReader, options) + assert.IsType(t, test.expectedType, qs.spanReader) + }) + } } diff --git a/storage_v2/v1adapter/spanreader.go b/storage_v2/v1adapter/spanreader.go new file mode 100644 index 00000000000..35b87f7bef2 --- /dev/null +++ b/storage_v2/v1adapter/spanreader.go @@ -0,0 +1,106 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package v1adapter + +import ( + "context" + "errors" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" +) + +var _ spanstore.Reader = (*SpanReader)(nil) + +var errTooManyTracesFound = errors.New("too many traces found") + +// SpanReader wraps a tracestore.Reader so that it can be downgraded to implement +// the v1 spanstore.Reader interface. +type SpanReader struct { + traceReader tracestore.Reader +} + +func NewSpanReader(traceReader tracestore.Reader) *SpanReader { + return &SpanReader{ + traceReader: traceReader, + } +} + +func (sr *SpanReader) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) { + getTracesIter := sr.traceReader.GetTraces(ctx, tracestore.GetTraceParams{ + TraceID: query.TraceID.ToOTELTraceID(), + Start: query.StartTime, + End: query.EndTime, + }) + traces, err := V1TracesFromSeq2(getTracesIter) + if err != nil { + return nil, err + } + if len(traces) == 0 { + return nil, spanstore.ErrTraceNotFound + } else if len(traces) > 1 { + return nil, errTooManyTracesFound + } + return traces[0], nil +} + +func (sr *SpanReader) GetServices(ctx context.Context) ([]string, error) { + return sr.traceReader.GetServices(ctx) +} + +func (sr *SpanReader) GetOperations( + ctx context.Context, + query spanstore.OperationQueryParameters, +) ([]spanstore.Operation, error) { + o, err := sr.traceReader.GetOperations(ctx, tracestore.OperationQueryParams{ + ServiceName: query.ServiceName, + SpanKind: query.SpanKind, + }) + if err != nil || o == nil { + return nil, err + } + operations := []spanstore.Operation{} + for _, operation := range o { + operations = append(operations, spanstore.Operation{ + Name: operation.Name, + SpanKind: operation.SpanKind, + }) + } + return operations, nil +} + +func (sr *SpanReader) FindTraces( + ctx context.Context, + query *spanstore.TraceQueryParameters, +) ([]*model.Trace, error) { + getTracesIter := sr.traceReader.FindTraces(ctx, tracestore.TraceQueryParams{ + ServiceName: query.ServiceName, + OperationName: query.OperationName, + Tags: query.Tags, + StartTimeMin: query.StartTimeMin, + StartTimeMax: query.StartTimeMax, + DurationMin: query.DurationMin, + DurationMax: query.DurationMax, + NumTraces: query.NumTraces, + }) + return V1TracesFromSeq2(getTracesIter) +} + +func (sr *SpanReader) FindTraceIDs( + ctx context.Context, + query *spanstore.TraceQueryParameters, +) ([]model.TraceID, error) { + traceIDsIter := sr.traceReader.FindTraceIDs(ctx, tracestore.TraceQueryParams{ + ServiceName: query.ServiceName, + OperationName: query.OperationName, + Tags: query.Tags, + StartTimeMin: query.StartTimeMin, + StartTimeMax: query.StartTimeMax, + DurationMin: query.DurationMin, + DurationMax: query.DurationMax, + NumTraces: query.NumTraces, + }) + return V1TraceIDsFromSeq2(traceIDsIter) +} diff --git a/storage_v2/v1adapter/spanreader_test.go b/storage_v2/v1adapter/spanreader_test.go new file mode 100644 index 00000000000..7f19db825c4 --- /dev/null +++ b/storage_v2/v1adapter/spanreader_test.go @@ -0,0 +1,397 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package v1adapter + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" + "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" + tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks" +) + +func TestSpanReader_GetTrace(t *testing.T) { + tests := []struct { + name string + query spanstore.GetTraceParameters + expectedQuery tracestore.GetTraceParams + traces []ptrace.Traces + expectedTrace *model.Trace + err error + expectedErr error + }{ + { + name: "error getting trace", + query: spanstore.GetTraceParameters{ + TraceID: model.NewTraceID(1, 2), + }, + expectedQuery: tracestore.GetTraceParams{ + TraceID: [16]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2}, + }, + err: assert.AnError, + expectedErr: assert.AnError, + }, + { + name: "empty traces", + query: spanstore.GetTraceParameters{ + TraceID: model.NewTraceID(1, 2), + }, + expectedQuery: tracestore.GetTraceParams{ + TraceID: [16]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2}, + }, + traces: []ptrace.Traces{}, + expectedErr: spanstore.ErrTraceNotFound, + }, + { + name: "too many traces found", + query: spanstore.GetTraceParameters{ + TraceID: model.NewTraceID(1, 2), + }, + expectedQuery: tracestore.GetTraceParams{ + TraceID: [16]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2}, + }, + traces: func() []ptrace.Traces { + traces1 := ptrace.NewTraces() + resources1 := traces1.ResourceSpans().AppendEmpty() + resources1.Resource().Attributes().PutStr("service.name", "service1") + scopes1 := resources1.ScopeSpans().AppendEmpty() + span1 := scopes1.Spans().AppendEmpty() + span1.SetTraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2}) + + traces2 := ptrace.NewTraces() + resources2 := traces2.ResourceSpans().AppendEmpty() + resources2.Resource().Attributes().PutStr("service.name", "service1") + scopes2 := resources2.ScopeSpans().AppendEmpty() + span2 := scopes2.Spans().AppendEmpty() + span2.SetTraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}) + + return []ptrace.Traces{traces1, traces2} + }(), + expectedErr: errTooManyTracesFound, + }, + { + name: "success", + query: spanstore.GetTraceParameters{ + TraceID: model.NewTraceID(1, 2), + }, + expectedQuery: tracestore.GetTraceParams{ + TraceID: [16]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2}, + }, + traces: func() []ptrace.Traces { + traces := ptrace.NewTraces() + resources := traces.ResourceSpans().AppendEmpty() + resources.Resource().Attributes().PutStr("service.name", "service") + scopes := resources.ScopeSpans().AppendEmpty() + span := scopes.Spans().AppendEmpty() + span.SetTraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2}) + span.SetSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}) + span.SetName("span") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 0).UTC())) + return []ptrace.Traces{traces} + }(), + expectedTrace: &model.Trace{ + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(1, 2), + SpanID: model.NewSpanID(3), + OperationName: "span", + Process: model.NewProcess("service", nil), + StartTime: time.Unix(0, 0).UTC(), + }, + }, + }, + }, + } + for _, test := range tests { + tr := tracestoremocks.Reader{} + tr.On("GetTraces", mock.Anything, mock.Anything). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield(test.traces, test.err) + })).Once() + + sr := NewSpanReader(&tr) + trace, err := sr.GetTrace(context.Background(), test.query) + require.ErrorIs(t, err, test.expectedErr) + require.Equal(t, test.expectedTrace, trace) + } +} + +func TestSpanReader_GetServices(t *testing.T) { + tests := []struct { + name string + services []string + expectedServices []string + err error + expectedErr error + }{ + { + name: "error getting services", + err: assert.AnError, + expectedErr: assert.AnError, + }, + { + name: "no services", + services: []string{}, + expectedServices: []string{}, + }, + { + name: "multiple services", + services: []string{"service1", "service2"}, + expectedServices: []string{"service1", "service2"}, + }, + } + + for _, test := range tests { + tr := tracestoremocks.Reader{} + tr.On("GetServices", mock.Anything). + Return(test.services, test.err).Once() + + sr := NewSpanReader(&tr) + services, err := sr.GetServices(context.Background()) + require.ErrorIs(t, err, test.expectedErr) + require.Equal(t, test.expectedServices, services) + } +} + +func TestSpanReader_GetOperations(t *testing.T) { + tests := []struct { + name string + query spanstore.OperationQueryParameters + expectedQuery tracestore.OperationQueryParams + operations []tracestore.Operation + expectedOperations []spanstore.Operation + err error + expectedErr error + }{ + { + name: "error getting operations", + query: spanstore.OperationQueryParameters{ + ServiceName: "service1", + }, + expectedQuery: tracestore.OperationQueryParams{ + ServiceName: "service1", + }, + err: assert.AnError, + expectedErr: assert.AnError, + }, + { + name: "no operations", + query: spanstore.OperationQueryParameters{ + ServiceName: "service1", + }, + expectedQuery: tracestore.OperationQueryParams{ + ServiceName: "service1", + }, + operations: []tracestore.Operation{}, + expectedOperations: []spanstore.Operation{}, + }, + { + name: "multiple operations", + query: spanstore.OperationQueryParameters{ + ServiceName: "service1", + }, + expectedQuery: tracestore.OperationQueryParams{ + ServiceName: "service1", + }, + operations: []tracestore.Operation{ + {Name: "operation1", SpanKind: "kind1"}, + {Name: "operation2", SpanKind: "kind2"}, + }, + expectedOperations: []spanstore.Operation{ + {Name: "operation1", SpanKind: "kind1"}, + {Name: "operation2", SpanKind: "kind2"}, + }, + }, + } + + for _, test := range tests { + tr := tracestoremocks.Reader{} + tr.On("GetOperations", mock.Anything, test.expectedQuery). + Return(test.operations, test.err).Once() + + sr := NewSpanReader(&tr) + ops, err := sr.GetOperations(context.Background(), test.query) + require.ErrorIs(t, err, test.expectedErr) + require.Equal(t, test.expectedOperations, ops) + } +} + +func TestSpanReader_FindTraces(t *testing.T) { + tests := []struct { + name string + query *spanstore.TraceQueryParameters + expectedQuery tracestore.TraceQueryParams + traces []ptrace.Traces + expectedTraces []*model.Trace + err error + expectedErr error + }{ + { + name: "error finding traces", + query: &spanstore.TraceQueryParameters{ + ServiceName: "service1", + }, + expectedQuery: tracestore.TraceQueryParams{ + ServiceName: "service1", + }, + err: assert.AnError, + expectedErr: assert.AnError, + }, + { + name: "no traces found", + query: &spanstore.TraceQueryParameters{ + ServiceName: "service1", + }, + expectedQuery: tracestore.TraceQueryParams{ + ServiceName: "service1", + }, + traces: []ptrace.Traces{}, + expectedTraces: nil, + }, + { + name: "multiple traces found", + query: &spanstore.TraceQueryParameters{ + ServiceName: "service1", + }, + expectedQuery: tracestore.TraceQueryParams{ + ServiceName: "service1", + }, + traces: func() []ptrace.Traces { + traces1 := ptrace.NewTraces() + resources1 := traces1.ResourceSpans().AppendEmpty() + resources1.Resource().Attributes().PutStr("service.name", "service1") + scopes1 := resources1.ScopeSpans().AppendEmpty() + span1 := scopes1.Spans().AppendEmpty() + span1.SetTraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2}) + span1.SetSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}) + span1.SetName("span1") + span1.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 0).UTC())) + + traces2 := ptrace.NewTraces() + resources2 := traces2.ResourceSpans().AppendEmpty() + resources2.Resource().Attributes().PutStr("service.name", "service1") + scopes2 := resources2.ScopeSpans().AppendEmpty() + span2 := scopes2.Spans().AppendEmpty() + span2.SetTraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 5}) + span2.SetSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 6}) + span2.SetName("span2") + span2.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, 0).UTC())) + + return []ptrace.Traces{traces1, traces2} + }(), + expectedTraces: []*model.Trace{ + { + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(1, 2), + SpanID: model.NewSpanID(3), + OperationName: "span1", + Process: model.NewProcess("service1", nil), + StartTime: time.Unix(0, 0).UTC(), + }, + }, + }, + { + Spans: []*model.Span{ + { + TraceID: model.NewTraceID(4, 5), + SpanID: model.NewSpanID(6), + OperationName: "span2", + Process: model.NewProcess("service1", nil), + StartTime: time.Unix(0, 0).UTC(), + }, + }, + }, + }, + }, + } + + for _, test := range tests { + tr := tracestoremocks.Reader{} + tr.On("FindTraces", mock.Anything, test.expectedQuery). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield(test.traces, test.err) + })).Once() + + sr := NewSpanReader(&tr) + traces, err := sr.FindTraces(context.Background(), test.query) + require.ErrorIs(t, err, test.expectedErr) + require.Equal(t, test.expectedTraces, traces) + } +} + +func TestSpanReader_FindTraceIDs(t *testing.T) { + tests := []struct { + name string + query *spanstore.TraceQueryParameters + expectedQuery tracestore.TraceQueryParams + traceIDs []pcommon.TraceID + expectedTraceIDs []model.TraceID + err error + expectedErr error + }{ + { + name: "error finding trace IDs", + query: &spanstore.TraceQueryParameters{ + ServiceName: "service1", + }, + expectedQuery: tracestore.TraceQueryParams{ + ServiceName: "service1", + }, + err: assert.AnError, + expectedErr: assert.AnError, + }, + { + name: "no trace IDs found", + query: &spanstore.TraceQueryParameters{ + ServiceName: "service1", + }, + expectedQuery: tracestore.TraceQueryParams{ + ServiceName: "service1", + }, + traceIDs: []pcommon.TraceID{}, + expectedTraceIDs: nil, + }, + { + name: "multiple trace IDs found", + query: &spanstore.TraceQueryParameters{ + ServiceName: "service1", + }, + expectedQuery: tracestore.TraceQueryParams{ + ServiceName: "service1", + }, + traceIDs: []pcommon.TraceID{ + pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2}), + pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4}), + }, + expectedTraceIDs: []model.TraceID{ + model.NewTraceID(1, 2), + model.NewTraceID(3, 4), + }, + }, + } + + for _, test := range tests { + tr := tracestoremocks.Reader{} + tr.On("FindTraceIDs", mock.Anything, test.expectedQuery). + Return(iter.Seq2[[]pcommon.TraceID, error](func(yield func([]pcommon.TraceID, error) bool) { + yield(test.traceIDs, test.err) + })).Once() + + sr := NewSpanReader(&tr) + traceIDs, err := sr.FindTraceIDs(context.Background(), test.query) + require.ErrorIs(t, err, test.expectedErr) + require.Equal(t, test.expectedTraceIDs, traceIDs) + } +} diff --git a/storage_v2/v1adapter/translator.go b/storage_v2/v1adapter/translator.go index fe788cd87a7..a3d1cb41254 100644 --- a/storage_v2/v1adapter/translator.go +++ b/storage_v2/v1adapter/translator.go @@ -61,6 +61,27 @@ func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace return jaegerTraces, nil } +func V1TraceIDsFromSeq2(traceIDsIter iter.Seq2[[]pcommon.TraceID, error]) ([]model.TraceID, error) { + var ( + iterErr error + modelTraceIDs []model.TraceID + ) + traceIDsIter(func(traceIDs []pcommon.TraceID, err error) bool { + if err != nil { + iterErr = err + return false + } + for _, traceID := range traceIDs { + modelTraceIDs = append(modelTraceIDs, model.TraceIDFromOTEL(traceID)) + } + return true + }) + if iterErr != nil { + return nil, iterErr + } + return modelTraceIDs, nil +} + // V1TraceToOtelTrace converts v1 traces (*model.Trace) to Otel traces (ptrace.Traces) func V1TraceToOtelTrace(jTrace *model.Trace) ptrace.Traces { batches := createBatchesFromModelTrace(jTrace) diff --git a/storage_v2/v1adapter/translator_test.go b/storage_v2/v1adapter/translator_test.go index 8aa1fc6ca03..ba0e888a91b 100644 --- a/storage_v2/v1adapter/translator_test.go +++ b/storage_v2/v1adapter/translator_test.go @@ -283,3 +283,63 @@ func TestV1TraceToOtelTrace_ReturnEmptyOtelTrace(t *testing.T) { require.Equal(t, eTrace.SpanCount(), aTrace.SpanCount(), 0) } + +func TestV1TraceIDsFromSeq2(t *testing.T) { + testCases := []struct { + name string + seqTraceIDs iter.Seq2[[]pcommon.TraceID, error] + expectedIDs []model.TraceID + expectedError error + }{ + { + name: "empty sequence", + seqTraceIDs: func(func([]pcommon.TraceID, error) bool) {}, + expectedIDs: nil, + expectedError: nil, + }, + { + name: "sequence with error", + seqTraceIDs: func(yield func([]pcommon.TraceID, error) bool) { + yield(nil, assert.AnError) + }, + expectedIDs: nil, + expectedError: assert.AnError, + }, + { + name: "sequence with one chunk of trace IDs", + seqTraceIDs: func(yield func([]pcommon.TraceID, error) bool) { + traceID1 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}) + traceID2 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 5}) + yield([]pcommon.TraceID{traceID1, traceID2}, nil) + }, + expectedIDs: []model.TraceID{ + model.NewTraceID(2, 3), + model.NewTraceID(4, 5), + }, + expectedError: nil, + }, + { + name: "sequence with multiple chunks of trace IDs", + seqTraceIDs: func(yield func([]pcommon.TraceID, error) bool) { + traceID1 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3}) + traceID2 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 5}) + traceID3 := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 7}) + yield([]pcommon.TraceID{traceID1}, nil) + yield([]pcommon.TraceID{traceID2, traceID3}, nil) + }, + expectedIDs: []model.TraceID{ + model.NewTraceID(2, 3), + model.NewTraceID(4, 5), + model.NewTraceID(6, 7), + }, + expectedError: nil, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualIDs, err := V1TraceIDsFromSeq2(tc.seqTraceIDs) + require.Equal(t, tc.expectedError, err) + require.Equal(t, tc.expectedIDs, actualIDs) + }) + } +}