线程池666666

1. 作用

线程池内部维护了多个工作线程,每个工作线程都会去任务队列中拿取任务并执行,当执行完一个任务后不是马上销毁,而是继续保留执行其它任务。显然,线程池提高了多线程的复用率,减少了创建和销毁线程的时间。

2. 实现原理

线程池内部由任务队列、工作线程和管理者线程组成。

任务队列:存储需要处理的任务。每个任务其实就是具体的函数,在任务队列中存储函数指针和对应的实参。当工作线程获取任务后,就能根据函数指针来调用指定的函数。其实现可以是数组、链表、STL容器等。

工作线程:有N个工作线程,每个工作线程会去任务队列中拿取任务,然后执行具体的任务。当任务被处理后,任务队列中就不再有该任务了。当任务队列中没有任务时,工作线程就会阻塞。

管理者线程:周期性检测忙碌的工作线程数量和任务数量。当任务较多线程不够用时,管理者线程就会多创建几个工作线程来加快处理(不会超过工作线程数量的上限)。当任务较少线程空闲多时,管理者线程就会销毁几个工作线程来减少内存占用(不会低于工作线程数量的下限)。

注意:线程池中没有维护“生产者线程”,所谓的“生产者线程”就是往任务队列中添加任务的线程。

3. 手撕线程池

参考来源:爱编程的大丙。

【1】threadpool.c:

#include "threadpool.h"
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>

#define NUMBER	2	//管理者线程增加或减少的工作线程数量

//任务结构体
typedef struct Task {
	void (*func)(void* arg);
	void* arg;
} Task;


//线程池结构体
struct ThreadPool {
	//任务队列,视为环形队列
	Task* taskQ;
	int queueCapacity;	//队列容量
	int queueSize;		//当前任务个数
	int queueFront;		//队头 -> 取任务
	int queueRear;		//队尾 -> 加任务
	//线程相关
	pthread_t managerID;	//管理者线程ID
	pthread_t* threadIDs;	//工作线程ID
	int minNum;				//工作线程最小数量
	int maxNum;				//工作线程最大数量
	int busyNum;			//工作线程忙的数量
	int liveNum;			//工作线程存活数量
	int exitNum;			//要销毁的工作线程数量
	pthread_mutex_t mutexPool;	//锁整个线程池
	pthread_mutex_t mutexBusy;	//锁busyNum
	pthread_cond_t notFull;		//任务队列是否满
	pthread_cond_t notEmpty;	//任务队列是否空
	//线程池是否销毁
	int shutdown;		//释放为1,否则为0
};

/***************************************************************
 * 函  数: threadPoolCreate
 * 功  能: 创建线程池并初始化
 * 参  数: min---工作线程的最小数量
 *         max---工作线程的最大数量
 *		   capacity---任务队列的最大容量
 * 返回值: 创建的线程池的地址
 **************************************************************/
ThreadPool* threadPoolCreate(int min, int max, int capacity)
{
	//申请线程池空间
	ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
	do {//此处循环只是为了便于失败释放空间,只会执行一次
		if (pool == NULL) {
			printf("pool create error!\n");
			break;
		}
		//申请任务队列空间,并初始化
		pool->taskQ = (Task*)malloc(sizeof(Task) * capacity);
		if (pool->taskQ == NULL) {
			printf("Task create error!\n");
			break;
		}
		pool->queueCapacity = capacity;
		pool->queueSize = 0;
		pool->queueFront = 0;
		pool->queueRear = 0;
		//初始化互斥锁和条件变量
		if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
			pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
			pthread_cond_init(&pool->notFull, NULL) != 0 ||
			pthread_cond_init(&pool->notEmpty, NULL) != 0)
		{
			printf("mutex or cond create error!\n");
			break;
		}
		//初始化shutdown
		pool->shutdown = 0;
		//初始化线程相关参数
		pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
		if (pool->threadIDs == NULL) {
			printf("threadIDs create error!\n");
			break;
		}
		memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
		pool->minNum = min;
		pool->maxNum = max;
		pool->busyNum = 0;
		pool->liveNum = min;
		pool->exitNum = 0;
		//创建管理者线程和工作线程
		pthread_create(&pool->managerID, NULL, manager, pool);//创建管理线程
		for (int i = 0; i < min; ++i) {
			pthread_create(&pool->threadIDs[i], NULL, worker, pool);//创建工作线程
		}
		return pool;
	} while (0);
	//申请资源失败,释放已分配的资源
	if (pool && pool->taskQ) free(pool->taskQ);
	if (pool && pool->threadIDs) free(pool->threadIDs);
	if (pool) free(pool);
	return NULL;
}


