導航:首頁 > 網路數據 > 進程間大數據同步

進程間大數據同步

發布時間:2023-09-06 03:43:06

Ⅰ 進程間通信的方法有哪些那一種方法效率最高進程間同步機制有哪些

進程間通信機制

1 文件映射
文件映射(Memory-Mapped Files)能使進程把文件內容當作進程地址區間一塊內存那樣來對待。因此,進程不必使用文件I/O操作,只需簡單的指針操作就可讀取和修改文件的內容。
Win32 API允許多個進程訪問同一文件映射對象,各個進程在它自己的地址空間里接收內存的指針。通過使用這些指針,不同進程就可以讀或修改文件的內容,實現了對文件中數據的共享。
應用程序有三種方法來使多個進程共享一個文件映射對象。
(1)繼承:第一個進程建立文件映射對象,它的子進程繼承該對象的句柄。
(2)命名文件映射:第一個進程在建立文件映射對象時可以給該對象指定一個名字(可與文件名不同)。第二個進程可通過這個名字打開此文件映射對象。另外,第一個進程也可以通過一些其它IPC機制(有名管道、郵件槽等)把名字傳給第二個進程。
(3)句柄復制:第一個進程建立文件映射對象,然後通過其它IPC機制(有名管道、郵件槽等)把對象句柄傳遞給第二個進程。第二個進程復制該句柄就取得對該文件映射對象的訪問許可權。
文件映射是在多個進程間共享數據的非常有效方法,有較好的安全性。但文件映射只能用於本地機器的進程之間,不能用於網路中,而開發者還必須控制進程間的同步。
2 共享內存
Win32 API中共享內存(Shared Memory)實際就是文件映射的一種特殊情況。進程在創建文件映射對象時用0xFFFFFFFF來代替文件句柄(HANDLE),就表示了對應的文件映射對象是從操作系統頁面文件訪問內存,其它進程打開該文件映射對象就可以訪問該內存塊。由於共享內存是用文件映射實現的,所以它也有較好的安全性,也只能運行於同一計算機上的進程之間。

注意點: 要控制同步,而且CString、list、arry、map等的collect class都不能安全的使用於共享內存中

不要把擁有虛函數之C++類放到共享內存中

不要把CObject派生類之MFC對象放到共享內存中

不要使用"point within the shared memory"的指針

不要使用"point outside of the shared memory"的指針

