Explore the New Functionality of Apache Spark 3.5
Data+AI Summit 2024
Daniel Tenedorio (GitHub: dtenedor)

Transforming and Querying Data for Everyone!
100+ data sources, 1+ billion annual downloads, 100K+ Stack Overflow questions, 41K+ commits, 3,700+ GitHub contributors.
Still #1 in developer activity for over ten years: 3,700 contributors, 41,000 commits.

About Us
Daniel Tenedorio (GitHub: dtenedor), Wenchen Fan (GitHub: cloud-fan), and Xiao Li (GitHub: gatorsmile), the Spark team at Databricks.

Agenda
- Spark Connect: deploy and update Spark clusters independently from their clients.
- SQL Features: HyperLogLog aggregates based on Apache Datasketches, array manipulation functions, the IDENTIFIER clause, and more.
- PySpark Features: Arrow-optimized Python UDFs, Python UDTFs, a new testing API, improved error messages, and more.
- Spark Streaming: support for multiple stateful operators, changelog checkpointing for the RocksDB state store, and dropDuplicatesWithinWatermark.

Spark Connect
How to embed Spark in applications?
Up until Spark Connect, it was hard to support today's developer experience requirements from applications, IDEs/notebooks, and programming languages/SDKs: the available options offered no JVM interop, only close-to-REPL experiences, or SQL-only access. Spark's monolithic driver bundled the application logic together with the analyzer, optimizer, scheduler, and distributed execution engine.

Spark Connect General Availability
Spark Connect is a thin client with the full power of Apache Spark. The client API talks to an application gateway in front of Spark's driver, which retains the analyzer, optimizer, scheduler, and distributed execution engine.

Connect to Spark from Any Application
pip install pyspark==3.5.0 in your favorite IDE! Interactively develop and debug from your IDE, check out Databricks Connect, and use and contribute to the Go client. New connectors and SDKs in any language! Build interactive data applications; get started with our GitHub example.
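With Spark Connect, a typical session from an IDE looks like the following minimal sketch; the endpoint address is a placeholder for your own cluster, and 15002 is the default Spark Connect port:

    from pyspark.sql import SparkSession

    # Connect to a remote Spark Connect endpoint instead of a local JVM.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    df = spark.range(10)
    print(df.count())  # planned on the client, executed on the cluster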
New Spark Connect Scala Client Features (SPARK-42554)
The Scala client now supports more features in Spark 3.5! Part of this work was a major refactoring to split the sql submodule into client (sql-api) and server-compatible (sql) modules, reducing the set of dependencies needed on the client for classpath isolation (SPARK-44273).

MLlib on Spark Connect (SPARK-42554)
It is now possible to use MLlib directly with Spark Connect to do distributed training and inference (see the design doc). This supports logistic regression classifiers, basic feature transformers, basic model evaluators, and more. It also integrates with Spark's vectorized Python UDF framework.
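As a minimal sketch of the new capability, assuming a Spark Connect session named spark and the pyspark.ml.connect package this work introduces; the column names and parameters here are illustrative, and PyTorch must be available on the cluster:

    from pyspark.ml.connect.classification import LogisticRegression

    train_df = spark.createDataFrame(
        [([1.0, 2.0], 1), ([2.0, -1.0], 0)],
        ["features", "label"])

    # Distributed training runs on the cluster; only results return here.
    model = LogisticRegression(maxIter=10).fit(train_df)
    model.transform(train_df).show()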
Spark Connect API Parity (SPARK-42497)
Parity of the Pandas API on Spark when using Spark Connect has improved. The Spark Connect client for Structured Streaming workloads, in both Python and Scala, now also supports all available features.

Go Client for Spark Connect (SPARK-43351)
The community also started a client for Spark Connect in Golang, developed in a separate repository.

Pandas API Support for Spark Connect (SPARK-42497)
Spark Connect now includes the capability to execute Pandas functions and logic as needed in your PySpark programs.
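A short sketch of what this enables, assuming spark is a Spark Connect session:

    import pyspark.pandas as ps

    # Familiar pandas-style syntax, computed by the remote cluster.
    psdf = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    print(psdf.describe())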
SQL Features

The IDENTIFIER Clause (SPARK-41231)
The new IDENTIFIER clause provides flexibility to avoid the risk of SQL injection attacks. Using this feature to specify table, column, and function names is powerful when paired with the query parameter feature added in the previous Spark release.

    spark.sql("CREATE TABLE IDENTIFIER(:tbl)(col INT)",
              args={"tbl": "my_schema.my_tbl"})

    spark.sql("SELECT IDENTIFIER(:col) FROM IDENTIFIER(:tbl)",
              args={"col": "col", "tbl": "my_schema.my_tbl"})
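To see why this matters, contrast it with string interpolation. In the sketch below (the hostile value and table names are hypothetical), interpolated text is parsed as part of the SQL statement itself, while IDENTIFIER only accepts a value that parses as a valid identifier:

    table_name = "my_tbl UNION SELECT * FROM secrets"  # hostile input

    # Unsafe: the value is spliced into the SQL grammar and parsed as SQL.
    spark.sql(f"SELECT * FROM {table_name}")

    # Safe: IDENTIFIER rejects anything that is not a valid identifier,
    # raising a parse error instead of executing the altered query.
    spark.sql("SELECT * FROM IDENTIFIER(:tbl)", args={"tbl": table_name})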
Named Argument Syntax for Function Calls (SPARK-44059)
Spark SQL now lets users call functions with parameter names preceding their values.

    SELECT mask('AbCD123-$#', lowerChar => 'q', upperChar => 'Q', digitChar => 'd');
HyperLogLog Approximate Aggregate Functions (SPARK-16484)
New SQL functions count unique values within groups with precision and efficiency. They can also store the results of intermediate computations in sketch buffers, which can be persisted to storage and loaded back later. These implementations use the Apache Datasketches library for consistency with the open-source community and easy integration.
    SELECT hll_sketch_estimate(hll_sketch_agg(col))
    FROM VALUES ('abc'), ('def'), ('abc'), ('ghi'), ('abc') tab(col);
    -- 3
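The sketch buffers compose across queries. A minimal sketch of that workflow, using hypothetical events and daily_sketches tables: persist one sketch per day, then merge the stored sketches later for a combined distinct count without rescanning the raw data:

    # Aggregate each day's values into a compact, persistable sketch.
    spark.sql("""
        CREATE TABLE daily_sketches AS
        SELECT day, hll_sketch_agg(user_id) AS sketch
        FROM events
        GROUP BY day""")

    # Merge the stored sketches and estimate the combined distinct count.
    spark.sql("""
        SELECT hll_sketch_estimate(hll_union_agg(sketch)) AS distinct_users
        FROM daily_sketches""").show()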
New Functions for Manipulating Arrays (SPARK-41231)

    SELECT array_append(array(1, 2, 3), "HELLO");  -- [1, 2, 3, "HELLO"]
    SELECT array_prepend(array(1, 2, 3), 99);      -- [99, 1, 2, 3]
    SELECT array_insert(array(1, 2, 3), 1, 4);     -- [4, 1, 2, 3]
    SELECT array_compact(array(1, NULL, 3));       -- [1, 3]

SQL Functions in the Scala, Python, and R APIs (SPARK-43907)
Before Spark 3.5, many SQL functions were not available in the Scala, Python, or R DataFrame APIs. This made it difficult to invoke the functions from DataFrames, since users had to type the function names in string literals without any help from auto-completion. Spark 3.5 removes this problem by making 150+ SQL functions available in the DataFrame APIs.
For example:

    from pyspark.sql.functions import to_timestamp, unix_micros

    spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    df = spark.createDataFrame([("2015-07-22 10:00:00",)], ["t"])
    df.select(unix_micros(to_timestamp(df.t)).alias("n")).collect()
    # [Row(n=1437584400000000)]

PySpark

Arrow-Optimized Python UDFs (SPARK-40307)
Enable Arrow-optimized execution for Python UDFs with a config:

    from pyspark.sql.functions import udf

    spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)

    @udf("integer")
    def my_len_udf(s: str) -> int:
        return len(s)

Python UDFs run 2X faster on modern CPU architectures, thanks to vectorized I/O! You can also specify useArrow=True at registration time instead of using the config:

    @udf("integer", useArrow=True)
    def my_len_udf(s: str) -> int:
        return len(s)
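Applying the UDF is unchanged either way; a quick usage sketch (the column name is illustrative):

    df = spark.createDataFrame([("Alice",), ("Bob",)], ["name"])
    df.select(my_len_udf("name").alias("name_len")).show()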
Python User-Defined Table Functions (Spark 3.5)
This is a new kind of function that returns an entire table as output instead of a single scalar result value. Once registered, UDTFs can appear in the FROM clause of a SQL query, or you can call them through the DataFrame API.

    from pyspark.sql.functions import udtf

    @udtf(returnType="num: int, squared: int")
    class SquareNumbers:
        def eval(self, start: int, end: int):
            for num in range(start, end + 1):
                yield (num, num * num)

Calling the UDTF from SQL:

    SELECT * FROM SquareNumbers(start => 1, end => 3);
    +---+-------+
    |num|squared|
    +---+-------+
    |  1|      1|
    |  2|      4|
    |  3|      9|
    +---+-------+
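Note that calling the class from SQL by name first requires registering it; a minimal sketch:

    # Register the decorated class under the name used in the query above.
    spark.udtf.register("SquareNumbers", SquareNumbers)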
Or through the DataFrame API:

    from pyspark.sql.functions import lit

    SquareNumbers(lit(1), lit(3)).show()
    +---+-------+
    |num|squared|
    +---+-------+
    |  1|      1|
    |  2|      4|
    |  3|      9|
    +---+-------+

Polymorphic Analysis (Spark 4.0)
Compute the output schema for each call depending on its arguments, using a static analyze method (from_file is the slide's schema-building helper):

    import os
    from pyspark import SparkFiles
    from pyspark.sql.functions import AnalyzeArgument, AnalyzeResult

    class ReadFromConfigFile:
        @staticmethod
        def analyze(filename: AnalyzeArgument):
            path = os.path.join(SparkFiles.getRootDirectory(), filename.value)
            with open(path, "r") as f:
                # Compute the UDTF output schema
                # based on the contents of the file.
                return AnalyzeResult(from_file(f.read()))
    ReadFromConfigFile(lit("config.txt")).show()
    +----------+-----------+
    |start_date|other_field|
    +----------+-----------+
    |2024-04-02|          1|
    +----------+-----------+
Input Table Partitioning (Spark 4.0)
Split the input rows among UDTF instances: eval runs once per row, then terminate runs last.

    from pyspark.sql import Row

    class CountAndMax:
        def __init__(self):
            self._count = 0
            self._max = 0

        def eval(self, row: Row):
            self._count += 1
            self._max = max(self._max, row[0])

        def terminate(self):
            yield self._count, self._max

    WITH t AS (SELECT id FROM RANGE(0, 100))
    SELECT * FROM CountAndMax(TABLE(t) PARTITION BY id / 10 ORDER BY id);
    +-----+---+
    |count|max|
    +-----+---+
    |   10|  9|
    |   10| 19|
    ...
Variable Keyword Arguments (Spark 4.0)
The analyze and eval methods may accept *args or **kwargs.

    from pyspark.sql.functions import AnalyzeArgument, AnalyzeResult
    from pyspark.sql.types import StructField, StructType

    class VarArgs:
        @staticmethod
        def analyze(**kwargs: AnalyzeArgument):
            return AnalyzeResult(StructType(
                [StructField(key, arg.dataType)
                 for key, arg in sorted(kwargs.items())]))

        def eval(self, **kwargs):
            yield tuple(value for _, value
                        in sorted(kwargs.items()))
    SELECT * FROM VarArgs(a => 10, b => 'x');
    +---+---+
    |  a|  b|
    +---+---+
    | 10|"x"|
    +---+---+

Custom Initialization (Spark 4.0)
Create a subclass of AnalyzeResult and consume it in each subsequent __init__:

    from dataclasses import dataclass
    from pyspark.sql.functions import AnalyzeResult
    from pyspark.sql.types import IntegerType, StringType, StructType

    class SplitWords:
        @dataclass
        class MyAnalyzeResult(AnalyzeResult):
            numWords: int
            numArticles: int

        def __init__(self, r: MyAnalyzeResult):
            ...

        @staticmethod
        def analyze(text: str):
            words = text.split(" ")
            return SplitWords.MyAnalyzeResult(
                schema=StructType()
                    .add("word", StringType())
                    .add("total", IntegerType()),
                withSinglePartition=True,
                numWords=len(words),
                numArticles=len([word for word in words
                                 if word in ("a", "an", "the")]))
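The slide elides the initializer body; a minimal sketch of how it might consume the analyze result:

    def __init__(self, r: "SplitWords.MyAnalyzeResult"):
        # The engine passes the same MyAnalyzeResult that analyze returned,
        # so metadata computed once per call is available to each instance.
        self._num_words = r.numWords
        self._num_articles = r.numArticles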
Enhanced Error Messages in PySpark (SPARK-42986)
Previously, the set of exceptions thrown from the Python Spark driver did not leverage the error classes introduced in Apache Spark 3.3. All of the errors from DataFrame and SQL have now been migrated, and contain the appropriate error classes and codes.

DataFrame Equality Testing API (SPARK-44042)
New DataFrame equality test utility functions include detailed, color-coded test error messages which clearly indicate differences between DataFrame schemas and between the data within DataFrames.
This lets developers easily add equality tests that produce actionable results for their applications, enhancing productivity.

    pyspark.testing.assertDataFrameEqual
    pyspark.testing.assertPandasOnSparkEqual
    pyspark.testing.assertSchemaEqual

    pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: (33.33333%)
    *** actual ***
      Row(name='Amy', languages=['C++', 'Rust'])
    ! Row(name='Jane', languages=['Scala', 'SQL', 'Java'])
      Row(name='John', languages=['Python', 'Java'])
    *** expected ***
      Row(name='Amy', languages=['C++', 'Rust'])
    ! Row(name='Jane', languages=['Scala', 'Java'])
      Row(name='John', languages=['Python', 'Java'])
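A minimal usage sketch of the new helpers:

    from pyspark.testing import assertDataFrameEqual

    df_actual = spark.createDataFrame(
        [("Amy", ["C++", "Rust"])], ["name", "languages"])
    df_expected = spark.createDataFrame(
        [("Amy", ["C++", "Rust"])], ["name", "languages"])

    # Passes silently; on mismatch it raises PySparkAssertionError
    # with the color-coded row diff shown above.
    assertDataFrameEqual(df_actual, df_expected)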
PySpark DeepSpeed Distributor (SPARK-44264)
This makes it easier for PySpark users to run distributed training and inference with DeepSpeed on Spark clusters.
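A hedged sketch of the distributor API; the training function, GPU count, and node count are placeholders:

    from pyspark.ml.deepspeed.deepspeed_distributor import DeepspeedTorchDistributor

    def train(learning_rate):
        ...  # a standard DeepSpeed/PyTorch training loop goes here

    # Fan the training function out across the cluster's GPUs.
    distributor = DeepspeedTorchDistributor(
        numGpus=8, nnodes=2, useGpu=True, localMode=False)
    distributor.run(train, 1e-3)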
Streaming
Full Support for Multiple Stateful Operators (SPARK-42376)
Time interval joins between streams are now supported, optionally followed by other stateful operators. For example, workloads can now join streams of ads and clicks, then aggregate over time windows.
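A sketch of that ads-and-clicks pattern (the table and column names are illustrative): a stream-stream time-interval join whose output feeds a windowed aggregation in the same query:

    from pyspark.sql import functions as F

    impressions = (spark.readStream.table("ads")
        .withWatermark("impressionTime", "10 minutes"))
    clicks = (spark.readStream.table("clicks")
        .withWatermark("clickTime", "10 minutes"))

    # Stream-stream join constrained to a time interval.
    joined = impressions.join(
        clicks,
        F.expr("""
            adId = clickAdId AND
            clickTime BETWEEN impressionTime AND
                              impressionTime + INTERVAL 1 HOUR"""))

    # A second stateful operator downstream of the join.
    counts = joined.groupBy(F.window("clickTime", "5 minutes")).count()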
Changelog Checkpointing for the RocksDB State Store Provider (SPARK-43421)
This new checkpoint mechanism for the RocksDB state store provider persists the changelog (the updates) of the state. This significantly reduces commit latency, which in turn significantly reduces end-to-end latency. To enable it, set this config to true:

    spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled
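In a Python session, enabling it looks like this (the state store provider itself must also be set to RocksDB, shown here for completeness):

    spark.conf.set(
        "spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    spark.conf.set(
        "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
        "true")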
RocksDB State Store Provider Memory Management Enhancements (SPARK-43311)
New fine-grained memory management lets users cap the total memory usage across RocksDB instances in the same executor process, so users can now reason about and configure the memory usage per executor process.

Introducing dropDuplicatesWithinWatermark (SPARK-42931)
This new API deduplicates events without requiring the event-time timestamps to be identical, as long as those timestamps are close enough to fit within the watermark delay.
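A minimal usage sketch (the source table and column names are illustrative): events sharing an id are treated as duplicates even when their eventTime values differ slightly, provided they arrive within the watermark delay:

    deduped = (spark.readStream.table("events")
        .withWatermark("eventTime", "10 minutes")
        .dropDuplicatesWithinWatermark(["id"]))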
With this new feature, users can avoid errors like "Timestamp for event time could differ even for events to be considered as duplicates."

Thank you for your contributions!
Daniel Tenedorio (daniel.tenedorio databricks)