当前位置:   article > 正文

【Linux】-- 线程池_linux 线程池例子

linux 线程池例子

目录

铺垫

内存

线程的角度

线程池

基本代码结构

对于线程池的生产消费的完善

初步实现线程池生产消费

结合日志完善线程池


铺垫

内存

(以STL处理方式,引入提供效率的一种思想)

        通过进行C语言与C++语言的学习中,平时我们使用的内存,都是我们需要了,才进行申请。而又通过系统的学习,我们可以发现,需要才进行申请,这样语言上我们要调用语言的接口:malloc、new等,但是它们的底层也一定是要调用操作系统提供的接口的。而调用系统接口,需要:

  • 陷入内核。
  • 更改CPU状态。
  • 切换页表。
  • 空间不够,操作系统还需要进行查找(空间碎片),进行其内部的内存管理算法(刷新缓冲区进行IO腾出空间、对内存碎片做整理、杀掉不常用的应用)

        所以,对应的STL容器,对于此方面:增容不是我们要多少给多少,而是直接预先多申请一部分,本质上就是用空间换时间的策略

线程的角度

        内存是资源,线程本质是一种调用执行流,也是资源。所以如果我们预先创建出一大批的线程,当有任务在来的时候,我们直接让线程去进行处理任务,而不用再去当任务到来时,然后才进行创建线程(本质创建各种数据结构,并且进行初始化、以及一系列为管理的操作),这是很浪费时间的。

        如果我们预先创建出一大批的线程,当有任务来的时候,然后直接让线程去处理任务,而不用再去当任务来的时候,才创建……。

线程池

        一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限。
线程池示例:
  1. 创建固定数量线程池,循环从任务队列中获取任务对象。
  2. 获取到任务对象后,执行任务对象中的任务接口。

Note:

        核心:用来预先申请资源,达到以空间换时间的一种做法。

线程池的框架实现:

        有人向线程池内部push任务,线程池内部的线程就可以自动的处理任务。即:有线程向线程池内部push任务,就一定回有线程在线程池内部拿任务 —— 本质:生产消费模型。

基本代码结构

        并未加入互斥量信号量

thread.hpp

        对线程的封装。

  1. #pragma once
  2. #include <string>
  3. #include <pthread.h>
  4. #include <cstdio>
  5. // 对线程的封装 - 不是完全必要,但是这样便于后期的统一管理
  6. typedef void*(*fun_t)(void*);
  7. // 整合线程的数据
  8. class ThreadData
  9. {
  10. public:
  11. std::string _name;
  12. void* _args;
  13. };
  14. class Thread
  15. {
  16. public:
  17. Thread(int num, fun_t callback, void* args):_func(callback)
  18. {
  19. char nameBuffer[64];
  20. snprintf(nameBuffer, sizeof(nameBuffer), "Thread-%d", num);
  21. _name = nameBuffer;
  22. _tdata._args = args;
  23. _tdata._name = _name;
  24. }
  25. void start()
  26. {
  27. pthread_create(&_tid, nullptr, _func, (void*)&_tdata);
  28. }
  29. void join()
  30. {
  31. pthread_join(_tid, nullptr);
  32. }
  33. // 未来不再使用线程id了,因为其是一个地址不便于我们查看
  34. std::string name()
  35. {
  36. return _name;
  37. }
  38. ~Thread()
  39. {}
  40. private:
  41. std::string _name;
  42. fun_t _func;
  43. ThreadData _tdata;
  44. pthread_t _tid;
  45. };

