[SPARK-52689][SQL] Send DML Metrics to V2Write #51377


Closed
wants to merge 7 commits

Conversation

szehon-ho
Member

@szehon-ho szehon-ho commented Jul 4, 2025

What changes were proposed in this pull request?

Send DML execution metrics (e.g., from MergeRowsExec) to the V2 write of the data source, so that connectors can persist them for debugging purposes.

Why are the changes needed?

DML row-level operations, i.e., MERGE, UPDATE, and DELETE, are critical functionality for V2 data sources (such as Iceberg). It would be useful to send DML metrics to the commit of these data sources, so that they can persist them in commit metadata for debugging purposes.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test

Was this patch authored or co-authored using generative AI tooling?

No

@szehon-ho szehon-ho changed the title [SPARK-52689][SQL] Send DML Metrics to V2Write [WIP] [SPARK-52689][SQL] Send DML Metrics to V2Write Jul 4, 2025
@github-actions github-actions bot added the SQL label Jul 4, 2025
@szehon-ho szehon-ho force-pushed the metric_to_write branch 3 times, most recently from 3fc94aa to de9d47d Compare July 12, 2025 00:19
@szehon-ho szehon-ho changed the title [WIP] [SPARK-52689][SQL] Send DML Metrics to V2Write [SPARK-52689][SQL] Send DML Metrics to V2Write Jul 12, 2025
/**
* Whether this batch write requests merge execution metrics.
*/
default boolean requestMergeMetrics() {
Contributor

@aokolnychyi aokolnychyi Jul 15, 2025
Is there a performance hit for requesting metrics? If not, I'd drop this method and always call commitMerge. The fewer public methods we have, the better.

Member Author

@szehon-ho szehon-ho Jul 15, 2025

The perf hit is an execution graph walk. Anyway, I removed the check, and we walk the graph in all cases.
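The graph walk described here can be pictured as a recursive traversal of the physical plan tree that accumulates the metric values of interest. Below is a minimal, self-contained sketch; `PlanNode` and the metric names are hypothetical stand-ins for Spark's actual `SparkPlan` and `SQLMetric` types, not the code merged in this PR:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a physical plan node such as MergeRowsExec.
class PlanNode {
    final String name;
    final Map<String, Long> metrics;
    final List<PlanNode> children = new ArrayList<>();

    PlanNode(String name, Map<String, Long> metrics) {
        this.name = name;
        this.metrics = metrics;
    }
}

public class MetricWalk {
    // Walk the plan tree and sum metrics from every node matching the
    // target node name (a MERGE plan could contain several such nodes).
    static Map<String, Long> collectMetrics(PlanNode node, String target) {
        Map<String, Long> out = new HashMap<>();
        if (node.name.equals(target)) {
            node.metrics.forEach((k, v) -> out.merge(k, v, Long::sum));
        }
        for (PlanNode child : node.children) {
            collectMetrics(child, target).forEach((k, v) -> out.merge(k, v, Long::sum));
        }
        return out;
    }
}
```

Since the `requestMergeMetrics()` guard was dropped, a walk of this shape runs unconditionally on commit; the cost is linear in the number of plan nodes, which is why it was deemed acceptable.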

@@ -275,7 +277,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}

case AppendData(r: DataSourceV2Relation, query, _, _, Some(write), _) =>
-      AppendDataExec(planLater(query), refreshCache(r), write) :: Nil
+      AppendDataExec(planLater(query), refreshCache(r), write, getCommand(r)) :: Nil
Contributor

Is this for cases when MERGE is rewritten as INSERT? I thought we would skip populating metrics for appends, but let me think about it. What does Delta do when MERGE becomes INSERT?

Member Author

Oh, I don't handle that yet.

import java.util.OptionalLong;

/**
* Execution metrics for a Merge Operation for a Connector that supports RowLevelOperations
Contributor

IIRC, different MERGE strategies have different metrics; do we need to separate them?

Member Author

Do you mean MERGE actions? It should actually always be the same set of metrics (some will be 0 if not activated). Anyway, I removed this class in favor of a more generic Map.

* this batch write.
* @param metrics merge execution metrics
*/
default void commitMerge(WriterCommitMessage[] messages, MergeMetrics metrics) {
Contributor

Can we make it more general? I think INSERT/UPDATE/DELETE can have metrics as well.

Contributor

Maybe just Map[String, Long]?

Member Author

@szehon-ho szehon-ho Jul 25, 2025

We did an analysis with @aokolnychyi: INSERT/UPDATE/DELETE currently don't need metrics from Spark because the DSV2 connector can calculate them itself. But I agree, that shouldn't preclude adding more metrics later. Changed to Map.
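The Map-based shape agreed on here can be sketched as a default interface method, so existing implementations keep compiling. The interface and type names below are simplified stand-ins for Spark's DSv2 write-side API (`org.apache.spark.sql.connector.write`); the exact merged signature should be checked against the final commit:

```java
import java.util.Map;

// Simplified stand-ins for Spark's DSv2 write-side interfaces.
interface WriterCommitMessage {}

interface BatchWrite {
    void commit(WriterCommitMessage[] messages);

    // Default overload: connectors that ignore metrics inherit this and
    // fall back to the plain commit, so adding the method is
    // source-compatible for existing implementations.
    default void commit(WriterCommitMessage[] messages, Map<String, Long> metrics) {
        commit(messages);
    }
}
```

A default method with a fallback to the existing `commit` is the usual way to evolve a widely implemented Java interface without breaking connectors.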

/**
* Similar to {@link #commit(WriterCommitMessage[])}, but providing operation metrics to
* this batch write.
* @param metrics operation metrics. The keys will be prefixed by operation type, eg `merge`
Contributor

I think it's better to list all the metric keys and their meanings in the API doc, as it's part of the API.

Member Author

Added the javadocs.
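On the connector side, consuming the documented keys might look like the following. The `merge.` prefix follows the javadoc excerpt above, but the individual key names used in the test are illustrative assumptions, not the actual documented list:

```java
import java.util.HashMap;
import java.util.Map;

public class CommitMetadata {
    // Copy only the merge-prefixed metrics into string properties that a
    // connector could attach to its commit metadata (Iceberg-style
    // snapshot summary properties, for example).
    static Map<String, String> mergeMetricsAsProperties(Map<String, Long> metrics) {
        Map<String, String> props = new HashMap<>();
        metrics.forEach((key, value) -> {
            if (key.startsWith("merge.")) {
                props.put(key, Long.toString(value));
            }
        });
        return props;
    }
}
```

Filtering by the operation-type prefix is what makes the generic `Map<String, Long>` workable: a connector can pick out only the operation it cares about without Spark needing a dedicated metrics class per DML statement.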

@cloud-fan
Contributor

Thanks, merging to master!

@cloud-fan cloud-fan closed this in 570b325 Jul 28, 2025