導航:首頁 > 文件教程 > hadoop寫文本文件java

hadoop寫文本文件java

發布時間:2024-11-13 21:36:02

① HDFS文件

Hadoop支持的文件系統由很多(見下圖),HDFS只是其中一種實現。java抽象類 org.apache.hadoop.fs.FileSystem 定義了Hadoop中一個文件系統的客戶端介面,並且該抽象類有幾個具體實現。Hadoop一般使用URI(下圖)方案來選取合適的文件系統實例進行交互。

特別的,HDFS文件系統的操作可以使用 FsSystem shell 、客戶端(http rest api、Java api、C api等)。

FsSystem shell 的用法基本同本地shell類似,命令可參考 FsSystem shell

Hadoop是用Java寫的,通過Java Api( FileSystem 類)可以調用大部分Hadoop文件系統的交互操作。更詳細的介紹可參考 hadoop Filesystem 。

非Java開發的應用可以使用由WebHDFS協議提供的HTTP REST API,但是HTTP比原生的Java客戶端要慢,所以不到萬不得已盡量不要使用HTTP傳輸特大數據。通過HTTP來訪問HDFS有兩種方法:

兩種如圖

在第一種情況中,namenode和datanode內嵌的web服務作為WebHDFS的端節點運行(是否啟用WebHDFS可通過dfs.webhdfs.enabled設置,默認為true)。文件元數據在namenode上,文件讀寫操作首先被發往namenode,有namenode發送一個HTTP重定向至某個客戶端,指示以流的方式傳輸文件數據的目的或源datanode。

第二種方法依靠一個或多個獨立代理伺服器通過HTTP訪問HDFS。所有集群的網路通信都需要通過代理,因此客戶端從來不直接訪問namenode或datanode。使用代理後可以使用更嚴格的防火牆策略和帶寬策略。

HttpFs代理提供和WebHDFS相同的HTTP介面,這樣客戶端能夠通過webhdfs URI訪問介面。HttpFS代理啟動獨立於namenode和datanode的守護進程,使用httpfs.sh 腳本,默認在一個不同的埠上監聽(14000)。

下圖描述了

讀文件時客戶端與 HDFS 中的 namenode, datanode 之間的數據流動。

對上圖的解釋如下:

在讀取過程中, 如果 FSDataInputStream 在和一個 datanode 進行交流時出現了一個錯誤,他就去試一試下一個最接近的塊,他當然也會記住剛才發生錯誤的 datanode 以至於之後不會再在這個 datanode 上進行沒必要的嘗試。 DFSInputStream 也會在 datanode 上傳輸出的數據上核查檢查數(checknums).如果損壞的塊被發現了, DFSInputStream 就試圖從另一個擁有備份的 datanode 中去讀取備份塊中的數據。

在這個設計中一個重要的方面就是客戶端直接從 datanode 上檢索數據,並通過 namenode 指導來得到每一個塊的最佳 datanode。這種設計允許 HDFS 擴展大量的並發客戶端,因為數據傳輸只是集群上的所有 datanode 展開的。期間,namenode 僅僅只需要服務於獲取塊位置的請求(塊位置信息是存放在內存中,所以效率很高)。如果不這樣設計,隨著客戶端數據量的增長,數據服務就會很快成為一個瓶頸。

我們知道,相對於客戶端(之後就是 maprece task 了),塊的位置有以下可能性:

我們認為他們對於客戶端的帶寬遞減,距離遞增(括弧中表示距離)。示意圖如下:

如果集群中的機器都在同一個機架上,我們無需其他配置,若集群比較復雜,由於hadoop無法自動發現網路拓撲,所以需要額外配置網路拓撲。

基本讀取程序,將文件內容輸出到console

FileSystemCat

隨機讀取

展開原碼

下圖描述了寫文件時客戶端與 HDFS 中的 namenode, datanode 之間的數據流動。

對上圖的解釋如下:

