導航:首頁 > 數據分析 > flink怎麼一條條處理數據

flink怎麼一條條處理數據

發布時間:2023-03-21 17:49:25

1. 關於 Flink checkpoint,都在這里(二)

目前業界主流的實時計算框架包括 Flink、Spark Streaming、Storm。相比於 Batch,Stream 的容錯則需要考慮更多。Batch 數據通常基於穩定性較高的分布式存儲進行數據的讀寫(如 HDFS、S3 等),當數據計算出現異常時可以通過重新計算的方式保證最終結果的一致性,Spark 就是基於這樣的思路設計的,它通過 lineage 機制來重新計算。並且 Batch 計算往往不需要過多的考慮數據的時效性,而且不需要做到 7×24 小時的運行。但相對於 Stream 而言則會更加復雜。

對於 Stream 而言需要面對不同的流式數據源,可以是 File Stream、隊列(如 Kafka),甚至可能是某個服務發來的消息。數據源的多樣性就註定了 Stream 的容錯需要重新進行考慮。並且 Stream 數據的容錯需要在短時間內進行恢復,否則將可能會導致大量的數據積壓甚至丟失,因為 Stream 數據鏈路不會因為下游處理任務的異常而停止數據的產出。

讓我們將問題描述的更具體一些,方便更清楚的了解 Stream 的容錯思想。對於分布式計算而言,目前主流的思路都是採用 Master-Slave 架構。Master 主要用於進行 Slave 節點的管理(比如檢測 Slave 是否存活,狀態管理等),Slave 主要是擔當數據計算的職責。因此,從容錯角度而言分為:

Master 容錯相對而言較為簡單,因為不需要直接參與數據計算。主要分為有狀態的 Master 和無狀態的 Master 兩類。

像 Storm 這類無狀態的實時計算框架,Master(即 Storm 的 Nimbus 節點)的異常往往不影響 Slave(即 Storm worker 節點)的數據計算,只需要重新啟動一個 Master 即可,這個過程中不需要進行 Master 狀態的恢復,也不會影響實時數據的處理。甚至 Slave 節點在無感知的情況下就完成了 Master 的恢復。但是這種方式會犧牲一定的功能,實時計算框架本身無法支持狀態流的處理。

像 Flink 、 Spark Streaming 這類包含狀態的實時計算框架,需要恢復 Master 節點的同時還需要對其狀態進行恢復,Master 狀態信息包含一些必要的配置、以及對 Slave 節點狀態管理的信息(如「某個 Slave 節點的狀態快照所在的 HDFS 路徑」)。Spark Streaming、Flink 的做法都是基於 checkpoint 機制對 Master 節點的狀態進行備份,異常發生時需要基於上一次的狀態備份進行恢復。

Flink 還提供了 HA 機制,即同時運行至少 2 個 JobManager 節點,但是只有其中一個真正的處理管理事務(稱為主節點——Leader),其他的僅僅保持狀態信息的同步(稱為從節點——Standby)。一旦 Leader 發生異常,其中一個 Standby 將會代替異常節點繼續進行任務的管理。 更多關於 Flink HA 可以參考官方文檔 。這種機制是犧牲更多的資源來換取任務的穩定性,主從切換的成本相比於從狀態備份中恢復速度更快。

Stream 數據處理整體而言可以分為 3 部分:

根據不同的保障級別,Stream 數據容錯級別又分為 3 種語義:

我們之所以將數據接收和寫入單獨拿出來,是因為在面對不同的數據源時,實時框架的容錯機制與最高語義保障級別是不同的。如 Flink 而言,它的 exactly-once 語義總的來說是針對於數據處理階段而言,即只有框架內數據的處理可以保障 exactly-once,而數據的接收、寫入是否是 exactly-once 語義取決於數據源本身與 Source、Sink 運算元的實現邏輯。通常來說,我們將能夠保障數據接收、數據處理、數據接入整體數據一致性稱為 端到端(end-to-end) 的數據一致性。

端到端的數據一致性保障相對而言是很復雜的,因為數據源的種類眾多,這些一般都不是分布式實時框架中的一部分,數據的發送與接收邏輯不受實時框架的控制。

對於 Storm 而言,框架內僅提供了相關的介面用於用戶自己實現一致性語義,並沒有直接提供各種存儲的一致性 Spouts,數據寫入也是如此。數據處理過程提供 at-least-once 語義保障(exactly-once 語義由 Storm Trident 提供了保障,本篇暫不做討論)。Storm 通過 ACK 的機制保證 at-least-once 語義。簡單來說,當 Storm 接收到一條數據後將會給這條數據唯一的 id,數據被下游 Bolts 處理但是處理後的 id 不會發生改變,當且僅當該 id 的數據經過的 Bolts 全部 ACK 後才認定該數據被 徹底處理(fully processed) ,否則 Spout 將再次發送該數據直到數據被徹底處理。