ThreadPool.hpp 

  1. #include "thread.hpp"
  2. #include <vector>
  3. #include <queue>
  4. #include <unistd.h>
  5. #include <iostream>
  6. const int g_thread_num = 3;
  7. template<class T>
  8. class ThreadPool
  9. {
  10. public:
  11. ThreadPool(int thread_num = g_thread_num):_num(thread_num)
  12. {
  13. for(int i = 1; i <= _num; i++)
  14. _threads.push_back(new Thread(i, routine, nullptr));
  15. }
  16. // 1.run - 将线程跑起来
  17. void run()
  18. {
  19. int i = 0;
  20. for(auto& iter : _threads)
  21. {
  22. iter->start();
  23. std::cout << iter->name() << " 启动成功" << std::endl;
  24. }
  25. }
  26. // 未来所有执行流所执行的方法 - 核心取任务、执行任务的逻辑
  27. static void* routine(void* args) // 因为在类当中,如果是一个成员方法,其会有一个隐藏的参数this指针,所以我们需要使用static进行修饰。
  28. {
  29. ThreadData *td = (ThreadData*)args;
  30. while(true)
  31. {
  32. std::cout << "我是一个线程,我的名字是:" << td->_name << std::endl;
  33. sleep(1);
  34. }
  35. }
  36. // 2.pushTask - 将任务放到任务池里
  37. void pushTask()
  38. {}
  39. void join()
  40. {
  41. for(auto& iter : _threads)
  42. iter->join();
  43. }
  44. ~ThreadPool()
  45. {
  46. for(auto& iter : _threads)
  47. delete iter;
  48. }
  49. private:
  50. std::vector<Thread*> _threads;
  51. int _num;
  52. std::queue<T> _task_queue;
  53. };

testMain.cc 

  1. #include "threadPool.hpp"
  2. #include <iostream>
  3. int main()
  4. {
  5. ThreadPool<int> *tp = new ThreadPool<int>();
  6. tp->run();
  7. tp->join();
  8. return 0;
  9. }

对于线程池的生产消费的完善

        加入互斥量信号量,并且加入生产者的生产数据、方法,消费者的消费数据、方法。此处我们需要注意:最难的并不是生产,而是消费。

生产者:

        生产的过程。

  1. // 2.pushTask - 将任务放到任务池里
  2. void pushTask(const T& task)
  3. {
  4. lockGuard lockguard(&_lock); // 加锁
  5. _task_queue.push(task); // 压入任务
  6. pthread_cond_signal(&_cond); // 唤醒线程
  7. } // 自动释放锁

消费者:

        消费的过程。

原:

  1. // 未来所有执行流所执行的方法 - 核心取任务、执行任务的逻辑
  2. static void* routine(void* args) // 因为在类当中,如果是一个成员方法,其会有一个隐藏的参数this指针,所以我们需要使用static进行修饰。
  3. {
  4. ThreadData *td = (ThreadData*)args;
  5. while(true)
  6. {
  7. std::cout << "我是一个线程,我的名字是:" << td->_name << std::endl;
  8. sleep(1);
  9. }
  10. }

        因为,每一个线程创建出来了,它都会执行routine,并且我们是在构造的时候就将其传送给了我们对应的线程,该线程就会在start的时候就会进入我们设置的回调。

  1. //ThreadPool.hpp 
  2. //---------------------------
  3. // 构造函数
  4. ThreadPool(int thread_num = g_thread_num):_num(thread_num)
  5. {
  6. pthread_mutex_init(&_lock, nullptr);
  7. pthread_cond_init(&_cond, nullptr);
  8. for(int i = 1; i <= _num; i++)
  9. _threads.push_back(new Thread(i, routine, nullptr));
  10. }
  11. // 1.run - 将线程跑起来
  12. void run()
  13. {
  14. for(auto& iter : _threads)
  15. {
  16. iter->start();
  17. std::cout << iter->name() << " 启动成功" << std::endl;
  18. }
  19. }
  20. //Thread.hpp 
  21. //---------------------------
  22. void start()
  23. {
  24. pthread_create(&_tid, nullptr, _func, (void*)&_tdata);
  25. }

        但是,作为势必要消费数据的routine就带来一些问题(困扰):消费就代表要访问,存储对应方法的_task_queue,但是_task_queue作为内类的成员方法,而routine是一个被staict修饰的成员方法(其只能够使用静态成员、静态方法,无法使用内类的成员属性、成员方法)

