熔断器 Hystrix 源码解析 —— 命令合并执行

芋道源码 2018-01-01

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-collapser-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本

  • 1. 概述

  • 2. HystrixCollapser

    • 2.1 构造方法

    • 2.2 执行命令方式

    • 2.3 核心方法

  • 3. RequestCollapserFactory

  • 4. RequestCollapser

    • 4.1 构造方法

    • 4.2 RequestBatch

    • 4.3 #submitRequest(arg)

    • 4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)

  • 5. CollapserTimer

    • 5.1 RealCollapserTimer

    • 5.2 CollapsedTask

  • 666. 彩蛋


  • 友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。


1. 概述

本文主要分享 Hystrix 命令合并执行

在 《【翻译】Hystrix文档-实现原理》「请求合并」 中,对 Hystrix 命令合并执行的概念原理使用场景优缺点已经做了非常详细透彻的分享,所以胖友可以先认真阅读学习下。

命令合并执行整体流程如下图 :

FROM 《【翻译】Hystrix文档-实现原理》「请求合并」

  • 第一步,提交单个命令请求到请求队列( RequestQueue )

  • 第二部,定时任务( TimerTask ) 固定周期从请求队列获取多个命令执行,合并执行。

在官方提供的示例中,我们通过 CommandCollapserGetValueForKey 熟悉命令合并执行的使用。


推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。

  • 程序猿DD —— 《Spring Cloud微服务实战》

  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

  • 两书齐买,京东包邮。

2. HystrixCollapser

com.netflix.hystrix.HystrixCollapser命令合并器抽象父类

NOTE : com.netflix.hystrix.HystrixObservableCollapser另一种命令合并器抽象父类,本文暂不解析。

2.1 构造方法

HystrixCollapser 构造方法,代码如下 :

  1. public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType>

  2.        implements HystrixExecutable<ResponseType>, HystrixObservable<ResponseType> {

  3.    private final RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> collapserFactory;

  4.    private final HystrixRequestCache requestCache;

  5.    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> collapserInstanceWrapper;

  6.    private final HystrixCollapserMetrics metrics;

  7.    /* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) {

  8.        if (collapserKey == null || collapserKey.name().trim().equals("")) {

  9.            String defaultKeyName = getDefaultNameFromClass(getClass());

  10.            collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName);

  11.        }

  12.        HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder);

  13.        this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, properties);

  14.        this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());

  15.        if (metrics == null) {

  16.            this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties);

  17.        } else {

  18.            this.metrics = metrics;

  19.        }

  20.        final HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;

  21.         /* strategy: HystrixMetricsPublisherCollapser */

  22.        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties);

  23.        /**

  24.         * Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.

  25.         */

  26.        collapserInstanceWrapper = new HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType>() {

  27.            @Override

  28.            public Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {

  29.                Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = self.shardRequests(requests);

  30.                self.metrics.markShards(shards.size());

  31.                return shards;

  32.            }

  33.            @Override

  34.            public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {

  35.                final HystrixCommand<BatchReturnType> command = self.createCommand(requests);

  36.                command.markAsCollapsedCommand(this.getCollapserKey(), requests.size());

  37.                self.metrics.markBatch(requests.size());

  38.                return command.toObservable();

  39.            }

  40.            @Override

  41.            public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {

  42.                return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {

  43.                    @Override

  44.                    public void call(BatchReturnType batchReturnType) {

  45.                        // this is a blocking call in HystrixCollapser

  46.                        self.mapResponseToRequests(batchReturnType, requests);

  47.                    }

  48.                }).ignoreElements().cast(Void.class);

  49.            }

  50.            @Override

  51.            public HystrixCollapserKey getCollapserKey() {

  52.                return self.getCollapserKey();

  53.            }

  54.        };

  55.    }

  56. }

  • BatchReturnType 泛型多个命令合并执行返回结果类型。

  • ResponseType 泛型单个命令执行返回结果类型。

  • RequestArgumentType 泛型单个命令参数类型。

  • collapserFactory 属性,RequestCollapser 工厂,在 「3. RequestCollapserFactory」 详细解析。

  • requestCache 属性,TODO 【2012】【请求上下文】

  • collapserInstanceWrapper 属性,命令合并器包装器。

    • com.netflix.hystrix.collapser.HystrixCollapserBridge 接口,点击 链接 查看代码。

    • HystrixCollapserBridge ,为 RequestBatch 透明调用 HystrixCollapser 或 HystrixObservableCollapser 的方法不同的实现。参见 《桥接模式》 。

  • metrics 属性,TODO 【2002】【metrics】