/***************************************************************
 * 函  数: threadPoolDestroy
 * 功  能: 销毁线程池
 * 参  数: pool---要销毁的线程池
 * 返回值: 0表示销毁成功,-1表示销毁失败
 **************************************************************/
int threadPoolDestroy(ThreadPool* pool)
{
	if (!pool) return -1;
	//关闭线程池
	pool->shutdown = 1;
	//阻塞回收管理者线程
	pthread_join(pool->managerID, NULL);
	//唤醒所有工作线程,让其自杀
	for (int i = 0; i < pool->liveNum; ++i) {
		pthread_cond_signal(&pool->notEmpty);
	}
	//释放所有互斥锁和条件变量
	pthread_mutex_destroy(&pool->mutexBusy);
	pthread_mutex_destroy(&pool->mutexPool);
	pthread_cond_destroy(&pool->notEmpty);
	pthread_cond_destroy(&pool->notFull);
	//释放堆空间
	if (pool->taskQ) {
		free(pool->taskQ);
		pool->taskQ = NULL;
	}
	if (pool->threadIDs) {
		free(pool->threadIDs);
		pool->threadIDs = NULL;
	}
	free(pool);
	pool = NULL;
	return 0;
}


/***************************************************************
 * 函  数: threadPoolAdd
 * 功  能: 生产者往线程池的任务队列中添加任务
 * 参  数: pool---线程池
 *		   func---函数指针,要执行的任务地址
 *		   arg---func指向的函数的实参
 * 返回值: 无
 **************************************************************/
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
	pthread_mutex_lock(&pool->mutexPool);
	//任务队列满,阻塞生产者
	while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
		pthread_cond_wait(&pool->notFull, &pool->mutexPool);
	}
	//判断线程池是否关闭
	if (pool->shutdown) {
		pthread_mutex_unlock(&pool->mutexPool);
		return;
	}
	//添加任务进pool->taskQ
	pool->taskQ[pool->queueRear].func = func;
	pool->taskQ[pool->queueRear].arg = arg;
	pool->queueSize++;
	pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
	pthread_cond_signal(&pool->notEmpty);//唤醒工作线程
	pthread_mutex_unlock(&pool->mutexPool);
}


/***************************************************************
 * 函  数: getThreadPoolBusyNum
 * 功  能: 获取线程池忙的工作线程数量
 * 参  数: pool---线程池
 * 返回值: 忙的工作线程数量
 **************************************************************/
int getThreadPoolBusyNum(ThreadPool* pool)
{
	pthread_mutex_lock(&pool->mutexBusy);
	int busyNum = pool->busyNum;
	pthread_mutex_unlock(&pool->mutexBusy);
	return busyNum;
}


/***************************************************************
 * 函  数: getThreadPoolAliveNum
 * 功  能: 获取线程池存活的工作线程数量
 * 参  数: pool---线程池
 * 返回值: 存活的工作线程数量
 **************************************************************/
int getThreadPoolAliveNum(ThreadPool* pool)
{
	pthread_mutex_lock(&pool->mutexPool);
	int liveNum = pool->liveNum;
	pthread_mutex_unlock(&pool->mutexPool);
	return liveNum;
}


/***************************************************************
 * 函  数: worker
 * 功  能: 工作线程的执行函数
 * 参  数: arg---实参传入,这里传入的是线程池
 * 返回值: 空指针
 **************************************************************/
