拜託!不要再問我是否瞭解多線程了好嗎

  面試過程當中,各面試官通常都會教科書式的問你幾個多線程的問題,但又不知從何問起。因而就來一句,你瞭解多線程嗎?拜託,這個好傷自尊的!java

  相信老司機們對於java的多線程問題處理,穩如老狗了。你問我瞭解不?都懶得理你。面試

  不過,既然是面對的是面試官,那你還得一一說來。redis

  今天咱們就從多個角度來領略下多線程技術吧!算法

1. 爲何會有多線程?

  其實有的語言是沒有多線程的概念的,而java則是從一出生便有了多線程天賦。爲何?
  多線程技術通常又被叫作併發編程,目的是爲了程序運行得更快。
  其基本原理是,是由cpu進行不一樣線程的調度,從而實現多個線程的同時運行效果。
  多進程和多線程相似,只是多進程不會共享內存資源,切換開銷更大,因此多線程是更明智的選擇。
  而在計算機出現早期,或者也許你也能找到單核的cpu,這時候的多線程是經過不停地切換惟一一個能夠運行的線程來實現的,因爲切換速度比較快,因此感受就是多線程同時在運行了。在這種狀況下,多線程與多進程等同的。可是,至少也讓用戶有了能夠同時處理多任務的能力了,也是頗有用的。
  而當下的多核cpu時代,則是真正能夠同時運行多個線程的時代,什麼四核八線程,八核八線程.... 意味着能夠同時並行n個線程。若是咱們能讓全部可用的線程都利用起來,那麼咱們的程序運行速度或者說總體性能將會獲得極大提高。這是咱們技術人員的目標。spring

 

2. 多線程就必定快嗎?(簡略)

  看起來,多線程確實挺好,可是凡事皆有度。過尤不及。編程

  若是隻運行與cpu能力範圍內的n線程,那是絕對ok的。但當你線程數超過這個n時,就會涉及到cpu的調度問題,調度時即會涉及一個上下文切換問題,這是要耗費時間和資源的東西。當cpu疲於奔命調度切換時,則多線程就是一個負擔了。api

 

3. 多線程主要注意什麼問題?(簡略)

  多線程要注意的問題多了去了,畢竟這是一門不簡單的學問,可是咱們也能夠總結下:tomcat

  1. 線程安全性問題;若是連正確性都沒法保障,談性能有何意義?
  2. 資源隔離問題;是你就是你的,不是你的就不是你的。
  3. 可讀性問題;若是爲了多線程,將代碼搞得一團糟,是否值得?
  4. 外部環境問題;若是外部環境很糟糕,那麼你內部性能再好,你能把壓力給外部嗎?
安全

 

4. 建立多線程的方式?(簡略)

  這個問題確實有點low, 不過也是一個體現真實實踐的地方!服務器

  1. 繼承Thread類,而後 new MyThread.start();
  2. 繼承Runnable類, 而後 new Thread(runnable).start();
  3. 繼承Callable類,而後使用 ExecutorService.submit(callable);
  4. 使用線程池技術,直接建立n個線程,將上面的方法再來一遍,new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 簡化版: Executors.newFixedThreadPool(n).submit(runnable);

 

5. 來點實際的場景?(重點)

  理論始終太枯燥,不如來點實際的。

  有同窗說,我平時就寫寫業務代碼,而業務代碼基本由用戶觸發,一條線程走到底,哪來的多線程實踐?

  好,咱們能夠就這個問題來講下,這種業務的多線程:

  1. 好比一個http請求,對應一個響應,若是不使用多線程,會怎麼樣?咱們能夠簡單地寫一個socket服務器,進行處理業務,可是這絕對不是你想看到的。好比咱們經常使用的 spring+tomcat, 哪裏沒有用到多線程技術?

        http-nio-8080-exec-xxx #就是一個線程池中例子。

  2. 任何一個java應用,啓動起來以後,都會有不少的GC線程運行,這難道不是多線程?如:

        "G1 Main Concurrent Mark GC Thread" os_prio=0 tid=0x00007fb91008f000 nid=0x40e7 runnabl
        "Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007fb910061800 nid=0x40de runnable

  如上這些多線程場景吧,面試官說,就算你瞭解其原理,那也不算是你的。你有真正使用過多線程嗎?

  接下來,咱們就來講道說道,實際業務場景中,有哪些是咱們可能會用上的,供你們參考:

