多线程编程

08月31日 收藏 0 评论 2 java开发

多线程编程

转载声明:文章来源http://blog.chinaunix.net/uid-26833883-id-3224261.html

一、线程的基本概念

进程(process)和文件(files)是unix/linux操作系统两个最基本的抽象。进程是处于执行期的程序和它所包含的资源的总和,也就是说一个进程就是处于执行期的程序。一个线程(thread)就是运行在一个进程上下文中的一个逻辑流,不难看出,线程是进程中最基本的活动对象。

在传统的系统中,一个进程只包含有一个线程。但在现代操作系统中,允许一个进程里面可以同时运行多个线程,这类程序就被称为多线程程序。所有的程序都有一个主线程(main thread),主线程是进程的控制流或执行线程。在多线程程序中,主线程可以创建一个或多个对等线程(peer thread),从这个时间点开始,这些线程就开始并发执行。主线程和对等线程的区别仅在于主线程总是进程中的第一个运行的线程。从某种程度上看,线程可以看作是轻量级的进程。在linux操作系统中,内核调度的基本对象时线程,而不是进程,所以进程中的多个线程将由内核自动调度。

每个线程都拥有独立的线程上下文(thread context),线程ID(thread ID,TID),程序计数器(pc),线程栈(stack)。其中,内核正是通过线程ID(TID)来识别线程,进行线程调度的。

二、线程与进程的异同点

A.相同点

<1> 比如都具有ID,一组寄存器,状态,优先级以及所要遵循的调度策略。
<2>每个进程都有一个进程控制块,线程也拥有一个线程控制块(在Linux内核,线程控制块与进程控制块用同一个结构体描述,即struct task_struct),这个控制块包含线程的一些属性信息,操作系统使用这些属性信息来描述线程。
<3>线程和子进程的创建者可以在线程和子进程上实行某些控制,比如,创建者可以取消、挂起、继续和修改线程和子进程的优先级。

B.不同点

<1>主要区别:每个进程都拥有自己的地址空间,但线程没有自己独立的地址空间,而是运行在一个进程里的所有线程共享该进程的整个虚拟地址空间。
<2>线程的上下文切换时间开销比进程上下文切换时间开销要小的多
<3>线程的创建开销远远小于进程的创建
<4>子进程拥有父进程的地址空间和数据段的拷贝,因此当子进程修改它的变量和数据时,它不会影响父进程中的数据,但线程可以直接访问它进程中的数据段。
<5>进程之间通讯必须使用进程间通讯机制,但线程可以与进程中的其他线程直接通讯
<6>线程可以对同一进程中的其他线程实施大量控制,但进程只能对子进程实施控制
<7>改变主线程的属性可能影响进程中其他的线程,但对父进程的修改不影响子进程

三、线程相关的API

A.线程的创建

参数说明:

thread:指向pthread_t类型的指针,该地址将存放线程创建成功之后的线程TID。
attr:用户设置线程的属性,一般都不需要特殊设置,所以可简单设置为NULL。
*(*start_routine)(void *):传递新线程所要执行的函数的地址。
arg:新线程所有执行的函数的参数。

调用成功,则返回值是0,如果失败则返回错误代码。

注意:我们可以看到,如果想启动一个线程,就必须让这个线程关联一个子函数。我们一般称此函数为线程函数,但是我们又发现,在我们给线程函数传参数时,标准线程创建接口只留了 一个参数传递。思考?如果想给线程函数传递多个参数,该怎么解决呢?

B.线程终止

参数说明 :
value_ptr指向线程放回的某个对象

线程通过调用pthread_exit函数终止执行,并带回指向某个对象的指针。

注意:绝不能用它返回一个指向局部变量的指针,因为线程调用该函数后,这个局部变量就不存在了,这将引起严重的程序漏洞。

C.等待线程终止

一般此函数用在主线程中,等待通过thread指定的线程终止,此函数调用成功,可以通过value_ptr获取终止线程的返回值。
注意:如果等待的线程没有终止,此函数将引起调用者阻塞。成功返回0,失败返回-1。

D.线程取消

如果想取消一个正在运行的线程,可以调用此函数。
参数说明:
thread:要取消的线程
函数返回值:成功返回0,失败返回-1
案例一探究:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>

char buf[] = "Hello word";

void *pthread_function1(void *arg)
{
while(1)
{
sleep(2);
printf("pthread_function : %s.\n",buf);
}
}

void *pthread_function2(void *arg)
{
int i = 0;

for(i = 0;i < strlen(buf);i ++)
{
*(int *)arg += 1;
sleep(1);
}

pthread_exit("pthread_function2 over");
}