2.2 执行命令方式

在 《Hystrix 源码解析 —— 执行命令方式》 中,我们已经看了 HystrixCommand 提供的四种执行命令方式。

HystrixCollapser 类似于 HystrixCommand ,也提供四种相同的执行命令方式,其中如下三种方式代码基本类似,我们就给下传送门,就不重复啰嗦了 :

  • #observe() 方法 :传送门 。

  • #queue() 方法 :传送门 。

  • #execute() 方法 :传送门 。

下面一起来看看 #toObservable() 方法的实现,代码如下 :

  1.  1: public Observable<ResponseType> toObservable() {

  2.  2:     // when we callback with the data we want to do the work

  3.  3:     // on a separate thread than the one giving us the callback

  4.  4:     return toObservable(Schedulers.computation());

  5.  5: }

  6.  6:

  7.  7: public Observable<ResponseType> toObservable(Scheduler observeOn) {

  8.  8:     return Observable.defer(new Func0<Observable<ResponseType>>() {

  9.  9:         @Override

  10. 10:         public Observable<ResponseType> call() {

  11. 11:             // // 缓存开关、缓存KEY

  12. 12:             final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get();

  13. 13:             final String cacheKey = getCacheKey();

  14. 14:

  15. 15:             // 优先从缓存中获取

  16. 16:             /* try from cache first */

  17. 17:             if (isRequestCacheEnabled) {

  18. 18:                 HystrixCachedObservable<ResponseType> fromCache = requestCache.get(cacheKey);

  19. 19:                 if (fromCache != null) {

  20. 20:                     metrics.markResponseFromCache();

  21. 21:                     return fromCache.toObservable();

  22. 22:                 }

  23. 23:             }

  24. 24:

  25. 25:             // 获得 RequestCollapser

  26. 26:             RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);

  27. 27:

  28. 28:             // 提交 命令请求

  29. 29:             Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());

  30. 30:

  31. 31:             // 获得 缓存Observable

  32. 32:             if (isRequestCacheEnabled && cacheKey != null) {

  33. 33:                 HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response);

  34. 34:                 HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(cacheKey, toCache);

  35. 35:                 if (fromCache == null) {

  36. 36:                     return toCache.toObservable();

  37. 37:                 } else {

  38. 38:                     toCache.unsubscribe(); // 取消订阅

  39. 39:                     return fromCache.toObservable();

  40. 40:                 }

  41. 41:             }

  42. 42:

  43. 43:             // 获得 非缓存Observable

  44. 44:             return response;

  45. 45:         }

  46. 46:     });

  47. 47: }

  • observeOn 方法参数,实际方法暂未用到,跳过无视。

  • 第 11 至 13 行 :缓存存开关、KEY 。

  • 反向】第 32 至 41 行 :获得【缓存 Observable】。这块代码和 AbstractCommand#toObservavle(...) 类似,在《Hystrix 源码解析 —— 执行结果缓存》「4. AbstractCommand#toObservavle(...)」 有详细解析。

  • 反向】第 44 行 :获得【非缓存 Observable】。

  • 注意 :返回的 Observable ,很可能命令实际并未执行,或者说并未执行完成,此时在 #queue() / #execute() 方法,通过 BlockingObservable 阻塞等待执行完成。BlockingObservable 在 《RxJava 源码解析 —— BlockingObservable》 有详细解析。

  • 第 26 行 :调用 RequestCollapserFactory#getRequestCollapser() ,获得 RequestCollapser 。在 「3. RequestCollapserFactory」 详细解析。

  • 第 29 行 :提交单个命令请求到请求队列( RequestQueue ),即命令合并执行整体流程第一步。在 「4. RequestCollapser」 详细解析。