使用"based"指針是安全的,但要小心使用
3 匿名管道
管道(Pipe)是一種具有兩個端點的通信通道:有一端句柄的進程可以和有另一端句柄的進程通信。管道可以是單向-一端是只讀的,另一端點是只寫的;也可以是雙向的一管道的兩端點既可讀也可寫。
匿名管道(Anonymous Pipe)是 在父進程和子進程之間,或同一父進程的兩個子進程之間傳輸數據的無名字的單向管道。通常由父進程創建管道,然後由要通信的子進程繼承通道的讀端點句柄或寫 端點句柄,然後實現通信。父進程還可以建立兩個或更多個繼承匿名管道讀和寫句柄的子進程。這些子進程可以使用管道直接通信,不需要通過父進程。
匿名管道是單機上實現子進程標准I/O重定向的有效方法,它不能在網上使用,也不能用於兩個不相關的進程之間。
4 命名管道
命名管道(Named Pipe)是伺服器進程和一個或多個客戶進程之間通信的單向或雙向管道。不同於匿名管道的是命名管道可以在不相關的進程之間和不同計算機之間使用,伺服器建立命名管道時給它指定一個名字,任何進程都可以通過該名字打開管道的另一端,根據給定的許可權和伺服器進程通信。
命名管道提供了相對簡單的編程介面,使通過網路傳輸數據並不比同一計算機上兩進程之間通信更困難,不過如果要同時和多個進程通信它就力不從心了。
5 郵件槽
郵件槽(Mailslots)提 供進程間單向通信能力,任何進程都能建立郵件槽成為郵件槽伺服器。其它進程,稱為郵件槽客戶,可以通過郵件槽的名字給郵件槽伺服器進程發送消息。進來的消 息一直放在郵件槽中,直到伺服器進程讀取它為止。一個進程既可以是郵件槽伺服器也可以是郵件槽客戶,因此可建立多個郵件槽實現進程間的雙向通信。
通過郵件槽可以給本地計算機上的郵件槽、其它計算機上的郵件槽或指定網路區域中所有計算機上有同樣名字的郵件槽發送消息。廣播通信的消息長度不能超過400位元組,非廣播消息的長度則受郵件槽伺服器指定的最大消息長度的限制。
郵件槽與命名管道相似,不過它傳輸數據是通過不可靠的數據報(如TCP/IP協議中的UDP包)完成的,一旦網路發生錯誤則無法保證消息正確地接收,而命名管道傳輸數據則是建立在可靠連接基礎上的。不過郵件槽有簡化的編程介面和給指定網路區域內的所有計算機廣播消息的能力,所以郵件槽不失為應用程序發送和接收消息的另一種選擇。
6 剪貼板
剪貼板(Clipped Board)實質是Win32 API中一組用來傳輸數據的函數和消息,為Windows應用程序之間進行數據共享提供了一個中介,Windows已建立的剪切(復制)-粘貼的機制為不同應用程序之間共享不同格式數據提供了一條捷徑。當用戶在應用程序中執行剪切或復制操作時,應用程序把選取的數據用一種或多種格式放在剪貼板上。然後任何其它應用程序都可以從剪貼板上拾取數據,從給定格式中選擇適合自己的格式。
剪貼板是一個非常鬆散的交換媒介,可以支持任何數據格式,每一格式由一無符號整數標識,對標准(預定義)剪貼板格式,該值是Win32 API定義的常量;對非標准格式可以使用Register Clipboard Format函數注冊為新的剪貼板格式。利用剪貼板進行交換的數據只需在數據格式上一致或都可以轉化為某種格式就行。但剪貼板只能在基於Windows的程序中使用,不能在網路上使用。
7 動態數據交換
動態數據交換(DDE)是使用共享內存在應用程序之間進行數據交換的一種進程間通信形式。應用程序可以使用DDE進行一次性數據傳輸,也可以當出現新數據時,通過發送更新值在應用程序間動態交換數據。
DDE和剪貼板一樣既支持標准數據格式(如文本、點陣圖等),又可以支持自己定義的數據格式。但它們的數據傳輸機制卻不同,一個明顯區別是剪貼板操作幾乎總是用作對用戶指定操作的一次性應答-如從菜單中選擇Paste命令。盡管DDE也可以由用戶啟動,但它繼續發揮作用一般不必用戶進一步干預。DDE有三種數據交換方式:
(1) 冷鏈:數據交換是一次性數據傳輸,與剪貼板相同。
(2) 溫鏈:當數據交換時伺服器通知客戶,然後客戶必須請求新的數據。
(3) 熱鏈:當數據交換時伺服器自動給客戶發送數據。
DDE交換可以發生在單機或網路中不同計算機的應用程序之間。開發者還可以定義定製的DDE數據格式進行應用程序之間特別目的IPC,它們有更緊密耦合的通信要求。大多數基於Windows的應用程序都支持DDE。
8 對象連接與嵌入
應用程序利用對象連接與嵌入(OLE)技術管理復合文檔(由多種數據格式組成的文檔),OLE提供使某應用程序更容易調用其它應用程序進行數據編輯的服務。例如,OLE支持的字處理器可以嵌套電子表格,當用戶要編輯電子表格時OLE庫可自動啟動電子表格編輯器。當用戶退出電子表格編輯器時,該表格已在原始字處理器文檔中得到更新。在這里電子表格編輯器變成了字處理器的擴展,而如果使用DDE,用戶要顯式地啟動電子表格編輯器。
同DDE技術相同,大多數基於Windows的應用程序都支持OLE技術。
9 動態連接庫
Win32動態連接庫(DLL)中的全局數據可以被調用DLL的所有進程共享,這就又給進程間通信開辟了一條新的途徑,當然訪問時要注意同步問題。
雖然可以通過DLL進行進程間數據共享,但從數據安全的角度考慮,我們並不提倡這種方法,使用帶有訪問許可權控制的共享內存的方法更好一些。
10 遠程過程調用
Win32 API提供的遠程過程調用(RPC)使應用程序可以使用遠程調用函數,這使在網路上用RPC進行進程通信就像函數調用那樣簡單。RPC既可以在單機不同進程間使用也可以在網路中使用。
由於Win32 API提供的RPC服從OSF-DCE(Open Software Foundation Distributed Computing Environment)標准。所以通過Win32 API編寫的RPC應用程序能與其它操作系統上支持DEC的RPC應用程序通信。使用RPC開發者可以建立高性能、緊密耦合的分布式應用程序。
11 NetBios函數
Win32 API提供NetBios函數用於處理低級網路控制,這主要是為IBM NetBios系統編寫與Windows的介面。除非那些有特殊低級網路功能要求的應用程序,其它應用程序最好不要使用NetBios函數來進行進程間通信。
12 Sockets
Windows Sockets規范是以U.C.Berkeley大學BSD UNIX中流行的Socket介面為範例定義的一套Windows下的網路編程介面。除了Berkeley Socket原有的庫函數以外,還擴展了一組針對Windows的函數,使程序員可以充分利用Windows的消息機制進行編程。
現在通過Sockets實現進程通信的網路應用越來越多,這主要的原因是Sockets的跨平台性要比其它IPC機制好得多,另外WinSock 2.0不僅支持TCP/IP協議,而且還支持其它協議(如IPX)。Sockets的唯一缺點是它支持的是底層通信操作,這使得在單機的進程間進行簡單數據傳遞不太方便,這時使用下面將介紹的WM_COPYDATA消息將更合適些。
13 WM_COPYDATA消息
WM_COPYDATA是一種非常強大卻鮮為人知的消息。當一個應用向另一個應用傳送數據時,發送方只需使用調用SendMessage函數,參數是目的窗口的句柄、傳遞數據的起始地址、WM_COPYDATA消息。接收方只需像處理其它消息那樣處理WM_COPY DATA消息,這樣收發雙方就實現了數據共享。
WM_COPYDATA是一種非常簡單的方法,它在底層實際上是通過文件映射來實現的。它的缺點是靈活性不高,並且它只能用於Windows平台的單機環境下。

