美團簡單版動態線程池源碼實現

語言: CN / TW / HK

最近看到美團技術團隊的動態線程池分析文章: tech.meituan.com/2020/04/02/… 以及一個對應的開源項目: github.com/dromara/dyn…

有點意思,同時也覺得動態線程池在工作的實用性,便通過此文來分析一下動態線程池的核心實現原理,本文參考了美團的文章和開源項目的實現思路,特此感謝。

背景

動態線程池,指的是線程池中的參數可以動態修改並生效,比如corePoolSize、maximumPoolSize等。

在工作中,線程池的核心線程數和最大線程數等參數是很難估計和固定的,如果能在應用運行過程中動態進行調整,也就很有必要了。

前置條件

  1. 直接基於SpringBoot
  2. 支持Nacos配置中心配置

核心配置項

dtp:
  enable: true
  core-pool-size: 10
  maximum-pool-size: 50 
複製代碼

我希望,能通過以上配置就能配置出一個動態線程池:

  1. dtp:表示dynamic thread pool,動態線程池的縮寫
  2. enable:表示是否使用動態線程池,默認為true
  3. core-pool-size:表示dtp的核心線程數
  4. maximum-pool-size:表示dtp的最大線程數
  5. 對於線程池的其他參數,可以後續再擴展

另外,我希望能做到,只有項目的配置中 存在dtp配置,並且enable不等於false ,那就表示開啟動態線程池,就需要向Spring容器中添加一個線程池對象作為一個Bean對象,這樣其他Bean就能通過依賴注入來使用動態線程池了。

另外,對於上面的配置,我們最好是配置在nacos中,這樣才能動態修改。

動手實現

首先創建兩個項目:

  1. dtp-autoconfiguration:表示動態線程池的自動配置模塊,會存放一些相關的自動配置類
  2. user:表示一個業務應用,會使用動態線程池

然後把user改寫為一個SpringBoot應用:

引入依賴:

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-dependencies</artifactId>
			<version>2.3.12.RELEASE</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
</dependencies>
複製代碼

新建啟動類和Controller:

@SpringBootApplication
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}
複製代碼
@RestController
public class ZhouyuController {

    @GetMapping("/test")
    public String test(){
        return "hello";
    }
}
複製代碼

現在,我喜歡能在ZhouyuController中使用動態線程池,就像如下:

@RestController
public class ZhouyuController {

    @Autowired
    private ThreadPoolExecutor threadPoolExecutor; // 需要一個動態線程池

    @GetMapping("/test")
    public String test(){
        threadPoolExecutor.execute(() -> {
            System.out.println("執行任務");
        });
        return "hello";
    }
}
複製代碼

這段要能工作,得有幾個條件:

  1. Spring容器中得有一個ThreadPoolExecutor類型的Bean
  2. 並且這個ThreadPoolExecutor對象還得是我們所説的動態線程池對象

定義DtpExecutor

這裏就引出一個問題,我們到底該如何表示一個動態線程池,動態線程池和普通線程池的區別在於,動態線程池能支持通過nacos來修改其參數。

那我們是不是需要新定義一個類來表示動態線程池呢?我給的答案是需要,因為如果不新定義一個,那麼對於上述代碼,如果我Spring容器中存在多個ThreadPoolExecutor類型的Bean對象,那麼該如何找到動態線程池呢?只能通過屬性名字了,比如屬性名字為dynamicThreadPoolExecutor,這樣也就需要我們在往Spring容器註冊動態線程池對象時,對於的beanName一定得是dynamicThreadPoolExecutor。

而如果我們新定義一個類(dtp-aucotoncifuration項目中):

public class DtpExecutor extends ThreadPoolExecutor {
    
	public DtpExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
	
}
複製代碼

那麼如果我們想要用動態線程池就方便了:

@RestController
public class ZhouyuController {

    @Autowired
    private DtpExecutor dtpExecutor; // 需要一個動態線程池

    @GetMapping("/test")
    public String test(){
        dtpExecutor.execute(() -> {
            System.out.println("執行任務");
        });
        return "hello";
    }
}
複製代碼

這樣,代碼看起來就更加明確了。

注意,user中添加依賴:

<dependency>
	<groupId>org.example</groupId>
	<artifactId>dtp-autoconfiguration</artifactId>
	<version>1.0-SNAPSHOT</version>
</dependency>
複製代碼

創建DtpExecutor

接下來,我們再來創建DtpExecutor對象並添加到Spring容器中,這一步是非常重要的。

如果應用要開啟動態線程池,那麼就需要做一步,否則就不需要做這一步,並且在創建DtpExecutor對象時,得用配置的參數,並且得支持Nacos,並且還得放到Spring容器中。