2.3 核心方法

  • #getRequestArgument(...) 抽象方法,获得单个命令参数。代码如下 :

  1. public abstract RequestArgumentType getRequestArgument();


  • #createCommand(...) 抽象方法,将多个命令请求合并,创建一个 HystrixCommand 。代码如下 :

  1. protected abstract HystrixCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);


  • #mapResponseToRequests(...) 抽象方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。

  1. protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);


  • #shardRequests(...) 方法,将多个命令请求分片N 个【多个命令请求】。默认实现下,不进行分片。代码如下 :

  1. protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {

  2.    return Collections.singletonList(requests);

  3. }


  • 未重写 #shardRequests(...) 的情况下,整体方法流程如下 :

  • 重写 #shardRequests(...) 的情况下,整体方法流程如下 :

    • 本图中命令请求分片仅仅是例子,实际根据重写的逻辑不同而不同。

3. RequestCollapserFactory

com.netflix.hystrix.collapser.RequestCollapserFactory ,RequestCollapser 工厂

  1. public class RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> {

  2.    private final CollapserTimer timer;

  3.    private final HystrixCollapserKey collapserKey;

  4.    private final HystrixCollapserProperties properties;

  5.    private final HystrixConcurrencyStrategy concurrencyStrategy;

  6.    private final Scope scope;

  7.    public RequestCollapserFactory(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties properties) {

  8.         /* strategy: ConcurrencyStrategy */

  9.        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();

  10.        this.timer = timer;

  11.        this.scope = scope;

  12.        this.collapserKey = collapserKey;

  13.        this.properties = properties;

  14.    }

  • timer 属性,命令合并器的定时器,在 「5. CollapserTimer」 详细解析。

  • collapserKey 属性,命令合并器标识,实现类似 HystrixThreadPoolKey 。



    • HystrixCollapserKey ,点击 链接 查看代码。

    • HystrixThreadPoolKey ,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「3. HystrixThreadPoolKey」 有详细解析。

  • properties 属性,命令合并器属性配置。

  • concurrencyStrategy 属性,并发策略,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「4. HystrixConcurrencyStrategy」 有详细解析。

  • scope 属性,命令请求作用域。目前有两种作用域 :


    • REQUEST :请求上下文( HystrixRequestContext )。

      Typically this means that requests within a single user-request (ie. HTTP request) are collapsed.
      No interaction with other user requests.
      1 queue per user request.

    • GLOBAL :全局。

      Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed.
      1 queue for entire app.


调用 #getRequestCollapser() 方法,获得 RequestCollapser 。代码如下 :

  1. public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) {

  2.   if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) {

  3.       return getCollapserForUserRequest(commandCollapser);

  4.   } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) {

  5.       return getCollapserForGlobalScope(commandCollapser);

  6.   } else {

  7.       logger.warn("Invalid Scope: {}  Defaulting to REQUEST scope.", getScope());

  8.       return getCollapserForUserRequest(commandCollapser);

  9.   }

  10. }

  • 根据 scope 不同,调用两个不同方法,获得 RequestCollapser 。这两个方法大体逻辑相同,优先从缓存中查找满足条件的 RequestCollapser 返回;若不存在,则创建满足条件的 RequestCollapser 添加到缓存并返回。

    • REQUEST :调用 #getCollapserForUserRequest() 方法,TODO 【2012】【请求上下文】。

    • GLOBAL :调用 #getCollapserForGlobalScope() 方法,点击 链接 查看中文注释的代码。

4. RequestCollapser

com.netflix.hystrix.collapser.RequestCollapser命令请求合并器。主要用于 :

  • 提交单个命令请求到请求队列( RequestQueue )。

  • 接收来自定时任务提交的多个命令,合并执行。

4.1 构造方法