大數據常用同步工具

一、離線數據同步

DataX

阿里的Datax是比較優秀的產品,基於python,提供各種數據村塾的讀寫插件,多線程執行,使用起來也很簡單,操作簡單通常只需要兩步;

創建作業的配置文件(json格式配置reader,writer);

啟動執行配置作業。

非常適合離線數據,增量數據可以使用一些編碼的方式實現,

缺點:僅僅針對insert數據比較有效,update數據就不適合。缺乏對增量更新的內置支持,因為DataX的靈活架構,可以通過shell腳本等方式方便實現增量同步。

參考資料:

github地址:https://github.com/alibaba/DataX

dataX3.0介紹:https://www.jianshu.com/p/65c440f9bce1

datax初體驗:https://www.imooc.com/article/15640

文檔:https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Sqoop

Sqoop(發音:skup)是一款開源的工具,主要用於在Hadoop(Hive)與傳統的資料庫(mysql、postgresql…)間進行數據的傳遞,可以將一個關系型資料庫(例如 : MySQL ,Oracle ,Postgres等)中的數據導進到Hadoop的HDFS中,也可以將HDFS的數據導進到關系型資料庫中。

地址:http://sqoop.apache.org/

Sqoop導入:導入工具從RDBMS到HDFS導入單個表。表中的每一行被視為HDFS的記錄。所有記錄被存儲在文本文件的文本數據或者在Avro和序列文件的二進制數據。