int main()
{
pthread_t tid1,tid2;
int count = 0;
void *value_ptr;

if(pthread_create(&tid1,NULL,pthread_function1,NULL) != 0)
{
perror("Fail to pthread create");
return -1;
}else{
printf("create pthread %lu.\n",tid1);
}

if(pthread_create(&tid2,NULL,pthread_function2,(void *)&count) != 0)
{
perror("Fail to pthread create");
exit(EXIT_FAILURE);
}else{
printf("create pthread %lu.\n",tid2);
}

if(pthread_join(tid2,&value_ptr) < 0)
{
perror("Fail to pthread_join");
exit(EXIT_FAILURE);
}

printf("pthread %lu join success,return string : %s.\n",tid1,(char *)value_ptr);

if((pthread_cancel(tid1)) < 0)
{
perror("Fail to pthread cancel");
exit(EXIT_FAILURE);
}

printf("pthread %lu cancel success.\n",tid2);
printf("main pthread sleeping...\n");
sleep(5);

return 0;
}

在主线程中,创建了两个子线程tid1,tid2。
tid1线程每隔2秒钟打印一次全局数组buf的内容。
tid2线程没1秒统计一下buf数组中的字符,统计结束后调用pthread_exit退出。

运行结果如下:
可以看出,在调用pthread_cancel后,对应的线程就取消了。

案例二

有一buf[] = "hello word.\n";
创建两个线程A、B;
A线程将buf的内容向文件 test写5此;
B线程读取test文件五次,将读取的内容在终端上进行打印

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define MAX 50

char buf[] = "hello word.\n";

void *write_file(void *file_name)

int fd;
int i = 0;

if((fd = open((char *)file_name,O_CREAT | O_TRUNC | O_APPEND | O_WRONLY,0666)) < 0)
{
fprintf(stderr,"Fail to open %s : %s.\n",(char *)file_name,strerror(errno));
pthread_exit(NULL);
}

for(i = 0;i < 5;i ++)
{
write(fd,buf,strlen(buf));
}

pthread_exit(NULL);
}

void *read_file(void *file_name)
{
int fd;
int n = 0,i = 0;
char buf[MAX];

if((fd = open((char *)file_name,O_CREAT | O_RDONLY,0666)) < 0)
{
fprintf(stderr,"Fail to open %s : %s.\n",(char *)file_name,strerror(errno));
pthread_exit(NULL);
}

for(i = 0;i < 5;i ++)
{
n = read(fd,buf,sizeof(buf));
buf[n] = '\0';
printf("read %d : %s\n",n,buf);
}

pthread_exit(NULL);
}

int main(int argc,char *argv[])
{
int res;
pthread_t tid1,tid2;
void *arg = argv[1];

if(argc < 2)
{
fprintf(stderr,"usage : %s argv[1].\n",argv[0]);
return -1;
}

res = pthread_create(&tid1,NULL,write_file,arg);
if(res != 0)
{
perror("Fail to pthread create");
exit(EXIT_FAILURE);
}

res = pthread_create(&tid2,NULL,read_file,arg);
if(res != 0)
{
perror("Fail to pthread create");
exit(EXIT_FAILURE);
}

pthread_join(tid1,NULL);
pthread_join(tid2,NULL);

exit(EXIT_SUCCESS);
}

运行结果如下:

什么玩意?和我猜想的结果不一样呀。思考?为什么是这种情况。
我们更希望,在A线程写完后,B线程读。B线程读完后,A线程在写。想要达到这种效果,我们可以通过线程的同步和互斥。

四、线程的同步

A.线程间机制

<1>多线程共享同一个进程的地址空间
<2>有点:线程间很容易进行通信,通过全局变量实现数据共享和交换
<3>缺点:多个线程同时访问共享对象时需要引入同步和互斥机制

B.线程间同步 - P/V操作

<1>信号量代表某一类资源,其值表示系统中该资源的数量
<2>信号量是 一个受保护的变量,只能通过三种操作来访问
a.初始化
b.P操作(申请资源)
c.v操作(释放资源)
<3>信号量的值为非负整数,其值为0时,表示当前系统中无此类资源
P(s)含义如下:
if(信号量的值大于0)
{
申请资源的任务继续运行;
信号量的值减一;
}else{
申请资源的任务阻塞;
}
V(s)含义如下:
if(没有任务在等待该资源)
{
信号量的值加一;
}else{
唤醒第一个等待的任务,让其继续运行;
}
注意:一个任务申请资源时有可能被阻塞,一个任务释放资源时一定不会被阻塞
C.posix中定义了两类信号量
<1>无名信号量(基于内存的信号量)
<2>有名信号量(Linux只实现了无名信号量)
D.pthread库中常用的信号量操作函数
<1>信号量初始化
sem_init()初始化一个定位在sem的匿名信号量。value参数指定信号量的初始值。pshared参数指明信号量是由进程内线程共享,还是由进程之间共享。如果pshared的值为0,那么信号量将被进程内的线程共享,并且应该设置在所有线程都可以看见的地址上(如全局变量,或者堆上动态分配的变量)。

