1、劉巖/Cloudera Solution EngApacheApache AtlasAtlas MeetsMeets ApacheApache FlinkFlink為什么 Flink 需要對接血緣系統#1#1什么是 Apache Atlas#2#2對接技術實現及未來#3#3目錄為什么 Flink 需要對接血緣系統#1#1Flink的三大應用場景Data PipelineAnalyticEvent Driven ApplicationClickstreamApplication DBService LogSTGODSDWDMReportAD-HocService Log離線數倉的基本流程和問題最
2、大延遲=T+N 天 where N=1數倉的實時加工鏈路 STG-ODSFLINK實時源修訂源晚到數據異常數據?修訂任務FLINKFLINK正常數據正常修訂數據?修訂任務?補數修復任務目標表數倉的實時加工鏈路 ODS-DWFLINK實時源修訂源晚到數據異常數據?修訂任務FLINKFLINK正常數據正常修訂數據?修訂任務?補數修復任務目標表UpsertKafkaFLINKDim Update TopicTransactionTopicDimHive TableDim Upsert TopicEnrichedTransWindow Sum1Window Sum2Window Sum3FlinkSQ
3、L 1FlinkSQL 2FlinkSQL 3FlinkSQL 4FlinkSQL 5FlinkSQL 6Batch Source 實時分析/事件驅動為什么需要血緣管理和檢索無法追蹤Source/Sink的上游或下游全景鏈路什么是 Apache Atlas#2#2Apache Atlas 核心功能#1 元數據檢索Atlas 提供了對元數據進行了全量的收集,并支持多種元數據查詢或檢索方式Apache Atlas 核心功能#2 元數據標簽Atlas 提供了對元數據進行打標簽的功能,并且可以通過標簽進行反向查找反向查找Apache Atlas 核心功能#3 血緣管理Atlas 提供了對納管的數據資產
4、進行交互式血緣分析交互式血緣分析和管理的能力Apache Atlas 技術架構#1 元數據捕獲Atlas 通過插件(Hook)的方式在服務段注入捕獲代碼,并將元數據提交至KafkaApache Atlas 技術架構#2 元數據寫入Atlas服務從Kafka中消費元數據信息,并將元數據寫入到JanusGraph(on HBase)和 Solr兩個系統Apache Atlas 技術架構#3 元數據查詢Atlas 通過其他應用通過RestAPI方式向其他第三方服務提供元數據查詢和檢索的服務對接技術實現及未來#3#3如何將兩個系統打通ATLAS-3812-Add Apache Flink entity
5、 definitionFLINK-6757-Collect flink application metadata with Atlas 如何將兩個系統打通1.在Atlas中定義Flink的相關類型2.在Flink中定義相關的Hook3.抓取Flink Connector的元數據1.在Atlas中定義Flink的相關類型Category:TypeCategorycreateTime:numbercreatedBy:stringdateFormatter:DateFormatDescription:string.subTypes:array of stringsuperTypes:array of
6、 stringcategory:TypeCategorycreateTime:numbercreatedBy:stringdateFormatter:DateFormatdescription:stringguid:stringname:stringclassificationDefsenumDefscategory:TypeCategorycreateTime:numbercreatedBy:stringdateFormatter:DateFormatdescription:stringguid:stringname:stringoptions:map of string.structDef
7、sName:flink_applicationsuperTypes:ProcessserviceType:flinktypeVersion:1.0“name:id,typeName:string,cardinality:SINGLE,isIndexable:true,isOptional:false,isUnique:trueentityDefsendDef1:AtlasRelationshipEndDeendDef2:AtlasRelationshipEndDefpropagateTags:PropagateTagrelationshipLabel:stringrelationshipDef
8、sattributeDefs2.在Flink中定義相關的Hook修改ClusterClientJobClientAdapter修改AbstractJobClusterExecutor修改AbstractSessionClusterExecutor修改LocalExecutor修改FlinkKafkaConsumerBase修改FlinkKafkaConsumer修改FlinkKafkaProducerExpose Kafka connector propertiesExpose Pipeline Class for Executor新建 JobListenerFactory修改StreamEx
9、ecutionEnvironmentMake JobListeners configurable in through flink conf修改StreamingFileSink修改ContinuousFileMonitoringFunctionExpose File source and sink properties3.抓取Flink Connector的元數據List sources=streamGraph.getSourceIDs().stream().map(streamGraph:getStreamNode).collect(Collectors.toList();Get Sour
10、ce List sinks=streamGraph.getSinkIDs().stream().map(streamGraph:getStreamNode).collect(Collectors.toList();Get Sink 最終的效果#1 單個Flink任務單任務的上下游全景鏈路最終的效果#2 多個Flink任務多任務血緣關系和全景鏈路傳輸鏈路的安全控制FlinkAtlasHook的數據會發送至ATLAS_HOOK 的Topic,這個Topic需要1.1 啟用Kerberos1.2 TLS1.3 Apache Ranger授權當前實現的不足之處和后續改進1.Hook段代碼獲取元數據的侵入性太大2.支持的Flink Source/Sink 不完整