LV05-04-线程应用-01-线程池

本文主要是线程的应用——线程池的相关笔记,若笔记中有错误或者不合适的地方,欢迎批评指正😃。

点击查看使用工具及版本
Windows windows11
Ubuntu Ubuntu16.04的64位版本
VMware® Workstation 16 Pro 16.2.3 build-19376536
SecureCRT Version 8.7.2 (x64 build 2214) - 正式版-2020年5月14日
开发板 正点原子 i.MX6ULL Linux阿尔法开发板
uboot NXP官方提供的uboot,NXP提供的版本为uboot-imx-rel_imx_4.1.15_2.1.0_ga(使用的uboot版本为U-Boot 2016.03)
linux内核 linux-4.15(NXP官方提供)
STM32开发板 正点原子战舰V3(STM32F103ZET6)
点击查看本文参考资料
参考方向 参考原文
------
点击查看相关文件下载
--- ---

一、线程池原理

我们需要使用线程的时候就去创建一个线程,不需要就将线程销毁,但是就会有这样一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程都是需要时间的。

这个时候我们就可以使用线程池来解决这个问题了。线程池的主要思想是:在进程开始时创建一定数量的线程,并加到线程池中以等待工作。当服务器收到请求时,它会唤醒池内的一个线程(如果有可用线程),并将需要服务的请求传递给它。一旦线程完成了服务,它会返回到池中再等待工作。如果线程池内没有可用线程,那么服务器会等待,直到有空线程为止。

image-20230628213732591

二、线程池实现

1. 结构体实现

1.1 任务结构体

我们将使用链表构建一个任务队列,每个任务的结构体成员设计如下:

1
2
3
4
5
6
typedef struct Task
{
void *(*func)(void *arg);/* 函数指针,指向需要执行的任务函数 */
void *taskArg; /* 任务参数,传递给任务函数的参数 */
struct Task *next; /* 指向下一个任务的指针变量 */
} Task;

【成员说明】

  • func,函数指针变量,可以指向一个返回值为void *的带有一个void *类型参数的真实任务函数。
  • taskArgvoid 类型的指针变量,表示传给执行真实任务的函数的参数。
  • next:指针域,指向下一个任务结构体。

1.2线程池结构体

我们也是通过链表来存储线程池中的各个工作线程,每个工作线程的结构体成员设计如下:

1
2
3
4
5
6
7
8
9
10
11
#define POOL_NUM 10 /* 线程池最大线程数 */
typedef struct ThreadPool
{
pthread_mutex_t taskLock;/* 互斥锁 */
pthread_cond_t newTask; /* 条件变量 */

pthread_t tid[POOL_NUM]; /* 工作线程的线程号 tid */
Task *queueHead; /* 任务队列头指针 */
int busyWork; /* 线程池中已使用线程数 */

}ThreadPool;

1.3两个结构体联系

上边两个结构体的联系如下图所示:

image-20220527173459949

2. 线程池初始化

2.1 实现步骤

  • (1)申请线程池的内存空间并判断是否申请成功;
  • (2)对线程池的各个参数进行初始化;
  • (3)创建线程,这里要注意不要超过系统可创建的最大线程数。

2.2 函数实现

点击查看实现源码

注意先定义一个指向线程池的全局指针变量:

1
ThreadPool * pool = NULL;/* 定义一个线程池,需要申请内存 */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* @Function: void threadPoolInit(void)
* @Description: 线程池初始化
* @param : none
* @return : 返回一个整型数值
* 0,成功;
* -1,失败
*/
int threadPoolInit(void)
{
int i = 0;
/* 1.申请线程池的内存空间 */
pool = (ThreadPool *)malloc(sizeof(ThreadPool));
/* 2. 判断是否申请成功 */
if( pool == NULL)
{
printf("ThreadPool malloc failed!\n");
return -1;
}
/* 3. 初始化互斥锁 */
pthread_mutex_init(&pool->taskLock,NULL);
/* 4. 初始化条件变量 */
pthread_cond_init(&pool->newTask,NULL);
/* 5. 初始化链式任务队列指针 */
pool->queueHead = NULL;
/* 6. 初始化线程池已占用线程数量 */
pool->busyWork=0;
/* 7. 创建所有的工作线程 */
for(i = 0; i < POOL_NUM; i++)
{
pthread_create(&pool->tid[i], NULL, workThread, (void *)i);
}
return 0;
}
1
int threadPoolInit(void);/* 线程池初始化 */

3. 线程开始函数

3.1 实现步骤

