Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-47690][SQL] Enable hash aggregation support for all collations (StringType) #46640

Open
wants to merge 11 commits into
base: master
Choose a base branch
from

Conversation

uros-db
Copy link
Contributor

@uros-db uros-db commented May 17, 2024

What changes were proposed in this pull request?

Enable collation support for hash aggregation on StringType, for aggregates where aggregate expressions don't include a non-binary collation expression. Note: support for complex types will be added separately.

  • Logical plan is rewritten in analysis to replace non-binary strings with CollationKey
  • CollationKey is a unary expression that transforms StringType to BinaryType
  • Collation keys allow correct & efficient string comparison under specific collation rules

Why are the changes needed?

Improve GROUP BY performance for collated strings.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • E2e SQL tests for RewriteGroupByCollation in CollationSuite
  • Various queries with GROUP BY in existing TPCDS collation test suite

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label May 17, 2024
@uros-db uros-db changed the title [WIP][SQL] Enable hash aggregation support for all collations (StringType) [SPARK-47690][SQL] Enable hash aggregation support for all collations (StringType) May 30, 2024
// This rewrite rule is used to enabled hash aggregation on collated string columns. However,
// hash aggregation is currently only supported for grouping aggregations - this means that no
// string type can be found in the aggregate expressions, so we avoid rewrite in this case.
!aggregate.aggregateExpressions.exists(e => e.dataType.isInstanceOf[StringType])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that you should check if hash aggregation is supported at all, regardless of StringType.
If we are going to end up doing merge agg there is no need to insert collation_key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I just try to use supportsHashAggregate here, I might find that the aggregate does not support hash aggregation before the rewrite, but will support it after the rewrite (as a result of this, the rewrite rule will never actually execute)

However, we perform this check before doing the plan rewrite, so the point of this check is to verify that the current Aggregate is only a grouping aggregate with respect to StringType (i.e. StringType is not found in aggregateExpressions).

Any ideas on how to make this better?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you can call supportsHashAggregate by just passing agg keys and empty seq for group by?

@@ -169,7 +169,11 @@ object ExprUtils extends QueryErrorsBase {
a.failAnalysis(
errorClass = "MISSING_GROUP_BY",
messageParameters = Map.empty)
case e: Attribute if !a.groupingExpressions.exists(_.semanticEquals(e)) =>
case e: Attribute if !a.groupingExpressions.exists {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need this? afaik you are doing collationkey insertion only there is no string in aggregate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct, this is not needed - we won't need to "match" aggregate and grouping expressions with collationKey, since we only support collationKey injection for "pure" grouping aggregation on collated string columns

val newPlan1 = RewriteGroupByCollation(logicalPlan1)
val newNewPlan1 = RewriteGroupByCollation(newPlan1)
assert(newPlan1 == newNewPlan1)
// get the query execution result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is not super useful :)
we should be adding more detailed comments for many things, but checkAnswer is pretty self descriptive :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixing

assert(collectFirst(queryPlan2) { case _: HashAggregateExec => () }.nonEmpty)
assert(collectFirst(queryPlan2) { case _: SortAggregateExec => () }.isEmpty)
// check that CollationKey is injected into the Aggregate logical plan
assert(collectFirst(queryPlan1) { case s: HashAggregateExec =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be cleaner if you would explicitly check whether head is instanceof CollationKey, instead of relying on return type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, changing

import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.types.StringType

object RewriteGroupByCollation extends Rule[LogicalPlan] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add class header explaining why we are doing this and what is being done.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants