java异步并发请求和请求合并举例分析

这篇文章主要介绍“java异步并发请求和请求合并举例分析”,在日常操作中,相信很多人在java异步并发请求和请求合并举例分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”java异步并发请求和请求合并举例分析”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

创新互联是专业的天峻网站建设公司,天峻接单;提供成都做网站、网站制作,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行天峻网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!

概述

在做业务系统需求开发中,经常需要从其他服务获取数据,拼接数据,然后返回数据给前端使用;常见的服务调用就是通过http接口调用,而对于http,通常一个请求会分配一个线程执行,在同步调用接口的情况下,整个线程是一直被占用或者阻塞的;如果有大量的这种请求,整个系统的吞吐量就比较低,而在依赖的服务响应时间比较低的情况下,我们希望先让出cpu,让其他请求先执行,等依赖的服务请求返回结果时再继续往下执行,这时我们会考虑将请求异步化,或者将相同的请求合并,从而达到提高系统执行效率和吞吐量的目的。

异步并发请求

目前常见的几种调用方式是同步调用,线程池+future,异步回调completableFuture;协程也是异步调用的解决方式,但java目前不支持协程;对于future方式,只能用get或者while(!isDone)轮询这种阻塞的方式直到线程执行完成,这也不是我们希望的异步执行方式,jdk8提供的completableFuture其实也不是异步的方式,只是对依赖多服务的Callback调用结果处理做结果编排,来弥补Callback的不足,从而实现异步链式调用的目的,这也是比较推荐的方式。

同步调用
	RpcService rpcService = new RpcService();
	HttpService httpService = new HttpService();
	// 假设rpc1耗时10ms
	Map rpcResult1=rpcService.getRpcResult();
	// 假设rpc2耗时20ms
	Integer rpcResult2 = httpService.getHttpResult();
	
	// 则线程总耗时:30ms
future
	ExecutorService executor = Executors.newFixedThreadPool(2);
	RpcService rpcService = new RpcService();
	HttpService httpService = new HttpService();
	future1 = executor.submit(() -> rpcService.getRpcResult());
	future2 = executor.submit(() -> httpService.getHttpResult());
	//rpc1耗时10ms
	Map rpcResult1 = future1.get(300, TimeUnit.MILLISECONDS);
	//rpc2耗时20ms
	Integer rpcResult2 = future2.get(300, TimeUnit.MILLISECONDS);

	//则线程总耗时20ms
CompletableFuture
	/** 
	* 场景:两个接口并发异步调用,返回CompletableFuture,不阻塞主线程 
	* 两个服务也是异步非阻塞调用
	**/
	CompletableFuture future1 = service.getHttpData("http://www.vip.com/showGoods/50");
	CompletableFuture future2 = service.getHttpData("http://www.vip.com/showGoods/50");
	CompletableFuture future3 = future1.thenCombine(future2, (f1, f2) -> {
	    //处理业务....

	    return f1 + "," + f2;
	}).exceptionally(e -> {

	    return "";
	});	

CompletableFuture使用ForkJoinPool执行线程,在ForkJoinPool类注册ForkJoinWorkerThread线程时可以看到,ForkJoinPool里面的线程都是daemon线程(垃圾回收线程就是一个典型的deamon线程),

/**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
		
	    #注册守护线程
        wt.setDaemon(true);          // configure thread
		
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int i = 0;                                    // assign a pool index
        int mode = config & MODE_MASK;
        int rs = lockRunState();
        try {
            WorkQueue[] ws; int n;                    // skip if no array
            if ((ws = workQueues) != null && (n = ws.length) > 0) {
                int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                int m = n - 1;
                i = ((s << 1) | 1) & m;               // odd-numbered indices
                if (ws[i] != null) {                  // collision
                    int probes = 0;                   // step by approx half n
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[i = (i + step) & m] != null) {
                        if (++probes >= n) {
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.hint = s;                           // use as random seed
                w.config = i | mode;
                w.scanState = i;                      // publication fence
                ws[i] = w;
            }
        } finally {
            unlockRunState(rs, rs & ~RSLOCK);
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
        return w;
    }

当主线程执行完后,jvm就会退出,所以需要考虑主线程执行完成时间和fork出去的线程执行时间,也需要考虑线程池的大小,默认为当前cpu的核数-1,可以参考下其他系统的故障记录:CompletableFuture线程池问题。

请求合并

当系统遇到瞬间产生大量请求时,可以考虑将相同的请求合并,最大化利用系统IO,提高系统的吞吐量。

设计时,可以将符合条件的url请求,先收集起来,直到满足以下条件之一时进行合并发送:

  1. 收集到的请求数超过预设的最大请求数。

  2. 距离上次请求发送时长超过预设的最大时长。

实现方案有自行使用阻塞队列方式:并发环境下的请求合并,也可以考虑Hystrix:Hystrix实现请求合并/请求缓存

目前公司网关组件janus也是通过合并auth请求的方式减少网络开销,提高cpu的利用率和系统吞吐量的。

nginx同样有合并请求模块nginx-http-concat用来减少请求io,参考:nginx 合并多个js/css请求为一个请求

赖泽坤@vipshop.com

到此,关于“java异步并发请求和请求合并举例分析”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


当前名称:java异步并发请求和请求合并举例分析
分享网址:http://scyanting.com/article/gigsho.html