#:将_task_queue设置为static修饰的


        这个方法是可以的(类内的成员方法,能够使用static修饰的成员),但是是不好的。因为如果定义为static修饰的,未来如果有两个三个等的线程池,这些线程池就相当于_task_queue被大家所共享的,而且对于此类数据还是最好封装住。

        与其让routine拿到_task_queue,不如让其直接拿到整体对象。拿到之后可以直接让其利用函数的方式访问类内的各种属性。

        直接在构造函数的地方传递:this指针

  1. //ThreadPool.hpp 
  2. //---------------------------
  3. // 构造函数
  4. ThreadPool(int thread_num = g_thread_num):_num(thread_num)
  5. {
  6. pthread_mutex_init(&_lock, nullptr);
  7. pthread_cond_init(&_cond, nullptr);
  8. for(int i = 1; i <= _num; i++)
  9. _threads.push_back(new Thread(i, routine, this));
  10. }

        此时,对于routine内部只需要强转一下就行了。

  1. // 未来所有执行流所执行的方法 - 核心取任务、执行任务的逻辑
  2. static void* routine(void* args) // 因为在类当中,如果是一个成员方法,其会有一个隐藏的参数this指针,所以我们需要使用static进行修饰。
  3. {
  4. ThreadData *td = (ThreadData*)args;
  5. ThreadPool<T>* tp = (ThreadPool<T>*)td->_args;
  6. while(true)
  7. {
  8. // lock
  9. // while(task_queue_.empty()) wait();
  10. // 获取任务
  11. // unlock
  12. // 处理任务
  13. }
  14. }

初步完善:

  1. public:
  2. // 返回锁的地址
  3. pthread_mutex_t *getMutex()
  4. {
  5. return &_lock;
  6. }
  7. bool isEmpty()
  8. {
  9. return _task_queue.empty();
  10. }
  11. void waitCond()
  12. {
  13. pthread_cond_wait(&_cond, &_lock);
  14. }
  15. T getTask()
  16. {
  17. T t = _task_queue.front();
  18. _task_queue.pop();
  19. return t;
  20. }
  21. ------------------------
  22. // 未来所有执行流所执行的方法 - 核心取任务、执行任务的逻辑
  23. static void *routine(void *args) // 因为在类当中,如果是一个成员方法,其会有一个隐藏的参数this指针,所以我们需要使用static进行修饰。
  24. {
  25. ThreadData *td = (ThreadData *)args;
  26. ThreadPool<T> *tp = (ThreadPool<T> *)td->_args;
  27. while (true)
  28. {
  29. T task; // 对应任务的对象
  30. // 利用{}确定加锁的区间
  31. {
  32. // lock
  33. lockGuard lockguard(tp->getMutex());
  34. // while(task_queue_.empty()) wait();
  35. while (tp->isEmpty())
  36. tp->waitCond();
  37. // 获取任务 - 100%有任务
  38. task = tp->getTask(); // 任务队列是共享的->将任务从共享,拿到自己的私有空间
  39. } // 自动释放锁
  40. // 处理任务
  41. task(); // 要求每一个任务都要提供一个仿函数
  42. }
  43. }

初步实现线程池生产消费

提供一个:Task.hpp

        任务类型的封装,重点:包含仿函数。

  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <functional>
  5. typedef std::function<int(int, int)> func_t;
  6. class Task
  7. {
  8. public:
  9. Task(){}
  10. Task(int x, int y, func_t func):_x(x), _y(y), _func(func)
  11. {}
  12. int operator ()()
  13. {
  14. return _func(_x, _y);
  15. }
  16. public:
  17. int _x;
  18. int _y;
  19. // int type;
  20. func_t _func;
  21. };

        任务是什么样子,不能够让线程池知道,与线程池没有关系,线程池只需要我们让其干什么就干什么就行了,也就是一定意义上的解耦合。

