From 451fe0b9ad73b78772682167a5d1e2a459a0f539 Mon Sep 17 00:00:00 2001 From: Ayush Vishwakarma Date: Mon, 16 Dec 2024 21:15:03 +0530 Subject: [PATCH 1/6] Added test for TraceReader Signed-off-by: Ayush Vishwakarma --- jaeger-ui | 2 +- plugin/storage/integration/integration.go | 46 +++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/jaeger-ui b/jaeger-ui index c5cd17bf979..a0458053c53 160000 --- a/jaeger-ui +++ b/jaeger-ui @@ -1 +1 @@ -Subproject commit c5cd17bf979ba454da761f660fde08a1f309aee9 +Subproject commit a0458053c53bb18ba36a42867c06b0803d8fafaf diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 35f7c6e6025..d8181ea494f 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -21,12 +21,14 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) //go:embed fixtures @@ -43,6 +45,7 @@ var fixtures embed.FS type StorageIntegration struct { SpanWriter spanstore.Writer SpanReader spanstore.Reader + TraceReader tracestore.Reader ArchiveSpanReader spanstore.Reader ArchiveSpanWriter spanstore.Writer DependencyWriter dependencystore.Writer @@ -133,6 +136,48 @@ func (*StorageIntegration) waitForCondition(t *testing.T, predicate func(t *test return predicate(t) } +func (s *StorageIntegration) testGetServicesv2(t *testing.T) { + s.skipIfNeeded(t) + defer s.cleanUp(t) + + expected := []string{"example-service-1", "example-service-2", "example-service-3"} + s.loadParseAndWriteExampleTrace(t) + + var actual []string + found := s.waitForCondition(t, func(t *testing.T) bool { + var err error + actual, err = s.TraceReader.GetServices(context.Background()) + if err != nil { + t.Log(err) + return false + } + sort.Strings(actual) + t.Logf("Retrieved services: %v", actual) + if len(actual) > len(expected) { + t.Log("🛑 Found unexpected services!") + for _, service := range actual { + traceSeq := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{ + ServiceName: service, + }) + traceSeq(func(traces []ptrace.Traces, err error) bool { + if err != nil { + t.Log(err) + } + for _, trace := range traces { + t.Log(trace) + } + return true + }) + } + } + return assert.ObjectsAreEqualValues(expected, actual) + }) + if !assert.True(t, found) { + t.Log("\t Expected:", expected) + t.Log("\t Actual :", actual) + } +} + func (s *StorageIntegration) testGetServices(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) @@ -581,6 +626,7 @@ func (s *StorageIntegration) RunAll(t *testing.T) { // RunTestSpanstore runs only span related integration tests func (s *StorageIntegration) RunSpanStoreTests(t *testing.T) { t.Run("GetServices", s.testGetServices) + t.Run("GetServicesV2", s.testGetServicesv2) t.Run("GetOperations", s.testGetOperations) t.Run("GetTrace", s.testGetTrace) t.Run("GetLargeSpans", s.testGetLargeSpan) From a2c496e304f7cf3e3c3a02c2616671995a435554 Mon Sep 17 00:00:00 2001 From: Ayush Vishwakarma Date: Tue, 17 Dec 2024 17:54:07 +0530 Subject: [PATCH 2/6] Revert "Added test for TraceReader" This reverts commit 451fe0b9ad73b78772682167a5d1e2a459a0f539. --- jaeger-ui | 2 +- plugin/storage/integration/integration.go | 46 ----------------------- 2 files changed, 1 insertion(+), 47 deletions(-) diff --git a/jaeger-ui b/jaeger-ui index a0458053c53..c5cd17bf979 160000 --- a/jaeger-ui +++ b/jaeger-ui @@ -1 +1 @@ -Subproject commit a0458053c53bb18ba36a42867c06b0803d8fafaf +Subproject commit c5cd17bf979ba454da761f660fde08a1f309aee9 diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index d8181ea494f..35f7c6e6025 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -21,14 +21,12 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/ptrace" samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" - "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) //go:embed fixtures @@ -45,7 +43,6 @@ var fixtures embed.FS type StorageIntegration struct { SpanWriter spanstore.Writer SpanReader spanstore.Reader - TraceReader tracestore.Reader ArchiveSpanReader spanstore.Reader ArchiveSpanWriter spanstore.Writer DependencyWriter dependencystore.Writer @@ -136,48 +133,6 @@ func (*StorageIntegration) waitForCondition(t *testing.T, predicate func(t *test return predicate(t) } -func (s *StorageIntegration) testGetServicesv2(t *testing.T) { - s.skipIfNeeded(t) - defer s.cleanUp(t) - - expected := []string{"example-service-1", "example-service-2", "example-service-3"} - s.loadParseAndWriteExampleTrace(t) - - var actual []string - found := s.waitForCondition(t, func(t *testing.T) bool { - var err error - actual, err = s.TraceReader.GetServices(context.Background()) - if err != nil { - t.Log(err) - return false - } - sort.Strings(actual) - t.Logf("Retrieved services: %v", actual) - if len(actual) > len(expected) { - t.Log("🛑 Found unexpected services!") - for _, service := range actual { - traceSeq := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{ - ServiceName: service, - }) - traceSeq(func(traces []ptrace.Traces, err error) bool { - if err != nil { - t.Log(err) - } - for _, trace := range traces { - t.Log(trace) - } - return true - }) - } - } - return assert.ObjectsAreEqualValues(expected, actual) - }) - if !assert.True(t, found) { - t.Log("\t Expected:", expected) - t.Log("\t Actual :", actual) - } -} - func (s *StorageIntegration) testGetServices(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) @@ -626,7 +581,6 @@ func (s *StorageIntegration) RunAll(t *testing.T) { // RunTestSpanstore runs only span related integration tests func (s *StorageIntegration) RunSpanStoreTests(t *testing.T) { t.Run("GetServices", s.testGetServices) - t.Run("GetServicesV2", s.testGetServicesv2) t.Run("GetOperations", s.testGetOperations) t.Run("GetTrace", s.testGetTrace) t.Run("GetLargeSpans", s.testGetLargeSpan) From b5f5ea55e47dfdd9ccd3eb639c272f5f154e049d Mon Sep 17 00:00:00 2001 From: Ayush Vishwakarma Date: Thu, 19 Dec 2024 19:54:26 +0530 Subject: [PATCH 3/6] Added Trace reader Signed-off-by: Ayush Vishwakarma --- plugin/storage/integration/integration.go | 121 ++++++++++++++++------ 1 file changed, 88 insertions(+), 33 deletions(-) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 35f7c6e6025..c6f62cf70fe 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -21,12 +21,14 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) //go:embed fixtures @@ -43,6 +45,7 @@ var fixtures embed.FS type StorageIntegration struct { SpanWriter spanstore.Writer SpanReader spanstore.Reader + TraceReader tracestore.Reader ArchiveSpanReader spanstore.Reader ArchiveSpanWriter spanstore.Writer DependencyWriter dependencystore.Writer @@ -132,50 +135,102 @@ func (*StorageIntegration) waitForCondition(t *testing.T, predicate func(t *test } return predicate(t) } - func (s *StorageIntegration) testGetServices(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) - + expected := []string{"example-service-1", "example-service-2", "example-service-3"} s.loadParseAndWriteExampleTrace(t) - + var actual []string found := s.waitForCondition(t, func(t *testing.T) bool { - var err error - actual, err = s.SpanReader.GetServices(context.Background()) - if err != nil { - t.Log(err) - return false - } - sort.Strings(actual) - t.Logf("Retrieved services: %v", actual) - if len(actual) > len(expected) { - // If the storage backend returns more services than expected, let's log traces for those - t.Log("🛑 Found unexpected services!") - for _, service := range actual { - traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{ - ServiceName: service, - }) - if err != nil { - t.Log(err) - continue - } - for _, trace := range traces { - for _, span := range trace.Spans { - t.Logf("span: Service: %s, TraceID: %s, Operation: %s", service, span.TraceID, span.OperationName) - } - } + var err error + actual, err = s.TraceReader.GetServices(context.Background()) + if err != nil { + t.Log(err) + return false + } + sort.Strings(actual) + t.Logf("Retrieved services: %v", actual) + + if len(actual) > len(expected) { + t.Log("🛑 Found unexpected services!") + for _, service := range actual { + traceSeq := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{ + ServiceName: service, + }) + + hasError := false + traceSeq(func(traces []ptrace.Traces, err error) bool { + if err != nil { + t.Log(err) + hasError = true + return false + } + for _, otelTrace := range traces { + t.Logf("Retrieved trace for service '%s': %v", service, otelTrace) + } + return true + }) + + if hasError { + t.Logf("Error processing traces for service '%s'", service) + return false } - } - return assert.ObjectsAreEqualValues(expected, actual) + } + } + return assert.ObjectsAreEqualValues(expected, actual) }) - + if !assert.True(t, found) { - t.Log("\t Expected:", expected) - t.Log("\t Actual :", actual) + t.Log("\t Expected:", expected) + t.Log("\t Actual :", actual) } -} + } + +// func (s *StorageIntegration) testGetServices(t *testing.T) { +// s.skipIfNeeded(t) +// defer s.cleanUp(t) + +// expected := []string{"example-service-1", "example-service-2", "example-service-3"} +// s.loadParseAndWriteExampleTrace(t) + +// var actual []string +// found := s.waitForCondition(t, func(t *testing.T) bool { +// var err error +// actual, err = s.SpanReader.GetServices(context.Background()) +// if err != nil { +// t.Log(err) +// return false +// } +// sort.Strings(actual) +// t.Logf("Retrieved services: %v", actual) +// if len(actual) > len(expected) { +// // If the storage backend returns more services than expected, let's log traces for those +// t.Log("🛑 Found unexpected services!") +// for _, service := range actual { +// traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{ +// ServiceName: service, +// }) +// if err != nil { +// t.Log(err) +// continue +// } +// for _, trace := range traces { +// for _, span := range trace.Spans { +// t.Logf("span: Service: %s, TraceID: %s, Operation: %s", service, span.TraceID, span.OperationName) +// } +// } +// } +// } +// return assert.ObjectsAreEqualValues(expected, actual) +// }) + +// if !assert.True(t, found) { +// t.Log("\t Expected:", expected) +// t.Log("\t Actual :", actual) +// } +// } func (s *StorageIntegration) testArchiveTrace(t *testing.T) { s.skipIfNeeded(t) From 99deec2c95a0859c4eee952653c432ea17b169f0 Mon Sep 17 00:00:00 2001 From: Ayush Vishwakarma Date: Thu, 19 Dec 2024 23:00:09 +0530 Subject: [PATCH 4/6] Added fixture Convertor Signed-off-by: Ayush Vishwakarma --- plugin/storage/integration/integration.go | 373 +++++++++++++--------- 1 file changed, 230 insertions(+), 143 deletions(-) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index c6f62cf70fe..b9637d5daaf 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -19,16 +19,17 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/ptrace" - samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" + otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" ) //go:embed fixtures @@ -45,13 +46,14 @@ var fixtures embed.FS type StorageIntegration struct { SpanWriter spanstore.Writer SpanReader spanstore.Reader - TraceReader tracestore.Reader ArchiveSpanReader spanstore.Reader ArchiveSpanWriter spanstore.Writer DependencyWriter dependencystore.Writer DependencyReader dependencystore.Reader SamplingStore samplingstore.Store Fixtures []*QueryFixtures + TraceReader tracestore.Reader + TraceWriter tracestore.Writer // TODO: remove this after all storage backends return spanKind from GetOperations GetOperationsMissingSpanKind bool @@ -82,7 +84,7 @@ type StorageIntegration struct { // the service name is formatted "query##-service". type QueryFixtures struct { Caption string - Query *spanstore.TraceQueryParameters + Query *tracestore.TraceQueryParams ExpectedFixtures []string } @@ -138,99 +140,56 @@ func (*StorageIntegration) waitForCondition(t *testing.T, predicate func(t *test func (s *StorageIntegration) testGetServices(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) - + expected := []string{"example-service-1", "example-service-2", "example-service-3"} s.loadParseAndWriteExampleTrace(t) - + var actual []string found := s.waitForCondition(t, func(t *testing.T) bool { - var err error - actual, err = s.TraceReader.GetServices(context.Background()) - if err != nil { - t.Log(err) - return false - } - sort.Strings(actual) - t.Logf("Retrieved services: %v", actual) - - if len(actual) > len(expected) { - t.Log("🛑 Found unexpected services!") - for _, service := range actual { - traceSeq := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{ - ServiceName: service, - }) - - hasError := false - traceSeq(func(traces []ptrace.Traces, err error) bool { - if err != nil { - t.Log(err) - hasError = true - return false - } - for _, otelTrace := range traces { - t.Logf("Retrieved trace for service '%s': %v", service, otelTrace) - } - return true - }) - - if hasError { - t.Logf("Error processing traces for service '%s'", service) - return false + var err error + actual, err = s.TraceReader.GetServices(context.Background()) + if err != nil { + t.Log(err) + return false + } + sort.Strings(actual) + t.Logf("Retrieved services: %v", actual) + + if len(actual) > len(expected) { + t.Log("🛑 Found unexpected services!") + for _, service := range actual { + traceSeq := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{ + ServiceName: service, + }) + + hasError := false + traceSeq(func(traces []ptrace.Traces, err error) bool { + if err != nil { + t.Log(err) + hasError = true + return false + } + for _, otelTrace := range traces { + t.Logf("Retrieved trace for service '%s': %v", service, otelTrace) + } + return true + }) + + if hasError { + t.Logf("Error processing traces for service '%s'", service) + return false + } } - } - } - return assert.ObjectsAreEqualValues(expected, actual) + } + return assert.ObjectsAreEqualValues(expected, actual) }) - + if !assert.True(t, found) { - t.Log("\t Expected:", expected) - t.Log("\t Actual :", actual) + t.Log("\t Expected:", expected) + t.Log("\t Actual :", actual) } - } - -// func (s *StorageIntegration) testGetServices(t *testing.T) { -// s.skipIfNeeded(t) -// defer s.cleanUp(t) - -// expected := []string{"example-service-1", "example-service-2", "example-service-3"} -// s.loadParseAndWriteExampleTrace(t) +} -// var actual []string -// found := s.waitForCondition(t, func(t *testing.T) bool { -// var err error -// actual, err = s.SpanReader.GetServices(context.Background()) -// if err != nil { -// t.Log(err) -// return false -// } -// sort.Strings(actual) -// t.Logf("Retrieved services: %v", actual) -// if len(actual) > len(expected) { -// // If the storage backend returns more services than expected, let's log traces for those -// t.Log("🛑 Found unexpected services!") -// for _, service := range actual { -// traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{ -// ServiceName: service, -// }) -// if err != nil { -// t.Log(err) -// continue -// } -// for _, trace := range traces { -// for _, span := range trace.Spans { -// t.Logf("span: Service: %s, TraceID: %s, Operation: %s", service, span.TraceID, span.OperationName) -// } -// } -// } -// } -// return assert.ObjectsAreEqualValues(expected, actual) -// }) - -// if !assert.True(t, found) { -// t.Log("\t Expected:", expected) -// t.Log("\t Actual :", actual) -// } -// } func (s *StorageIntegration) testArchiveTrace(t *testing.T) { s.skipIfNeeded(t) @@ -335,6 +294,34 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { } } +// func (s *StorageIntegration) testGetTrace(t *testing.T) { +// s.skipIfNeeded(t) +// defer s.cleanUp(t) + +// expected := s.loadParseAndWriteExampleTrace(t) +// expectedTraceID := expected.Spans[0].TraceID + +// var actual *model.Trace +// found := s.waitForCondition(t, func(t *testing.T) bool { +// var err error +// actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) +// if err != nil { +// t.Log(err) +// } +// return err == nil && len(actual.Spans) == len(expected.Spans) +// }) +// if !assert.True(t, found) { +// CompareTraces(t, expected, actual) +// } + +// t.Run("NotFound error", func(t *testing.T) { +// fakeTraceID := model.TraceID{High: 0, Low: 1} +// trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID}) +// assert.Equal(t, spanstore.ErrTraceNotFound, err) +// assert.Nil(t, trace) +// }) +// } + func (s *StorageIntegration) testGetTrace(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) @@ -367,103 +354,203 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) - // Note: all cases include ServiceName + StartTime range + // Load query test cases s.Fixtures = append(s.Fixtures, LoadAndParseQueryTestCases(t, "fixtures/queries.json")...) - // Each query test case only specifies matching traces, but does not provide counterexamples. - // To improve coverage we get all possible traces and store all of them before running queries. - allTraceFixtures := make(map[string]*model.Trace) - expectedTracesPerTestCase := make([][]*model.Trace, 0, len(s.Fixtures)) + // Prepare a map to store all trace fixtures and a slice for expected traces per test case + allTraceFixtures := make(map[string]ptrace.Traces) + expectedTracesPerTestCase := make([][]ptrace.Traces, 0, len(s.Fixtures)) + + // Process each query test case to prepare expected traces for _, queryTestCase := range s.Fixtures { - var expected []*model.Trace + var expected []ptrace.Traces for _, traceFixture := range queryTestCase.ExpectedFixtures { trace, ok := allTraceFixtures[traceFixture] if !ok { - trace = s.getTraceFixture(t, traceFixture) - s.writeTrace(t, trace) - allTraceFixtures[traceFixture] = trace + trace = s.getTraceFixture(t, traceFixture) // Load the trace fixture + s.writeTrace(t, trace) // Write the trace into the storage + allTraceFixtures[traceFixture] = trace // Cache the trace fixture } expected = append(expected, trace) } expectedTracesPerTestCase = append(expectedTracesPerTestCase, expected) } + + // Run tests for each query test case for i, queryTestCase := range s.Fixtures { t.Run(queryTestCase.Caption, func(t *testing.T) { s.skipIfNeeded(t) expected := expectedTracesPerTestCase[i] actual := s.findTracesByQuery(t, queryTestCase.Query, expected) + + // Compare the expected and actual traces CompareSliceOfTraces(t, expected, actual) }) } } -func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.TraceQueryParameters, expected []*model.Trace) []*model.Trace { - var traces []*model.Trace +// func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []*ptrace.Traces) []ptrace.Traces { +// var traces []*model.Trace +// found := s.waitForCondition(t, func(t *testing.T) bool { +// var err error +// traces, err = s.SpanReader.FindTraces(context.Background(), query) +// if err != nil { +// t.Log(err) +// return false +// } +// if len(expected) != len(traces) { +// t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces)) +// return false +// } +// if spanCount(expected) != spanCount(traces) { +// t.Logf("Excepting certain number of spans: expected: %d, actual: %d", spanCount(expected), spanCount(traces)) +// return false +// } +// return true +// }) +// require.True(t, found) +// return traces +// } +func (s *StorageIntegration) findTracesByQuery(t *testing.T, query tracestore.TraceQueryParams, expected []ptrace.Traces) []ptrace.Traces { + var collected []ptrace.Traces + var lastErr error + found := s.waitForCondition(t, func(t *testing.T) bool { - var err error - traces, err = s.SpanReader.FindTraces(context.Background(), query) - if err != nil { + collected = nil // Reset for each attempt + traceSeq := s.TraceReader.FindTraces(context.Background(), query) + + traceSeq(func(traces []ptrace.Traces, err error) bool { + if err != nil { + lastErr = err t.Log(err) return false - } - if len(expected) != len(traces) { - t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces)) - return false - } - if spanCount(expected) != spanCount(traces) { - t.Logf("Excepting certain number of spans: expected: %d, actual: %d", spanCount(expected), spanCount(traces)) - return false - } - return true + } + collected = append(collected, traces...) + return true + }) + + if lastErr != nil { + return false + } + + if len(expected) != len(collected) { + t.Logf("Expecting certain number of traces: expected %d, actual %d", len(expected), len(collected)) + return false + } + + // Compare span counts + expectedSpans := 0 + actualSpans := 0 + for _, trace := range expected { + expectedSpans += trace.SpanCount() + } + for _, trace := range collected { + actualSpans += trace.SpanCount() + } + + if expectedSpans != actualSpans { + t.Logf("Expecting certain number of spans: expected %d, actual %d", expectedSpans, actualSpans) + return false + } + + return true }) + require.True(t, found) - return traces -} + return collected + } + +// func (s *StorageIntegration) writeTrace(t *testing.T, trace *model.Trace) { +// t.Logf("%-23s Writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), len(trace.Spans)) +// for _, span := range trace.Spans { +// err := s.SpanWriter.WriteSpan(context.Background(), span) +// require.NoError(t, err, "Not expecting error when writing trace to storage") +// } +// } -func (s *StorageIntegration) writeTrace(t *testing.T, trace *model.Trace) { - t.Logf("%-23s Writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), len(trace.Spans)) - for _, span := range trace.Spans { - err := s.SpanWriter.WriteSpan(context.Background(), span) - require.NoError(t, err, "Not expecting error when writing trace to storage") +func (s *StorageIntegration) writeTrace(t *testing.T, td ptrace.Traces) { + t.Logf("%-23s Writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), td.SpanCount()) + err := s.TraceWriter.WriteTraces(context.Background(), td) + if err != nil { + t.Log(err) } } -func (s *StorageIntegration) loadParseAndWriteExampleTrace(t *testing.T) *model.Trace { +func (s *StorageIntegration) loadParseAndWriteExampleTrace(t *testing.T) ptrace.Traces { trace := s.getTraceFixture(t, "example_trace") s.writeTrace(t, trace) return trace } - -func (s *StorageIntegration) writeLargeTraceWithDuplicateSpanIds(t *testing.T) *model.Trace { +func (s *StorageIntegration) writeLargeTraceWithDuplicateSpanIds(t *testing.T) ptrace.Traces { + // Load the example trace fixture trace := s.getTraceFixture(t, "example_trace") - repeatedSpan := trace.Spans[0] - trace.Spans = make([]*model.Span, 0, 10008) + + // Get the first resource span to use as a template + resourceSpans := trace.ResourceSpans() + if resourceSpans.Len() == 0 { + t.Fatalf("No ResourceSpans found in the trace fixture") + } + originalSpans := resourceSpans.At(0).ScopeSpans().At(0).Spans() + if originalSpans.Len() == 0 { + t.Fatalf("No spans found in the ResourceSpan") + } + + // Prepare a new trace with a large number of spans + newTrace := ptrace.NewTraces() + newResourceSpans := newTrace.ResourceSpans().AppendEmpty() + resourceSpans.At(0).CopyTo(newResourceSpans) + + newScopeSpans := newResourceSpans.ScopeSpans().AppendEmpty() + originalSpans.At(0).CopyTo(newScopeSpans.Spans().AppendEmpty()) // Copy the first span as a template + for i := 0; i < 10008; i++ { - newSpan := new(model.Span) - *newSpan = *repeatedSpan - switch { - case i%100 == 0: - newSpan.SpanID = repeatedSpan.SpanID - default: - newSpan.SpanID = model.SpanID(i) + newSpan := newScopeSpans.Spans().AppendEmpty() + originalSpans.At(0).CopyTo(newSpan) + + if i%100 == 0 { + // Duplicate span ID every 100 spans + newSpan.SetSpanID(originalSpans.At(0).SpanID()) + } else { + // Set unique span ID + newSpan.SetSpanID(pcommon.NewSpanIDEmpty()) } - newSpan.StartTime = newSpan.StartTime.Add(time.Second * time.Duration(i+1)) - trace.Spans = append(trace.Spans, newSpan) + + // Adjust the start time to make each span unique + newSpan.SetStartTimestamp(originalSpans.At(0).StartTimestamp() + pcommon.Timestamp(i*1e9)) } - s.writeTrace(t, trace) - return trace + + // Write the new trace + s.writeTrace(t, newTrace) + return newTrace } -func (*StorageIntegration) getTraceFixture(t *testing.T, fixture string) *model.Trace { + +func (*StorageIntegration) getTraceFixture(t *testing.T, fixture string) ptrace.Traces { fileName := fmt.Sprintf("fixtures/traces/%s.json", fixture) + return getTraceFixtureExact(t, fileName) } -func getTraceFixtureExact(t *testing.T, fileName string) *model.Trace { - var trace model.Trace - loadAndParseJSONPB(t, fileName, &trace) - return &trace -} +// func getTraceFixtureExact(t *testing.T, fileName string) *model.Trace { +// var trace model.Trace +// loadAndParseJSONPB(t, fileName, &trace) + +// return &trace +// } +func getTraceFixtureExact(t *testing.T, fileName string) ptrace.Traces { + var modelTrace model.Trace + loadAndParseJSONPB(t, fileName, &modelTrace) + + // Create batches for each process + batch := &model.Batch{ + Spans: modelTrace.Spans, + } + traces , err := otlp2jaeger.ProtoToTraces([]*model.Batch{batch}) + if err != nil { + t.Log("Failed to Convert Trace: %v", err ) + } + return traces + } func loadAndParseJSONPB(t *testing.T, path string, object proto.Message) { // #nosec From 24630d4af68bdc31b384c5b9513cad5dd5aba011 Mon Sep 17 00:00:00 2001 From: Ayush Vishwakarma Date: Fri, 20 Dec 2024 18:49:30 +0530 Subject: [PATCH 5/6] Created a wrapper of integrationv2 on intrgrationv1 Signed-off-by: Ayush Vishwakarma --- plugin/storage/integration/integration.go | 320 +++++++++++-------- plugin/storage/integration/integration_v2.go | 73 +++++ 2 files changed, 255 insertions(+), 138 deletions(-) create mode 100644 plugin/storage/integration/integration_v2.go diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index b9637d5daaf..828848a1487 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -44,6 +44,7 @@ var fixtures embed.FS // Some implementations may declare multiple tests, with different settings, // and RunAll() under different conditions. type StorageIntegration struct { + StorageIntegrationV2 SpanWriter spanstore.Writer SpanReader spanstore.Reader ArchiveSpanReader spanstore.Reader @@ -52,8 +53,6 @@ type StorageIntegration struct { DependencyReader dependencystore.Reader SamplingStore samplingstore.Store Fixtures []*QueryFixtures - TraceReader tracestore.Reader - TraceWriter tracestore.Writer // TODO: remove this after all storage backends return spanKind from GetOperations GetOperationsMissingSpanKind bool @@ -84,7 +83,7 @@ type StorageIntegration struct { // the service name is formatted "query##-service". type QueryFixtures struct { Caption string - Query *tracestore.TraceQueryParams + Query tracestore.TraceQueryParams ExpectedFixtures []string } @@ -190,7 +189,6 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { } } - func (s *StorageIntegration) testArchiveTrace(t *testing.T) { s.skipIfNeeded(t) if s.SkipArchiveTest { @@ -219,6 +217,38 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) { CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual) } +// func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { +// s.skipIfNeeded(t) +// defer s.cleanUp(t) + +// t.Log("Testing Large Trace over 10K with duplicate IDs...") + +// expected := s.writeLargeTraceWithDuplicateSpanIds(t) +// expectedTraceID := expected.Spans[0].TraceID + +// var actual *model.Trace +// found := s.waitForCondition(t, func(_ *testing.T) bool { +// var err error +// actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) +// return err == nil && len(actual.Spans) >= len(expected.Spans) +// }) + +// if !assert.True(t, found) { +// CompareTraces(t, expected, actual) +// return +// } + +// duplicateCount := 0 +// seenIDs := make(map[model.SpanID]int) + +// for _, span := range actual.Spans { +// seenIDs[span.SpanID]++ +// if seenIDs[span.SpanID] > 1 { +// duplicateCount++ +// } +// } +// assert.Positive(t, duplicateCount, "Duplicate SpanIDs should be present in the trace") +// } func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) @@ -226,32 +256,67 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { t.Log("Testing Large Trace over 10K with duplicate IDs...") expected := s.writeLargeTraceWithDuplicateSpanIds(t) - expectedTraceID := expected.Spans[0].TraceID + expectedTraceID := expected.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID() + + var actual ptrace.Traces + var lastErr error - var actual *model.Trace found := s.waitForCondition(t, func(_ *testing.T) bool { - var err error - actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) - return err == nil && len(actual.Spans) >= len(expected.Spans) + actualTraceSeq := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{ + TraceID: expectedTraceID, + }) + + actualTraceSeq(func(traces []ptrace.Traces, err error) bool { + if err != nil { + lastErr = err + t.Log(err) + return false + } + if len(traces) > 0 { + actual = traces[0] + } + return true + }) + + if lastErr != nil { + return false + } + + expectedSpans := 0 + expected.ResourceSpans().At(0).ScopeSpans().At(0).Spans().RemoveIf(func(span ptrace.Span) bool { + expectedSpans++ + return false + }) + + actualSpans := 0 + actual.ResourceSpans().At(0).ScopeSpans().At(0).Spans().RemoveIf(func(span ptrace.Span) bool { + actualSpans++ + return false + }) + + return actualSpans >= expectedSpans }) if !assert.True(t, found) { - CompareTraces(t, expected, actual) + s.CompareTraces(t, expected, actual) return } + // Check for duplicate span IDs duplicateCount := 0 - seenIDs := make(map[model.SpanID]int) + seenIDs := make(map[string]int) - for _, span := range actual.Spans { - seenIDs[span.SpanID]++ - if seenIDs[span.SpanID] > 1 { + spans := actual.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for i := 0; i < spans.Len(); i++ { + spanID := spans.At(i).SpanID().String() + seenIDs[spanID]++ + if seenIDs[spanID] > 1 { duplicateCount++ } } + assert.Positive(t, duplicateCount, "Duplicate SpanIDs should be present in the trace") } - func (s *StorageIntegration) testGetOperations(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) @@ -294,59 +359,61 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { } } -// func (s *StorageIntegration) testGetTrace(t *testing.T) { -// s.skipIfNeeded(t) -// defer s.cleanUp(t) - -// expected := s.loadParseAndWriteExampleTrace(t) -// expectedTraceID := expected.Spans[0].TraceID - -// var actual *model.Trace -// found := s.waitForCondition(t, func(t *testing.T) bool { -// var err error -// actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) -// if err != nil { -// t.Log(err) -// } -// return err == nil && len(actual.Spans) == len(expected.Spans) -// }) -// if !assert.True(t, found) { -// CompareTraces(t, expected, actual) -// } - -// t.Run("NotFound error", func(t *testing.T) { -// fakeTraceID := model.TraceID{High: 0, Low: 1} -// trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID}) -// assert.Equal(t, spanstore.ErrTraceNotFound, err) -// assert.Nil(t, trace) -// }) -// } - func (s *StorageIntegration) testGetTrace(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t) expected := s.loadParseAndWriteExampleTrace(t) - expectedTraceID := expected.Spans[0].TraceID + expectedTraceID := expected.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID() - var actual *model.Trace + var actual ptrace.Traces found := s.waitForCondition(t, func(t *testing.T) bool { var err error - actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) - if err != nil { + traceSeq := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{ + TraceID: expectedTraceID, + }) + + var traces []ptrace.Traces + traceSeq(func(ts []ptrace.Traces, err error) bool { + if err != nil { + t.Log(err) + return false + } + traces = append(traces, ts...) + return true + }) + + if err != nil || len(traces) == 0 { t.Log(err) + return false } - return err == nil && len(actual.Spans) == len(expected.Spans) + + actual = traces[0] + return len(traces) == 1 && actual.SpanCount() == expected.SpanCount() }) + if !assert.True(t, found) { - CompareTraces(t, expected, actual) + s.CompareTraces(t, expected, actual) } t.Run("NotFound error", func(t *testing.T) { - fakeTraceID := model.TraceID{High: 0, Low: 1} - trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID}) - assert.Equal(t, spanstore.ErrTraceNotFound, err) - assert.Nil(t, trace) + fakeTraceID := [16]byte{1} // Create a fake trace ID + traceSeq := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{ + TraceID: fakeTraceID, + }) + var errFound error + var traces []ptrace.Traces + traceSeq(func(td []ptrace.Traces, err error) bool { + if err != nil { + errFound = err + return false + } + traces = append(traces, td...) + return true + }) + + assert.NoError(t, errFound) + assert.Empty(t, traces) }) } @@ -381,92 +448,76 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) { t.Run(queryTestCase.Caption, func(t *testing.T) { s.skipIfNeeded(t) expected := expectedTracesPerTestCase[i] - actual := s.findTracesByQuery(t, queryTestCase.Query, expected) - // Compare the expected and actual traces - CompareSliceOfTraces(t, expected, actual) + var actual []ptrace.Traces + traceSeq := s.TraceReader.FindTraces(context.Background(), queryTestCase.Query) + var err error + traceSeq(func(traces []ptrace.Traces, err error) bool { + if err != nil { + + t.Errorf("Error finding traces: %v", err) + + return false + } + actual = append(actual, traces...) + return true + }) + + require.NoError(t, err) + // This need to be implemented for the ptrace.Traces + // CompareSliceOfTraces(t, expected, actual) }) } } -// func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []*ptrace.Traces) []ptrace.Traces { -// var traces []*model.Trace -// found := s.waitForCondition(t, func(t *testing.T) bool { -// var err error -// traces, err = s.SpanReader.FindTraces(context.Background(), query) -// if err != nil { -// t.Log(err) -// return false -// } -// if len(expected) != len(traces) { -// t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces)) -// return false -// } -// if spanCount(expected) != spanCount(traces) { -// t.Logf("Excepting certain number of spans: expected: %d, actual: %d", spanCount(expected), spanCount(traces)) -// return false -// } -// return true -// }) -// require.True(t, found) -// return traces -// } func (s *StorageIntegration) findTracesByQuery(t *testing.T, query tracestore.TraceQueryParams, expected []ptrace.Traces) []ptrace.Traces { var collected []ptrace.Traces var lastErr error - + found := s.waitForCondition(t, func(t *testing.T) bool { - collected = nil // Reset for each attempt - traceSeq := s.TraceReader.FindTraces(context.Background(), query) - - traceSeq(func(traces []ptrace.Traces, err error) bool { - if err != nil { - lastErr = err - t.Log(err) + collected = nil // Reset for each attempt + traceSeq := s.TraceReader.FindTraces(context.Background(), query) + + traceSeq(func(traces []ptrace.Traces, err error) bool { + if err != nil { + lastErr = err + t.Log(err) + return false + } + collected = append(collected, traces...) + return true + }) + + if lastErr != nil { + return false + } + + if len(expected) != len(collected) { + t.Logf("Expecting certain number of traces: expected %d, actual %d", len(expected), len(collected)) + return false + } + + // Compare span counts + expectedSpans := 0 + actualSpans := 0 + for _, trace := range expected { + expectedSpans += trace.SpanCount() + } + for _, trace := range collected { + actualSpans += trace.SpanCount() + } + + if expectedSpans != actualSpans { + t.Logf("Expecting certain number of spans: expected %d, actual %d", expectedSpans, actualSpans) return false - } - collected = append(collected, traces...) - return true - }) - - if lastErr != nil { - return false - } - - if len(expected) != len(collected) { - t.Logf("Expecting certain number of traces: expected %d, actual %d", len(expected), len(collected)) - return false - } - - // Compare span counts - expectedSpans := 0 - actualSpans := 0 - for _, trace := range expected { - expectedSpans += trace.SpanCount() - } - for _, trace := range collected { - actualSpans += trace.SpanCount() - } - - if expectedSpans != actualSpans { - t.Logf("Expecting certain number of spans: expected %d, actual %d", expectedSpans, actualSpans) - return false - } - - return true + } + + return true }) - + require.True(t, found) return collected - } - -// func (s *StorageIntegration) writeTrace(t *testing.T, trace *model.Trace) { -// t.Logf("%-23s Writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), len(trace.Spans)) -// for _, span := range trace.Spans { -// err := s.SpanWriter.WriteSpan(context.Background(), span) -// require.NoError(t, err, "Not expecting error when writing trace to storage") -// } -// } +} func (s *StorageIntegration) writeTrace(t *testing.T, td ptrace.Traces) { t.Logf("%-23s Writing trace with %d spans", time.Now().Format("2006-01-02 15:04:05.999"), td.SpanCount()) @@ -524,33 +575,26 @@ func (s *StorageIntegration) writeLargeTraceWithDuplicateSpanIds(t *testing.T) p return newTrace } - func (*StorageIntegration) getTraceFixture(t *testing.T, fixture string) ptrace.Traces { fileName := fmt.Sprintf("fixtures/traces/%s.json", fixture) return getTraceFixtureExact(t, fileName) } -// func getTraceFixtureExact(t *testing.T, fileName string) *model.Trace { -// var trace model.Trace -// loadAndParseJSONPB(t, fileName, &trace) - -// return &trace -// } func getTraceFixtureExact(t *testing.T, fileName string) ptrace.Traces { var modelTrace model.Trace loadAndParseJSONPB(t, fileName, &modelTrace) // Create batches for each process - batch := &model.Batch{ + batch := &model.Batch{ Spans: modelTrace.Spans, - } - traces , err := otlp2jaeger.ProtoToTraces([]*model.Batch{batch}) + } + traces, err := otlp2jaeger.ProtoToTraces([]*model.Batch{batch}) if err != nil { - t.Log("Failed to Convert Trace: %v", err ) + t.Log("Failed to Convert Trace: %v", err) } return traces - } +} func loadAndParseJSONPB(t *testing.T, path string, object proto.Message) { // #nosec diff --git a/plugin/storage/integration/integration_v2.go b/plugin/storage/integration/integration_v2.go new file mode 100644 index 00000000000..956e7fbd36d --- /dev/null +++ b/plugin/storage/integration/integration_v2.go @@ -0,0 +1,73 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "encoding/json" + "testing" + + "github.com/kr/pretty" + otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type StorageIntegrationV2 struct { + TraceReader tracestore.Reader + TraceWriter tracestore.Writer +} + +// This function is used to compare traces in new format := v2 function +func (s *StorageIntegrationV2) CompareTraces(t *testing.T, expected ptrace.Traces, actual ptrace.Traces) { + // Convert both traces to Jaeger format for comparison + expectedBatches := otlp2jaeger.ProtoFromTraces(expected) + actualBatches := otlp2jaeger.ProtoFromTraces(actual) + + // First check the number of batches + require.Equal(t, len(expectedBatches), len(actualBatches), "Unequal number of expected vs. actual batches") + + // Convert batches to traces for comparison + var expectedTrace, actualTrace model.Trace + for _, batch := range expectedBatches { + expectedTrace.Spans = append(expectedTrace.Spans, batch.Spans...) + } + for _, batch := range actualBatches { + actualTrace.Spans = append(actualTrace.Spans, batch.Spans...) + } + + // Remove duplicate spans if any + dedupeSpans(&actualTrace) + + // Sort both traces + model.SortTrace(&expectedTrace) + model.SortTrace(&actualTrace) + + // Check sizes match + checkSize(t, &expectedTrace, &actualTrace) + + // Compare using pretty.Diff + if diff := pretty.Diff(&expectedTrace, &actualTrace); len(diff) > 0 { + for _, d := range diff { + t.Logf("Expected and actual differ: %v\n", d) + } + + // Marshal actual trace for debugging + out, err := json.Marshal(&actualTrace) + require.NoError(t, err) + t.Logf("Actual trace: %s", string(out)) + + // Marshal expected trace for debugging + expected, err := json.Marshal(&expectedTrace) + require.NoError(t, err) + t.Logf("Expected trace: %s", string(expected)) + + t.Fail() + } +} + + From 9a6d221eea039064d09be3711bba9f1fe9d82ee7 Mon Sep 17 00:00:00 2001 From: Ayush Vishwakarma Date: Fri, 20 Dec 2024 18:51:04 +0530 Subject: [PATCH 6/6] Removed old Code Signed-off-by: Ayush Vishwakarma --- plugin/storage/integration/integration.go | 32 ----------------------- 1 file changed, 32 deletions(-) diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 828848a1487..f644ea362d9 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -217,38 +217,6 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) { CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual) } -// func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { -// s.skipIfNeeded(t) -// defer s.cleanUp(t) - -// t.Log("Testing Large Trace over 10K with duplicate IDs...") - -// expected := s.writeLargeTraceWithDuplicateSpanIds(t) -// expectedTraceID := expected.Spans[0].TraceID - -// var actual *model.Trace -// found := s.waitForCondition(t, func(_ *testing.T) bool { -// var err error -// actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID}) -// return err == nil && len(actual.Spans) >= len(expected.Spans) -// }) - -// if !assert.True(t, found) { -// CompareTraces(t, expected, actual) -// return -// } - -// duplicateCount := 0 -// seenIDs := make(map[model.SpanID]int) - -// for _, span := range actual.Spans { -// seenIDs[span.SpanID]++ -// if seenIDs[span.SpanID] > 1 { -// duplicateCount++ -// } -// } -// assert.Positive(t, duplicateCount, "Duplicate SpanIDs should be present in the trace") -// } func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { s.skipIfNeeded(t) defer s.cleanUp(t)