导航:首页 > 网络数据 > 进程间大数据同步

进程间大数据同步

发布时间:2023-09-06 03:43:06

Ⅰ 进程间通信的方法有哪些那一种方法效率最高进程间同步机制有哪些

进程间通信机制

1 文件映射
文件映射(Memory-Mapped Files)能使进程把文件内容当作进程地址区间一块内存那样来对待。因此,进程不必使用文件I/O操作,只需简单的指针操作就可读取和修改文件的内容。
Win32 API允许多个进程访问同一文件映射对象,各个进程在它自己的地址空间里接收内存的指针。通过使用这些指针,不同进程就可以读或修改文件的内容,实现了对文件中数据的共享。
应用程序有三种方法来使多个进程共享一个文件映射对象。
(1)继承:第一个进程建立文件映射对象,它的子进程继承该对象的句柄。
(2)命名文件映射:第一个进程在建立文件映射对象时可以给该对象指定一个名字(可与文件名不同)。第二个进程可通过这个名字打开此文件映射对象。另外,第一个进程也可以通过一些其它IPC机制(有名管道、邮件槽等)把名字传给第二个进程。
(3)句柄复制:第一个进程建立文件映射对象,然后通过其它IPC机制(有名管道、邮件槽等)把对象句柄传递给第二个进程。第二个进程复制该句柄就取得对该文件映射对象的访问权限。
文件映射是在多个进程间共享数据的非常有效方法,有较好的安全性。但文件映射只能用于本地机器的进程之间,不能用于网络中,而开发者还必须控制进程间的同步。
2 共享内存
Win32 API中共享内存(Shared Memory)实际就是文件映射的一种特殊情况。进程在创建文件映射对象时用0xFFFFFFFF来代替文件句柄(HANDLE),就表示了对应的文件映射对象是从操作系统页面文件访问内存,其它进程打开该文件映射对象就可以访问该内存块。由于共享内存是用文件映射实现的,所以它也有较好的安全性,也只能运行于同一计算机上的进程之间。

注意点: 要控制同步,而且CString、list、arry、map等的collect class都不能安全的使用于共享内存中

不要把拥有虚函数之C++类放到共享内存中

不要把CObject派生类之MFC对象放到共享内存中

不要使用"point within the shared memory"的指针

不要使用"point outside of the shared memory"的指针

使用"based"指针是安全的,但要小心使用
3 匿名管道
管道(Pipe)是一种具有两个端点的通信通道:有一端句柄的进程可以和有另一端句柄的进程通信。管道可以是单向-一端是只读的,另一端点是只写的;也可以是双向的一管道的两端点既可读也可写。
匿名管道(Anonymous Pipe)是 在父进程和子进程之间,或同一父进程的两个子进程之间传输数据的无名字的单向管道。通常由父进程创建管道,然后由要通信的子进程继承通道的读端点句柄或写 端点句柄,然后实现通信。父进程还可以建立两个或更多个继承匿名管道读和写句柄的子进程。这些子进程可以使用管道直接通信,不需要通过父进程。
匿名管道是单机上实现子进程标准I/O重定向的有效方法,它不能在网上使用,也不能用于两个不相关的进程之间。
4 命名管道
命名管道(Named Pipe)是服务器进程和一个或多个客户进程之间通信的单向或双向管道。不同于匿名管道的是命名管道可以在不相关的进程之间和不同计算机之间使用,服务器建立命名管道时给它指定一个名字,任何进程都可以通过该名字打开管道的另一端,根据给定的权限和服务器进程通信。
命名管道提供了相对简单的编程接口,使通过网络传输数据并不比同一计算机上两进程之间通信更困难,不过如果要同时和多个进程通信它就力不从心了。
5 邮件槽
邮件槽(Mailslots)提 供进程间单向通信能力,任何进程都能建立邮件槽成为邮件槽服务器。其它进程,称为邮件槽客户,可以通过邮件槽的名字给邮件槽服务器进程发送消息。进来的消 息一直放在邮件槽中,直到服务器进程读取它为止。一个进程既可以是邮件槽服务器也可以是邮件槽客户,因此可建立多个邮件槽实现进程间的双向通信。
通过邮件槽可以给本地计算机上的邮件槽、其它计算机上的邮件槽或指定网络区域中所有计算机上有同样名字的邮件槽发送消息。广播通信的消息长度不能超过400字节,非广播消息的长度则受邮件槽服务器指定的最大消息长度的限制。
邮件槽与命名管道相似,不过它传输数据是通过不可靠的数据报(如TCP/IP协议中的UDP包)完成的,一旦网络发生错误则无法保证消息正确地接收,而命名管道传输数据则是建立在可靠连接基础上的。不过邮件槽有简化的编程接口和给指定网络区域内的所有计算机广播消息的能力,所以邮件槽不失为应用程序发送和接收消息的另一种选择。
6 剪贴板
剪贴板(Clipped Board)实质是Win32 API中一组用来传输数据的函数和消息,为Windows应用程序之间进行数据共享提供了一个中介,Windows已建立的剪切(复制)-粘贴的机制为不同应用程序之间共享不同格式数据提供了一条捷径。当用户在应用程序中执行剪切或复制操作时,应用程序把选取的数据用一种或多种格式放在剪贴板上。然后任何其它应用程序都可以从剪贴板上拾取数据,从给定格式中选择适合自己的格式。
剪贴板是一个非常松散的交换媒介,可以支持任何数据格式,每一格式由一无符号整数标识,对标准(预定义)剪贴板格式,该值是Win32 API定义的常量;对非标准格式可以使用Register Clipboard Format函数注册为新的剪贴板格式。利用剪贴板进行交换的数据只需在数据格式上一致或都可以转化为某种格式就行。但剪贴板只能在基于Windows的程序中使用,不能在网络上使用。
7 动态数据交换
动态数据交换(DDE)是使用共享内存在应用程序之间进行数据交换的一种进程间通信形式。应用程序可以使用DDE进行一次性数据传输,也可以当出现新数据时,通过发送更新值在应用程序间动态交换数据。
DDE和剪贴板一样既支持标准数据格式(如文本、位图等),又可以支持自己定义的数据格式。但它们的数据传输机制却不同,一个明显区别是剪贴板操作几乎总是用作对用户指定操作的一次性应答-如从菜单中选择Paste命令。尽管DDE也可以由用户启动,但它继续发挥作用一般不必用户进一步干预。DDE有三种数据交换方式:
(1) 冷链:数据交换是一次性数据传输,与剪贴板相同。
(2) 温链:当数据交换时服务器通知客户,然后客户必须请求新的数据。
(3) 热链:当数据交换时服务器自动给客户发送数据。
DDE交换可以发生在单机或网络中不同计算机的应用程序之间。开发者还可以定义定制的DDE数据格式进行应用程序之间特别目的IPC,它们有更紧密耦合的通信要求。大多数基于Windows的应用程序都支持DDE。
8 对象连接与嵌入
应用程序利用对象连接与嵌入(OLE)技术管理复合文档(由多种数据格式组成的文档),OLE提供使某应用程序更容易调用其它应用程序进行数据编辑的服务。例如,OLE支持的字处理器可以嵌套电子表格,当用户要编辑电子表格时OLE库可自动启动电子表格编辑器。当用户退出电子表格编辑器时,该表格已在原始字处理器文档中得到更新。在这里电子表格编辑器变成了字处理器的扩展,而如果使用DDE,用户要显式地启动电子表格编辑器。
同DDE技术相同,大多数基于Windows的应用程序都支持OLE技术。
9 动态连接库
Win32动态连接库(DLL)中的全局数据可以被调用DLL的所有进程共享,这就又给进程间通信开辟了一条新的途径,当然访问时要注意同步问题。
虽然可以通过DLL进行进程间数据共享,但从数据安全的角度考虑,我们并不提倡这种方法,使用带有访问权限控制的共享内存的方法更好一些。
10 远程过程调用
Win32 API提供的远程过程调用(RPC)使应用程序可以使用远程调用函数,这使在网络上用RPC进行进程通信就像函数调用那样简单。RPC既可以在单机不同进程间使用也可以在网络中使用。
由于Win32 API提供的RPC服从OSF-DCE(Open Software Foundation Distributed Computing Environment)标准。所以通过Win32 API编写的RPC应用程序能与其它操作系统上支持DEC的RPC应用程序通信。使用RPC开发者可以建立高性能、紧密耦合的分布式应用程序。
11 NetBios函数
Win32 API提供NetBios函数用于处理低级网络控制,这主要是为IBM NetBios系统编写与Windows的接口。除非那些有特殊低级网络功能要求的应用程序,其它应用程序最好不要使用NetBios函数来进行进程间通信。
12 Sockets
Windows Sockets规范是以U.C.Berkeley大学BSD UNIX中流行的Socket接口为范例定义的一套Windows下的网络编程接口。除了Berkeley Socket原有的库函数以外,还扩展了一组针对Windows的函数,使程序员可以充分利用Windows的消息机制进行编程。
现在通过Sockets实现进程通信的网络应用越来越多,这主要的原因是Sockets的跨平台性要比其它IPC机制好得多,另外WinSock 2.0不仅支持TCP/IP协议,而且还支持其它协议(如IPX)。Sockets的唯一缺点是它支持的是底层通信操作,这使得在单机的进程间进行简单数据传递不太方便,这时使用下面将介绍的WM_COPYDATA消息将更合适些。
13 WM_COPYDATA消息
WM_COPYDATA是一种非常强大却鲜为人知的消息。当一个应用向另一个应用传送数据时,发送方只需使用调用SendMessage函数,参数是目的窗口的句柄、传递数据的起始地址、WM_COPYDATA消息。接收方只需像处理其它消息那样处理WM_COPY DATA消息,这样收发双方就实现了数据共享。
WM_COPYDATA是一种非常简单的方法,它在底层实际上是通过文件映射来实现的。它的缺点是灵活性不高,并且它只能用于Windows平台的单机环境下。

