本文主要是线程的应用——线程池的相关笔记,若笔记中有错误或者不合适的地方,欢迎批评指正😃。
点击查看使用工具及版本
Windows | windows11 |
Ubuntu | Ubuntu16.04的64位版本 |
VMware® Workstation 16 Pro | 16.2.3 build-19376536 |
SecureCRT | Version 8.7.2 (x64 build 2214) - 正式版-2020年5月14日 |
开发板 | 正点原子 i.MX6ULL Linux阿尔法开发板 |
uboot | NXP官方提供的uboot,NXP提供的版本为uboot-imx-rel_imx_4.1.15_2.1.0_ga(使用的uboot版本为U-Boot 2016.03) |
linux内核 | linux-4.15(NXP官方提供) |
STM32开发板 | 正点原子战舰V3(STM32F103ZET6) |
点击查看本文参考资料
点击查看相关文件下载
一、线程池原理
我们需要使用线程的时候就去创建一个线程,不需要就将线程销毁,但是就会有这样一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程都是需要时间的。
这个时候我们就可以使用线程池来解决这个问题了。线程池的主要思想是:在进程开始时创建一定数量的线程,并加到线程池中以等待工作。当服务器收到请求时,它会唤醒池内的一个线程(如果有可用线程),并将需要服务的请求传递给它。一旦线程完成了服务,它会返回到池中再等待工作。如果线程池内没有可用线程,那么服务器会等待,直到有空线程为止。
二、线程池实现
1. 结构体实现
1.1 任务结构体
我们将使用链表构建一个任务队列,每个任务的结构体成员设计如下:
1 2 3 4 5 6
| typedef struct Task { void *(*func)(void *arg); void *taskArg; struct Task *next; } Task;
|
【成员说明】
func
,函数指针变量,可以指向一个返回值为void *
的带有一个void *
类型参数的真实任务函数。
taskArg
:void
类型的指针变量,表示传给执行真实任务的函数的参数。
next
:指针域,指向下一个任务结构体。
1.2线程池结构体
我们也是通过链表来存储线程池中的各个工作线程,每个工作线程的结构体成员设计如下:
1 2 3 4 5 6 7 8 9 10 11
| #define POOL_NUM 10 typedef struct ThreadPool { pthread_mutex_t taskLock; pthread_cond_t newTask;
pthread_t tid[POOL_NUM]; Task *queueHead; int busyWork;
}ThreadPool;
|
1.3两个结构体联系
上边两个结构体的联系如下图所示:
2. 线程池初始化
2.1 实现步骤
- (1)申请线程池的内存空间并判断是否申请成功;
- (2)对线程池的各个参数进行初始化;
- (3)创建线程,这里要注意不要超过系统可创建的最大线程数。
2.2 函数实现
点击查看实现源码
注意先定义一个指向线程池的全局指针变量:
1
| ThreadPool * pool = NULL;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
int threadPoolInit(void) { int i = 0; pool = (ThreadPool *)malloc(sizeof(ThreadPool)); if( pool == NULL) { printf("ThreadPool malloc failed!\n"); return -1; } pthread_mutex_init(&pool->taskLock,NULL); pthread_cond_init(&pool->newTask,NULL); pool->queueHead = NULL; pool->busyWork=0; for(i = 0; i < POOL_NUM; i++) { pthread_create(&pool->tid[i], NULL, workThread, (void *)i); } return 0; }
|
1
| int threadPoolInit(void);
|
3. 线程开始函数
3.1 实现步骤
线程被创建之后,就开始运行线程开始函数,在线程开始函数中我们要做的事有:
- (1)获取互斥锁
- (2)等待条件变量,就是等待任务队列中有任务产生;
- (3)收到信号时,从任务队列取出一个任务;
- (4)释放互斥锁;
- (5)传入任务参数,执行任务函数;
- (6)执行完毕后,更新线程池中已被占用的线程数量。
3.2 函数实现
点击查看实现源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
void *workThread(void *arg) { Task *ptask; printf("This is %d thread\n", (int)arg); while(1) { pthread_mutex_lock(&pool->taskLock);
pthread_cond_wait(&pool->newTask, &pool->taskLock); printf("Thread [%ld] execution,working[%d]!",pthread_self(), pool->busyWork); ptask = pool->queueHead; pool->queueHead = pool->queueHead->next; pthread_mutex_unlock(&pool->taskLock);
ptask->func(ptask->taskArg); pool->busyWork--; } return (void *)0; }
|
1
| void *workThread(void *arg);
|
4. 添加任务
4.1 实现步骤
- (1)获取互斥锁
- (2)判断线程池中是否还有空闲的线程可以执行任务,若没有则释放互斥锁,休眠等待,进行下一轮判断前要先获取已经释放的互斥锁;
- (3)申请新任务节点内存,并判断是否申请成功;
- (4)对新任务进行初始化;
- (5)添加新任务到任务队列,任务队列是链表,注意插入的方式;
- (6)新任务到来,需要唤醒一个线程,所以这里已占用线程数需要更新一下;
- (7)发送有新任务的信号,至少唤醒一个空闲线程即可。
4.2 函数实现
点击查看实现源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
int poolAddTask(int taskArg) { Task * newTask; Task * p;
pthread_mutex_lock(&pool->taskLock);
while(pool->busyWork >= POOL_NUM) { pthread_mutex_unlock(&pool->taskLock); usleep(10000); pthread_mutex_lock(&pool->taskLock); } pthread_mutex_unlock(&pool->taskLock);
newTask = (Task *)malloc(sizeof(Task)); if( newTask == NULL) { printf("newTask malloc failed!\n"); return -1; } newTask->func = realWork; newTask->taskArg = (void *)taskArg;
pthread_mutex_lock(&pool->taskLock); p = pool->queueHead; if(p == NULL) { pool->queueHead = newTask; } else { while(p->next != NULL) { p = p->next; } p->next = newTask;
} pool->busyWork++; pthread_cond_signal(&pool->newTask);
pthread_mutex_unlock(&pool->taskLock);
return 0; }
|
1
| int poolAddTask(int taskArg);
|
5. 任务函数
线程调用的任务函数,这里的休眠时间的延长可以更好的反映实验现象。
点击查看实现源码
1 2 3 4 5 6 7 8 9 10 11 12 13
|
void *realWork(void *taskArg) { printf(" Finish work %d\n",(int)taskArg); sleep(2); return (void *)0; }
|
1
| void *realWork(void *arg);
|
6. 线程池销毁
6.1 销毁步骤
任务全部执行完毕后,进程结束,需要销毁线程池,释放内存:
- (1)释放任务链表所有节点内存;
- (2)销毁互斥锁;
- (3)销毁条件变量,这里需要注意一下,是若有个线程在
pthread_cond_wait
等着,那这个条件变量就不能销毁,高版本的Ubuntu
会卡死,低版本应该是不会的,这也是Ubuntu
的一个优化吧,应该还有其他解决办法可以兼顾,不过自己也是初学,随着后边深入学习,解决了再更新这里吧。
- (4)销毁线程池。
6.2 函数实现
点击查看实现源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
void poolDestory(void) {
Task * p; while(pool->queueHead != NULL) { p = pool->queueHead; pool->queueHead = pool->queueHead->next; free(p); }
pthread_mutex_destroy(&pool->taskLock);
free(pool); printf("free end!\n"); }
|
三、完整实例
1. main.c
点击查看详情
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| #include <stdio.h> #include "pthread_pool.h"
int main(int argc, char *argv[]) { int ret = 0; int i = 0; threadPoolInit(); sleep(1); printf("Start create Task!\n"); for(i = 0; i < 20; i++) { ret = poolAddTask(i); sleep(0.5); if(ret == 0) { printf("Add task[%d] success!\n", i); } else { printf("Add task[%d] failed!\n", i); } } sleep(5); poolDestory(); return 0; }
|
2. pthread_pool.c
点击查看详情

