如何閱讀spark源碼(spark源碼看什么書)
本篇文章給大家談?wù)勅绾伍喿xspark源碼,以及spark源碼看什么書對應(yīng)的知識點,希望對各位有所幫助,不要忘了收藏本站喔。
本文目錄一覽:
- 1、如何成為Spark高手
- 2、可能是全網(wǎng)最詳細的 Spark Sql Aggregate 源碼剖析
- 3、spark sql 2.3 源碼解讀 - Execute (7)
- 4、怎么用Eclipse搭建Spark源碼閱讀環(huán)境
- 5、怎么在Idea IDE里面打開Spark源碼而不報錯
- 6、Spark源碼分析之SparkSubmit的流程
如何成為Spark高手
第一階段:熟練掌握Scala語言
1,spark框架是采用scala語言寫的,精致優(yōu)雅。想要成為spark高手,你就必須閱讀spark源碼,就必須掌握scala。?
2,雖然現(xiàn)在的spark可以使用多種語言開發(fā),java,python,但是最快速和支持最好的API依然并將永遠是Scala的API,所以必須掌握scala來編寫復(fù)雜的和高性能的spark分布式程序。?
3尤其是熟練掌握Scala的trait,apply,函數(shù)式編程,泛型,逆變,與協(xié)變等。
第二階段:精通spark平臺本身提供給開發(fā)折的API
1,掌握spark中面向RDD的開發(fā)模式,掌握各種transformation和action函數(shù)的使用。?
2,掌握Spark中的款依賴和窄依賴,lineage機制。?
3,掌握RDD的計算流程,如Stage的劃分,spark應(yīng)用程序提交給集群的基礎(chǔ)過程和Work節(jié)點基礎(chǔ)的工作原理。
第三階段:深入Spark內(nèi)核
此階段主要是通過Spark框架的源碼研讀來深入Spark內(nèi)核部分:?
1,通過源碼掌握Spark的任務(wù)提交,?
2,通過源碼掌握Spark的集群的任務(wù)調(diào)度,?
3,尤其要精通DAGScheduler,TaskScheduler和Worker節(jié)點內(nèi)部的工作的每一步細節(jié)。
第四階段:掌握Spark上的核心框架的使用
Spark作為云計算大數(shù)據(jù)時代的集大成者,在實時流式處理,圖技術(shù),機器學(xué)習(xí),nosql查詢等方面具有明顯的優(yōu)勢,我們使用Spark的時候大部分時間都是在使用其框架:?
sparksql,spark streaming等?
1,spark streaming是出色的實時流失處理框架,要掌握,DStream,transformation和checkpoint等。?
2,spark sql是離線統(tǒng)計分析工具,shark已經(jīng)沒落。?
3,對于spark中的機器學(xué)習(xí)和Graphx等要掌握其原理和用法。
第五階段:做商業(yè)級的spark項目
通過一個完整的具有代表性的spark項目來貫穿spark的方方面面,包括項目的框架設(shè)計,用到的技術(shù)的剖析,開始實現(xiàn),運維等,完善掌握其中的每一個階段和細節(jié),以后你就可以從容的面對絕大多數(shù)spark項目。
第六階段:提供spark解決方案
1,徹底掌握spark框架源碼的每一個細節(jié),?
2,根據(jù)步同的業(yè)務(wù)場景的需要提供spark在不同場景的解決方案,?
3,根據(jù)實際需要,在spark框架基礎(chǔ)上經(jīng)行2次開發(fā),打造自己的spark框架。
可能是全網(wǎng)最詳細的 Spark Sql Aggregate 源碼剖析
縱觀 Spark Sql 源碼,聚合的實現(xiàn)是其中較為復(fù)雜的部分,本文希望能以例子結(jié)合流程圖的方式來說清楚整個過程。這里僅關(guān)注 Aggregate 在物理執(zhí)行計劃相關(guān)的內(nèi)容,之前的 parse、analyze 及 optimize 階段暫不做分析。在 Spark Sql 中,有一個專門的 Aggregation strategy 用來處理聚合,我們先來看看這個策略。
本文暫不討論 distinct Aggregate 的實現(xiàn)(有興趣的可以看看另一篇博文 ),我們來看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理執(zhí)行計劃的
創(chuàng)建聚合分為兩個階段:
AggregateExpression 共有以下幾種 mode:
Q:是否支持使用 hash based agg 是如何判斷的?
摘自我另一篇文章:
為了說明最常用也是最復(fù)雜的的 hash based agg,本小節(jié)暫時將示例 sql 改為
這樣就能進入 HashAggregateExec 的分支
構(gòu)造函數(shù)主要工作就是對 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 進行了初始化
在 enable code gen 的情況下,會調(diào)用 HashAggregateExec#inputRDDs 來生成 RDD,為了分析 HashAggregateExec 是如何生成 RDD 的,我們設(shè)置 spark.sql.codegen.wholeStage 為 false 來 disable code gen,這樣就會調(diào)用 HashAggregateExec#doExecute 來生成 RDD,如下:
可以看到,關(guān)鍵的部分就是根據(jù) child.execute() 生成的 RDD 的每一個 partition 的迭代器轉(zhuǎn)化生成一個新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各個 partition。由于 TungstenAggregationIterator 涉及內(nèi)容非常多,我們單開一大節(jié)來進行介紹。
此迭代器:
注:UnsafeKVExternalSorter 的實現(xiàn)可以參考:
UnsafeRow 是 InternalRow(表示一行記錄) 的 unsafe 實現(xiàn),由原始內(nèi)存(byte array)而不是 Java 對象支持,由三個區(qū)域組成:
使用 UnsafeRow 的收益:
構(gòu)造函數(shù)的主要流程已在上圖中說明,需要注意的是:當內(nèi)存不足時(畢竟每個 grouping 對應(yīng)的 agg buffer 直接占用內(nèi)存,如果 grouping 非常多,或者 agg buffer 較大,容易出現(xiàn)內(nèi)存用盡)會從 hash based aggregate 切換為 sort based aggregate(會 spill 數(shù)據(jù)到磁盤),后文會進行詳述。先來看看最關(guān)鍵的 processInputs 方法的實現(xiàn)
上圖中,需要注意的是:hashMap 中 get 一個 groupingKey 對應(yīng)的 agg buffer 時,若已經(jīng)存在該 buffer 則直接返回;若不存在,嘗試申請內(nèi)存新建一個:
上圖中,用于真正處理一條 row 的 AggregationIterator#processRow 還需進一步展開分析。在此之前,我們先來看看 AggregateFunction 的分類
AggregateFunction 可以分為 DeclarativeAggregate 和 ImperativeAggregate 兩大類,具體的聚合函數(shù)均為這兩類的子類。
DeclarativeAggregate 是一類直接由 Catalyst 中的 Expressions 構(gòu)成的聚合函數(shù),主要邏輯通過調(diào)用 4 個表達式完成,分別是:
我們再次以容易理解的 Count 來舉例說明:
通常來講,實現(xiàn)一個基于 Expressions 的 DeclarativeAggregate 函數(shù)包含以下幾個重要的組成部分:
再來看看 AggregationIterator#processRow
AggregationIterator#processRow 會調(diào)用
生成用于處理一行數(shù)據(jù)(row)的函數(shù)
說白了 processRow 生成了函數(shù)才是直接用來接受一條 input row 來更新對應(yīng)的 agg buffer,具體是根據(jù) mode 及 aggExpression 中的 aggFunction 的類型調(diào)用其 updateExpressions 或 mergeExpressions 方法:
比如,對于 aggFunction 為 DeclarativeAggregate 類型的 Partial 下的 Count 來說就是調(diào)用其 updateExpressions 方法,即:
對于 Final 的 Count 來說就是調(diào)用其 mergeExpressions 方法,即:
對于 aggFunction 為 ImperativeAggregate 類型的 Partial 下的 Collect 來說就是調(diào)用其 update 方法,即:
對于 Final 的 Collect 來說就是調(diào)用其 merge 方法,即:
我們都知道,讀取一個迭代器的數(shù)據(jù),是要不斷調(diào)用 hasNext 方法進行 check 是否還有數(shù)據(jù),當該方法返回 true 的時候再調(diào)用 next 方法取得下一條數(shù)據(jù)。所以要知道如何讀取 TungstenAggregationIterator 的數(shù)據(jù),就得分析其這兩個方法。
分為兩種情況,分別是:
Agg 的實現(xiàn)確實復(fù)雜,本文雖然篇幅已經(jīng)很長,但還有很多方面沒有 cover 到,但基本最核心、最復(fù)雜的點都詳細介紹了,如果對于未 cover 的部分有興趣,請自行閱讀源碼進行分析~
spark sql 2.3 源碼解讀 - Execute (7)
終于到了最后一步執(zhí)行了:
最關(guān)鍵的兩個函數(shù)便是 doPrepare和 doExecute了。
還是以上一章的sql語句為例,其最終生成的sparkplan為:
看一下SortExec的doPrepare 和 doExecute方法:
下面看child也就是ShuffleExchangeExec:
先看沒有exchangeCoordinator的情況,
首先執(zhí)行:
上面的方法會返回一個ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它決定了每一條InternalRow shuffle后的partition id:
接下來:
返回結(jié)果是ShuffledRowRDD:
CoalescedPartitioner的邏輯:
再看有exchangeCoordinator的情況:
同樣返回的是ShuffledRowRDD:
再看doEstimationIfNecessary:
estimatePartitionStartIndices 函數(shù)得到了 partitionStartIndices:
有exchangeCoordinator的情況就生成了partitionStartIndices,從而對分區(qū)進行了調(diào)整。
最后來一個例子:
未開啟exchangeCoordinator的plan:
開啟exchangeCoordinator的plan:
不同之處是 兩個Exchange都帶了coordinator,且都是同一個coordinator。
執(zhí)行withExchangeCoordinator前:
執(zhí)行withExchangeCoordinator后:
生成了coordinator,且執(zhí)行了 doPrepare后,可以看到兩個exchange都向其注冊了。
doExecute后:
原先的numPartitions是200,經(jīng)過執(zhí)行后,生成的partitionStartIndices為[1],也就是只有1個partition,顯然在測試數(shù)據(jù)量很小的情況下,1個partition是更為合理的。這就是ExchangeCoordinator的功勞。
execute 最終的輸出是rdd,剩下的結(jié)果便是spark對rdd的運算了。其實 spark sql 最終的目標便也是生成rdd,交給spark core來運算。
spark sql的介紹到這里就結(jié)束了。
怎么用Eclipse搭建Spark源碼閱讀環(huán)境
應(yīng)該說這個和是不是Spark項目沒什么關(guān)系。
建議你使用intellij idea,在spark目錄下執(zhí)行"sbt/sbt gen-idea",會自動生成.idea項目,導(dǎo)入即可。
idea我不熟,還需要做一些其他的插件配置(python, sbt等)和環(huán)境設(shè)置。
你也可以使用Eclipse看,Eclipse有scala IDE,把Spark項目當maven工程導(dǎo)入。但是子項目之間的依賴會有點問題,會報錯。
推薦使用前者,向Databricks的開發(fā)者看齊;我使用的是后者,我直接依賴了編譯好的包就不會報錯了,純讀源碼的話也勉強可以跟蹤和調(diào)試。
另外,我也看有的Committer用vim看spark代碼的,所以怎么看源碼都無所謂,你熟悉就好,而且這和是不是Spark項目也沒什么關(guān)系。:)
怎么在Idea IDE里面打開Spark源碼而不報錯
首先我們先點擊一個工程的Project Structure菜單,這時候會彈出一個對話框,仔細的用戶肯定會發(fā)現(xiàn)里面列出來的模塊(Module)居然沒有yarn!就是這個原因?qū)е聐arn模塊相關(guān)的代碼老是報錯!只需要將yarn模塊加入到這里即可。
步驟依次選擇 Add-Import Module-選擇pom.xml,然后一步一步點擊確定,這時候會在對話框里面多了spark-yarn_2.10模塊,
然后點擊Maven Projects里面的Reimport All Maven Projects,等yarn模塊里面的所有依賴全部下載完的時候,我們就可以看到這個模塊里面的代碼終于不再報錯了?。?/p>
Spark源碼分析之SparkSubmit的流程
本文主要對SparkSubmit的任務(wù)提交流程源碼進行分析。 Spark源碼版本為2.3.1。
首先閱讀一下啟動腳本,看看首先加載的是哪個類,我們看一下 spark-submit 啟動腳本中的具體內(nèi)容。
可以看到這里加載的類是org.apache.spark.deploy.SparkSubmit,并且把啟動相關(guān)的參數(shù)也帶過去了。下面我們跟一下源碼看看整個流程是如何運作的...
SparkSubmit的main方法如下
這里我們由于我們是提交作業(yè),所有會走上面的submit(appArgs, uninitLog)方法
可以看到submit方法首先會準備任務(wù)提交的環(huán)境,調(diào)用了prepareSubmitEnvironment,該方法會返回四元組,該方法中會調(diào)用doPrepareSubmitEnvironment,這里我們重點注意 childMainClass類具體是什么 ,因為這里涉及到后面啟動我們主類的過程。
以下是doPrepareSubmitEnvironment方法的源碼...
可以看到該方法首先是解析相關(guān)的參數(shù),如jar包,mainClass的全限定名,系統(tǒng)配置,校驗一些參數(shù),等等,之后的關(guān)鍵點就是根據(jù)我們 deploy-mode 參數(shù)來判斷是如何運行我們的mainClass,這里主要是通過childMainClass這個參數(shù)來決定下一步首先啟動哪個類。
childMainClass根據(jù)部署模型有不同的值:
之后該方法會把準備好的四元組返回,我們接著看之前的submit方法
可以看到這里最終會調(diào)用doRunMain()方法去進行下一步。
doRunMain的實現(xiàn)如下...
doRunMain方法中會判斷是否需要一個代理用戶,然后無論需不需要都會執(zhí)行runMain方法,我們接下來看看runMain方法是如何實現(xiàn)的。
這里我們只假設(shè)以集群模式啟動,首先會加載類,將我們的childMainClass加載為字節(jié)碼對象mainClass ,然后將mainClass 映射成SparkApplication對象,因為我們以集群模式啟動,那么上一步返回四元組中的childMainClass的參數(shù)為ClientApp的全限定名,而這里會調(diào)用app實例的start方法因此,這里最終調(diào)用的是ClientApp的start方法。
ClientApp的start方法如下...
可以看到這里和之前我們的master啟動流程有些相似。
可以參考我上一篇文章 Spark源碼分析之Master的啟動流程 對這一流程加深理解。
首先是準備rpcEnv環(huán)境,之后通過master的地址獲取masterEndpoints端點相關(guān)信息,因為這里運行start方法時會將之前配置的相關(guān)參數(shù)都傳進來,之后就會通過rpcEnv注冊相關(guān)clientEndPoint端點信息,同時需要注意,這里會把masterEndpoints端點信息也作為構(gòu)造ClientEndpoint端點的參數(shù),也就是說這個ClientEndpoint會和masterEndpoints通信。
而在我上一篇文章中說過,只要是setupEndpoint方法被調(diào)用,一定會調(diào)用相關(guān)端點的的onStart方法,而這會調(diào)用clientEndPoint的onStart方法。
ClientEndPoint類中的onStart方法會匹配launch事件。源碼如下
onStart中匹配我們的launch的過程,這個過程是啟動driverWrapper的過程,可以看到上面源碼中封裝了mainClass ,該參數(shù)對應(yīng)DriverWrapper類的全限定名,之后將mainClass封裝到command中,然后封裝到driverDescription中,向Master申請啟動Driver。
這個過程會向Mster發(fā)送消息,是通過rpcEnv來實現(xiàn)發(fā)射消息的,而這里就涉及到outbox信箱,會調(diào)用postToOutbox方法,向outbox信箱中添加消息,然后通過TransportClient的send或sendRpc方法發(fā)送消息。發(fā)件箱以及發(fā)送過程是在同一個線程中進行。
而細心的同學(xué)會注意到這里調(diào)用的方法名為SendToMasterAndForwardReply,見名之意,發(fā)送消息到master并且期待回應(yīng)。
下面是rpcEnv來實現(xiàn)向遠端發(fā)送消息的一個調(diào)用流程,最終會通過netty中的TransportClient來寫出。
之后,Master端會觸發(fā)receiveAndReply函數(shù),匹配RequestSubmitDriver樣例類,完成模式匹配執(zhí)行后續(xù)流程。
可以看到這里首先將Driver信息封裝成DriverInfo,然后添加待調(diào)度列表waitingDrivers中,然后調(diào)用通用的schedule函數(shù)。
由于waitingDrivers不為空,則會走LaunchDriver的流程,當前的application申請資源,這時會向worker發(fā)送消息,觸發(fā)Worker的receive方法。
Worker的receive方法中,當Worker遇到LaunchDriver指令時,創(chuàng)建并啟動一個DriverRunner,DriverRunner啟動一個線程,異步的處理Driver啟動工作。這里說啟動的Driver就是剛才說的org.apache.spark.deploy.worker.DriverWrapper
可以看到上面在DriverRunner中是開辟線程異步的處理Driver啟動工作,不會阻塞主進程的執(zhí)行,而prepareAndRunDriver方法中最終調(diào)用 runDriver..
runDriver中主要先做了一些初始化工作,接著就開始啟動driver了。
上述Driver啟動工作主要分為以下幾步:
下面我們直接看DriverWrapper的實現(xiàn)
DriverWrapper,會創(chuàng)建了一個RpcEndpoint與RpcEnv,RpcEndpoint為WorkerWatcher,主要目的為監(jiān)控Worker節(jié)點是否正常,如果出現(xiàn)異常就直接退出,然后當前的ClassLoader加載userJar,同時執(zhí)行userMainClass,在執(zhí)行用戶的main方法后關(guān)閉workerWatcher。
以上就是SparkSubmit的流程,下一篇我會對SparkContext的源碼進行解析。
歡迎關(guān)注...
如何閱讀spark源碼的介紹就聊到這里吧,感謝你花時間閱讀本站內(nèi)容,更多關(guān)于spark源碼看什么書、如何閱讀spark源碼的信息別忘了在本站進行查找喔。
掃描二維碼推送至手機訪問。
版權(quán)聲明:本文由飛速云SEO網(wǎng)絡(luò)優(yōu)化推廣發(fā)布,如需轉(zhuǎn)載請注明出處。