-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-52689][SQL] Send DML Metrics to V2Write #51377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
021dfb4
to
78235a6
Compare
3fc94aa
to
de9d47d
Compare
...core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
Outdated
Show resolved
Hide resolved
/** | ||
* Whether this batch write requests merge execution metrics. | ||
*/ | ||
default boolean requestMergeMetrics() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a performance hit for requesting metrics? If not, I'd drop this method and always call commitMerge
. The fewer public methods we have the better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The perf hit is a execution graph walk. Anyway, i removed the check, and walk in all the cases.
@@ -275,7 +277,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat | |||
} | |||
|
|||
case AppendData(r: DataSourceV2Relation, query, _, _, Some(write), _) => | |||
AppendDataExec(planLater(query), refreshCache(r), write) :: Nil | |||
AppendDataExec(planLater(query), refreshCache(r), write, getCommand(r)) :: Nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this for cases when MERGE is rewritten as INSERT? I thought we would skip populating metrics for appends, but let me think about it. What does Delta do when MERGE becomes INSERT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh I dont handle that yet.
...e/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Outdated
Show resolved
Hide resolved
...e/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Outdated
Show resolved
Hide resolved
de9d47d
to
781f32d
Compare
8f97c64
to
4482ffc
Compare
import java.util.OptionalLong; | ||
|
||
/** | ||
* Execution metrics for a Merge Operation for a Connector that supports RowLevelOperations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC different MERGE strategy have different metrics, do we need to separate them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you mean MERGE actions? Actually should always be same metrics (some will be 0 if not activated). Anyway , removed this class in favor of more generic Map.
* this batch write. | ||
* @param metrics merge execution metrics | ||
*/ | ||
default void commitMerge(WriterCommitMessage[] messages, MergeMetrics metrics) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we make it more general? I think INSERT/UPDATE/DELETE can have metrics as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe just Map[String, Long]?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we did an analysis with @aokolnychyi , INSERT/UPDATE/DELETE currently dont need metric from Spark because DSV2 connector can calculate themselves. But I agree, it shouldnt preclude getting more metric later. Changed to Map
/** | ||
* Similar to {@link #commit(WriterCommitMessage[])}, but providing operation metrics to | ||
* this batch write. | ||
* @param metrics operation metrics. The keys will be prefixed by operation type, eg `merge` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to list all the metric keys and their meanings in the API doc, as it's part of the API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added the javadocs
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
Outdated
Show resolved
Hide resolved
...e/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Outdated
Show resolved
Hide resolved
dce9b8e
to
667829e
Compare
667829e
to
52d3e71
Compare
thanks, merging to master! |
What changes were proposed in this pull request?
Send some DML execution metrics (ie, MergeRowsExec) to the write of these data source, so they can persist them for debugging purpose.
Why are the changes needed?
DML row-level-operations, ie MERGE, UPDATE, DELETE are a critical functionality of V2 data sources (like Iceberg). It will be nice, if we can send some DML metrics to the commit of these data source, so they can persist them for debugging purpose on commit metadata.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test
Was this patch authored or co-authored using generative AI tooling?
No