thread.hpp

  1. #pragma once
  2. #include <string>
  3. #include <pthread.h>
  4. #include <cstdio>
  5. // 对线程的封装 - 不是完全必要,但是这样便于后期的统一管理
  6. typedef void*(*fun_t)(void*);
  7. // 整合线程的数据
  8. class ThreadData
  9. {
  10. public:
  11. std::string _name;
  12. void* _args;
  13. };
  14. class Thread
  15. {
  16. public:
  17. Thread(int num, fun_t callback, void* args):_func(callback)
  18. {
  19. char nameBuffer[64];
  20. snprintf(nameBuffer, sizeof(nameBuffer), "Thread-%d", num);
  21. _name = nameBuffer;
  22. _tdata._args = args;
  23. _tdata._name = _name;
  24. }
  25. void start()
  26. {
  27. pthread_create(&_tid, nullptr, _func, (void*)&_tdata);
  28. }
  29. void join()
  30. {
  31. pthread_join(_tid, nullptr);
  32. }
  33. // 未来不再使用线程id了,因为其是一个地址不便于我们查看
  34. std::string name()
  35. {
  36. return _name;
  37. }
  38. ~Thread()
  39. {}
  40. private:
  41. std::string _name;
  42. fun_t _func;
  43. ThreadData _tdata;
  44. pthread_t _tid;
  45. };

lockGuard.hpp

  1. #pragma once
  2. #include <iostream>
  3. #include <pthread.h>
  4. // RAII风格的加锁方式
  5. class lockGuard
  6. {
  7. public:
  8. lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
  9. {
  10. pthread_mutex_lock(mtx_);
  11. }
  12. ~lockGuard()
  13. {
  14. pthread_mutex_unlock(mtx_);
  15. }
  16. private:
  17. pthread_mutex_t *mtx_;
  18. };

Task.hpp

  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <functional>
  5. typedef std::function<int(int, int)> func_t;
  6. class Task
  7. {
  8. public:
  9. Task(){}
  10. Task(int x, int y, func_t func):_x(x), _y(y), _func(func)
  11. {}
  12. void operator()(std::string& name)
  13. {
  14. std::cout << "线程" << name << "处理完成,结果是" << _x << "+" << _y << "=" << _func(_x, _y) << std::endl;
  15. }
  16. public:
  17. int _x;
  18. int _y;
  19. // int type;
  20. func_t _func;
  21. };

