❶ C#多线程写数据库
首先对数据库(尤其是Access)使用多线程大多不会提高效率(除非SQL中有耗时但不好资源的操作,如T-SQL中休眠之类的语句)。
建议楼主:使用队列,将要执行的SQL语句放入队列中(如:System.Collection.Queue或ArrayList),然后用一根线程一条一条执行,另外Access不支持事物回滚只有自己想办法实现了。滥用多线程会加大程序开发的难度,以及包括程序的不稳定。
另外:cbyvft的答案“……所有的线程使用同一个连接”
,是严重错误的!!连接对象Connection不能迸发,也就是不能多根李空线程共享一个连接对象,否则很容易引发异常(报错为:...基础对象与RAW分离之类的信息)。
若非要用多线程来做,我可以给你一段代码(我以前开发的项目中一部分),请加我的“网络Hi”并发消息给我,我传给你。
我不在这里帖代码了,因为实现的代码较多,而且比较复杂(使用多线程要哪扮瞎考虑很缺碧多问题,代码要硕壮通用,所以代码量较大)。
❷ C语言怎样实现多线程
首先你要有控制蛇移动方向的全局变量(定义在main以外因为线程函数也要调用它,每次键盘输入都会修改它的值), 比如 char direction 'a' ==左 'w' == 右 'd'==上 's' == 下,然后你在移动时应该是在while里面操作的吧,你每移动一步前都读一下direction这个变量的数值然后再控制移动方向(注意s这个键可以忽略因为不会倒着走) 然后你可以用pthread.h这个库 例子是 pthread t;// 定义一个线程 pthread_create(&t, null, listen_keyboard_input, null);//建立线程执行listen_keyboard_input这个函数 这个线程执行的函数 void listen_keyboard_input(){ while(应该通过某个信号来退出这个循环,从而表示游戏结束){ direction =getchar(); } } 但是这里存在同步问题, 比如当这个线程的getchar()在给direction辅助的同时,你控制贪吃蛇移动的线程正在调用 direction的值来判断下一个移动方向,这就会出问题,所以要加一个锁,叫 mutex lock;这个也定义成全局变量可以使各线程共享。 pthread_mutex_t mutex; //定义一个锁 pthread_mutex_init(&mutex, null, null);//初始化 然后把函数修改成 void listen_keyboard_input(){ while(应该通过某个信号来退出这个循环,从而表示游戏结束){ pthread_mutex_lock(&mutex); direction =getchar(); pthread_mutex_unlock(&mutex); } } 另外一个控制贪吃蛇移动的时候也要加锁 while(.....){ char c; pthread_mutex_lock(&mutex); c = direction; pthread_mutex_unlock(&mutex); switch(c){ ................ } ................................... } 这样就好了 注意你的控制贪吃蛇移动的部分也必须要放在另外一个pthread 里面执行,如果放在主线程, 主线程会一直等listen_keyboard_input而什么事都不会做 你把这两个线程用 pthread_create 创建完成后 用 t1.join(); t2.join(); 就可以使这两个线程并发执行了 如果你用的是linux 来编译的,你再输入gcc 指令后加上 -lpthread 就可以了 还有什么不懂的你可以多找找 pthread 类的例子
❸ 求思路:linux C上多线程接收数据怎么进行存储
在Linux系统中使用C/C++进行多线程编程时,我们遇到最多的就是对同一变量的多线程读写问题,大多情况下遇到这类问题都是通过锁机制来处理,但这对程序的性能带来了很大的影响,当然对于那些系统原生支持原子操作的数据类型来说,我们可以使用原子操作来处理,这能对程序的性能会得到一定的提高。那么对于那些系统不支持原子操作的自定义数据类型,在不使用锁的情况下如何做到线程安全呢?本文将从线程局部存储方面,简单讲解处理这一类线程安全问题的方法。
一、数据类型
在C/C++程序中常存在全局变量、函数内定义的静态变量以及局部变量,对于局部变量来说,其不存在线程安全问题,因此不在本文讨论的范围之内。全局变量和函数内定义的静态变量,是同一进程中各个线程都可以访问的共享变量,因此它们存在多线程读写问题。在一个线程中修改了变量中的内容,其他线程都能感知并且能读取已更改过的内容,这对数据交换来说是非常快捷的,但是由于多线程的存在,对于同一个变量可能存在两个或两个以上的线程同时修改变量所在的内存内容,同时又存在多个线程在变量在修改的时去读取该内存值,如果没有使用相应的同步机制来保护该内存的话,那么所读取到的数据将是不可预知的,甚至可能导致程序崩溃。
如果需要在一个线程内部的各个函数调用都能访问、但其它线程不能访问的变量,这就需要新的机制来实现,我们称之为Static memory local to a thread (线程局部静态变量),同时也可称之为线程特有数据(TSD: Thread-Specific Data)或者线程局部存储(TLS: Thread-Local Storage)。这一类型的数据,在程序中每个线程都会分别维护一份变量的副本(),并且长期存在于该线程中,对此类变量的操作不影响其他线程。如下图:
二、一次性初始化
在讲解线程特有数据之前,先让我们来了解一下一次性初始化。多线程程序有时有这样的需求:不管创建多少个线程,有些数据的初始化只能发生一次。列如:在C++程序中某个类在整个进程的生命周期内只能存在一个实例对象,在多线程的情况下,为了能让该对象能够安全的初始化,一次性初始化机制就显得尤为重要了。——在设计模式中这种实现常常被称之为单例模式(Singleton)。Linux中提供了如下函数来实现一次性初始化:
#include <pthread.h>
// Returns 0 on success, or a positive error number on error
int pthread_once (pthread_once_t *once_control, void (*init) (void));
利用参数once_control的状态,函数pthread_once()可以确保无论有多少个线程调用多少次该函数,也只会执行一次由init所指向的由调用者定义的函数。init所指向的函数没有任何参数,形式如下:
void init (void)
{
// some variables initializtion in here
}
另外,参数once_control必须是pthread_once_t类型变量的指针,指向初始化为PTHRAD_ONCE_INIT的静态变量。在C++0x以后提供了类似功能的函数std::call_once (),用法与该函数类似。使用
❹ 多线程并发访问数据库并同时开启事务的情况下,可能产生的问题包括
AB
C不是问题,C是可重复读隔离级别下的一个正常现象。
❺ C/C++用一个连接多线程并发访问数据库会不会有问题
加个原子锁吧,尽量异步访问
❻ c++数据库那种最快,支持多线程,或文本数据库
选择成品数据库,要看之后的应用结构的才能确定用哪个快宽伍..
如果仅仅是要写入时快,不考虑查询情况..那当然直慎陪或接把C/C++的数据结构给保存了最快..
比如保存一个struct Record,或class Record,限定好成员大小后,直接内存到磁盘的写盘...
这样写最快,而读取只能顺序读取....
另json等是交换格式不乱森是存储格式更不能当数据库用哇....
❼ MySQL数据库是一个多用户,多线程的关系数据库管理系统,其主要技术都包括哪些
在MySQL 8.0 之前, 我们假设一下有一条烂SQL,
mysqlselect * from t1 order by rand() ;
以多个线程在跑,导致CPU被跑满了,其他的请求只能被阻塞进不来。那这种情况怎么办?
大概有以下几种解决办法:
设置max_execution_time 来阻止太长的读SQL。那可能存在的问题是会把所有长SQL都给KILL 掉。有些必须要执行很长时间的也会被误杀。
自己写个脚本检测这类语句,比如order by rand(), 超过一定时间用Kill query thread_id 给杀掉。
那能不能不要杀掉而让他正常运行,但是又仿则碧不影响其他的请求呢?
那mysql 8.0 引入的资源组(resource group,后面简写微RG)可以基本盯敬上解决这类问题。
比如我可以用 RG 来在SQL层面给他限制在特定的一个CPU核上,这样我就不管他,让他备举继续运行,如果有新的此类语句,让他排队好了。
为什么说基本呢?目前只能绑定CPU资源,其他的暂时不行。
那我来演示下如何使用RG。
创建一个资源组user_ytt. 这里解释下各个参数的含义,
type = user 表示这是一个用户态线程,也就是前台的请求线程。如果type=system,表示后台线程,用来限制mysql自己的线程,比如Innodb purge thread,innodb read thread等等。
vcpu 代表cpu的逻辑核数,这里0-1代表前两个核被绑定到这个RG。可以用lscpu,top等列出自己的CPU相关信息。
thread_priority 设置优先级。user 级优先级设置大于0。
mysqlmysql> create resource group user_ytt type = user vcpu = 0-1 thread_priority=19 enable;Query OK, 0 rows affected (0.03 sec)
RG相关信息可以从 information_schema.resource_groups 系统表里检索。
mysqlmysql> select * from information_schema.resource_groups;+---------------------+---------------------+------------------------+----------+-----------------+| RESOURCE_GROUP_NAME | RESOURCE_GROUP_TYPE | RESOURCE_GROUP_ENABLED | VCPU_IDS | THREAD_PRIORITY |+---------------------+---------------------+------------------------+----------+-----------------+| USR_default | USER | 1 | 0-3 | 0 || SYS_default | SYSTEM | 1 | 0-3 | 0 || user_ytt | USER | 1 | 0-1 | 19 |+---------------------+---------------------+------------------------+----------+-----------------+3 rows in set (0.00 sec)
我们来给语句select guid from t1 group by left(guid,8) order by rand() 赋予RG user_ytt。
mysql> show processlist;+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| Id | User | Host | db | Command | Time | State | Info |+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+| 4 | event_scheler | localhost | NULL | Daemon | 10179 | Waiting on empty queue | NULL || 240 | root | localhost | ytt | Query | 101 | Creating sort index | select guid from t1 group by left(guid,8) order by rand() || 245 | root | localhost | ytt | Query | 0 | starting | show processlist |+-----+-----------------+-----------+------+---------+-------+------------------------+-----------------------------------------------------------+3 rows in set (0.00 sec)
找到连接240对应的thread_id。
mysqlmysql> select thread_id from performance_schema.threads where processlist_id = 240;+-----------+| thread_id |+-----------+| 278 |+-----------+1 row in set (0.00 sec)
给这个线程278赋予RG user_ytt。没报错就算成功了。
mysqlmysql> set resource group user_ytt for 278;Query OK, 0 rows affected (0.00 sec)
当然这个是在运维层面来做的,我们也可以在开发层面结合 MYSQL HINT 来单独给这个语句赋予RG。比如:
mysqlmysql> select /*+ resource_group(user_ytt) */guid from t1 group by left(guid,8) order by rand()....8388602 rows in set (4 min 46.09 sec)
RG的限制:
Linux 平台上需要开启 CAPSYSNICE 特性。比如我机器上用systemd 给mysql 服务加上
systemctl edit mysql@80 [Service]AmbientCapabilities=CAP_SYS_NICE
mysql 线程池开启后RG失效。
freebsd,solaris 平台thread_priority 失效。
目前只能绑定CPU,不能绑定其他资源。
❽ C语言,OCI多线程建立session的问题,需要一个多线程连接的示例代码
。。
你子线程控制同步了么? 断错误一般是内存操作出错 和oci 或者pthread的关系不大!
void* OracleProcess(GPS_DATA GpsRec) // 数据库数迹念嫌姿手据处理
{
interval = 0;
struct HashItem* pHash;
pHash = inithashtable(MAX_REC<<2);
char sql[384] = {0};
char temp[256] = {0};
char tName[10] = {0}; // 表名字
int i,k;
int j = TotalRec >> RATE;
double distance;
for(i=0; i < j; i++)
{
sprintf(temp,"%s%f%f%f%d",gps_last[i].tid,gps_last[i].lon,gps_last[i].lat,gps_last[i].speed,gps_last[i].udate);
InsertHash(temp, pHash, MAX_REC<<2); // 插入最后高搭GPS信息到hash
memset(temp,0x00,256);
}
for(i = 0; i < TotalRec; i++)
{
for(k=0; k<j; k++) // 查询车机是否在册
if(strcmp(GpsRec[i].tid,tid[k]) == 0)
break;
if(k < j)
{
if(GpsRec[i].udate != 0.00)
{
distance = InfoUpdate(GpsRec,i); // 最新GPS数据更新
sprintf(temp,"%s%f%f%f%d",GpsRec[i].tid,GpsRec[i].lon,GpsRec[i].lat,GpsRec[i].speed,GpsRec[i].udate);
if(GetHashTablePos(temp, pHash, MAX_REC<<2) == -1) // 查找hash是否存在
{
if (distance > 0.0001)
{
sprintf(tName,"GPS_%d_Y",tf[k]);
InsertHash(temp, pHash, MAX_REC<<2); // 插入
sprintf(sql,"insert into %s (id,tm_id,lon,lat, speed, utc_time, udate,mileage,DIRECTION,DISTANCE) values (seq_gps.nextVal,'%s','%f','%f','%f','%d','%d','%f','%d','%f','%d')",
tName,GpsRec[i].tid,GpsRec[i].lon,GpsRec[i].lat,GpsRec[i].speed,GpsRec[i].utime,GpsRec[i].udate,GpsRec[i].mileage,GpsRec[i].dir,distance,interval);
printf("%s\n",sql);
oci_excu(oracle_env,(text *)sql,0); // 插入数据
memset(tName,0x00,10);
}
}
memset(sql,0x00,384);
memset(temp,0x00,256);
}
}
}
memset(GpsRec,0x00,sizeof(GpsRec));
free(pHash);
pthread_exit(NULL);
}
void TcpProcess(int tfd) // 处理TCP连接上的事务
{
struct timeval ntime;
int index = 0,times,ret;
int rlen = 0,rflag = 0;
char recvbuf[513] = {0};
bzero(recvbuf,513);
while(1)
{
ret = rlen = read(tfd,recvbuf,512);
if(rlen <= 0)
break;
if((rlen%32) == 0) // 32长度为标准TCP信息
{
times = 0;
ret >>= 5;
while(ret--)
{
if(tflag[tfd] == tfd) // 已经存在的socket
{
LOVENIX *info = (LOVENIX *)malloc(sizeof(LOVENIX));
memset(info,0x00,sizeof(LOVENIX));
if(recvbuf[times] == 0x58 || recvbuf[times] == 0x59)
ProtocolAnalysisLovenixTcp(&recvbuf[times],info);
else if(recvbuf[times] == 0x24)
ProtocolAnalysisLovenixUdp(&recvbuf[times],info);
sprintf(info->tid,"%s",seq[tfd]); // 合成车辆ID
DataProcess(info); // 处理GPS数据
free(info);
gettimeofday(&ntime, NULL);
cntime[tfd] = ntime.tv_sec; // 更新时间
times += 32;
}
}
}
else if(rlen > 32)
{
if(!rflag)
{
if((index = RegLovenix(tfd,recvbuf)) > -1)
{
sprintf(seq[tfd],"%s",tid[index]); // 将对应的socket设备ID保存
gettimeofday(&ntime, NULL);
sfd[tfd] = tfd;
cntime[tfd] = ntime.tv_sec;
tflag[tfd] = tfd;
rflag = 1;
}
}
}
if(rlen < 512); // 已经读完
break;
memset(recvbuf,0x00,rlen);
}
}
void *TcpServer(void *arg)
{
int port = (unsigned int) arg;
int efd,i;
struct timeval ntime;
int listener, nfds, n, listen_opt = 1, lisnum;
struct sockaddr_in my_addr, their_addr;
socklen_t len = sizeof(their_addr);
lisnum = MAXLISTEN;
for(i=0; i<MAX_REC; i++)
{
sfd[i] = 0;
tflag[i] = 0;
}
if ((listener = socket(PF_INET, SOCK_STREAM, 0)) == -1) // 开启 socket 监听
{
lprintf(lfd, FATAL, "TCP Socket error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP socket creat susscess!\n");
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (void *) &listen_opt,(int) sizeof(listen_opt)); // 设置端口多重邦定
setnonblocking(listener);
bzero(&my_addr, sizeof(my_addr));
my_addr.sin_family = PF_INET;
my_addr.sin_port = htons(port);
my_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1)
{
lprintf(lfd, FATAL, "TCP bind error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP bind susscess!\n");
if (listen(listener, lisnum) == -1)
{
lprintf(lfd, FATAL, "TCP listen error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP listen susscess!\n");
kdpfd = epoll_create(MAXEPOLLSIZE); // 创建 epoll句柄,把监听socket加入到epoll集合里
ev.events = EPOLLIN | EPOLLET; // 注册epoll 事件
ev.data.fd = listener;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0)
lprintf(lfd, FATAL, "EPOLL_CTL_ADD error!\n");
while (1)
{
sem_wait(&sem_tcp); // 等待 sem_TCP
sem_wait(&sem_tp); // 将tp值减一
nfds = epoll_wait(kdpfd, events, MAXEPOLLSIZE, 1); // 等待有事件发生
if (nfds == -1)
lprintf(lfd, FATAL,"EPOLL_WAIT error!\n");
for (n = 0; n < nfds; ++n) // 处理epoll所有事件
{
if (events[n].data.fd == listener) // 如果是连接事件
{
if ((efd = accept(listener, (struct sockaddr *) &their_addr,&len)) < 0)
{
lprintf(lfd, FATAL, "accept error!\n");
continue;
}
else
lprintf(lfd, INFO, "Client from :%s\tSocket ID:%d\n", inet_ntoa(their_addr.sin_addr) ,efd);
setnonblocking(efd); // 设置新连接为非阻塞模式
ev.events = EPOLLIN | EPOLLET; // 注册新连接
ev.data.fd = efd;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, efd, &ev) < 0) // 将新连接加入EPOLL的监听队列
lprintf(lfd, FATAL, "EPOLL_CTL_ADD error!\n");
else
{
gettimeofday(&ntime, NULL);
cntime[efd] = ntime.tv_sec;
sfd[efd] = efd;
}
}
else if (events[n].events & EPOLLIN)
tpool_add_work(pool, TcpProcess, (void*)events[n].data.fd); // 读取分析TCP信息
else
{
close(events[n].data.fd);
epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd, &ev);
}
}
sem_post(&sem_cm);
sem_post(&sem_udp);
}
close(listener);
}
int DataProcess(LOVENIX *info) // 处理GPS数据
{
if(sflag == 0 && (CacheRec != TotalRec)) // 缓存1可用且没有满
{
gps_cache[CacheRec].lat = info->lat;
gps_cache[CacheRec].mileage = info->mileage;
gps_cache[CacheRec].lon = info->lon;
gps_cache[CacheRec].speed = atod(info->speed, strlen(info->speed))*0.514444444*3.6;
gps_cache[CacheRec].udate = atoi(info->udate);
gps_cache[CacheRec].utime = atoi(info->utime);
gps_cache[CacheRec].dir = atoi(info->dir);
sprintf(gps_cache[CacheRec].tid ,"%s",info->tid);
CacheRec++;
// printf("CacheRec %d\tTotalRec %d \t sflag:%d\n",CacheRec,TotalRec,sflag);
if(CacheRec == TotalRec)
{
sflag = 1;
pthread_attr_init(&attr); // 初始化属性值,均设为默认值
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // 设置线程为分离属性
if (pthread_create(&thread, &attr,(void*) OracleProcess,(void*)gps_cache)) // 创建数据处理线程
lprintf(lfd, FATAL, "oracle pthread_creat error!\n");
CacheRec = 0;
}
}
else if(sflag == 1 && (Cache1Rec != TotalRec)) // 缓存2可用且没有满
{
gps_cache1[Cache1Rec].mileage = info->mileage;
gps_cache1[Cache1Rec].lat = info->lat;
gps_cache1[Cache1Rec].lon = info->lon;
gps_cache1[Cache1Rec].speed = atod(info->speed, strlen(info->speed))*0.514444444*3.6;
gps_cache1[Cache1Rec].udate = atoi(info->udate);
gps_cache1[Cache1Rec].utime = atoi(info->utime);
gps_cache1[Cache1Rec].dir = atoi(info->dir);
sprintf(gps_cache1[Cache1Rec].tid ,"%s",info->tid);
Cache1Rec++;
if(Cache1Rec == TotalRec)
{
sflag = 0;
pthread_attr_init(&attr); // 初始化属性值,均设为默认值
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // 设置线程为分离属性
if (pthread_create(&thread, &attr,(void*) OracleProcess,(void*)gps_cache1)) // 创建数据处理线程
lprintf(lfd, FATAL, "oracle pthread_creat error!\n");
Cache1Rec = 0;
}
}
else
{
lprintf(lfd, FATAL, "No cache to use!\n");
return (0);
}
return (1);
}