如果在任何一個 datanode 在寫入數據的時候失敗了,接下來所做的一切對客戶端都是透明的:首先, pipeline 被關閉,在確認隊列中的剩下的包會被添加進數據隊列的起始位置上,以至於在失敗的節點下游的任 何節點都不會丟失任何的包。然後與 namenode 聯系後,當前在一個好的 datanode 會聯系 namenode, 給失敗節點上還未寫完的塊生成一個新的標識ID, 以至於如果這個失敗的 datanode 不久後恢復了,這個不完整的塊將會被刪除。失敗節點會從 pipeline 中移除,然後剩下兩個好的 datanode 會組成一個的新的 pipeline ,剩下的 這些塊的包(也就是剛才放在數據隊列隊首的包)會繼續寫進 pipeline 中好的 datanode 中。最後,namenode 注意到塊備份數小於規定的備份數,他就安排在另一個節點上創建完成備份,直接從已有的塊中復制就可以。然後一直到滿足了備份數( dfs.replication )。如果有多個節點的寫入失敗了,如果滿足了最小備份數的設置( dfs.namenode.repliction.min ),寫入也將會成功,然後剩下的備份會被集群非同步的執行備份,直到滿足了備份數( dfs.replication )。

創建目錄

文件壓縮有兩大好處:

Hadoop 對於壓縮格式的是自動識別。如果我們壓縮的文件有相應壓縮格式的擴展名(比如 lzo,gz,bzip2 等)。Hadoop 會根據壓縮格式的擴展名自動選擇相對應的解碼器來解壓數據,此過程完全是 Hadoop 自動處理,我們只需要確保輸入的壓縮文件有擴展名。

Hadoop中有多種壓縮格式、演算法和工具,下圖列出了常用的壓縮方法。

表中的「是否可切分」表示對應的壓縮演算法是否支持切分,也就是說是否可以搜索數據流的任意位置並進一步往下讀取數據,可切分的壓縮格式尤其適合MapRece。

所有的壓縮演算法都需要權衡空間/時間:壓縮和解壓縮速度更快,其代價通常是只能節省少量的空間。不同的壓縮工具有不同的特性:

更詳細的比較如下

1.壓縮性能比較

2.優缺點

另外使用hadoop原生(native)類庫比其他java實現有更快的壓縮和解壓縮速度。特徵比較如下:

使用容器文件格式結合壓縮演算法也能更好的提高效率。順序文件、Arvo文件、ORCFiles、Parqurt文件同時支持壓縮和切分。

壓縮舉例(Java)

壓縮

解壓縮

六、文件序列化

序列化是指將結構化數據轉換為位元組流以便在網路上傳輸或寫到磁碟進行永久存儲。反序列化獅子將位元組流轉換回結構化對象的逆過程。

序列化用於分布式數據處理的兩大領域:進程間通信和永久存儲。

對序列化的要求時是格式緊湊(高效使用存儲空間)、快速(讀寫效率高)、可擴展(可以透明地讀取老格式數據)且可以互操作(可以使用不同的語言讀寫數據)。

Hadoop使用的是自己的序列化格式 Writable ,它絕對緊湊、速度快,但不太容易用java以外的語言進行擴展或使用。

當然,用戶也可以使用其他序列化框架或者自定義序列化方式,如 Avro 框架。

Hadoop內部還使用了 Apache Thrift 和 Protocal Buffers 來實現RPC和數據交換。

② Hadoop讀寫文件時內部工作機制是怎樣的

客戶端通過調用FileSystem對象(對應於HDFS文件系統,調用DistributedFileSystem對象)的open()方法來打開文件(也即圖中的第一步),DistributedFileSystem通過RPC(Remote Procere Call)調用詢問NameNode來得到此文件最開始幾個block的文件位置(第二步)。對每一個block來說,namenode返回擁有此block備份的所有namenode的地址信息(按集群的拓撲網路中與客戶端距離的遠近排序,關於在Hadoop集群中如何進行網路拓撲請看下面介紹)。如果客戶端本身就是一個datanode(如客戶端是一個maprece任務)並且此datanode本身就有所需文件block的話,客戶端便從本地讀取文件。

以上步驟完成後,DistributedFileSystem會返回一個FSDataInputStream(支持文件seek),客戶端可以從FSDataInputStream中讀取數據。FSDataInputStream包裝了一個DFSInputSteam類,用來處理namenode和datanode的I/O操作。

客戶端然後執行read()方法(第三步),DFSInputStream(已經存儲了欲讀取文件的開始幾個block的位置信息)連接到第一個datanode(也即最近的datanode)來獲取數據。通過重復調用read()方法(第四、第五步),文件內的數據就被流式的送到了客戶端。當讀到該block的末尾時,DFSInputStream就會關閉指向該block的流,轉而找到下一個block的位置信息然後重復調用read()方法繼續對該block的流式讀取。這些過程對於用戶來說都是透明的,在用戶看來這就是不間斷的流式讀取整個文件。

