本文主要是线程的应用——线程池的相关笔记,若笔记中有错误或者不合适的地方,欢迎批评指正😃。
点击查看使用工具及版本
Windows | windows11 |
Ubuntu | Ubuntu16.04的64位版本 |
VMware® Workstation 16 Pro | 16.2.3 build-19376536 |
SecureCRT | Version 8.7.2 (x64 build 2214) - 正式版-2020年5月14日 |
开发板 | 正点原子 i.MX6ULL Linux阿尔法开发板 |
uboot | NXP官方提供的uboot,NXP提供的版本为uboot-imx-rel_imx_4.1.15_2.1.0_ga(使用的uboot版本为U-Boot 2016.03) |
linux内核 | linux-4.15(NXP官方提供) |
STM32开发板 | 正点原子战舰V3(STM32F103ZET6) |
点击查看本文参考资料
点击查看相关文件下载
一、线程池原理
我们需要使用线程的时候就去创建一个线程,不需要就将线程销毁,但是就会有这样一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程都是需要时间的。
这个时候我们就可以使用线程池来解决这个问题了。线程池的主要思想是:在进程开始时创建一定数量的线程,并加到线程池中以等待工作。当服务器收到请求时,它会唤醒池内的一个线程(如果有可用线程),并将需要服务的请求传递给它。一旦线程完成了服务,它会返回到池中再等待工作。如果线程池内没有可用线程,那么服务器会等待,直到有空线程为止。
二、线程池实现
1. 结构体实现
1.1 任务结构体
我们将使用链表构建一个任务队列,每个任务的结构体成员设计如下:
1 2 3 4 5 6
| typedef struct Task { void *(*func)(void *arg); void *taskArg; struct Task *next; } Task;
|
【成员说明】
func
,函数指针变量,可以指向一个返回值为void *
的带有一个void *
类型参数的真实任务函数。
taskArg
:void
类型的指针变量,表示传给执行真实任务的函数的参数。
next
:指针域,指向下一个任务结构体。
1.2线程池结构体
我们也是通过链表来存储线程池中的各个工作线程,每个工作线程的结构体成员设计如下:
1 2 3 4 5 6 7 8 9 10 11
| #define POOL_NUM 10 typedef struct ThreadPool { pthread_mutex_t taskLock; pthread_cond_t newTask;
pthread_t tid[POOL_NUM]; Task *queueHead; int busyWork;
}ThreadPool;
|
1.3两个结构体联系
上边两个结构体的联系如下图所示:
2. 线程池初始化
2.1 实现步骤
- (1)申请线程池的内存空间并判断是否申请成功;
- (2)对线程池的各个参数进行初始化;
- (3)创建线程,这里要注意不要超过系统可创建的最大线程数。
2.2 函数实现
点击查看实现源码
注意先定义一个指向线程池的全局指针变量:
1
| ThreadPool * pool = NULL;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
int threadPoolInit(void) { int i = 0; pool = (ThreadPool *)malloc(sizeof(ThreadPool)); if( pool == NULL) { printf("ThreadPool malloc failed!\n"); return -1; } pthread_mutex_init(&pool->taskLock,NULL); pthread_cond_init(&pool->newTask,NULL); pool->queueHead = NULL; pool->busyWork=0; for(i = 0; i < POOL_NUM; i++) { pthread_create(&pool->tid[i], NULL, workThread, (void *)i); } return 0; }
|
1
| int threadPoolInit(void);
|
3. 线程开始函数
3.1 实现步骤
线程被创建之后,就开始运行线程开始函数,在线程开始函数中我们要做的事有:
- (1)获取互斥锁
- (2)等待条件变量,就是等待任务队列中有任务产生;
- (3)收到信号时,从任务队列取出一个任务;
- (4)释放互斥锁;
- (5)传入任务参数,执行任务函数;
- (6)执行完毕后,更新线程池中已被占用的线程数量。
3.2 函数实现
点击查看实现源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
void *workThread(void *arg) { Task *ptask; printf("This is %d thread\n", (int)arg); while(1) { pthread_mutex_lock(&pool->taskLock);
pthread_cond_wait(&pool->newTask, &pool->taskLock); printf("Thread [%ld] execution,working[%d]!",pthread_self(), pool->busyWork); ptask = pool->queueHead; pool->queueHead = pool->queueHead->next; pthread_mutex_unlock(&pool->taskLock);
ptask->func(ptask->taskArg); pool->busyWork--; } return (void *)0; }
|
1
| void *workThread(void *arg);
|
4. 添加任务
4.1 实现步骤
- (1)获取互斥锁
- (2)判断线程池中是否还有空闲的线程可以执行任务,若没有则释放互斥锁,休眠等待,进行下一轮判断前要先获取已经释放的互斥锁;
- (3)申请新任务节点内存,并判断是否申请成功;
- (4)对新任务进行初始化;
- (5)添加新任务到任务队列,任务队列是链表,注意插入的方式;
- (6)新任务到来,需要唤醒一个线程,所以这里已占用线程数需要更新一下;
- (7)发送有新任务的信号,至少唤醒一个空闲线程即可。
4.2 函数实现
点击查看实现源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
int poolAddTask(int taskArg) { Task * newTask; Task * p;
pthread_mutex_lock(&pool->taskLock);
while(pool->busyWork >= POOL_NUM) { pthread_mutex_unlock(&pool->taskLock); usleep(10000); pthread_mutex_lock(&pool->taskLock); } pthread_mutex_unlock(&pool->taskLock);
newTask = (Task *)malloc(sizeof(Task)); if( newTask == NULL) { printf("newTask malloc failed!\n"); return -1; } newTask->func = realWork; newTask->taskArg = (void *)taskArg;
pthread_mutex_lock(&pool->taskLock); p = pool->queueHead; if(p == NULL) { pool->queueHead = newTask; } else { while(p->next != NULL) { p = p->next; } p->next = newTask;
} pool->busyWork++; pthread_cond_signal(&pool->newTask);
pthread_mutex_unlock(&pool->taskLock);
return 0; }
|
1
| int poolAddTask(int taskArg);
|
5. 任务函数
线程调用的任务函数,这里的休眠时间的延长可以更好的反映实验现象。
点击查看实现源码
1 2 3 4 5 6 7 8 9 10 11 12 13
|
void *realWork(void *taskArg) { printf(" Finish work %d\n",(int)taskArg); sleep(2); return (void *)0; }
|
1
| void *realWork(void *arg);
|
6. 线程池销毁
6.1 销毁步骤
任务全部执行完毕后,进程结束,需要销毁线程池,释放内存:
- (1)释放任务链表所有节点内存;
- (2)销毁互斥锁;
- (3)销毁条件变量,这里需要注意一下,是若有个线程在
pthread_cond_wait
等着,那这个条件变量就不能销毁,高版本的Ubuntu
会卡死,低版本应该是不会的,这也是Ubuntu
的一个优化吧,应该还有其他解决办法可以兼顾,不过自己也是初学,随着后边深入学习,解决了再更新这里吧。
- (4)销毁线程池。
6.2 函数实现
点击查看实现源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
void poolDestory(void) {
Task * p; while(pool->queueHead != NULL) { p = pool->queueHead; pool->queueHead = pool->queueHead->next; free(p); }
pthread_mutex_destroy(&pool->taskLock);
free(pool); printf("free end!\n"); }
|
三、完整实例
1. main.c
点击查看详情
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| #include <stdio.h> #include "pthread_pool.h"
int main(int argc, char *argv[]) { int ret = 0; int i = 0; threadPoolInit(); sleep(1); printf("Start create Task!\n"); for(i = 0; i < 20; i++) { ret = poolAddTask(i); sleep(0.5); if(ret == 0) { printf("Add task[%d] success!\n", i); } else { printf("Add task[%d] failed!\n", i); } } sleep(5); poolDestory(); return 0; }
|
2. pthread_pool.c
点击查看详情
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
| #include "pthread_pool.h"
ThreadPool * pool = NULL;
void *workThread(void *arg) { Task *ptask; printf("This is %d thread\n", (int)arg); while(1) { pthread_mutex_lock(&pool->taskLock); pthread_cond_wait(&pool->newTask, &pool->taskLock); printf("Thread [%ld] execution,working[%d]!",pthread_self(), pool->busyWork); ptask = pool->queueHead; pool->queueHead = pool->queueHead->next; pthread_mutex_unlock(&pool->taskLock);
ptask->func(ptask->taskArg); pool->busyWork--; } return (void *)0; }
int threadPoolInit(void) { int i = 0; pool = (ThreadPool *)malloc(sizeof(ThreadPool)); if( pool == NULL) { printf("ThreadPool malloc failed!\n"); return -1; } pthread_mutex_init(&pool->taskLock,NULL); pthread_cond_init(&pool->newTask,NULL); pool->queueHead = NULL; pool->busyWork=0; for(i = 0; i < POOL_NUM; i++) { pthread_create(&pool->tid[i], NULL, workThread, (void *)i); } return 0; }
int poolAddTask(int taskArg) { Task * newTask; Task * p;
pthread_mutex_lock(&pool->taskLock);
while(pool->busyWork >= POOL_NUM) { pthread_mutex_unlock(&pool->taskLock); usleep(10000); pthread_mutex_lock(&pool->taskLock); } pthread_mutex_unlock(&pool->taskLock);
newTask = (Task *)malloc(sizeof(Task)); if( newTask == NULL) { printf("newTask malloc failed!\n"); return -1; } newTask->func = realWork; newTask->taskArg = (void *)taskArg;
pthread_mutex_lock(&pool->taskLock); p = pool->queueHead; if(p == NULL) { pool->queueHead = newTask; } else { while(p->next != NULL) { p = p->next; } p->next = newTask;
} pool->busyWork++; pthread_cond_signal(&pool->newTask);
pthread_mutex_unlock(&pool->taskLock);
return 0; }
void *realWork(void *taskArg) { printf(" Finish work %d\n",(int)taskArg); sleep(2); return (void *)0; }
void poolDestory(void) {
Task * p; while(pool->queueHead != NULL) { p = pool->queueHead; pool->queueHead = pool->queueHead->next; free(p); }
pthread_mutex_destroy(&pool->taskLock);
free(pool); printf("free end!\n"); }
|
3. pthread_pool.h
点击查看详情
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #ifndef __THREAD_POOL_H #define __THREAD_POOL_H
#include <stdio.h> #include <pthread.h> #include <unistd.h> #include <string.h> #include <stdlib.h>
typedef struct Task { void *(*func)(void *arg); void *taskArg; struct Task *next; } Task;
#define POOL_NUM 10 typedef struct ThreadPool { pthread_mutex_t taskLock; pthread_cond_t newTask;
pthread_t tid[POOL_NUM]; Task *queueHead; int busyWork;
}ThreadPool;
extern ThreadPool * pool; void *workThread(void *arg); int threadPoolInit(void); int poolAddTask(int taskArg); void *realWork(void *arg); void poolDestory(void); #endif
|