大数据常用同步工具

一、离线数据同步

DataX

阿里的Datax是比较优秀的产品,基于python,提供各种数据村塾的读写插件,多线程执行,使用起来也很简单,操作简单通常只需要两步;

创建作业的配置文件(json格式配置reader,writer);

启动执行配置作业。

非常适合离线数据,增量数据可以使用一些编码的方式实现,

缺点:仅仅针对insert数据比较有效,update数据就不适合。缺乏对增量更新的内置支持,因为DataX的灵活架构,可以通过shell脚本等方式方便实现增量同步。

参考资料:

github地址:https://github.com/alibaba/DataX

dataX3.0介绍:https://www.jianshu.com/p/65c440f9bce1

datax初体验:https://www.imooc.com/article/15640

文档:https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Sqoop

Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

地址:http://sqoop.apache.org/

Sqoop导入:导入工具从RDBMS到HDFS导入单个表。表中的每一行被视为HDFS的记录。所有记录被存储在文本文件的文本数据或者在Avro和序列文件的二进制数据。

Sqoop导出:导出工具从HDFS导出一组文件到一个RDBMS。作为输入到Sqoop文件包含记录,这被称为在表中的行。那些被读取并解析成一组记录和分隔使用用户指定的分隔符。

Sqoop支持全量数据导入和增量数据导入(增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式)),同时可以指定数据是否以并发形式导入。

Kettle

Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、linux、Unix上运行,数据抽取高效稳定。

Kettle的Spoon有丰富的Steps可以组装开发出满足多种复杂应用场景的数据集成作业,方便实现全量、增量数据同步。缺点是通过定时运行,实时性相对较差。

NiFi

Apache NiFi 是一个易于使用、功能强大而且可靠的数据拉取、数据处理和分发系统,用于自动化管理系统间的数据流。它支持高度可配置的指示图的数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据。

NiFi基于Web方式工作,后台在服务器上进行调度。 用户可以为数据处理定义为一个流程,然后进行处理,后台具有数据处理引擎、任务调度等组件。

几个核心概念:

Nifi 的设计理念接近于基于流的编程 Flow Based Programming。

FlowFile:表示通过系统移动的每个对象,包含数据流的基本属性

FlowFile Processor(处理器):负责实际对数据流执行工作

Connection(连接线):负责不同处理器之间的连接,是数据的有界缓冲区

