⑴ C语言,OCI多线程建立session的问题,需要一个多线程连接的示例代码
。。
你子线程控制同步了么? 断错误一般是内存操作出错 和oci 或者pthread的关系不大!
void* OracleProcess(GPS_DATA GpsRec) // 数据库数迹念嫌姿手据处理
{
interval = 0;
struct HashItem* pHash;
pHash = inithashtable(MAX_REC<<2);
char sql[384] = {0};
char temp[256] = {0};
char tName[10] = {0}; // 表名字
int i,k;
int j = TotalRec >> RATE;
double distance;
for(i=0; i < j; i++)
{
sprintf(temp,"%s%f%f%f%d",gps_last[i].tid,gps_last[i].lon,gps_last[i].lat,gps_last[i].speed,gps_last[i].udate);
InsertHash(temp, pHash, MAX_REC<<2); // 插入最后高搭GPS信息到hash
memset(temp,0x00,256);
}
for(i = 0; i < TotalRec; i++)
{
for(k=0; k<j; k++) // 查询车机是否在册
if(strcmp(GpsRec[i].tid,tid[k]) == 0)
break;
if(k < j)
{
if(GpsRec[i].udate != 0.00)
{
distance = InfoUpdate(GpsRec,i); // 最新GPS数据更新
sprintf(temp,"%s%f%f%f%d",GpsRec[i].tid,GpsRec[i].lon,GpsRec[i].lat,GpsRec[i].speed,GpsRec[i].udate);
if(GetHashTablePos(temp, pHash, MAX_REC<<2) == -1) // 查找hash是否存在
{
if (distance > 0.0001)
{
sprintf(tName,"GPS_%d_Y",tf[k]);
InsertHash(temp, pHash, MAX_REC<<2); // 插入
sprintf(sql,"insert into %s (id,tm_id,lon,lat, speed, utc_time, udate,mileage,DIRECTION,DISTANCE) values (seq_gps.nextVal,'%s','%f','%f','%f','%d','%d','%f','%d','%f','%d')",
tName,GpsRec[i].tid,GpsRec[i].lon,GpsRec[i].lat,GpsRec[i].speed,GpsRec[i].utime,GpsRec[i].udate,GpsRec[i].mileage,GpsRec[i].dir,distance,interval);
printf("%s\n",sql);
oci_excu(oracle_env,(text *)sql,0); // 插入数据
memset(tName,0x00,10);
}
}
memset(sql,0x00,384);
memset(temp,0x00,256);
}
}
}
memset(GpsRec,0x00,sizeof(GpsRec));
free(pHash);
pthread_exit(NULL);
}
void TcpProcess(int tfd) // 处理TCP连接上的事务
{
struct timeval ntime;
int index = 0,times,ret;
int rlen = 0,rflag = 0;
char recvbuf[513] = {0};
bzero(recvbuf,513);
while(1)
{
ret = rlen = read(tfd,recvbuf,512);
if(rlen <= 0)
break;
if((rlen%32) == 0) // 32长度为标准TCP信息
{
times = 0;
ret >>= 5;
while(ret--)
{
if(tflag[tfd] == tfd) // 已经存在的socket
{
LOVENIX *info = (LOVENIX *)malloc(sizeof(LOVENIX));
memset(info,0x00,sizeof(LOVENIX));
if(recvbuf[times] == 0x58 || recvbuf[times] == 0x59)
ProtocolAnalysisLovenixTcp(&recvbuf[times],info);
else if(recvbuf[times] == 0x24)
ProtocolAnalysisLovenixUdp(&recvbuf[times],info);
sprintf(info->tid,"%s",seq[tfd]); // 合成车辆ID
DataProcess(info); // 处理GPS数据
free(info);
gettimeofday(&ntime, NULL);
cntime[tfd] = ntime.tv_sec; // 更新时间
times += 32;
}
}
}
else if(rlen > 32)
{
if(!rflag)
{
if((index = RegLovenix(tfd,recvbuf)) > -1)
{
sprintf(seq[tfd],"%s",tid[index]); // 将对应的socket设备ID保存
gettimeofday(&ntime, NULL);
sfd[tfd] = tfd;
cntime[tfd] = ntime.tv_sec;
tflag[tfd] = tfd;
rflag = 1;
}
}
}
if(rlen < 512); // 已经读完
break;
memset(recvbuf,0x00,rlen);
}
}
void *TcpServer(void *arg)
{
int port = (unsigned int) arg;
int efd,i;
struct timeval ntime;
int listener, nfds, n, listen_opt = 1, lisnum;
struct sockaddr_in my_addr, their_addr;
socklen_t len = sizeof(their_addr);
lisnum = MAXLISTEN;
for(i=0; i<MAX_REC; i++)
{
sfd[i] = 0;
tflag[i] = 0;
}
if ((listener = socket(PF_INET, SOCK_STREAM, 0)) == -1) // 开启 socket 监听
{
lprintf(lfd, FATAL, "TCP Socket error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP socket creat susscess!\n");
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (void *) &listen_opt,(int) sizeof(listen_opt)); // 设置端口多重邦定
setnonblocking(listener);
bzero(&my_addr, sizeof(my_addr));
my_addr.sin_family = PF_INET;
my_addr.sin_port = htons(port);
my_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1)
{
lprintf(lfd, FATAL, "TCP bind error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP bind susscess!\n");
if (listen(listener, lisnum) == -1)
{
lprintf(lfd, FATAL, "TCP listen error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP listen susscess!\n");
kdpfd = epoll_create(MAXEPOLLSIZE); // 创建 epoll句柄,把监听socket加入到epoll集合里
ev.events = EPOLLIN | EPOLLET; // 注册epoll 事件
ev.data.fd = listener;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0)
lprintf(lfd, FATAL, "EPOLL_CTL_ADD error!\n");
while (1)
{
sem_wait(&sem_tcp); // 等待 sem_TCP
sem_wait(&sem_tp); // 将tp值减一
nfds = epoll_wait(kdpfd, events, MAXEPOLLSIZE, 1); // 等待有事件发生
if (nfds == -1)
lprintf(lfd, FATAL,"EPOLL_WAIT error!\n");
for (n = 0; n < nfds; ++n) // 处理epoll所有事件
{
if (events[n].data.fd == listener) // 如果是连接事件
{
if ((efd = accept(listener, (struct sockaddr *) &their_addr,&len)) < 0)
{
lprintf(lfd, FATAL, "accept error!\n");
continue;
}
else
lprintf(lfd, INFO, "Client from :%s\tSocket ID:%d\n", inet_ntoa(their_addr.sin_addr) ,efd);
setnonblocking(efd); // 设置新连接为非阻塞模式
ev.events = EPOLLIN | EPOLLET; // 注册新连接
ev.data.fd = efd;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, efd, &ev) < 0) // 将新连接加入EPOLL的监听队列
lprintf(lfd, FATAL, "EPOLL_CTL_ADD error!\n");
else
{
gettimeofday(&ntime, NULL);
cntime[efd] = ntime.tv_sec;
sfd[efd] = efd;
}
}
else if (events[n].events & EPOLLIN)
tpool_add_work(pool, TcpProcess, (void*)events[n].data.fd); // 读取分析TCP信息
else
{
close(events[n].data.fd);
epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd, &ev);
}
}
sem_post(&sem_cm);
sem_post(&sem_udp);
}
close(listener);
}
int DataProcess(LOVENIX *info) // 处理GPS数据
{
if(sflag == 0 && (CacheRec != TotalRec)) // 缓存1可用且没有满
{
gps_cache[CacheRec].lat = info->lat;
gps_cache[CacheRec].mileage = info->mileage;
gps_cache[CacheRec].lon = info->lon;
gps_cache[CacheRec].speed = atod(info->speed, strlen(info->speed))*0.514444444*3.6;
gps_cache[CacheRec].udate = atoi(info->udate);
gps_cache[CacheRec].utime = atoi(info->utime);
gps_cache[CacheRec].dir = atoi(info->dir);
sprintf(gps_cache[CacheRec].tid ,"%s",info->tid);
CacheRec++;
// printf("CacheRec %d\tTotalRec %d \t sflag:%d\n",CacheRec,TotalRec,sflag);
if(CacheRec == TotalRec)
{
sflag = 1;
pthread_attr_init(&attr); // 初始化属性值,均设为默认值
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // 设置线程为分离属性
if (pthread_create(&thread, &attr,(void*) OracleProcess,(void*)gps_cache)) // 创建数据处理线程
lprintf(lfd, FATAL, "oracle pthread_creat error!\n");
CacheRec = 0;
}
}
else if(sflag == 1 && (Cache1Rec != TotalRec)) // 缓存2可用且没有满
{
gps_cache1[Cache1Rec].mileage = info->mileage;
gps_cache1[Cache1Rec].lat = info->lat;
gps_cache1[Cache1Rec].lon = info->lon;
gps_cache1[Cache1Rec].speed = atod(info->speed, strlen(info->speed))*0.514444444*3.6;
gps_cache1[Cache1Rec].udate = atoi(info->udate);
gps_cache1[Cache1Rec].utime = atoi(info->utime);
gps_cache1[Cache1Rec].dir = atoi(info->dir);
sprintf(gps_cache1[Cache1Rec].tid ,"%s",info->tid);
Cache1Rec++;
if(Cache1Rec == TotalRec)
{
sflag = 0;
pthread_attr_init(&attr); // 初始化属性值,均设为默认值
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // 设置线程为分离属性
if (pthread_create(&thread, &attr,(void*) OracleProcess,(void*)gps_cache1)) // 创建数据处理线程
lprintf(lfd, FATAL, "oracle pthread_creat error!\n");
Cache1Rec = 0;
}
}
else
{
lprintf(lfd, FATAL, "No cache to use!\n");
return (0);
}
return (1);
}
⑵ linux C下多线程接收数据怎么进行存储再统一处理
在Linux系统中使用C/C++进行多线程编程时,我们遇到最多的就是对同一变量的多线程读写问题,大多情况下遇到这类问题都是通过锁机制来处理,但这对程序的性能带来了很大的影响,当然对于那些系统原生支持原子操作的数据类型来说,我们可以使用原子操作来处理,这能对程序的性能会得到一定的提高。那么对于那些系统不支持原子操作的自定义数据类型,在不使用锁的情况下如何做到线程安全呢?本文将从线程局部存储方面,简单讲解处理这一类线程安全问题的方法。
一、数据类型
在C/C++程序中常存在全局变量、函数内定义的静态变量以及局部变量,对于局部变量来说,其不存在线程安全问题,因此不在本文讨论的范围之内。全局变量和函数内定义的静态变量,是同一进程中各个线程都可以访问的共享变量,因此它们存在多线程读写问题。在一个线程中修改了变量中的内容,其他线程都能感知并且能读取已更改过的内容,这对数据交换来说是非常快捷的,但是由于多线程的存在,对于同一个变量可能存在两个或两个以上的线程同时修改变量所在的内存内容,同时又存在多个线程在变量在修改的时去读取该内存值,如果没有使用相应的同步机制来保护该内存的话,那么所读取到的数据将是不可预知的,甚至可能导致程序崩溃。
如果需要在一个线程内部的各个函数调用都能访问、但其它线程不能访问的变量,这就需要新的机制来实现,我们称之为Static memory local to a thread (线程局部静态变量),同时也可称之为线程特有数据(TSD: Thread-Specific Data)或者线程局部存储(TLS: Thread-Local Storage)。这一类型的数据,在程序中每个线程都会分别维护一份变量的副本(),并且长期存在于该线程中,对此类变量的操作不影响其他线程。如下图:
二、一次性初始化
在讲解线程特有数据之前,先让我们来了解一下一次性初始化。多线程程序有时有这样的需求:不管创建多少个线程,有些数据的初始化只能发生一次。列如:在C++程序中某个类在整个进程的生命周期内只能存在一个实例对象,在多线程的情况下,为了能让该对象能够安全的初始化,一次性初始化机制就显得尤为重要了。——在设计模式中这种实现常常被称之为单例模式(Singleton)。Linux中提供了如下函数来实现一次性初始化:
#include <pthread.h>
// Returns 0 on success, or a positive error number on error
int pthread_once (pthread_once_t *once_control, void (*init) (void));
利用参数once_control的状态,函数pthread_once()可以确保无论有多少个线程调用多少次该函数,也只会执行一次由init所指向的由调用者定义的函数。init所指向的函数没有任何参数,形式如下:
void init (void)
{
// some variables initializtion in here
}
另外,参数once_control必须是pthread_once_t类型变量的指针,指向初始化为PTHRAD_ONCE_INIT的静态变量。在C++0x以后提供了类似功能的函数std::call_once (),用法与该函数类似。使用实例请参考https://github.com/ApusApp/Swift/blob/master/swift/base/singleton.hpp实现。
⑶ C#多线程写数据库
首先对数据库(尤其是Access)使用多线程大多不会提高效率(除非SQL中有耗时但不好资源的操作,如T-SQL中休眠之类的语句)。
建议楼主:使用队列,将要执行的SQL语句放入队列中(如:System.Collection.Queue或ArrayList),然后用一根线程一条一条执行,另外Access不支持事物回滚只有自己想办法实现了。滥用多线程会加大程序开发的难度,以及包括程序的不稳定。
另外:cbyvft的答案“……所有的线程使用同一个连接”
,是严重错误的!!连接对象Connection不能迸发,也就是不能多根李空线程共享一个连接对象,否则很容易引发异常(报错为:...基础对象与RAW分离之类的信息)。
若非要用多线程来做,我可以给你一段代码(我以前开发的项目中一部分),请加我的“网络Hi”并发消息给我,我传给你。
我不在这里帖代码了,因为实现的代码较多,而且比较复杂(使用多线程要哪扮瞎考虑很缺碧多问题,代码要硕壮通用,所以代码量较大)。
⑷ 关于C++多线程编程教学
在Windows NT和Windows 9x中,多线程的编程实现需要调用一系列的API函数,如CreateThread、ResumeThread等,比较麻烦而且容易出错。我们使用Inprise公司的新一代RAD开发工具C++Builder,可以方便地实现多线程的编程。与老牌RAD工具Visual Basic和Delphi比,C++Builer不仅功能非常强大,而且它的编程语言是C++,对于系统开发语言是C的Windows系列操作系统,它具有其它编程语言无可比拟的优势。利用C++Builder提供的TThread对象,多线程的编程变得非常简便易用。那么,如何实现呢?且待我慢慢道来,让你体会一下多线程樱配的强大功能。
1. 创建多线程程序:
首先,先介绍一下实现多线程的具体步骤。在C++Builder中虽然用Tthread对象说明了线程的概念,但是Tthread对象本身并不完整,需要在TThread下新建其子类,并重载Execute方法来使用线程对象。在C++Builder下可以很方便地实现这一点。
在C++Builder IDE环境下选择菜单File|New,在肆李New栏中选中Thread Object,按OK,接下来弹出输入框,输入TThread对象子类的名字MyThread,这样C++Builder自动为你创建了一个名为TMyThread的TThread子类。同时编辑器中多了一个名为Unit2.cpp的单元,这就是我们创建的TMyThread子类脊雹指的原码,如下:
#include
#pragma hdrstop
#include “Unit2.h”
#pragma package(smart_init)
//---------------------
// Important: Methods and properties of objects in VCL can only be
// used in a method called using Synchronize, for example:
//
// Synchronize(UpdateCaption);
//
// where UpdateCaption could look like:
//
// void __fastcall MyThread::UpdateCaption()
// {
// Form1->Caption = “Updated in a thread”;
// }
//--------------------
__fastcall MyThread::MyThread(bool CreateSuspended)
: TThread(CreateSuspended)
{
}
//--------------------
void __fastcall MyThread::Execute()
{
//---- Place thread code here ----
}
//---------------------
其中的Execute()函数就是我们要在线程中实现的任务的代码所在处。在原代码中包含Unit2.cpp,这个由我们创建的TMyThread对象就可以使用了。使用时,动态创建一个TMyThread 对象,在构造函数中使用Resume()方法,那么程序中就增加了一个新的我们自己定义的线程TMyThread,具体执行的代码就是Execute()方法重载的代码。要加载更多的线程,没关系,只要继续创建需要数量的TMyThread 对象就成。
⑸ C语言多线程的操作步骤
线程创建
函数原型:intpthread_create(pthread_t*restrict tidp,const pthread_attr_t *restrict attr,void *(*start_rtn)(void),void *restrict arg);
返回值:若是成功建立线程返回0,否则返回错误的编号。
形式参数:_t*restrict tidp要创建的线程的线程id指针;const pthread_attr_t *restrict attr创建线程时的线程属性;void *(start_rtn)(void)返回值是void类型的指针函数;void *restrict arg start_rtn的形参。
线程挂起:该函数的作用使得当前线程挂起,等待另一个线程返回才继续执行。也就是说当程序运行到这个地方时,程序会先停止,然后等线程id为thread的这个线程返回,然后程序才会断续执行。
函数原型:intpthread_join(pthread_tthread, void **value_ptr);
参数说明如下:thread等待退出线程的线程号;value_ptr退出线程的返回值。
返回值:若成功,则返回0;若失败,则返回错误号。
线程退出
函数原型:voidpthread_exit(void *rval_ptr);
获取当前线程id
函数原型:pthread_tpthread_self(void);
互斥锁
创建pthread_mutex_init;销毁pthread_mutex_destroy;加锁pthread_mutex_lock;解锁pthread_mutex_unlock。
条件锁
创建pthread_cond_init;销毁pthread_cond_destroy;触发pthread_cond_signal;广播pthread_cond_broadcast;等待pthread_cond_wait。