看下多線程中幾個有趣或者經典的場景用法!

  場景1. 我有一個發郵件的功能,用戶操做成功後,我給他發送郵件,如何高效穩定地完成?

  場景2. 我有m個線程在循環執行主方法,爲實現高效處理,將分離n*m個子線程執行相關聯流程,要求子線程必須等到主線程執行完成後才能執行,如何保證?

  場景3. 某合做公司要求請求其api的qps不得大於n,如何保證?

  場景4. 一個大任務如何提升響應速度?

  場景5. 我有n個線程同時開始處理一個事務,要求至少等到一個線程執行完畢後,才能進行響應返回,如何高效處理?

  場景6. 抽象任務,後臺運行處理任務多線程?

 

你們應該已經見過世面了,這點問題還不至於,對吧。那你能夠拿出你的方案了。


下面是個人解決方案:

 

場景1. 我有一個發郵件的功能,用戶操做成功後,我給他發送郵件,如何高效穩定地完成?
場景1解決:(常規型)

  這個能夠說最實用最簡單的多線程應用場景了,不過如今進行微服務化以後,可能會有一些不一樣。換湯不換藥。

  針對C端用戶的多線程,咱們是不建議使用 new Thread() 這種方式的,線程池是個經常使用伎倆。

    ExecutorService mailExecutors = Executors.newFixedThreadPool(20);

    public void sendMail() {
        mailExecutors.submit(() -> {
            // do send mail biz, http, rpc,...
            System.out.println("sending mail");
        });
    }

 

