ThreadPoolExecutor的CallerRunsPolicy拒绝策略是什么

这篇文章将为大家详细讲解有关ThreadPoolExecutor的CallerRunsPolicy拒绝策略是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

成都网络公司-成都网站建设公司创新互联建站十多年经验成就非凡,专业从事网站建设、成都做网站,成都网页设计,成都网页制作,软文发稿广告投放平台等。十多年来已成功提供全面的成都网站建设方案,打造行业特色的成都网站建设案例,建站热线:18980820575,我们期待您的来电!

首先介绍一下ThreadPoolExecutor构造方法:

public ThreadPoolExecutor(int corePoolSize,

                          int maximumPoolSize,

                          long keepAliveTime,

                          TimeUnit unit,

                          BlockingQueue workQueue,

                          ThreadFactory threadFactory,

                          RejectedExecutionHandler handler) {

    if (corePoolSize < 0 ||

        maximumPoolSize <= 0 ||

        maximumPoolSize < corePoolSize ||

        keepAliveTime < 0)

        throw new IllegalArgumentException();

    if (workQueue == null || threadFactory == null || handler == null)

        throw new NullPointerException();

    this.corePoolSize = corePoolSize;

    this.maximumPoolSize = maximumPoolSize;

    this.workQueue = workQueue;

    this.keepAliveTime = unit.toNanos(keepAliveTime);

    this.threadFactory = threadFactory;

    this.handler = handler;

}

构造方法中的字段含义如下

  • corePoolSize:核心线程数量,当有新任务在execute()方法提交时,会执行以下判断:

    • 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;

    • 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;

    • 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;

    • 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务

    • 所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。

  • maximumPoolSize:最大线程数量

  • workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列;

  • keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;

  • threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。

  • handler它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:

    • AbortPolicy:直接抛出异常,这是默认策略;

    • CallerRunsPolicy:用调用者所在的线程来执行任务;

    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

    • DiscardPolicy:直接丢弃任务;

简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:

  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;

  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;

  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;

  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

ThreadPoolExecutor的CallerRunsPolicy拒绝策略是什么

实验:拒绝策略CallerRunsPolicy

    测试当拒绝策略是CallerRunsPolicy时,用调用者所在的线程来执行任务,是什么现象。

实验环境

  •  jdk 1.8

  • postman模拟并发

  • MySQL5.7

  • intellj 

  • springboot 2.1.4.RELEASE

实验代码

本实验代码,在https://github.com/vincentduan/mavenProject 下的threadManagement目录下。

实验步骤

1.  首先配置maven pom.xml文件内容,关键内容如下:


        
            
                org.springframework.boot
                spring-boot-dependencies
                2.1.4.RELEASE
                import
                pom
            
        
    

    

        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-configuration-processor
            true
        

        
            mysql
            mysql-connector-java
        
        
            org.springframework.boot
            spring-boot-starter-jdbc
        
        
            com.alibaba
            druid
            1.1.6
        

        
            org.projectlombok
            lombok
            1.18.6
        

2. 配置数据库连接池:

@SpringBootConfiguration
public class DBConfiguration {

    @Autowired
    private Environment environment;

    @Bean
    public DataSource createDataSource() {
        DruidDataSource druidDataSource = new DruidDataSource();
        druidDataSource.setUrl(environment.getProperty("spring.datasource.url"));
        druidDataSource.setUsername(environment.getProperty("spring.datasource.username"));
        druidDataSource.setPassword(environment.getProperty("spring.datasource.password"));
        druidDataSource.setDriverClassName(environment.getProperty("spring.datasource.driver-class-name"));
        return druidDataSource;
    }

}

3. 配置线程池:

@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
    @Autowired
    VisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor;

    @Bean
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = visiableThreadPoolTaskExecutor;
        //配置核心线程数
        executor.setCorePoolSize(5);
        //配置最大线程数
        executor.setMaxPoolSize(5);
        //配置队列大小
        executor.setQueueCapacity(5);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();

        return executor;
    }
}

VisiableThreadPoolTaskExecutor类作为一个bean,并且继承了ThreadPoolTaskExecutor;

4. 接下来创建一个service,为了模拟并发,我们将方法执行sleep了30秒。如下:

@Service
@Slf4j
public class UserThreadServiceImpl implements UserThreadService {

    @Autowired
    UserThreadDao userThreadDao;