线程被创建之后,就开始运行线程开始函数,在线程开始函数中我们要做的事有:

  • (1)获取互斥锁
  • (2)等待条件变量,就是等待任务队列中有任务产生;
  • (3)收到信号时,从任务队列取出一个任务;
  • (4)释放互斥锁;
  • (5)传入任务参数,执行任务函数;
  • (6)执行完毕后,更新线程池中已被占用的线程数量。

3.2 函数实现

点击查看实现源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* @Function: void *workThread(void *arg)
* @Description: 线程池中线程的运行函数
* @param arg : 线程创建函数传递给工作线程的参数
* @return : 返回一个void指针类型数据
*/
void *workThread(void *arg)
{
/* 定义一个任务结构体指针变量 */
Task *ptask;
printf("This is %d thread\n", (int)arg);
while(1)
{
/* 获取互斥锁 */
pthread_mutex_lock(&pool->taskLock);

/* 等待条件变量信号(收到信号时会唤醒至少一个线程) */
pthread_cond_wait(&pool->newTask, &pool->taskLock);
printf("Thread [%ld] execution,working[%d]!",pthread_self(), pool->busyWork);
/* 从链式任务队列取出一个任务 */
ptask = pool->queueHead;/* 指向任务队列的头节点 */
pool->queueHead = pool->queueHead->next;/* 头指针指向下一个任务节点 */
/* 解除互斥锁 */
pthread_mutex_unlock(&pool->taskLock);

/* 线程执行任务函数 */
ptask->func(ptask->taskArg);
/* 更新线程池已占用线程数量(任务函数执行完毕后,线程空闲) */
pool->busyWork--;
}
return (void *)0;
}

1
void *workThread(void *arg);/* 线程池中线程的运行函数 */

4. 添加任务

4.1 实现步骤

  • (1)获取互斥锁
  • (2)判断线程池中是否还有空闲的线程可以执行任务,若没有则释放互斥锁,休眠等待,进行下一轮判断前要先获取已经释放的互斥锁;
  • (3)申请新任务节点内存,并判断是否申请成功;
  • (4)对新任务进行初始化;
  • (5)添加新任务到任务队列,任务队列是链表,注意插入的方式;
  • (6)新任务到来,需要唤醒一个线程,所以这里已占用线程数需要更新一下;
  • (7)发送有新任务的信号,至少唤醒一个空闲线程即可。

4.2 函数实现

点击查看实现源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* @Function: int poolAddTask(int taskArg)
* @Description: 向任务队列中添加新的任务节点
* @param taskArg : 递给实际执行的任务函数的参数
* @return : 返回一个整型数值
* 0,成功;
* -1,失败
*/
int poolAddTask(int taskArg)
{
/* 1. 定义两个任务链表结构体指针变量 */
Task * newTask;
Task * p;

pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */

/* 2. 判断线程池中是否还有线程可以执行新添加的任务 */
while(pool->busyWork >= POOL_NUM)
{
pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */
usleep(10000);/* us休眠 */
pthread_mutex_lock(&pool->taskLock); /* 获取互斥锁 */
}
pthread_mutex_unlock(&pool->taskLock); /* 解除互斥锁 */

/* 3. 申请新线程任务节点内存 */
newTask = (Task *)malloc(sizeof(Task));
/* 4. 判断是否申请成功 */
if( newTask == NULL)
{
printf("newTask malloc failed!\n");
return -1;
}
/* 5. 对新建的任务节点进行初始化 */
newTask->func = realWork;/* 线程运行函数中所执行的真正的任务内容函数 */
newTask->taskArg = (void *)taskArg;

/* 6. 添加新任务到链式任务队列 */
pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */
p = pool->queueHead;
if(p == NULL)/* 判断线程池是否为空 */
{
pool->queueHead = newTask;
}
else
{
while(p->next != NULL)
{
p = p->next;
}
p->next = newTask;/* 新任务添加到线程池链表结尾 */

}
/* 7. 更新已使用线程数 */
pool->busyWork++;/* 已使用线程数加一 */
/* 8. 发送信号到工作线程 */
pthread_cond_signal(&pool->newTask);

pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */

return 0;
}
1
int poolAddTask(int taskArg);/* 向任务队列中添加新的任务节点 */

5. 任务函数

线程调用的任务函数,这里的休眠时间的延长可以更好的反映实验现象。

点击查看实现源码
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @Function: void *realWork(void *taskArg)
* @Description: 线程池任务执行内容函数
* @param taskArg : 任务参数
* @return : 返回(void *)0
*/
void *realWork(void *taskArg)
{
printf(" Finish work %d\n",(int)taskArg);
sleep(2);
return (void *)0;
}