當真個文件讀取完畢時,客戶端調用FSDataInputSteam中的close()方法關閉文件輸入流(第六步)。

如果在讀某個block是DFSInputStream檢測到錯誤,DFSInputSteam就會連接下一個datanode以獲取此block的其他備份,同時他會記錄下以前檢測到的壞掉的datanode以免以後再無用的重復讀取該datanode。DFSInputSteam也會檢查從datanode讀取來的數據的校驗和,如果發現有數據損壞,它會把壞掉的block報告給namenode同時重新讀取其他datanode上的其他block備份。

這種設計模式的一個好處是,文件讀取是遍布這個集群的datanode的,namenode只是提供文件block的位置信息,這些信息所需的帶寬是很少的,這樣便有效的避免了單點瓶頸問題從而可以更大的擴充集群的規模。


Hadoop中的網路拓撲


在Hadoop集群中如何衡量兩個節點的遠近呢?要知道,在高速處理數據時,數據處理速率的唯一限制因素就是數據在不同節點間的傳輸速度:這是由帶寬的可怕匱乏引起的。所以我們把帶寬作為衡量兩個節點距離大小的標准。

但是計算兩個節點之間的帶寬是比較復雜的,而且它需要在一個靜態的集群下才能衡量,但Hadoop集群一般是隨著數據處理的規模動態變化的(且兩兩節點直接相連的連接數是節點數的平方)。於是Hadoop使用了一個簡單的方法來衡量距離,它把集群內的網路表示成一個樹結構,兩個節點之間的距離就是他們離共同祖先節點的距離之和。樹一般按數據中心(datacenter),機架(rack),計算節點(datanode)的結構組織。計算節點上的本地運算速度最快,跨數據中心的計算速度最慢(現在跨數據中心的Hadoop集群用的還很少,一般都是在一個數據中心內做運算的)。

假如有個計算節點n1處在數據中心c1的機架r1上,它可以表示為/c1/r1/n1,下面是不同情況下兩個節點的距離:

• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)

• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)

• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)

• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)

如下圖所示:


Hadoop

寫文件


現在我們來看一下Hadoop中的寫文件機制解析,通過寫文件機制我們可以更好的了解一下Hadoop中的一致性模型。


Hadoop

上圖為我們展示了一個創建一個新文件並向其中寫數據的例子。

首先客戶端通過DistributedFileSystem上的create()方法指明一個欲創建的文件的文件名(第一步),DistributedFileSystem再通過RPC調用向NameNode申請創建一個新文件(第二步,這時該文件還沒有分配相應的block)。namenode檢查是否有同名文件存在以及用戶是否有相應的創建許可權,如果檢查通過,namenode會為該文件創建一個新的記錄,否則的話文件創建失敗,客戶端得到一個IOException異常。DistributedFileSystem返回一個FSDataOutputStream以供客戶端寫入數據,與FSDataInputStream類似,FSDataOutputStream封裝了一個DFSOutputStream用於處理namenode與datanode之間的通信。

當客戶端開始寫數據時(第三步),DFSOutputStream把寫入的數據分成包(packet), 放入一個中間隊列——數據隊列(data queue)中去。DataStreamer從數據隊列中取數據,同時向namenode申請一個新的block來存放它已經取得的數據。namenode選擇一系列合適的datanode(個數由文件的replica數決定)構成一個管道線(pipeline),這里我們假設replica為3,所以管道線中就有三個datanode。DataSteamer把數據流式的寫入到管道線中的第一個datanode中(第四步),第一個datanode再把接收到的數據轉到第二個datanode中(第四步),以此類推。

DFSOutputStream同時也維護著另一個中間隊列——確認隊列(ack queue),確認隊列中的包只有在得到管道線中所有的datanode的確認以後才會被移出確認隊列(第五步)。

如果某個datanode在寫數據的時候當掉了,下面這些對用戶透明的步驟會被執行:

1)管道線關閉,所有確認隊列上的數據會被挪到數據隊列的首部重新發送,這樣可以確保管道線中當掉的datanode下流的datanode不會因為當掉的datanode而丟失數據包。

2)在還在正常運行的datanode上的當前block上做一個標志,這樣當當掉的datanode重新啟動以後namenode就會知道該datanode上哪個block是剛才當機時殘留下的局部損壞block,從而可以把它刪掉。

3)已經當掉的datanode從管道線中被移除,未寫完的block的其他數據繼續被寫入到其他兩個還在正常運行的datanode中去,namenode知道這個block還處在under-replicated狀態(也即備份數不足的狀態)下,然後他會安排一個新的replica從而達到要求的備份數,後續的block寫入方法同前面正常時候一樣。