RequestCollapser 构造方法,代码如下 :

  1. public class RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> {

  2.    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;

  3.    // batch can be null once shutdown

  4.    private final AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>> batch = new AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>>();

  5.    private final AtomicReference<Reference<TimerListener>> timerListenerReference = new AtomicReference<Reference<TimerListener>>();

  6.    private final AtomicBoolean timerListenerRegistered = new AtomicBoolean();

  7.    private final CollapserTimer timer;

  8.    private final HystrixCollapserProperties properties;

  9.    private final HystrixConcurrencyStrategy concurrencyStrategy;

  10.    RequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, HystrixCollapserProperties properties, CollapserTimer timer, HystrixConcurrencyStrategy concurrencyStrategy) {

  11.        this.commandCollapser = commandCollapser; // the command with implementation of abstract methods we need

  12.        this.concurrencyStrategy = concurrencyStrategy;

  13.        this.properties = properties;

  14.        this.timer = timer;

  15.        batch.set(new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get()));

  16.    }

  17. }

  • commandCollapser 属性,命令合并器包装器。

  • batch 属性,RequestBatch,即是本文一直说的请求队列。在 「4.2 RequestBatch」 也会详细解析。

  • timerListenerReference 属性,注册在命令合并器的定时器的监听器。每个 RequestCollapser 独有一个监听器。该监听器( 实际上会使用该监听器创建定时任务 )固定周期从请求队列获取多个命令执行,提交 RequestCollapser 合并执行。在 「5. CollapserTimer」 也会详细解析。

  • timerListenerRegistered 属性, timerListenerReference 是否已经注册。

  • timer 属性,命令合并器的定时器。

  • properties 属性,命令合并器属性配置。

  • concurrencyStrategy 属性,并发策略。

4.2 RequestBatch

com.netflix.hystrix.collapser.RequestBatch ,命令请求队列。提供如下功能 :

  • 命令请求的添加

  • 命令请求的移除

  • 命令请求的批量执行。笔者把 RequestBatch 解释成 "命令请求队列",主要方便大家理解。

    • 那可能有胖友有疑问,为啥该功能不在 RequestCollapser 直接实现,这样 RequestBatch 成为纯粹的队列呢?在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。

RequestBatch 构造方法,代码如下 :

  1. public class RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> {

  2.    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;

  3.    private final int maxBatchSize;

  4.    private final AtomicBoolean batchStarted = new AtomicBoolean();

  5.    private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap =

  6.            new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();

  7.    private final HystrixCollapserProperties properties;

  8.    private ReentrantReadWriteLock batchLock = new ReentrantReadWriteLock();

  9.    public RequestBatch(HystrixCollapserProperties properties, HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, int maxBatchSize) {

  10.        this.properties = properties;

  11.        this.commandCollapser = commandCollapser;

  12.        this.maxBatchSize = maxBatchSize;

  13.    }

  14. }

  • commandCollapser 属性,命令合并器包装器。

  • maxBatchSize 属性,队列最大长度。

  • batchStarted 属性,执行是否开始。

  • argumentMap 属性,命令请求参数映射( 队列 )。

  • properties 属性,命令合并器属性配置。

  • batchLock 属性, argumentMap 操作的读写锁

RequestBatch 实现队列具体的操作方法,在 「4.3 #submitRequest(arg)」/「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 一起解析。

4.3 #submitRequest(arg)