threadPool.hpp 

  1. #include "thread.hpp"
  2. #include "lockGuard.hpp"
  3. #include <vector>
  4. #include <queue>
  5. #include <unistd.h>
  6. #include <iostream>
  7. const int g_thread_num = 3;
  8. template <class T>
  9. class ThreadPool
  10. {
  11. public:
  12. // 返回锁的地址
  13. pthread_mutex_t *getMutex()
  14. {
  15. return &_lock;
  16. }
  17. bool isEmpty()
  18. {
  19. return _task_queue.empty();
  20. }
  21. void waitCond()
  22. {
  23. pthread_cond_wait(&_cond, &_lock);
  24. }
  25. T getTask()
  26. {
  27. T t = _task_queue.front();
  28. _task_queue.pop();
  29. return t;
  30. }
  31. public:
  32. ThreadPool(int thread_num = g_thread_num) : _num(thread_num)
  33. {
  34. for (int i = 1; i <= _num; i++)
  35. {
  36. _threads.push_back(new Thread(i, routine, this));
  37. }
  38. pthread_mutex_init(&_lock, nullptr);
  39. pthread_cond_init(&_cond, nullptr);
  40. }
  41. // 1.run - 将线程跑起来
  42. void run()
  43. {
  44. for (auto &iter : _threads)
  45. {
  46. iter->start();
  47. std::cout << iter->name() << " 启动成功" << std::endl;
  48. }
  49. }
  50. // 未来所有执行流所执行的方法 - 核心取任务、执行任务的逻辑
  51. static void *routine(void *args) // 因为在类当中,如果是一个成员方法,其会有一个隐藏的参数this指针,所以我们需要使用static进行修饰。
  52. {
  53. ThreadData *td = (ThreadData *)args;
  54. ThreadPool<T> *tp = (ThreadPool<T> *)td->_args;
  55. while (true)
  56. {
  57. T task; // 对应任务的对象
  58. {
  59. // lock
  60. lockGuard lockguard(tp->getMutex());
  61. // while(task_queue_.empty()) wait();
  62. while (tp->isEmpty())
  63. tp->waitCond();
  64. // 获取任务 - 100%有任务
  65. task = tp->getTask(); // 任务队列是共享的->将任务从共享,拿到自己的私有空间
  66. } // 自动释放锁
  67. // 处理任务
  68. task(td->_name); // 要求每一个任务都要提供一个仿函数
  69. }
  70. }
  71. // 2.pushTask - 将任务放到任务池里
  72. void pushTask(const T &task)
  73. {
  74. lockGuard lockguard(&_lock); // 加锁
  75. _task_queue.push(task); // 压入任务
  76. pthread_cond_signal(&_cond); // 唤醒线程
  77. } // 自动释放锁
  78. // void join()
  79. // {
  80. // for (auto &iter : _threads)
  81. // {
  82. // iter->join();
  83. // }
  84. // }
  85. ~ThreadPool()
  86. {
  87. for (auto &iter : _threads)
  88. {
  89. iter->join();
  90. delete iter;
  91. }
  92. pthread_mutex_destroy(&_lock);
  93. pthread_cond_destroy(&_cond);
  94. }
  95. private:
  96. std::vector<Thread *> _threads;
  97. int _num;
  98. std::queue<T> _task_queue;
  99. pthread_mutex_t _lock;
  100. pthread_cond_t _cond;
  101. };

 testMain.cc

  1. #include "threadPool.hpp"
  2. #include "Task.hpp"
  3. #include <ctime>
  4. #include <iostream>
  5. int main()
  6. {
  7. srand((unsigned long)time(nullptr) ^ getpid()); // 利用 ^ getpid()让数据更随机
  8. ThreadPool<Task> *tp = new ThreadPool<Task>();
  9. tp->run();
  10. while (true)
  11. {
  12. // 生产的过程(制作任务的时候,要花时间)
  13. int x = rand() % 1000;
  14. usleep(5000); // 模拟制作任务的时候,花费的时间
  15. int y = rand() % 1000;
  16. Task t(x, y, [](int x, int y)->int{
  17. return x + y;
  18. });
  19. std::cout << "制作任务完成" << std::endl;
  20. // 推送任务到线程池中
  21. tp->pushTask(t);
  22. sleep(1); // 防止刷新过快
  23. }
  24. return 0;
  25. }

将上述代码进行优化:

进阶:

  1. // 方案2:
  2. // queue1,queue2
  3. // std::queue<T> *p_queue, *c_queue
  4. // p_queue->queue1
  5. // c_queue->queue2
  6. // p_queue -> 生产一批任务之后,swap(p_queue,c_queue),唤醒所有线程/一个线程
  7. // 当消费者处理完毕的时候,你也可以进行swap(p_queue,c_queue)
  8. // 因为我们生产和消费用的是不同的队列,未来我们要进行资源的处理的时候,仅仅是指针
  • 需要加锁的仅仅是swap的地方
  • swap方式:
  1. 满了就swap。
  2. 让一个线程不断地去观察,消费完了,且生产的数据就交换。

        对于上述的多处打印信息的逻辑,其实在正常的时候是需要封装成日志的。一般在公司内部是,任何一款软件,都必须有日志:

        日志中因为要实现,至少:日志等级、时间、日志内容、支持用户自定义。所以参数的传递中,我们需要利用到可变参数列表。

补充:

        可变参数表的使用。

