diff --git a/CHANGELOG.md b/CHANGELOG.md index 6eb6c251775..257243b49f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- Add `WithFilterer` provider option and `Filterer` interface in `go.opentelemetry.io/otel/sdk/log` that accepts a newly introduced `FilterParameters` type. (#5825) + ### Changed - Enable exemplars by default in `go.opentelemetry.io/otel/sdk/metric`. Exemplars can be disabled by setting `OTEL_METRICS_EXEMPLAR_FILTER=always_off` (#5778) @@ -19,6 +23,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The race condition for multiple `FixedSize` exemplar reservoirs identified in #5814 is resolved. (#5819) - Fix log records duplication in case of heterogeneous resource attributes by correctly mapping each log record to it's resource and scope. (#5803) +### Removed + +- Remove `go.opentelemetry.io/otel/sdk/log/internal/x` package. Filtering is now done via `Filterer` interface in `go.opentelemetry.io/otel/sdk/log`. (#5825) + diff --git a/sdk/log/example_test.go b/sdk/log/example_test.go index 8070beef771..617ece38a5c 100644 --- a/sdk/log/example_test.go +++ b/sdk/log/example_test.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "strings" - "sync" logapi "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/log/global" @@ -51,19 +50,20 @@ func Example() { // slog.SetDefault(otelslog.NewLogger("my/pkg/name", otelslog.WithLoggerProvider(provider))) } -// Use a processor that filters out records based on the provided context. -func ExampleProcessor_filtering() { +// Use a filterer that filters out records based on the provided context. +func ExampleFilterer() { // Existing processor that emits telemetry. - var processor log.Processor = log.NewBatchProcessor(nil) + processor := log.NewBatchProcessor(nil) - // Wrap the processor so that it ignores processing log records + // Add a filterer so that the SDK ignores processing of log records // when a context deriving from WithIgnoreLogs is passed // to the logging methods. - processor = &ContextFilterProcessor{Processor: processor} + filterer := &ContextFilterer{} // The created processor can then be registered with // the OpenTelemetry Logs SDK using the WithProcessor option. _ = log.NewLoggerProvider( + log.WithFilterer(filterer), log.WithProcessor(processor), ) } @@ -72,54 +72,27 @@ type key struct{} var ignoreLogsKey key -// WithIgnoreLogs returns a context which is used by [ContextFilterProcessor] +// WithIgnoreLogs returns a context which is used by [ContextFilterer] // to filter out log records. func WithIgnoreLogs(ctx context.Context) context.Context { return context.WithValue(ctx, ignoreLogsKey, true) } -// ContextFilterProcessor filters out logs when a context deriving from -// [WithIgnoreLogs] is passed to its methods. -type ContextFilterProcessor struct { - log.Processor +// ContextFilterer filters out logs when a context deriving from +// [WithIgnoreLogs] is passed to Logger's methods. +type ContextFilterer struct{} - lazyFilter sync.Once - // Use the experimental FilterProcessor interface - // (go.opentelemetry.io/otel/sdk/log/internal/x). - filter filter -} - -type filter interface { - Enabled(ctx context.Context, param logapi.EnabledParameters) bool -} - -func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error { - if ignoreLogs(ctx) { - return nil - } - return p.Processor.OnEmit(ctx, record) -} - -func (p *ContextFilterProcessor) Enabled(ctx context.Context, param logapi.EnabledParameters) bool { - p.lazyFilter.Do(func() { - if f, ok := p.Processor.(filter); ok { - p.filter = f - } - }) - return !ignoreLogs(ctx) && (p.filter == nil || p.filter.Enabled(ctx, param)) -} - -func ignoreLogs(ctx context.Context) bool { +func (p *ContextFilterer) Filter(ctx context.Context, param log.FilterParameters) bool { _, ok := ctx.Value(ignoreLogsKey).(bool) return ok } // Use a processor which redacts sensitive data from some attributes. -func ExampleProcessor_redact() { +func ExampleProcessor() { // Existing processor that emits telemetry. - var processor log.Processor = log.NewBatchProcessor(nil) + processor := log.NewBatchProcessor(nil) - // Add a processor so that it redacts values from token attributes. + // Add a processor so that the SDK redacts values from token attributes. redactProcessor := &RedactTokensProcessor{} // The created processor can then be registered with diff --git a/sdk/log/filterer.go b/sdk/log/filterer.go new file mode 100644 index 00000000000..9e11d1c46cc --- /dev/null +++ b/sdk/log/filterer.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "context" + + "go.opentelemetry.io/otel/log" +) + +// Filterer handles filtering of log records. +// +// Any of the Filterer's methods may be called concurrently with itself +// or with other methods. It is the responsibility of the Filterer to manage +// this concurrency. +type Filterer interface { + // Filter returns whether the SDK will process for the given context + // and param. + // + // The passed param may be a partial record (e.g a record with only the + // Severity set). If a Filterer needs more information than is provided, it + // is said to be in an indeterminate state. An implementation should + // return true for an indeterminate state. + // + // The returned value will be true when the SDK should process for the + // provided context and param, and will be false if the SDK should not + // process. + Filter(ctx context.Context, param FilterParameters) bool +} + +// FilterParameters represent Filter parameters. +type FilterParameters struct { + severity log.Severity + severitySet bool + + noCmp [0]func() //nolint: unused // This is indeed used. +} + +// Severity returns the [Severity] level value, or [SeverityUndefined] if no value was set. +// The ok result indicates whether the value was set. +func (r *FilterParameters) Severity() (value log.Severity, ok bool) { + return r.severity, r.severitySet +} + +// setSeverity sets the [Severity] level. +func (r *FilterParameters) setSeverity(level log.Severity) { + r.severity = level + r.severitySet = true +} diff --git a/sdk/log/internal/x/README.md b/sdk/log/internal/x/README.md deleted file mode 100644 index 73f4db626af..00000000000 --- a/sdk/log/internal/x/README.md +++ /dev/null @@ -1,35 +0,0 @@ -# Experimental Features - -The Logs SDK contains features that have not yet stabilized. -These features are added to the OpenTelemetry Go Logs SDK prior to stabilization so that users can start experimenting with them and provide feedback. - -These feature may change in backwards incompatible ways as feedback is applied. -See the [Compatibility and Stability](#compatibility-and-stability) section for more information. - -## Features - -- [Filter Processors](#filter-processor) - -### Filter Processor - -Users of logging libraries often want to know if a log `Record` will be processed or dropped before they perform complex operations to construct the `Record`. -The [`Logger`] in the Logs Bridge API provides the `Enabled` method for just this use-case. -In order for the Logs Bridge SDK to effectively implement this API, it needs to be known if the registered [`Processor`]s are enabled for the `Record` within a context. -A [`Processor`] that knows, and can identify, what `Record` it will process or drop when it is passed to `OnEmit` can communicate this to the SDK `Logger` by implementing the `FilterProcessor`. - -By default, the SDK `Logger.Enabled` will return true when called. -Only if all the registered [`Processor`]s implement `FilterProcessor` and they all return `false` will `Logger.Enabled` return `false`. - -See the [`minsev`] [`Processor`] for an example use-case. -It is used to filter `Record`s out that a have a `Severity` below a threshold. - -[`Logger`]: https://pkg.go.dev/go.opentelemetry.io/otel/log#Logger -[`Processor`]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#Processor -[`minsev`]: https://pkg.go.dev/go.opentelemetry.io/contrib/processors/minsev - -## Compatibility and Stability - -Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md). -These features may be removed or modified in successive version releases, including patch versions. - -When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release. diff --git a/sdk/log/internal/x/x.go b/sdk/log/internal/x/x.go deleted file mode 100644 index ca78d109778..00000000000 --- a/sdk/log/internal/x/x.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -// Package x contains support for Logs SDK experimental features. -package x // import "go.opentelemetry.io/otel/sdk/log/internal/x" - -import ( - "context" - - "go.opentelemetry.io/otel/log" -) - -// FilterProcessor is a [go.opentelemetry.io/otel/sdk/log.Processor] that knows, -// and can identify, what [log.Record] it will process or drop when it is -// passed to OnEmit. -// -// This is useful for users of logging libraries that want to know if a [log.Record] -// will be processed or dropped before they perform complex operations to -// construct the [log.Record]. -// -// Processor implementations that choose to support this by satisfying this -// interface are expected to re-evaluate the [log.Record]s passed to OnEmit, it is -// not expected that the caller to OnEmit will use the functionality from this -// interface prior to calling OnEmit. -// -// This should only be implemented for Processors that can make reliable -// enough determination of this prior to processing a [log.Record] and where -// the result is dynamic. -// -// [Processor]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#Processor -type FilterProcessor interface { - // Enabled returns whether the Processor will process for the given context - // and param. - // - // The passed param is likely to be a partial record with only the - // bridge-relevant information being provided (e.g a record with only the - // Severity set). If a Logger needs more information than is provided, it - // is said to be in an indeterminate state (see below). - // - // The returned value will be true when the Processor will process for the - // provided context and param, and will be false if the Processor will not - // process. An implementation should default to returning true for an - // indeterminate state. - // - // Implementations should not modify the param. - Enabled(ctx context.Context, param log.EnabledParameters) bool -} diff --git a/sdk/log/logger.go b/sdk/log/logger.go index d6ca2ea41aa..f572dbf35bf 100644 --- a/sdk/log/logger.go +++ b/sdk/log/logger.go @@ -11,7 +11,6 @@ import ( "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/log/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/log/internal/x" "go.opentelemetry.io/otel/trace" ) @@ -35,6 +34,12 @@ func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger { } func (l *logger) Emit(ctx context.Context, r log.Record) { + param := log.EnabledParameters{} + param.SetSeverity(r.Severity()) + if !l.Enabled(ctx, param) { + return + } + newRecord := l.newRecord(ctx, r) for _, p := range l.provider.processors { if err := p.OnEmit(ctx, &newRecord); err != nil { @@ -43,30 +48,20 @@ func (l *logger) Emit(ctx context.Context, r log.Record) { } } -// Enabled returns true if at least one Processor held by the LoggerProvider -// that created the logger will process param for the provided context and param. -// -// If it is not possible to definitively determine the param will be -// processed, true will be returned by default. A value of false will only be -// returned if it can be positively verified that no Processor will process. +// Enabled returns false if at least one Filterer held by the LoggerProvider +// that created the logger will return false for the provided context and param. func (l *logger) Enabled(ctx context.Context, param log.EnabledParameters) bool { - fltrs := l.provider.filterProcessors() - // If there are more Processors than FilterProcessors we cannot be sure - // that all Processors will drop the record. Therefore, return true. - // - // If all Processors are FilterProcessors, check if any is enabled. - return len(l.provider.processors) > len(fltrs) || anyEnabled(ctx, param, fltrs) -} + newParam := FilterParameters{} + if v, ok := param.Severity(); ok { + newParam.setSeverity(v) + } -func anyEnabled(ctx context.Context, param log.EnabledParameters, fltrs []x.FilterProcessor) bool { - for _, f := range fltrs { - if f.Enabled(ctx, param) { - // At least one Processor will process the Record. - return true + for _, flt := range l.provider.filterers { + if !flt.Filter(ctx, newParam) { + return false } } - // No Processor will process the record - return false + return true } func (l *logger) newRecord(ctx context.Context, r log.Record) Record { diff --git a/sdk/log/logger_test.go b/sdk/log/logger_test.go index 0c2f793db97..9a6f705acd9 100644 --- a/sdk/log/logger_test.go +++ b/sdk/log/logger_test.go @@ -198,6 +198,17 @@ func TestLoggerEmit(t *testing.T) { }, }, }, + { + name: "Filtered", + logger: newLogger(NewLoggerProvider( + WithProcessor(p0), + WithProcessor(p1), + WithFilterer(newFltr(false)), + ), instrumentation.Scope{}), + ctx: context.Background(), + record: r, + expectedRecords: nil, + }, } for _, tc := range testCases { @@ -215,9 +226,9 @@ func TestLoggerEmit(t *testing.T) { } func TestLoggerEnabled(t *testing.T) { - p0 := newFltrProcessor("0", true) - p1 := newFltrProcessor("1", true) - p2WithDisabled := newFltrProcessor("2", false) + p0 := newFltr(true) + p1 := newFltr(true) + p2WithDisabled := newFltr(false) testCases := []struct { name string @@ -229,13 +240,13 @@ func TestLoggerEnabled(t *testing.T) { name: "NoProcessors", logger: newLogger(NewLoggerProvider(), instrumentation.Scope{}), ctx: context.Background(), - expected: false, + expected: true, }, { name: "WithProcessors", logger: newLogger(NewLoggerProvider( - WithProcessor(p0), - WithProcessor(p1), + WithFilterer(p0), + WithFilterer(p1), ), instrumentation.Scope{}), ctx: context.Background(), expected: true, @@ -243,7 +254,7 @@ func TestLoggerEnabled(t *testing.T) { { name: "WithDisabledProcessors", logger: newLogger(NewLoggerProvider( - WithProcessor(p2WithDisabled), + WithFilterer(p2WithDisabled), ), instrumentation.Scope{}), ctx: context.Background(), expected: false, @@ -251,17 +262,17 @@ func TestLoggerEnabled(t *testing.T) { { name: "ContainsDisabledProcessor", logger: newLogger(NewLoggerProvider( - WithProcessor(p2WithDisabled), - WithProcessor(p0), + WithFilterer(p2WithDisabled), + WithFilterer(p0), ), instrumentation.Scope{}), ctx: context.Background(), - expected: true, + expected: false, }, { name: "WithNilContext", logger: newLogger(NewLoggerProvider( - WithProcessor(p0), - WithProcessor(p1), + WithFilterer(p0), + WithFilterer(p1), ), instrumentation.Scope{}), ctx: nil, expected: true, @@ -277,8 +288,8 @@ func TestLoggerEnabled(t *testing.T) { func BenchmarkLoggerEnabled(b *testing.B) { provider := NewLoggerProvider( - WithProcessor(newFltrProcessor("0", false)), - WithProcessor(newFltrProcessor("1", true)), + WithFilterer(newFltr(false)), + WithFilterer(newFltr(true)), ) logger := provider.Logger("BenchmarkLoggerEnabled") ctx, param := context.Background(), log.EnabledParameters{} diff --git a/sdk/log/processor.go b/sdk/log/processor.go index fcab34c7a48..0759c9c9430 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -12,15 +12,9 @@ import ( // Any of the Processor's methods may be called concurrently with itself // or with other methods. It is the responsibility of the Processor to manage // this concurrency. -// -// See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about how -// a Processor can be extended to support experimental features. type Processor interface { // OnEmit is called when a Record is emitted. // - // OnEmit will be called independent of Enabled. Implementations need to - // validate the arguments themselves before processing. - // // Implementation should not interrupt the record processing // if the context is canceled. // @@ -29,8 +23,11 @@ type Processor interface { // considered unrecoverable and will be reported to a configured error // Handler. // - // The SDK invokes the processors sequentially in the same order as - // they were registered using [WithProcessor]. + // The SDK invokes the filters and processors sequentially in the same order as + // they were registered using [WithFilterer], [WithProcessor]. + // Processors' OnEmit will be called in none of the registered [Filterer]s + // returned false. + // // Implementations may synchronously modify the record so that the changes // are visible in the next registered processor. // Notice that [Record] is not concurrent safe. Therefore, asynchronous diff --git a/sdk/log/provider.go b/sdk/log/provider.go index 14084ed99a8..b32d2a90d84 100644 --- a/sdk/log/provider.go +++ b/sdk/log/provider.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/otel/log/embedded" "go.opentelemetry.io/otel/log/noop" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/log/internal/x" "go.opentelemetry.io/otel/sdk/resource" ) @@ -30,6 +29,7 @@ const ( type providerConfig struct { resource *resource.Resource processors []Processor + filterers []Filterer attrCntLim setting[int] attrValLenLim setting[int] } @@ -64,12 +64,10 @@ type LoggerProvider struct { resource *resource.Resource processors []Processor + filterers []Filterer attributeCountLimit int attributeValueLengthLimit int - fltrProcessorsOnce sync.Once - fltrProcessors []x.FilterProcessor - loggersMu sync.Mutex loggers map[instrumentation.Scope]*logger @@ -92,22 +90,12 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider { return &LoggerProvider{ resource: cfg.resource, processors: cfg.processors, + filterers: cfg.filterers, attributeCountLimit: cfg.attrCntLim.Value, attributeValueLengthLimit: cfg.attrValLenLim.Value, } } -func (p *LoggerProvider) filterProcessors() []x.FilterProcessor { - p.fltrProcessorsOnce.Do(func() { - for _, proc := range p.processors { - if f, ok := proc.(x.FilterProcessor); ok { - p.fltrProcessors = append(p.fltrProcessors, f) - } - } - }) - return p.fltrProcessors -} - // Logger returns a new [log.Logger] with the provided name and configuration. // // If p is shut down, a [noop.Logger] instance is returned. @@ -223,6 +211,19 @@ func WithProcessor(processor Processor) LoggerProviderOption { }) } +// WithFilterer associates Filterer with a LoggerProvider. +// +// By default, if this option is not used, the LoggerProvider will process all data. +// +// The SDK invokes the filterers sequentially in the same order as they were +// registered. The SDK will not process data if any of the filterers returns false. +func WithFilterer(filterer Filterer) LoggerProviderOption { + return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig { + cfg.filterers = append(cfg.filterers, filterer) + return cfg + }) +} + // WithAttributeCountLimit sets the maximum allowed log record attribute count. // Any attribute added to a log record once this limit is reached will be dropped. // diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go index b9374e9d934..69c722297dc 100644 --- a/sdk/log/provider_test.go +++ b/sdk/log/provider_test.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/log/noop" ottest "go.opentelemetry.io/otel/sdk/internal/internaltest" - "go.opentelemetry.io/otel/sdk/log/internal/x" "go.opentelemetry.io/otel/sdk/resource" ) @@ -61,22 +60,19 @@ func (p *processor) ForceFlush(context.Context) error { return p.Err } -type fltrProcessor struct { - *processor - +type fltr struct { enabled bool } -var _ x.FilterProcessor = (*fltrProcessor)(nil) +var _ Filterer = (*fltr)(nil) -func newFltrProcessor(name string, enabled bool) *fltrProcessor { - return &fltrProcessor{ - processor: newProcessor(name), - enabled: enabled, +func newFltr(enabled bool) *fltr { + return &fltr{ + enabled: enabled, } } -func (p *fltrProcessor) Enabled(context.Context, log.EnabledParameters) bool { +func (p *fltr) Filter(context.Context, FilterParameters) bool { return p.enabled }