#toObservable() 方法里,调用 #submitRequest(arg) 方法,提交单个命令请求到 RequestBatch 。代码如下 :

  1.  1: public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {

  2.  2:     /*

  3.  3:      * We only want the timer ticking if there are actually things to do so we register it the first time something is added.

  4.  4:      */

  5.  5:     if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false, true)) {

  6.  6:         /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */

  7.  7:         timerListenerReference.set(timer.addListener(new CollapsedTask()));

  8.  8:     }

  9.  9:

  10. 10:     // loop until succeed (compare-and-set spin-loop)

  11. 11:     while (true) {

  12. 12:         // 获得 RequestBatch

  13. 13:         final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();

  14. 14:         if (b == null) {

  15. 15:             return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));

  16. 16:         }

  17. 17:

  18. 18:         // 添加到 RequestBatch

  19. 19:         final Observable<ResponseType> response;

  20. 20:         if (arg != null) {

  21. 21:             response = b.offer(arg);

  22. 22:         } else {

  23. 23:             response = b.offer( (RequestArgumentType) NULL_SENTINEL);

  24. 24:         }

  25. 25:

  26. 26:         // 添加成功,返回 Observable

  27. 27:         // it will always get an Observable unless we hit the max batch size

  28. 28:         if (response != null) {

  29. 29:             return response;

  30. 30:         } else {

  31. 31:             // 添加失败,执行 RequestBatch ,并创建新的 RequestBatch

  32. 32:             // this batch can't accept requests so create a new one and set it if another thread doesn't beat us

  33. 33:             createNewBatchAndExecutePreviousIfNeeded(b);

  34. 34:         }

  35. 35:     }

  36. 36: }

  • 第 5 至 8 行 :当 RequestCollapser 的监听任务( CollapsedTask )还未创建,进行初始化。

  • 第 11 至 35 行 :死循环,直到提交单个命令请求到 RequestBatch 成功

    • 第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被 #shutdown() 后才会出现为 null 的情况。

    • 第 19 至 24 行 :调动 RequestBatch#offer(...) 方法,提交单个命令请求到 RequestBatch ,并获得 Observable 。这里对 arg==null 做了特殊处理,因为 RequestBatch.argumentMap 是 ConcurrentHashMap ,不允许值为 null 。另外, RequestBatch#offer(...) 方法的实现代码,在结束了当前方法,详细解析。

    • 第 28 至 29 行 :添加成功,返回 Observable 。

    • 第 30 至 34 行 :添加失败,执行当前 RequestBatch 的多个命令合并执行,并创建新的 RequestBatch 。在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。


