
Un-cancellable Query when hitting many large files. #14036

Open
jeffreyssmith2nd opened this issue Jan 7, 2025 · 5 comments · May be fixed by #14028
Labels
bug Something isn't working

Comments

@jeffreyssmith2nd
Contributor

jeffreyssmith2nd commented Jan 7, 2025

Describe the bug

TL;DR: Reading many large Parquet files can prevent a query from being cancelled.

We have a customer that is running a query similar to the following (edited for privacy):

SELECT DISTINCT "A","B","C","D","E" FROM table where "time" > now() - INTERVAL '5 days';

This produces a fairly straightforward plan: explain.txt.

Simplified Plan:

AggregateExec mode=FinalPartitioned
  CoalesceBatchesExec target_batch_size=8192
    RepartitionExec input_partitions=4
      AggregateExec mode=Partial
        ParquetExec file_groups={4 groups}

This will read ~85 Parquet files at ~100MB each. What we've seen is that even when a query is cancelled, the resources associated with that query (CPU/RAM) are still being utilized for almost the same amount of time as a typical execution of the query.

To Reproduce

I have struggled to come up with a good reproducer for this that doesn't rely on the customer data. I would welcome help here if anyone has a shareable dataset that matches this pattern.

Expected behavior

Cancelling a query should (within some reasonable amount of time) truly cancel the query, freeing up system resources for other query executions.

Additional context

This appears to be a problem with the interaction between the GroupedHashAggregateStream and FileStream approaches to yielding.

The GroupedHashAggregateStream will infinitely loop until its child stream (in this case a FileStream) is exhausted, errors, or returns Pending.

The FileStream loops while attempting to read RecordBatches from the underlying file, while also doing some book-keeping to stage the next file for reading. This stream returns Ready when a RecordBatch is produced or an error is encountered. However, it never returns Pending. When a file is finished, the next file is swapped in and reading continues from the new file.

The combination of these two behaviours means that if there are many large files being read by the FileStream, the GroupedHashAggregateStream doesn't effectively yield back to Tokio.

My PR to fix this has the FileStream return Pending when swapping over to a new file. This seems like a natural yield point, and it resolves the issue with cancellation.
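The interaction described above can be sketched with plain std types. This is a hypothetical miniature, not DataFusion's actual code: `FileStreamSketch` stands in for FileStream (always returning Ready across file boundaries unless the fix is enabled), and the driving loop stands in for GroupedHashAggregateStream (polling until Pending or exhaustion). All names and counts here are illustrative.

```rust
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A do-nothing waker so we can poll by hand, outside a real executor.
fn noop_waker() -> Waker {
    unsafe fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Hypothetical stand-in for FileStream: emits `batches_per_file` items per
// file, then swaps in the next file and keeps going.
struct FileStreamSketch {
    files: usize,
    batches_per_file: usize,
    emitted_in_file: usize,
    yield_on_file_swap: bool, // the proposed fix
}

impl FileStreamSketch {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.files == 0 {
            return Poll::Ready(None);
        }
        if self.emitted_in_file == self.batches_per_file {
            // Finished the current file: swap in the next one.
            self.files -= 1;
            self.emitted_in_file = 0;
            if self.yield_on_file_swap && self.files > 0 {
                // The fix: treat the file boundary as a yield point.
                // (A real implementation would also wake the waker.)
                return Poll::Pending;
            }
            if self.files == 0 {
                return Poll::Ready(None);
            }
        }
        self.emitted_in_file += 1;
        Poll::Ready(Some(1)) // pretend this is a RecordBatch
    }
}

// Stand-in for the aggregate's driving loop: keeps polling on Ready(Some),
// only returns control when it sees Pending or exhaustion.
fn polls_until_first_pending_or_end(stream: &mut FileStreamSketch) -> usize {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => continue,
            Poll::Ready(None) | Poll::Pending => return polls,
        }
    }
}

fn main() {
    // Without the fix, the driving loop runs to stream exhaustion before the
    // executor can observe a cancellation (i.e. before the drop takes effect).
    let mut busy = FileStreamSketch {
        files: 3, batches_per_file: 4, emitted_in_file: 0, yield_on_file_swap: false,
    };
    let busy_polls = polls_until_first_pending_or_end(&mut busy);

    // With the fix, control returns to the executor at each file boundary.
    let mut polite = FileStreamSketch {
        files: 3, batches_per_file: 4, emitted_in_file: 0, yield_on_file_swap: true,
    };
    let polite_polls = polls_until_first_pending_or_end(&mut polite);

    assert!(polite_polls < busy_polls);
}
```

With 3 files of 4 batches each, the "busy" variant holds the loop for all 12 batches plus the final end-of-stream poll, while the "polite" variant hands control back after the first file, which is exactly the window a drop-based cancellation needs.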

@jeffreyssmith2nd jeffreyssmith2nd added the bug Something isn't working label Jan 7, 2025
@berkaysynnada
Contributor

How do you cancel the query? Do you mean terminating the next() calls on the stream, or dropping the stream? If it is the former, the issue might be related to RepartitionExec rather than GroupedHashAggregateStream (AFAIK the latter won't "loop until its child stream (in this case a FileStream) is exhausted, errors, or returns Pending"; it returns once it can emit a result).

@jeffreyssmith2nd
Contributor Author

How do you cancel the query? Do you mean terminating the next() calls on the stream, or dropping the stream?

The queries are running in the context of a gRPC request, which when cancelled drops the stream.

@alamb
Contributor

alamb commented Jan 8, 2025

I tried a bit today to re-create this but was not able to

What I tried was creating a highly compressed Parquet file (48MB, holding 1B rows of repeated strings) and then making 500 symlinks to it: table.tar.gz
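For anyone who wants to replicate this setup, the symlink farm can be built with something like the following sketch. The file and directory names here (`data.parquet`, `table/`) are hypothetical placeholders, not the actual contents of table.tar.gz:

```shell
# data.parquet stands in for the highly compressed Parquet file;
# here it is just a placeholder so the commands run end to end.
touch data.parquet

# Make one directory scannable as a 500-file table via symlinks,
# so a single physical file is read 500 times.
mkdir -p table
for i in $(seq 1 500); do
  ln -s "$(pwd)/data.parquet" "table/part-$i.parquet"
done

ls table | wc -l   # should report 500 entries
```

Pointing datafusion-cli at the `table` directory then scans all 500 links as separate files, approximating the "many large files" shape of the customer query.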

This let me run queries like the following in datafusion-cli, testing cancellation by hitting CTRL-C:

SELECT DISTINCT "A","B","C","D","E" FROM 'table';

and

SELECT DISTINCT "A","B","C","D","E" FROM 'table' WHERE "A" <> 'FooBar';

I tried several variants but cancelling always seemed to work just fine 🤔

I also tried a few settings, but cancelling always worked with those as well:

set datafusion.execution.parquet.pushdown_filters=true;
set datafusion.execution.target_partitions=4;

@berkaysynnada
Contributor

We also have checkpoint tests which drop the stream after some amount of time, and after the drop, the FileStream offsets do not increment further.

I think the same problem should be observed without the RepartitionExec (in a single partition).

Can you also provide a brief description of the cancellation logic? Do cancel requests set some tokens that are checked during next()?
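For illustration, the token-based scheme asked about above (the alternative to drop-based cancellation) might look like this sketch: a shared flag that a cancel request sets, checked on each next() call. This is a hypothetical example, not IOx's or DataFusion's actual implementation; `CancellableScan` and its fields are invented names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical scan that consults a shared cancellation token on every call.
struct CancellableScan {
    cancelled: Arc<AtomicBool>,
    remaining: usize,
}

impl CancellableScan {
    fn next(&mut self) -> Option<Result<u64, &'static str>> {
        // Check the token first, so cancellation is observed even
        // mid-way through a long scan.
        if self.cancelled.load(Ordering::Relaxed) {
            return Some(Err("query cancelled"));
        }
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        Some(Ok(1)) // pretend this is a RecordBatch
    }
}

fn main() {
    let token = Arc::new(AtomicBool::new(false));
    let mut scan = CancellableScan { cancelled: token.clone(), remaining: 5 };

    assert!(matches!(scan.next(), Some(Ok(_))));

    // A cancel request sets the token; the very next call observes it.
    token.store(true, Ordering::Relaxed);
    assert!(matches!(scan.next(), Some(Err(_))));
}
```

Note the contrast with this issue: a token check inside next() works regardless of Pending, whereas drop-based cancellation (as in the gRPC case here) can only take effect when the stream yields control back to the executor.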

@jeffreyssmith2nd
Contributor Author

@berkaysynnada I'll take another stab at getting a reproducer for this that doesn't require customer data