這裏就可以用到SpringBoot的自動配置類了。

首先在dtp-autoconfiguration中添加spring-boot的依賴:

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-dependencies</artifactId>
			<version>2.3.12.RELEASE</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter</artifactId>
	</dependency>
</dependencies>
複製代碼

並且新建一個自動配置類:

@Configuration
@ConditionalOnProperty(prefix = "dtp", value = "enable", havingValue = "true")
public class DtpAutoConfiguration {
}
複製代碼

表示這個配置類,只有在dtp.anable=true的時候才會生效,沒有這個配置項或為false則不會生效。

然後我們可以就可以在DtpAutoConfiguration中來定義DtpExecutor的Bean了,我們首先想到的就是利用@Bean,比如:

@Bean
public DtpExecutor dtpExecutor(){
	DtpExecutor dtpExecutor = new DtpExecutor();
	return dtpExecutor;
}
複製代碼

但是,DtpExecutor中是沒有無參構造方法的,也就是在構造DtpExecutor對象時,我們需要能夠拿到配置項參數,那怎麼拿呢?

熟悉Spring的同學,可能會想到利用Enviroment對象,因為不管我們在應用程序本地的application.yaml中的配置項,還是在nacos中的配置項,最終都是放在了Environment對象中,比如如下代碼:

@Configuration
@ConditionalOnProperty(prefix = "dtp", value = "enable", havingValue = "true")
public class DtpAutoConfiguration {

    @Autowired
    private Environment environment;

    @Bean
    public DtpExecutor dtpExecutor(){
        Integer corePoolSize = Integer.valueOf(environment.getProperty("dpt.core-pool-size"));
        Integer maximumPoolSize = Integer.valueOf(environment.getProperty("dpt.maximum-pool-size"));

        DtpExecutor dtpExecutor = new DtpExecutor(corePoolSize, maximumPoolSize);
        return dtpExecutor;
    }
}
複製代碼

第一次測試

我們先來測試一下,注意DtpAutoConfiguration自動配置類要能夠生效,還需要利用spring.factories:

因為我們有限制,所以我們還得在user中配置dtp:

dtp:
	enable: true
	core-pool-size: 10
	maximum-pool-size: 50
複製代碼

並且把ZhouyuController的代碼也大概改一下:

@RestController
public class ZhouyuController {

	@Autowired
	private DtpExecutor dtpExecutor; // 需要一個動態線程池

	@GetMapping("/test")
	public Integer test(){
		return dtpExecutor.getCorePoolSize();
	}
}
複製代碼

這樣就能測出來,能不能使用動態線程池,並且是否是我們所配置的參數。

啟動User應用,然後訪問localhost:8080/test,結果為: 發現,結果正常。

如果有問題,可以關注我,一起討論:

並且,我們可以試試在nacos中進行配置,那我們需要在user中引入nacos-client:

<dependency>
	<groupId>com.alibaba.boot</groupId>
	<artifactId>nacos-config-spring-boot-starter</artifactId>
	<version>0.2.7</version>
</dependency>
複製代碼

然後配置nacos:

nacos:
  config:
    server-addr: 127.0.0.1:8848
    data-id: dtp.yaml
    type: yaml
    auto-refresh: true
    bootstrap:
      enable: true
複製代碼

然後在nacos中配置dtp.yaml:

啟動user應用,訪問localhost:8080/test: 結果也是正常的。

也就是説,代碼寫到這,我們完成了:

  1. 能讀取nacos中的配置
  2. 並創建DtpExecutor對象
  3. 並放入Spring容器

NacosRefresher

那麼最核心的問題還沒有解決:應用在運行過程中,如果在nacos中修改了配置,如何生效?

這就需要在user應用中能夠發現nacos中配置內容是否修改了,這就需要利用到nacos的監聽器機制,我們在auto-configuration模塊來定義一個nacos的監聽器,這就需要auto-configuratino也依賴nacos-client,我們直接nacos-client的依賴從user模塊轉移到auto-configuration模塊中去,這樣對於user是沒有影響的,因為user依賴了auto-configuration模塊,從而間接的依賴了nacos-client。

我們新建一個Nacos監聽器NacosRefresher:

public class NacosRefresher implements Listener, InitializingBean {

    @NacosInjected
    private ConfigService configService;

	// 利用Spring的Bean初始化機制,來設置要監聽的nacos的dataid
	// 暫時寫死,最好是拿到程序員所配置的dataid和group
    @Override
    public void afterPropertiesSet() throws NacosException {
        configService.addListener("dtp.yaml", "DEFAULT_GROUP", this);
    }