RequestBatch#offer(...) 方法,代码如下 :

  1.  1: public Observable<ResponseType>  offer(RequestArgumentType arg) {

  2.  2:     // 执行已经开始,添加失败

  3.  3:     /* short-cut - if the batch is started we reject the offer */

  4.  4:     if (batchStarted.get()) {

  5.  5:         return null;

  6.  6:     }

  7.  7:

  8.  8:     /*

  9.  9:      * The 'read' just means non-exclusive even though we are writing.

  10. 10:      */

  11. 11:     if (batchLock.readLock().tryLock()) {

  12. 12:         try {

  13. 13:             // 执行已经开始,添加失败

  14. 14:             /* double-check now that we have the lock - if the batch is started we reject the offer */

  15. 15:             if (batchStarted.get()) {

  16. 16:                 return null;

  17. 17:             }

  18. 18:

  19. 19:             // 超过队列最大长度,添加失败

  20. 20:             if (argumentMap.size() >= maxBatchSize) {

  21. 21:                 return null;

  22. 22:             } else {

  23. 23:                 // 创建 CollapsedRequestSubject ,并添加到队列

  24. 24:                 CollapsedRequestSubject<ResponseType, RequestArgumentType> collapsedRequest = new CollapsedRequestSubject<ResponseType, RequestArgumentType>(arg, this);

  25. 25:                 final CollapsedRequestSubject<ResponseType, RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType, RequestArgumentType>) argumentMap.putIfAbsent(arg, collapsedRequest);

  26. 26:                 /**

  27. 27:                  * If the argument already exists in the batch, then there are 2 options:

  28. 28:                  * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses

  29. 29:                  * be hooked up to that argument

  30. 30:                  * B) If request caching is OFF: return an error to all duplicate argument requests

  31. 31:                  *

  32. 32:                  * This maintains the invariant that each batch has no duplicate arguments.  This prevents the impossible

  33. 33:                  * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser)

  34. 34:                  * of trying to figure out which argument of a set of duplicates should get attached to a response.

  35. 35:                  *

  36. 36:                  * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion.

  37. 37:                  */

  38. 38:                 if (existing != null) {

  39. 39:                     boolean requestCachingEnabled = properties.requestCacheEnabled().get();

  40. 40:                     if (requestCachingEnabled) {

  41. 41:                         return existing.toObservable();

  42. 42:                     } else {

  43. 43:                         return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "]  This is not supported.  Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!"));

  44. 44:                     }

  45. 45:                 } else {

  46. 46:                     return collapsedRequest.toObservable();

  47. 47:                 }

  48. 48:

  49. 49:             }

  50. 50:         } finally {

  51. 51:             batchLock.readLock().unlock();

  52. 52:         }

  53. 53:     } else {

  54. 54:         return null;

  55. 55:     }

  56. 56: }

  • 第 4 至 6 行 :执行已经开始,添加失败。在 RequestBatch#executeBatchIfNotAlreadyStarted(...) 方法的开头,优先 CAS 使 batchStarted=true

  • 第 11 行 :获得读锁The'read'just means non-exclusive even though we are writing. ,即使该方法实际在做"写操作",不排他,线程安全,所以可以使用读锁。

  • 第 15 至 17 行 : double-check,执行已经开始,添加失败。在 RequestBatch#executeBatchIfNotAlreadyStarted(...) 方法,优先 CAS 使 batchStarted=true,再获取写锁,所以会出现该情况。

  • 第 20 至 21 行 :超过队列最大长度,添加失败。

  • 第 24 至 25 行 :创建 com.netflix.hystrix.collapser.CollapsedRequestSubject ,并将添加到队列( argumentMap ) 。

    • argument 属性,单个命令请求参数。

    • valueSet 属性,结果( Response ) 是否设置,通过 #setResponse()#emitResponse() 方法设置。

    • subject 属性,可回放执行结果的 Subject 。此处使用 ReplaySubject 的主要目的,当 HystrixCollapser 开启缓存功能时,通过回放执行结果,在 《Hystrix 源码解析 —— 执行结果缓存》「5. HystrixCachedObservable」 也有相同的实现。另外,这里有一点要注意下,ReplaySubject 并没有向任何 Observable 订阅结果,而是通过 #setResponse()#emitResponse() 方法设置结果

    • outstandingSubscriptions 属性,订阅数量。

    • subjectWithAccounting 属性,带订阅数量的 ReplaySubject 。当取消订阅时,调用 RequestBatch#remove(arg) 方法,移除单个命令请求。

    • CollapsedRequestSubject 实现 com.netflix.hystrix.HystrixCollapser.CollapsedRequest 接口,定义了批量命令执行的请求,不仅限于获得请求参数( #getArgument() 方法 ),也包括对批量命令执行结束后,每个请求的结果设置( #setResponse(...)/ #emitResponse(...)/ #setException(...)/ #setComplete() 方法 ),点击 链接 查看该接口的代码。

    • CollapsedRequestSubject 构造方法,代码如下:

  1. /* package */class CollapsedRequestSubject<T, R> implements CollapsedRequest<T, R> {

    /**

  2.  * 参数

  3.  */

  4. private final R argument;

  5. /**

  6.  * 结果( response ) 是否设置

  7.  */

  8. private AtomicBoolean valueSet = new AtomicBoolean(false);

  9. /**

  10.  * 可回放的 ReplaySubject

  11.  */

  12. private final ReplaySubject&lt;T&gt; subject = ReplaySubject.create();

  13. /**

  14.  * 带订阅数量的 ReplaySubject

  15.  */

  16. private final Observable&lt;T&gt; subjectWithAccounting;

  17. /**

  18.  * 订阅数量

  19.  */

  20. private volatile int outstandingSubscriptions = 0;

  21. public CollapsedRequestSubject(final R arg, final RequestBatch&lt;?, T, R&gt; containingBatch) {

  22.     // 设置 argument

  23.     if (arg == RequestCollapser.NULL_SENTINEL) {

  24.         this.argument = null;

  25.     } else {

  26.         this.argument = arg;

  27.     }

  28.     // 设置 带订阅数量的 ReplaySubject

  29.     this.subjectWithAccounting = subject

  30.             .doOnSubscribe(new Action0() {

  31.                 @Override

  32.                 public void call() {

  33.                     outstandingSubscriptions++;

  34.                 }

  35.             })

  36.             .doOnUnsubscribe(new Action0() {

  37.                 @Override

  38.                 public void call() {

  39.                     outstandingSubscriptions--;

  40.                     if (outstandingSubscriptions == 0) {

  41.                         containingBatch.remove(arg);

  42.                     }

  43.                 }

  44.             });

  45. }

  46. }

  • 第 38 至 47 行 :返回 Observable 。



    • 当 argumentMap 已经存在 arg 对应的 Observable 时,必须开启缓存 ( HystrixCollapserProperties.requestCachingEnabled=true ) 功能。原因是,如果在相同的 arg ,并且未开启缓存,同时第 43 行实现的是 collapsedRequest.toObservable() ,那么相同的 arg 将有多个 Observable 执行命令,此时 HystrixCollapserBridge#mapResponseToRequests(...) 方法无法将执行( Response )赋值到 arg 对应的命令请求( CollapsedRequestSubject ) 。更多讨论,见 https://github.com/Netflix/Hystrix/pull/1176 。

    • 回过头看 HystrixCollapser#toObservable() 方法的第 32 至 41 行的代码,这里也有对缓存功能,是不是重复了呢? argumentMap 针对的是 RequestBatch 级的缓存,HystrixCollapser : RequestCollapser : RequestBatch 是 1:1:N 的关系,通过 HystrixCollapser#toObservable() 对缓存的处理逻辑,保证 RequestBatch 切换后,依然有缓存


    RequestBatch#remove() 方法,代码如下 :

    1. /* package-private */ void remove(RequestArgumentType arg) {

    2.    if (batchStarted.get()) {

    3.        //nothing we can do

    4.        return;

    5.    }

    6.    if (batchLock.readLock().tryLock()) {

    7.        try {

    8.            /* double-check now that we have the lock - if the batch is started, deleting is useless */

    9.            if (batchStarted.get()) {

    10.                return;

    11.            }

    12.            argumentMap.remove(arg);

    13.        } finally {

    14.            batchLock.readLock().unlock();

    15.        }

    16.    }

    17. }

    • 当 RequestBatch 开始执行,不允许移除单个命令请求。

    4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)

    本小节建议在 「5. CollapserTimer」 后,再回过头看。

    #createNewBatchAndExecutePreviousIfNeeded(previousBatch) 方法,代码如下 :

    1.  1: private void createNewBatchAndExecutePreviousIfNeeded(RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> previousBatch) {

    2.  2:     if (previousBatch == null) {

    3.  3:         throw new IllegalStateException("Trying to start null batch which means it was shutdown already.");

    4.  4:     }

    5.  5:     if (batch.compareAndSet(previousBatch, new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get()))) {

    6.  6:         // this thread won so trigger the previous batch

    7.  7:         previousBatch.executeBatchIfNotAlreadyStarted();

    8.  8:     }

    9.  9: }

    • 第 5 行 :通过 CAS 修改 batch ,保证并发情况下的线程安全。同时注意,此处也进行了新的 RequestBatch ,切换掉老的 RequestBatch 。

    • 第 6 行 :使用老的 RequestBatch ,调用 RequestBatch#executeBatchIfNotAlreadyStarted() 方法,命令合并执行。


    RequestBatch#executeBatchIfNotAlreadyStarted() 方法,代码如下 :

    1.  1: public void executeBatchIfNotAlreadyStarted() {

    2.  2:     /*

    3.  3:      * - check that we only execute once since there's multiple paths to do so (timer, waiting thread or max batch size hit)

    4.  4:      * - close the gate so 'offer' can no longer be invoked and we turn those threads away so they create a new batch

    5.  5:      */

    6.  6:     // 设置 执行已经开始

    7.  7:     if (batchStarted.compareAndSet(false, true)) {

    8.  8:         // 获得 写锁

    9.  9:         /* wait for 'offer'/'remove' threads to finish before executing the batch so 'requests' is complete */

    10. 10:         batchLock.writeLock().lock();

    11. 11:

    12. 12:         try {

    13. 13:             // 将多个命令请求分片成 N 个【多个命令请求】。

    14. 14:             // shard batches

    15. 15:             Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());

    16. 16:             // for each shard execute its requests

    17. 17:             for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {

    18. 18:                 try {

    19. 19:                     // 将多个命令请求合并,创建一个 HystrixCommand

    20. 20:                     // create a new command to handle this batch of requests

    21. 21:                     Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);

    22. 22:

    23. 23:                     // 将一个 HystrixCommand 的执行结果,映射回对应的命令请求们

    24. 24:                     commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {

    25. 25:

    26. 26:                         /**

    27. 27:                          * This handles failed completions

    28. 28:                          */

    29. 29:                         @Override

    30. 30:                         public void call(Throwable e) {

    31. 31:                             // handle Throwable in case anything is thrown so we don't block Observers waiting for onError/onCompleted

    32. 32:                             Exception ee;

    33. 33:                             if (e instanceof Exception) {

    34. 34:                                 ee = (Exception) e;

    35. 35:                             } else {

    36. 36:                                 ee = new RuntimeException("Throwable caught while executing batch and mapping responses.", e);

    37. 37:                             }

    38. 38:                             logger.debug("Exception mapping responses to requests.", e);

    39. 39:                             // if a failure occurs we want to pass that exception to all of the Futures that we've returned

    40. 40:                             for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {

    41. 41:                                 try {

    42. 42:                                     ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ee);

    43. 43:                                 } catch (IllegalStateException e2) {

    44. 44:                                     // if we have partial responses set in mapResponseToRequests

    45. 45:                                     // then we may get IllegalStateException as we loop over them

    46. 46:                                     // so we'll log but continue to the rest

    47. 47:                                     logger.error("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2);

    48. 48:                                 }

    49. 49:                             }

    50. 50:                         }

    51. 51:

    52. 52:                     }).doOnCompleted(new Action0() {

    53. 53:

    54. 54:                         /**

    55. 55:                          * This handles successful completions

    56. 56:                          */

    57. 57:                         @Override

    58. 58:                         public void call() {

    59. 59:                             // check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly

    60. 60:                             Exception e = null;

    61. 61:                             for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) {

    62. 62:                                 try {

    63. 63:                                    e = ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(e,"No response set by " + commandCollapser.getCollapserKey().name() + " 'mapResponseToRequests' implementation.");

    64. 64:                                 } catch (IllegalStateException e2) {

    65. 65:                                     logger.debug("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2);

    66. 66:                                 }

    67. 67:                             }

    68. 68:                         }

    69. 69:

    70. 70:                     }).subscribe();

    71. 71:                    

    72. 72:                 } catch (Exception e) {

    73. 73:                     // 异常

    74. 74:                     logger.error("Exception while creating and queueing command with batch.", e);

    75. 75:                     // if a failure occurs we want to pass that exception to all of the Futures that we've returned

    76. 76:                     for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) {

    77. 77:                         try {

    78. 78:                             request.setException(e);

    79. 79:                         } catch (IllegalStateException e2) {

    80. 80:                             logger.debug("Failed trying to setException on CollapsedRequest", e2);

    81. 81:                         }

    82. 82:                     }

    83. 83:                 }

    84. 84:             }

    85. 85:

    86. 86:         } catch (Exception e) {

    87. 87:             // 异常

    88. 88:             logger.error("Exception while sharding requests.", e);

    89. 89:             // same error handling as we do around the shards, but this is a wider net in case the shardRequest method fails

    90. 90:             for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {

    91. 91:                 try {

    92. 92:                     request.setException(e);

    93. 93:                 } catch (IllegalStateException e2) {

    94. 94:                     logger.debug("Failed trying to setException on CollapsedRequest", e2);

    95. 95:                 }

    96. 96:             }

    97. 97:         } finally {

    98. 98:             batchLock.writeLock().unlock();

    99. 99: