㈠ socket 通信粘包怎麼處理
一、socket 通信粘包的處理方法:
1、對於發送方引起的粘包現象,用戶可通過編程設置來避免,TCP提供了強制數據立即傳送的操作指令push,TCP軟體收到該操作指令後,就立即將本段數據發送出去,而不必等待發送緩沖區滿;
2、對於接收方引起的粘包,則可通過優化程序設計、精簡接收進程工作量、提高接收進程優先順序等措施,使其及時接收數據,從而盡量避免出現粘包現象;
3、由接收方控制,將一包數據按結構欄位,人為控制分多次接收,然後合並,通過這種手段來避免粘包。
二、實現代碼:
三、方法注意事項:
1、第一種編程設置方法雖然可以避免發送方引起的粘包,但它關閉了優化演算法,降低了網路發送效率,影響應用程序的性能,一般不建議使用。
2、第二種方法只能減少出現粘包的可能性,但並不能完全避免粘包,當發送頻率較高時,或由於網路突發可能使某個時間段數據包到達接收方較快,接收方還是有可能來不及接收,從而導致粘包;
3、第三種方法雖然避免了粘包,但應用程序的效率較低,對實時應用的場合不適合。
四、實驗環境
1、硬體環境:伺服器:pentium 350 微機 、客戶機:pentium 166微機、網路平台:由10兆共享式hub連接而成的區域網;
2、軟體環境:操作系統:windows 98 、編程語言:visual c++ 5.0
㈡ 從go-micro的broker開始學習go-micro
go-micro簡介我們在之前的文章中提到的Micro其實是Micro這個微服務框架的工具集,其核心還是go-micro.GoMicro是一個分布式系統開發框架。Micro哲學是具有可插拔架構的合理默認值。框架提供默認設置以幫助我們快速入門這個框架,但一切都可以輕松更換,在真實的項目中,一般是可以根據自己的需求來更換組件,比如注冊中心可以使用etcd,consul.而Boker發布訂閱可以使用Kafka,nsq等工具。
go-micro的主要功能GoMicro抽象了分布式系統的細節。以下是主要功能。
認證:Auth內置為一等公民。身份驗證和授權通過為每項服務提供身份和證書來實現安全的零信任網路。這還包括基於規則的訪問控制。
動態配置:從任何地方載入和熱重載動態配置。配置介面提供了一種從任何來源(例如envvars、文件、etcd)載入應用程序級別配置的方法。您可以合並源,甚至定義回退。
數據存儲:一個簡單的數據存儲介面,用於讀取、寫入和刪除記錄。它默認支持內存、文件和CockroachDB。狀態和持久性成為原型之外的核心需求,Micro希望將其構建到框架中。
服務發現:自動服務注冊和名稱解析。服務發現是微服務開發的核心。當服務A需要與服務B通話時,它需要該服務的位置。默認發現機制是多播DNS(mdns),一個zeroconf系統。
負載均衡:基於服務發現的客戶端負載平衡。一旦我們獲得了任意數量的服務實例的地址,我們現在需要一種方法來決定路由到哪個節點。我們使用隨機散列負載平衡來提供跨服務的均勻分布,並在出現問題時重試不同的節點。
消息編碼:基於內容類型的動態消息編碼。客戶端和伺服器將使用編解碼器和內容類型為您無縫編碼和解碼Go類型。任何種類的消息都可以被編碼並從不同的客戶端發送。客戶端和伺服器默認處理這個。默認情況下,這包括protobuf和json。
RPC客戶端/伺服器:基於RPC的請求/響應,支持雙向流。我們為同步通信提供了一個抽象。對服務提出的請求將被自動解析、負載平衡、撥號和流式傳輸。
非同步消息:PubSub內置為非同步通信和事件驅動架構的一等公民。事件通知是微服務開發的核心模式。默認消息系統是HTTP事件消息代理。
同步:分布式系統通常以最終一致的方式構建。對分布式鎖定和領導的支持作為同步介面內置。使用最終一致的資料庫或調度時,請使用Sync介面。
可插拔介面:GoMicro為每個分布式系統抽象使用Go介面。因此,這些介面是可插拔的,並允許GoMicro與運行時無關。您可以插入任何底層技術
broker組件下面我們就從broker組件開始,一起學習一下如何使用go-micro,我們知道,broker是go-micro用來實現非同步消息的一種模式,其本質是PubSub。你可以自定義使用不同的工具,比如kafka等。而go-micro也實現了一個默認的消息隊列系統,方便我們學習go-micro。我們一起看一個examples中的實例,一起學習一下,如何使用默認的消息系統來實現非同步消息。
packagemainimport("fmt""log""time""github.com/asim/go-micro/v3/broker""github.com/asim/go-micro/v3/cmd")var(topic="go.micro.topic.foo")funcpub(){tick:=time.NewTicker(time.Second)i:=0for_=rangetick.C{msg:=&broker.Message{Header:map[string]string{"id":fmt.Sprintf("%d",i),},Body:[]byte(fmt.Sprintf("%d:%s",i,time.Now().String())),}iferr:=broker.Publish(topic,msg);err!=nil{log.Printf("[pub]failed:%v",err)}else{fmt.Println("[pub]pubbedmessage:",string(msg.Body))}i++}}funcsub(){_,err:=broker.Subscribe(topic,func(pbroker.Event)error{fmt.Println("[sub]receivedmessage:",string(p.Message().Body),"header",p.Message().Header)returnnil})iferr!=nil{fmt.Println(err)}}funcmain(){cmd.Init()iferr:=broker.Init();err!=nil{log.Fatalf("BrokerIniterror:%v",err)}iferr:=broker.Connect();err!=nil{log.Fatalf("BrokerConnecterror:%v",err)}gopub()gosub()<-time.After(time.Second*10)}例子其實很簡單,實現一個發布者函數pub,該函數在每個tick中,向指定topic發布一條。
同時,實現了一個訂閱者sub,訂閱者從指定topic中獲取消息,並執行處理函數。處理函數的參數是一個Event介面,通過它,我們可以獲取消息.該消息包含Header和Body信息。
這些就是一個建議的消息系統,那如何讓這個消息系統跑起來的呢,其主要邏輯就是main函數中的邏輯,我們一個來看看上面的main函數。首先調用cmd.init方法,該方法的作用是
funcInit(opts...Option)error{returnDefaultCmd.Init(opts...)}DefaultCmd是一個所有組件都是默認值的Cmd實例。
然後初始化一個broker,在這里,默認的broker就是一個httpbroker.
funcnewHttpBroker(opts...Option)Broker{options:=Options{Codec:json.Marshaler{},Context:context.TODO(),Registry:registry.DefaultRegistry,}for_,o:=rangeopts{o(&options)}//setaddressaddr:=DefaultAddressiflen(options.Addrs)>0&&len(options.Addrs[0])>0{addr=options.Addrs[0]}h:=&httpBroker{id:uuid.New().String(),address:addr,opts:options,r:options.Registry,c:&http.Client{Transport:newTransport(options.TLSConfig)},subscribers:make(map[string][]*httpSubscriber),exit:make(chanchanerror),mux:http.NewServeMux(),inbox:make(map[string][][]byte),}//specifythemessagehandlerh.mux.Handle(DefaultPath,h)//getoptionalhandlersifh.opts.Context!=nil{handlers,ok:=h.opts.Context.Value("http_handlers").(map[string]http.Handler)ifok{forpattern,handler:=rangehandlers{h.mux.Handle(pattern,handler)}}}returnh}如果addroption沒有指定,就是默認值"127.0.0.1:0"
然後調用httpbroker的Connect方法,來連接伺服器
func(h*httpBroker)Connect()error{h.RLock()ifh.running{h.RUnlock()returnnil}h.RUnlock()h.Lock()deferh.Unlock()varlnet.Listenervarerrerrorifh.opts.Secure||h.opts.TLSConfig!=nil{config:=h.opts.TLSConfigfn:=func(addrstring)(net.Listener,error){ifconfig==nil{hosts:=[]string{addr}//checkifitsavalidhost:portifhost,_,err:=net.SplitHostPort(addr);err==nil{iflen(host)==0{hosts=maddr.IPs()}else{hosts=[]string{host}}}//generateacertificatecert,err:=mls.Certificate(hosts...)iferr!=nil{returnnil,err}config=&tls.Config{Certificates:[]tls.Certificate{cert}}}returntls.Listen("tcp",addr,config)}l,err=mnet.Listen(h.address,fn)}else{fn:=func(addrstring)(net.Listener,error){returnnet.Listen("tcp",addr)}l,err=mnet.Listen(h.address,fn)}iferr!=nil{returnerr}addr:=h.addressh.address=l.Addr().String()gohttp.Serve(l,h.mux)gofunc(){h.run(l)h.Lock()h.opts.Addrs=[]string{addr}h.address=addrh.Unlock()}()//getregistryreg:=h.opts.Registryifreg==nil{reg=registry.DefaultRegistry}//setcacheh.r=cache.New(reg)//setrunningh.running=truereturnnil}在連接了伺服器之後,我們就可以建立連接進行監聽處理消息了。有沒有發現,其實就是一個httpserver,監聽服務請求,然後進行事件處理。
而在發布和訂閱相應的代碼邏輯了,我們可以看到,在發布的邏輯里,我們會首先將msg,saveMessage到指定的topic中,然後非同步處理消息,那麼如何進行saveMessage,以及如何進行非同步消息處理的呢,首先,
func(h*httpBroker)saveMessage(topicstring,msg[]byte){h.mtx.Lock()deferh.mtx.Unlock()//getmessagesc:=h.inbox[topic]//savemessagec=append(c,msg)//maxlength64iflen(c)>64{c=c[:64]}//saveinboxh.inbox[topic]=c}保存消息的邏輯足夠簡單,僅僅是將數據保存到httpbroker的某一個Map中。至於消息處理
//dotherestasyncgofunc(){//getathirdofthebacklogmessages:=h.getMessage(topic,8)delay:=(len(messages)>1)//publishallthemessagesfor_,msg:=rangemessages{//serializeheresrv(s,msg)//{time.Sleep(time.Millisecond*100)}}}()在一個goroutine里,從topic中取出message,然後從注冊中心中根據服務名獲取到服務的列表,然後再列表中找到符合條件的節點,找到節點後就可以將消息通過bttpbroker結構體中的client的Post方法,將消息發送給伺服器,並根據返回結果進行錯誤處理。
func(h*httpBroker)Publish(topicstring,msg*Message,opts...PublishOption)error{//createthemessagefirstm:=&Message{Header:make(map[string]string),Body:msg.Body,}fork,v:=rangemsg.Header{m.Header[k]=v}m.Header["Micro-Topic"]=topic//encodethemessageb,err:=h.opts.Codec.Marshal(m)iferr!=nil{returnerr}//savethemessageh.saveMessage(topic,b)//nowattempttogettheserviceh.RLock()s,err:=h.r.GetService(serviceName)iferr!=nil{h.RUnlock()returnerr}h.RUnlock()pub:=func(node*registry.Node,tstring,b[]byte)error{scheme:="http"//.Metadata["secure"]=="true"{scheme="https"}vals:=url.Values{}vals.Add("id",node.Id)uri:=fmt.Sprintf("%s://%s%s?%s",scheme,node.Address,DefaultPath,vals.Encode())r,err:=h.c.Post(uri,"application/json",bytes.NewReader(b))iferr!=nil{returnerr}//discardresponsebodyio.Copy(ioutil.Discard,r.Body)r.Body.Close()returnnil}srv:=func(s[]*registry.Service,b[]byte){for_,service:=ranges{varnodes[]*registry.Nodefor_,node:=rangeservice.Nodes{//.Metadata["broker"]!="http"{continue}//lookfornodesforthetopicifnode.Metadata["topic"]!=topic{continue}nodes=append(nodes,node)}//onlyprocessifwehavenodesiflen(nodes)==0{continue}switchservice.Version{//:varsuccessbool//publishtoallnodesfor_,node:=rangenodes{//publishasynciferr:=pub(node,topic,b);err==nil{success=true}}//!success{h.saveMessage(topic,b)}default://selectnodetopublishtonode:=nodes[rand.Int()%len(nodes)]//publishasynctoonenodeiferr:=pub(node,topic,b);err!=nil{//iffailedsaveith.saveMessage(topic,b)}}}}//dotherestasyncgofunc(){//getathirdofthebacklogmessages:=h.getMessage(topic,8)delay:=(len(messages)>1)//publishallthemessagesfor_,msg:=rangemessages{//serializeheresrv(s,msg)//{time.Sleep(time.Millisecond*100)}}}()returnnil}而訂閱的邏輯就是生產一個訂閱者
//generatesubscribersubscriber:=&httpSubscriber{opts:options,hb:h,id:node.Id,topic:topic,fn:handler,svc:service,}然後調用httpbroker的subscribe方法,真正的進行訂閱操作。
func(h*httpBroker)subscribe(s*httpSubscriber)error{h.Lock()deferh.Unlock()iferr:=h.r.Register(s.svc,registry.RegisterTTL(registerTTL));err!=nil{returnerr}h.subscribers[s.topic]=append(h.subscribers[s.topic],s)returnnil}這個邏輯就很簡單了,將服務注冊到注冊中心,然後把這個訂閱著追加到httpbroker的訂閱者的map里,方便消息的通信。
看到這里,你可能有一個疑惑,我並沒有看到,我的消息處理函數是如何在訂閱著接受到消息進行數據處理的呢,我們都知道,當我們使用net/http起一個httpserve的時候,會傳入一個handler.
//Servealwaysreturnsanon-nilerror.funcServe(lnet.Listener,handlerHandler)error{srv:=&Server{Handler:handler}returnsrv.Serve(l)}而我們在httpbrokerConnect的時候,啟動的server,傳入的的正式h.mux而在指定handler的時候,我們是
funcInit(opts...Option)error{returnDefaultCmd.Init(opts...)}0DefaultPath是/,h都是我們的httpbroker.之所以可以將它作為handler傳入到Handle方法中,是因為,httpbroker實現了ServeHTTP方法,在這里就是路由的處理函數。
funcInit(opts...Option)error{returnDefaultCmd.Init(opts...)}1在這里就可以解碼消息,獲取消息體,並進行消息處理函數的調用。
funcInit(opts...Option)error{returnDefaultCmd.Init(opts...)}2這就是整個httpbroker的處理流程,相信你現在就可以明白了go-micro的非同步消息系統的默認實現了,如果你想實現自己的broker,只要在實現中實現了Broker介面,就可以在項目中使用自定義的broker了。無縫切換。
funcInit(opts...Option)error{returnDefaultCmd.Init(opts...)}3至此整個broker部分就結束了,下一篇文章,我們就來看看如何實現一個自定義的broker.
作者:第八共同體