❶ spark中怎麼將讀取的每一行的數據按某幾行拼接成一行 新手,求指教,謝謝!
assert(args.length>1)
val_from=args(0)
val_to=args(1)
vals=sc.textFile(_from).collect()
valn=if(args.length>2)args(2).toIntelse2
valnumSlices=s.length/n
valx=sc.parallelize(s,numSlices).zipWithIndex().aggregate(List.empty[List[String]])(seqOp=(result,lineWithIndex)=>{
lineWithIndexmatch{
case(line,index)=>
if(index%n==0)List(line)::resultelse(line::result.head)::result.tail
}
},combOp=(x,y)=>x:::y).map(_.reversemkString"")
sc.parallelize(x).saveAsTextFile(_to)
textFile 不能指定分區數目,所以只能parallelize, n是每幾行一合並,RDD的aggregate方法與foldLeft類似,因為並行,合並之後的行間順序不確定的
下面給出非RDD操作示例
vals=List("123","234","345","456","567","678")
valn=3
//sparkRDD沒有foldLeft
valx=s.zipWithIndex.foldLeft(List.empty[List[String]])((result,lineWithIndex)=>{
lineWithIndexmatch{
case(line,index)=>
if(index%n==0)List(line)::resultelse(line::result.head)::result.tail
}
}).reverse.map(_.reverse)
println(x)
❷ 用spark獲取日誌文件中記錄內容
使用Apache Spark可以方便地讀取並處理日誌文件中的記錄內容。
下面是一個使用Spark讀取日誌文件中的記錄內容的示例代碼:
# 導入Spark相關的庫
from pyspark import SparkContext, SparkConf
# 創建SparkContext對象
sc = SparkContext(appName="Log Processing")
# 讀取日誌文件
log_file = sc.textFile("/path/to/log/file.log")
# 按行解析日誌記錄
log_records = log_file.map(lambda line: line.split(" "))
# 過濾出指定類型的日誌記錄
filtered_records = log_records.filter(lambda record: record[2] == "ERROR")
# 對日誌記錄進行處理,如統計數量或分析日誌信息等
# ...
# 關閉SparkContext
sc.stop()
上面的示例代碼首先使用Spark的textFile()方法讀取日誌文件,然後使用map()方法將日誌文件的每一行按空格分割成一個數組,得到一個日誌記錄的RDD。接著使用filter()方法過濾出指定類型的日誌記錄,最後對日誌記錄進行處理。
使用Spark處理日誌文件的優點在於,可以利用Spark的分布式計算能力,對大量的日誌文件進行快速的處理。例如,可以使用Spark的MapRece演算法快速統計日誌文件中各種類型的記錄數量,或者使用Spark SQL快速查詢日誌文件中的特定信息。
此外,Spark還提供了豐富的API和演算法庫,可以方便地進行數據清洗、數據分析和特徵提取等復雜的數據處理任務。例如,可以使用Spark MLlib庫進行機器學習,或者使用Spark GraphX庫進行圖計算等。
總之,使用Spark可以方便地讀取並處理日誌文件中的記錄內容,是一種高效的數據處理方式。
❸ spark與hdfs怎麼載入本地文件
hdfs和本地文件系統是並列的, 都可以使用spark進行載入, 只不過 hdfs是分布式的文件系統, spark 可以使用 sc.textFile("hdfs://....")操作hdfs的文件也可以 sc.textFile("本地文件路徑")來載入本地文件的路徑。
❹ Spark 怎麼讀文件名
java">//多易大數據
importorg.apache.hadoop.io.{LongWritable,Text}
importorg.apache.hadoop.mapred.{FileSplit,InputSplit,TextInputFormat}
importorg.apache.spark.{SparkConf,SparkContext}
importorg.apache.spark.rdd.HadoopRDDobjecttestPathOld{
defmain(args:Array[String]):Unit={
valconf=newSparkConf()
conf.setAppName("testtoParquet")
conf.setMaster("local")
valsc=newSparkContext(conf)
varinput="/home/dwj/data/testSpark/20180409"
valfileRDD=sc.hadoopFile[LongWritable,Text,TextInputFormat](input)
valhadoopRDD=fileRDD.asInstanceOf[HadoopRDD[LongWritable,Text]]
valfileAdnLine=hadoopRDD.mapPartitionsWithInputSplit((inputSplit:InputSplit,iterator:Iterator[(LongWritable,Text)])=>{
valfile=inputSplit.asInstanceOf[FileSplit]
iterator.map(x=>{file.getPath.toString()+" "+x._2})
})
fileAdnLine.foreach(println)
}
}
❺ spark saveAsTextfile 方法保存的文件part-00000 是空文件
spark中saveAsTextFile如何最終生成一個文件
一般而言,saveAsTextFile會按照執行task的多少生成多少個文件,比如part-00一直到part-0n,n自然就是task的個數,亦即是最後的stage的分區數。那麼有沒有辦法最後只生成一個文件,而不是成百上千個文件了?答案自然是有辦法。
在RDD上調用coalesce(1,true).saveAsTextFile(),意味著做完計算之後將數據匯集到一個分區,然後再執行保存的動作,顯然,一個分區,Spark自然只起一個task來執行保存的動作,也就只有一個文件產生了。又或者,可以調用repartition(1),它其實是coalesce的一個包裝,默認第二個參數為true。
事情最終就這么簡單嗎?顯然不是。你雖然可以這么做,但代價是巨大的。因為Spark面對的是大量的數據,並且是並行執行的,如果強行要求最後
只有一個分區,必然導致大量的磁碟IO和網路IO產生,並且最終執行rece操作的節點的內存也會承受很大考驗。Spark程序會很慢,甚至死掉。
這往往是初學Spark的一個思維陷阱,需要改變原先那種單線程單節點的思維,對程序的理解要轉變多多個節點多個進程中去,需要熟悉多節點集群自然產生多個文件這種模式。
此外,saveAsTextFile要求保存的目錄之前是沒有的,否則會報錯。所以,最好程序中保存前先判斷一下目錄是否存在。
❻ [spark] Shuffle Read解析 (Sort Based Shuffle)
Shuffle Write 請看 Shuffle Write解析 。
本文將講解shuffle Rece部分,shuffle的下游Stage的第一個rdd是ShuffleRDD,通過其compute方法來獲取上游Stage Shuffle Write溢寫到磁碟文件數據的一個迭代器:
從SparkEnv中獲取shuffleManager(這里是SortShuffleManager),通過manager獲取Reader並調用其read方法來得到一個迭代器。
getReader方法實例化了一個BlockStoreShuffleReader,參數有需要獲取分區對應的partitionId,看看起read方法:
首先實例化了ShuffleBlockFetcherIterator對象,其中一個參數:
該方法獲取rece端數據的來源的元數據,返回的是 Seq[(BlockManagerId, Seq[(BlockId, Long)])],即數據是來自於哪個節點的哪些block的,並且block的數據大小是多少,看看getMapSizesByExecutorId是怎麼實現的:
跟進getStatuses:
若能從mapStatuses獲取到則直接返回,若不能則向mapOutputTrackerMaster通信發送GetMapOutputStatuses消息來獲取元數據。
我們知道一個Executor對應一個CoarseGrainedExecutorBackend,構建CoarseGrainedExecutorBackend的時候會創建一個SparkEnv,創建SparkEnv的時候會創建一個mapOutputTracker,即mapOutputTracker和Executor一一對應,也就是每一個Executor都有一個mapOutputTracker來維護元數據信息。
這里的mapStatuses就是mapOutputTracker保存元數據信息的,mapOutputTracker和Executor一一對應,在該Executor上完成的Shuffle Write的元數據信息都會保存在其mapStatus裡面,另外通過遠程獲取的其他Executor上完成的Shuffle Write的元數據信息也會在當前的mapStatuses中保存。
Executor對應的是mapOutputTrackerWorker,而Driver對應的是mapOutputTrackerMaster,兩者都是在實例化SparkEnv的時候創建的,每個在Executor上完成的Shuffle Task的結果都會注冊到driver端的mapOutputTrackerMaster中,即driver端的mapOutputTrackerMaster的mapStatuses保存這所有元數據信息,所以當一個Executor上的任務需要獲取一個shuffle的輸出時,會先在自己的mapStatuses中查找,找不到再和mapOutputTrackerMaster通信獲取元數據。
mapOutputTrackerMaster收到消息後的處理邏輯:
調用了tracker的post方法:
將該Message加入了mapOutputRequests中,mapOutputRequests是一個鏈式阻塞隊列,在mapOutputTrackerMaster初始化的時候專門啟動了一個線程池來執行這些請求:
看看線程處理類MessageLoop的run方法是怎麼定義的:
通過shuffleId獲取對應序列化後的元數據信息並返回,具體看看的實現:
大體思路是先從緩存中獲取元數據(MapStatuses),獲取到直接返回,若沒有則從mapStatuses獲取,獲取到後將其序列化後返回,隨後返回給mapOutputTrackerWorker(剛才與之通信的節點),mapOutputTracker收到回復後又將元數據序列化並加入當前Executor的mapStatuses中。
再回到getMapSizesByExecutorId方法中,getStatuses得到shuffleID對應的所有的元數據信息後,通過convertMapStatuses方法將獲得的元數據信息轉化成形如Seq[(BlockManagerId, Seq[(BlockId, Long)])]格式的位置信息,用來讀取指定的分區的數據:
這里的參數statuses:Array[MapStatus]是前面獲取的上游stage所有的shuffle Write 文件的元數據,並且是按map端的partitionId排序的,通過zipWithIndex將元素和這個元素在數組中的ID(索引號)組合成鍵/值對,這里的索引號即是map端的partitionId,再根據shuffleId、mapPartitionId、recePartitionId來構建ShuffleBlockId(在map端的ShuffleBlockId構建中的recePartitionId始終是0,因為一個ShuffleMapTask就一個Block,而這里加入的真正的recePartitionId在後面通過index文件獲取對應rece端partition偏移量的時候需要用到),並估算得到對應數據的大小,因為後面獲取遠程數據的時候需要限制大小,最後返回位置信息。
至此mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)方法完成,返回了指定分區對應的元數據MapStatus信息。
在初始化對象ShuffleBlockFetcherIterator的時候調用了其初始化方法initialize():
先看是怎麼區分local blocks和remote blocks的:
區分完local remote blocks後加入到了隊列fetchRequests中,並調用fetchUpToMaxBytes()來獲取遠程數據:
從fetchRequests中取出FetchRequest,並調用了sendRequest方法:
通過shuffleClient的fetchBlocks方法來獲取對應遠程節點上的數據,默認是通過NettyBlockTransferService的fetchBlocks方法實現的,不管是成功還是失敗都將構建SuccessFetchResult & FailureFetchResult 結果放入results中。
獲取完遠程的數據接著通過fetchLocalBlocks()方法來獲取本地的blocks信息:
迭代需要獲取的block,直接從blockManager中獲取數據,並通過結果數據構建SuccessFetchResult或者FailureFetchResult放入results中,看看在blockManager.getBlockData(blockId)的實現:
再看看getBlockData方法:
根據shuffleId和mapId獲取index文件,並創建一個讀文件的文件流,根據block的receId(上面獲取對應partition元數據的時候提到過)跳過對應的Block的數據區,先後獲取開始和結束的offset,然後在數據文件中讀取數據。
得到所有數據結果result後,再回到read()方法中:
這里的ShuffleBlockFetcherIterator繼承了Iterator,results可以被迭代,在其next()方法中將FetchResult以(blockId,inputStream)的形式返回:
在read()方法的後半部分會進行聚合和排序,和Shuffle Write部分很類似,這里大致描述一下。
在需要聚合的前提下,有map端聚合的時候執行combineCombinersByKey,沒有則執行combineValuesByKey,但最終都調用了ExternalAppendOnlyMap的insertAll(iter)方法:
在裡面的迭代最終都會調用上面提到的ShuffleBlockFetcherIterator的next方法來獲取數據。
每次update&insert也會估算currentMap的大小,並判斷是否需要溢寫到磁碟文件,若需要則將map中的數據根據定義的keyComparator對key進行排序後返回一個迭代器,然後寫到一個臨時的磁碟文件,然後新建一個map來放新的數據。
執行完combiners[ExternalAppendOnlyMap]的insertAll後,調用其iterator來返回一個代表一個完整partition數據(內存及spillFile)的迭代器:
跟進ExternalIterator類的實例化:
將currentMap中的數據經過排序後和spillFile數據的iterator組合在一起得到inputStreams ,迭代這個inputStreams ,將所有數據都保存在mergeHeadp中,在ExternalIterator方法的next()方法中將被訪問到。
最後若需要對數據進行全局的排序,則通過只有排序參數的ExternalSorter的insertAll方法來進行排序,和Shuffle Write一樣的這里就不細講了。
最終返回一個指定partition所有數據的一個迭代器。
❼ spark進入txt文件的命令
spark進入txt文件的命令
1、首先啟動spark-shell進入Spark-shell模式:(進入spark目錄下後 輸入命令 bin/spark-shell啟動spark-shell模式)
2、載入text文件(spark創建sc,可以載入本地文件和HDFS文件創建RDD)
val textFile = sc.textFile("file:///home/hadoop/test1.txt") #注意file:後是三個「/」
注意:載入HDFS文件和本地文件都是使用textFile,區別是添加前綴(hdfs://和file://)進行標識。
3、獲取RDD文件textFile所有項(文本文件即總共行數)的計數(還有很多其他的RDD操作,自行網路)
textFile.count() #統計結果顯示 1 行
二、在 spark-shell 中讀取 HDFS 系統文件「/home/hadoop/test.csv(也可以是txt文件)」(如果該文件不存在, 請先創建),然後,統計出文件的行數:
方法一:
1、載入text文件(spark創建sc,可以載入本地文件和HDFS文件創建RDD)
val textFile = sc.textFile("hdfs:///home/hadoop/test.csv") #注意hdfs:後是三個「/」
注意:載入HDFS文件和本地文件都是使用textFile,區別是添加前綴(hdfs://和file://)進行標識。
2、獲取RDD文件textFile所有項的計數
textFile.count() #統計結果顯示 1 行
方法二:(Spark shell 默認是讀取 HDFS 中的文件,需要先上傳文件到 HDFS 中,否則會有「org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/README.md」的錯誤。)
1、省去方法一中第一步的命令(1)中的「hdfs://」,其他部分相同,命令如下:
三、編寫獨立應用程序,讀取 HDFS 系統文件「/user/hadoop/test.txt」(如果該文件不存在, 請先創建),然後,統計出文件的行數;通過 sbt 工具將整個應用程序編譯打包成 JAR 包, 並將生成的 JAR 包通過 spark-submit 提交到 Spark 中運行命令:
1、首先輸入:quit 命令退出spark-shell模式:
2、在終端中執行如下命令創建一個文件夾 sparkapp3 作為應用程序根目錄:
cd ~ # 進入用戶主文件夾
mkdir ./sparkapp3 # 創建應用程序根目錄
mkdir -p ./sparkapp3/src/main/scala # 創建所需的文件夾結構
3、在 ./sparkapp3/src/main/scala 下建立一個名為 SimpleApp.scala 的文件(vim ./sparkapp3/src/main/scala/SimpleApp.scala),添加代碼如下:
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
val logFile = "hdfs://localhost:9000/home/hadoop/test.csv"
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2)
val num = logData.count()
println("這個文件有 %d 行!".format(num))
}
}
4、該程序依賴 Spark API,因此我們需要通過 sbt 進行編譯打包。 ./sparkapp3 中新建文件 simple.sbt(vim ./sparkapp3/simple.sbt),添加內容如下,聲明該獨立應用程序的信息以及與 Spark 的依賴關系:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0-preview2"
❽ Spark 怎麼讀文件名
Apache Spark 本身
1.MLlib
AMPLab
Spark最初誕生於伯克利 AMPLab實驗室,如今依然還是AMPLab所致力的項目,盡管這些不處於Apache Spark Foundation中,但是依然在你日常的github項目中享有相當的地位。
ML Base
Spark本身的MLLib位於三層ML Base中的最底層,MLI位於中間層,ML Optimizer則處於最為抽象的頂層。
2.MLI
3.ML Optimizer (又稱 Ghostface)
Ghostware這個項目在2014年就開始進行了,不過從未對外公布。在這39個機器學習庫中,這是唯一一個霧件,之所以能囊括在這列表中,全憑著AMPLab與ML Base的地位支撐。
ML Base之外
4.Splash
這是近期2015年6月的一個項目,在運行隨機梯度下降(SGD)時這套隨機學習演算法聲稱在性能上比Spark MLib中快了25%-75%。這是AMPLab實驗室的sp標記項目,因此值得我們去閱讀。
5.Keystone ML
KML將端到端的機器學習管道引進到了Spark中,但在近期Spark版本中管道已經趨於成熟。同樣也承諾具有一些計算機視覺能力,我曾經在博客中也提到過這也存在一些局限。
6.Velox
作為一個伺服器專門負責管理大量機器學習模型的收集。
7.CoCoA
通過優化通信模式與shuffles來實現更快的機器學習,詳情可見這篇論文的描述《高效通信分布式雙坐標上升》。
框架
GPU-based
8.DeepLearning4j
我曾經的一則博客有進行說明 《DeepLearning4J 增加了Spark gpu的支持》。
9.Elephas
全新的概念,這也是我寫這篇博客的初衷。它提供了一個介面給Keras。
Non-GPU-based
10.DistML
模式並行下而並非數據並行的參數伺服器(正如 Spark MLib)。
11.Aerosolve
來自Airbnb,用於他們自動化定價。
12. Zen
邏輯斯諦回歸、隱含狄利克雷分布(LDA)、因子分解機、神經網路、受限玻爾茲曼機。
13.Distributed Data Frame
與Spark DataFrame類似,但是引擎是不可知的(例如在未來它將運行在引擎上而不是Spark)。其中包括了交叉驗證和外部機器學習庫的介面。
其他機器學習系統的介面
14. spark-corenlp
封裝了斯坦福CoreNLP。
15. Sparkit-learn
給Python Scikit-learn的介面。
16. Sparkling Water
給 的介面。
17. hivemall-spark
封裝了Hivemall,,在Hive中的機器學習。
18. spark-pmml-exporter-validator
可導出預測模型標記語言(PMML),一種用於傳遞機器學習模型的行業標準的XML格式。
附加組件:增強MLlib中現有的演算法。
19. MLlib-dropout
為Spark MLLib 增加dropout能力。基於以下這篇論文進行的實現,《Dropout:一個簡單的方法來防止神經網路中的過擬合》。
20.generalized-kmeans-clustering
為K-Means演算法增加任意距離函數。
21. spark-ml-streaming
可視化的流式機器學習演算法內置於Spark MLlib。
演算法
監督學習
22. spark-libFM
因子分解機。
23. ScalaNetwork
遞歸神經網路(RNNs)。
24. dissolve-struct
基於上文中提到的高性能Spark通信框架CoCoA下的支持向量機(SVM)。
25. Sparkling Ferns
基於以下這篇論文進行的實現,《通過使用隨機森林與隨機蕨演算法的圖像分類技術》。
26. streaming-matrix-factorization
矩陣分解推薦系統。
❾ spark與hdfs怎麼載入本地文件
利用Spark構建一個樹索引結構。具體內容如下:
(1)數據形式是20-dimension 的double向量
(2)樹的結構是:
1>內部結點都保存一個數據對象和指向子樹的指針,每個子樹中數據對象與父節點中保留的對象的距離由遠到近不同
2>葉子結點保存數據對象的列表
(3)想通過Spark載入數據,將整個數據源文件抽象為一個RDD, RDD分區存儲文件的不同塊,然後寫一個索引構建運算元作用到每個分區上,然後構建的索引寫到文件中(索引文件的個數與RDD分區的個數相同)
(4)查找時查找運算元作用到每個索引文件
現在有一個問題就是在構建索引時一個分區中的源數據是滿的,構建的索引數據大於一個分區的空間(因為空間佔用方面,索引數據肯定大於源數據),索引數據會寫到不同的分區中,查找過程中,查找運算元是以分區為單位進行查找的,但是單個分區數據不是一個完整的索引
默認是從hdfs讀取文件,也可以指定sc.textFile("路徑").在路徑前面加上hdfs://表示從hdfs文件系統上讀
本地文件讀取 sc.textFile("路徑").在路徑前面加上file:// 表示從本地文件系統讀,如file:///home/user/spark/README.md
❿ spark怎樣讀取本地文件
默認是從hdfs讀取抄文件,也可以指定sc.textFile("路徑").在路徑前面加上hdfs://表示從hdfs文件系統上讀
本地文件讀取 sc.textFile("路徑").在路徑前面加上file:// 表示從本地文件系統讀,如file:///home/user/spark/README.md