執行緒間通訊方式(1)

語言: CN / TW / HK

前文了解了執行緒的建立方式和狀態切換,在實際開發時,一個程序中往往有很多個執行緒,大多數執行緒之間往往不是絕對獨立的,比如說我們需要將A和B 兩個執行緒的執行結果收集在一起然後顯示在介面上,又或者比較典型的消費者-生產者模式,在這些場景下,執行緒間通訊成了我們必須使用的手段,那麼執行緒之間怎麼通訊呢?

執行緒間通訊方式,從實現本質來講,主要可以分為兩大類共享記憶體和訊息傳遞。

相信大家還記得,在記憶體模型一節,我們提到多執行緒併發情況下的三大特性,原子性,有序性,可見性,其所對應的解決方案就可以用來實現執行緒間通訊,這些解決方案的本質就是共享記憶體。

對於訊息傳遞而言,最經典的實現就是我們的Handler機制,在子執行緒使用主執行緒的Handler物件將一些資訊傳送到主執行緒以便進行處理。

下面我們來看一些執行緒間通訊的典型實現

Object.wait/Object.notify

對於Object物件而言,其提供了等待/通知機制以便實現執行緒間通訊,由於Object是Java中所有類的父類,也就意味著Java中所有物件都支援通知/等待機制,與該機制關聯的主要有五個方法:

| 方法名稱 | 描述 | 備註 | | ----------------------------- | -------------------------------------------------------------------------------- | -- | | wait() | 執行緒執行中呼叫物件的wait方法可以使得當前執行緒進入WAITING狀態,只有等待其他執行緒的通知或被中斷才會返回,需要注意的是,呼叫wait方法後,會釋放物件的鎖 | / | | wait(long timeout) | 與wait含義一致,不同的是通過timeout指定了超時時間,如果時間到了還沒收到通知就超時返回 | / | | wait(long timeout, int nanos) | 超時管控更加精確,第二個引數單位為毫微秒 | / | | notify | 通知一個在物件上等待的執行緒使其從wait物件返回 | / | | notifyAll | 通知所有等待在該物件上的執行緒 | / |

以Object.wait/Object.notify實現一個典型的訊息者生產者模型,消費者對變數做-1操作,生產者對變數做+1操作,程式碼如下:

// 盤子  public class Number {      // 盤子當前容量,是否有內容      private int mCount = 0;          //對盤子容量進行+1操作      public void inc() {          if (mCount != 0) {              try {                  this.wait();             } catch (InterruptedException e) {                  e.printStackTrace();             }         }          mCount++;          System.out.println(Thread.currentThread().getName()+",mCount+1,mCount="+mCount);          this.notifyAll();     }  ​      //對盤子容量進行-1操作      public void dec() {          if (mCount == 0) {              try {                  this.wait();             } catch (InterruptedException e) {                  e.printStackTrace();             }         }          mCount--;          System.out.println(Thread.currentThread().getName()+",mCount-1,mCount="+mCount);          this.notifyAll();     }  }