有可能管道線中的多個datanode當掉(雖然不太經常發生),但只要dfs.replication.min(默認為1)個replica被創建,我們就認為該創建成功了。剩餘的replica會在以後非同步創建以達到指定的replica數。

當客戶端完成寫數據後,它會調用close()方法(第六步)。這個操作會沖洗(flush)所有剩下的package到pipeline中,等待這些package確認成功,然後通知namenode寫入文件成功(第七步)。這時候namenode就知道該文件由哪些block組成(因為DataStreamer向namenode請求分配新block,namenode當然會知道它分配過哪些blcok給給定文件),它會等待最少的replica數被創建,然後成功返回。


replica是如何分布的


Hadoop在創建新文件時是如何選擇block的位置的呢,綜合來說,要考慮以下因素:帶寬(包括寫帶寬和讀帶寬)和數據安全性。如果我們把三個備份全部放在一個datanode上,雖然可以避免了寫帶寬的消耗,但幾乎沒有提供數據冗餘帶來的安全性,因為如果這個datanode當機,那麼這個文件的所有數據就全部丟失了。另一個極端情況是,如果把三個冗餘備份全部放在不同的機架,甚至數據中心裏面,雖然這樣數據會安全,但寫數據會消耗很多的帶寬。Hadoop 0.17.0給我們提供了一個默認replica分配策略(Hadoop 1.X以後允許replica策略是可插拔的,也就是你可以自己制定自己需要的replica分配策略)。replica的默認分配策略是把第一個備份放在與客戶端相同的datanode上(如果客戶端在集群外運行,就隨機選取一個datanode來存放第一個replica),第二個replica放在與第一個replica不同機架的一個隨機datanode上,第三個replica放在與第二個replica相同機架的隨機datanode上。如果replica數大於三,則隨後的replica在集群中隨機存放,Hadoop會盡量避免過多的replica存放在同一個機架上。選取replica的放置位置後,管道線的網路拓撲結構如下所示:


Hadoop

總體來說,上述默認的replica分配策略給了我們很好的可用性(blocks放置在兩個rack上,較為安全),寫帶寬優化(寫數據只需要跨越一個rack),讀帶寬優化(你可以從兩個機架中選擇較近的一個讀取)。


一致性模型


HDFS某些地方為了性能可能會不符合POSIX(是的,你沒有看錯,POSIX不僅僅只適用於linux/unix, Hadoop 使用了POSIX的設計來實現對文件系統文件流的讀取 ),所以它看起來可能與你所期望的不同,要注意。

創建了一個文件以後,它是可以在命名空間(namespace)中可以看到的:

Path p = new Path("p");

fs.create(p);

assertThat(fs.exists(p), is(true));

但是任何向此文件中寫入的數據並不能保證是可見的,即使你flush了已經寫入的數據,此文件的長度可能仍然為零:

Path p = new Path("p");

OutputStream out = fs.create(p);

out.write("content".getBytes("UTF-8"));

out.flush();

assertThat(fs.getFileStatus(p).getLen(), is(0L));

這是因為,在Hadoop中,只有滿一個block數據量的數據被寫入文件後,此文件中的內容才是可見的(即這些數據會被寫入到硬碟中去),所以當前正在寫的block中的內容總是不可見的。

Hadoop提供了一種強制使buffer中的內容沖洗到datanode的方法,那就是FSDataOutputStream的sync()方法。調用了sync()方法後,Hadoop保證所有已經被寫入的數據都被沖洗到了管道線中的datanode中,並且對所有讀者都可見了:

Path p = new Path("p");

FSDataOutputStream out = fs.create(p);

out.write("content".getBytes("UTF-8"));

out.flush();

out.sync();

assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

這個方法就像POSIX中的fsync系統調用(它沖洗給定文件描述符中的所有緩沖數據到磁碟中)。例如,使用java API寫一個本地文件,我們可以保證在調用flush()和同步化後可以看到已寫入的內容:

FileOutputStream out = new FileOutputStream(localFile);

out.write("content".getBytes("UTF-8"));

out.flush(); // flush to operating system

out.getFD().sync(); // sync to disk (getFD()返回與該流所對應的文件描述符)

assertThat(localFile.length(), is(((long) "content".length())));

在HDFS中關閉一個流隱式的調用了sync()方法:

Path p = new Path("p");

OutputStream out = fs.create(p);

out.write("content".getBytes("UTF-8"));

out.close();

assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));


由於Hadoop中的一致性模型限制,如果我們不調用sync()方法的話,我們很可能會丟失多大一個block的數據。這是難以接受的,所以我們應該使用sync()方法來確保數據已經寫入磁碟。但頻繁調用sync()方法也是不好的,因為會造成很多額外開銷。我們可以再寫入一定量數據後調用sync()方法一次,至於這個具體的數據量大小就要根據你的應用程序而定了,在不影響你的應用程序的性能的情況下,這個數據量應越大越好。

③ 運行hadoop的wordcount時沒有hadoop-***-core.jar文件

你仿照書上寫的wordcount代碼其實是一個maprece程序,其運行在hadoop平台上,按照正常的開發實現步驟專,應屬該現在linux搭建hadoop集群或者偽分布,然後當你在Eclipse裡面寫了maprece程序之後,將你的項目打成jar包之後再hadoop集群裡面運行,或者用Eclipse集成hadoop做測試。你這樣直接在Eclipse裡面寫,是沒辦法運行的,就像你看了Java書在書上有一個helloword程序,然後你照著敲了一遍,然後直接在本機上運行,卻忽略了你沒裝jdk一樣,你的helloword是肯定沒辦法運行的

④ hadoop的幾個問題 1.將本地文件復制到hdfs中,那麼在hdfs中這個文件是存放在namenode還是分開放在datanode

試著回答:
先說明一下:
1. namenode負責管理目錄和文件信息,真正的文件塊是存放在datanode上。
2. 每個map和rece(即task)都是java進程,默版認是有單獨的jvm的,所以不可能同一個類的對象會在不同節點上。
看你的描述是把namenode,datanode和jobtracker,tasktracker有點混了。

所以:
問題1. 分塊存放在datanode上
問題2.inputformat是在datanode上,確切的說是在tasktracker中。每權個map和rece都會有自己的對象,當多個map讀入一個文件時,實際上不同的map是讀的文件不同的塊,rece也是一樣,各個任務讀入的數據是不相交的。
問題3.rece輸出肯定是在hdfs上,和普通文件一樣在datanode上。
問題4.每個recer會有自己的outputformat對象,與前面inputformat原因一樣。

⑤ 如何編寫hadoop java程序

1.編譯java
# mkdir class
#Javac -classpath .:lib/hadoop-common-2.2.0.jar:lib/hadoop-annotations-2.2.0.jar -d class HADemo.java
2.生成jar包
#jar -cvf hademo.jar -C class/ .
added manifest
adding: com/(in = 0) (out= 0)(stored 0%)
adding: com/wan/(in = 0) (out= 0)(stored 0%)
adding: com/wan/demo/(in = 0) (out= 0)(stored 0%)
adding: com/wan/demo/HADemo.class(in = 844) (out= 520)(deflated 38%)
3.測試運行
#hadoop jar hademo.jar com.wan.demo.HADemo /test
檢測:
#hadoop fs -ls /

⑥ 想轉行大數據必須有兩年Java開發經驗嗎

大數據主要分兩個方向,一個是大數據的收集整理,一個是大數據的分析。收集的話目前主回要用hadoop,是答用java編寫的,會java最好。如果是大數據分析的話用Python、R、matlab都有,不需要會java。

閱讀全文

與hadoop寫文本文件java相關的資料

熱點內容
文件怎麼轉發 瀏覽:94
數控機床編程與操作怎麼啟動 瀏覽:636
linux查找c文件是否存在 瀏覽:150
從事程序員的身體要求 瀏覽:259
txt文件轉成json文件 瀏覽:941
iosapp怎麼讓未讀消息顯示 瀏覽:805
百度智能雲上傳文件軟體 瀏覽:756
怎麼把電腦盤設密碼 瀏覽:768
蘋果直徑怎麼量 瀏覽:542
alienware13升級 瀏覽:14
循環載入js 瀏覽:759
qq電話記錄在哪個文件夾 瀏覽:325
jsf如何返回json數據 瀏覽:136
javascript百度地圖 瀏覽:380
蘋果4怎麼弄3g網路 瀏覽:775
如何刪除公司文件 瀏覽:659
u盤歌曲怎麼從文件夾剪切出來 瀏覽:766
錯誤數據怎麼解決 瀏覽:835
株洲編程學校哪個好 瀏覽:266
linuxlast時間 瀏覽:305

友情鏈接