参数:

  1. #include <stdarg.h>
  2. void va_start(va_list ap, last);
  3. type va_arg(va_list ap, type);
  4. void va_end(va_list ap);
  5. void va_copy(va_list dest, va_list src);
  • va_list:本质上一个char类型的指针。
  • last:代表可变参数的前面的参数当中的最后一个具体参数。
  • type:具体的类型。
  1. // 完整的日志功能,至少:日志等级 时间 日志内容 支持用户自定义
  2. void logMessage(int level, const char* format, ...)// level:日志等级,format, ...:用户传参、日志对应的信息等。
  3. {
  4. va_list ap; // va_list本质上就是char类型的指针
  5. va_start(ap, format); // 让指针指向栈帧对应的结构
  6. int x = va_arg(ap, int); // 通过具体的类型,来从通过指针提取特定的值 —— 没有参数就返回NULL
  7. va_end(ap); // 将指针设置为空(相当于:ap = nullptr)
  8. }

        但是,如果让我们自己提取,那也就太麻烦了,而且使用起来也不舒服。我们可以使用:#include <stdarg.h>中的。

  1. #include <stdarg.h>
  2. // 将我们传入的参数,可变的方式,格式化显示到:
  3. // 显示器
  4. int vprintf(const char *format, va_list ap);
  5. // 文件
  6. int vfprintf(FILE *stream, const char *format, va_list ap);
  7. // 支付串
  8. int vsprintf(char *str, const char *format, va_list ap);
  9. // 指定长度的字符串
  10. int vsnprintf(char *str, size_t size, const char *format, va_list ap);

        可以发现它们是与图中的上面四个,#include <stdio.h>中的类似的。

log.hpp:

  1. #pragma once
  2. #include <cstdarg>
  3. // 日志是有日志级别的
  4. #define NORMAL 1 // 正常
  5. #define WARNING 2 // 警告 -- 没出错
  6. #define ERROR 3 // 错误 -- 不影响后续执行(一个功能因为条件等,没有执行)
  7. #define FATAL 4 // 致命 -- 代码无法继续向后执行
  8. // 完整的日志功能,至少:日志等级 时间 日志内容 支持用户自定义
  9. void logMessage(int level, const char* format, ...)// level:日志等级; format, ...:用户传参、日志对应的信息等。
  10. {
  11. va_list args;
  12. va_start(args, format);
  13. // 这个时候就有一个可变参数列表的起始地址
  14. vprintf(format, args);
  15. va_end(args);
  16. }

testMain.cc:

  1. #include "log.hpp"
  2. #include <iostream>
  3. int main()
  4. {
  5. logMessage(NORMAL, "%s %d %c %f\n", "这是一条日志信息", 1234, 'a', 3.14);
  6. return 0;
  7. }

结合日志完善线程池

log.hpp

  1. #pragma once
  2. #include <cstdarg>
  3. #include <ctime>
  4. // 日志是有日志级别的
  5. #define DEBUG 0
  6. #define NORMAL 1 // 正常
  7. #define WARNING 2 // 警告 -- 没出错
  8. #define ERROR 3 // 错误 -- 不影响后续执行(一个功能因为条件等,没有执行)
  9. #define FATAL 4 // 致命 -- 代码无法继续向后执行
  10. const char* gLevelMap[] = {
  11. "DEBUG",
  12. "NORMAL",
  13. "WARNING",
  14. "ERROR",
  15. "FATAL",
  16. };
  17. #define LOGFILE "./threafpool.log"
  18. // 完整的日志功能,至少:日志等级 时间 日志内容 支持用户自定义
  19. void logMessage(int level, const char* format, ...)// level:日志等级; format, ...:用户传参、日志对应的信息等。
  20. {
  21. #ifndef DEBUG_SHOW
  22. if(level == DEBUG) return;
  23. #endif
  24. char stdBuffer[1024]; //标准部分
  25. time_t timestamp = time(nullptr);
  26. //struct tm* localtime = localtime(&timestamp); //- 详细时间输出操作
  27. //localtime->tm_year;
  28. snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
  29. char logBuffer[1024]; //自定义部分
  30. va_list args;
  31. va_start(args, format);
  32. // 这个时候就有一个可变参数列表的起始地址
  33. // 向屏幕中直接打印
  34. // vprintf(format, args);
  35. // 向缓冲区logBuffer中打印
  36. vsnprintf(logBuffer, sizeof logBuffer, format, args);
  37. va_end(args);
  38. // 向屏幕
  39. printf("%s%s\n", stdBuffer, logBuffer);
  40. // 向文件
  41. FILE *fp = fopen(LOGFILE, "a");
  42. fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
  43. fclose(fp);
  44. }

