赞
踩
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
线程池c代码简单实现:
大致思路如下:
一个管理线程轮询工作线程是否空闲,空闲的话从工作队列中取出work函数给工作线程处理
提示:以下是本篇文章正文内容,下面案例可供参考
线程池是一种用于管理一组预先创建的线程的技术,它能够高效地处理大量并发任务。线程池的核心思想是复用线程,而不是为每个任务都创建和销毁线程,这样可以显著减少线程创建和销毁所带来的开销,并提高系统的整体性能。
线程池的优点
1,减少线程创建和销毁的开销:线程的创建和销毁是一个相对耗时的过程,线程池通过复用线程来避免频繁的创建和销毁,提高了效率。
2,控制并发度:线程池可以限制同时运行的线程数量,从而控制系统的并发度,防止过度消耗系统资源。
3,提高响应速度:由于线程已经在池中预先创建,当新任务到来时,可以直接从池中获取线程执行,无需等待线程创建过程。
4,资源管理:线程池可以更有效地管理资源,比如限制最大线程数量,避免系统资源耗尽。
线程池的基本组成部分
1,线程池:包含一组预先创建的线程。
2,任务队列:用于保存待处理的任务。
3,任务调度器:负责将任务分发给空闲的线程执行。
4,线程工厂:用于创建新线程(虽然线程池通常会复用线程,但在某些情况下可能需要创建新线程)。
线程池的工作流程
1,任务提交:当一个任务需要被执行时,它会被提交给线程池。
2,任务分配:如果线程池中有空闲线程,那么任务会被分配给其中一个线程执行。
3,任务执行:线程开始执行任务。
4,任务完成:任务完成后,线程回到线程池中等待分配新的任务。
thread_pool.c
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include "double_list.h" #define CONTAINER_OF(ptr, type, member) \ ((type *)((char *)(ptr) - offsetof(type, member))) /* // 任务数据结构 typedef struct { int id; //最好能传递 提交任务的线程pid void* (*function)(void *); // 指向任务函数的指针 void *arg; // 传递给任务函数的参数 } task_t; */ typedef struct { DoublyLinkedList list; int stack_size; pthread_t *thread_id; //工作线程的 thread id task_t *thread_arg; //工作线程的 输入arg int thread_num; pthread_t thread_mg; //管理线程 int thread_pool_pause; int thread_pool_stop; void* ext_arg; } thread_pool_t; typedef struct { thread_pool_t * tp; int num; } work_th_arg; // 线程池工作线程的主循环 void *thread_pool_thread(void *arg) { thread_pool_t *tp = ((work_th_arg *)arg)->tp; int thread_num = ((work_th_arg *)arg)->num; int ret = 0; printf("tp = %x, thread_num = %d\n", tp, thread_num); tp->thread_arg[thread_num].id = -1; struct timespec ts, rem; // 设置要睡眠的时间为5毫秒 ts.tv_sec = 0; // 秒 ts.tv_nsec = 5 * 1000 * 1000; // 5毫秒转换为纳秒 free(arg); while (1) { // 如果线程池关闭并且队列为空,则退出线程 if(tp->thread_pool_stop == 1 || tp->thread_arg == NULL) { pthread_exit(NULL); } // 如果队列为空则等待 if(tp->thread_pool_pause == 1 || tp->thread_arg[thread_num].id == -1) { nanosleep(&ts, &rem); // 睡眠5ms continue; } // 获取任务 // 执行任务 ret = tp->thread_arg[thread_num].function(tp->thread_arg[thread_num].arg); tp->thread_arg[thread_num].id = -1; printf("thread_num = %d, do task done ret = %d\n", thread_num, ret); } } // 线程池 管理线程的主循环 void *thread_pool_manage(void *arg) { thread_pool_t *tp = (thread_pool_t *)arg; task_t tmp_ta = {0}; int ret = 0; struct timespec ts, rem; // 设置要睡眠的时间为2毫秒 ts.tv_sec = 0; // 秒 ts.tv_nsec = 2 * 1000 * 1000; // 5毫秒转换为纳秒 printf("thread_pool_manage enter\n"); while (1) { // 如果线程池关闭并且队列为空,则退出线程 if(tp->thread_pool_stop == 1 || tp->thread_arg == NULL) { pthread_exit(NULL); } // 如果队列为空则等待 if(tp->thread_pool_pause == 1 || is_list_empty(&(tp->list))) { //printf("nanosleep enter\n"); nanosleep(&ts, &rem); // 睡眠5ms continue; } for(int i = 0; i < tp->thread_num; i++) { if(tp->thread_arg[i].id == -1) { printf("queue_dequeue enter\n"); ret = queue_dequeue(&(tp->list),&tmp_ta); if(ret != 0) { break; } tp->thread_arg[i].function = tmp_ta.function; tp->thread_arg[i].arg = tmp_ta.arg; tp->thread_arg[i].id = tmp_ta.id; } } } } int thread_pool_init(thread_pool_t *tp,int thread_num, int stack_size) { int ret = 0; if(tp == NULL) { return -1; } if(thread_num == 0) { tp->thread_num = 100; }else { tp->thread_num = thread_num; } if(stack_size == 0) { tp->stack_size = 1000; }else { tp->stack_size = stack_size; } initDoublyList(&(tp->list)); list_set_maxlen(&(tp->list), tp->stack_size); tp->thread_arg = malloc(tp->thread_num * sizeof(task_t)); if(tp->thread_arg == NULL) { return -1; } tp->thread_id = malloc(tp->thread_num * sizeof(pthread_t)); if(tp->thread_arg == NULL) { ret = -1; goto error; } tp->thread_pool_pause = 1; tp->thread_pool_stop = 0; for(int i = 0; i < tp->thread_num; i++) { work_th_arg *th_arg = malloc(sizeof(work_th_arg)); if(th_arg == NULL) { return -1; } tp->thread_arg[i].id = -1; th_arg->tp = tp; th_arg->num = i; if (pthread_create(&(tp->thread_id[i]), NULL, thread_pool_thread, (void*)th_arg) != 0) { perror("Failed to create thread 1"); ret = -1; free(th_arg); goto error; } //printf("tp->thread_id[%d] = %d\n", i , tp->thread_id[i]); } if (pthread_create(&(tp->thread_mg), NULL, thread_pool_manage, (void*)tp) != 0) { perror("Failed to create thread 1"); ret = -1; goto error; } return 0; error: if(tp->thread_arg) free(tp->thread_arg); if(tp->thread_id) free(tp->thread_id); return ret; } int thread_pool_stop(thread_pool_t *tp) { if(NULL == tp) { return -1; } tp->thread_pool_stop = 1; for(int i = 0; i < tp->thread_num; i++) { //printf("thread_pool_stop tp->thread_id[%d] = %d\n", i , tp->thread_id[i]); pthread_join(tp->thread_id[i], NULL); } pthread_join(tp->thread_mg, NULL); return 0; } int thread_pool_run(thread_pool_t *tp, int pause) { if(NULL == tp) { return -1; } tp->thread_pool_pause = pause; return 0; } int thread_pool_uninit(thread_pool_t *tp) { int ret = 0; if(tp == NULL) { return -1; } thread_pool_stop(tp); clearDoublyList(&(tp->list)); if(tp->thread_arg) free(tp->thread_arg); if(tp->thread_id) free(tp->thread_id); tp->thread_pool_pause = 0; tp->thread_pool_stop = 0; tp->thread_mg = 0; error: return ret; } int task_fun(int arg) { printf("task_fun enter\n"); for(int i = 0; i < arg; i++) { for(int j = 0; j < 1000; j++) { asm("nop"); } } sleep(arg); return arg; } int main() { task_t task_test = {0}; thread_pool_t *tp = malloc(sizeof(thread_pool_t)); thread_pool_init(tp, 10, 100); //run thread_pool_run(tp, 0); for(int i = 0; i < 20; i++) { task_test.id = i; task_test.function = task_fun; task_test.arg = i; queue_enqueue(&(tp->list), task_test); } usleep(30* 1000 * 1000); thread_pool_uninit(tp); free(tp); }
double_list.h
#ifndef _DOUBLE_LIST_H_ #define _DOUBLE_LIST_H_ #define USE_MUTEX #define MYTYPE task_t //special struct define // 任务数据结构 typedef struct { int id; //最好能传递 提交任务的线程pid void* (*function)(void *); // 指向任务函数的指针 void *arg; // 传递给任务函数的参数 } task_t; typedef struct DoublyListNode { MYTYPE data; struct DoublyListNode* prev; struct DoublyListNode* next; } DoublyListNode; typedef struct DoublyLinkedList { DoublyListNode* head; DoublyListNode* tail; #ifdef USE_MUTEX pthread_mutex_t lock; // 添加互斥锁 #endif int list_len_max; int list_len_cur; } DoublyLinkedList; void initDoublyList(DoublyLinkedList* list); void clearDoublyList(DoublyLinkedList* list); void printDoublyList(DoublyLinkedList* list); void list_set_maxlen(DoublyLinkedList* list,int len); int insertAtHead(DoublyLinkedList *list, MYTYPE value); int insertAtTail(DoublyLinkedList *list, MYTYPE value); int deleteAtHead(DoublyLinkedList *list, MYTYPE *value); int deleteAtTail(DoublyLinkedList *list, MYTYPE *value); //full/enpty int is_list_empty(DoublyLinkedList *list); int is_list_full(DoublyLinkedList *list); //queue int queue_enqueue(DoublyLinkedList *list, MYTYPE value); int queue_dequeue(DoublyLinkedList *list, MYTYPE *value); //stack int stack_push(DoublyLinkedList *list, MYTYPE value); int stack_pop(DoublyLinkedList *list, MYTYPE *value); #endif
double_list.c
#include <stdio.h> #include <pthread.h> #include <stdlib.h> #include "double_list.h" #include <string.h> void list_set_maxlen(DoublyLinkedList* list, int len) { if(NULL == list) { printf("invalid parameter\n"); return; } list->list_len_max = len; } void initDoublyList(DoublyLinkedList* list) { list->head = NULL; list->tail = NULL; #ifdef USE_MUTEX pthread_mutex_init(&list->lock, NULL); // 初始化互斥锁 #endif list->list_len_max = -1; list->list_len_cur = 0; } //清空链表 void clearDoublyList(DoublyLinkedList* list) { DoublyListNode* current = list->head; while (current != NULL) { DoublyListNode* next = current->next; //printf("clearDoublyList free\n"); free(current); current = next; } list->head = NULL; list->tail = NULL; list->list_len_cur = 0; list->list_len_max = -1; #ifdef USE_MUTEX pthread_mutex_destroy(&list->lock); #endif } //遍历并打印链表 void printDoublyList(DoublyLinkedList* list) { #ifdef USE_MUTEX pthread_mutex_lock(&list->lock); #endif DoublyListNode* current = list->head; while (current != NULL) { printf("%d <-> ", current->data); current = current->next; } printf("NULL\n"); #ifdef USE_MUTEX pthread_mutex_unlock(&list->lock); #endif } //向链表头部插入元素 int insertAtHead(DoublyLinkedList *list, MYTYPE value) { int ret = 0; #ifdef USE_MUTEX pthread_mutex_lock(&list->lock); #endif if(list == NULL || (list->list_len_max != -1 && list->list_len_cur >= list->list_len_max)) { printf("invalid parameter or list full\n"); ret = -1; goto error; } DoublyListNode *newNode = (DoublyListNode *)malloc(sizeof(DoublyListNode)); if (newNode == NULL) { printf("Memory allocation failed\n"); ret = -1; goto error; } memcpy(&(newNode->data), &value, sizeof(MYTYPE)); newNode->prev = NULL; newNode->next = list->head; if (list->head != NULL) list->head->prev = newNode; else list->tail = newNode; list->head = newNode; list->list_len_cur++; error: #ifdef USE_MUTEX pthread_mutex_unlock(&list->lock); #endif return 0; } //向链表尾部插入元素 int insertAtTail(DoublyLinkedList *list, MYTYPE value) { int ret = 0; #ifdef USE_MUTEX pthread_mutex_lock(&list->lock); #endif if(list == NULL || (list->list_len_max != -1 && list->list_len_cur >= list->list_len_max)) { printf("invalid parameter or list full\n"); ret = -1; goto error; } DoublyListNode *newNode = (DoublyListNode *)malloc(sizeof(DoublyListNode)); if (newNode == NULL) { printf("Memory allocation failed\n"); ret = -1; goto error; } memcpy(&(newNode->data), &value, sizeof(MYTYPE)); newNode->next = NULL; newNode->prev = list->tail; if (list->tail != NULL) list->tail->next = newNode; else list->head = newNode; list->tail = newNode; list->list_len_cur++; error: #ifdef USE_MUTEX pthread_mutex_unlock(&list->lock); #endif return 0; } //从链表头部删除元素 int deleteAtHead(DoublyLinkedList *list, MYTYPE * value) { int ret = 0; if(list == NULL || value == NULL) { printf("invalid parameter\n"); return -1; } #ifdef USE_MUTEX pthread_mutex_lock(&list->lock); #endif if (list->head == NULL) { printf("List is empty\n"); list->tail = NULL; ret = -1; goto error; } // 头和尾指针指向一个node,只有一个元素 if(list->head == list->tail) { list->tail = NULL; list->head->next = NULL; } DoublyListNode *nodeToDelete = list->head; memcpy(value, &(nodeToDelete->data), sizeof(MYTYPE)); list->head = nodeToDelete->next; if (list->head != NULL) list->head->prev = NULL; // else // list->tail = NULL; //printf("deleteAtHead free\n"); free(nodeToDelete); list->list_len_cur--; error: #ifdef USE_MUTEX pthread_mutex_unlock(&list->lock); #endif return 0; } //从链表末尾删除元素 int deleteAtTail(DoublyLinkedList *list, MYTYPE * value) { int ret = 0; if(list == NULL || value == NULL) { printf("invalid parameter\n"); return -1; } #ifdef USE_MUTEX pthread_mutex_lock(&list->lock); #endif if (list->tail == NULL) { printf("List is empty\n"); list->head = NULL; ret = -1; goto error; } // 头和尾指针指向一个node,只有一个元素 if(list->head == list->tail) { list->head = NULL; list->tail->prev = NULL; } DoublyListNode *nodeToDelete = list->tail; memcpy(value, &(nodeToDelete->data), sizeof(MYTYPE)); list->tail = nodeToDelete->prev; if (list->tail != NULL) list->tail->next = NULL; // else // list->head = NULL; //printf("deleteAtTail free\n"); free(nodeToDelete); list->list_len_cur--; error: #ifdef USE_MUTEX pthread_mutex_unlock(&list->lock); #endif return ret; } int is_list_full(DoublyLinkedList *list) { int ret = 0; #ifdef USE_MUTEX pthread_mutex_lock(&list->lock); #endif if(list->list_len_max != -1 && list->list_len_cur >= list->list_len_max) { ret = 1; } #ifdef USE_MUTEX pthread_mutex_unlock(&list->lock); #endif return ret; } int is_list_empty(DoublyLinkedList *list) { int ret = 0; #ifdef USE_MUTEX pthread_mutex_lock(&list->lock); #endif if(list->tail == NULL && list->head == NULL && list->list_len_cur == 0) { ret = 1; } #ifdef USE_MUTEX pthread_mutex_unlock(&list->lock); #endif return ret; } // queue int queue_enqueue(DoublyLinkedList *list, MYTYPE value) { return insertAtTail(list, value); } int queue_dequeue(DoublyLinkedList *list, MYTYPE *value) { return deleteAtHead(list, value); } //stack int stack_push(DoublyLinkedList *list, MYTYPE value) { return insertAtHead(list, value); } int stack_pop(DoublyLinkedList *list, MYTYPE *value) { return deleteAtHead(list, value); }
线程池c代码实现,支持设置线程数和任务队列大小,可以运行仅供参考
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。