Java多線程編程實戰:模擬大量數據同步

背景

最近對於 Java 多線程作了一段時間的學習,筆者一直認爲,學習東西就是要應用到實際的業務需求中的。不然要麼沒法深刻理解,要麼硬生生地套用技術只是達到炫技的效果。java

不過筆者仍舊認爲本身對於多線程掌握不夠熟練,不敢輕易應用到生產代碼中。這就按照平時工做中遇到的實際問題,腦補了一個極可能存在的業務場景:數據庫

已知某公司管理着 1000 個微信服務號,每一個服務號有 1w ~ 50w 粉絲不等。假設該公司天天都須要將全部微信服務號的粉絲數據經過調用微信 API 的方式更新到本地數據庫。編程

需求分析

對此需求進行分析,主要存在如下問題:安全

  • 單個服務號獲取粉絲 id,只能每次 1w 按順序拉取
  • 微信的 API 對於服務商的併發請求數量有限制

單個服務號獲取粉絲 id,只能每次 1w 按順序拉取。這個問題決定了單個公衆號在拉取粉絲 id 上,沒法分配給多個線程執行。服務器

微信的 API 對於服務商的併發請求數量有限制。這點最容易被忽略,若是咱們同時有過多的請求,則會致使接口被封禁。這裏能夠經過信號量來控制同時執行的線程數量。微信

爲了儘快完成數據同步,根據實際狀況:整個數據同步可分爲讀數據和寫數據兩個部分。讀數據是經過 API 獲取,走網絡 IO,速度較慢;寫數據是寫到數據庫,速度較快。因此得出結論:須要分配較多的線程進行讀數據,較少的線程進行寫數據。網絡

設計要點

首先,咱們須要肯定開啓多少個線程(在生產中每每是使用線程池),線程數量須要根據服務器性能來決定,這裏咱們定爲 40 個讀取數據線程(將 1000 個公衆號分爲 40 份,分別在 40 個線程中執行),1個寫入數據線程。(具體開多少個線程,取決於線程池的容量,以及能夠分配給此業務的數量。具體的數字須要根據實際狀況測試得出,比服務器閾值低一些較好。固然,配置容許範圍內越大越好)數據結構

其次,考慮到微信對於 API 併發請求的限制,須要限制同時執行的線程數,使用java.util.concurrent.Semaphore進行控制,這裏咱們限制爲 20 個(具體的信號量憑證數,取決於同一時間可以執行的線程,跟 API 限制,服務器性能有關)。多線程

而後,咱們須要知道數據什麼時候讀取、寫入完畢,以控制程序邏輯以及終止程序,這裏咱們使用java.util.concurrent.CountDownLatch進行控制。併發

最後,咱們須要一個數據結構,用來在多個線程中共享處理的數據,此處同步數據的場景很是適合使用隊列,這裏咱們使用線程安全的java.util.concurrent.ConcurrentLinkedQueue來進行處理。(須要注意的是,在實際開發中,隊列不可以無限制地增加,這將會很快消耗掉內存,咱們須要根據實際狀況對隊列長度作控制。例如,能夠經過控制讀取線程數和寫入線程數的比例來控制隊列的長度)

模擬代碼

因爲本文重點關注多線程的使用,模擬代碼只體現多線程操做的方法。代碼裏添加了大量的註釋,方便各位讀者閱讀理解。

JDK:1.8

import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * N個線程向隊列添加數據
 * 一個線程消費隊列數據
 */
public class QueueTest {
    private static List<String> data = Arrays.asList("a", "b", "c", "d", "e");

    private static final int OFFER_COUNT = 40; // 開啓的線程數量

    private static Semaphore semaphore = new Semaphore(20); // 同一時間執行的線程數量(大多用於控制API調用次數或數據庫查詢鏈接數)

    public static void main(String[] args) throws InterruptedException {
        Queue<String> queue = new ConcurrentLinkedQueue<>(); // 處理隊列,須要處理的數據,放置到此隊列中

        CountDownLatch offerLatch = new CountDownLatch(OFFER_COUNT); // offer線程latch,每完成一個,latch減一,lacth的count爲0時表示offer處理完畢
        CountDownLatch pollLatch = new CountDownLatch(1); // poll線程latch,latch的count爲0時,表示poll處理完畢

        Runnable offerRunnable = () -> {
            try {
                semaphore.acquire(); // 信號量控制
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                for (String datum : data) {
                    queue.offer(datum);
                    TimeUnit.SECONDS.sleep(2); // 模擬取數據很慢的狀況
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 在finally中執行latch.countDown()以及信號量釋放,避免因異常致使沒有正常釋放
                offerLatch.countDown();
                semaphore.release();
            }
        };

        Runnable pollRunnable = () -> {
            int count = 0;
            try {
                while (offerLatch.getCount() > 0 || queue.size() > 0) { // 只要offer的latch未執行完,或queue仍舊有數據,則繼續循環
                    String poll = queue.poll();
                    if (poll != null) {
                        System.out.println(poll);
                        count++;
                    }
                    // 不管是否poll到數據,均暫停一小段時間,可下降CPU消耗
                    TimeUnit.MILLISECONDS.sleep(100);
                }
                System.out.println("total count:" + count);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 在finally中執行latch.countDown(),避免因異常致使沒有正常釋放
                pollLatch.countDown();
            }
        };

        // 啓動線程(生產環境中建議使用線程池)
        new Thread(pollRunnable).start(); // 啓動一個poll線程
        for (int i = 0; i < OFFER_COUNT; i++) {
            new Thread(offerRunnable).start();
        } // 模擬取數據很慢,須要開啓40個線程處理

        // latch等待,會block主線程直到latch的count爲0
        offerLatch.await();
        pollLatch.await();

        System.out.println("===the end===");
    }
}

到這裏,本文結束。以上是筆者腦補的一個常見需求的解決方案。

注意:多線程編程對實際環境和需求有很大的依賴,須要根據實際的需求狀況對各個參數作調整。實際在使用中,須要儘可能模擬生產環境的數據狀況來進行測試,對服務器執行期間的併發數,CPU、內存、網絡 IO、磁盤 IO 作好觀察。並適當地調低併發數,以給服務器留有處理其餘請求的餘量。

相關文章
相關標籤/搜索