Skip to content

Commit

Permalink
ESQL: Add row counts to profile results (#120134)
Browse files Browse the repository at this point in the history
Closes #119969

- Rename "pages_in/out" to "pages_received/emitted", to standardize the name along most operators
  - **There are still "pages_processed" operators**, maybe it would make sense to also rename those?
- Add "pages_received/emitted" to TopN operator, as it was missing that
- Added "rows_received/emitted" to most operators
- Added a test to ensure all operators with status provide those metrics
  • Loading branch information
ivancea authored Jan 15, 2025
1 parent 1f182e7 commit b7ab8f8
Show file tree
Hide file tree
Showing 37 changed files with 846 additions and 191 deletions.
3 changes: 2 additions & 1 deletion benchmarks/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import org.elasticsearch.gradle.internal.test.TestUtil
import org.elasticsearch.gradle.OS

/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
Expand Down Expand Up @@ -77,7 +78,7 @@ tasks.register("copyPainless", Copy) {
}

tasks.named("run").configure {
executable = "${buildParams.runtimeJavaHome.get()}/bin/java"
executable = "${buildParams.runtimeJavaHome.get()}/bin/java" + (OS.current() == OS.WINDOWS ? '.exe' : '')
args << "-Dplugins.dir=${buildDir}/plugins" << "-Dtests.index=${buildDir}/index"
dependsOn "copyExpression", "copyPainless", configurations.nativeLib
systemProperty 'es.nativelibs.path', TestUtil.getTestLibraryPath(file("../libs/native/libraries/build/platform/").toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ static TransportVersion def(int id) {
public static final TransportVersion REPLACE_FAILURE_STORE_OPTIONS_WITH_SELECTOR_SYNTAX = def(8_821_00_0);
public static final TransportVersion ELASTIC_INFERENCE_SERVICE_UNIFIED_CHAT_COMPLETIONS_INTEGRATION = def(8_822_00_0);
public static final TransportVersion KQL_QUERY_TECH_PREVIEW = def(8_823_00_0);
public static final TransportVersion ESQL_PROFILE_ROWS_PROCESSED = def(8_824_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ protected Page getCheckedOutput() throws IOException {
Page page = null;
// emit only one page
if (remainingDocs <= 0 && pagesEmitted == 0) {
pagesEmitted++;
LongBlock count = null;
BooleanBlock seen = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ public void collect(int doc) throws IOException {
Page page = null;
// emit only one page
if (remainingDocs <= 0 && pagesEmitted == 0) {
pagesEmitted++;
Block result = null;
BooleanBlock seen = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public abstract class LuceneOperator extends SourceOperator {
long processingNanos;
int pagesEmitted;
boolean doneCollecting;
/**
* Count of rows this operator has emitted.
*/
private long rowsEmitted;

protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
this.blockFactory = blockFactory;
Expand Down Expand Up @@ -115,7 +119,12 @@ public final int limit() {
@Override
public final Page getOutput() {
try {
return getCheckedOutput();
Page page = getCheckedOutput();
if (page != null) {
pagesEmitted++;
rowsEmitted += page.getPositionCount();
}
return page;
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
Expand Down Expand Up @@ -252,6 +261,7 @@ public static class Status implements Operator.Status {
private final int sliceMin;
private final int sliceMax;
private final int current;
private final long rowsEmitted;

private Status(LuceneOperator operator) {
processedSlices = operator.processedSlices;
Expand All @@ -276,6 +286,7 @@ private Status(LuceneOperator operator) {
current = scorer.position;
}
pagesEmitted = operator.pagesEmitted;
rowsEmitted = operator.rowsEmitted;
}

Status(
Expand All @@ -288,7 +299,8 @@ private Status(LuceneOperator operator) {
int pagesEmitted,
int sliceMin,
int sliceMax,
int current
int current,
long rowsEmitted
) {
this.processedSlices = processedSlices;
this.processedQueries = processedQueries;
Expand All @@ -300,6 +312,7 @@ private Status(LuceneOperator operator) {
this.sliceMin = sliceMin;
this.sliceMax = sliceMax;
this.current = current;
this.rowsEmitted = rowsEmitted;
}

Status(StreamInput in) throws IOException {
Expand All @@ -318,6 +331,11 @@ private Status(LuceneOperator operator) {
sliceMin = in.readVInt();
sliceMax = in.readVInt();
current = in.readVInt();
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
rowsEmitted = in.readVLong();
} else {
rowsEmitted = 0;
}
}

@Override
Expand All @@ -336,6 +354,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(sliceMin);
out.writeVInt(sliceMax);
out.writeVInt(current);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
out.writeVLong(rowsEmitted);
}
}

@Override
Expand Down Expand Up @@ -383,6 +404,10 @@ public int current() {
return current;
}

public long rowsEmitted() {
return rowsEmitted;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -399,6 +424,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("slice_min", sliceMin);
builder.field("slice_max", sliceMax);
builder.field("current", current);
builder.field("rows_emitted", rowsEmitted);
return builder.endObject();
}

Expand All @@ -416,12 +442,13 @@ public boolean equals(Object o) {
&& pagesEmitted == status.pagesEmitted
&& sliceMin == status.sliceMin
&& sliceMax == status.sliceMax
&& current == status.current;
&& current == status.current
&& rowsEmitted == status.rowsEmitted;
}

@Override
public int hashCode() {
return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current);
return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current, rowsEmitted);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ public Page getCheckedOutput() throws IOException {
}
Page page = null;
if (currentPagePos >= minPageSize || remainingDocs <= 0 || scorer.isDone()) {
pagesEmitted++;
IntBlock shard = null;
IntBlock leaf = null;
IntVector docs = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ private Page emit(boolean startEmitting) {
Releasables.closeExpectNoException(shard, segments, docs, docBlock, scores);
}
}
pagesEmitted++;
return page;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ public String toString() {
}

@Override
protected Status status(long processNanos, int pagesProcessed) {
return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed);
protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted);
}

public static class Status extends AbstractPageMappingOperator.Status {
Expand All @@ -559,8 +559,8 @@ public static class Status extends AbstractPageMappingOperator.Status {

private final Map<String, Integer> readersBuilt;

Status(Map<String, Integer> readersBuilt, long processNanos, int pagesProcessed) {
super(processNanos, pagesProcessed);
Status(Map<String, Integer> readersBuilt, long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
super(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
this.readersBuilt = readersBuilt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public abstract class AbstractPageMappingOperator implements Operator {
* Count of pages that have been processed by this operator.
*/
private int pagesProcessed;
/**
* Count of rows this operator has received.
*/
private long rowsReceived;
/**
* Count of rows this operator has emitted.
*/
private long rowsEmitted;

protected abstract Page process(Page page);

Expand All @@ -52,6 +60,7 @@ public final boolean needsInput() {
public final void addInput(Page page) {
assert prev == null : "has pending input page";
prev = page;
rowsReceived += page.getPositionCount();
}

@Override
Expand All @@ -75,18 +84,21 @@ public final Page getOutput() {
long start = System.nanoTime();
Page p = process(prev);
pagesProcessed++;
if (p != null) {
rowsEmitted += p.getPositionCount();
}
processNanos += System.nanoTime() - start;
prev = null;
return p;
}

@Override
public final Status status() {
return status(processNanos, pagesProcessed);
return status(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
}

protected Status status(long processNanos, int pagesProcessed) {
return new Status(processNanos, pagesProcessed);
protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
return new Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
}

@Override
Expand All @@ -105,15 +117,26 @@ public static class Status implements Operator.Status {

private final long processNanos;
private final int pagesProcessed;
private final long rowsReceived;
private final long rowsEmitted;

public Status(long processNanos, int pagesProcessed) {
public Status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
this.processNanos = processNanos;
this.pagesProcessed = pagesProcessed;
this.rowsReceived = rowsReceived;
this.rowsEmitted = rowsEmitted;
}

protected Status(StreamInput in) throws IOException {
processNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0;
pagesProcessed = in.readVInt();
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
rowsReceived = in.readVLong();
rowsEmitted = in.readVLong();
} else {
rowsReceived = 0;
rowsEmitted = 0;
}
}

@Override
Expand All @@ -122,6 +145,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(processNanos);
}
out.writeVInt(pagesProcessed);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
out.writeVLong(rowsReceived);
out.writeVLong(rowsEmitted);
}
}

@Override
Expand All @@ -133,6 +160,14 @@ public int pagesProcessed() {
return pagesProcessed;
}

public long rowsReceived() {
return rowsReceived;
}

public long rowsEmitted() {
return rowsEmitted;
}

public long processNanos() {
return processNanos;
}
Expand All @@ -153,20 +188,23 @@ protected final XContentBuilder innerToXContent(XContentBuilder builder) throws
if (builder.humanReadable()) {
builder.field("process_time", TimeValue.timeValueNanos(processNanos));
}
return builder.field("pages_processed", pagesProcessed);
return builder.field("pages_processed", pagesProcessed).field("rows_received", rowsReceived).field("rows_emitted", rowsEmitted);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Status status = (Status) o;
return processNanos == status.processNanos && pagesProcessed == status.pagesProcessed;
return processNanos == status.processNanos
&& pagesProcessed == status.pagesProcessed
&& rowsReceived == status.rowsReceived
&& rowsEmitted == status.rowsEmitted;
}

@Override
public int hashCode() {
return Objects.hash(processNanos, pagesProcessed);
return Objects.hash(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
}

@Override
Expand Down
Loading

0 comments on commit b7ab8f8

Please sign in to comment.