導航:首頁 > 編程語言 > java環形隊列

java環形隊列

發布時間:2024-10-21 18:27:39

『壹』 Hadoop讀寫文件時內部工作機制是怎樣的

客戶端通過調用FileSystem對象(對應於HDFS文件系統,調用DistributedFileSystem對象)的open()方法來打開文件(也即圖中的第一步),DistributedFileSystem通過RPC(Remote Procere Call)調用詢問NameNode來得到此文件最開始幾個block的文件位置(第二步)。對每一個block來說,namenode返回擁有此block備份的所有namenode的地址信息(按集群的拓撲網路中與客戶端距離的遠近排序,關於在Hadoop集群中如何進行網路拓撲請看下面介紹)。如果客戶端本身就是一個datanode(如客戶端是一個maprece任務)並且此datanode本身就有所需文件block的話,客戶端便從本地讀取文件。

以上步驟完成後,DistributedFileSystem會返回一個FSDataInputStream(支持文件seek),客戶端可以從FSDataInputStream中讀取數據。FSDataInputStream包裝了一個DFSInputSteam類,用來處理namenode和datanode的I/O操作。

客戶端然後執行read()方法(第三步),DFSInputStream(已經存儲了欲讀取文件的開始幾個block的位置信息)連接到第一個datanode(也即最近的datanode)來獲取數據。通過重復調用read()方法(第四、第五步),文件內的數據就被流式的送到了客戶端。當讀到該block的末尾時,DFSInputStream就會關閉指向該block的流,轉而找到下一個block的位置信息然後重復調用read()方法繼續對該block的流式讀取。這些過程對於用戶來說都是透明的,在用戶看來這就是不間斷的流式讀取整個文件。

當真個文件讀取完畢時,客戶端調用FSDataInputSteam中的close()方法關閉文件輸入流(第六步)。

如果在讀某個block是DFSInputStream檢測到錯誤,DFSInputSteam就會連接下一個datanode以獲取此block的其他備份,同時他會記錄下以前檢測到的壞掉的datanode以免以後再無用的重復讀取該datanode。DFSInputSteam也會檢查從datanode讀取來的數據的校驗和,如果發現有數據損壞,它會把壞掉的block報告給namenode同時重新讀取其他datanode上的其他block備份。

這種設計模式的一個好處是,文件讀取是遍布這個集群的datanode的,namenode只是提供文件block的位置信息,這些信息所需的帶寬是很少的,這樣便有效的避免了單點瓶頸問題從而可以更大的擴充集群的規模。


Hadoop中的網路拓撲


在Hadoop集群中如何衡量兩個節點的遠近呢?要知道,在高速處理數據時,數據處理速率的唯一限制因素就是數據在不同節點間的傳輸速度:這是由帶寬的可怕匱乏引起的。所以我們把帶寬作為衡量兩個節點距離大小的標准。

但是計算兩個節點之間的帶寬是比較復雜的,而且它需要在一個靜態的集群下才能衡量,但Hadoop集群一般是隨著數據處理的規模動態變化的(且兩兩節點直接相連的連接數是節點數的平方)。於是Hadoop使用了一個簡單的方法來衡量距離,它把集群內的網路表示成一個樹結構,兩個節點之間的距離就是他們離共同祖先節點的距離之和。樹一般按數據中心(datacenter),機架(rack),計算節點(datanode)的結構組織。計算節點上的本地運算速度最快,跨數據中心的計算速度最慢(現在跨數據中心的Hadoop集群用的還很少,一般都是在一個數據中心內做運算的)。

假如有個計算節點n1處在數據中心c1的機架r1上,它可以表示為/c1/r1/n1,下面是不同情況下兩個節點的距離:

• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)

• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)

• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)

• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)

如下圖所示:


Hadoop

寫文件


現在我們來看一下Hadoop中的寫文件機制解析,通過寫文件機制我們可以更好的了解一下Hadoop中的一致性模型。


Hadoop

上圖為我們展示了一個創建一個新文件並向其中寫數據的例子。

首先客戶端通過DistributedFileSystem上的create()方法指明一個欲創建的文件的文件名(第一步),DistributedFileSystem再通過RPC調用向NameNode申請創建一個新文件(第二步,這時該文件還沒有分配相應的block)。namenode檢查是否有同名文件存在以及用戶是否有相應的創建許可權,如果檢查通過,namenode會為該文件創建一個新的記錄,否則的話文件創建失敗,客戶端得到一個IOException異常。DistributedFileSystem返回一個FSDataOutputStream以供客戶端寫入數據,與FSDataInputStream類似,FSDataOutputStream封裝了一個DFSOutputStream用於處理namenode與datanode之間的通信。