| #include "pthread_pool.h"
ThreadPool * pool = NULL;
void *workThread(void *arg) { Task *ptask; printf("This is %d thread\n", (int)arg); while(1) { pthread_mutex_lock(&pool->taskLock); pthread_cond_wait(&pool->newTask, &pool->taskLock); printf("Thread [%ld] execution,working[%d]!",pthread_self(), pool->busyWork); ptask = pool->queueHead; pool->queueHead = pool->queueHead->next; pthread_mutex_unlock(&pool->taskLock);
ptask->func(ptask->taskArg); pool->busyWork--; } return (void *)0; }
int threadPoolInit(void) { int i = 0; pool = (ThreadPool *)malloc(sizeof(ThreadPool)); if( pool == NULL) { printf("ThreadPool malloc failed!\n"); return -1; } pthread_mutex_init(&pool->taskLock,NULL); pthread_cond_init(&pool->newTask,NULL); pool->queueHead = NULL; pool->busyWork=0; for(i = 0; i < POOL_NUM; i++) { pthread_create(&pool->tid[i], NULL, workThread, (void *)i); } return 0; }
int poolAddTask(int taskArg) { Task * newTask; Task * p;
pthread_mutex_lock(&pool->taskLock);
while(pool->busyWork >= POOL_NUM) { pthread_mutex_unlock(&pool->taskLock); usleep(10000); pthread_mutex_lock(&pool->taskLock); } pthread_mutex_unlock(&pool->taskLock);
newTask = (Task *)malloc(sizeof(Task)); if( newTask == NULL) { printf("newTask malloc failed!\n"); return -1; } newTask->func = realWork; newTask->taskArg = (void *)taskArg;
pthread_mutex_lock(&pool->taskLock); p = pool->queueHead; if(p == NULL) { pool->queueHead = newTask; } else { while(p->next != NULL) { p = p->next; } p->next = newTask;
} pool->busyWork++; pthread_cond_signal(&pool->newTask);
pthread_mutex_unlock(&pool->taskLock);
return 0; }
void *realWork(void *taskArg) { printf(" Finish work %d\n",(int)taskArg); sleep(2); return (void *)0; }
void poolDestory(void) {
Task * p; while(pool->queueHead != NULL) { p = pool->queueHead; pool->queueHead = pool->queueHead->next; free(p); }
pthread_mutex_destroy(&pool->taskLock);
free(pool); printf("free end!\n"); }
|
3. pthread_pool.h
点击查看详情
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #ifndef __THREAD_POOL_H #define __THREAD_POOL_H
#include <stdio.h> #include <pthread.h> #include <unistd.h> #include <string.h> #include <stdlib.h>
typedef struct Task { void *(*func)(void *arg); void *taskArg; struct Task *next; } Task;
#define POOL_NUM 10 typedef struct ThreadPool { pthread_mutex_t taskLock; pthread_cond_t newTask;
pthread_t tid[POOL_NUM]; Task *queueHead; int busyWork;
}ThreadPool;
extern ThreadPool * pool; void *workThread(void *arg); int threadPoolInit(void); int poolAddTask(int taskArg); void *realWork(void *arg); void poolDestory(void); #endif
|