Jacek Laskowski / @jaceklaskowski

Optimizing Batch and Streaming Aggregations
Data+AI Summit 2023

About the Speaker
Jacek Laskowski is a Freelance IT Consultant
Specializing in Apache Spark, Delta Lake, Databricks, Apache Kafka (incl. Kafka Streams and ksqlDB)
Best known for The Internals Of online books
Contact me at jacek@japila.pl
Follow me at @JacekLaskowski
Connect on LinkedIn

Table of Contents
1. The Intro to The Internals of Structured Queries
2. The Internals of Aggregate Queries
3. Scala UDAFs and Aggregators
4. Streaming Aggregates
5. Streaming Aggregates Performance Tuning Gig
6. Things to Watch Out For (Recap)

The Intro to The Internals of Structured Queries

Structured Queries
Apache Spark is a general-purpose distributed compute platform
Spark SQL is a module of Apache Spark to describe batch queries over structured and semi-structured datasets (of any size)
Spark Structured Streaming is a module of Apache Spark for streaming queries over unbounded data
Queries are described using high-level query operators (DataFrame API or SQL)
In most cases, optimizing a streaming query means optimizing the corresponding batch query
  No need to focus on streaming features (less to worry about)
  Caveat: streaming issues may really be related to how streaming queries work

High-Level Query Language: DataFrame API

High-Level Query Language: SQL

QueryExecution
QueryExecution is the execution pipeline (workflow) of a structured query
Made up of execution phases

Logical and Physical Operators
Logical operators are building blocks of logical query plans in Spark SQL: Aggregate, Join, LocalRelation, LogicalRDD, MergeIntoTable, Project, Sort
Physical operators are executable nodes of physical query plans in Spark SQL: AdaptiveSparkPlanExec, BroadcastHashJoinExec, HashAggregateExec, ObjectHashAggregateExec, ProjectExec, SortAggregateExec

The Internals of Aggregate Queries

Aggregate Queries
Calculate a single value for a set of rows
Can be broken down into the following sections:
  Grouping (using the GROUP BY clause in SQL or the Dataset.groupBy operator) that arranges rows into groups (possibly guarded by the HAVING SQL clause)
  Aggregation (using aggregate functions) that applies to a set of rows and calculates a single value per group

Structured queries with the Aggregate logical operator: GROUP BY (incl. GROUPING SETS, WITH CUBE, WITH ROLLUP), Dataset.groupBy, Dataset.groupByKey
  Basic Aggregation
  Multi-Dimensional Aggregation
  Window Aggregation (not covered)

Aggregate Query Example

Logical Query Plan: Aggregate Operator

Physical Query Plan: BaseAggregateExec Operators
3 possible aggregate physical operators (from the fastest to the slowest):
  HashAggregateExec
  ObjectHashAggregateExec
  SortAggregateExec

Aggregation Execution Planning Strategy

HashAggregateExec Physical Operator
Hash-Based Aggregation
Uses UnsafeRow keys and values
Uses TungstenAggregationIterator
Uses UnsafeFixedWidthAggregationMap until it runs out of memory (and starts spilling to disk) and eventually switches to sort-based aggregation
number of sort fallback tasks performance metric
Supports Whole-Stage Java Code Generation

ObjectHashAggregateExec Physical Operator
Object Aggregation
Uses Scala custom object keys and values
Uses ObjectAggregationIterator
Uses ObjectAggregationMap until it crosses spark.sql.objectHashAggregate.sortBased.fallbackThreshold and switches to sort-based aggregation
number of sort fallback tasks performance metric
Very similar to HashAggregateExec
No support for Whole-Stage Java Code Generation

SortAggregateExec Physical Operator
Sort-Based Aggregation
Uses SortBasedAggregationIterator
The least performant of the 3 aggregate operators available
The last resort when the others cannot be used
Supports Whole-Stage Java Code Generation

number of sort fallback tasks Metric

Hash- vs Sort-Based Aggregations
HashAggregateExec: the underlying UnsafeFixedWidthAggregationMap ran out of memory to hold key-value pairs
ObjectHashAggregateExec: spark.sql.objectHashAggregate.sortBased.fallbackThreshold

spark.sql.objectHashAggregate.sortBased.fallbackThreshold
The number of entries in an in-memory hash map (to store aggregation buffers per grouping key) before ObjectHashAggregateExec falls back to sort-based aggregation
Default: 128 (key-value pairs)

Scala UDAFs and Aggregators

Scala UDAFs and Aggregators
ScalaUDAF is an ImperativeAggregate expression for User-Defined Aggregate Functions
Aggregator should now be registered as a UDF via the functions.udaf(agg) method
Only 2 possible aggregate physical operators (out of HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec)
UDFs (incl. UDAFs) are black boxes to the Catalyst Optimizer
  Whatever happens in UDFs, stays in UDFs
Don't use them unless you've got no choice
Use Column expressions (not Scala code) instead

Streaming Aggregates

Streaming Aggregates
The very same high-level operators (Dataset.groupBy, GROUP BY), yet with streaming data sources (connectors)
Extra streaming-related issues:
  Output Modes
  Late Data, Watermarking and Event Time
  Limitations related to aggregate queries
  Trigger.ProcessingTime
Enough to think about (“scare you”) while optimizing streaming queries
Among streaming connectors is the Kafka Connector

Kafka Connector
Support for Apache Kafka
  Loading records from topics (Kafka Consumer)
  Writing out records to topics (Kafka Producer)
The number of partitions of a streaming DataFrame is the number of partitions of the input topic(s)

Streaming Aggregates Performance Tuning Gig

I had a client:
  Scala Devs
  Streaming Aggregates
  UDAFs
  Kafka Connector
  ObjectHashAggregateExec
  The number of partitions is the number of topic partitions
  spark.sql.objectHashAggregate.sortBased.fallbackThreshold
  number of sort fallback tasks metric

My Recommendation
  Rewrite UDAFs to Columns (Catalyst Expressions)
  Repartition
  Shorten Trigger.ProcessingTime

Things to Watch Out For (Recap)

Recap
1. There are 3 possible aggregate physical operators
   a. HashAggregateExec
   b. ObjectHashAggregateExec
   c. SortAggregateExec
2. Use Column expressions (no Scala/Python/Java code)
3. Whole-Stage Java Code Generation
   a. HashAggregateExec
   b. SortAggregateExec
4. Don't use UDAFs
   a. Unless you've got no choice
5. Use mutable data types for aggregation buffers
   a. Basic primitives (e.g. Boolean, Byte, Dates, Numbers)
   b. Promotes HashAggregateExec
6. Observe the number of sort fallback tasks metric
7. Consider repartitioning
8. Learn aggregate-related configuration properties

That's all folks!
Thank you!
Any questions?
Jacek Laskowski / @jaceklaskowski / jacek@japila.pl
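A small query can show recap items 2, 4 and 5 (column expressions over UDAFs, mutable buffer types promoting HashAggregateExec). The sketch below is not from the talk. It assumes Spark 3.x with spark-sql on the classpath. LongAverage and AggregatePlans are hypothetical names made up for illustration.

The sketch registers a hand-written Aggregator with functions.udaf and compares the physical operators Spark plans for it against the built-in avg column expression. It reads the operator names from queryExecution.sparkPlan, which is the plan before adaptive query execution wraps it. With default settings one would expect:
  ObjectHashAggregate for the Aggregator, since its buffer is an opaque object
  HashAggregate for avg, since its buffer holds a Double sum and a Long count

```scala
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{avg, col, udaf}

// Hypothetical Aggregator: the average of a Long column, written as Scala code
// (a black box to the Catalyst Optimizer)
object LongAverage extends Aggregator[Long, (Long, Long), Double] {
  def zero: (Long, Long) = (0L, 0L) // (sum, count)
  def reduce(buf: (Long, Long), v: Long): (Long, Long) = (buf._1 + v, buf._2 + 1)
  def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) = (b1._1 + b2._1, b1._2 + b2._2)
  def finish(buf: (Long, Long)): Double = buf._1.toDouble / buf._2
  def bufferEncoder: Encoder[(Long, Long)] = Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object AggregatePlans {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("aggregate-plans").getOrCreate()

  // Sample input: grouping key k and value v
  lazy val input: DataFrame = {
    import spark.implicits._
    Seq((1, 10L), (1, 20L), (2, 30L)).toDF("k", "v")
  }

  // The Aggregator registered as a UDF via functions.udaf (Spark 3.0+)
  lazy val longAverage = udaf(LongAverage)

  // The aggregation through the Aggregator (UDAF)
  lazy val viaUdaf: DataFrame = input.groupBy("k").agg(longAverage(col("v")).as("avg_v"))

  // The same aggregation through a built-in column expression
  lazy val viaColumns: DataFrame = input.groupBy("k").agg(avg(col("v")).as("avg_v"))

  // Physical operator names (e.g. HashAggregate, ObjectHashAggregate) of the planned query
  def operatorNames(df: DataFrame): Set[String] =
    df.queryExecution.sparkPlan.collect { case p => p.nodeName }.toSet

  // Collected results as k -> avg_v
  def results(df: DataFrame): Map[Int, Double] =
    df.collect().map(r => r.getInt(0) -> r.getDouble(1)).toMap
}
```

Calling explain() on either DataFrame should show the same operators in the printed plan. Swapping the Aggregator for a built-in expression is the "Rewrite UDAFs to Columns" recommendation in miniature. Whether it pays off for a given query is best confirmed with explain() and the SQL UI metrics.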
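The tuning gig combined three factors: a UDAF running in ObjectHashAggregateExec, as many tasks as topic partitions, and spark.sql.objectHashAggregate.sortBased.fallbackThreshold. A toy model can illustrate how these interact. It is plain Scala and is NOT Spark's ObjectAggregationIterator. The assumptions are:
  SortFallbackModel, aggregateTask and runBatch are hypothetical names
  rows are split round-robin, roughly what repartition(n) without columns does
  a task "falls back" once its in-memory map holds threshold keys while input remains, following the description of the fallbackThreshold property

```scala
import scala.collection.mutable

// Toy model of hash-based aggregation with a sort-based fallback.
// NOT Spark's implementation; it only mimics the idea behind
// spark.sql.objectHashAggregate.sortBased.fallbackThreshold (default 128).
object SortFallbackModel {

  /** Sums values per key for one partition (one task).
    * Returns the per-key sums and whether this task fell back. */
  def aggregateTask(rows: Seq[(Int, Long)], threshold: Int): (Map[Int, Long], Boolean) = {
    val hashMap = mutable.HashMap.empty[Int, Long]
    val it = rows.iterator
    var fellBack = false
    // Hash-based phase: one aggregation buffer per grouping key in memory
    while (it.hasNext && !fellBack) {
      val (key, value) = it.next()
      hashMap.update(key, hashMap.getOrElse(key, 0L) + value)
      // Too many keys while input remains: switch to sort-based aggregation
      if (hashMap.size >= threshold && it.hasNext) fellBack = true
    }
    if (!fellBack) (hashMap.toMap, false)
    else {
      // Sort-based phase: sort partial sums and remaining rows by key,
      // then merge adjacent rows with equal keys
      val sorted = (hashMap.toList ++ it.toList).sortBy(_._1)
      val merged = sorted.foldLeft(List.empty[(Int, Long)]) {
        case ((prevKey, prevSum) :: done, (key, value)) if prevKey == key =>
          (key, prevSum + value) :: done
        case (done, row) => row :: done
      }
      (merged.toMap, true)
    }
  }

  /** Splits rows round-robin into numPartitions tasks, runs each task and
    * merges the partial results (a stand-in for the final aggregation).
    * Returns the final sums and the number of tasks that fell back. */
  def runBatch(rows: Seq[(Int, Long)], numPartitions: Int, threshold: Int): (Map[Int, Long], Int) = {
    val partitions = rows.zipWithIndex.groupBy(_._2 % numPartitions).values.map(_.map(_._1))
    val results = partitions.map(p => aggregateTask(p, threshold)).toList
    val finalSums = results.flatMap(_._1.toList).groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
    (finalSums, results.count(_._2))
  }
}
```

Take 1,000 distinct keys and a threshold of 128. In this model, a single task falls back, while 16 round-robin tasks (about 63 keys each) do not, and the sums are the same either way. Under the same model, fewer rows per micro-batch (a shorter Trigger.ProcessingTime) would also mean fewer distinct keys per task. This is an illustration, not a measurement of Spark.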