HOW TO USE DELTA SHARING FOR STREAMING DATA
Matt Slack, Senior Specialist Solution Architect
Surya Sai Turaga, Senior Solution Architect
13th June 2024
© 2024 Databricks Inc. All rights reserved

Product safe harbor statement
This information is provided to outline Databricks' general product direction and is for informational purposes only. Customers who purchase Databricks services should make their purchase decisions relying solely upon services, features, and functions that are currently available. Unreleased features or functionality described in forward-looking statements are subject to change at Databricks' discretion and may not be delivered as planned or at all.

Customer Story
Company: a data provider for financial market data (similar to Bloomberg/State Street).
Use Case: create a marketplace for financial data products, e.g.:
- pricing and market data
- company data
- risk intelligence
- economic data
- news
- commodities data
Success Criteria:
- consistent sub-10-second data delivery to customers
- integrates into customers' existing systems
- requires minimal additional infrastructure
When is streaming for Delta Sharing a good fit?
- Achievable latency is currently around 10s
- Supports high volumes of data
- Enables many other use cases
- Supports medium latency
- Reduces complexity
- Sharing externally and cross-cloud

- No additional infrastructure needed (Kafka, Event Hubs, Kinesis, etc.)
- No (de)serialization to Avro/Protobuf/JSON
- Schema management without a schema registry
- Common semantics for accessing tables with batch and stream
- Secure for cross-cloud data sharing
- Make your data available anywhere with low latency

Recap: Delta Sharing architecture
Delta Table (Data Provider) → Delta Sharing Server → Delta Sharing Client (Data Recipient), with the client accessing the files (Parquet format) directly in the object store.

Delta Sharing Protocol:
1. Client authenticates to the Sharing Server
2. Client requests a table (including filters)
3. Server checks access permissions (permissions can be restricted to specific partitions)
4. Server generates and returns cloud tokens / pre-signed short-lived URLs
5. Client uses the cloud tokens/URLs to read the files directly from object storage

Notes:
- Sharing happens on Delta part files, supporting full tables, partitions and Delta versions
- The client is system independent; it just needs to be able to read Parquet files
- In Databricks, the Sharing Server and ACL checks are integrated with Unity Catalog

Batch reads use spark.read.format("deltaSharing").
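For illustration, a minimal sketch of a batch read with the open Delta Sharing Spark connector is shown below; the profile path and the share/schema/table names are placeholders, not values from the deck. (With Databricks-to-Databricks sharing, the shared table can also be read directly through Unity Catalog.)

    # Minimal sketch of a batch read through Delta Sharing (placeholder names).
    # The recipient's profile file contains the sharing server endpoint and bearer token.
    table_url = "/dbfs/path/to/recipient.share#my_share.my_schema.my_table"

    df = (spark.read
          .format("deltaSharing")
          .load(table_url))

    df.show()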
Streaming with Delta Sharing
Streaming sources (Kafka, Kinesis, Pub/Sub, Auto Loader, WebSockets, etc.) land data in the provider's Delta table; the recipient consumes it with spark.readStream.format("deltaSharing") and polls the Delta Sharing Server for new table versions.

Delta Sharing Protocol:
1. Client authenticates to the Sharing Server
2. Client polls for new table versions
3. Client requests a table version (including filters)
4. Server checks access permissions (permissions can be restricted to specific partitions)
5. Server generates and returns cloud tokens / pre-signed short-lived URLs
6. Client uses the cloud tokens/URLs to read the files (Parquet format) from object storage that correspond to that Delta table version

Note: the client maintains the current table version in its checkpoint directory (same as when streaming from a Delta table).
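The streaming equivalent of the earlier batch sketch is shown below; the table URL, checkpoint path, and target table name are placeholders. The checkpoint location is where the client keeps the current table version between polls.

    # Minimal sketch of a streaming read through Delta Sharing (placeholder names).
    table_url = "/dbfs/path/to/recipient.share#my_share.my_schema.my_table"

    stream = (spark.readStream
              .format("deltaSharing")
              .load(table_url))

    (stream.writeStream
           .format("delta")
           .option("checkpointLocation", "/tmp/checkpoints/shared_table")  # tracks the current table version
           .toTable("my_catalog.my_schema.shared_table_copy"))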
Optimising streaming read latency
Default client behavior is to throttle to prevent overloading the server.
- The Delta Sharing server is a shared resource, so the streaming client has built-in throttling; it can be reduced for low-latency use cases via spark.delta.sharing.streaming.queryTableVersionIntervalSeconds
- Calls to the Delta Sharing server can require unpacking the delta log, which may take a few seconds depending on cloud provider access times
- Partitioned tables increase the number of files for each Delta table version, so there is more overhead for the Delta Sharing server
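To trade some of that protection for lower latency, the polling interval named above can be reduced on the client. The sketch below assumes it is applied as a Spark configuration before the stream starts; the value format and whether the server accepts a lower interval should be confirmed against the Delta Sharing connector documentation.

    # Sketch: poll the Delta Sharing server for new table versions more frequently.
    # The value shown is an assumption; check the connector docs for the supported range and format.
    spark.conf.set("spark.delta.sharing.streaming.queryTableVersionIntervalSeconds", "10s")

    stream = (spark.readStream
              .format("deltaSharing")
              .load(table_url))  # table_url as in the earlier sketch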
What latency is achievable?
Looking at different settings to improve throughput/latency.

Monitoring streaming client progress
How can we ensure all clients have read a given table version?

    SELECT
        event_time,
        request_params.recipient_name AS recipient_name,
        CAST(GET_JSON_OBJECT(response.result, '$.scannedAddFileSize') AS INT) AS file_size,
        GET_JSON_OBJECT(response.result, '$.tableFullName') AS table_full_name,
        CAST(GET_JSON_OBJECT(response.result, '$.numRecords') AS INT) AS num_records,
        GET_JSON_OBJECT(response.result, '$.deltaSharingShareName') AS share_name,
        GET_JSON_OBJECT(response.result, '$.tableVersion') AS table_version
    FROM system.access.audit
    WHERE service_name = 'unityCatalog'
      AND action_name = 'deltaSharingQueriedTable'
      AND event_time >= current_date()
      AND GET_JSON_OBJECT(response.result, '$.tableFullName') RLIKE '.*dais_streaming_demo.*'
    ORDER BY event_time DESC

- One row per client streaming query
- Shows all versions consumed
- Allows identification of which versions can be VACUUMed
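Building on the audit query above, the aggregation below is a sketch (not from the original deck) that finds the lowest version any recipient has read today, which bounds what can safely be VACUUMed.

    # Sketch: lowest table version read by any recipient today, per shared table.
    # Versions older than this are candidates for VACUUM / log clean-up.
    min_versions = spark.sql("""
        SELECT
            GET_JSON_OBJECT(response.result, '$.tableFullName') AS table_full_name,
            MIN(CAST(GET_JSON_OBJECT(response.result, '$.tableVersion') AS INT)) AS min_version_read
        FROM system.access.audit
        WHERE service_name = 'unityCatalog'
          AND action_name = 'deltaSharingQueriedTable'
          AND event_time >= current_date()
        GROUP BY 1
    """)
    min_versions.show()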
Streaming Demo

Demo Architecture
Sharing Workspace (data provider):
- A Python app polls a WebSocket and writes the JSON messages to an in-memory queue
- A Python app consumes from the in-memory queue and builds a Pandas dataframe
- delta-rs writes the Pandas dataframe to the Delta table (alternatively, Spark writes the Pandas dataframe to the Delta table using Photon)
- The Delta table is exposed through the Delta Sharing Server
Client Workspace (data recipient):
- Customer Delta Sharing client: spark.readStream.format("deltaSharing")
- Downstream consumers: Power BI, a dashboard using the API, and machine learning (e.g. anomaly detection)
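The provider side of this demo can be pictured with a short sketch: a poller pushes WebSocket messages onto an in-memory queue, and a writer drains the queue into a Pandas dataframe that delta-rs appends to the shared table. The names and batching policy below are illustrative assumptions, not the demo's actual code.

    # Illustrative sketch of the provider-side loop (assumed names and batch size).
    import json
    import queue

    import pandas as pd
    from deltalake import write_deltalake

    messages: "queue.Queue[str]" = queue.Queue()  # filled by the WebSocket poller

    def drain_and_write(table_uri: str, storage_options: dict, max_batch: int = 100) -> None:
        """Drain queued JSON messages into a Pandas dataframe and append it with delta-rs."""
        batch = []
        while len(batch) < max_batch:
            try:
                batch.append(json.loads(messages.get(timeout=1)))
            except queue.Empty:
                break
        if batch:
            write_deltalake(table_uri, pd.DataFrame(batch), mode="append",
                            storage_options=storage_options)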
Maintaining consistent write SLAs
delta-rs allows low-level tuning of Delta table writes.
- The Spark Delta writer can cause 10s spikes in write times, exceeding customer SLAs
- Switching to delta-rs provides more control over the delta log
- Some caveats:
  - delta-rs does not respect delta.checkpointInterval
  - delta-rs does not implement auto compaction
  - Each append adds a new entry to _delta_log, and the ADLS file-listing API slows down as the number of entries in _delta_log increases (write time increases by roughly 3s for every hour)
  - Manually checkpoint, OPTIMIZE and VACUUM the table regularly to speed up directory listing
  - A PR adds cleanup_metadata to the Python API from the Rust API, allowing clean-up of files in _delta_log

    import deltalake as dl

    dt = dl.DeltaTable(path, storage_options=storage_cfg)

    cfg = {
        "delta.logRetentionDuration": "interval 5 minute",
        "delta.enableExpiredLogCleanup": "true",
    }

    dl.write_deltalake(dt, data=message_pd, mode="append",
                       storage_options=storage_cfg,
                       partition_by=partition_cfg,
                       configuration=cfg)

    # run this every 25 batches
    dt.create_checkpoint()
    dt.cleanup_metadata()  # new method added to the Python API

    # run this every 100 batches
    dt.optimize.compact()

Questions