當客戶端開始寫數據時(第三步),DFSOutputStream把寫入的數據分成包(packet), 放入一個中間隊列——數據隊列(data queue)中去。DataStreamer從數據隊列中取數據,同時向namenode申請一個新的block來存放它已經取得的數據。namenode選擇一系列合適的datanode(個數由文件的replica數決定)構成一個管道線(pipeline),這里我們假設replica為3,所以管道線中就有三個datanode。DataSteamer把數據流式的寫入到管道線中的第一個datanode中(第四步),第一個datanode再把接收到的數據轉到第二個datanode中(第四步),以此類推。

DFSOutputStream同時也維護著另一個中間隊列——確認隊列(ack queue),確認隊列中的包只有在得到管道線中所有的datanode的確認以後才會被移出確認隊列(第五步)。

如果某個datanode在寫數據的時候當掉了,下面這些對用戶透明的步驟會被執行:

1)管道線關閉,所有確認隊列上的數據會被挪到數據隊列的首部重新發送,這樣可以確保管道線中當掉的datanode下流的datanode不會因為當掉的datanode而丟失數據包。

2)在還在正常運行的datanode上的當前block上做一個標志,這樣當當掉的datanode重新啟動以後namenode就會知道該datanode上哪個block是剛才當機時殘留下的局部損壞block,從而可以把它刪掉。

3)已經當掉的datanode從管道線中被移除,未寫完的block的其他數據繼續被寫入到其他兩個還在正常運行的datanode中去,namenode知道這個block還處在under-replicated狀態(也即備份數不足的狀態)下,然後他會安排一個新的replica從而達到要求的備份數,後續的block寫入方法同前面正常時候一樣。

有可能管道線中的多個datanode當掉(雖然不太經常發生),但只要dfs.replication.min(默認為1)個replica被創建,我們就認為該創建成功了。剩餘的replica會在以後非同步創建以達到指定的replica數。

當客戶端完成寫數據後,它會調用close()方法(第六步)。這個操作會沖洗(flush)所有剩下的package到pipeline中,等待這些package確認成功,然後通知namenode寫入文件成功(第七步)。這時候namenode就知道該文件由哪些block組成(因為DataStreamer向namenode請求分配新block,namenode當然會知道它分配過哪些blcok給給定文件),它會等待最少的replica數被創建,然後成功返回。


replica是如何分布的


Hadoop在創建新文件時是如何選擇block的位置的呢,綜合來說,要考慮以下因素:帶寬(包括寫帶寬和讀帶寬)和數據安全性。如果我們把三個備份全部放在一個datanode上,雖然可以避免了寫帶寬的消耗,但幾乎沒有提供數據冗餘帶來的安全性,因為如果這個datanode當機,那麼這個文件的所有數據就全部丟失了。另一個極端情況是,如果把三個冗餘備份全部放在不同的機架,甚至數據中心裏面,雖然這樣數據會安全,但寫數據會消耗很多的帶寬。Hadoop 0.17.0給我們提供了一個默認replica分配策略(Hadoop 1.X以後允許replica策略是可插拔的,也就是你可以自己制定自己需要的replica分配策略)。replica的默認分配策略是把第一個備份放在與客戶端相同的datanode上(如果客戶端在集群外運行,就隨機選取一個datanode來存放第一個replica),第二個replica放在與第一個replica不同機架的一個隨機datanode上,第三個replica放在與第二個replica相同機架的隨機datanode上。如果replica數大於三,則隨後的replica在集群中隨機存放,Hadoop會盡量避免過多的replica存放在同一個機架上。選取replica的放置位置後,管道線的網路拓撲結構如下所示:


Hadoop

總體來說,上述默認的replica分配策略給了我們很好的可用性(blocks放置在兩個rack上,較為安全),寫帶寬優化(寫數據只需要跨越一個rack),讀帶寬優化(你可以從兩個機架中選擇較近的一個讀取)。


一致性模型


HDFS某些地方為了性能可能會不符合POSIX(是的,你沒有看錯,POSIX不僅僅只適用於linux/unix, Hadoop 使用了POSIX的設計來實現對文件系統文件流的讀取 ),所以它看起來可能與你所期望的不同,要注意。

創建了一個文件以後,它是可以在命名空間(namespace)中可以看到的:

Path p = new Path("p");

fs.create(p);

assertThat(fs.exists(p), is(true));