Spark Streaming 的數據接收通過預寫入日誌的機制保障了 at-least-once 語義。簡單來說,就是將接收到的數據以日誌的形式寫入到穩定的存儲中(存儲位置基於 checkpoint 配置獲取),這樣一來就與數據源解耦,可以基於預寫入日誌實現數據重發的能力,從而保障 at-least-once 語義。數據處理過程中基於 RDD 的容錯機制進行恢復,提供了精確一次的語義。數據寫入需要用戶自己實現,Spark Streaming 提供了兩種思路:冪等寫入和事務性寫入。

Flink 全局基於 checkpoint 進行容錯,通過向流數據中插入特殊的事件——checkpoint barrier 來觸發各個運算元製作狀態快照,快照會寫入到持久化的存儲中。Flink Source、Sink 的語義保障需要依賴數據源以及自身的實現邏輯。但是 Flink 提供了多種狀態介面,如 ListState、MapState,用於進行運算元狀態的記錄,狀態容錯可以保障 exactly-once 語義。這也是與 Spark Streaming 的不同之處。

到這里我們大致了解了各個框架的容錯機制,我們不禁想回味一下:分布式實時計算框架的容錯機制的本質是什麼?容錯到底在保障什麼?

從本質上講,容錯在保障數據可以被正確的處理,即使在發生異常的情況下。實時流處理的正確性又體現在 處理過程的完整性 時序的正確性 。即一條數據要被所有的邏輯完整的處理一次(根據語義的不同也可能是多次),且多條數據之間的處理的時序不發生改變。

舉個例子,如下圖所示的數據流 DAG 圖中,流數據序列 [1, 2, 3, …, n] 被輸入到 A 中,然後最終流向 D。完整性即每一個事件都被完整的 DAG 路徑處理,即 A -> B -> D 或 A -> C -> D ,時序性即事件 1 永遠先於事件 2 被處理,即使在發生了異常後恢復的情況下也是如此。

從整體來看,實時分布式計算框架的容錯機制核心思想是 確認 重試 ,但是不同的框架重試過程中回滾的數據量不同。

Storm 通過 ACK 機制判斷數據是否完整處理,否則將重發數據,重新進行計算。這種單條數據粒度的 ACK 與重試機制即可以保障時序性,也可以保障處理過程的完整性。但是這樣細的粒度犧牲了一定的性能。Storm 並沒有將數據流進行冗餘存儲來保障容錯,從這個角度而言它的容錯是輕量級的。

Spark Streaming 通過微批次的方式將數據進行截斷,以批次為單位進行容錯。這種方式一旦發生了異常,可以從上一個批次中恢復繼續執行。這種機制從一定程度上提升了性能,但是對時效性有損。因為微批次的思路對數據流進行了截斷,時間語義上的單位時間也只能根據批次的大小來界定。Spark Streaming 提供了數據流的冗餘(預寫入日誌)可以真正做到與數據源解耦,對於所有的數據源均可以保障容錯的語義,但是這類的容錯是重量級的。

Flink 的思路也是對數據進行截斷,從而可以分段治之。相比於 Spark Streaming 而言這種截斷並沒有改變數據流的連續性,時間語義上的單位時間仍然是以事件粒度來界定。並且 Flink 不會對數據流進行冗餘(雖然 unaligned-checkpoint 會產生一部分的數據冗餘,但是與 Spark Streaming 這種全部數據冗餘的思路是不同的),只關注計算中的狀態容錯。這種思路較為輕量級,並且能夠保障 exactly-once 語義。但是這種思路無法應對所有的數據源場景,需要強依賴數據源的實現與 Source、Sink 運算元的邏輯。

總體而言,實時流的容錯核心是基於 數據截斷 重試機制 。Storm 的數據截斷粒度是單條數據級別的,通過 ACK 的機制實現的重試機制,此截斷粒度不會影響數據的時效性。Spark Streaming 的截斷粒度是微批次的,截斷會影響數據的時效性,然後通過數據冗餘的方式保障了重試機制,這種冗餘數據的方式可以面對任何數據源時都能夠保證數據一致性。Flink 是基於 checkpoint barrier 將數據流截斷,barrier 會隨著數據流而流動,避免了流量截斷帶來的時效性影響,並且 Flink 容錯只關注狀態,藉助狀態的回滾來保證數據一致性。

從容錯實現來看,三種框架的側重點有所不同。Storm 作為無狀態計算框架,採用的是非常簡單有效的機制保障容錯。Spark Streaming 更注重數據的可恢復性,希望通過備份原始數據能夠在任何情況下、面對任何數據源都能夠保證數據一致性。Flink 相對而言更加輕量,更注重數據的時效性,不希望容錯機制帶來太多的時效性損失(例如 unaligned-checkpoint)。