Sqoop導出:導出工具從HDFS導出一組文件到一個RDBMS。作為輸入到Sqoop文件包含記錄,這被稱為在表中的行。那些被讀取並解析成一組記錄和分隔使用用戶指定的分隔符。

Sqoop支持全量數據導入和增量數據導入(增量數據導入分兩種,一是基於遞增列的增量數據導入(Append方式)。二是基於時間列的增量數據導入(LastModified方式)),同時可以指定數據是否以並發形式導入。

Kettle

Kettle是一款國外開源的ETL工具,純java編寫,可以在Window、linux、Unix上運行,數據抽取高效穩定。

Kettle的Spoon有豐富的Steps可以組裝開發出滿足多種復雜應用場景的數據集成作業,方便實現全量、增量數據同步。缺點是通過定時運行,實時性相對較差。

NiFi

Apache NiFi 是一個易於使用、功能強大而且可靠的數據拉取、數據處理和分發系統,用於自動化管理系統間的數據流。它支持高度可配置的指示圖的數據路由、轉換和系統中介邏輯,支持從多種數據源動態拉取數據。

NiFi基於Web方式工作,後台在伺服器上進行調度。 用戶可以為數據處理定義為一個流程,然後進行處理,後台具有數據處理引擎、任務調度等組件。

幾個核心概念:

Nifi 的設計理念接近於基於流的編程 Flow Based Programming。

FlowFile:表示通過系統移動的每個對象,包含數據流的基本屬性

FlowFile Processor(處理器):負責實際對數據流執行工作

Connection(連接線):負責不同處理器之間的連接,是數據的有界緩沖區

Flow Controller(流量控制器):管理進程使用的線程及其分配

Process Group(過程組):進程組是一組特定的進程及其連接,允許組合其他組件創建新組件

參考資料

Nifi簡介及核心概念整理

官方網站:http://nifi.apache.org/index.html

二、實時數據同步

實時同步最靈活的還是用kafka做中間轉發,當數據發生變化時,記錄變化到kafka,需要同步數據的程序訂閱消息即可,需要研發編碼支持。這里說個mysql資料庫的同步組件,阿里的canal和otter

canal

https://github.com/alibaba/canal

數據抽取簡單的來說,就是將一個表的數據提取到另一個表中。有很多的ETL工具可以幫助我們來進行數據的抽取和轉換,ETL工具能進行一次性或者定時作業抽取數據,不過canal作為阿里巴巴提供的開源的數據抽取項目,能夠做到實時抽取,原理就是偽裝成mysql從節點,讀取mysql的binlog,生成消息,客戶端訂閱這些數據變更消息,處理並存儲。下面我們來一起搭建一下canal服務

早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。

ps. 目前內部版本已經支持mysql和oracle部分版本的日誌解析,當前的canal開源版本支持5.7及以下的版本(阿里內部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)

基於日誌增量訂閱&消費支持的業務:

資料庫鏡像

資料庫實時備份

多級索引 (賣家和買家各自分庫索引)

search build

業務cache刷新

價格變化等重要業務消息

otter

https://github.com/alibaba/otter

otter是在canal基礎上又重新實現了可配置的消費者,使用otter的話,剛才說過的消費者就不需要寫了,而otter提供了一個web界面,可以自定義同步任務及map表。非常適合mysql庫之間的同步。

另外:otter已在阿里雲推出商業化版本 數據傳輸服務DTS, 開通即用,免去部署維護的昂貴使用成本。DTS針對阿里雲RDS、DRDS等產品進行了適配,解決了Binlog日誌回收,主備切換、VPC網路切換等場景下的同步高可用問題。同時,針對RDS進行了針對性的性能優化。出於穩定性、性能及成本的考慮,強烈推薦阿里雲用戶使用DTS產品。

