thread_pool.c
线程要执行的任务
/******************************************************************************************* @name : routine* @function : 线程要执行的任务* @paramsv : None* @retval : None* @author : Dazz* @date : 2024/6/2* @version : None* @note : 该函数会在routine线程被取消的时候自动执行*** *****************************************************************************************/
void *routine(void *arg)
{
// 调试
#ifdef DEBUGprintf("[%u] is started.\n",(unsigned)pthread_self());
#endif// 把需要传递给线程任务的参数进行备份thread_pool *pool = (thread_pool *)arg;// 新建一个任务结点用来备份struct task *p;while (1){/************************************************************************** pthread_cleanup_push是一个宏,需要配合pthread_cleanup_pop一起使用** 如在执行pthread_cleanup_pop之前,该线程被取消了,则会调用handler这个函数,** 并将pool->lock作为参数传递给handler******************************************************************************/pthread_cleanup_push(handler, (void *)&pool->lock);// 对互斥锁上锁pthread_mutex_lock(&pool->lock);/******************************任务量为0的情况********************************************************/// 当处于等待状态的线程数量为0且销毁线程池的标志为0时while (pool->waiting_tasks == 0 && !pool->shutdown){// 对该线程进行挂起pthread_cond_wait(&pool->cond, &pool->lock);}// 当处于等待状态的线程数量为0且销毁线程池的标志为1时if (pool->waiting_tasks == 0 && pool->shutdown == true){// 解锁互斥锁pthread_mutex_unlock(&pool->lock);// 结束该线程pthread_exit(NULL); // CANNOT use 'break';}/******************************任务量不为0的情况****************************************************/// 备份任务结点p = pool->task_list->next;// 让线程池里的当前任务指向下一个任务pool->task_list->next = p->next;// 令处于等待状态的线程数量减一pool->waiting_tasks--;//================================================//// 互斥锁解锁pthread_mutex_unlock(&pool->lock);// 不执行清理处理程序,参数为1则执行清理处理程序pthread_cleanup_pop(0);//================================================//// 设置线程为不可取消pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);// 执行p,即当前任务结点里需要执行的任务(p->do_task)(p->arg);// 设置线程为可取消pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);// 释放堆内存free(p);}// 结束线程pthread_exit(NULL);
}
初始化线池
/******************************************************** @name : init_pool* @function : 初始化线池* @params* @pool : 线程池的地址* @threads_number : 线程池中活跃的线程的数量** @retval : 成功返回1,否则返回0* @author : Dazz* @date : 2024/6/2* @version : None* @note : None*** *******************************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number)
{// 以默认属性初始化互斥锁pthread_mutex_init(&pool->lock, NULL);// 以默认属性初始化条件量pthread_cond_init(&pool->cond, NULL);// 销毁标志设置为不销毁pool->shutdown = false;// 给链表的节点申请堆内存pool->task_list = (struct task *)malloc(sizeof(struct task));// 申请堆内存,用于存储创建出来的线程的IDpool->tids = (pthread_t *)malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);// 错误处理,对malloc进行错误处理if (pool->task_list == NULL || pool->tids == NULL){fprintf(stderr, "malloc for task_list or tids fail!,error: %d, %s\n", errno, strerror(errno));return false;}// 对任务链表中的节点的指针域进行初始化pool->task_list->next = NULL;// 设置线程池中线程数量的最大值pool->max_waiting_tasks = MAX_WAITING_TASKS;// 设置等待线程处理的任务的数量为0,说明现在没有任务pool->waiting_tasks = 0;// 设置线程池中活跃的线程的数量pool->active_threads = threads_number;// 循环创建活跃线程for (int i = 0; i < pool->active_threads; i++){// 创建线程 把线程的ID存储在申请的堆内存if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0){fprintf(stderr, " create threads fail!,error: %d, %s\n", errno, strerror(errno));return false;}// 用于调试
#ifdef DEBUGprintf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}return true;
}
向线程池的任务链表中添加任务
/******************************************************** @name : add_task* @function : 向线程池的任务链表中添加任务* @params* @pool : 线程池的地址* @do_task : 需要执行的任务的函数* @arg :需要执行的任务的函数的参数** @retval : 如成功返回1,否则返回0* @author : Dazz* @date : 2024/6/2* @version : None* @note : None*** *******************************************************/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{// 给任务链表节点申请内存struct task *new_task = malloc(sizeof(struct task));if (new_task == NULL){fprintf(stderr, " malloc for new task fail!,error: %d, %s\n", errno, strerror(errno));return false;}new_task->do_task = do_task; // 任务函数指针指向传进来的do_tasknew_task->arg = arg; // 设置函数参数new_task->next = NULL; // 指针域设置为NULL//============ LOCK =============//// 互斥锁上锁pthread_mutex_lock(&pool->lock);//===============================//// 说明要处理的任务的数量大于能处理的任务数量if (pool->waiting_tasks >= MAX_WAITING_TASKS){pthread_mutex_unlock(&pool->lock);fprintf(stderr, "too many tasks.\n");free(new_task);return false;}// 备份任务链表struct task *tmp = pool->task_list;// 遍历链表,找到单向链表的尾节点while (tmp->next != NULL)tmp = tmp->next;// 把新的要处理的任务插入到链表的尾部 尾插tmp->next = new_task;// 要处理的任务的数量+1pool->waiting_tasks++;//=========== UNLOCK ============//pthread_mutex_unlock(&pool->lock);//===============================//// 调试
#ifdef DEBUGprintf("[%u][%s] ==> a new task has been added.\n",(unsigned)pthread_self(), __FUNCTION__);
#endif// 唤醒第一个处于阻塞队列中的线程pthread_cond_signal(&pool->cond);return true;
}
向线程池加入新线程
/******************************************************** @name : add_thread* @function : 向线程池加入新线程* @params* @pool : 线程池的地址* @removing_threads : 需要增加的线程数量** @retval : 返回正在活跃的线程数量* @author : Dazz* @date : 2024/6/2* @version : None* @note : None*** *******************************************************/
int add_thread(thread_pool *pool, unsigned additional_threads)
{// 判断需要添加的新线程的数量是否为0if (additional_threads == 0)return 0;// 计算线程池中总线程的数量unsigned total_threads =pool->active_threads + additional_threads;int i, actual_increment = 0;// 循环创建新线程for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++){// 创建新线程if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0){perror("add threads error");// 如需要加入的新线程池数量为0,则直接退出if (actual_increment == 0)return -1;break;}actual_increment++;
// 用于调试
#ifdef DEBUGprintf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}// 记录此时线程池中活跃线程的总数pool->active_threads += actual_increment;return actual_increment;
}
从线程池中删除线程
/******************************************************** @name : remove_thread* @function : 从线程池中删除线程* @params* @pool : 线程池的地址* @removing_threads : 需要删除的线程数量** @retval : 返回正在活跃的线程数量* @author : Dazz* @date : 2024/6/2* @version : None* @note : None*** *******************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{// 如果需要删除的线程数量为0,则返回正在活跃的线程数量if (removing_threads == 0)return pool->active_threads;// 定义一个变量叫remaining_threads,remaining_threads的值为正在活跃的线程数量减去需要删除线程的数量int remaining_threads = pool->active_threads - removing_threads;// 如remaining_threads大于零,说明正在活跃的线程数量大于需要删除线程的数量// 如remaining_threads小于零,说明正在活跃的线程数量小于或等于需要删除线程的数量,并将remaining_threads赋值为1remaining_threads = remaining_threads > 0 ? remaining_threads : 1;int i;// 循环取消线程,直到正在活跃的线程数量为0for (i = pool->active_threads - 1; i > remaining_threads - 1; i--){// 如调用pthread_cancel函数失败,则退出循环errno = pthread_cancel(pool->tids[i]);if (errno != 0)break;#ifdef DEBUGprintf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}if (i == pool->active_threads - 1)return -1;// 返回现在的线程数量else{pool->active_threads = i + 1;return i + 1;}
}
释放线程池
/******************************************************** @name : destroy_pool* @function : 释放线程池* @params* @pool : 线程池的地址** @retval : 成功返回1,否则返回0* @author : Dazz* @date : 2024/6/2* @version : None* @note : None*** *******************************************************/
bool destroy_pool(thread_pool *pool)
{// 设置销毁标志位1pool->shutdown = true;// 唤醒所有线程pthread_cond_broadcast(&pool->cond);// 循环等待线程结束for (int i = 0; i < pool->active_threads; i++){// 等待线程结束并释放相关资源errno = pthread_join(pool->tids[i], NULL);if (errno != 0){printf("join tids[%d] error: %s\n", i, strerror(errno));}elseprintf("[%u] is joined\n", (unsigned)pool->tids[i]);}// 释放相关内存free(pool->task_list);free(pool->tids);free(pool);return true;
}
thread_pool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>#include <errno.h>
#include <pthread.h>#define MAX_WAITING_TASKS 1000 // 处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS 20 // 活跃的线程数量// 任务结点 单向链表的节点,类型
struct task
{void *(*do_task)(void *arg); // 任务函数指针 指向线程要执行的任务 格式是固定的void *arg; // 需要传递给任务的参数,如果不需要,则NULLstruct task *next; // 指向下一个任务结点的指针
};// 线程池的管理结构体
typedef struct thread_pool
{pthread_mutex_t lock; // 互斥锁pthread_cond_t cond; // 条件量bool shutdown; // 是否需要销毁线程池struct task *task_list; // 用于存储任务的链表pthread_t *tids; // 用于记录线程池中线程的IDunsigned max_waiting_tasks; // 线程池中线程的数量最大值unsigned waiting_tasks; // 处于等待状态的线程数量unsigned active_threads; // 正在活跃的线程数量
} thread_pool;/******************************************************** @name : init_pool* @function : 初始化线池* @params* @pool : 线程池的地址* @threads_number : 线程池中活跃的线程的数量** @retval : 成功返回1,否则返回0* @author : Dazz* @date : 2024/6/2* @version : None* @note : None*** *******************************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number);/******************************************************** @name : add_task* @function : 向线程池的任务链表中添加任务* @params* @pool : 线程池的地址* @do_task : 需要执行的任务的函数* @arg :需要执行的任务的函数的参数** @retval : 如成功返回1,否则返回0* @author : Dazz* @date : 2024/6/2* @version : None* @note : None*** *******************************************************/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);/******************************************************** @name : add_thread* @function : 向线程池加入新线程* @params* @pool : 线程池的地址* @removing_threads : 需要增加的线程数量** @retval : 返回正在活跃的线程数量* @author : Dazz* @date : 2024/6/2* @version : None* @note : None*** *******************************************************/
int add_thread(thread_pool *pool, unsigned int additional_threads_number);/******************************************************** @name : remove_thread* @function : 从线程池中删除线程* @params* @pool : 线程池的地址* @removing_threads : 需要删除的线程数量** @retval : 返回正在活跃的线程数量* @author : Dazz* @date : 2024/6/2* @version : None* @note : None*** *******************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);/******************************************************** @name : destroy_pool* @function : 释放线程池* @params* @pool : 线程池的地址** @retval : 成功返回1,否则返回0* @author : Dazz* @date : 2024/6/2* @version : None* @note : None*** *******************************************************/
bool destroy_pool(thread_pool *pool);/******************************************************************************************* @name : routine* @function : 线程要执行的任务* @paramsv : None* @retval : None* @author : Dazz* @date : 2024/6/2* @version : None* @note : 该函数会在routine线程被取消的时候自动执行*** *****************************************************************************************/
void *routine(void *arg);#endif