Flow Controller(流量控制器):管理进程使用的线程及其分配

Process Group(过程组):进程组是一组特定的进程及其连接,允许组合其他组件创建新组件

参考资料

Nifi简介及核心概念整理

官方网站:http://nifi.apache.org/index.html

二、实时数据同步

实时同步最灵活的还是用kafka做中间转发,当数据发生变化时,记录变化到kafka,需要同步数据的程序订阅消息即可,需要研发编码支持。这里说个mysql数据库的同步组件,阿里的canal和otter

canal

https://github.com/alibaba/canal

数据抽取简单的来说,就是将一个表的数据提取到另一个表中。有很多的ETL工具可以帮助我们来进行数据的抽取和转换,ETL工具能进行一次性或者定时作业抽取数据,不过canal作为阿里巴巴提供的开源的数据抽取项目,能够做到实时抽取,原理就是伪装成mysql从节点,读取mysql的binlog,生成消息,客户端订阅这些数据变更消息,处理并存储。下面我们来一起搭建一下canal服务

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)

基于日志增量订阅&消费支持的业务:

数据库镜像

数据库实时备份

多级索引 (卖家和买家各自分库索引)

search build

业务cache刷新

价格变化等重要业务消息

otter

https://github.com/alibaba/otter

otter是在canal基础上又重新实现了可配置的消费者,使用otter的话,刚才说过的消费者就不需要写了,而otter提供了一个web界面,可以自定义同步任务及map表。非常适合mysql库之间的同步。

另外:otter已在阿里云推出商业化版本 数据传输服务DTS, 开通即用,免去部署维护的昂贵使用成本。DTS针对阿里云RDS、DRDS等产品进行了适配,解决了Binlog日志回收,主备切换、VPC网络切换等场景下的同步高可用问题。同时,针对RDS进行了针对性的性能优化。出于稳定性、性能及成本的考虑,强烈推荐阿里云用户使用DTS产品。

Ⅲ Linux多进程和线程同步的几种方式