Task.hpp

  1. #pragma once
  2. #include "log.hpp"
  3. #include <iostream>
  4. #include <string>
  5. #include <functional>
  6. typedef std::function<int(int, int)> func_t;
  7. class Task
  8. {
  9. public:
  10. Task(){}
  11. Task(int x, int y, func_t func):_x(x), _y(y), _func(func)
  12. {}
  13. void operator()(std::string& name)
  14. {
  15. //std::cout << "线程" << name << "处理完成,结果是" << _x << "+" << _y << "=" << _func(_x, _y) << std::endl;
  16. logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",
  17. name.c_str(), _x, _y, _func(_x, _y), __FILE__, __LINE__); // __FILE__, __LINE__:预处理符。
  18. }
  19. public:
  20. int _x;
  21. int _y;
  22. // int type;
  23. func_t _func;
  24. };

预处理符: 

        这就是为什么要有这些预处理符,其一的原因就是日志中会使用到,便于我们找到文件的执行位置。

  1. __FILE__      //进行编译的源文件
  2. __LINE__     //文件当前的行号
  3. __DATE__    //文件被编译的日期
  4. __TIME__    //文件被编译的时间
  5. __STDC__    //如果编译器遵循ANSI C,其值为1,否则未定义

thread.hpp

  1. #pragma once
  2. #include <string>
  3. #include <pthread.h>
  4. #include <cstdio>
  5. // 对线程的封装 - 不是完全必要,但是这样便于后期的统一管理
  6. typedef void*(*fun_t)(void*);
  7. // 整合线程的数据
  8. class ThreadData
  9. {
  10. public:
  11. std::string _name;
  12. void* _args;
  13. };
  14. class Thread
  15. {
  16. public:
  17. Thread(int num, fun_t callback, void* args):_func(callback)
  18. {
  19. char nameBuffer[64];
  20. snprintf(nameBuffer, sizeof(nameBuffer), "Thread-%d", num);
  21. _name = nameBuffer;
  22. _tdata._args = args;
  23. _tdata._name = _name;
  24. }
  25. void start()
  26. {
  27. pthread_create(&_tid, nullptr, _func, (void*)&_tdata);
  28. }
  29. void join()
  30. {
  31. pthread_join(_tid, nullptr);
  32. }
  33. // 未来不再使用线程id了,因为其是一个地址不便于我们查看
  34. std::string name()
  35. {
  36. return _name;
  37. }
  38. ~Thread()
  39. {}
  40. private:
  41. std::string _name;
  42. fun_t _func;
  43. ThreadData _tdata;
  44. pthread_t _tid;
  45. };

lockGuard.hpp

  1. #pragma once
  2. #include <iostream>
  3. #include <pthread.h>
  4. // RAII风格的加锁方式
  5. class lockGuard
  6. {
  7. public:
  8. lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
  9. {
  10. pthread_mutex_lock(mtx_);
  11. }
  12. ~lockGuard()
  13. {
  14. pthread_mutex_unlock(mtx_);
  15. }
  16. private:
  17. pthread_mutex_t *mtx_;
  18. };