但是任何向此文件中寫入的數據並不能保證是可見的,即使你flush了已經寫入的數據,此文件的長度可能仍然為零:

Path p = new Path("p");

OutputStream out = fs.create(p);

out.write("content".getBytes("UTF-8"));

out.flush();

assertThat(fs.getFileStatus(p).getLen(), is(0L));

這是因為,在Hadoop中,只有滿一個block數據量的數據被寫入文件後,此文件中的內容才是可見的(即這些數據會被寫入到硬碟中去),所以當前正在寫的block中的內容總是不可見的。

Hadoop提供了一種強制使buffer中的內容沖洗到datanode的方法,那就是FSDataOutputStream的sync()方法。調用了sync()方法後,Hadoop保證所有已經被寫入的數據都被沖洗到了管道線中的datanode中,並且對所有讀者都可見了:

Path p = new Path("p");

FSDataOutputStream out = fs.create(p);

out.write("content".getBytes("UTF-8"));

out.flush();

out.sync();

assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

這個方法就像POSIX中的fsync系統調用(它沖洗給定文件描述符中的所有緩沖數據到磁碟中)。例如,使用java API寫一個本地文件,我們可以保證在調用flush()和同步化後可以看到已寫入的內容:

FileOutputStream out = new FileOutputStream(localFile);

out.write("content".getBytes("UTF-8"));

out.flush(); // flush to operating system

out.getFD().sync(); // sync to disk (getFD()返回與該流所對應的文件描述符)

assertThat(localFile.length(), is(((long) "content".length())));

在HDFS中關閉一個流隱式的調用了sync()方法:

Path p = new Path("p");

OutputStream out = fs.create(p);

out.write("content".getBytes("UTF-8"));

out.close();

assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));


由於Hadoop中的一致性模型限制,如果我們不調用sync()方法的話,我們很可能會丟失多大一個block的數據。這是難以接受的,所以我們應該使用sync()方法來確保數據已經寫入磁碟。但頻繁調用sync()方法也是不好的,因為會造成很多額外開銷。我們可以再寫入一定量數據後調用sync()方法一次,至於這個具體的數據量大小就要根據你的應用程序而定了,在不影響你的應用程序的性能的情況下,這個數據量應越大越好。

『貳』 如何架構大數據系統 hadoop

大數據數量龐大,格式多樣化。大量數據由家庭、製造工廠和辦公場所的各種設備、互聯網事務交易、社交網路的活動、自動化感測器、移動設備以及科研儀器等生成。它的爆炸式增長已超出了傳統IT基礎架構的處理能力,給企業和社會帶來嚴峻的數據管理問題。因此必須開發新的數據架構,圍繞「數據收集、數據管理、數據分析、知識形成、智慧行動」的全過程,開發使用這些數據,釋放出更多數據的隱藏價值。

一、大數據建設思路

1)數據的獲得

四、總結

基於分布式技術構建的大數據平台能夠有效降低數據存儲成本,提升數據分析處理效率,並具備海量數據、高並發場景的支撐能力,可大幅縮短數據查詢響應時間,滿足企業各上層應用的數據需求。

『叄』 怎麼優化hadoop任務調度演算法

