1. 如何用go語言每分鍾處理100萬個請求
在Malwarebytes 我們經歷了顯著的增長,自從我一年前加入了矽谷的公司,一個主要的職責成了設計架構和開發一些系統來支持一個快速增長的信息安全公司和所有需要的設施來支持一個每天百萬用戶使用的產品。我在反病毒和反惡意軟體行業的不同公司工作了12年,從而我知道由於我們每天處理大量的數據,這些系統是多麼復雜。
有趣的是,在過去的大約9年間,我參與的所有的web後端的開發通常是通過Ruby on Rails技術實現的。不要錯怪我。我喜歡Ruby on Rails,並且我相信它是個令人驚訝的環境。但是一段時間後,你會開始以ruby的方式開始思考和設計系統,你會忘記,如果你可以利用多線程、並行、快速執行和小內存開銷,軟體架構本來應該是多麼高效和簡單。很多年期間,我是一個c/c++、Delphi和c#開發者,我剛開始意識到使用正確的工具可以把復雜的事情變得簡單些。
作為首席架構師,我不會很關心在互聯網上的語言和框架戰爭。我相信效率、生產力。代碼可維護性主要依賴於你如何把解決方案設計得很簡單。
問題
當工作在我們的匿名遙測和分析系統中,我們的目標是可以處理來自於百萬級別的終端的大量的POST請求。web處理服務可以接收包含了很多payload的集合的jsON數據,這些數據需要寫入Amazon S3中。接下來,map-rece系統可以操作這些數據。
按照習慣,我們會調研服務層級架構,涉及的軟體如下:
Sidekiq
Resque
DelayedJob
Elasticbeanstalk Worker Tier
RabbitMQ
and so on…
搭建了2個不同的集群,一個提供web前端,另外一個提供後端處理,這樣我們可以橫向擴展後端服務的數量。
但是,從剛開始,在 討論階段我們的團隊就知道我們應該使用Go,因為我們看到這會潛在性地成為一個非常龐大( large traffic)的系統。我已經使用了Go語言大約2年時間,我們開發了幾個系統,但是很少會達到這樣的負載(amount of load)。
我們開始創建一些結構,定義從POST調用得到的web請求負載,還有一個上傳到S3 budket的函數。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
本地Go routines方法
剛開始,我們採用了一個非常本地化的POST處理實現,僅僅嘗試把發到簡單go routine的job並行化:
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items indivially to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
對於中小負載,這會對大多數的人適用,但是大規模下,這個方案會很快被證明不是很好用。我們期望的請求數,不在我們剛開始計劃的數量級,當我們把第一個版本部署到生產環境上。我們完全低估了流量。
上面的方案在很多地方很不好。沒有辦法控制我們產生的go routine的數量。由於我們收到了每分鍾1百萬的POST請求,這段代碼很快就崩潰了。
再次嘗試
我們需要找一個不同的方式。自開始我們就討論過, 我們需要保持請求處理程序的生命周期很短,並且進程在後台產生。當然,這是你在Ruby on Rails的世界裡必須要做的事情,否則你會阻塞在所有可用的工作 web處理器上,不管你是使用puma、unicore還是passenger(我們不要討論JRuby這個話題)。然後我們需要利用常用的處理方案來做這些,比如Resque、 Sidekiq、 SQS等。這個列表會繼續保留,因為有很多的方案可以實現這些。
所以,第二次迭代,我們創建了一個緩沖channel,我們可以把job排隊,然後把它們上傳到S3。因為我們可以控制我們隊列中的item最大值,我們有大量的內存來排列job,我們認為只要把job在channel裡面緩沖就可以了。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items indivially to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
接下來,我們再從隊列中取job,然後處理它們。我們使用類似於下面的代碼:
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
說實話,我不知道我們在想什麼。這肯定是一個滿是Red-Bulls的夜晚。這個方法不會帶來什麼改善,我們用了一個 有缺陷的緩沖隊列並發,僅僅是把問題推遲了。我們的同步處理器同時僅僅會上傳一個數據到S3,因為來到的請求遠遠大於單核處理器上傳到S3的能力,我們的帶緩沖channel很快達到了它的極限,然後阻塞了請求處理邏輯的queue更多item的能力。
我們僅僅避免了問題,同時開始了我們的系統掛掉的倒計時。當部署了這個有缺陷的版本後,我們的延時保持在每分鍾以常量增長。
最好的解決方案
我們討論過在使用用Go channel時利用一種常用的模式,來創建一個二級channel系統,一個來queue job,另外一個來控制使用多少個worker來並發操作JobQueue。
想法是,以一個恆定速率並行上傳到S3,既不會導致機器崩潰也不好產生S3的連接錯誤。這樣我們選擇了創建一個Job/Worker模式。對於那些熟悉Java、C#等語言的開發者,可以把這種模式想像成利用channel以golang的方式來實現了一個worker線程池,作為一種替代。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
我們已經修改了我們的web請求handler,用payload創建一個Job實例,然後發到JobQueue channel,以便於worker來獲取。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items indivially to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}
在web server初始化時,我們創建一個Dispatcher,然後調用Run()函數創建一個worker池子,然後開始監聽JobQueue中的job。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
下面是dispatcher的實現代碼:
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
注意到,我們提供了初始化並加入到池子的worker的最大數量。因為這個工程我們利用了Amazon Elasticbeanstalk帶有的docker化的Go環境,所以我們常常會遵守12-factor方法論來配置我們的生成環境中的系統,我們從環境變了讀取這些值。這種方式,我們控制worker的數量和JobQueue的大小,所以我們可以很快的改變這些值,而不需要重新部署集群。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
直接結果
我們部署了之後,立馬看到了延時降到微乎其微的數值,並未我們處理請求的能力提升很大。
Elastic Load Balancers完全啟動後,我們看到ElasticBeanstalk 應用服務於每分鍾1百萬請求。通常情況下在上午時間有幾個小時,流量峰值超過每分鍾一百萬次。
我們一旦部署了新的代碼,伺服器的數量從100台大幅 下降到大約20台。
我們合理配置了我們的集群和自動均衡配置之後,我們可以把伺服器的數量降至4x EC2 c4.Large實例,並且Elastic Auto-Scaling設置為如果CPU達到5分鍾的90%利用率,我們就會產生新的實例。
總結
在我的書中,簡單總是獲勝。我們可以使用多隊列、後台worker、復雜的部署設計一個復雜的系統,但是我們決定利用Elasticbeanstalk 的auto-scaling的能力和Go語言開箱即用的特性簡化並發。
我們僅僅用了4台機器,這並不是什麼新鮮事了。可能它們還不如我的MacBook能力強大,但是卻處理了每分鍾1百萬的寫入到S3的請求。
處理問題有正確的工具。當你的 Ruby on Rails 系統需要更強大的web handler時,可以考慮下ruby生態系統之外的技術,或許可以得到更簡單但更強大的替代方案。
2. 大數據產業集群創新特徵有哪些
大數據產業集群是指以大數據技術和應用為核心,由企業、政府、高校、科研機構等多方組成的區域性、產業化的協同創新體系。其創新特徵主要包括以下幾個方面:
1、多元化的合作夥伴:大數據產業集群通常涵蓋了多碰悉個領域、行業和組織,可以匯聚不同類笑洞乎型的合作夥伴,包括政府部門、高校、科研機構、企業、投資機構等,並通過開放式的合作模式來促進產業協同創新。
2、創新驅動的發展模式:大數據產業集群往往以創新引領為核心,緊密結合產學研一體化,通過技術研發、人才培養、投融資等方面的支持,推動園區內企業和組織的技術創新和實踐探索,從而實現集群內部的技術優勢轉化和商業價值輸出。
3、聚集效應的經濟規模:大數據產業集群具有聚集效應,使得在同一地域內的企業和組織能夠通過資源共享、信息互通、市場協同等產生經濟規模效應,提高集群的整體競爭力。
4、開放式的創新環境:大數據產業集群為企業和組織提供了一個開放的創新環境,鼓勵創新思維和實顫皮踐,促進產業鏈上下游的知識和技術的交流與融合,同時也給創新創業者提供了更便捷的創新平台和資源支持。
5、效率與可持續性:大數據產業集群通過優化產業布局和組織結構,強化供應鏈管理和服務體系建設,提高集群運營效率和服務水平,同時也注重生態環保和可持續發展,保證集群的長期穩健發展。
3. 大數據集群
大數據(big data),指無法在一定時間范圍內用常規軟體工具進行捕捉、管理和處理的數據集合,是需要新處理模式才能具有更強的決策力、洞察發現力和流程優化能力的海量、高增長率和多樣化的信息資產。
魔方(大數據模型平台)
大數據模型平台是一款基於服務匯流排與分布式雲計算兩大技術架構的一款數據分析、挖掘的工具平台,其採用分布式文件系統對數據進行存儲,支持海量數據的處理。採用多種的數據採集技術,支持結構化數據及非結構化數據的採集。通過圖形化的模型搭建工具,支持流程化的模型配置。通過第三方插件技術,很容易將其他工具及服務集成到平台中去。數據分析研判平台就是海量信息的採集,數據模型的搭建,數據的挖掘、分析最後形成知識服務於實戰、服務於決策的過程,平台主要包括數據採集部分,模型配置部分,模型執行部分及成果展示部分等。
大數據平台數據抽取工具
大數據平台數據抽取工具實現db到hdfs數據導入功能,藉助Hadoop提供高效的集群分布式並行處理能力,可以採用資料庫分區、按欄位分區、分頁方式並行批處理抽取db數據到hdfs文件系統中,能有效解決大數據傳統抽取導致的作業負載過大抽取時間過長的問題,為大數據倉庫提供傳輸管道。數據處理伺服器為每個作業分配獨立的作業任務處理工作線程和任務執行隊列,作業之間互不幹擾靈活的作業任務處理模式:可以增量方式執行作業任務,可配置的任務處理時間策略,根據不同需求定製。採用非同步事件驅動模式來管理和分發作業指令、採集作業狀態數據。通過管理監控端,可以實時監控作業在各個數據處理節點作業任務的實時運行狀態,查看作業的歷史執行狀態,方便地實現提交新的作業、重新執行作業、停止正在執行的作業等操作。
互聯網數據採集工具
網路信息雷達是一款網路信息定向採集產品,它能夠對用戶設置的網站進行數據採集和更新,實現靈活的網路數據採集目標,為互聯網數據分析提供基礎。
未至·雲(互聯網推送服務平台)
雲計算數據中心以先進的中文數據處理和海量數據支撐為技術基礎,並在各個環節輔以人工服務,使得數據中心能夠安全、高效運行。根據雲計算數據中心的不同環節,我們專門配備了系統管理和維護人員、數據加工和編撰人員、數據採集維護人員、平台系統管理員、機構管理員、輿情監測和分析人員等,滿足各個環節的需要。面向用戶我們提供面向政府和面向企業的解決方案。
顯微鏡(大數據文本挖掘工具)
文本挖掘是指從文本數據中抽取有價值的信息和知識的計算機處理技術, 包括文本分類、文本聚類、信息抽取、實體識別、關鍵詞標引、摘要等。基於Hadoop MapRece的文本挖掘軟體能夠實現海量文本的挖掘分析。CKM的一個重要應用領域為智能比對, 在專利新穎性評價、科技查新、文檔查重、版權保護、稿件溯源等領域都有著廣泛的應用。
數據立方(可視化關系挖掘)
大數據可視化關系挖掘的展現方式包括關系圖、時間軸、分析圖表、列表等多種表達方式,為使用者提供全方位的信息展現方式。