感謝你讀到這里,希望你現在對 Flink 容錯機制和其他的實時計算框架的容錯機制有了一個基本的了解,對其容錯思路和本質有了不同的想法。 下一篇 我們將討論 Flink checkpoint 的數據結構,探索它究竟是如何存儲的?都存儲了哪些內容?基於這些備份數據如何在異常中恢復?

可可 @ 歡迎郵件聯系我

2. 一文搞懂 Flink 處理 Barrier 全過程

上次我們講到了 Flink Checkpoint Barrier 全流程 還有 Flink 消費消息的全流程

Flink 處理 Barrier 分兩種:

關鍵就是 getNextNonBlocked 方法

當沒有發生 barrier 對齊完成 這個動作時,currentBuffered == null,currentBuffered 就是當前要處理的 buffer,當 buffer 是數據的時候它就正常消費數據走 Flink 消費消息的全流程 ,當遇到頃亂 barrier 時,開始處理 barrier

numBarriersReceived 的默認值是0,所以第一個 barrier 進來後,會進入 beginNewAlignment 方法

當再有其他相同的激乎吵 barrier 進入時,barrierId == currentCheckpointId 為 true,直到 numBarriersReceived + numClosedChannels == totalNumberOfInputChannels 時,觸發 notifyCheckpoint,並報告 alignment buffer 以及 alignment time。(彩蛋: 稍後會更新 checkpoint 全流程歡迎關注 )。

如果其他的 channel 中的 barrier 延遲了,即 numBarriersReceived + numClosedChannels != totalNumberOfInputChannels,已經 receive barrier 對應的 channel 數據會進入 bufferBlocker。
bufferBlocker 是通過 ArrayDeque<BufferOrEvent> currentBuffers 來存儲數據的,也就是說明侍默認情況下 bufferBlocker.currentBuffers 會無限增大。

當 numBarriersReceived + numClosedChannels == totalNumberOfInputChannels 時,會先進行 releaseBlocksAndResetBarriers() 在進行 notifyCheckpoint。
releaseBlocksAndResetBarriers 主要的目的是要先消費已加入緩存中的數據。

當執行完 releaseBlocksAndResetBarriers 方法時,currentBuffered!=null 了,會進入

然後直接消費數據

一直消費緩存中的數據( 此過程會阻塞不會繼續消費 inputGate 中的數據),直至消耗完成

完成了之後,就跟程序第一次運行至此一樣,循環往復。

3. 使用Flink批處理完成數據比對(對賬)三

前面的文章 使用Flink批處理完成數據比對(對賬)二 討論了使用Table API來處理數據比對的問題,但有些場景下還會有一些比較復雜的業務需求,如輸出的時候要將兩邊的數據合並在一起輸出,這個時候用Table API就不太好完成這樣的需求了,這就需要藉助底層的DataSet API和Process Function。

這篇文章准備利用DataSet API來完成數據比對的需求,至於流數據的實時比對,下一篇文章將介紹。

核心的思想就是用兩個流(DataSet其實也是一種特殊的DataStream)中的數據進行處理,Flink中就具備這樣的API。

通過 coGroup 、 where 和 equalTo 很容易講兩個流中orderNo相同的數據關聯在一起, coGroup 和 join 不同, join 只會關聯key相同的數據,形成一個數據集。而 coGroup 遇到指定key只有一個數據集中有記錄的情況時,會將這個Group和空的Group關聯。

源碼

可以看到,利用Flink將兩方數據關聯是非常容易的。筆者在實際業務場景中,有些需求不僅需要關聯兩方數據的,在下發回盤文件的時候,還要關聯上其他方數據的情況(如商戶數據),這種情況目前想到的辦法有:

閱讀全文

與flink怎麼一條條處理數據相關的資料

熱點內容
52好壓右鍵沒有壓縮文件選項 瀏覽:98
avi什麼類型的文件格式 瀏覽:418
分區表與文件系統 瀏覽:786
獲得文件夾路徑的對話框 瀏覽:179
弟子規哪個版本的好 瀏覽:423
二手蘋果6p的價格 瀏覽:111
微信公眾號版頭設計 瀏覽:917
jdk18讀取配置文件 瀏覽:72
優化關鍵字挖掘工具 瀏覽:672
markdown代碼塊語法 瀏覽:249
arcgis面文件屬性 瀏覽:43
當數據都帶有標准差如何計算 瀏覽:936
聲音挑選程序掃描本地 瀏覽:57
編程語言中如何拼接兩個字串符 瀏覽:482
工地數據中心包括哪些 瀏覽:972
人工成本分析工具 瀏覽:565
蘋果qq群文件在哪裡 瀏覽:724
產品和單位成本分析後有哪些數據 瀏覽:144
日語教程軟體 瀏覽:99
有哪些事業編制的app 瀏覽:89

友情鏈接