如果pshared是非零值,那么信号量将在进程之间共享,并且应该定位共享内存区域。因为fork()创建的孩子继承其父亲的内存映射,因此它也可以见到这个信号量。所有可以访问共享内存区域的进程都可以用sem_post、sem_wait操作信号量。

参数说明:
sem :信号量对象
pshared:控制信号量的类型,0表示这个信号量时当前进程的局部信号量,否则,这个信号量就可以在多个进程间共享。
value:信号量的初始值(即资源的个数);
<2>p操作,即申请资源
sem_wait的作用是以原子操作的方式给信号量的值减1,但它会等到信号量非0时才会开始减法操作。如果此时信号量的值为0,这个函数就会等待,直到有线程增加了该信号量的值使其不再为0。
<2>V操作,即释放资源
sem_post的作用是以原子操作的方式给信号量的值加1。
<3>信号量销毁
这个函数的作用是,用完信号量后对它进行清理,清理该信号量所拥有的资源。

案例三、一个线程输入,两个线程读(读完清除buf的内容),实现同步操作

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>

#define MAX 100
#define N 2

char share_buf[MAX];

sem_t rsem,wsem;

void* read_share_buf(void *arg)
{
int num = *((int *)arg);

printf("Create pthread %d success.\n",*((int *)arg));

while(1)
{
if(sem_wait(&rsem) < 0)
{
fprintf(stderr,"%d fail to sem wait : %s.\n",num,strerror(errno));
pthread_exit(NULL);
}

printf("pthread %d read buf : %s.\n",num,share_buf);
memset(share_buf,0,sizeof(share_buf));

if(sem_post(&wsem) < 0)
{
fprintf(stderr,"%d fail to sem post : %s.\n",num,strerror(errno));
pthread_exit(NULL);
}
}

}

int do_work()
{

while(1)
{
if(sem_wait(&wsem) < 0)
{
perror("Fail to sem wait");
exit(EXIT_FAILURE);
}

printf(">");
fgets(share_buf,MAX,stdin);
share_buf[strlen(share_buf) - 1] = '\0';

if(strncmp(share_buf,"quit",4) == 0)
{
break;
}

if(sem_post(&rsem) < 0)
{
perror("Fail to sem post");
exit(EXIT_FAILURE);
}
}

return 0;
}

int main()
{
int res,i;
pthread_t tid[N];

if(sem_init(&rsem,0,0) < 0)
{
perror("Fail to sem_init");
exit(EXIT_FAILURE);
}

if(sem_init(&wsem,0,1) < 0)
{
perror("Fail to sem_init");
exit(EXIT_FAILURE);
}

for(i = 0;i < N;i ++)
{
res = pthread_create(&tid[i],NULL,read_share_buf,(void *)&i);
usleep(500);
if(res != 0)
{
perror("Fail to create pthread");
exit(EXIT_FAILURE);
}
}

do_work();

for(i = 0;i < N;i ++)
{
pthread_cancel(tid[i]);
}

if(sem_destroy(&rsem) < 0)
{
perror("Fail to sem destroy rsem");
exit(EXIT_FAILURE);
}

if(sem_destroy(&wsem) < 0)
{
perror("Fail to sem destroy wsem");
exit(EXIT_FAILURE);
}

exit(EXIT_SUCCESS);
}

运行结果如下:

五、线程的互斥锁
<1>引入互斥(mutual exclusion)锁的目的是用来保证共享数据操作的完整性。
<2>互斥锁主要用来保护临界资源。
<3>每个临界资源都由一个互斥锁来保护,任何时刻最多只能有一个线程能访问该资源。
<4>线程必须先获得互斥锁才能访问临界资源,访问完临界资源后释放该锁。如果无法获得锁,线程会阻塞直到获得锁为止。

A、锁的初始化
当我们定义一个锁时我们应当对它初始化。