	// 這個是Nacos收到變更事件異步執行邏輯要用到的線程池,跟動態線程池沒關係
    @Override
    public Executor getExecutor() {
        return Executors.newFixedThreadPool(1);
    }

	// 這是用來接收數據變更的,content就是變更後的內容
    @Override
    public void receiveConfigInfo(String content) {
        System.out.println(content);
    }
}
複製代碼

另外在DtpAutoConfiguration中定義NacosRefresher為一個Bean:

@Bean
public NacosRefresher nacosRefresher(){
	return new NacosRefresher();
}
複製代碼

也可以利用@Import來導入NacosRefresher:

@Configuration
@ConditionalOnProperty(prefix = "dtp", value = "enable", havingValue = "true")
@Import(NacosRefresher.class)
public class DtpAutoConfiguration {

    @Autowired
    private Environment environment;

    @Bean
    public DtpExecutor dtpExecutor(){
        Integer corePoolSize = Integer.valueOf(environment.getProperty("dtp.core-pool-size"));
        Integer maximumPoolSize = Integer.valueOf(environment.getProperty("dtp.maximum-pool-size"));

        DtpExecutor dtpExecutor = new DtpExecutor(corePoolSize, maximumPoolSize);
        return dtpExecutor;
    }

    
}
複製代碼

更新DtpExecutor

NacosRefresher一旦就到了數據變更事件,那麼就可以更新DtpExecutor了,那麼這裏我們要解決兩個問題:

  1. 解析content,因為content是String,我們需要進行解析並得到配置項內容
  2. 更新Spring容器中的DtpExecutor對象,那就需要能夠拿到DtpExecutor對象

我們先啟動應用,修改一下nacos的配置,看看context長什麼樣:

我只是修改了core-pool-size,但是content是怎麼dtp.yaml的內容,那麼我們就來進行解析:

@Override
public void receiveConfigInfo(String content) {
	YamlPropertiesFactoryBean bean = new YamlPropertiesFactoryBean();
	bean.setResources(new ByteArrayResource(content.getBytes()));
	Properties properties = bean.getObject();
	System.out.println(properties);
}
複製代碼

結果為:

這樣,我們就將content解析為了Properties的格式,這樣就能更加方便獲取配置項了。

接下來,我們只要能拿到Spring容器中的DtpExecutor對象,那麼該如何拿到呢,這裏參考開源項目 dynamic-tp 的做法,利用BeanPostProcessor來存入到一個static的map中。

首先新建一個DtpUtil:

public class DtpUtil {

    public static DtpExecutor dtpExecutor;

    public static DtpExecutor get() {
        return dtpExecutor;
    }

    public static void set(DtpExecutor dtpExecutor) {
        DtpUtil.dtpExecutor = dtpExecutor;
    }
}
複製代碼

然後新建一個BeanPostProcessor,會把DtpExecutor對象存入DtpUtil:

public class DtpBeanPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DtpExecutor) {
            DtpUtil.set((DtpExecutor) bean);
        }

        return bean;
    }
	
}
複製代碼

另外在DtpAutoConfiguration導入DtpBeanPostProcessor。

然後會到NacosRefresher中,我們就可以利用DtpUtil獲取到DtpExecutor對象了,並且可以修改對應的參數:

@Override
public void receiveConfigInfo(String content) {
	YamlPropertiesFactoryBean bean = new YamlPropertiesFactoryBean();
	bean.setResources(new ByteArrayResource(content.getBytes()));
	Properties properties = bean.getObject();

	DtpExecutor dtpExecutor = DtpUtil.get();
	dtpExecutor.setCorePoolSize(Integer.parseInt(properties.getProperty("dtp.core-pool-size")));
	dtpExecutor.setMaximumPoolSize(Integer.parseInt(properties.getProperty("dtp.maximum-pool-size")));
}
複製代碼

這樣就完成了DtpExecutor的參數修改,到此,一個簡單的動態線程池就完成了,大家可以自行測試一下,修改Nacos配置,看controller那邊能不能實時拿到最新的corePoolSize,我測是沒問題的。

總結

對於實現一個動態線程池,核心要點為:

  1. 動態線程池為一個Bean對象
  2. 能實時發現配置的變更
  3. 不過最為核心的是線程池本身的源碼設計

比如ThreadPoolExecutor的setCorePoolSize:

public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
複製代碼

它會判斷:

  1. 如果當前工作線程大於最新corePoolSize,那麼則會中斷空閒線程,最終只會保護corePoolSize個空閒線程
  2. 如果核心線程數增加了,那麼就會調用addWorker(null, true)方法來新增核心線程

而所謂的動態線程池,其實就是動態的去修改線程池中的線程數量,少了就增加,多了就中斷。

希望大家能有所收穫,有問題可以關注我,一起討論: