What Are Spring Cloud's Timeout and Retry Mechanisms

This article explains how timeouts and retries work in Spring Cloud, covering Feign, Ribbon, Hystrix, and RestTemplate.


This article is based on Spring Cloud Greenwich.SR2 and Spring Boot 2.1.6.RELEASE.

1. Feign Configuration

1.1 Timeouts

feign:
  client:
    config:
      default:
        connect-timeout: 2000
        read-timeout: 2000

1.2 Retries

Spring Cloud disables Feign's retry mechanism by default:

//org.springframework.cloud.openfeign.FeignClientsConfiguration
@Bean
@ConditionalOnMissingBean
public Retryer feignRetryer() {
    return Retryer.NEVER_RETRY;
}

//feign.Retryer
/**
 * Implementation that never retries request. It propagates the RetryableException.
 */
Retryer NEVER_RETRY = new Retryer() {

    @Override
    public void continueOrPropagate(RetryableException e) {
        throw e;
    }

    @Override
    public Retryer clone() {
        return this;
    }
};

To enable it, declare your own Retryer bean:

@Bean
public Retryer feignRetryer() {
    return new Retryer.Default();
}

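2. Ribbon Configuration

2.1 Timeouts

# Ribbon timeouts
# If both Ribbon and Feign timeouts are configured, the Feign values take effect and the Ribbon values are overridden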
ribbon:
  ReadTimeout: 3000
  ConnectTimeout: 3000

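2.2 Retries

ribbon:
  MaxAutoRetries: 1 # maximum retries on the same instance, not counting the first call
  MaxAutoRetriesNextServer: 1 # maximum number of other instances to retry on, not counting the first instance
  OkToRetryOnAllOperations: false  # whether to retry all operations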

3. Hystrix Configuration

3.1 Timeouts

hystrix:
  command:
    default:
      execution:
        timeout:
          enabled: true
        isolation:
          thread:
            timeoutInMilliseconds: 1000

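Note that the Hystrix timeout must be longer than the total time Ribbon may spend on retries. Otherwise Hystrix times out the command and falls back while Ribbon is still retrying.

For how to calculate that total, see the getRibbonTimeout() method of the AbstractRibbonCommand class in Zuul: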

protected static int getRibbonTimeout(IClientConfig config, String commandKey) {
    int ribbonTimeout;
    // An unusual case, not covered here
    if (config == null) {
        ribbonTimeout = RibbonClientConfiguration.DEFAULT_READ_TIMEOUT + RibbonClientConfiguration.DEFAULT_CONNECT_TIMEOUT;
    } else {
        // Four parameters are read here: ReadTimeout, ConnectTimeout, MaxAutoRetries, MaxAutoRetriesNextServer
        int ribbonReadTimeout = getTimeout(config, commandKey, "ReadTimeout",
            IClientConfigKey.Keys.ReadTimeout, RibbonClientConfiguration.DEFAULT_READ_TIMEOUT);
        int ribbonConnectTimeout = getTimeout(config, commandKey, "ConnectTimeout",
            IClientConfigKey.Keys.ConnectTimeout, RibbonClientConfiguration.DEFAULT_CONNECT_TIMEOUT);
        int maxAutoRetries = getTimeout(config, commandKey, "MaxAutoRetries",
            IClientConfigKey.Keys.MaxAutoRetries, DefaultClientConfigImpl.DEFAULT_MAX_AUTO_RETRIES);
        int maxAutoRetriesNextServer = getTimeout(config, commandKey, "MaxAutoRetriesNextServer",
            IClientConfigKey.Keys.MaxAutoRetriesNextServer, DefaultClientConfigImpl.DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER);
        // ribbonTimeout is computed here; with the settings above:
        // ribbonTimeout = (3000 + 3000) * (1 + 1) * (1 + 1) = 24000 (ms)
        ribbonTimeout = (ribbonReadTimeout + ribbonConnectTimeout) * (maxAutoRetries + 1) * (maxAutoRetriesNextServer + 1);
    }
    return ribbonTimeout;
}
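
As a worked example of the formula above, the following sketch recomputes the Ribbon timeout for the settings used in this article and checks whether a given Hystrix timeout is long enough. The class and method names (RibbonTimeoutCheck, ribbonTimeout, hystrixTimeoutIsSufficient) are hypothetical helpers for illustration; they are not part of Zuul, Ribbon, or Hystrix.

```java
class RibbonTimeoutCheck {

    // Same arithmetic as getRibbonTimeout(); all values are in milliseconds.
    static int ribbonTimeout(int readTimeout, int connectTimeout,
                             int maxAutoRetries, int maxAutoRetriesNextServer) {
        return (readTimeout + connectTimeout)
                * (maxAutoRetries + 1)
                * (maxAutoRetriesNextServer + 1);
    }

    // Hypothetical check: the Hystrix timeout should exceed the worst-case Ribbon time.
    static boolean hystrixTimeoutIsSufficient(int hystrixTimeout, int ribbonTimeout) {
        return hystrixTimeout > ribbonTimeout;
    }

    public static void main(String[] args) {
        int ribbon = ribbonTimeout(3000, 3000, 1, 1);
        System.out.println("ribbonTimeout = " + ribbon + " ms");
        System.out.println("1000 ms sufficient?  " + hystrixTimeoutIsSufficient(1000, ribbon));
        System.out.println("25000 ms sufficient? " + hystrixTimeoutIsSufficient(25000, ribbon));
    }
}
```

With the sample values in this article, the Hystrix timeout of 1000 ms is far below the 24000 ms Ribbon may need, so either timeoutInMilliseconds should be raised above 24000 or the Ribbon timeouts and retry counts should be reduced.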

4. RestTemplate Configuration

4.1 Timeouts

RestTemplate's default timeout is -1, meaning it never times out. To set one, configure the request factory like this:

@Bean
@Primary
@LoadBalanced
public RestTemplate lbRestTemplate() {
    SimpleClientHttpRequestFactory simpleClientHttpRequestFactory = new SimpleClientHttpRequestFactory();
    simpleClientHttpRequestFactory.setConnectTimeout(1000);
    simpleClientHttpRequestFactory.setReadTimeout(1000);

    return new RestTemplate(simpleClientHttpRequestFactory);
}
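
To see what a timeout of -1 means in practice: SimpleClientHttpRequestFactory sits on top of the JDK's HttpURLConnection, and as far as I can tell a negative value simply leaves the JDK default in place, which is 0 (wait indefinitely). The sketch below uses only the JDK to show that default and the effect of setting explicit values. JdkTimeoutDemo and open are hypothetical names, and the URL is a placeholder that is never actually contacted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;

class JdkTimeoutDemo {

    // Creates an HttpURLConnection without connecting; negative values keep the JDK default.
    static HttpURLConnection open(String url, int connectTimeoutMs, int readTimeoutMs) {
        try {
            HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
            if (connectTimeoutMs >= 0) {
                conn.setConnectTimeout(connectTimeoutMs);
            }
            if (readTimeoutMs >= 0) {
                conn.setReadTimeout(readTimeoutMs);
            }
            return conn;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // -1 mirrors the factory default: nothing is set on the connection
        HttpURLConnection defaults = open("http://localhost:8080/", -1, -1);
        System.out.println("default: " + defaults.getConnectTimeout() + " / " + defaults.getReadTimeout());

        // Explicit values, as in the RestTemplate bean above
        HttpURLConnection tuned = open("http://localhost:8080/", 1000, 1000);
        System.out.println("tuned:   " + tuned.getConnectTimeout() + " / " + tuned.getReadTimeout());
    }
}
```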

5. Source Code Analysis of the Feign Call Flow

5.1 What the OkToRetryOnAllOperations Parameter Means

//AbstractLoadBalancerAwareClient.java
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    // omitted...
}

protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
    RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
    // omitted...
}

//FeignLoadBalancer.java
//FeignLoadBalancer is a subclass of AbstractLoadBalancerAwareClient
@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
    RibbonRequest request, IClientConfig requestConfig) {
    // If OkToRetryOnAllOperations is true, retry on all errors
    if (this.ribbon.isOkToRetryOnAllOperations()) {
        return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
    }
    if (!request.toRequest().httpMethod().name().equals("GET")) {
        return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(), requestConfig);
    }
    else {
        return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
    }
}

//RequestSpecificRetryHandler.java
/**
 * okToRetryOnConnectErrors: retry only on connection errors
 * okToRetryOnAllErrors: retry on all errors
 */
public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors, RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) {
    Preconditions.checkNotNull(baseRetryHandler);
    this.okToRetryOnConnectErrors = okToRetryOnConnectErrors;
    this.okToRetryOnAllErrors = okToRetryOnAllErrors;
    this.fallback = baseRetryHandler;
    if (requestConfig != null) {
        if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) {
            retrySameServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetries); 
        }
        if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) {
            retryNextServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer); 
        } 
    }
}

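As shown above, when OkToRetryOnAllOperations is true, every error is retried. When it is false, GET requests are still retried on every error, while all other HTTP methods are retried only on connection errors. The code that decides whether to retry is as follows: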

//RequestSpecificRetryHandler.java
@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
    if (okToRetryOnAllErrors) {
        return true;
    } 
    else if (e instanceof ClientException) {
        ClientException ce = (ClientException) e;
        if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
            return !sameServer;
        } else {
            return false;
        }
    } 
    else  {
        return okToRetryOnConnectErrors && isConnectionException(e);
    }
}

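This creates a risk: when all errors are retried, a create or update request that times out because the server is slow to process it will also be retried automatically on a Feign call. If the endpoint is not idempotent, the consequences can be serious.

If the failure is a connection error, the request has not been sent yet, so it will not be executed twice.

That said, in a distributed system it is still advisable to make every endpoint idempotent.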
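
The retry decision described in this section can be summarized as a small truth table. The sketch below is a simplified model and not Ribbon's code: RibbonRetryDecision, Failure, and the method names are hypothetical, the ClientException branch is left out, and whether a retry actually happens also depends on MaxAutoRetries and MaxAutoRetriesNextServer.

```java
class RibbonRetryDecision {

    // Two failure kinds are enough to show the difference between the flags.
    enum Failure { CONNECT_ERROR, READ_TIMEOUT }

    // Mirrors how FeignLoadBalancer chooses the okToRetryOnAllErrors flag.
    static boolean retryOnAllErrors(boolean okToRetryOnAllOperations, String httpMethod) {
        return okToRetryOnAllOperations || "GET".equals(httpMethod);
    }

    // Simplified isRetriableException(): ClientException handling is omitted.
    static boolean isRetriable(boolean okToRetryOnAllOperations, String httpMethod, Failure failure) {
        if (retryOnAllErrors(okToRetryOnAllOperations, httpMethod)) {
            return true;
        }
        // okToRetryOnConnectErrors is always true in FeignLoadBalancer
        return failure == Failure.CONNECT_ERROR;
    }

    public static void main(String[] args) {
        for (boolean all : new boolean[] {false, true}) {
            for (String method : new String[] {"GET", "POST"}) {
                for (Failure f : Failure.values()) {
                    System.out.printf("OkToRetryOnAllOperations=%-5b %-4s %-13s -> retry=%b%n",
                            all, method, f, isRetriable(all, method, f));
                }
            }
        }
    }
}
```

The row to watch is a POST with a read timeout: it is retried only when OkToRetryOnAllOperations is true, which is exactly the case where a non-idempotent endpoint can be executed twice.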

5.2 Feign Retry Logic

//SynchronousMethodHandler.java
@Override
public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    // Assume Feign retries are enabled and Retryer.Default is in use
    Retryer retryer = this.retryer.clone();
    while (true) {
        try {
            // This ends up calling executeWithLoadBalancer(), covered in section 5.3
            return executeAndDecode(template);
        } catch (RetryableException e) {
            try {
                // Within the retry limit this waits for a while and returns, so the while loop continues; otherwise it throws and the loop exits
                retryer.continueOrPropagate(e);
            } catch (RetryableException th) {
                Throwable cause = th.getCause();
                if (propagationPolicy == UNWRAP && cause != null) {
                    throw cause;
                } else {
                    throw th;
                }
            }
            if (logLevel != Logger.Level.NONE) {
                logger.logRetry(metadata.configKey(), logLevel);
            }
            continue;
        }
    }
}
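
The shape of that loop is easier to see without the Feign types. The following is a minimal sketch under stated assumptions: RetryableFailure and SimpleRetryer are hypothetical stand-ins for feign.RetryableException and a Retryer, and the fixed sleep replaces whatever backoff the real retryer applies.

```java
import java.util.function.Supplier;

class FeignStyleRetryLoop {

    // Hypothetical stand-in for feign.RetryableException.
    static class RetryableFailure extends RuntimeException {
        RetryableFailure(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for a Retryer: counts attempts, rethrows once the budget is spent.
    static class SimpleRetryer {
        private final int maxAttempts;
        private final long sleepMillis;
        private int attempt = 1;

        SimpleRetryer(int maxAttempts, long sleepMillis) {
            this.maxAttempts = maxAttempts;
            this.sleepMillis = sleepMillis;
        }

        void continueOrPropagate(RetryableFailure e) {
            if (attempt++ >= maxAttempts) {
                throw e; // budget exhausted: the caller's loop ends with this exception
            }
            try {
                Thread.sleep(sleepMillis); // wait before the next attempt
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw e;
            }
        }
    }

    // Same shape as invoke(): call, and on a retryable failure ask the retryer what to do.
    static <T> T invoke(Supplier<T> call, SimpleRetryer retryer) {
        while (true) {
            try {
                return call.get();
            } catch (RetryableFailure e) {
                retryer.continueOrPropagate(e);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = invoke(() -> {
            if (++calls[0] < 3) {
                throw new RetryableFailure("attempt " + calls[0] + " failed");
            }
            return "ok";
        }, new SimpleRetryer(5, 10));
        System.out.println(result + " after " + calls[0] + " calls"); // ok after 3 calls
    }
}
```

Because each pass through this loop goes through executeWithLoadBalancer(), enabling Feign retries on top of Ribbon retries multiplies the two: every Feign attempt can itself trigger the full set of Ribbon attempts.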

5.3 Ribbon Retry Logic

//AbstractLoadBalancerAwareClient.java
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
}

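The code above is the entry point of the call, and the code below is the logic that performs the retries. It is written with RxJava and I do not fully understand it yet, so I am posting it here to revisit later.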

//LoadBalancerCommand.java
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    // Use the load balancer
    Observable<T> o = 
        (server == null ? selectServer() : Observable.just(server))
        .concatMap(new Func1<Server, Observable<T>>() {
            @Override
            // Called for each server being selected
            public Observable<T> call(Server server) {
                context.setServer(server);
                final ServerStats stats = loadBalancerContext.getServerStats(server);

                // Called for each attempt and retry
                Observable<T> o = Observable
                    .just(server)
                    .concatMap(new Func1<Server, Observable<T>>() {
                        @Override
                        public Observable<T> call(final Server server) {
                            context.incAttemptCount();
                            loadBalancerContext.noteOpenConnection(stats);

                            if (listenerInvoker != null) {
                                try {
                                    listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                } catch (AbortExecutionException e) {
                                    return Observable.error(e);
                                }
                            }

                            final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

                            return operation.call(server).doOnEach(new Observer<T>() {
                                private T entity;
                                @Override
                                public void onCompleted() {
                                    recordStats(tracer, stats, entity, null);
                                    // TODO: What to do if onNext or onError are never called?
                                }

                                @Override
                                public void onError(Throwable e) {
                                    recordStats(tracer, stats, null, e);
                                    logger.debug("Got error {} when executed on server {}", e, server);
                                    if (listenerInvoker != null) {
                                        listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                    }
                                }

                                @Override
                                public void onNext(T entity) {
                                    this.entity = entity;
                                    if (listenerInvoker != null) {
                                        listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                    }
                                }                            

                                private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                    tracer.stop();
                                    loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                }
                            });
                        }
                    });

                if (maxRetrysSame > 0) 
                    o = o.retry(retryPolicy(maxRetrysSame, true));
                return o;
            }
        });

    if (maxRetrysNext > 0 && server == null) 
        o = o.retry(retryPolicy(maxRetrysNext, false));

    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                            "Number of retries on next server exceeded max " + maxRetrysNext
                                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                            "Number of retries exceeded max " + maxRetrysSame
                                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}
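
Stripped of RxJava, the two retry(...) calls above amount to a nested loop: the inner one repeats the call on the same server, and the outer one moves on to another server. The sketch below is an imperative approximation, not Ribbon's implementation. NestedRetrySketch and execute are hypothetical names, round-robin indexing stands in for the load balancer's server selection, and every failure is assumed to be retriable, whereas the real code consults isRetriableException().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class NestedRetrySketch {

    // Tries each chosen server up to (maxRetriesSame + 1) times and moves on to
    // at most maxRetriesNext further servers before giving up.
    static <T> T execute(List<String> servers, int maxRetriesSame, int maxRetriesNext,
                         Function<String, T> operation) {
        if (servers.isEmpty() || maxRetriesSame < 0 || maxRetriesNext < 0) {
            throw new IllegalArgumentException("need at least one server and non-negative retry counts");
        }
        RuntimeException last = null;
        for (int s = 0; s <= maxRetriesNext; s++) {
            // Stand-in for the load balancer picking the next server
            String server = servers.get(s % servers.size());
            for (int a = 0; a <= maxRetriesSame; a++) {
                try {
                    return operation.apply(server);
                } catch (RuntimeException e) {
                    last = e; // assumed retriable in this sketch
                }
            }
        }
        throw last; // all attempts on all servers failed
    }

    public static void main(String[] args) {
        List<String> visited = new ArrayList<>();
        try {
            execute(Arrays.asList("a", "b"), 1, 1, server -> {
                visited.add(server);
                throw new IllegalStateException(server + " is down");
            });
        } catch (IllegalStateException e) {
            System.out.println("gave up after visiting " + visited); // [a, a, b, b]
        }
    }
}
```

With MaxAutoRetries and MaxAutoRetriesNextServer both set to 1, a request that keeps failing is attempted four times in total, which matches the (1 + 1) * (1 + 1) factor in getRibbonTimeout().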
