官术网_书友最值得收藏!

1.5 Java 9中的響應式編程

到了Java 9,JDK開始支持響應式編程。從Java 9的JDK中可以找到java.util. concurrent.Flow類,其中所包含的接口和定義的靜態方法就是用來支持Flow控制編程的,并且主要基于里面的Publisher、Subscriber、Subscription等接口來支持響應式編程。

本節會分4部分介紹。第1部分是響應式編程接口的介紹。第2部分是一個Java 9響應式編程入門的簡單Demo。第3部分是對JDK的SubmissionPublisher<T>類的源碼解讀。最后一部分是使用Java 9的響應式編程整合Spring的實戰案例,以便讓大家深入理解和運用Java 9提供的API,并能夠快速運用到自己開發的項目中。

1.5.1 響應式編程接口

如表1-1所示就是對java.util.concurrent.Flow中各個接口組件的功能描述。

表1-1

我們可以通過圖1-3來了解整個工作過程。

圖1-3

由圖1-3可知Publisher用于發布元素,并將元素推送給Processor。Processor再將元素推送給Subscriber,Subscriber通過使用Subscriber::onNext方法來接收元素。

Processor通過調用Subscription::request方法來從Publisher請求元素。而這個動作是在Processor中進行的(此時Processor是作為Subscriber角色存在的),所以箭頭指向左;Subscriber::onNext接收并消費元素的動作是在Subscription中進行的,所以箭頭指向右。

如果大家還是不太明白的話,則請接著往下看,帶著問題思考。若概念理解得還不夠透徹,則不利于理解接下來的例子,那么就再深入地看看API。

我們可以看到,Flow.Publisher<T>接口是一個函數式接口(其上有注解@Functional Interface),它只有一個抽象方法public void subscribe(Subscriber<? super T> subscriber);。

通過查看Javadoc可知,如果可能的話,這個方法需要添加一個給定的Flow.Subscriber,如果嘗試訂閱失敗,那么會調用Flow.Subscriber的onError方法來發出一個IllegalStateException類型異常。否則,Flow.Subscriber<T>會調用onSubscribe方法,同時傳入一個Flow.Subscription,Subscriber通過調用其所屬的Flow.Subscription的request方法來獲取元素,也可以調用它的cancel方法來解除訂閱。

Flow.Subscriber<T>接口有4個方法,下面對它們進行簡單的描述。

● void onSubscribe(Subscription subscription) :在給定的Subscription想要使用Subscriber其他方法的前提下,必須先調用這個方法。

● void onError(Throwable throwable):當Publisher或者Subscription遇到了不可恢復的錯誤時,調用此方法,然后Subscription就不能再調用Subscriber的其他方法了。

● void onNext(T item):獲取Subscription的下一個元素。

● void onComplete:在調用這個方法后,Subscription就不能再調用Subscriber的其他方法了。

Flow.Subscription接口有兩個方法,下面對它們進行簡單的描述。

● void cancel:調用這個方法造成的直接后果是Subscription會停止接收信息。

● void request(long n):Subscription調用這個方法添加 n個元素。如果 n小于0,Subscriber將收到一個onError信號。如果n等于0,那么調用complete方法,否則調用onNext(T item)方法。

Flow.Processor<T,R>是Subscriber<T>、Publisher<R>的集合體,限于篇幅就不多說了,后面會有專門的章節來介紹。

1.5.2 Java 9響應式編程入門Demo

我們采用JDK包java.util.concurrent下的SubmissionPublisher<T>類的設計理念,模仿創建Flow.Publisher<T>接口的實現。

首先創建自己的Flow.Subscriber<T>接口的實現類:

    public class DockerXDemoSubscriber<T> implements Flow.Subscriber<T>{
        private String name;
        private Flow.Subscription subscription;
        final long bufferSize;
        long count;
        public String getName() {
          return name;
        }
        public Flow.Subscription getSubscription() {
          return subscription;
        }
        public DockerXDemoSubscriber(long bufferSize,String name) {
          this.bufferSize = bufferSize;
          this.name = name;
      }
      public void onSubscribe(Flow.Subscription subscription) {
          //count = bufferSize - bufferSize / 2;
          //在消費一半的時候重新請求
          (this.subscription = subscription).request(bufferSize);
          System.out.println("開始onSubscribe訂閱");
          try {
              Thread.sleep(100);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
      public void onNext(T item) {
          //if (--count <= 0) subscription.request(count = bufferSize -
          //bufferSize / 2);
      System.out.println(" ###### " +
          Thread.currentThread().getName()+
          " name: " + name + " item: " + item + " ######");
      System.out.println(name + " received: " + item);
      try {
          Thread.sleep(10);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
    }
      public void onError(Throwable throwable) {
          throwable.printStackTrace();
      }
      public void onComplete() {
          System.out.println("Completed");
      }
    }

接下來對Flow.Publisher<T>接口以及其內的Flow.Subscription接口進行實現:

    public  class  DockerXDemoPublisher<T>  implements  Flow.Publisher<T>,
AutoCloseable {
        private final ExecutorService executor; // daemon-based
        private CopyOnWriteArrayList<DockerXDemoSubscription> list = new
CopyOnWriteArrayList();
      public void submit(T item) {
          System.out.println("********* 開始發布元素item: " + item + "
***********");
          list.forEach(e -> {
              e.future=executor.submit(() -> {
                  e.subscriber.onNext(item);
              });
          });
      }
      public DockerXDemoPublisher(ExecutorService executor) {
          this.executor = executor;
      }
      public void close() {
          list.forEach(e -> {
              e.future=
              executor.submit(() -> { e.subscriber.onComplete();});
          });
      }
      @Override
      public void subscribe(Flow.Subscriber<? super T> subscriber) {
          subscriber.onSubscribe(new ockerXDemoSubscription(subscriber,
executor));
          list.add(new DockerXDemoSubscription(subscriber,executor));
      }
      static class DockerXDemoSubscription<T> implements Flow.Subscription
    {
          private final Flow.Subscriber<? super T> subscriber;
          private final ExecutorService executor;
          private Future<?> future;
          private T item;
          private boolean completed;
          public DockerXDemoSubscription(Flow.Subscriber<? super T>
subscriber,ExecutorService executor) {
              this.subscriber = subscriber;
              this.executor = executor;
          }
          @Override
          public void request(long n) {
              if (n != 0 && !completed) {
                  if (n < 0) {
                    IllegalArgumentException ex = new
IllegalArgumentException();
                    executor.execute(() -> subscriber.onError(ex));
                  } else {
                    future = executor.submit(() -> {
                        subscriber.onNext(item);
                    });
                  }
              } else {
                  subscriber.onComplete();
              }
          }
          @Override
          public void cancel() {
              completed = true;
              if (future != null && !future.isCancelled()) {
                  this.future.cancel(true);
              }
          }
        }
    }

如上述代碼所示,我們根據Javadoc中所提到和希望的,只有在將subscriber添加到publisher的時候,它的onSubscribe方法才會被自動調用。

一個需要注意的細節是,SubmissionPublisher<T>類有一個submit(T item)方法。通過查閱Javadoc可知,該方法就是通過異步調用每個訂閱它的subscriber的onNext方法將發布的給定元素傳送過去的,而當針對subscriber的資源不可用時,阻塞不會中斷。這樣SubmissionPublisher<T>會提交元素給當前的訂閱者(subscriber),直到它關閉為止。本例對其進行了簡單的實現,后面會具體講解。

接著,我們使用demoSubscribe方法創建幾個subscriber進行演示:

    private static void demoSubscribe(DockerXDemoPublisher<Integer> publisher,
String subscriberName){
        DockerXDemoSubscriber<Integer> subscriber = new
DockerXDemoSubscriber<>(4L,subscriberName);
       publisher.subscribe(subscriber);
    }

接著通過以下代碼片段來使用:

    ExecutorService execService =  ForkJoinPool.commonPool();
    //Executors.newFixedThreadPool(3);
    try (DockerXDemoPublisher<Integer> publisher = new
DockerXDemoPublisher<>(execService)) {
        demoSubscribe(publisher,"One");
        demoSubscribe(publisher,"Two");
        demoSubscribe(publisher,"Three");
        IntStream.range(1,5).forEach(publisher::submit);
    } finally {
        ...
    }

上述代碼創建了3個subscriber,通過為每一個subscriber分別指定subscription來連接同一個publisher。倒數第4行表示通過生成一個數字流并使用publisher提交出去,然后每一個subscriber將會通過onNext方法得到并消費元素。

在finally代碼塊中,沒什么我們需要注意的內容,直接來看代碼:

    finally {
        try {
          execService.shutdown();
          int shutdownDelaySec = 1;
          System.out.println("………………等待" + shutdownDelaySec + " 秒后結束服
………");
          execService.awaitTermination(shutdownDelaySec,TimeUnit.SECONDS);
        } catch (Exception ex) {
          System.out.println("捕獲到execService.awaitTermination()方法的異常:"
+ ex.getClass().getName());
        } finally {
          System.out.println("調用execService.shutdownNow()結束服務...");
          List<Runnable> l = execService.shutdownNow();
          System.out.println("還剩"+l.size() + " 個任務等待執行服務已關閉");
        }
    }

運行代碼后會看到如下輸出:

    開始onSubscribe訂閱
    ###### ForkJoinPool.commonPool-worker-9  name: One  item: null ######
    One received: null
    開始onSubscribe訂閱
    ###### ForkJoinPool.commonPool-worker-9  name: Two  item: null ######
    Two received: null
    開始onSubscribe訂閱
    ###### ForkJoinPool.commonPool-worker-9  name: Three  item: null ######
    Three received: null
    ***************** 開始發布元素item: 1 *****************
    ***************** 開始發布元素item: 2 *****************
    ***************** 開始發布元素item: 3 *****************
    ***************** 開始發布元素item: 4 *****************
    ###### ForkJoinPool.commonPool-worker-9  name: One  item: 1 ######
    One received: 1
    ###### ForkJoinPool.commonPool-worker-2  name: Two  item: 1 ######
    Two received: 1
    ###### ForkJoinPool.commonPool-worker-4  name: One  item: 2 ######
    One received: 2
    ###### ForkJoinPool.commonPool-worker-11  name: Three  item: 1 ######
    Three received: 1
    ###### ForkJoinPool.commonPool-worker-13  name: Two  item: 2 ######
    Two received: 2
    ###### ForkJoinPool.commonPool-worker-15  name: Three  item: 2 ######
    Three received: 2
    ###### ForkJoinPool.commonPool-worker-6  name: One  item: 3 ######
    One received: 3
    ………………等待1 秒后結束服務………………
     ###### main  name: Two  item: 3 ######
    Two received: 3
     ###### ForkJoinPool.commonPool-worker-9  name: Three  item: 3 ######
    Three received: 3
     ###### ForkJoinPool.commonPool-worker-13  name: One  item: 4 ######
    One received: 4
     ###### ForkJoinPool.commonPool-worker-4  name: Two  item: 4 ######
    Two received: 4
     ###### ForkJoinPool.commonPool-worker-15  name: Three  item: 4 ######
    Three received: 4
    Completed
    Completed
    Completed
    調用execService.shutdownNow()結束服務...
    還剩0 個任務等待執行服務已關閉

如上述代碼所示,由于其是異步處理的,整個控制流程會很快到達finally代碼塊,然后在停止服務前等1秒,這段時間足夠生成元素并發送給subscriber。由此,我們可以看到每一個生成的元素都會被發送給各個subscriber。每次在每一個subscriber調用onSubscribe方法時會請求4個元素,此時由于publisher并未發布元素,因此會返回一個null。一旦有元素發布,就會調用subscription內掛載的subscriber的onNext方法。因為DockerXDemoPublisher實現了AutoCloseable,所以我們使用try-with-resources語句來自動關閉DockerXDemoPublisher的資源,在池關閉的時候自動調用close方法對訂閱者進行解綁。我們在finally代碼塊中使用的方法,讀者在需要的時候也可以借鑒。

1.5.3 SubmissionPublisher類的源碼解讀

在前面的例子中,已經提到了SubmissionPublisher類,它是Publisher接口的實現。其內部提供了一個Executor,可以并發地將元素傳遞給Subscriber(后面會具體介紹)。

我們經常會通過Executors.newFixedThreadPool(int nThreads)和ForkJoinPool.commonPool來獲得一個線程池。其中Executors.newFixedThreadPool(int nThreads)用于創建一個指定最大線程數量的線程池,池中的每一個線程除非明確指定要關閉,否則會一直存在。

ForkJoinPool.commonPool是SubmissionPublisher內置的默認Executor,ForkJoinPool. commonPool內部調用了new ForkJoinPool((byte)0);,傳入的參數0沒什么用,其會通過System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");獲取并發線程數。如果并未設置java.util.concurrent.ForkJoinPool.common.parallelism屬性,將使用Runtime.getRuntime().availableProcessors() -1,即本機CPU核數-1。如果CPU核支持超線程技術,則核數為CPU的線程數量。現在,大家應該可以理解Demo中的這段代碼了。

針對每一個Subscriber,SubmissionPublisher類都使用一個獨立的緩沖,其最大值可在創建時進行指定,緩沖大小在使用時根據需要擴展,直到所設定的最大值。如果不設定最大值,則會用defaultbufferSize方法獲取該值。SubmissionPublisher類對AutoCloseable接口進行了實現,調用其close方法其實就是調用當前Subscriber的onComplete方法。

我們來看一下其內部BufferedSubscription的定義:

    @SuppressWarnings("serial")
    @jdk.internal.vm.annotation.Contended
    private static final class BufferedSubscription<T> implements
Flow.Subscription,ForkJoinPool.ManagedBlocker {
        // Order-sensitive field declarations
        long timeout;                        // > 0 if timed wait
        volatile long demand;              // # unfilled requests
        int maxCapacity;                    // reduced on OOME
        int putStat;                         // offer result for ManagedBlocker
        volatile int ctl;                   // atomic run state flags
        volatile int head;                 // next position to take
        int tail;                        // next position to put
        Object[] array;                // buffer: null if disabled
    //這里包含了我們要傳入的訂閱者的信息
   Flow.Subscriber<? super T> subscriber;  // null if disabled
   Executor executor;             // null if disabled
   BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable>
   onNextHandler;
   volatile Throwable pendingError;     // holds until onError issued
   volatile Thread waiter;                // blocked producer thread
   T putItem;                      // for offer within ManagedBlocker
   //這里通過next來構造Publisher的執行鏈也就是一堆訂閱者在此做一個編排
   BufferedSubscription<T> next;         // used only by publisher
   //這里將發送失敗需要重試的信息放到一起
   BufferedSubscription<T> nextRetry;   // used only by publisher
   // ctl values
   static final int ACTIVE    = 0x01; // consumer task active
   static final int CONSUME   = 0x02; // keep-alive for consumer task
   static final int DISABLED  = 0x04; // final state
   static final int ERROR     = 0x08; // signal onError then disable
   static final int SUBSCRIBE = 0x10; // signal onSubscribe
   static final int COMPLETE  = 0x20; // signal onComplete when done
   static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
   /**
     * maxBufferCapacity大于這個值時使用默認的初始大小,
     * maxBufferCapacity的大小必須為2n次方
     **/
    static final int DEFAULT_INITIAL_CAP = 32;

大家只需要查看上面代碼中的中文注釋內容即可,接下來就可以很輕松地看懂下面的close方法了:

    public void close() {
        if (!closed) {
          BufferedSubscription<T> b;
          synchronized (this) {
              //no need to re-check closed here
              b = clients;
              clients = null;
              closed = true;
          }
          while (b != null) {
              BufferedSubscription<T> next = b.next;
              b.next = null;
              b.onComplete();
              b = next;
          }
        }
    }

其實這是移除隊列中一個節點的操作。這里可得到這個節點的下一個元素,然后將這個節點置空,將下一個節點指定到這個節點的位置,同時將要移除的節點(也就是Subscriber)結束。

我們來觀察里面的subscribe(Flow.Subscriber<? super T> subscriber)方法。在調用這個方法后,會生成一個BufferedSubscription實例,其中包裝了subscriber。然后會調用subscription.onSubscribe方法,在這個方法內會調用startOrDisable方法。

在這里,我們可以看到e.execute(new ConsumerTask<T>(this))。其中的ConsumerTask繼承自抽象類ForkJoinTask<Void>,并實現了Runnable接口和CompletableFuture.Asynchronous-CompletionTask接口。其構造函數傳入的參數是一個BufferedSubscription實例,這樣ConsumerTask的run方法其實是調用BufferedSubscription實例的consume方法。而在consume方法里,可以看到我們傳入的subscriber實例在此出現,同時里面還調用了checkControl(s,c)方法。這個方法很關鍵,在此通過s.onSubscribe(this)將BufferedSubscription實例作為參數傳入subscriber的onSubscribe中。

在consume方法中,當ctl為SUBSCRIBE狀態時,執行checkControl(s,c);當ctl為CONSUME狀態時,會在QA中取得所要消費的元素,通過subscriber的onNext方法使用。這個過程是無限循環的,至于QA是如何存值的,在此就不做討論了,大家可自行查閱源碼進行分析:

    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (subscriber == null) throw new NullPointerException();
          BufferedSubscription<T> subscription =
              new BufferedSubscription<T>(subscriber,executor,
onNextHandler,maxBufferCapacity);
          synchronized (this) {
          for (BufferedSubscription<T> b = clients,pred = null;;) {
              if (b == null) {
                  Throwable ex;
                  subscription.onSubscribe();
                  if ((ex = closedException) != null)
                      subscription.onError(ex);
                  else if (closed)
                      subscription.onComplete();
                  else if (pred == null)
                      clients = subscription;
                  else
                      pred.next = subscription;
                  break;
              }
              BufferedSubscription<T> next = b.next;
              if (b.isDisabled()) { // remove
                  b.next = null;    // detach
                  if (pred == null)
                      clients = next;
                  else
                      pred.next = next;
              }
              else if (subscriber.equals(b.subscriber)) {
                  b.onError(new IllegalStateException("Duplicate subscribe"));
                  break;
              }
              else
                  pred = b;
              b = next;
            }
        }
    }
    /**
    * Responds to control events in consume().
    */
    private boolean checkControl(Flow.Subscriber<? super T> s,int c) {
        boolean stat = true;
        if ((c & SUBSCRIBE) != 0) {
            if (CTL.compareAndSet(this,c,c & ~SUBSCRIBE)) {
              try {
                  if (s != null)
                      s.onSubscribe(this);
              } catch (Throwable ex) {
                  onError(ex);
              }
            }
        }
        else if ((c & ERROR) != 0) {
            Throwable ex = pendingError;
            ctl = DISABLED;           //no need for CAS
            if (ex != null) {         //null if errorless cancel
              try {
                  if (s != null)
                      s.onError(ex);
              } catch (Throwable ignore) {
              }
          }
        }
        else {
          detach();
          stat = false;
        }
        return stat;
    }

關于SubmissionPublisher,我們還需要了解它的以下3個方法,以便得心應手地使用它為我們服務。

● offer:該方法用于將元素發布給subscriber,subscriber可以異步無阻塞地調用它的onNext方法。同時,這個方法可以在超時的時候放棄一些元素,我們可以指定超時時間。在這里,我們還可以指定放棄處理的規則(其實就是一個BiPredicate條件表達式)。

● submit:該方法可以幫助我們以一個簡單的方式來將元素發布給subscriber。從synchronized(this)代碼塊中的while語句可得知,該方法會阻塞調用,直到資源分配給了當前所有的subscriber。若資源進行了分配但subscriber沒拿到,則會重新給,直至所有subscriber都拿到資源。該方法與offer方法的區別是后者有超時機制。

● consume:該方法可以定義請求到的元素要消費的動作(在SubmissionPublisher類定義中有Subscriber接口的內部類實現),接下來我們通過下面的這個例子來清晰明了地進行解釋。

    public class ConsumeSubmissionPublisher {
        public static void main(String[] args) throws InterruptedException,
ExecutionException {
          publish();
        }
        public static void publish() throws InterruptedException,Execution
Exception {
          CompletableFuture future = null;
          try (SubmissionPublisher publisher = new SubmissionPublisher
<Long>()) {
              System.out.println("Subscriber  Buffer  Size:  "  +  publisher.
getMaxBufferCapacity());
              future=publisher.consume(System.out::println);
              LongStream.range(1,10).forEach(publisher::submit);
          } finally {
              future.get();
          }
      }
    }

下面的代碼片段是SubmissionPublisher::consume方法:

    public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
        if (consumer == null)
          throw new NullPointerException();
        CompletableFuture<Void> status = new CompletableFuture<>();
        subscribe(new ConsumerSubscriber<T>(status,consumer));
        return status;
    }

當調用publisher.consume時,其實就是內部生成一個訂閱者對象,并通過subscribe(new ConsumerSubscriber<T>(status,consumer));進行訂閱。ConsumerSubscriber是一個通過裝飾模式得到的增強類,通過consume方法,我們可以得到一個CompletableFuture實例。這樣,就可以通過CompletableFuture實例提供的get方法來做到讓應用程序一直運行,直到所有的元素都處理完畢。請看前面例子的運行結果:

    Subscriber Buffer Size: 256
    1
    2
    3
    4
    5
    6
    7
    8
    9

1.5.4 響應式編程整合Spring實戰案例

有了上面的Demo做支撐,下面我們就來實現一個經常接觸的小場景“訂單與庫存”。在這里,我們通過Map<Product,StockItem>管理每一種產品的庫存數量。因為涉及并發場景,所以使用ConcurrentHashMap保存數據,使用StockItem比單純地用一個長整型數來表達庫存數量可以做更多的事情。設計這個響應式編程的小Demo,是為了讓大家看到我們無須處理繁多的并發過程導致的鎖,而只需要更多地關心我們的業務。我們通過這種編程模式可以達到一定的解耦效果。

下面開始編碼。關于庫存的編碼是比較簡單的,操作一個Map即可:

    @Component
    public class Stock {
        private final Map<Product,StockItem> stockItemMap = new
ConcurrentHashMap<>();
      private StockItem getItem(Product product){
          //如果沒有此商品添加一個key,返回null值即可
          stockItemMap.putIfAbsent(product,new StockItem());
          return stockItemMap.get(product);
      }
      public void store(Product product,long amount){
          getItem(product).store(amount);
      }
      public void remove(Product product,long amount)  throws ProductIs
OutOfStock {
          if (getItem(product).remove(amount) != amount)
              throw new ProductIsOutOfStock(product);
      }
    }

我們使用StockItem類來對庫存商品數量進行操作,因為數量的變動同時發生在多個線程中,也就是涉及并發操作,所以這就有一點復雜。我們在這里使用了一個原子類來保證線程安全(下單減庫存邏輯的代碼處有中文注釋):

    public class StockItem {
        private final AtomicLong amountItemStock =
          new AtomicLong(0);
        public void store(long n) {
          amountItemStock.accumulateAndGet(n,(pre,mount) -> pre + mount);
        }
        //下單時所需商品數量沒超過庫存數量的話就用庫存數量減去所需商品數量返回此次
        //從庫存移除商品的具體數量;超過的話不對庫存做任何操作返回此次所移除庫存商品
        //的數量即為0
        public long remove(long n) {
          class RemoveData {
              long remove;
          }
          RemoveData removeData = new RemoveData();
          amountItemStock.accumulateAndGet(n,
              (pre,mount) -> pre >= n ?
              pre - (removeData.remove = mount) : pre - (removeData.remove = 0L));
          return removeData.remove;
        }
    }

還要多說兩句,我們通過使用原子類AtomicLong維護商品數量,AtomicLong原子類擁有accumulateAndGet方法,這個方法接收一個長整型參數和一個以接收兩個長整型參數進行操作的動作,也就是一個Lambda表達式。我們通過accumulateAndGet方法計算出新的庫存數量,在通過訂單對庫存進行移除操作的時候,如果庫存數量充足,則正常操作;如果庫存數量無法滿足訂單數量,則不做任何操作。返回的數量如上面的代碼注釋所示,計算過程是在一個Lambda表達式內進行的,因為作用域的問題,基本類型的值沒辦法逃逸出去(這樣做也保證了計算的無狀態性),所以定義了一個內部類來達到想要的效果。

接下來介紹和本章內容相關的StockMaintain類。它實現了Subscriber接口,以便有能力針對訂單來維護庫存數量:

    public class StockMaintain implements Flow.Subscriber<Order> {
        private static final Logger log = LoggerFactory.
getLogger(StockMaintain.class);
        private Stock stock;
        private Flow.Subscription subscription = null;
        private ExecutorService execService =  ForkJoinPool.commonPool();
        public StockMaintain(@Autowired Stock stock) {
          this.stock = stock;
        }
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
          log.info("******調用onSubscribe******");
          subscription.request(3);
          this.subscription = subscription;
        }
        ...
    }

在訂閱生產者之后,StockMaintain產生的對象會立即調用onSubscribe方法(后面可以通過日志清楚地觀察到),并傳入一個subscription對象,將這個對象存儲到我們定義的字段上。在一個元素傳遞到onNext方法中并處理完畢后,就可以調用這個subscription來請求新的元素。這里只是簡單的展示,正常配置應該按照第一個Demo的方式做。

最重要的部分便是onNext方法了,為什么要單獨拿出來講?這主要是為了與第一個Demo有所區分。execService線程池操作既可以用于subscription,也可以用在subscriber的onNext方法中。在這個方法中,我們接收一個訂單,并從庫存數量中減去訂單包含的各種商品的數量。如果訂單中有所需商品數量超過此商品庫存數量的情況,那么就會產生錯誤日志。為了保證Demo簡單,這里不會涉及更多的邏輯,只是想告訴大家應該怎么維護自己的代碼。為了達到異步效果,這里通過ExecutorService進行操作,使用了形如()->{}的Lambda表達式。這是為了達到延遲執行的效果,將其當作一個動作進行傳遞,讓它在一個子線程上執行。

通過查閱submit的源碼可知,這個動作會被封裝成一個RunnableFuture<V> extends Runnable,Future<V>并返回。這樣方便我們獲取這個動作在子線程上執行的信息,同時方便操作其行為。而execute(ftask)最后其實就是通過new Thread(r).start來執行的。用一個現實中的場景來講,就是快遞員將快遞物品送到你手里,你不會立馬使用快遞包裹里的東西而讓快遞員一直等你簽收。關于任務的處理過程已經清晰明了地展現在我們面前了,交給系統自己處理吧,而我們要做的就是通過onNext方法獲取下一個元素:

    @Override
    public void onNext(Order order) {
    execService.submit(() -> {
        log.info("Thread {}",Thread.currentThread().getName());
        order.getItems().forEach(item -> {
          try {
              stock.remove(item.getProduct(),item.getAmount());
              log.info("{} 件商品從庫存消耗",item.getAmount());
          } catch (ProductIsOutOfStock productIsOutOfStock) {
              log.error("商品庫存不足");
          }
        });
        subscription.request(1);
    });
    }

AbstractExecutorService中的submit的源碼實現如下:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task,null);
        execute(ftask);
        return ftask;
    }

SubmissionPublisher源碼中的部分實現如下:

    //default Executor setup; nearly the same as CompletableFuture
    /**
    * Default executor -- ForkJoinPool.commonPool() unless it cannot
    * support parallelism.
    */
    private static final Executor ASYNC_POOL =
        (ForkJoinPool.getCommonPoolParallelism() > 1) ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
    private static final class ThreadPerTaskExecutor implements Executor {
        ThreadPerTaskExecutor(){}      //prevent access constructor creation
        public void execute(Runnable r) { new Thread(r).start(); }
    }

最后需要我們關注的就是測試運行部分的代碼了:

    @Test
    public void teststockRemoval() throws InterruptedException {
        Stock stock = new Stock();
        SubmissionPublisher<Order> p = new SubmissionPublisher<>();
        ...
    }

為了避免麻煩,這里我們同樣使用了JDK提供的SubmissionPublisher類來做publisher。我們創建了一個Stock類的實例,并在publisher上訂閱。此時不會傳遞任何數據過去,因為還沒有發布數據,但是它創建了一個subscriber和publisher之間的橋梁,一旦有元素提交,subscriber就可以接收到元素。

接下來,我們開始向庫存中添加商品并創建訂單,然后在后續的代碼中進行提交。這里是重復提交的動作,展示Demo如下:

    Product product = new Product();
    stock.store(product,40);
    OrderItem item = new OrderItem();
    item.setProduct(product);
    item.setAmount(10);
    Order order = new Order();
    List<OrderItem> items = new LinkedList<>();
    items.add(item);
    order.setItems(items);

我們將訂單提交給publisher 10次,也就是下了10個相同的訂單,這樣也能測試代碼的所有功能,包括超過庫存數量的拒絕修改的反饋:

    for (int i = 0; i < 10; i++)
        p.submit(order);
    log.info("所有訂單已經提交完畢");

在訂單都發送完畢之后,我們在這里設定主線程等待,以便子線程完成任務。等待時間的設置是為了讓讀者更好地觀察子線程執行和請求元素的執行情況:

    for (int j = 0; j < 10; j++) {
        log.info("Sleeping a bit...");
        Thread.sleep(10);
    }
    p.close();
    log.info("Publisher已關閉");

測試結果如下:

    17-12-24 01:22:43,161  INFO StockMaintain:33- ******調用onSubscr ibe******
    17-12-24 01:22:43,169  INFO TestStockMaintain:39- 所有訂單已經提交完畢
    17-12-24 01:22:43,179  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,187  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-9
    17-12-24 01:22:43,187  INFO StockMaintain:41- Thread ForkJoin Pool.
    commonPool-worker-11
    17-12-24 01:22:43,187  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-2
    17-12-24 01:22:43,190  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,202  INFO StockMaintain:45- 10 件商品從庫存消耗
    17-12-24 01:22:43,202  INFO StockMaintain:45- 10 件商品從庫存消耗
    17-12-24 01:22:43,206  INFO StockMaintain:41- Thread ForkJoinPool.comm
    onPool-worker-2
    17-12-24 01:22:43,207  INFO StockMaintain:45- 10 件商品從庫存消耗
    17-12-24 01:22:43,202  INFO StockMaintain:45- 10 件商品從庫存消耗
    17-12-24 01:22:43,209  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-2
    17-12-24 01:22:43,207  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,207  INFO StockMaintain:41- Thread ForkJoinPool.comm
    onPool-worker-4
    17-12-24 01:22:43,212 ERROR StockMaintain:47- 商品庫存不足
    17-12-24 01:22:43,222  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-2
    17-12-24 01:22:43,224 ERROR StockMaintain:47- 商品庫存不足
    17-12-24 01:22:43,225  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-2
    17-12-24 01:22:43,226 ERROR StockMaintain:47- 商品庫存不足
    17-12-24 01:22:43,228  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-13
    17-12-24 01:22:43,210  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-9
    17-12-24 01:22:43,229 ERROR StockMaintain:47- 商品庫存不足
    17-12-24 01:22:43,227  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,214 ERROR StockMaintain:47- 商品庫存不足
    17-12-24 01:22:43,231 ERROR StockMaintain:47- 商品庫存不足
    17-12-24 01:22:43,244  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,256  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,268  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,279  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,290  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,301  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,312  INFO TestStockMaintain:45- Publisher已關閉
    17-12-24 01:22:43,312  INFO StockMaintain:61-  調用onComplete

我們用Subscriber做對外交流的接口,存儲操作業務邏輯并交給一個專門的類去處理。這里的Stock是真正的compute計算類業務(比較復雜),會單獨拿出來做封裝(如果讀者做過前端的Vue開發工作,應該會有更深入的理解)。最后我們通過Publisher將生產和消費連接起來,而且真正地做到了訂單多線程并發處理。理解了這些內容,就能在后面的章節中駕輕就熟地掌握一些新的東西。另外,代碼中一些注解的使用,包括為什么這樣用,為什么要在Stock上加上一個@Component注解,為什么在構造函數中用@Autowired注解,以及其他更多細節,就留給讀者自己思考學習吧。這也是本書要與Spring進行整合的目的。

主站蜘蛛池模板: 望城县| 阜新| 彭泽县| 黑河市| 岳阳市| 台南县| 祁门县| 绥德县| 济源市| 商南县| 广饶县| 改则县| 客服| 鸡西市| 通山县| 衡山县| 左贡县| 建水县| 晋宁县| 黎川县| 漳浦县| 临澧县| 公主岭市| 句容市| 安新县| 阿克陶县| 镇赉县| 多伦县| 贵德县| 三亚市| 民县| 甘南县| 察哈| 宁都县| 勐海县| 二连浩特市| 海伦市| 石屏县| 侯马市| 仲巴县| 屯门区|