Skip to content

Commit

Permalink
First iteration of streaming PromQL engine (#7693)
Browse files Browse the repository at this point in the history
* Use forked Prometheus module

* Use promql.QueryEngine rather than *promql.Engine, add config option to select engine

* Add parallel query path with streaming engine.

* Add IntelliJ debugging configuration for streaming querier.

* Add Grafana datasource for streaming querier

* Use correct query-scheduler port and enable debugging

* Set up load test.

* Fix p99 latency panel

* Commit settings used in previous test.

* Enable chunks streaming.

* Disable load generator.

* Create separate load testing query paths to ensure meta-monitoring queries don't interfere with load test results.

* Add initial k6-based load test script

* Add script to run load tests against both engines.

* Add todo

* Extract friendly name to hostname logic to separate script

* Rename k6 output file

* Check both engines produce the same result.

* Summarise results after each test run.

* Enable streaming chunks from store-gateways to queriers.

* Refactor test script, print max and average CPU and memory after test

* Reorder variables to match output order

* Don't include query name in k6 output file name.

* Add initial script to generate data.

* Rename script

* Add script to generate data generation config.

* Make it easier to test different queries.

* Rename PromQL engine CLI flag.

* Add Thanos' engine.

* Add Thanos engine to benchmarking setup.

* Set default test duration to 5 minutes.

* Rework data generation setup to work around limitation in prometheus-toolbox.

* Load test data faster.

* Record test config for first test

* Record test config for second test

* Record test config for third test

* Disable gRPC compression for ingester client

* Record test config for re-run of second test

* Record test config for re-run of first test

* Record test config for re-run of third test

* Log URL used for test.

* Record test config for fourth test

* Record test config for fifth test

* Record test config for 10k series test

* Record test config for 1k series test

* Record test config for 50k series test

* Fix configuration issue that does not affect outcome of testing.

This does not affect the outcome of testing because all tests use
queriers directly (ie. do not use query-frontends or query-schedulers)

* Record test config for seventh test

* Move production code to main Mimir code tree

* Copy test files into repo

* Add license and provenance comments

* Add a readme.

* Update package path in scripts and remove unnecessary testing script

* Add more notes to code

* Explain benefit of streaming engine.

* Mention benchmarking tools in readme.

* Remove references to Thanos' engine.

* Undo unnecessary go.mod changes

* Remove benchmarking tool for now

We can add something more robust in the future if we need it.

* Remove docker-compose changes

* Reset whitespace changes

* Remove readme

* Vendor in branch of mimir-prometheus with changes from prometheus/prometheus#13713

* Fix issues introduced with recent Prometheus changes and fix linting issues

* Detect and reject configuration not supported by streaming engine

* Remove TODOs tracked elsewhere

* Bring in latest changes to `QueryEngine` interface

* Remove more TODOs tracked elsewhere

* Change signature of `Next()` to return an error when the stream is exhausted

* Ignore warnings when comparing results from Prometheus and streaming engines.

* Correct naming

* Rename `Operator` to clarify that it produces instant vectors.

* Rename test file

* Reorganise test file

* Bring in latest benchmark changes from upstream.

* Run tests and benchmarks with isolation disabled, to mirror what Mimir itself does

* Fix script permissions

* Simplify test setup and don't bother testing 2000 series case to speed up tests.

* Use assertion helpers in benchmark

* Improve engine benchmark comparison script: fix issue with package name containing "streaming", clearly label results

* Remove unnecessary files

* Add note to engine benchmark comparison script

* Make test names clearer

* Ensure both engines produce the same result before benchmarking

* Add tests for unsupported expressions and improve error messages returned when unsupported expression used.

* Fix linting warnings

* Add notes for invalid cases that should be caught by the PromQL parser.

* Implement `Query.Statement()`

* Give variable a better name

* Remove Pool interface

* Remove unnecessary types

* Update comments

* Remove unused lookback delta for range vector selector

* Extract selector logic to a shared type

* Add more TODOs

* Use Selector for range vector selectors.

* Compute values used on every call to Next() once in instant and range vector selectors.

This gives a 0-7% latency improvement for queries with many series at
small absolute cost to queries with few series.

Memory consumption is slightly higher (tens of bytes), but not
concerning.

* Remove unnecessary script and update comment

* Validate query time range

* Enforce that range query expression produces a vector or scalar result.

* Add CLI flag to experimental features list in docs

* Add changelog entry

* Fix linting warnings

* Move common PromQL engine configuration to its own package

* Don't bother with 10 series benchmarks.

They're not very interesting, and the benchmarks take too long to run.

* Sort imports

* Add missing license header.

* Use bucketed pool based on zeropool.

This greatly improves latency and allocations, with latency
improvements between 0 and 15% on our benchmarks.

* Fix import sorting

* Set upper limits for sizes of pooled slices.

* Set size for SeriesBatch pool

* Add integration test.

* Move RingBuffer out of util package and pool slices used.

This improves latency of rate-related benchmarks by between 0 and 11%.

* Add tests for ring buffer type.

* Return RingBuffer point slices to pool when no longer required.

* Fix linting

* Remove TODOs about pooling group slices and maps.

Pooling these slices did not have a substantial performance impact.

* Pool groups in aggregation operator.

This reduces latency of aggregation operations by 0-2% in our
benchmarks.

* Remove TODOs about pooling label builders.

Pooling these had a negative performance impact.

* Remove more TODOs, clarify interface expectations

* Release memory sooner after query execution

* Fix typo in comment

* Remove unecessary context cancellation checks from operators.

Context cancellation is handled by the underlying queryable, at least
for the PromQL expressions we currently support.

* Fix linting warnings

* Address remaining TODOs in Selector

* Fix integration test flakiness

* Don't panic when encountering a native histogram

* Fix issue where bucketed pool could return a slice with capacity less than the requested size.

* Remove redundant checks

* Add support for @ in instant and range vector selectors.

This is used extensively by Prometheus' test scripts, so it's easier
to just implement support for this rather than try to workaround it
or disable it.

* Run Prometheus' test cases against streaming engine.

There are still some failing test cases, but these appear to be
genuine issues that need to be investigated.

* Fix failing staleness test: don't emit empty series for instant query

* Upgrade mimir-prometheus

* Add test case for stale markers in a range query.

* Add test cases for @ modifier with range queries

* Check for errors while reading test script

* Fix whitespace

* Remove duplicate timestamp computation in Selector and operators

* Update `rate` calculation to match behaviour in prometheus/prometheus#13725

* Add an overview of the engine.

* Fix linting issues and clarify docs.

* Add test case for scenario where input to aggregation returns no results or no points, and remove stale TODOs

* Implement `Query.String()`, and remove stale TODOs.

* Clarify BucketedPool contract, and add test case for case where min size is greater than 1.

* Expand test coverage to include scenarios not covered by upstream tests or those uniquely likely to cause issues in the streaming engine.

* Elaborate on comment

* Expand example in docs and fix typo

* Remove unnecessary sorting and use non-experimental slices package in comparison tests.

* Add missing comments.

* Clean up temporary files in benchmark comparison script, and improve formatting of output

* Add note explaining purpose of script.

* Address PR feedback

* Replace use of `DurationMilliseconds` with `d.Milliseconds()`

* Rename `Series()` to `SeriesMetadata()`

* Include names of acceptable values in CLI flag description

* Remove TODOs

* Add explanation for RangeVectorSelectorWithTransformation

* Move to top-level `streamingpromql` package

* Move common query engine config back to original location (revert e3933a2)
  • Loading branch information
charleskorn committed Apr 9, 2024
1 parent a67d3e9 commit c98802c
Show file tree
Hide file tree
Showing 48 changed files with 6,511 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Expand Up @@ -1876,6 +1876,17 @@
"fieldType": "duration",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "promql_engine",
"required": false,
"desc": "PromQL engine to use, either 'standard' or 'streaming'",
"fieldValue": null,
"fieldDefaultValue": "standard",
"fieldFlag": "querier.promql-engine",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_concurrent",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Expand Up @@ -1725,6 +1725,8 @@ Usage of ./cmd/mimir/mimir:
Delay before initiating requests to further ingesters when request minimization is enabled and the initially selected set of ingesters have not all responded. Ignored if -querier.minimize-ingester-requests is not enabled. (default 3s)
-querier.prefer-streaming-chunks-from-store-gateways
[experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.
-querier.promql-engine string
[experimental] PromQL engine to use, either 'standard' or 'streaming' (default "standard")
-querier.promql-experimental-functions-enabled
[experimental] Enable experimental PromQL functions. This config option should be set on query-frontend too when query sharding is enabled.
-querier.query-ingesters-within duration
Expand Down
1 change: 1 addition & 0 deletions docs/sources/mimir/configure/about-versioning.md
Expand Up @@ -126,6 +126,7 @@ The following features are currently experimental:
- Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`)
- Enable PromQL experimental functions (`-querier.promql-experimental-functions-enabled`)
- Allow streaming of `/active_series` responses to the frontend (`-querier.response-streaming-enabled`)
- Streaming PromQL engine (`-querier.promql-engine=streaming`)
- Query-frontend
- `-query-frontend.querier-forget-delay`
- Instant query splitting (`-query-frontend.split-instant-queries-by-interval`)
Expand Down
Expand Up @@ -1325,6 +1325,10 @@ store_gateway_client:
# CLI flag: -querier.minimize-ingester-requests-hedging-delay
[minimize_ingester_requests_hedging_delay: <duration> | default = 3s]

# (experimental) PromQL engine to use, either 'standard' or 'streaming'
# CLI flag: -querier.promql-engine
[promql_engine: <string> | default = "standard"]

# The number of workers running in each querier process. This setting limits the
# maximum number of concurrent queries in each querier.
# CLI flag: -querier.max-concurrent
Expand Down
46 changes: 46 additions & 0 deletions integration/querier_test.go
Expand Up @@ -514,6 +514,52 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
}
}

func TestStreamingPromQLEngine(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{
"-querier.promql-engine": "streaming",
})

consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

// Wait until the distributor and querier have updated the ring.
// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring,
// and the querier should have 512 tokens for the ingester ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Push a series to Mimir.
writeClient, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
require.NoError(t, err)

seriesName := "series_1"
seriesTimestamp := time.Now()
series, expectedVector, _ := generateFloatSeries(seriesName, seriesTimestamp, prompb.Label{Name: seriesName, Value: seriesName})

res, err := writeClient.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Query back the same series using the streaming PromQL engine.
c, err := e2emimir.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

result, err := c.Query(seriesName, seriesTimestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}

func testMetadataQueriesWithBlocksStorage(
t *testing.T,
c *e2emimir.Client,
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/handlers.go
Expand Up @@ -212,7 +212,7 @@ func NewQuerierHandler(
queryable storage.SampleAndChunkQueryable,
exemplarQueryable storage.ExemplarQueryable,
metadataSupplier querier.MetadataSupplier,
engine *promql.Engine,
engine promql.QueryEngine,
distributor Distributor,
reg prometheus.Registerer,
logger log.Logger,
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimir/mimir.go
Expand Up @@ -709,7 +709,7 @@ type Mimir struct {
QuerierQueryable prom_storage.SampleAndChunkQueryable
ExemplarQueryable prom_storage.ExemplarQueryable
MetadataSupplier querier.MetadataSupplier
QuerierEngine *promql.Engine
QuerierEngine promql.QueryEngine
QueryFrontendTripperware querymiddleware.Tripperware
QueryFrontendCodec querymiddleware.Codec
Ruler *ruler.Ruler
Expand Down
4 changes: 4 additions & 0 deletions pkg/mimir/mimir_test.go
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/grafana/mimir/pkg/distributor"
"github.com/grafana/mimir/pkg/frontend/v1/frontendv1pb"
"github.com/grafana/mimir/pkg/ingester"
"github.com/grafana/mimir/pkg/querier"
"github.com/grafana/mimir/pkg/ruler"
"github.com/grafana/mimir/pkg/ruler/rulestore"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
Expand Down Expand Up @@ -161,6 +162,9 @@ func TestMimir(t *testing.T) {
ReplicationFactor: 1,
InstanceInterfaceNames: []string{"en0", "eth0", "lo0", "lo"},
}},
Querier: querier.Config{
PromQLEngine: "standard",
},
}
require.NoError(t, cfg.Server.LogLevel.Set("info"))

Expand Down
11 changes: 9 additions & 2 deletions pkg/mimir/modules.go
Expand Up @@ -500,9 +500,12 @@ func (t *Mimir) initQueryable() (serv services.Service, err error) {
registerer := prometheus.WrapRegistererWith(querierEngine, t.Registerer)

// Create a querier queryable and PromQL engine
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine, err = querier.New(
t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryable, registerer, util_log.Logger, t.ActivityTracker,
)
if err != nil {
return nil, fmt.Errorf("could not create queryable: %w", err)
}

// Use the distributor to return metric metadata by default
t.MetadataSupplier = t.Distributor
Expand Down Expand Up @@ -842,7 +845,11 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
// TODO: Consider wrapping logger to differentiate from querier module logger
rulerRegisterer := prometheus.WrapRegistererWith(rulerEngine, t.Registerer)

queryable, _, eng := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryable, rulerRegisterer, util_log.Logger, t.ActivityTracker)
queryable, _, eng, err := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryable, rulerRegisterer, util_log.Logger, t.ActivityTracker)
if err != nil {
return nil, fmt.Errorf("could not create queryable for ruler: %w", err)
}

queryable = querier.NewErrorTranslateQueryableWithFn(queryable, ruler.WrapQueryableErrors)

if t.Cfg.Ruler.TenantFederation.Enabled {
Expand Down
36 changes: 31 additions & 5 deletions pkg/querier/querier.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/chunk"
"github.com/grafana/mimir/pkg/storage/lazyquery"
"github.com/grafana/mimir/pkg/streamingpromql"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/activitytracker"
"github.com/grafana/mimir/pkg/util/limiter"
Expand All @@ -55,12 +56,16 @@ type Config struct {
MinimizeIngesterRequests bool `yaml:"minimize_ingester_requests" category:"advanced"`
MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"minimize_ingester_requests_hedging_delay" category:"advanced"`

PromQLEngine string `yaml:"promql_engine" category:"experimental"`

// PromQL engine config.
EngineConfig engine.Config `yaml:",inline"`
}

const (
queryStoreAfterFlag = "querier.query-store-after"
queryStoreAfterFlag = "querier.query-store-after"
standardPromQLEngine = "standard"
streamingPromQLEngine = "streaming"
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
Expand All @@ -82,10 +87,16 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Uint64Var(&cfg.StreamingChunksPerIngesterSeriesBufferSize, "querier.streaming-chunks-per-ingester-buffer-size", 256, "Number of series to buffer per ingester when streaming chunks from ingesters.")
f.Uint64Var(&cfg.StreamingChunksPerStoreGatewaySeriesBufferSize, "querier.streaming-chunks-per-store-gateway-buffer-size", 256, "Number of series to buffer per store-gateway when streaming chunks from store-gateways.")

f.StringVar(&cfg.PromQLEngine, "querier.promql-engine", standardPromQLEngine, fmt.Sprintf("PromQL engine to use, either '%v' or '%v'", standardPromQLEngine, streamingPromQLEngine))

cfg.EngineConfig.RegisterFlags(f)
}

func (cfg *Config) Validate() error {
if cfg.PromQLEngine != standardPromQLEngine && cfg.PromQLEngine != streamingPromQLEngine {
return fmt.Errorf("unknown PromQL engine '%s'", cfg.PromQLEngine)
}

return nil
}

Expand Down Expand Up @@ -123,7 +134,7 @@ func ShouldQueryBlockStore(queryStoreAfter time.Duration, now time.Time, queryMi
}

// New builds a queryable and promql engine.
func New(cfg Config, limits *validation.Overrides, distributor Distributor, storeQueryable storage.Queryable, reg prometheus.Registerer, logger log.Logger, tracker *activitytracker.ActivityTracker) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, *promql.Engine) {
func New(cfg Config, limits *validation.Overrides, distributor Distributor, storeQueryable storage.Queryable, reg prometheus.Registerer, logger log.Logger, tracker *activitytracker.ActivityTracker) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, promql.QueryEngine, error) {
queryMetrics := stats.NewQueryMetrics(reg)

distributorQueryable := newDistributorQueryable(distributor, limits, queryMetrics, logger)
Expand All @@ -139,13 +150,28 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
return lazyquery.NewLazyQuerier(querier), nil
})

engineOpts, engineExperimentalFunctionsEnabled := engine.NewPromQLEngineOptions(cfg.EngineConfig, tracker, logger, reg)
engine := promql.NewEngine(engineOpts)
opts, engineExperimentalFunctionsEnabled := engine.NewPromQLEngineOptions(cfg.EngineConfig, tracker, logger, reg)

// Experimental functions can only be enabled globally, and not on a per-engine basis.
parser.EnableExperimentalFunctions = engineExperimentalFunctionsEnabled

return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, engine
var eng promql.QueryEngine

switch cfg.PromQLEngine {
case standardPromQLEngine:
eng = promql.NewEngine(opts)
case streamingPromQLEngine:
var err error

eng, err = streamingpromql.NewEngine(opts)
if err != nil {
return nil, nil, nil, err
}
default:
panic(fmt.Sprintf("invalid config not caught by validation: unknown PromQL engine '%s'", cfg.PromQLEngine))
}

return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, eng, nil
}

// NewSampleAndChunkQueryable creates a SampleAndChunkQueryable from a Queryable.
Expand Down

0 comments on commit c98802c

Please sign in to comment.