Un-cancellable Query when hitting many large files. #14036
Comments
How do you cancel the query? Do you mean terminating the `next()` calls on the stream, or dropping the stream? If it is the former, the issue might be related to the `RepartitionExec`, not `GroupedHashAggregateStream` (AFAIK it won't loop until its child stream (in this case a `FileStream`) is exhausted, errors, or returns `Pending`; it returns once it can emit a result).
The queries are running in the context of a gRPC request, which, when cancelled, drops the stream.
I tried a bit today to re-create this but was not able to. What I tried was to create a highly compressed Parquet file (48MB that has 1B rows with all repeated strings) and then made 500 symlinks: table.tar.gz. This lets me run queries like the following:

```sql
SELECT DISTINCT "A","B","C","D","E" FROM 'table';
SELECT DISTINCT "A","B","C","D","E" FROM 'table' WHERE "A" <> 'FooBar';
```

I tried several variants, but cancelling always seemed to work just fine 🤔 I also tried a few settings, but cancelling always worked:

```sql
set datafusion.execution.parquet.pushdown_filters=true;
set datafusion.execution.target_partitions=4;
```
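The symlink setup described above can be sketched in Rust; the paths, file names, and helper below are illustrative (this is not the actual script used):

```rust
use std::fs;
use std::os::unix::fs::symlink;
use std::path::Path;

/// Create a table directory containing `copies` symlinks that all point at
/// one highly compressed Parquet file, so a table scan sees many "large"
/// files while only one file exists on disk.
fn make_symlink_table(dir: &Path, source: &Path, copies: usize) -> std::io::Result<()> {
    fs::create_dir_all(dir)?;
    for i in 0..copies {
        // e.g. table/part-0.parquet -> compressed.parquet
        symlink(source, dir.join(format!("part-{i}.parquet")))?;
    }
    Ok(())
}
```

Pointing a `CREATE EXTERNAL TABLE` (or a `'table'` path in datafusion-cli) at the resulting directory then scans the same bytes 500 times.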
We also have checkpoint tests which drop the stream after some amount of time, and after the failure, `FileStream` offsets do not increment any more. I think the same problem should be observable without the `RepartitionExec` (in a single partition). Can you also provide a brief reproducer for the cancellation logic? Do cancel requests set some tokens that are checked during `next()`?
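For reference, the cancellation-token pattern asked about here might look roughly like the following. This is a hypothetical sketch with made-up types (`CancelToken`, `drain`), not the actual handler code:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical cancel token: the request handler sets the flag, and the
/// loop driving the stream checks it between batches.
#[derive(Clone)]
struct CancelToken(Arc<AtomicBool>);

impl CancelToken {
    fn new() -> Self {
        Self(Arc::new(AtomicBool::new(false)))
    }
    fn cancel(&self) {
        self.0.store(true, Ordering::Relaxed);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
}

/// Drain "batches" from a stream-like source, stopping as soon as the
/// token is cancelled. Note this only helps if the producer actually
/// returns control between batches -- which is exactly what this issue
/// reports the FileStream/GroupedHashAggregateStream combination fails
/// to do.
fn drain(batches: impl Iterator<Item = u32>, token: &CancelToken) -> Vec<u32> {
    let mut out = Vec::new();
    for batch in batches {
        if token.is_cancelled() {
            break;
        }
        out.push(batch);
    }
    out
}
```

In the issue at hand, however, cancellation happens by dropping the stream rather than via an explicit token, so the yield points of the stream itself are what matter.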
@berkaysynnada I'll take another stab at getting a reproducer for this that doesn't require customer data |
Describe the bug
TL;DR: Reading many large Parquet files can prevent a query from being cancelled.
We have a customer that is running a query similar to the following (edited for privacy):
This produces a fairly straightforward plan explain.txt.
Simplified Plan:
This will read ~85 Parquet files at ~100MB each. What we've seen is that even when a query is cancelled, the resources associated with that query (CPU/RAM) are still utilized for almost the same amount of time as a typical execution of the query.
To Reproduce
I have struggled to come up with a good reproducer for this that doesn't rely on the customer data; I would welcome help here if anyone has a shareable dataset that matches this pattern.
Expected behavior
Cancelling a query should (within some reasonable amount of time) truly cancel the query, freeing up system resources for other query executions.
Additional context
This appears to be a problem with the interaction between the `GroupedHashAggregateStream` and `FileStream` approaches to yielding.

The `GroupedHashAggregateStream` will loop indefinitely until its child stream (in this case a `FileStream`) is exhausted, errors, or returns `Pending`.

The `FileStream` loops while attempting to read `RecordBatch`es from the underlying file, while also doing some bookkeeping to stage the next file for reading. This stream will return `Ready` when a `RecordBatch` is processed, or when an `Error` is encountered. However, it does not return `Pending` at any point. When one file is finished, the next file is swapped in and reading continues from the new file.

The combination of these two behaviours means that if there are many large files being read by the `FileStream`, the `GroupedHashAggregateStream` doesn't effectively yield back to Tokio.

My PR to fix this is to have the `FileStream` return `Pending` when swapping over to a new file. This seems like a natural yield point, and it resolves the issue with cancellation.
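The effect of yielding at file boundaries can be illustrated with a simplified, dependency-free sketch. The types below (`FileStreamSketch`, a local `Poll` enum, `u32` batches standing in for `RecordBatch`) are hypothetical stand-ins, not the actual DataFusion implementation:

```rust
/// A local stand-in for `std::task::Poll`, kept dependency-free.
enum Poll<T> {
    Ready(T),
    Pending,
}

/// A minimal sketch of a FileStream-like reader over several "files".
struct FileStreamSketch {
    files: Vec<Vec<u32>>, // each inner Vec stands in for one file's batches
    file_idx: usize,
    batch_idx: usize,
}

impl FileStreamSketch {
    fn new(files: Vec<Vec<u32>>) -> Self {
        Self { files, file_idx: 0, batch_idx: 0 }
    }

    /// Poll for the next "RecordBatch". The key behaviour is the file
    /// boundary: instead of immediately reading from the freshly swapped-in
    /// file, the stream returns Pending once, so the caller (and ultimately
    /// the executor) regains control and a cancelled task can be dropped.
    fn poll_next(&mut self) -> Poll<Option<u32>> {
        if self.file_idx >= self.files.len() {
            return Poll::Ready(None); // all files exhausted
        }
        let file = &self.files[self.file_idx];
        if self.batch_idx < file.len() {
            let batch = file[self.batch_idx];
            self.batch_idx += 1;
            Poll::Ready(Some(batch))
        } else {
            // Finished this file: stage the next one and yield.
            self.file_idx += 1;
            self.batch_idx = 0;
            Poll::Pending
        }
    }
}
```

In a real `Stream::poll_next` implementation the stream would call `cx.waker().wake_by_ref()` before returning `Poll::Pending`, so the task is rescheduled immediately rather than stalled waiting for an external wake-up.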