a.静态初始化可以通过PTHREAD_MUTEX_INITIALIZER宏
b.动态初始化可以通过pthread_mutex_init函数进行,一般在通过pthread_mutex_init初始化的锁,在不需要时应调用pthread_mutex_destroy销毁。

B.获得互斥锁和释放互斥锁

a.我们可以通过pthread_mutex_lock来获得锁

注意: 如果想获得的锁已经被别的线程获取了,此时pthread_mutex_lock将引起调用者阻塞

pthread_mutex_trylock我们称尝试获得锁,如果不能获得锁,它不会引起调用者阻塞而是立即放回。

b.我们可以通过pthread_mutex_unlock来释放获得的锁

案例四、通过互斥锁对文件进行读写操作,一个线程读,一个线程写。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define MAX 100

pthread_mutex_t rwlock = PTHREAD_MUTEX_INITIALIZER;

void *read_file(void *arg)
{
int fd;
int n;
char buf[MAX];

if((fd = open((char *)arg,O_RDONLY | O_CREAT,0666)) < 0)
{
fprintf(stderr,"Fail to open %s : %s.\n",(char *)arg,strerror(errno));
pthread_exit(NULL);
}

while(1)
{
if(pthread_mutex_lock(&rwlock) < 0)
{
perror("Fail to pthread mutex lock");
pthread_exit(NULL);
}

n = read(fd,buf,MAX);
buf[n] = '\0';

printf("Read %d character : %s.\n",n,buf);

if(strncmp(buf,"quit",4) == 0)
{
if(pthread_mutex_unlock(&rwlock) < 0)
{
perror("Fail to pthread mutex unlock");
pthread_exit(NULL);
}

break;
}

if(pthread_mutex_unlock(&rwlock) < 0)
{
perror("Fail to pthread mutex unlock");
pthread_exit(NULL);
}

sleep(1);
}

close(fd);
pthread_exit(NULL);
}

void *write_file(void *arg)
{
int fd;
char buf[MAX];

if((fd = open((char *)arg,O_CREAT | O_TRUNC | O_WRONLY)) < 0)
{
fprintf(stderr,"Fail to open %s : %s.\n",(char *)arg,strerror(errno));
pthread_exit(NULL);
}

while(1)
{
printf(">");
if(pthread_mutex_lock(&rwlock) < 0)
{
perror("Fail to pthread mutex lock");
pthread_exit(NULL);
}

fgets(buf,sizeof(buf),stdin);
buf[strlen(buf)-1] = '\0';
write(fd,buf,strlen(buf));

if(strncmp(buf,"quit",4) == 0)
{
if(pthread_mutex_unlock(&rwlock) < 0)
{
perror("Fail to pthread mutex unlock");
pthread_exit(NULL);
}

break;
}

if(pthread_mutex_unlock(&rwlock) < 0)
{
perror("Fail to pthread mutex unlock");
pthread_exit(NULL);
}

sleep(1);
}

close(fd);
pthread_exit(NULL);
}

int main(int argc,char *argv[])
{
pthread_t rtid,wtid;
int res;

if(argc < 2)
{
fprintf(stderr,"usage : %s argv[1].\n",argv[0]);
exit(EXIT_FAILURE);
}

res = pthread_create(&rtid,NULL,read_file,(void *)argv[1]);
if(res != 0){
perror("Fail to create pthread");
exit(EXIT_FAILURE);
}

res = pthread_create(&wtid,NULL,write_file,(void *)argv[1]);
if(res != 0){
perror("Fail to create pthread");
exit(EXIT_FAILURE);
}

pthread_join(wtid,NULL);
pthread_join(rtid,NULL);

exit(EXIT_SUCCESS);
}

运行结果如下:

上面的代码,读线程和写线程每次操作完之后都用了一下sleep(1),如果不这样做想想后果会则样?
其实我们更希望,当读线程发现文件中并没有数据后,就进行阻塞.当写线程写完数据之后就去通知读线程进行读操作。
要想实现这种异步通知的方法,我们就可以通过线程的条件变量来实现。

六、线程的条件变量

A.条件变量定义及初始化

和线程的互斥锁一样,我们也有两种方式初始化条件变量;
a.静态初始化:可以通过PTHREAD_COND_INITIALIZER初始化。
b.动态初始化:可以通过pthread_cond_init进行,如果不需要在使用此条件变量,也应该调用pthread_cond_destroy销毁

B.等待条件变量

参数说明:
第一个参数是等待的条件变量,第二个参数在是线程互斥锁。

pthread_cond_wait函数使用时一般都和互斥锁一起用,他们一起完成线程的同步。