場景2. 我有m個線程在循環執行主方法,爲實現高效處理,將分離n*m個子線程執行相關聯流程,要求子線程必須等到主線程執行完成後才能執行,如何保證?
場景2解決:(全部等待型)

  主任務,只管調度子線程,在子線程使用閉鎖在適當的地方進行等待,主線程循環分配完成後,打開閉鎖,放行全部子線程便可。

  具體代碼以下:

    private void mainWork() {
        try {
            resetRedisZsetLockGate();
            for (String linkTraceCacheKey : expiredKeys) {
                subWork(linkTraceCacheKey);
            }
        }
        finally {
            releaseRedisZsetLock();
        }
    }
    
    private void subWork(String linkTraceCacheKey) {
        deleteService.execute(new Runnable() {
            @Override
            public void run() {
                // do other biz
                blockingWaitRedisZsetLock();
                postSth(linkTraceCacheKey);
            }
        });
    }
    
    /**
     * 重置鎖網關,每次主方法的調度都將獲得一個私有的鎖
     */
    private void resetRedisZsetLockGate() {
        redisZsetScanLockGate = new CountDownLatch(1);
    }
    
    /**
     * 阻塞等待 鎖
     */
    private void blockingWaitRedisZsetLock() {
        final CountDownLatch myGate = redisZsetScanLockGate;
        try {
            myGate.await();
        } 
        catch (InterruptedException e) {
            logger.error("等待鎖中斷異常", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 釋放鎖
     */
    private void releaseRedisZsetLock() {
        final CountDownLatch myGate = redisZsetScanLockGate;
        myGate.countDown();
    }

 

場景3. 某合做公司要求請求其api的qps不得大於n,如何保證?
場景3解決:(流量控制型、有限資源型)

  這種問題準確的說,使用單機的多線程仍是有點難控制的,可是咱們只是爲了講清道理,具體(集羣)作法只要稍作變通便可。

  簡單點說,就是做用一個 Semphore 信號量進行數量控制,當數量未到時,直接多線程併發請求,到達限制後,則等待有空閒位置再進行!

public class AbstractConcurrentSimpleLiteJobBase {
    /**
     * 併發查詢:5 , 動態配置化
     */
    private final Semaphore maxConcurrentQueryLock;

    /**
     * 同步等待結束鎖,視狀況使用,同一個線程可能提交屢次任務,由同一個 holder 管理
     */
    private final ThreadLocal<List<Future<?>>> endGateTaskFutureContainer = new ThreadLocal<>();

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public AbstractConcurrentSimpleLiteJobBase() {
        maxConcurrentQueryLock = new Semaphore(getMaxConcurrentThreadNum());
    }

    /**
     * 獲取最大容許的併發數,子類可自定義, 默認:5
     *
     * @return 最大併發數
     */
    protected int getMaxConcurrentThreadNum() {
        return 5;
    }
    
    /**
     * 提交一個任務到線程池執行
     *
     * @param task 任務
     */
    protected void submitTask(Runnable task) {
        // 考慮是否要阻塞等待結果
        Future<?> future1 =  threadPoolTaskExecutor.submit(() -> {
            try {
                maxConcurrentQueryLock.acquire();
            }
            catch (InterruptedException ie) {
                // ignore...
                log.error("【任務運行】異常,中斷", ie);
                Thread.currentThread().interrupt();
                return;
            }
            try {
                task.run();
            }
            finally {
                maxConcurrentQueryLock.release();
            }
        });
        endGateCountDown(future1);
    }
    
    /**
     * 等待線程結果完成,並清理 gate 信息
     */
    private void awaitForComplete() {
        try {
            // 同步等待執行完成,防止併發任務執行
            for(Future<?> future1 : endGateTaskFutureContainer.get()) {
                future1.get();
            }
            endGateTaskFutureContainer.remove();
        }
        catch (ExecutionException e) {
            log.error("【任務執行】異常,拋出異常", e);
        }
        catch (InterruptedException e) {
            log.error("【任務執行】異常,中斷", e);
        }
    }


}

 

場景4. 一個大任務如何提升響應速度?
場景4解決:(大任務拆分型)

  針對大任務的處理,基本想到的都是相似於分佈式計算之類的東西(map/reduce),在java單機操做來講,標準的解決方案是 Fork/Join 框架。

public class MyForkJoinTask extends RecursiveTask<Integer> {
    //原始數據
    private List<Integer> records;

    public MyForkJoinTask(List<Integer> records) {
        this.records = records;
    }

    @Override
    protected Integer compute() {
        //任務拆分到可接受程度後,運行處理邏輯
        if (records.size() < 3) {
            return doRealCompute();
        }
        // 不然一直往下拆分任務
        int size = records.size();
        MyForkJoinTask aTask = new MyForkJoinTask(records.subList(0, size / 2));
        MyForkJoinTask bTask = new MyForkJoinTask(records.subList(size / 2, records.size()));
        //兩個任務併發執行
        invokeAll(aTask, bTask);
        //結果合併
        return aTask.join() + bTask.join();
    }

    /**
     * 真正任務處理邏輯
     */
    private int doRealCompute() {
        try {
            Thread.sleep((long) (records.size() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("計算任務:" + Arrays.toString(records.toArray()));
        return records.size();
    }

    // 測試任務
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(5);
        List<Integer> originalData = new ArrayList<>();
        originalData.add(1);
        originalData.add(2);
        originalData.add(3);
        originalData.add(4);
        originalData.add(5);
        originalData.add(6);
        originalData.add(7);
        originalData.add(8);
        originalData.add(9);
        originalData.add(10);
        originalData.add(11);
        originalData.add(12);
        originalData.add(13);

        MyForkJoinTask myForkJoinTask = new MyForkJoinTask(originalData);
        long t1 = System.currentTimeMillis();
        ForkJoinTask<Integer> affectNums = forkJoinPool.submit(myForkJoinTask);
        System.out.println("affect nums: " + affectNums.get());
        long t2 = System.currentTimeMillis();
        System.out.println("cost time: " + (t2-t1));
    }
}

  其實若是不用Fork/join 框架,也是能夠的,好比我就只開n個線依次從數據源處取數據進行處理,最後將結果合併到另外一個隊列中。只是,這期間你得多付出多少努力才能作到 Fork/Join 相同的效果呢!

  固然了,Fork/Join 的重要特性是: 使用了work-stealing算法。Worker線程跑完任務後,能夠從其餘還在忙着的線程去竊取任務。

  你要願意造輪子,也是能夠的。

 

場景5. 我有n個線程同時開始處理一個事務,要求至少等到一個線程執行完畢後,才能進行響應返回,如何高效處理?
場景5解決:(至少一個返回型)

  初步思路: 主任務中,使用一個閉鎖,CountDownLatch(1); 全部子線程執行完成,調用 latch.countDown(); 開啓一次閉鎖。主任務執行完成後,調用 latch.await(); 阻塞等待,當有任意一個子線程打開閉鎖後,就能夠返回了。

  可是這個是有問題的,即這個鎖只會有一次生效機會,後續的完成動做並不會有實際意義,所以只能換一個方式。

  使用回調實現,就容易多了,只要一個任務完成,就作一次回調,主任務若是分配完成後,發現有空閒的任務槽,就當即進行下一次分配便可,沒有則等到有再進行分配工做。

  具體代碼以下:

public class TaskDispatcher {
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting assign */
    private final Condition finishedTaskNotEmpty;

    /**
     * 正在運行的任務計數器
     */
    private final AtomicInteger runningTaskCounter = new AtomicInteger(0);

    /**
     * 新完成的任務計數器,當被從新分派後,此計數將會被置0
     */
    private Integer newFinishedTaskCounter = 0;
    
    private void consumLogHub(String shards) throws InterruptedException {
        resetConsumeCounter();
        String[] shardList = shards.split(",");
        for (int i = 0; i < shardList.length; i++) {
            String shard = shardList[i];
            int shardId = Integer.parseInt(shard);
            LogHubConsumer consuemr = getConsuemer(shardId);
            if(consuemr.startNewConsumeTask(this)) {
                runningTaskCounter.incrementAndGet();
            }
        }
        cleanConsumer(Arrays.asList(shardList));
        // 沒有一個任務已完成,阻塞等待一個完成
        if(runningTaskCounter.get() > 0) {
            if(newFinishedTaskCounter == 0) {
                waitAtLeastOnceTaskFinish();
            }
        }
    }
    
    /**
     * 重置消費者計數器
     */
    private void resetConsumeCounter() {
        newFinishedTaskCounter = 0;
    }

    /**
     * 阻塞等待至少一個任務執行完成
     *
     * @throws InterruptedException 中斷
     */
    private void waitAtLeastOnceTaskFinish() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (newFinishedTaskCounter == 0) {
                finishedTaskNotEmpty.await();
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * 通知任務完成(回調)
     *
     * @throws InterruptedException 中斷
     */
    private void notifyTaskFinished() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            runningTaskCounter.decrementAndGet();
            // 此處計數不可能小於0
            newFinishedTaskCounter += 1;
            finishedTaskNotEmpty.signal();
        }
        finally {
            lock.unlock();
        }
    }
    /**
     * 通知任務完成(回調)
     *
     * @throws InterruptedException 中斷
     */
    public void taskFinishCallback() throws InterruptedException {
        notifyTaskFinished();
    }
    
}

public class ConsumerWorker {

    private Future<?> future;
    
    @Resource
    private ExecutorService consumerService; 
    
    /**
     * 當查詢結果爲時的等待延時, 每次查詢結果都會爲空時,加大該延時, 直到達到設定的最大值爲準
     */
    private Long baseEmptyQueryDelayMills = 200L;
    private Long emptyQueryDelayMills = baseEmptyQueryDelayMills;

    /**
     * 調置最大延時爲1秒
     */
    private static final Long maxEmptyQueryDelayMills = 1000L;

    /**
     * 記數
     */
    private void encounterEmptyQueryDelay() {
        if(emptyQueryDelayMills < maxEmptyQueryDelayMills) {
            emptyQueryDelayMills += 100L;
        }
    }

    private void resetEmptyQueryDelay() {
        emptyQueryDelayMills = baseEmptyQueryDelayMills;
    }


    // 開啓一個消費者線程
    public boolean startNewConsumeTask(LogHubClientWork callback) {
        if(future==null || future.isCancelled() || future.isDone()) {
            //沒有任務或者任務已取消或已完成 提交任務
            future = consumerService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Integer dealCount = doBizData();
                        if(dealCount == 0) {
                            SleepUtil.millis(emptyQueryDelayMills);
                            encounterEmptyQueryDelay();
                        }
                        else {
                            resetEmptyQueryDelay();
                        }
                    }
                    finally {
                        try {
                            callback.taskFinishCallback();
                        }
                        catch (InterruptedException e) {
                            logger.error("處理完成通知失敗,中斷", e);
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
            return true;
        }
        return false;
    }
    
}

 


場景6. 抽象任務,後臺運行處理任務多線程?
場景6解決:(業務相關類)

  最簡單也是最難的一種,根據具體業務類型作相應處理就好,主要考慮讀寫的安全性問題。

 

  如上幾個多線程的應用場景,是我在工做中切實用上的場景(所言非虛)。不過它們都有一個特色,即任務都是很獨立的,即基本上不用太關心線程安全問題,這也是咱們編寫多線程代碼時儘可能要作的事。固然不少場景共享數據是必定的,這時候就更要注意線程安全了。

  要作到線程安全也不是難事,好比足夠好的封裝,可讓你把關注點鎖定在很小的範圍內。

  固然,爲了線程安全,咱們可能每每又會犧牲性能,這就看咱們如何把握這些度了!互斥鎖是最容易使用的鎖,可是也是性能最差的鎖。分段鎖可以解決鎖性能問題,可是又會給編寫帶來更大的困難。

 

  多線程,不止要會寫,還要會給本身填坑。

 

嘮叨: 去追天邊的那束光!

相關文章
相關標籤/搜索