首先介紹了Hadoop平台下作業的分布式運行機制,然後對Hadoop平台自帶的4種任務調度器做分析和比較,最後在分析JobTracker類文件的基礎上指出了創建自定義任務調度器所需完成的工作。
首先Hadoop集群式基於單伺服器的,只有一個伺服器節點負責調度整個集群的作業運行,主要的具體工作是切分大數據量的作業,指定哪些Worker節點做Map工作、哪些Worker節點做Rece工作、與Worker節點通信並接受其心跳信號、作為用戶的訪問入口等等。其次,集群中的每個Worker節點相當於一個器官,運行著主節點所指派的具體作業。這些節點會被分為兩種類型,一種是接收分塊之後的作業並做映射工作。另一種是負責把前面所做的映射工作按照約定的規則做一個統計。
Task-Tracker通過運行一個簡單循環來定期地發送心跳信號(heartbeat)給JobTracker.這個心跳信號會把TaskTracker是否還在存活告知JobTracker,TaskTracker通過信號指明自己是否已經准備
好運行新的任務.一旦TaskTracker已經准備好接受任務,JobTracker就會從作業優先順序表中選定一個作業並分配下去.至於到底是執行Map任務還是Rece任務,是由TaskTracker的任務槽所決定的.默認的任務調度器在處理Rece任務之前,會優先填滿空閑的Map任務槽.因此,如果TaskTracker滿足存在至少一個空閑任務槽時,JobTracker會為它分配Map任務,否則為它選擇一個Rece任務.TaskTracker在運行任務的時候,第一步是從共享文件系統中把作業的JAR文件復制過來,從而實現任務文件的本地化.第二步是TaskTracker為任務新建一個本地文件夾並把作業文件解壓在此目錄中.第三步是由Task-Tracker新建一個TaskRunner實例來運行該任務.
Hadoop平台默認的調度方案就是JobQueueTaskScheler,這是一種按照任務到來的時間先後順序而執行的調度策略.這種方式比較簡單,JobTracker作為主控節點,僅僅是依照作業到來的先後順序而選擇將要執行的作業.當然,這有一定的缺陷,由於Hadoop平台是默認將作業運行在整個集群上的,那麼如果一個耗時非常大的作業進入執行期,將會導致其餘大量作業長時間得不到運行.這種長時間運行的優先順序別並不高的作業帶來了嚴重的作業阻塞,使得整個平台的運行效率處在較低的水平.Hadoop平台對這種FIFO(FirstINAndFirstOut)機制所給出的解決辦法是調用SetJobPriority()方法,通過設置作業的權重級別來做平衡調度.
FairScheler是一種「公平」調度器,它的目標是讓每個用戶能夠公平地共享Hadoop集群計算能力.當只有一個作業運行的時候,它會得到整個集群的資源.隨著提交到作業表中作業的增多,Hadoop平台會把集群中空閑出來的時間槽公平分配給每個需要執行的作業.這樣即便其中某些作業需要較長時間運行,平台仍然有能力讓那些短作業在合理時間內完成[3].FairScheler支持資源搶占,當一個資源池在一定時段內沒有得到公平共享時,它會終止該資源池所獲得的過多的資源,同時把這些釋放的資源讓給那些資源不足的資源池.
Hadoop平台中的CapacityScheler是由Yahoo貢獻的,在調度器上,設置了三種粒度的對象:queue,job,task.在該策略下,平台可以有多個作業隊列,每個作業隊列經提交後,都會獲得一定數量的TaskTracker資源.具體調度流程如下.
(1)選擇queue,根據資源庫的使用情況從小到大排序,直到找到一個合適的job.
(2)選擇job,在當前所選定的queue中,按照作業提交的時間先後以及作業的權重優先順序別進行排序,選擇合適的job.當然,在job選擇時還需要考慮所選作業是否超出目前現有的資源上限,以及資源池中的內存是否夠該job的task用等因素.
(3)選擇task,根據本地節點的資源使用情況來選擇合適的task.
雖然Hadoop平台自帶了幾種調度器,但是上述3種調度方案很難滿足公司復雜的應用需求.因此作為平台的個性化使用者,往往需要開發自己的調度器.Hadoop的調度器是在JobTracker中載入和調用的,因此開發一個自定義的調度器就必須搞清楚JobTracker類文件的內部機制.作為Hadoop平台的核心組件,JobTracker監控著整個集群的作業運行情況並對資源進行管理調度.每個Task-Tracker每隔3s通過heartbeat向JobTracker匯報自己管理的機器的一些基本信息,包括內存使用量、內存的剩餘量以及空閑的slot數目等等[5].一
旦JobTracker發現了空閑slot,便會調用調度器中的AssignTask方法為該TaskTracker分配task。

閱讀全文

與java環形隊列相關的資料

熱點內容
哪個網站查離婚法規 瀏覽:595
安卓有哪些好的運動app 瀏覽:99
考拉網易app是干什麼的 瀏覽:535
linuxl但是nm沒有看到 瀏覽:744
天津修手機用什麼app 瀏覽:919
百度雲文件審查 瀏覽:399
iphone左下角黑影 瀏覽:867
ps打開文件提示不能完成 瀏覽:885
網路斷了打什麼電話投訴 瀏覽:609
串口編程軟體有哪些 瀏覽:176
mastercam自動編程後怎麼看坐標 瀏覽:264
js獲取cpu序列號 瀏覽:99
extjs延遲函數 瀏覽:240
win10excel臨時文件 瀏覽:806
蘋果自帶地圖使用技巧 瀏覽:972
php怎麼刪除文件 瀏覽:908
eps文件怎麼在ps中釋放圖層 瀏覽:996
買什麼視頻app 瀏覽:329
微信如何恢復app自動扣費 瀏覽:856
手機升級怎樣升級到win10 瀏覽:994

友情鏈接