Ⅲ Linux多進程和線程同步的幾種方式

Linux 線程同步的三種方法
線程的最大特點是資源的共享性,但資源共享中的同步問題是多線程編程的難點。linux下提供了多種方式來處理線程同步,最常用的是互斥鎖、條件變數和信號量。
一、互斥鎖(mutex)
通過鎖機制實現線程間的同步。
初始化鎖。在Linux下,線程的互斥量數據類型是pthread_mutex_t。在使用前,要對它進行初始化。
靜態分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
動態分配:int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutex_attr_t *mutexattr);
加鎖。對共享資源的訪問,要對互斥量進行加鎖,如果互斥量已經上了鎖,調用線程會阻塞,直到互斥量被解鎖。
int pthread_mutex_lock(pthread_mutex *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
解鎖。在完成了對共享資源的訪問後,要對互斥量進行解鎖。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
銷毀鎖。鎖在是使用完成後,需要進行銷毀以釋放資源。
int pthread_mutex_destroy(pthread_mutex *mutex);
[csharp] view plain
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include "iostream"
using namespace std;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int tmp;
void* thread(void *arg)
{
cout << "thread id is " << pthread_self() << endl;
pthread_mutex_lock(&mutex);
tmp = 12;
cout << "Now a is " << tmp << endl;
pthread_mutex_unlock(&mutex);
return NULL;
}
int main()
{
pthread_t id;
cout << "main thread id is " << pthread_self() << endl;
tmp = 3;
cout << "In main func tmp = " << tmp << endl;
if (!pthread_create(&id, NULL, thread, NULL))
{
cout << "Create thread success!" << endl;
}
else
{
cout << "Create thread failed!" << endl;
}
pthread_join(id, NULL);
pthread_mutex_destroy(&mutex);
return 0;
}
//編譯:g++ -o thread testthread.cpp -lpthread
二、條件變數(cond)
互斥鎖不同,條件變數是用來等待而不是用來上鎖的。條件變數用來自動阻塞一個線程,直到某特殊情況發生為止。通常條件變數和互斥鎖同時使用。條件變數分為兩部分: 條件和變數。條件本身是由互斥量保護的。線程在改變條件狀態前先要鎖住互斥量。條件變數使我們可以睡眠等待某種條件出現。條件變數是利用線程間共享的全局變數進行同步的一種機制,主要包括兩個動作:一個線程等待"條件變數的條件成立"而掛起;另一個線程使"條件成立"(給出條件成立信號)。條件的檢測是在互斥鎖的保護下進行的。如果一個條件為假,一個線程自動阻塞,並釋放等待狀態改變的互斥鎖。如果另一個線程改變了條件,它發信號給關聯的條件變數,喚醒一個或多個等待它的線程,重新獲得互斥鎖,重新評價條件。如果兩進程共享可讀寫的內存,條件變數可以被用來實現這兩進程間的線程同步。
初始化條件變數。
靜態態初始化,pthread_cond_t cond = PTHREAD_COND_INITIALIER;
動態初始化,int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
等待條件成立。釋放鎖,同時阻塞等待條件變數為真才行。timewait()設置等待時間,仍未signal,返回ETIMEOUT(加鎖保證只有一個線程wait)
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
激活條件變數。pthread_cond_signal,pthread_cond_broadcast(激活所有等待線程)
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); //解除所有線程的阻塞
清除條件變數。無線程等待,否則返回EBUSY
int pthread_cond_destroy(pthread_cond_t *cond);
[cpp] view plain
#include <stdio.h>
#include <pthread.h>
#include "stdlib.h"
#include "unistd.h"
pthread_mutex_t mutex;
pthread_cond_t cond;
void hander(void *arg)
{
free(arg);
(void)pthread_mutex_unlock(&mutex);
}
void *thread1(void *arg)
{
pthread_cleanup_push(hander, &mutex);
while(1)
{
printf("thread1 is running\n");
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
printf("thread1 applied the condition\n");
pthread_mutex_unlock(&mutex);
sleep(4);
}
pthread_cleanup_pop(0);
}
void *thread2(void *arg)
{
while(1)
{
printf("thread2 is running\n");
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
printf("thread2 applied the condition\n");
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
int main()
{
pthread_t thid1,thid2;
printf("condition variable study!\n");
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
pthread_create(&thid1, NULL, thread1, NULL);
pthread_create(&thid2, NULL, thread2, NULL);
sleep(1);
do
{
pthread_cond_signal(&cond);
}while(1);
sleep(20);
pthread_exit(0);
return 0;
}
[cpp] view plain
#include <pthread.h>
#include <unistd.h>
#include "stdio.h"
#include "stdlib.h"
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
struct node
{
int n_number;
struct node *n_next;
}*head = NULL;

static void cleanup_handler(void *arg)
{
printf("Cleanup handler of second thread./n");
free(arg);
(void)pthread_mutex_unlock(&mtx);
}
static void *thread_func(void *arg)
{
struct node *p = NULL;
pthread_cleanup_push(cleanup_handler, p);
while (1)
{
//這個mutex主要是用來保證pthread_cond_wait的並發性
pthread_mutex_lock(&mtx);
while (head == NULL)
{
//這個while要特別說明一下,單個pthread_cond_wait功能很完善,為何
//這里要有一個while (head == NULL)呢?因為pthread_cond_wait里的線
//程可能會被意外喚醒,如果這個時候head != NULL,則不是我們想要的情況。
//這個時候,應該讓線程繼續進入pthread_cond_wait
// pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的mtx,
//然後阻塞在等待對列里休眠,直到再次被喚醒(大多數情況下是等待的條件成立
//而被喚醒,喚醒後,該進程會先鎖定先pthread_mutex_lock(&mtx);,再讀取資源
//用這個流程是比較清楚的
pthread_cond_wait(&cond, &mtx);
p = head;
head = head->n_next;
printf("Got %d from front of queue/n", p->n_number);
free(p);
}
pthread_mutex_unlock(&mtx); //臨界區數據操作完畢,釋放互斥鎖
}
pthread_cleanup_pop(0);
return 0;
}
int main(void)
{
pthread_t tid;
int i;
struct node *p;
//子線程會一直等待資源,類似生產者和消費者,但是這里的消費者可以是多個消費者,而
//不僅僅支持普通的單個消費者,這個模型雖然簡單,但是很強大
pthread_create(&tid, NULL, thread_func, NULL);
sleep(1);
for (i = 0; i < 10; i++)
{
p = (struct node*)malloc(sizeof(struct node));
p->n_number = i;
pthread_mutex_lock(&mtx); //需要操作head這個臨界資源,先加鎖,
p->n_next = head;
head = p;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mtx); //解鎖
sleep(1);
}
printf("thread 1 wanna end the line.So cancel thread 2./n");
//關於pthread_cancel,有一點額外的說明,它是從外部終止子線程,子線程會在最近的取消點,退出
//線程,而在我們的代碼里,最近的取消點肯定就是pthread_cond_wait()了。
pthread_cancel(tid);
pthread_join(tid, NULL);
printf("All done -- exiting/n");
return 0;
}
三、信號量(sem)
如同進程一樣,線程也可以通過信號量來實現通信,雖然是輕量級的。信號量函數的名字都以"sem_"打頭。線程使用的基本信號量函數有四個。
信號量初始化。
int sem_init (sem_t *sem , int pshared, unsigned int value);
這是對由sem指定的信號量進行初始化,設置好它的共享選項(linux 只支持為0,即表示它是當前進程的局部信號量),然後給它一個初始值VALUE。
等待信號量。給信號量減1,然後等待直到信號量的值大於0。
int sem_wait(sem_t *sem);
釋放信號量。信號量值加1。並通知其他等待線程。
int sem_post(sem_t *sem);
銷毀信號量。我們用完信號量後都它進行清理。歸還佔有的一切資源。
int sem_destroy(sem_t *sem);
[cpp] view plain
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <errno.h>
#define return_if_fail(p) if((p) == 0){printf ("[%s]:func error!/n", __func__);return;}
typedef struct _PrivInfo
{
sem_t s1;
sem_t s2;
time_t end_time;
}PrivInfo;

static void info_init (PrivInfo* thiz);
static void info_destroy (PrivInfo* thiz);
static void* pthread_func_1 (PrivInfo* thiz);
static void* pthread_func_2 (PrivInfo* thiz);

int main (int argc, char** argv)
{
pthread_t pt_1 = 0;
pthread_t pt_2 = 0;
int ret = 0;
PrivInfo* thiz = NULL;
thiz = (PrivInfo* )malloc (sizeof (PrivInfo));
if (thiz == NULL)
{
printf ("[%s]: Failed to malloc priv./n");
return -1;
}
info_init (thiz);
ret = pthread_create (&pt_1, NULL, (void*)pthread_func_1, thiz);
if (ret != 0)
{
perror ("pthread_1_create:");
}
ret = pthread_create (&pt_2, NULL, (void*)pthread_func_2, thiz);
if (ret != 0)
{
perror ("pthread_2_create:");
}
pthread_join (pt_1, NULL);
pthread_join (pt_2, NULL);
info_destroy (thiz);
return 0;
}
static void info_init (PrivInfo* thiz)
{
return_if_fail (thiz != NULL);
thiz->end_time = time(NULL) + 10;
sem_init (&thiz->s1, 0, 1);
sem_init (&thiz->s2, 0, 0);
return;
}
static void info_destroy (PrivInfo* thiz)
{
return_if_fail (thiz != NULL);
sem_destroy (&thiz->s1);
sem_destroy (&thiz->s2);
free (thiz);
thiz = NULL;
return;
}
static void* pthread_func_1 (PrivInfo* thiz)
{
return_if_fail(thiz != NULL);
while (time(NULL) < thiz->end_time)
{
sem_wait (&thiz->s2);
printf ("pthread1: pthread1 get the lock./n");
sem_post (&thiz->s1);
printf ("pthread1: pthread1 unlock/n");
sleep (1);
}
return;
}
static void* pthread_func_2 (PrivInfo* thiz)
{
return_if_fail (thiz != NULL);
while (time (NULL) < thiz->end_time)
{
sem_wait (&thiz->s1);
printf ("pthread2: pthread2 get the unlock./n");
sem_post (&thiz->s2);
printf ("pthread2: pthread2 unlock./n");
sleep (1);
}
return;
}

閱讀全文

與進程間大數據同步相關的資料

熱點內容
javaweb面試 瀏覽:4
qq空間說說點不進去 瀏覽:772
nodejscms系統 瀏覽:822
追星數據組是什麼東西 瀏覽:3
文件的格式怎麼建立 瀏覽:529
免費yoosee蘋果下載 瀏覽:447
網路大國與大數據 瀏覽:770
怎麼學plc的編程 瀏覽:643
javadnf輔助源碼 瀏覽:973
什麼app可以畫二維圖像 瀏覽:125
手機如何設置副路由器設置密碼 瀏覽:592
如何讓已經壓縮的文件恢復 瀏覽:344
網路atm取款支出是什麼意思 瀏覽:942
ios查看wifi密碼插件 瀏覽:742
win10因藍屏 瀏覽:322
app病毒是如何植入的 瀏覽:384
hadoop文件系統查看 瀏覽:317
熱門app免流都有哪些 瀏覽:619
cad怎麼轉換mpgis文件 瀏覽:631
win10照片不能用了 瀏覽:878

友情鏈接