    @Override
    @Async("asyncServiceExecutor")
    public void serviceTest(String username) {
        log.info("开启执行一个Service, 这个Service执行时间为30s, threadId:{}",Thread.currentThread().getId());
        userThreadDao.add(username, Integer.parseInt(Thread.currentThread().getId() +""), "started");
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("执行完成一个Service, threadId:{}",Thread.currentThread().getId());
        userThreadDao.update(username, Integer.parseInt(Thread.currentThread().getId() +""), "ended");
    }

    @Override
    public void update(String username, int threadId, String status) {
        userThreadDao.update(username, threadId, status);
    }
}

5. 其中的dao很简单,就是往数据库插入数据和更新数据。

6. 创建web Controller:

@RestController
@RequestMapping("/serviceTest")
public class TestController {

    @Autowired
    UserThreadService userThreadService;
    @Autowired
    private VisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor;

    @GetMapping("test/{username}")
    public Object test(@PathVariable("username") String username) {
        userThreadService.serviceTest(username);
        JSONObject jsonObject = new JSONObject();
        ThreadPoolExecutor threadPoolExecutor = visiableThreadPoolTaskExecutor.getThreadPoolExecutor();
        jsonObject.put("ThreadNamePrefix", visiableThreadPoolTaskExecutor.getThreadNamePrefix());
        jsonObject.put("TaskCount", threadPoolExecutor.getTaskCount());
        jsonObject.put("completedTaskCount", threadPoolExecutor.getCompletedTaskCount());
        jsonObject.put("activeCount", threadPoolExecutor.getActiveCount());
        jsonObject.put("queueSize", threadPoolExecutor.getQueue().size());
        return jsonObject;
    }

}

执行测试

因为CorePoolSize数量是5,MaxPoolSize也是5,因此线程池的大小是固定的。而QueueCapacity数量也是5。因此当请求前5次时,返回响应:

ThreadPoolExecutor的CallerRunsPolicy拒绝策略是什么

当前线程数达到CorePoolSize时,新来的请求就会进入到workQueue中,如下所示:

ThreadPoolExecutor的CallerRunsPolicy拒绝策略是什么

而QueueCapacity的数量也是5,因此达到QueueCapacity的最大限制后,同时也达到了MaxPoolSize的最大限制,将会根据拒绝策略来处理该任务,这里的策略是CallerRunsPolicy时,用调用者所在的线程来执行任务,表现为当前页面被阻塞住了,直到当前调用者所在的线程执行完毕。如下所示:

ThreadPoolExecutor的CallerRunsPolicy拒绝策略是什么

从控制台的输出也能看出CallerRunsPolicy策略执行线程是调用者线程:

2019-08-19 21:06:50.255  INFO 1302 --- [async-service-4] c.a.i.s.impl.UserThreadServiceImpl       : 开启执行一个Service, 这个Service执行时间为30s, threadId:51
2019-08-19 21:06:50.271  INFO 1302 --- [async-service-4] cn.ac.iie.dao.UserThreadDao              : threadId:51, jdbcTemplate:org.springframework.jdbc.core.JdbcTemplate@5d83694c
2019-08-19 21:06:50.751  INFO 1302 --- [async-service-5] c.a.i.s.impl.UserThreadServiceImpl       : 开启执行一个Service, 这个Service执行时间为30s, threadId:52
2019-08-19 21:06:50.771  INFO 1302 --- [async-service-5] cn.ac.iie.dao.UserThreadDao              : threadId:52, jdbcTemplate:org.springframework.jdbc.core.JdbcTemplate@5d83694c
2019-08-19 21:06:55.028  INFO 1302 --- [nio-8080-exec-1] c.a.i.s.impl.UserThreadServiceImpl       : 开启执行一个Service, 这个Service执行时间为30s, threadId:24
2019-08-19 21:06:55.036  INFO 1302 --- [nio-8080-exec-1] cn.ac.iie.dao.UserThreadDao              : threadId:24, jdbcTemplate:org.springframework.jdbc.core.JdbcTemplate@5d83694c

其中[nio-8080-exec-1]表示就是调用者的线程。而其他线程都是[async-service-]

关于ThreadPoolExecutor的CallerRunsPolicy拒绝策略是什么就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


当前名称:ThreadPoolExecutor的CallerRunsPolicy拒绝策略是什么
URL链接:http://scyanting.com/article/pcjihe.html