1
void *realWork(void *arg);/* 线程池中线程执行的任务函数 */

6. 线程池销毁

6.1 销毁步骤

任务全部执行完毕后,进程结束,需要销毁线程池,释放内存:

  • (1)释放任务链表所有节点内存;
  • (2)销毁互斥锁;
  • (3)销毁条件变量,这里需要注意一下,是若有个线程在pthread_cond_wait等着,那这个条件变量就不能销毁,高版本的Ubuntu会卡死,低版本应该是不会的,这也是Ubuntu的一个优化吧,应该还有其他解决办法可以兼顾,不过自己也是初学,随着后边深入学习,解决了再更新这里吧。
  • (4)销毁线程池。

6.2 函数实现

点击查看实现源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @Function: void poolDestory(void)
* @Description: 线程池销毁
* @param : none
* @return : none
*/
void poolDestory(void)
{

/* 1. 定义一个任务链表结构体指针变量 */
Task * p;
/* 2. 释放线程任务节点 */
while(pool->queueHead != NULL)
{
p = pool->queueHead;
pool->queueHead = pool->queueHead->next;
free(p);
}

/* 释放互斥锁 */
pthread_mutex_destroy(&pool->taskLock);

/* 释放条件变量 */
/* 是因为有个线程在pthread_cond_wait等着,所以不能销毁,高版本的Ubuntu会卡死,低版本应该是不会的,这也是Ubuntu的一个优化把 */
// pthread_cond_destroy(&pool->newTask);

free(pool);
printf("free end!\n");
}
1
void poolDestory(void);/* 线程池销毁 */

三、完整实例

1. main.c

点击查看详情
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* 头文件 */
#include <stdio.h>
#include "pthread_pool.h"
/* 主函数 */
int main(int argc, char *argv[])
{
int ret = 0;
int i = 0;
threadPoolInit();
sleep(1);
printf("Start create Task!\n");
for(i = 0; i < 20; i++)
{
ret = poolAddTask(i);
sleep(0.5);
if(ret == 0)
{
printf("Add task[%d] success!\n", i);
}
else
{
printf("Add task[%d] failed!\n", i);
}
}

sleep(5);
poolDestory();
return 0;
}

2. pthread_pool.c

点击查看详情
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#include "pthread_pool.h"

ThreadPool * pool = NULL;/* 定义一个线程池,需要申请内存 */
/**
* @Function: void *workThread(void *arg)
* @Description: 线程池中线程的运行函数
* @param arg : 线程创建函数传递给工作线程的参数
* @return : 返回一个void指针类型数据
*/
void *workThread(void *arg)
{
/* 定义一个任务结构体指针变量 */
Task *ptask;
printf("This is %d thread\n", (int)arg);
while(1)
{
/* 获取互斥锁 */
pthread_mutex_lock(&pool->taskLock);

/* 等待条件变量信号(收到信号时会唤醒至少一个线程) */
pthread_cond_wait(&pool->newTask, &pool->taskLock);
printf("Thread [%ld] execution,working[%d]!",pthread_self(), pool->busyWork);
/* 从链式任务队列取出一个任务 */
ptask = pool->queueHead;/* 指向任务队列的头节点 */
pool->queueHead = pool->queueHead->next;/* 头指针指向下一个任务节点 */
/* 解除互斥锁 */
pthread_mutex_unlock(&pool->taskLock);

/* 线程执行任务函数 */
ptask->func(ptask->taskArg);
/* 更新线程池已占用线程数量(任务函数执行完毕后,线程空闲) */
pool->busyWork--;
}
return (void *)0;
}

/**
* @Function: void threadPoolInit(void)
* @Description: 线程池初始化
* @param : none
* @return : 返回一个整型数值
* 0,成功;
* -1,失败
*/
int threadPoolInit(void)
{
int i = 0;
/* 1.申请线程池的内存空间 */
pool = (ThreadPool *)malloc(sizeof(ThreadPool));
/* 2. 判断是否申请成功 */
if( pool == NULL)
{
printf("ThreadPool malloc failed!\n");
return -1;
}
/* 3. 初始化互斥锁 */
pthread_mutex_init(&pool->taskLock,NULL);
/* 4. 初始化条件变量 */
pthread_cond_init(&pool->newTask,NULL);
/* 5. 初始化链式任务队列指针 */
pool->queueHead = NULL;
/* 6. 初始化线程池已占用线程数量 */
pool->busyWork=0;
/* 7. 创建所有的工作线程 */
for(i = 0; i < POOL_NUM; i++)
{
pthread_create(&pool->tid[i], NULL, workThread, (void *)i);
}
return 0;
}

/**
* @Function: int poolAddTask(int taskArg)
* @Description: 向任务队列中添加新的任务节点
* @param taskArg : 递给实际执行的任务函数的参数
* @return : 返回一个整型数值
* 0,成功;
* -1,失败
*/
int poolAddTask(int taskArg)
{
/* 1. 定义两个任务链表结构体指针变量 */
Task * newTask;
Task * p;

pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */

/* 2. 判断线程池中是否还有线程可以执行新添加的任务 */
while(pool->busyWork >= POOL_NUM)
{
pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */
usleep(10000);/* us休眠 */
pthread_mutex_lock(&pool->taskLock); /* 获取互斥锁 */
}
pthread_mutex_unlock(&pool->taskLock); /* 解除互斥锁 */

/* 3. 申请新线程任务节点内存 */
newTask = (Task *)malloc(sizeof(Task));
/* 4. 判断是否申请成功 */
if( newTask == NULL)
{
printf("newTask malloc failed!\n");
return -1;
}
/* 5. 对新建的任务节点进行初始化 */
newTask->func = realWork;/* 线程运行函数中所执行的真正的任务内容函数 */
newTask->taskArg = (void *)taskArg;

/* 6. 添加新任务到链式任务队列 */
pthread_mutex_lock(&pool->taskLock);/* 获取互斥锁 */
p = pool->queueHead;
if(p == NULL)/* 判断线程池是否为空 */
{
pool->queueHead = newTask;
}
else
{
while(p->next != NULL)
{
p = p->next;
}
p->next = newTask;/* 新任务添加到线程池链表结尾 */

}
/* 7. 更新已使用线程数 */
pool->busyWork++;/* 已使用线程数加一 */
/* 8. 发送信号到工作线程 */
pthread_cond_signal(&pool->newTask);

pthread_mutex_unlock(&pool->taskLock);/* 解除互斥锁 */

return 0;
}

/**
* @Function: void *realWork(void *taskArg)
* @Description: 线程池任务执行内容函数
* @param taskArg : 任务参数
* @return : 返回(void *)0
*/
void *realWork(void *taskArg)
{
printf(" Finish work %d\n",(int)taskArg);
sleep(2);
return (void *)0;
}

/**
* @Function: void poolDestory(void)
* @Description: 线程池销毁
* @param : none
* @return : none
*/
void poolDestory(void)
{

/* 1. 定义一个任务链表结构体指针变量 */
Task * p;
/* 2. 释放线程任务节点 */
while(pool->queueHead != NULL)
{
p = pool->queueHead;
pool->queueHead = pool->queueHead->next;
free(p);
}

/* 释放互斥锁 */
pthread_mutex_destroy(&pool->taskLock);

/* 释放条件变量 */
/* 是因为有个线程在pthread_cond_wait等着,所以不能销毁,高版本的Ubuntu会卡死,低版本应该是不会的,这也是Ubuntu的一个优化把 */
// pthread_cond_destroy(&pool->newTask);

free(pool);
printf("free end!\n");
}

3. pthread_pool.h

点击查看详情
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H
/* 头文件 */
#include <stdio.h>
#include <pthread.h>/* pthread_create pthread_exit pthread_cancel pthread_self */
#include <unistd.h> /* sleep */
#include <string.h> /* strerror */
#include <stdlib.h> /* malloc */

/* 任务结构体定义 */
typedef struct Task
{
void *(*func)(void *arg);/* 函数指针,指向需要执行的任务函数 */
void *taskArg; /* 任务参数,传递给任务函数的参数 */
struct Task *next; /* 指向下一个任务的指针变量 */
} Task;

/* 线程池结构体的定义 */
#define POOL_NUM 10 /* 线程池最大线程数 */
typedef struct ThreadPool
{
pthread_mutex_t taskLock;/* 互斥变量 */
pthread_cond_t newTask; /* 条件变量 */

pthread_t tid[POOL_NUM]; /* 线程池中线程的线程号 tid */
Task *queueHead; /* 指向任务队列队头的指针 */
int busyWork; /* 线程池已在执行任务的线程数 */

}ThreadPool;

extern ThreadPool * pool;
void *workThread(void *arg);/* 线程池中线程的运行函数 */
int threadPoolInit(void);/* 线程池初始化 */
int poolAddTask(int taskArg);/* 向任务队列中添加新的任务节点 */
void *realWork(void *arg);/* 线程池中线程执行的任务函数 */
void poolDestory(void);/* 线程池销毁 */
#endif