public static void main(String[] args) {          Number number = new Number();          // 生產者執行緒          Thread incThread = new Thread(new Runnable() {              @Override              public void run() {                  number.inc();             }         });          incThread.setName("Inc Thread");          incThread.start();  ​          // 消費者執行緒          Thread decThread = new Thread(new Runnable() {              @Override              public void run() {                  number.dec();             }         });          decThread.setName("Dec Thread");          decThread.start();     }

如上述程式碼備註,其中Inc Thread為生產者執行緒,當盤子內容為0時,每次向盤子Number中放一個內容,消費者執行緒Dec Thread當盤子有內容時,消耗內容,讓盤子內容變為0.執行輸出如下:

1-4-3-1

糟糕,正確執行一個迴圈後,丟擲了IllegalMonitorStateException,為什麼會這樣呢?這個異常是什麼意思?

遇事不決看原始碼,IllegalMonitorStateException的類說明如下:

Thrown to indicate that a thread has attempted to wait on an object's monitor or to notify other threads waiting on an object's monitor without owning the specified monitor.

翻譯過來的意思就是當執行緒在沒有持有特定的鎖的情況下試圖等待物件鎖或者通知其他執行緒等待物件鎖會丟擲此異常,有點拗口,先放置,即然我們呼叫了wait/notifyAll這兩個方法,不妨看下這兩個方法的說明,看是否有新的提示,wait方法說明如下:

/**   * Causes the current thread to wait until another thread invokes the   * {@link java.lang.Object#notify()} method or the   * {@link java.lang.Object#notifyAll()} method for this object.   * In other words, this method behaves exactly as if it simply   * performs the call {@code wait(0)}.   * <p>   * The current thread must own this object's monitor. The thread   * releases ownership of this monitor and waits until another thread   * notifies threads waiting on this object's monitor to wake up   * either through a call to the {@code notify} method or the   * {@code notifyAll} method. The thread then waits until it can   * re-obtain ownership of the monitor and resumes execution.   * <p>   * As in the one argument version, interrupts and spurious wakeups are   * possible, and this method should always be used in a loop:   * <pre>   *     synchronized (obj) {   *         while (&lt;condition does not hold&gt;)   *             obj.wait();   *         ... // Perform action appropriate to condition   *     }   * </pre>   * This method should only be called by a thread that is the owner   * of this object's monitor. See the {@code notify} method for a   * description of the ways in which a thread can become the owner of   * a monitor.   *   * @throws IllegalMonitorStateException if the current thread is not   *               the owner of the object's monitor.   * @throws InterruptedException if any thread interrupted the   *             current thread before or while the current thread   *             was waiting for a notification. The <i>interrupted   *             status</i> of the current thread is cleared when   *             this exception is thrown.   * @see       java.lang.Object#notify()   * @see       java.lang.Object#notifyAll()   */  public final void wait() throws InterruptedException {      wait(0);  }

在上述說明中反覆提到 The current thread must own this object's monitor. This method should only be called by a thread that is the owner of this object's monitor.也就是說在呼叫Object.wait方法前,當前執行緒必須持有該物件的鎖,獲取鎖的方法很簡單,wait方法說明中也有,通過synchronized關鍵詞,那麼正確的呼叫程式碼如下所示:

public class Number {      private int mCount = 0;      public void inc() {          synchronized (this) {              if (mCount != 0) {                  try {                      this.wait();                 } catch (InterruptedException e) {                      e.printStackTrace();                 }             }              mCount++;              System.out.println(Thread.currentThread().getName()+",mCount+1,mCount="+mCount);              this.notifyAll();         }     }  ​      public void dec() {          synchronized (this) {              if (mCount == 0) {                  try {                      this.wait();                 } catch (InterruptedException e) {                      e.printStackTrace();                 }             }              mCount--;              System.out.println(Thread.currentThread().getName()+",mCount-1,mCount="+mCount);              this.notifyAll();         }     }  }

重新執行程式碼,輸出如下:

1-4-3-2

這裡可以看出,只運行了一個迴圈,那麼怎麼讓它一直執行呢?將if修改稱while即可,以生產10次為例,如需一直生產訊息,使用while(true)即可,程式碼及輸出如下:

public class Number {      private int mCount = 0;      private int mIncTimes = 0;      private int mDecTimes = 0;      public void inc() {          synchronized (this) {              while (mIncTimes < 10) {                  if (mCount != 0) {                      try {                          this.wait();                     } catch (InterruptedException e) {                          e.printStackTrace();                     }                 }                  mCount++;                  mIncTimes ++;                  System.out.println(Thread.currentThread().getName()+",mCount+1,mCount="+mCount+",mIncTimes:"+mIncTimes);                  this.notifyAll();             }         }     }  ​      public void dec() {          synchronized (this) {              while (mDecTimes < 10) {                  if (mCount == 0) {                      try {                          this.wait();                     } catch (InterruptedException e) {                          e.printStackTrace();                     }                 }                  mCount--;                  mDecTimes++;                  System.out.println(Thread.currentThread().getName()+",mCount-1,mCount="+mCount+",mDecTimes:"+mDecTimes);                  this.notifyAll();             }         }     }  }

1-4-3-3

綜上,使用Object.wait/Object.notify/Object.notifyAll時,切記其必須先使用關鍵詞獲取同個Object的物件鎖,否則就會丟擲IllegalMonitorStateException異常

Semaphore

Semaphore翻譯為訊號量,一個訊號量維護一組許可,在呼叫acquire方法時阻塞,直到獲取許可,在呼叫release的時候釋放佔用,從而喚醒阻塞的某個執行緒。訊號量操作類似於停車場車輛管理,初始時停車場有5個車位,當停車場內部5個車位全佔滿時,此時可用資源為0,即訊號量可用許可數量為0,其他車輛想停車就只能在停車場外排隊阻塞(相當於呼叫acquire),當一輛車輛從停車場駛出時(相當於呼叫release方法),此時訊號量許可數量為1,喚醒一個等待停車的車輛進入停車輛,自身可用許可數量再次為0,依此往復。

對於只有一個許可的訊號量而言,其可用許可數量為0或1,故被稱為二進位制訊號量,對於有多個正整數可用許可資料的訊號量而言,其被稱為通用訊號量。需要注意在執行acquire時訊號量本身並不會持有同步鎖,因為這樣會影響被釋放的許可進入可用許可池中。

二進位制訊號量,不同於其他鎖機制,要求釋放鎖的執行緒和獲取鎖的執行緒是同一個,也就意味著我們可以在其他執行緒釋放二進位制訊號量以完成死鎖恢復。

下面我們以二進位制訊號量實現消費者生產者模式,程式碼如下(生產消費4次即停止):

public class Counter {      private int mCount = 0;      public void incCount() {          mCount ++;     }  ​      public void decCount() {          mCount--;     }  ​      public int getCount() {          return mCount;     }  }  ​  // Main主類程式碼  private static int mIncTimes = 0;  public static void main(String[] args) {      Counter counter = new Counter();      Semaphore semaphore = new Semaphore(1);      Thread incThread = new Thread(new Runnable() {          @Override          public void run() {              while (mIncTimes < 4) {                  try {                      semaphore.acquire();                      if (counter.getCount() == 0) {                          counter.incCount();                          mIncTimes ++;                          System.out.println("Inc Thread ++,current count is:" + counter.getCount());                     }                 } catch (InterruptedException e) {                      e.printStackTrace();                 } finally {                      semaphore.release();                 }             }         }     });      incThread.setName("Inc Thread");      incThread.start();  ​      Thread decThread = new Thread(new Runnable() {          @Override          public void run() {              while (mIncTimes < 4) {                  try {                      semaphore.acquire();                      if (counter.getCount() != 0) {                          counter.decCount();                          System.out.println("Dec Thread --,current count is:" + counter.getCount());                     }                 } catch (InterruptedException e) {                      e.printStackTrace();                 } finally {                      semaphore.release();                 }             }         }     });      decThread.setName("Dec Thread");      decThread.start();  }

執行結果如下:

1-4-3-6

記憶體一致性影響,要求一個執行緒中的release操作和另一個執行緒中的acquire操作必須存在happen-before關係