不允許還有Java程序員不了解BlockingQueue阻塞隊列的實現原理

我們平時開發中好像很少使用到BlockingQueue(阻塞隊列),比如我們想要存儲一組數據的時候會使用ArrayList,想要存儲鍵值對數據會使用HashMap,在什么場景下需要用到BlockingQueue呢?
1. BlockingQueue的應用場景當我們處理完一批數據之后,需要把這批數據發給下游方法接著處理,但是下游方法的處理速率不受控制,可能時快時慢 。如果下游方法的處理速率較慢,會拖慢當前方法的處理速率,這時候該怎么辦呢?
你可能想到使用線程池,是個辦法,不過需要創建很多線程,還要考慮下游方法支不支持并發,如果是CPU密集任務,可能多線程比單線程處理速度更慢,因為需要頻繁上下文切換 。
這時候就可以考慮使用BlockingQueue,BlockingQueue最典型的應用場景就是上面這種生產者-消費者模型 。生產者往隊列中放數據,消費者從隊列中取數據,中間使用BlockingQueue做緩沖隊列,也就解決了生產者和消費者速率不同步的問題 。

不允許還有Java程序員不了解BlockingQueue阻塞隊列的實現原理

文章插圖
你可能聯想到了消息隊列(MessageQueue),消息隊列相當于分布式阻塞隊列,而BlockingQueue相當于本地阻塞隊列,只作用于本機器 。對應的是分布式緩存(比如:Redis、Memcache)和本地緩存(比如:Guava、Caffeine) 。
另外很多框架中都有BlockingQueue的影子,比如線程池中就用到BlockingQueue做任務的緩沖 。消息隊列中發消息、拉取消息的方法也都借鑒了BlockingQueue,使用起來很相似 。
今天就一塊深入剖析一下Queue的底層源碼 。
2. BlockingQueue的用法BlockingQueue的用法非常簡單,就是放數據和取數據 。
/** * @apiNote BlockingQueue示例 * @author 一燈架構 */public class Demo {public static void main(String[] args) throws InterruptedException {// 1. 創建隊列,設置容量是10BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);// 2. 往隊列中放數據queue.put(1);// 3. 從隊列中取數據Integer result = queue.take();}}為了滿足不同的使用場景,BlockingQueue設計了很多的放數據和取數據的方法 。
操作拋出異常返回特定值阻塞阻塞一段時間放數據addofferputoffer(e, time, unit)取數據removepolltakepoll(time, unit)取數據(不刪除)element()peek()不支持不支持這幾組方法的不同之處就是:
  1. 當隊列滿了,再往隊列中放數據,add方法拋異常,offer方法返回false,put方法會一直阻塞(直到有其他線程從隊列中取走數據),offer方法阻塞指定時間然后返回false 。
  2. 當隊列是空,再從隊列中取數據,remove方法拋異常,poll方法返回null,take方法會一直阻塞(直到有其他線程往隊列中放數據),poll方法阻塞指定時間然后返回null 。
  3. 當隊列是空,再去隊列中查看數據(并不刪除數據),element方法拋異常,peek方法返回null 。
工作中使用最多的就是offer、poll阻塞指定時間的方法 。
3. BlockingQueue實現類BlockingQueue常見的有下面5個實現類,主要是應用場景不同 。