注意:pthread_cond_wait函数一上来会判断等待的条件是否满足,如果不满足将引起调用的线程阻塞,直到条件满足。需要注意的时,pthread_cond_wait函数在阻塞前会释放互斥锁,这就要求在调用它之前应先获取互斥锁
(何时条件满足?当有线程调用pthread_cond_signal或pthread_cond_broadcase时,等待的条件满足,此时pthread_cond_wait函数将返回,返回时它将重新获取互斥锁)

C.唤醒等待条件变量的线程

pthread_cond_broadcase函数唤醒所有等待条件变量的线程。
pthread_cond_signal函数唤醒所有等待条件变量线程中的一个线程,一般唤醒的是第一个等待条件变量的线程。

案例5.异步通知读写

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define MAX 100

pthread_mutex_t rwlock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t rwcond = PTHREAD_COND_INITIALIZER;

void *read_file(void *arg)
{
int fd;
int n;
char buf[MAX];

if((fd = open((char *)arg,O_RDONLY | O_CREAT | O_TRUNC,0666)) < 0)
{
fprintf(stderr,"Fail to open %s : %s.\n",(char *)arg,strerror(errno));
pthread_exit(NULL);
}

while(1)
{
if(pthread_mutex_lock(&rwlock) < 0)
{
perror("Fail to pthread mutex lock");
pthread_exit(NULL);
}

while((n = read(fd,buf,MAX)) == 0)
{
if(pthread_cond_wait(&rwcond,&rwlock) < 0)
{
perror("Fail to pthread cond wait");
pthread_exit(NULL);
}
}

buf[n] = '\0';

printf("Read %d character : %s.\n",n,buf);

if(strncmp(buf,"quit",4) == 0)
{
if(pthread_mutex_unlock(&rwlock) < 0)
{
perror("Fail to pthread mutex unlock");
pthread_exit(NULL);
}

break;
}

if(pthread_mutex_unlock(&rwlock) < 0)
{
perror("Fail to pthread mutex unlock");
pthread_exit(NULL);
}

}

close(fd);
pthread_exit(NULL);
}

void *write_file(void *arg)
{
int fd;
char buf[MAX];

if((fd = open((char *)arg,O_CREAT | O_TRUNC | O_WRONLY,0666)) < 0)
{
fprintf(stderr,"Fail to open %s : %s.\n",(char *)arg,strerror(errno));
pthread_exit(NULL);
}

while(1)
{
if(pthread_mutex_lock(&rwlock) < 0)
{
perror("Fail to pthread mutex lock");
pthread_exit(NULL);
}

printf(">");
fgets(buf,sizeof(buf),stdin);
buf[strlen(buf)-1] = '\0';
write(fd,buf,strlen(buf));

if(strncmp(buf,"quit",4) == 0)
{
if(pthread_mutex_unlock(&rwlock) < 0)
{
perror("Fail to pthread mutex unlock");
pthread_exit(NULL);
}

if(pthread_cond_signal(&rwcond) < 0)
{
perror("Fail to cond signal");
pthread_exit(NULL);
}

usleep(500);

break;
}

if(pthread_mutex_unlock(&rwlock) < 0)
{
perror("Fail to pthread mutex unlock");
pthread_exit(NULL);
}

if(pthread_cond_signal(&rwcond) < 0)
{
perror("Fail to cond signal");
pthread_exit(NULL);
}

usleep(500);
}

close(fd);
pthread_exit(NULL);
}

int main(int argc,char *argv[])
{
pthread_t rtid,wtid;
int res;

if(argc < 2)
{
fprintf(stderr,"usage : %s argv[1].\n",argv[0]);
exit(EXIT_FAILURE);
}

res = pthread_create(&rtid,NULL,read_file,(void *)argv[1]);
if(res != 0){
perror("Fail to create pthread");
exit(EXIT_FAILURE);
}

usleep(500);

res = pthread_create(&wtid,NULL,write_file,(void *)argv[1]);
if(res != 0){
perror("Fail to create pthread");
exit(EXIT_FAILURE);
}

pthread_join(wtid,NULL);
pthread_join(rtid,NULL);

exit(EXIT_SUCCESS);
}

运行结果:

注意:pthread_cond_signal要加上少量的延迟,因为内核需要时间通知等待条件变量的线程,pthread_cond_wait返回时需要重新获取锁,也需要时间。

C 2条回复 评论
山田心的平方

准备三刷这节课!

发表于 2022-09-05 21:00:00
0 0
壁虎极点多

看过之后很多感触,唯有谢谢最简单也最真诚

发表于 2022-04-18 22:00:00
0 0