void* worker(void* arg)
{
	ThreadPool* pool = (ThreadPool*)arg;
	while (1) {
		/* 1.取出任务队列中的队头任务 */
		pthread_mutex_lock(&pool->mutexPool);
		//无任务就阻塞线程
		while (pool->queueSize == 0 && !pool->shutdown) {
			pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
			//唤醒后,判断是不是要销毁线程
			if (pool->exitNum > 0) {//线程自杀
				pool->exitNum--;//销毁指标-1
				if (pool->liveNum > pool->minNum) {
					pool->liveNum--;//活着的工作线程-1
					pthread_mutex_unlock(&pool->mutexPool);
					threadExit(pool);
				}
			}
		}
		//线程池关闭了就退出线程
		if (pool->shutdown) {
			pthread_mutex_unlock(&pool->mutexPool);
			threadExit(pool);
		}
		//取出pool中taskQ的任务
		Task task;
		task.func = pool->taskQ[pool->queueFront].func;
		task.arg = pool->taskQ[pool->queueFront].arg;
		pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//移动队头
		pool->queueSize--;
		//通知生产者添加任务
		pthread_cond_signal(&pool->notFull);
		pthread_mutex_unlock(&pool->mutexPool);

		/* 2.设置pool的busyNum+1 */
		pthread_mutex_lock(&pool->mutexBusy);
		pool->busyNum++;
		pthread_mutex_unlock(&pool->mutexBusy);

		/* 3.执行取出的任务 */
		printf("thread %ld start working ...\n", pthread_self());
		task.func(task.arg);
		free(task.arg);
		task.arg = NULL;
		printf("thread %ld end working ...\n", pthread_self());

		/* 4.设置pool的busyNum-1 */
		pthread_mutex_lock(&pool->mutexBusy);
		pool->busyNum--;
		pthread_mutex_unlock(&pool->mutexBusy);
	}
	return NULL;
}


/***************************************************************
 * 函  数: manager
 * 功  能: 管理者线程的执行函数
 * 参  数: arg---实参传入,这里传入的是线程池
 * 返回值: 空指针
 **************************************************************/
void* manager(void* arg)
{
	ThreadPool* pool = (ThreadPool*)arg;
	while (!pool->shutdown) {
		/* 每隔3秒检测一次 */
		sleep(3);

		/* 获取pool中相关变量 */
		pthread_mutex_lock(&pool->mutexPool);
		int taskNum = pool->queueSize;	//任务队列中的任务数量
		int liveNum = pool->liveNum;	//存活的工作线程数量
		int busyNum = pool->busyNum;	//忙碌的工作线程数量
		pthread_mutex_unlock(&pool->mutexPool);

		/* 功能一:增加工作线程,每次增加NUMBER个 */
		//当任务个数大于存活工作线程数,且存活工作线程数小于最大值
		if (taskNum > liveNum && liveNum < pool->maxNum) {
			pthread_mutex_lock(&pool->mutexPool);
			int counter = 0;
			for (int i = 0; i < pool->maxNum && counter < NUMBER
				&& pool->liveNum < pool->maxNum; ++i)
			{
				if (pool->threadIDs[i] == 0) {
					pthread_create(&pool->threadIDs[i], NULL, worker, pool);
					counter++;
					pool->liveNum++;
				}
			}
			pthread_mutex_unlock(&pool->mutexPool);
		}

		/* 功能二:销毁工作线程,每次销毁NUMBER个 */
		//当忙的线程数*2 < 存活线程数,且存活线程数 > 最小线程数
		if (busyNum * 2 < liveNum && liveNum > pool->minNum) {
			pthread_mutex_lock(&pool->mutexPool);
			pool->exitNum = NUMBER;
			//唤醒NUMBER个工作线程,让其解除阻塞,在worker函数中自杀
			for (int i = 0; i < NUMBER; ++i) {
				pthread_cond_signal(&pool->notEmpty);
			}
			pthread_mutex_unlock(&pool->mutexPool);
		}
	}
	return NULL;
}



/***************************************************************
 * 函  数: threadExit
 * 功  能: 工作线程退出函数,将工作线程的ID置为0,然后退出
 * 参  数: pool---线程池
 * 返回值: 无
 **************************************************************/
void threadExit(ThreadPool* pool)
{
	//将pool->threadIDs中的ID改为0
	pthread_t tid = pthread_self();
	for (int i = 0; i < pool->maxNum; i++) {
		if (pool->threadIDs[i] == tid) {
			pool->threadIDs[i] = 0;
			printf("threadExit() called, %ld exiting...\n", tid);
			break;
		}
	}
	pthread_exit(NULL);//退出
}

【2】threadpool.h:

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

typedef struct ThreadPool ThreadPool;

//创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int capacity);

//销毁线程池
int threadPoolDestroy(ThreadPool* pool);

//给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

//获取当前忙碌的工作线程的数量
int getThreadPoolBusyNum(ThreadPool* pool);

//获取当前存活的工作线程的数量
int getThreadPoolAliveNum(ThreadPool* pool);

/*********************其它函数**********************/
void* worker(void* arg);//工作线程的执行函数
void* manager(void* arg);//管理者线程的执行函数
void threadExit(ThreadPool* pool);//线程退出函数