Linux 线程同步的三种方法
线程的最大特点是资源的共享性,但资源共享中的同步问题是多线程编程的难点。linux下提供了多种方式来处理线程同步,最常用的是互斥锁、条件变量和信号量。
一、互斥锁(mutex)
通过锁机制实现线程间的同步。
初始化锁。在Linux下,线程的互斥量数据类型是pthread_mutex_t。在使用前,要对它进行初始化。
静态分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
动态分配:int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutex_attr_t *mutexattr);
加锁。对共享资源的访问,要对互斥量进行加锁,如果互斥量已经上了锁,调用线程会阻塞,直到互斥量被解锁。
int pthread_mutex_lock(pthread_mutex *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
解锁。在完成了对共享资源的访问后,要对互斥量进行解锁。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
销毁锁。锁在是使用完成后,需要进行销毁以释放资源。
int pthread_mutex_destroy(pthread_mutex *mutex);
[csharp] view plain
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include "iostream"
using namespace std;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int tmp;
void* thread(void *arg)
{
cout << "thread id is " << pthread_self() << endl;
pthread_mutex_lock(&mutex);
tmp = 12;
cout << "Now a is " << tmp << endl;
pthread_mutex_unlock(&mutex);
return NULL;
}
int main()
{
pthread_t id;
cout << "main thread id is " << pthread_self() << endl;
tmp = 3;
cout << "In main func tmp = " << tmp << endl;
if (!pthread_create(&id, NULL, thread, NULL))
{
cout << "Create thread success!" << endl;
}
else
{
cout << "Create thread failed!" << endl;
}
pthread_join(id, NULL);
pthread_mutex_destroy(&mutex);
return 0;
}
//编译:g++ -o thread testthread.cpp -lpthread
二、条件变量(cond)
互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。条件变量分为两部分: 条件和变量。条件本身是由互斥量保护的。线程在改变条件状态前先要锁住互斥量。条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。
初始化条件变量。
静态态初始化,pthread_cond_t cond = PTHREAD_COND_INITIALIER;
动态初始化,int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
等待条件成立。释放锁,同时阻塞等待条件变量为真才行。timewait()设置等待时间,仍未signal,返回ETIMEOUT(加锁保证只有一个线程wait)
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
激活条件变量。pthread_cond_signal,pthread_cond_broadcast(激活所有等待线程)
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); //解除所有线程的阻塞
清除条件变量。无线程等待,否则返回EBUSY
int pthread_cond_destroy(pthread_cond_t *cond);
[cpp] view plain
#include <stdio.h>
#include <pthread.h>
#include "stdlib.h"
#include "unistd.h"
pthread_mutex_t mutex;
pthread_cond_t cond;
void hander(void *arg)
{
free(arg);
(void)pthread_mutex_unlock(&mutex);
}
void *thread1(void *arg)
{
pthread_cleanup_push(hander, &mutex);
while(1)
{
printf("thread1 is running\n");
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
printf("thread1 applied the condition\n");
pthread_mutex_unlock(&mutex);
sleep(4);
}
pthread_cleanup_pop(0);
}
void *thread2(void *arg)
{
while(1)
{
printf("thread2 is running\n");
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
printf("thread2 applied the condition\n");
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
int main()
{
pthread_t thid1,thid2;
printf("condition variable study!\n");
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
pthread_create(&thid1, NULL, thread1, NULL);
pthread_create(&thid2, NULL, thread2, NULL);
sleep(1);
do
{
pthread_cond_signal(&cond);
}while(1);
sleep(20);
pthread_exit(0);
return 0;
}
[cpp] view plain
#include <pthread.h>
#include <unistd.h>
#include "stdio.h"
#include "stdlib.h"
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
struct node
{
int n_number;
struct node *n_next;
}*head = NULL;

static void cleanup_handler(void *arg)
{
printf("Cleanup handler of second thread./n");
free(arg);
(void)pthread_mutex_unlock(&mtx);
}
static void *thread_func(void *arg)
{
struct node *p = NULL;
pthread_cleanup_push(cleanup_handler, p);
while (1)
{
//这个mutex主要是用来保证pthread_cond_wait的并发性
pthread_mutex_lock(&mtx);
while (head == NULL)
{
//这个while要特别说明一下,单个pthread_cond_wait功能很完善,为何
//这里要有一个while (head == NULL)呢?因为pthread_cond_wait里的线
//程可能会被意外唤醒,如果这个时候head != NULL,则不是我们想要的情况。
//这个时候,应该让线程继续进入pthread_cond_wait
// pthread_cond_wait会先解除之前的pthread_mutex_lock锁定的mtx,
//然后阻塞在等待对列里休眠,直到再次被唤醒(大多数情况下是等待的条件成立
//而被唤醒,唤醒后,该进程会先锁定先pthread_mutex_lock(&mtx);,再读取资源
//用这个流程是比较清楚的
pthread_cond_wait(&cond, &mtx);
p = head;
head = head->n_next;
printf("Got %d from front of queue/n", p->n_number);
free(p);
}
pthread_mutex_unlock(&mtx); //临界区数据操作完毕,释放互斥锁
}
pthread_cleanup_pop(0);
return 0;
}
int main(void)
{
pthread_t tid;
int i;
struct node *p;
//子线程会一直等待资源,类似生产者和消费者,但是这里的消费者可以是多个消费者,而
//不仅仅支持普通的单个消费者,这个模型虽然简单,但是很强大
pthread_create(&tid, NULL, thread_func, NULL);
sleep(1);
for (i = 0; i < 10; i++)
{
p = (struct node*)malloc(sizeof(struct node));
p->n_number = i;
pthread_mutex_lock(&mtx); //需要操作head这个临界资源,先加锁,
p->n_next = head;
head = p;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mtx); //解锁
sleep(1);
}
printf("thread 1 wanna end the line.So cancel thread 2./n");
//关于pthread_cancel,有一点额外的说明,它是从外部终止子线程,子线程会在最近的取消点,退出
//线程,而在我们的代码里,最近的取消点肯定就是pthread_cond_wait()了。
pthread_cancel(tid);
pthread_join(tid, NULL);
printf("All done -- exiting/n");
return 0;
}
三、信号量(sem)
如同进程一样,线程也可以通过信号量来实现通信,虽然是轻量级的。信号量函数的名字都以"sem_"打头。线程使用的基本信号量函数有四个。
信号量初始化。
int sem_init (sem_t *sem , int pshared, unsigned int value);
这是对由sem指定的信号量进行初始化,设置好它的共享选项(linux 只支持为0,即表示它是当前进程的局部信号量),然后给它一个初始值VALUE。
等待信号量。给信号量减1,然后等待直到信号量的值大于0。
int sem_wait(sem_t *sem);
释放信号量。信号量值加1。并通知其他等待线程。
int sem_post(sem_t *sem);
销毁信号量。我们用完信号量后都它进行清理。归还占有的一切资源。
int sem_destroy(sem_t *sem);
[cpp] view plain
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <errno.h>
#define return_if_fail(p) if((p) == 0){printf ("[%s]:func error!/n", __func__);return;}
typedef struct _PrivInfo
{
sem_t s1;
sem_t s2;
time_t end_time;
}PrivInfo;

static void info_init (PrivInfo* thiz);
static void info_destroy (PrivInfo* thiz);
static void* pthread_func_1 (PrivInfo* thiz);
static void* pthread_func_2 (PrivInfo* thiz);

int main (int argc, char** argv)
{
pthread_t pt_1 = 0;
pthread_t pt_2 = 0;
int ret = 0;
PrivInfo* thiz = NULL;
thiz = (PrivInfo* )malloc (sizeof (PrivInfo));
if (thiz == NULL)
{
printf ("[%s]: Failed to malloc priv./n");
return -1;
}
info_init (thiz);
ret = pthread_create (&pt_1, NULL, (void*)pthread_func_1, thiz);
if (ret != 0)
{
perror ("pthread_1_create:");
}
ret = pthread_create (&pt_2, NULL, (void*)pthread_func_2, thiz);
if (ret != 0)
{
perror ("pthread_2_create:");
}
pthread_join (pt_1, NULL);
pthread_join (pt_2, NULL);
info_destroy (thiz);
return 0;
}
static void info_init (PrivInfo* thiz)
{
return_if_fail (thiz != NULL);
thiz->end_time = time(NULL) + 10;
sem_init (&thiz->s1, 0, 1);
sem_init (&thiz->s2, 0, 0);
return;
}
static void info_destroy (PrivInfo* thiz)
{
return_if_fail (thiz != NULL);
sem_destroy (&thiz->s1);
sem_destroy (&thiz->s2);
free (thiz);
thiz = NULL;
return;
}
static void* pthread_func_1 (PrivInfo* thiz)
{
return_if_fail(thiz != NULL);
while (time(NULL) < thiz->end_time)
{
sem_wait (&thiz->s2);
printf ("pthread1: pthread1 get the lock./n");
sem_post (&thiz->s1);
printf ("pthread1: pthread1 unlock/n");
sleep (1);
}
return;
}
static void* pthread_func_2 (PrivInfo* thiz)
{
return_if_fail (thiz != NULL);
while (time (NULL) < thiz->end_time)
{
sem_wait (&thiz->s1);
printf ("pthread2: pthread2 get the unlock./n");
sem_post (&thiz->s2);
printf ("pthread2: pthread2 unlock./n");
sleep (1);
}
return;
}

阅读全文

与进程间大数据同步相关的资料

热点内容
qq聊天记录彻底删除pc 浏览:11
无线网络打印机怎么连接电脑 浏览:983
健美租车app怎么用 浏览:298
怎么查看c盘所有文件内容 浏览:591
web服务器数据库 浏览:194
阿里云数据库怎么连接 浏览:160
使用ug编程配什么显卡 浏览:115
ipad百度云文件找不到 浏览:581
java中变量的存储 浏览:795
linux搭建bugfree 浏览:652
win10专业版小功能介绍 浏览:16
学数控编程如何学 浏览:14
win10直接删除文件 浏览:349
少儿编程和奥数哪个大 浏览:956
学校网站查成绩怎么办 浏览:657
javaweb面试 浏览:4
qq空间说说点不进去 浏览:772
nodejscms系统 浏览:822
追星数据组是什么东西 浏览:3
文件的格式怎么建立 浏览:529

友情链接