A. 如何讓Hadoop結合R語言做大數據分析
R語言和讓我們體會到了,兩種技術在各自領域的強大。很多開發人員在計算機的角度,都會提出下面2個問題。問題1: Hadoop的家族如此之強大,為什麼還要結合R語言?
問題2: Mahout同樣可以做數據挖掘和機器學習,和R語言的區別是什麼?下面我嘗試著做一個解答:問題1: Hadoop的家族如此之強大,為什麼還要結合R語言?
a. Hadoop家族的強大之處,在於對大數據的處理,讓原來的不可能(TB,PB數據量計算),成為了可能。
b. R語言的強大之處,在於統計分析,在沒有Hadoop之前,我們對於大數據的處理,要取樣本,假設檢驗,做回歸,長久以來R語言都是統計學家專屬的工具。
c. 從a和b兩點,我們可以看出,hadoop重點是全量數據分析,而R語言重點是樣本數據分析。 兩種技術放在一起,剛好是最長補短!
d. 模擬場景:對1PB的新聞網站訪問日誌做分析,預測未來流量變化
d1:用R語言,通過分析少量數據,對業務目標建回歸建模,並定義指標d2:用Hadoop從海量日誌數據中,提取指標數據d3:用R語言模型,對指標數據進行測試和調優d4:用Hadoop分步式演算法,重寫R語言的模型,部署上線這個場景中,R和Hadoop分別都起著非常重要的作用。以計算機開發人員的思路,所有有事情都用Hadoop去做,沒有數據建模和證明,」預測的結果」一定是有問題的。以統計人員的思路,所有的事情都用R去做,以抽樣方式,得到的「預測的結果」也一定是有問題的。所以讓二者結合,是產界業的必然的導向,也是產界業和學術界的交集,同時也為交叉學科的人才提供了無限廣闊的想像空間。問題2: Mahout同樣可以做數據挖掘和機器學習,和R語言的區別是什麼?
a. Mahout是基於Hadoop的數據挖掘和機器學習的演算法框架,Mahout的重點同樣是解決大數據的計算的問題。
b. Mahout目前已支持的演算法包括,協同過濾,推薦演算法,聚類演算法,分類演算法,LDA, 樸素bayes,隨機森林。上面的演算法中,大部分都是距離的演算法,可以通過矩陣分解後,充分利用MapRece的並行計算框架,高效地完成計算任務。
c. Mahout的空白點,還有很多的數據挖掘演算法,很難實現MapRece並行化。Mahout的現有模型,都是通用模型,直接用到的項目中,計算結果只會比隨機結果好一點點。Mahout二次開發,要求有深厚的JAVA和Hadoop的技術基礎,最好兼有 「線性代數」,「概率統計」,「演算法導論」 等的基礎知識。所以想玩轉Mahout真的不是一件容易的事情。
d. R語言同樣提供了Mahout支持的約大多數演算法(除專有演算法),並且還支持大量的Mahout不支持的演算法,演算法的增長速度比mahout快N倍。並且開發簡單,參數配置靈活,對小型數據集運算速度非常快。
雖然,Mahout同樣可以做數據挖掘和機器學習,但是和R語言的擅長領域並不重合。集百家之長,在適合的領域選擇合適的技術,才能真正地「保質保量」做軟體。
如何讓Hadoop結合R語言?
從上一節我們看到,Hadoop和R語言是可以互補的,但所介紹的場景都是Hadoop和R語言的分別處理各自的數據。一旦市場有需求,自然會有商家填補這個空白。
1). RHadoop
RHadoop是一款Hadoop和R語言的結合的產品,由RevolutionAnalytics公司開發,並將代碼開源到github社區上面。RHadoop包含三個R包 (rmr,rhdfs,rhbase),分別是對應Hadoop系統架構中的,MapRece, HDFS, HBase 三個部分。
2). RHiveRHive是一款通過R語言直接訪問Hive的工具包,是由NexR一個韓國公司研發的。
3). 重寫Mahout用R語言重寫Mahout的實現也是一種結合的思路,我也做過相關的嘗試。
4).Hadoop調用R
上面說的都是R如何調用Hadoop,當然我們也可以反相操作,打通JAVA和R的連接通道,讓Hadoop調用R的函數。但是,這部分還沒有商家做出成形的產品。
5. R和Hadoop在實際中的案例
R和Hadoop的結合,技術門檻還是有點高的。對於一個人來說,不僅要掌握Linux, Java, Hadoop, R的技術,還要具備 軟體開發,演算法,概率統計,線性代數,數據可視化,行業背景 的一些基本素質。在公司部署這套環境,同樣需要多個部門,多種人才的的配合。Hadoop運維,Hadoop演算法研發,R語言建模,R語言MapRece化,軟體開發,測試等等。所以,這樣的案例並不太多。
B. R語言可以處理大的數據嗎
「參考網址1」中提到如果只是對整數運算(運算過程和結果都只使用整數),沒有必要使用「double」(8 byte),而應該用更小的「integer」(4 byte)。使用storage.mode(x)查看對象存數的模式,storage.mode(x) <- 進行賦值;使用format(object.size(a), units = 'auto')查看對象佔用的內存空間(此處有疑問,即在R中每個integer到底佔用了多大的空間?)。
需要解釋gc()函數,可以查看內存使用情況。同樣,在清除了大的對象之後,使用gc()以釋放內存使用空間。
李航在」參考網址2「中提到,對於大矩陣的操作,盡量避免使用cbind和rbind之類,因為這會讓內存不停地分配空間。「對於長度增加的矩陣,盡量先定義一個大矩陣,然後逐步增加」和「注意清除中間對象」。
使用bigmemory家族:bigmemory, biganalytics, synchronicity, bigtabulate and bigalgebra, 同時還有
biglm。
bigmemory package的使用:
1. 建立big.memory對象
bigmemory採用C++的數據格式來「模仿」R中的matrix。
編寫大數據格式文件時候,可以先建立filebacked.big.matrix
big.matrix(nrow, ncol, type = options()$bigmemory.default.type, init = NULL, dimnames = NULL, separated = FALSE, backingfile = NULL, backingpath = NULL, descriptorfile = NULL, shared = TRUE)
filebacked.big.matrix(nrow, ncol, type = options()$bigmemory.default.type, init = NULL, dimnames = NULL, separated = FALSE, backingfile = NULL, backingpath = NULL, descriptorfile = NULL)
as.big.matrix(x, type = NULL, separated = FALSE, backingfile = NULL, backingpath = NULL, descriptorfile = NULL, shared=TRUE)
使用注意:
big.matrix採用兩種方式儲存數據:一種是big.matrix默認的方式,如果內存空間比較大,可以嘗試使用;另外一種是filebacked.big.matrix,這種儲存方法可能會備份文件(file-backings),而且需要descriptor file;
「init」指矩陣的初始化數值,如果設定,會事先將設定的數值填充到矩陣中;如果不設置,將處理為NA
"type"是指在big.matrix中atomic element的儲存格式,默認是「double」(8 byte),可以改為「integer」(4 byte), "short"(2 byte) or "char"(1 byte)。注意:這個包不支持字元串的儲存,type = "char"是指ASCII碼字母。
在big.matrix非常大的時候,避免使用rownames和colnames(並且bigmemory禁止用名稱訪問元素),因為這種做法非常佔用內存。如果一定要改變,使用options(bigmemory.allow.dimnames=TRUE),之後colnames, rownames設置。
直接在命令提示符後輸入x(x是一個big matrix),將返回x的描述,不會出現所有x中所有內容。因此,注意x[ , ](列印出矩陣全部內容);
如果big.matrix有很多列,那麼應該將其轉置後儲存;(不推薦)或者將參數「separated」設置為TRUE,這樣就將每一列分開儲存。否則,將用R的傳統方式(column major的方式)儲存數據。
如果建立一個filebacked.big.matrix,那麼需要指定backingfile的名稱和路徑+descriptorfile。可能多個big.matrix對象對應唯一一個descriptorfile,即如果descriptorfile改變,所以對應的big.matrix隨之改變;同樣,decriptorfile隨著big.matrix的改變而改變;如果想維持一種改變,需要重新建立一個filebacked.big.matrix。attach.big.matrix(descriptorfile or describe(big.matrix))函數用於將一個descriptorfile賦值給一個big.matrix。這個函數很好用,因為每次在創建一個filebacked.big.matrix後,保存R並退出後,先前創建的矩陣會消失,需要再attach.big.matrix以下
2. 對big.matrix的列的特定元素進行條件篩選
對內存沒有限制;而且比傳統的which更加靈活(贊!)
mwhich(x, cols, vals, comps, op = 'AND')
x既可以是big.matrix,也可以是傳統的R對象;
cols:行數
vals:cutoff,可以設定兩個比如c(1, 2)
comps:'eq'(==), 'neq'(!=), 'le'(<), 'lt'(<=), 'ge'(>) and 'gt'(>=)
op:「AND」或者是「OR」
可以直接比較NA,Inf和-Inf
3.bigmemory中其他函數
nrow, ncol, dim, dimnames, tail, head, typeof繼承base包
big.matrix, is.big.matrix, as.big.matrix, attach.big.matrix, describe, read.big.matrix, write.big.matrix, sub.big.matrix, is.sub.big.matrix為特有的big.matrix文件操作;filebacked.big.matrix, is.filebacked(判斷big.matrix是否硬碟備份) , flush(將filebacked的文件刷新到硬碟備份上)是filebacked的big.matrix的操作。
mwhich增強base包中的which, morder增強order,mpermute(對matrix中的一列按照特定序列操作,但是會改變原來對象,這是為了避免內存溢出)
big.matrix對象的使用deep(x, cols = NULL, rows = NULL, y = NULL, type = NULL, separated = NULL, backingfile = NULL, backingpath = NULL, descriptorfile = NULL, shared=TRUE)
biganalytics package的使用
biganalytics主要是一些base基本函數的擴展,主要有max, min, prod, sum, range, colmin, colmax, colsum, colprod, colmean, colsd, colvar, summary, apply(只能用於行或者列,不能用行列同時用)等
比較有特色的是bigkmeans的聚類
剩下的biglm.big.matrix和bigglm.big.matrix可以參考Lumley's biglm package。
bigtabulate package的使用
並行計算限制的突破:
使用doMC家族:doMC, doSNOW, doMPI, doRedis, doSMP和foreach packages.
foreach package的使用
foreach(..., .combine, .init, .final=NULL, .inorder=TRUE, .multicombine=FALSE, .maxcombine=if (.multicombine) 100 else 2, .errorhandling=c('stop', 'remove', 'pass'), .packages=NULL, .export=NULL, .noexport=NULL, .verbose=FALSE)
foreach的特點是可以進行並行運算,如在NetWorkSpace和snow?
%do%嚴格按照順序執行任務(所以,也就非並行計算),%dopar%並行執行任務
...:指定循環的次數;
.combine:運算之後結果的顯示方式,default是list,「c」返回vector, cbind和rbind返回矩陣,"+"和"*"可以返回rbind之後的「+」或者「*」
.init:.combine函數的第一個變數
.final:返回最後結果
.inorder:TRUE則返回和原始輸入相同順序的結果(對結果的順序要求嚴格的時候),FALSE返回沒有順序的結果(可以提高運算效率)。這個參數適合於設定對結果順序沒有需求的情況。
.muticombine:設定.combine函數的傳遞參數,default是FALSE表示其參數是2,TRUE可以設定多個參數
.maxcombine:設定.combine的最大參數
.errorhandling:如果循環中出現錯誤,對錯誤的處理方法
.packages:指定在%dopar%運算過程中依賴的package(%do%會忽略這個選項)。
getDoParWorkers( ) :查看注冊了多少個核,配合doMC package中的registerDoMC( )使用
getDoParRegistered( ) :查看doPar是否注冊;如果沒有注冊返回FALSE
getDoParName( ) :查看已經注冊的doPar的名字
getDoParVersion( ):查看已經注冊的doPar的version
===================================================
# foreach的循環次數可以指定多個變數,但是只用其中最少?的
> foreach(a = 1:10, b = rep(10, 3)) %do% (a*b)
[[1]]
[1] 10
[[2]]
[1] 20
[[3]]
[1] 30
# foreach中.combine的「+」或者「*」是cbind之後的操作;這也就是說"expression"返回一個向量,會對向量+或者*
> foreach(i = 1:4, .combine = "+") %do% 2
[1] 8
> foreach(i = 1:4, .combine = "rbind") %do% rep(2, 5)
[,1] [,2] [,3] [,4] [,5]
result.1 2 2 2 2 2
result.2 2 2 2 2 2
result.3 2 2 2 2 2
result.4 2 2 2 2 2
> foreach(i = 1:4, .combine = "+") %do% rep(2, 5)
[1] 8 8 8 8 8
> foreach(i = 1:4, .combine = "*") %do% rep(2, 5)
[1] 16 16 16 16 16
=============================================
iterators package的使用
iterators是為了給foreach提供循環變數,每次定義一個iterator,它都內定了「循環次數」和「每次循環返回的值」,因此非常適合結合foreach的使用。
iter(obj, ...):可以接受iter, vector, matrix, data.frame, function。
nextElem(obj, ...):接受iter對象,顯示對象數值。
以matrix為例,
iter(obj, by=c('column', 'cell', 'row'), chunksize=1L, checkFunc=function(...) TRUE, recycle=FALSE, ...)
by:按照什麼順序循環;matrix和data.frame都默認是「row」,「cell」是按列依次輸出(所以對於「cell」,chunksize只能指定為默認值,即1)
chunksize:每次執行函數nextElem後,按照by的設定返回結果的長度。如果返回結構不夠,將取剩餘的全部。
checkFunc=function(...) TRUE:執行函數checkFun,如果返回TRUE,則返回;否則,跳過。
recycle:設定在nextElem循環到底(「錯誤: StopIteration」)是否要循環處理,即從頭再來一遍。
以function為例
iter(function()rnorm(1)),使用nextElem可以無限重復;但是iter(rnorm(1)),只能來一下。
更有意思的是對象如果是iter,即test1 <- iter(obj); test2 <- iter(test1),那麼這兩個對象是連在一起的,同時變化。
==============================================
> a
[,1] [,2] [,3] [,4] [,5]
[1,] 1 5 9 13 17
[2,] 2 6 10 14 18
[3,] 3 7 11 15 19
[4,] 4 8 12 16 20
> i2 <- iter(a, by = "row", chunksize=3)
> nextElem(i2)
[,1] [,2] [,3] [,4] [,5]
[1,] 1 5 9 13 17
[2,] 2 6 10 14 18
[3,] 3 7 11 15 19
> nextElem(i2) #第二次iterate之後,只剩下1行,全部返回
[,1] [,2] [,3] [,4] [,5]
[1,] 4 8 12 16 20
> i2 <- iter(a, by = "column", checkFunc=function(x) sum(x) > 50)
> nextElem(i2)
[,1]
[1,] 13
[2,] 14
[3,] 15
[4,] 16
> nextElem(i2)
[,1]
[1,] 17
[2,] 18
[3,] 19
[4,] 20
> nextElem(i2)
錯誤: StopIteration
> colSums(a)
[1] 10 26 42 58 74
> testFun <- function(x){return(x+2)}
> i2 <- iter(function()testFun(1))
> nextElem(i2)
[1] 3
> nextElem(i2)
[1] 3
> nextElem(i2)
[1] 3
> i2 <- iter(testFun(1))
> nextElem(i2)
[1] 3
> nextElem(i2)
錯誤: StopIteration
> i2 <- iter(testFun(1))
> i3 <- iter(i2)
> nextElem(i3)
[1] 3
> nextElem(i2)
錯誤: StopIteration
============================================
iterators package中包括
irnorm(..., count);irunif(..., count);irbinom(..., count);irnbinom(..., count);irpois(..., count)中內部生成iterator的工具,分別表示從normal,uniform,binomial,negativity binomial和Poisson分布中隨機選取N個元素,進行count次。其中,negative binomial分布:其概率積累函數(probability mass function)為擲骰子,每次骰子為3點的概率為p,在第r+k次恰好出現r次的概率。
icount(count)可以生成1:conunt的iterator;如果count不指定,將從無休止生成1:Inf
icountn(vn)比較好玩,vn是指一個數值向量(如果是小數,則向後一個數取整,比如2.3 --> 3)。循環次數為prod(vn),每次返回的向量中每個元素都從1開始,不超過設定 vn,變化速率從左向右依次遞增。
idiv(n, ..., chunks, chunkSize)返回截取從1:n的片段長度,「chunks」和「chunkSize」不能同時指定,「chunks」為分多少片段(長度從大到小),「chunkSize」為分段的最大長度(長度由大到小)
iapply(X, MARGIN):與apply很像,MARGIN中1是row,2是column
isplit(x, f, drop=FALSE, ...):按照指定的f劃分矩陣
=============================================
> i2 <- icountn(c(3.4, 1.2))
> nextElem(i2)
[1] 1 1
> nextElem(i2)
[1] 2 1
> nextElem(i2)
[1] 3 1
> nextElem(i2)
[1] 4 1
> nextElem(i2)
[1] 1 2
> nextElem(i2)
[1] 2 2
> nextElem(i2)
[1] 3 2
> nextElem(i2)
[1] 4 2
> nextElem(i2)
錯誤: StopIteration