#endif

【3】main.c:

#include <stdio.h>
#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

//任务函数,所有线程都执行此任务
void testFunc(void* arg)
{
	int* num = (int*)arg;
	printf("thread %ld is working, number = %d\n", pthread_self(), *num);
	sleep(1);
}

int main()
{
	//创建线程池: 最少3个工作线程,最多10个,任务队列容量为100
	ThreadPool* pool = threadPoolCreate(3, 10, 100);
	//加入100个任务于任务队列
	for (int i = 0; i < 100; ++i) {
		int* num = (int*)malloc(sizeof(int));
		*num = i + 100;
		threadPoolAdd(pool, testFunc, num);
	}
	//销毁线程池
	sleep(30);//保证任务全部运行完毕
	threadPoolDestroy(pool);
	return 0;
}

【4】运行结果:

......

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/768403.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

创建kset

1、kset介绍 2、相关结构体和api介绍 2.1 struct kset 2.2 kset_create_and_add kset_create_and_addkset_createkset_registerkobject_add_internalkobject_add_internal2.3 kset_unregister kset_unregisterkobject_delkobject_put3、实验操作 #include<linux/module.…

代码随想录第42天|动态规划

198.打家劫舍 参考 dp[j] 表示偷盗的总金额, j 表示前 j 间房(包括j)的总偷盗金额初始化: dp[0] 一定要偷, dp[1] 则取房间0,1的最大值遍历顺序: 从小到大 class Solution { public:int rob(vector<int>& nums) {if (nums.size() < 2) {return nums[0];}vector&…

[译]Reactjs性能篇

英文有限&#xff0c;技术一般&#xff0c;海涵海涵&#xff0c;由于不是翻译出身&#xff0c;所以存在大量的瞎胡乱翻译的情况&#xff0c;信不过我的&#xff0c;请看原文&#xff5e;&#xff5e; 原文地址&#xff1a;https://facebook.github.io/react/docs/advanced-per…

Java环境变量的设置

JAVA环境变量的设置 1.设置环境变量的作用2.如何设置环境变量2.1 找到系统的环境变量2.2 设置环境变量 1.设置环境变量的作用 说明&#xff1a;在Java中设置环境变量主要是为了能够让Java运行时能够找到Java开发工具包&#xff08;JDK&#xff09;的安装位置以及相关的库文件。…

Zabbix 配置端口监控

Zabbix 端口监控简介 在Zabbix中配置端口监控&#xff0c;可以帮助你实时监控服务器或网络设备上的特定端口是否开放和可访问。Zabbix提供了多种方式来监控端口&#xff0c;主要包括简单的端口可用性检查和更复杂的服务监控。 在Zabbix中进行端口监控时&#xff0c;不一定需要…

Ubuntu 安装Nginx服务

转自&#xff1a;https://blog.csdn.net/yegu001/article/details/135411588 Package: nginx Architecture: amd64 Version: 1.18.0-6ubuntu14.4 Priority: optional Section: web Origin: Ubuntu Maintainer: Ubuntu Developers <ubuntu-devel-discusslists.ubuntu.com>…

可充电纽扣电池ML2032充电电路设计

如图&#xff0c;可充电纽扣电池ML2032充电电路设计。 图中二极管是为了防止电流倒灌&#xff0c; 电阻分压出3.66v&#xff0c;再减掉二极管压降&#xff08;约0.4v)得3.26V&#xff0c;加在电池正负极充电。 随着电池电量的积累&#xff0c;充电电流逐步减小&#xff0c;极限…

魔行观察-AI数据分析-蜜雪冰城

摘要 本报告旨在评估蜜雪冰城品牌作为投资对象的潜力和价值&#xff0c;基于其经营模式、门店分布、人均消费、覆盖省份等关键指标进行分析。 数据数据源&#xff1a;魔行观察&#xff1a;http://www.wmomo.com/#/brand/brandDetails?code10013603 品牌概览 蜜雪冰城是中国…

[Information Sciences 2023]用于假新闻检测的相似性感知多模态提示学习

推荐的一个视频&#xff1a;p-tuning P-tunning直接使用连续空间搜索 做法就是直接将在自然语言中存在的词直接替换成可以直接训练的输入向量。本身的Pretrained LLMs 可以Fine-Tuning也可以不做。 这篇论文也解释了为什么很少在其他领域结合知识图谱的原因&#xff1a;就是因…

iOS 视图实现渐变色背景

需求 目的是要实现视图的自定义的渐变背景色&#xff0c;实现一个能够随时使用的工具。 实现讨论 在 iOS 中&#xff0c;如果设置视图单一的背景色&#xff0c;是很简单的。可是&#xff0c;如果要设置渐变的背景色&#xff0c;该怎么实现呢&#xff1f;其实也没有很是麻烦&…

普元EOS学习笔记-低开实现图书的增删改查

前言 在前一篇《普元EOS学习笔记-创建精简应用》中&#xff0c;我已经创建了EOS精简应用。 我之前说过&#xff0c;EOS精简应用就是自己创建的EOS精简版&#xff0c;该项目中&#xff0c;开发者可以进行低代码开发&#xff0c;也可以进行高代码开发。 本文我就记录一下自己在…

Sping源码(九)—— Bean的初始化(非懒加载)—mergeBeanDefinitionPostProcessor

序言 前几篇文章详细介绍了Spring中实例化Bean的各种方式&#xff0c;其中包括采用FactoryBean的方式创建对象、使用反射创建对象、自定义BeanFactoryPostProcessor以及构造器方式创建对象。 创建对象 这里再来简单回顾一下对象的创建&#xff0c;不知道大家有没有这样一个疑…

MySQL之备份与恢复(二)

备份与恢复 定义恢复需求 如果一切正常&#xff0c;那么永远也不需要考虑恢复。但是&#xff0c;一旦需要恢复&#xff0c;只有世界上最好的备份系统是没用的&#xff0c;还需要一个强大的恢复系统。 不幸的是&#xff0c;让备份系统平滑工作比构造良好的恢复过程和工具更容易…

优先级队列(堆)学的好,头发掉的少(Java版)

本篇会加入个人的所谓鱼式疯言 ❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言 而是理解过并总结出来通俗易懂的大白话, 小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的. &#x1f92d;&#x1f92d;&#x1f92d;可能说的不是那么严谨.但小编初心是能让更多人…

使用 Ollama 时遇到的问题

题意&#xff1a; ImportError: cannot import name Ollama from llama_index.llms (unknown location) - installing dependencies does not solve the problem Python 无法从 llama_index.llms 模块中导入名为 Ollama 的类或函数 问题背景&#xff1a; I want to learn LL…

七大排序算法的深入浅出(java篇)

&#x1f341; 个人主页&#xff1a;爱编程的Tom&#x1f4ab; 本篇博文收录专栏&#xff1a;Java专栏&#x1f449; 目前其它专栏&#xff1a;c系列小游戏 c语言系列--万物的开始_ 等等 &#x1f389; 欢迎 &#x1f44d;点赞✍评论⭐收藏&#x1f496;三连支…

springboot 整合 mybatis-plus

一.前言 1. mybatis-plus是什么 mybatis-plus是一个对mybati框架的拓展框架&#xff0c;它在mybatis框架基础上做了许多的增强&#xff0c;帮助我们快速的进行代码开发。目前企业开发中&#xff0c;使用mybati的项目基本会选择使用mybatis-plus来提升开发效率。 2.官网地址&…

Study--Oracle-06-Oracler网络管理

一、ORACLE的监听管理 1、ORACLE网络监听配置文件 cd /u01/app/oracle/product/12.2.0/db_1/network/admin 2、在Oracle数据库中&#xff0c;监听器&#xff08;Listener&#xff09;是一个独立的进程&#xff0c;它监听数据库服务器上的特定端口上的网络连接请求&#xff0c…

四十篇:内存巨擘对决:Redis与Memcached的深度剖析与多维对比

内存巨擘对决&#xff1a;Redis与Memcached的深度剖析与多维对比 1. 引言 在现代的系统架构中&#xff0c;内存数据库已经成为了信息处理的核心技术之一。这类数据库系统的高效性主要来源于其对数据的即时访问能力&#xff0c;这是因为数据直接存储在RAM中&#xff0c;而非传统…

p2p、分布式,区块链笔记: 通过libp2p的Kademlia网络协议实现kv-store

Kademlia 网络协议 Kademlia 是一种分布式哈希表协议和算法&#xff0c;用于构建去中心化的对等网络&#xff0c;核心思想是通过分布式的网络结构来实现高效的数据查找和存储。在这个学习项目里&#xff0c;Kademlia 作为 libp2p 中的 NetworkBehaviour的组成。 以下这些函数或…