feat prometheusIngester: add offset setting for query (#101)
* feat prometheusIngester: add offset setting for query

Signed-off-by: Martin Chodur <[email protected]>

* chore: changelog update

Signed-off-by: Martin Chodur <[email protected]>

* fix: comment

* Update CHANGELOG.md

---------

Signed-off-by: Martin Chodur <[email protected]>
FUSAKLA authored Mar 7, 2023
1 parent 2f64f54 commit 1c2b590
Showing 6 changed files with 96 additions and 38 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## [v6.13.0] 2023-03-07
### Added
- [#101](https://github.com/seznam/slo-exporter/pull/101) prometheusIngester: add `offset` option to queries, allowing querying of slightly older data to ensure consistency on Prometheus-compatible systems that use remote write.

## [v6.12.1] 2022-10-14
### Added
- [#95](https://github.com/seznam/slo-exporter/pull/95) prometheusIngester sends user-agent header
12 changes: 7 additions & 5 deletions docs/modules/prometheus_ingester.md
@@ -1,10 +1,10 @@
# Prometheus ingester

| | |
|----------------|-------------------------|
| `moduleName` | `prometheusIngester` |
| Module type | `producer` |
| Output event | `raw` |
| | |
| ------------ | -------------------- |
| `moduleName` | `prometheusIngester` |
| Module type | `producer` |
| Output event | `raw` |

Prometheus ingester generates events based on results of provided Prometheus queries.
For its usage example see [the prometheus example](/examples/prometheus).
@@ -56,6 +56,8 @@ type: '<query_type>'
resultAsQuantity: false
# How often to execute the query.
interval: <go_duration>
# Query data with the given offset. Useful to ensure consistency when querying data coming from remote write.
offset: <go_duration>
# Names of the labels that should be dropped from the result.
dropLabels:
- <label_name>
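For clarity on how `interval` and `offset` interact, here is a minimal Go sketch (standalone code, not slo-exporter's; the `evaluationTime` helper is invented for illustration): each tick of the query interval is evaluated at the wall-clock time minus the configured offset, which gives remote-written samples time to arrive before they are read.

```go
package main

import (
	"fmt"
	"time"
)

// evaluationTime is a hypothetical helper illustrating the semantics of the
// new `offset` option: the query still fires every `interval`, but it is
// evaluated as of `tick - offset` rather than `tick`.
func evaluationTime(tick time.Time, offset time.Duration) time.Time {
	return tick.Add(-offset)
}

func main() {
	tick := time.Date(2023, 3, 7, 12, 0, 0, 0, time.UTC)
	// With interval: 30s and offset: 5m, the 12:00:00 tick queries data as of 11:55:00.
	fmt.Println(evaluationTime(tick, 5*time.Minute)) // 2023-03-07 11:55:00 +0000 UTC
}
```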
23 changes: 16 additions & 7 deletions examples/prometheus/slo_exporter.yaml
@@ -1,29 +1,38 @@
webServerListenAddress: "0.0.0.0:8080"

pipeline: ["prometheusIngester", "relabel", "eventKeyGenerator", "dynamicClassifier", "sloEventProducer", "prometheusExporter"]
pipeline:
[
"prometheusIngester",
"relabel",
"eventKeyGenerator",
"dynamicClassifier",
"sloEventProducer",
"prometheusExporter",
]

modules:
prometheusIngester:
apiUrl: "http://demo.robustperception.io:9090"
httpHeaders:
- name: X-Scope-OrgID
value: "myOrganization"
- name: Authorization
valueFromEnv:
name: "SLO_EXPORTER_AUTH_TOKEN"
valuePrefix: "Bearer "
# - name: Authorization
# valueFromEnv:
# name: "SLO_EXPORTER_AUTH_TOKEN"
# valuePrefix: "Bearer "
queryTimeout: 30s
queries:
# Generate events from counter for every HTTP request with status code for availability SLO.
- type: counter_increase
query: 'prometheus_http_requests_total'
query: "prometheus_http_requests_total"
interval: 30s
offset: 5m
additionalLabels:
event_type: http_request_result

# Generate events from histogram for every HTTP request for latency SLO.
- type: histogram_increase
query: 'prometheus_http_request_duration_seconds_bucket'
query: "prometheus_http_request_duration_seconds_bucket"
interval: 30s
additionalLabels:
event_type: http_request_latency
22 changes: 9 additions & 13 deletions pkg/prometheus_ingester/prometheus_ingester.go
@@ -70,6 +70,7 @@ func validateQueryType(queryType queryType) error {
type queryOptions struct {
Query string
Interval time.Duration
Offset time.Duration
DropLabels []string
AdditionalLabels stringmap.StringMap
Type queryType
@@ -135,7 +136,7 @@ type PrometheusIngesterConfig struct {
}

type PrometheusIngester struct {
queryExecutors *[]queryExecutor
queryExecutors []*queryExecutor
queryTimeout time.Duration
client api.Client
api v1.API
@@ -210,10 +211,7 @@ func newFalse() *bool {
}

func New(initConfig PrometheusIngesterConfig, logger logrus.FieldLogger) (*PrometheusIngester, error) {
var (
queryExecutors = []queryExecutor{}
ingester = PrometheusIngester{}
)
var ingester = PrometheusIngester{}

headers, err := initConfig.HttpHeaders.toMap()
if err != nil {
@@ -232,7 +230,7 @@ func New(initConfig PrometheusIngesterConfig, logger logrus.FieldLogger) (*Prome
}

ingester = PrometheusIngester{
queryExecutors: &queryExecutors,
queryExecutors: []*queryExecutor{},
queryTimeout: initConfig.QueryTimeout,
client: client,
api: v1.NewAPI(client),
@@ -256,9 +254,9 @@ func New(initConfig PrometheusIngesterConfig, logger logrus.FieldLogger) (*Prome
q.ResultAsQuantity = newFalse()
}
}
queryExecutors = append(
queryExecutors,
queryExecutor{
ingester.queryExecutors = append(
ingester.queryExecutors,
&queryExecutor{
Query: q,
queryTimeout: ingester.queryTimeout,
eventsChan: ingester.outputChannel,
@@ -285,11 +283,9 @@ func (i *PrometheusIngester) Run() {

var wg sync.WaitGroup
// Start all queries
for _, queryExecutor := range *i.queryExecutors {
for _, queryExecutor := range i.queryExecutors {
wg.Add(1)
// declare local scope variable to prevent shadowing by the next iterations
qe := queryExecutor
go qe.run(queriesContext, &wg)
go queryExecutor.run(queriesContext, &wg)
}

<-i.shutdownChannel
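A side note on the switch from `*[]queryExecutor` to `[]*queryExecutor` above: it also lets `Run` drop the explicit per-iteration copy (`qe := queryExecutor`). Below is a standalone sketch of why that copy was needed for a slice of values but not for a slice of pointers; the `worker` type is invented for illustration, and Go 1.22's per-iteration loop variables make the value case safe as well.

```go
package main

import (
	"fmt"
	"sync"
)

type worker struct{ id int }

// run has a pointer receiver, like queryExecutor.run in this package.
func (w *worker) run(wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("worker", w.id)
}

func main() {
	var wg sync.WaitGroup

	// Slice of values: before Go 1.22 the loop variable is a single struct
	// reused on every iteration, so `go w.run(&wg)` would take the address of
	// that shared variable and the goroutines could all observe a later
	// element. Hence the removed `qe := queryExecutor` copy.
	values := []worker{{1}, {2}, {3}}
	for _, w := range values {
		wg.Add(1)
		w := w // per-iteration copy, as the old code did
		go w.run(&wg)
	}

	// Slice of pointers: the receiver is a pointer value evaluated when the
	// `go` statement runs, so each goroutine gets its own *worker and no
	// copy is needed.
	pointers := []*worker{{4}, {5}, {6}}
	for _, w := range pointers {
		wg.Add(1)
		go w.run(&wg)
	}

	wg.Wait()
}
```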
52 changes: 49 additions & 3 deletions pkg/prometheus_ingester/prometheus_ingester_test.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"testing"
@@ -21,8 +22,9 @@
)

type MockedRoundTripper struct {
t *testing.T
result model.Value
t *testing.T
result model.Value
expectedTimestamp string
}

func (m *MockedRoundTripper) resultFabricator() string {
Expand All @@ -46,6 +48,12 @@ func (m *MockedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error
m.t.Error(err)
return nil, err
}
vals, err := url.ParseQuery(buf.String())
assert.NoError(m.t, err)

if m.expectedTimestamp != "" {
assert.Equal(m.t, m.expectedTimestamp, vals.Get("time"))
}

response := m.resultFabricator()

@@ -209,7 +217,7 @@ func Test_Ingests_Various_ModelTypes(t *testing.T) {
}

// Prepare the string interpretation of the actual results
assert.ElementsMatchf(t, tc.eventsProduced, actualEventResult, "Produced events doesnt match expected events", "actual", HttpRequestsToString(actualEventResult))
assert.ElementsMatchf(t, tc.eventsProduced, actualEventResult, "Produced events doesn't match expected events", "actual", HttpRequestsToString(actualEventResult))
}
}

@@ -852,3 +860,41 @@ func Test_httpHeader_getValue(t *testing.T) {
})
}
}

func Test_queryOffset(t *testing.T) {
type testCase struct {
name string
queryOpts queryOptions
expectError bool
}

cases := []testCase{
{name: "no offset expected", queryOpts: queryOptions{Query: "up", Interval: time.Second, Type: "simple"}},
{name: "offset expected", queryOpts: queryOptions{Query: "up", Interval: time.Second, Type: "simple", Offset: time.Minute}},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
ts := model.Now()
roundTripper := &MockedRoundTripper{
t: t,
expectedTimestamp: ts.Add(-tc.queryOpts.Offset).String(),
result: &model.Scalar{Value: 1, Timestamp: 0},
}

ingester, err := New(PrometheusIngesterConfig{
RoundTripper: roundTripper,
QueryTimeout: 400 * time.Millisecond,
Queries: []queryOptions{
tc.queryOpts,
},
}, logrus.New())
if err != nil {
t.Error(err)
return
}
_, _, err = ingester.queryExecutors[0].execute(ts.Time())
assert.NoError(t, err)
})
}
}
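The new test pins down what the offset changes on the wire: the `time` form parameter sent to the Prometheus API must equal the tick timestamp minus the configured offset. A small self-contained sketch of that check follows; the `expectedQueryTime` helper is invented here and is not part of the test file.

```go
package main

import (
	"fmt"
	"net/url"
	"time"

	"github.com/prometheus/common/model"
)

// expectedQueryTime mirrors how Test_queryOffset builds its expectation:
// the tick timestamp shifted back by the query offset, rendered the way
// model.Time prints (seconds with a fractional part), which is what the
// mocked round tripper compares against the request's "time" form value.
func expectedQueryTime(ts model.Time, offset time.Duration) string {
	return ts.Add(-offset).String()
}

func main() {
	ts := model.Now()

	// Stand-in for the form-encoded body the Prometheus client would send.
	body := url.Values{
		"query": {"up"},
		"time":  {ts.Add(-time.Minute).String()},
	}.Encode()

	vals, err := url.ParseQuery(body)
	if err != nil {
		panic(err)
	}
	fmt.Println("offset applied:", vals.Get("time") == expectedQueryTime(ts, time.Minute))
}
```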
21 changes: 11 additions & 10 deletions pkg/prometheus_ingester/query_executor.go
@@ -3,15 +3,16 @@ package prometheus_ingester
import (
"context"
"fmt"
"github.com/hashicorp/go-multierror"
"go.uber.org/atomic"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/hashicorp/go-multierror"
"go.uber.org/atomic"

v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/seznam/slo-exporter/pkg/event"
@@ -73,8 +74,9 @@ func (q *queryExecutor) withRangeSelector(ts time.Time) string {
return q.Query.Query + fmt.Sprintf("[%ds]", int64(rangeSelector.Seconds()))
}

// execute query at provided timestamp ts
func (q *queryExecutor) execute(ts time.Time) (model.Value, error) {
// execute query at the provided timestamp ts, taking the configured query offset into account. Returns the actual timestamp with the offset applied.
func (q *queryExecutor) execute(ts time.Time) (model.Value, time.Time, error) {
ts = ts.Add(-q.Query.Offset)
q.queryInProgress.Store(true)
defer q.queryInProgress.Store(false)
timeoutCtx, cancel := context.WithTimeout(context.Background(), q.queryTimeout)
@@ -93,7 +95,7 @@ func (q *queryExecutor) execute(ts time.Time) (model.Value, error) {
case simpleQueryType:
query = q.Query.Query
default:
return nil, fmt.Errorf("unknown query type: '%s'", q.Query.Type)
return nil, ts, fmt.Errorf("unknown query type: '%s'", q.Query.Type)
}
start := time.Now()
result, warnings, err = q.api.Query(timeoutCtx, query, ts)
@@ -104,7 +106,7 @@ func (q *queryExecutor) execute(ts time.Time) (model.Value, error) {
q.logger.WithField("query", query).Warnf("warnings in query execution: %+v", warnings)
}

return result, err
return result, ts, err
}

func (q *queryExecutor) run(ctx context.Context, wg *sync.WaitGroup) {
@@ -123,14 +125,13 @@ func (q *queryExecutor) run(ctx context.Context, wg *sync.WaitGroup) {
q.logger.Warn("skipping query execution, previous query still in progress...")
continue
}
ts := time.Now()
result, err := q.execute(ts)
result, queryTs, err := q.execute(time.Now())
if err != nil {
prometheusQueryFail.WithLabelValues(string(q.Query.Type)).Inc()
q.logger.WithField("query", q.Query.Query).Errorf("failed querying Prometheus: '%+v'", err)
continue
}
err = q.ProcessResult(result, ts)
err = q.ProcessResult(result, queryTs)
if err != nil {
q.logger.WithField("query", q.Query.Query).Errorf("failed processing the query result: '%+v'", err)
}
@@ -246,7 +247,7 @@ func (q *queryExecutor) processHistogramIncrease(matrix model.Matrix, ts time.Ti
for _, metric := range metricBucketIncreases {
metricBuckets := make([]float64, len(metric))
i := 0
for bucket, _ := range metric {
for bucket := range metric {
metricBuckets[i] = bucket
i++
}
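Putting the query_executor.go changes together: the execution loop now evaluates each query at `now - offset` and hands that shifted timestamp to result processing. A minimal sketch of that shape (standalone code, not the module's; `querier` and `runWithOffset` are invented names):

```go
package main

import (
	"fmt"
	"time"
)

// querier stands in for the Prometheus API call made by the real queryExecutor.
type querier func(query string, ts time.Time)

// runWithOffset mimics the shape of queryExecutor.run after this change:
// the query fires every interval, but it is evaluated at the tick time minus
// the offset, and that shifted timestamp is what downstream processing sees.
func runWithOffset(q querier, query string, interval, offset time.Duration, ticks int) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for i := 0; i < ticks; i++ {
		now := <-ticker.C
		evalTs := now.Add(-offset) // the ts = ts.Add(-q.Query.Offset) step in execute
		q(query, evalTs)           // result processing receives evalTs, not now
	}
}

func main() {
	runWithOffset(func(query string, ts time.Time) {
		fmt.Printf("query %q evaluated at %s\n", query, ts.Format(time.RFC3339))
	}, "up", time.Second, 5*time.Minute, 3)
}
```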