threadPool.hpp

  1. #include "thread.hpp"
  2. #include "lockGuard.hpp"
  3. #include "log.hpp"
  4. #include <vector>
  5. #include <queue>
  6. #include <unistd.h>
  7. #include <iostream>
  8. const int g_thread_num = 3;
  9. template <class T>
  10. class ThreadPool
  11. {
  12. public:
  13. // 返回锁的地址
  14. pthread_mutex_t *getMutex()
  15. {
  16. return &_lock;
  17. }
  18. bool isEmpty()
  19. {
  20. return _task_queue.empty();
  21. }
  22. void waitCond()
  23. {
  24. pthread_cond_wait(&_cond, &_lock);
  25. }
  26. T getTask()
  27. {
  28. T t = _task_queue.front();
  29. _task_queue.pop();
  30. return t;
  31. }
  32. public:
  33. ThreadPool(int thread_num = g_thread_num) : _num(thread_num)
  34. {
  35. for (int i = 1; i <= _num; i++)
  36. {
  37. _threads.push_back(new Thread(i, routine, this));
  38. }
  39. pthread_mutex_init(&_lock, nullptr);
  40. pthread_cond_init(&_cond, nullptr);
  41. }
  42. // 1.run - 将线程跑起来
  43. void run()
  44. {
  45. for (auto &iter : _threads)
  46. {
  47. iter->start();
  48. // std::cout << iter->name() << " 启动成功" << std::endl;
  49. logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
  50. }
  51. }
  52. // 未来所有执行流所执行的方法 - 核心取任务、执行任务的逻辑
  53. static void *routine(void *args) // 因为在类当中,如果是一个成员方法,其会有一个隐藏的参数this指针,所以我们需要使用static进行修饰。
  54. {
  55. ThreadData *td = (ThreadData *)args;
  56. ThreadPool<T> *tp = (ThreadPool<T> *)td->_args;
  57. while (true)
  58. {
  59. T task; // 对应任务的对象
  60. {
  61. // lock
  62. lockGuard lockguard(tp->getMutex());
  63. // while(task_queue_.empty()) wait();
  64. while (tp->isEmpty())
  65. tp->waitCond();
  66. // 获取任务 - 100%有任务
  67. task = tp->getTask(); // 任务队列是共享的->将任务从共享,拿到自己的私有空间
  68. } // 自动释放锁
  69. // 处理任务
  70. task(td->_name); // 要求每一个任务都要提供一个仿函数
  71. }
  72. }
  73. // 2.pushTask - 将任务放到任务池里
  74. void pushTask(const T &task)
  75. {
  76. lockGuard lockguard(&_lock); // 加锁
  77. _task_queue.push(task); // 压入任务
  78. pthread_cond_signal(&_cond); // 唤醒线程
  79. } // 自动释放锁
  80. // void join()
  81. // {
  82. // for (auto &iter : _threads)
  83. // {
  84. // iter->join();
  85. // }
  86. // }
  87. ~ThreadPool()
  88. {
  89. for (auto &iter : _threads)
  90. {
  91. iter->join();
  92. delete iter;
  93. }
  94. pthread_mutex_destroy(&_lock);
  95. pthread_cond_destroy(&_cond);
  96. }
  97. private:
  98. std::vector<Thread *> _threads;
  99. int _num;
  100. std::queue<T> _task_queue;
  101. pthread_mutex_t _lock;
  102. pthread_cond_t _cond;
  103. };

testMain.cc

  1. #include "threadPool.hpp"
  2. #include "Task.hpp"
  3. #include "log.hpp"
  4. #include <ctime>
  5. #include <iostream>
  6. int main()
  7. {
  8. logMessage(NORMAL, "%s %d %c %f\n", "这是一条日志信息", 1234, 'a', 3.14);
  9. srand((unsigned long)time(nullptr) ^ getpid()); // 利用 ^ getpid()让数据更随机
  10. ThreadPool<Task> *tp = new ThreadPool<Task>();
  11. tp->run();
  12. while (true)
  13. {
  14. // 生产的过程(制作任务的时候,要花时间)
  15. int x = rand() % 1000;
  16. usleep(5000); // 模拟制作任务的时候,花费的时间
  17. int y = rand() % 1000;
  18. Task t(x, y, [](int x, int y)->int{
  19. return x + y;
  20. });
  21. // std::cout << "制作任务完成" << std::endl;
  22. logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
  23. // 推送任务到线程池中
  24. tp->pushTask(t);
  25. sleep(1); // 防止刷新过快
  26. }
  27. return 0;
  28. }

补充:

对于时间信息的提取与输出:

struct tm中所存储的内容: 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/944857
推荐阅读
相关标签
  

闽ICP备14008679号