十六,java多線程詳解

1.線程的操做方法

1.1設置和取得名字

設置名字 java

set方法:
android

public final void setName(String name)
構造方法 :
public Thread (Runable target ,String name)

public Thread (String name)
在線程操做中 ,由於其操做的不肯定性 ,因此提供了一個方法 ,取得當前線程名 .

public static Thread currentThread() web

示例:
算法

Thread.currentThread().getName()

注意 :在程序運行時 ,主方法實際上就是一個線程 .能夠寫以下代碼驗證 . 數據庫

示例:
編程

public class ThreadDemo {
	public static void main(String[] args) {
		MyThread mt = new MyThread(); 
		new Thread(mt,」自定義線程」).start() ;
                          mt.run();  //這個是主線程中執行的.
                }
}
注: 執行一個 java程序至少啓動兩個線程 ,一個是 main主線程 ,一個是 gc垃圾回收線程 .


1.2線程的休眠

線程休眠的方法是:
設計模式

public static void sleep(long millis ) throws InterruptedException 服務器

因此使用此方法時須要使用try...catch...來處理. 多線程

1.3線程的中斷

sleep()方法中,存在InterruptedException這個異常,中斷能夠引起這個異常. 併發

public void interrupt()

1.4設置線程的優先級

public final void setPriority(int newPriority)

java線程的優先級:

  • public static final int MAX_PRIORITY 10

  • public static final int MIN_PRIORITY 1

  • public static final int NORM_PRIORITY 5

注意: main方法的優先級是5,NORM_PRIORITY普通優先級.


2.線程的同步與死鎖

2.1問題引出

示例:

package org.lxh.syndemo;
class MyTicketThread implements Runnable {// 實現Runnable接口
	private int ticket = 5; // 一共才5張票
	public void run() {// 覆寫run()方法
		for (int i = 0; i < 50; i++) {// 表示循環10次
			if (this.ticket > 0) {
				try {
					Thread.sleep(300);// 延遲致使數據操做出現問題
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("賣票:ticket = " + this.ticket--);
			}
		}
	}
}
public class SynDemo01 {

	public static void main(String[] args) {
		MyTicketThread mt = new MyTicketThread();
		new Thread(mt, "票販子a").start();
		new Thread(mt, "票販子b").start();
		new Thread(mt, "票販子c").start();
	}
}
問題出現了 :可能有兩個線程同時操做一個變量的時候 ,都知足大於 1的條件 ,可是都執行了 - - 操做 .致使數據出現問題 ,這時引入了同步與鎖定 .

2.2同步的實現

java中能夠經過同步代碼的方式進行代碼的加鎖操做,同步的實現有兩種方式:

  • 同步代碼塊

  • 同步方法

2.2.1同步代碼塊

格式:

synchronized(對象){ //通常都是將this進行鎖定

//須要同步的代碼

}
示例 :
class MyTicketThread implements Runnable {// 實現Runnable接口
	private int ticket = 5; // 一共才5張票
	public void run() {// 覆寫run()方法
		for (int i = 0; i < 50; i++) {// 表示循環10次
                                  synchronized(this){
			if (this.ticket > 0) {
				try {
					Thread.sleep(300);// 延遲致使數據操做出現問題
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("賣票:ticket = " + this.ticket--);
			}
                                 }
		}
	}
}


2.2.2同步方法

示例:

class MyTicketThread implements Runnable {// 實現Runnable接口
	private int ticket = 5; // 一共才5張票
	public void run() {// 覆寫run()方法
		for (int i = 0; i < 50; i++) {// 表示循環10次
			this.sale();   //同步方法
		}
	}
}

public void synchronized sale(){   //同步方法
              if (this.ticket > 0) {
		try {
		     Thread.sleep(300);// 延遲致使數據操做出現問題
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("賣票:ticket = " + this.ticket--);
	 }
}
:java中方法定義的完整格式

[訪問權限: public private protected default ]

[static final abstract synchronized native ]

返回值類型 void

方法名詞([參數列表]) [throw 異常1,異常2,異常3...]{}


2.3死鎖

程序中,過多的同步會致使死鎖的問題.

示例:

package com.android.syndemo;
class Bangfei {
	public synchronized void say(QinYou qy) {
		System.out.println("把錢給我,我放人");
		qy.give();
	}

	public synchronized void give() {
		System.out.println("獲得了錢,同時溜之大吉");
	}
}

class QinYou {
	public synchronized void say( Bangfei bf) {
		System.out.println("把人放了,我給你錢.");
		bf.give();
	}

	public synchronized void give() {
		System.out.println("人救回來,同時報案了.");
	}
}

public class DeadLockDemo implements Runnable {
	private BangFei bf = new BangFei();
	private QinYou qy = new QinYou();
	public DeadLockDemo() {
		new Thread(this).start() ;
   //這句致使死鎖.
		qy.say(bf);
	}
	public void run() {
		bf.say(qy);
	}
	public static void main(String[] args) {
		new DeadLockDemo();
	}
}
多線程共享一個數據的時候須要進行同步 ,可是過多的同步會致使死鎖 ,即各個線程進入相互等待的狀態而中止執行.


3.生產者和消費者模式

3.1爲什麼引入生產者消費者模式

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題.該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度.

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程.在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據.一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者.爲了解決這個問題因而引入了生產者和消費者模式.

3.2生產者消費者模式定義

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題.生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力.

這個阻塞隊列就是用來給生產者和消費者解耦的.縱觀大多數設計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類.在學習一些設計模式的過程當中,若是先找到這個模式的第三者,能幫助咱們快速熟悉一個設計模式.

3.3生產者消費者模式實踐

我和同事一塊兒利用業餘時間開發的Yuna工具中使用了生產者和消費者模式.首先我先介紹下Yuna工具,在阿里巴巴不少同事都喜歡經過郵件分享技術文章,由於經過郵件分享很方便,同窗們在網上看到好的技術文章,複製粘貼發送就完成了分享,可是咱們發現技術文章不能沉澱下來,對於新來的同窗看不到之前分享的技術文章,你們也很難找到之前分享過的技術文章.爲了解決這問題,咱們開發了Yuna工具.Yuna取名自我喜歡的一款遊戲最終幻想裏的女主角.

首先咱們申請了一個專門用來收集分享郵件的郵箱,好比share@alibaba.com,同窗將分享的文章發送到這個郵箱,讓同窗們每次都抄送到這個郵箱確定很麻煩,因此咱們的作法是將這個郵箱地址放在部門郵件列表裏,因此分享的同窗只須要象之前同樣向整個部門分享文章就行,Yuna工具經過讀取郵件服務器裏該郵箱的郵件,把全部分享的郵件下載下來,包括郵件的附件,圖片,和郵件回覆,咱們可能會從這個郵箱裏下載到一些非分享的文章,因此咱們要求分享的郵件標題必須帶有一個關鍵字,好比[內貿技術分享],下載完郵件以後,經過confluenceweb service接口,把文章插入到confluence,這樣新同事就能夠在confluence裏看之前分享過的文章,而且Yuna工具還能夠自動把文章進行分類和歸檔.

爲了快速上線該功能,當時咱們花了三天業餘時間快速開發了Yuna1.0版本.1.0版本中我並無使用生產者消費模式,而是使用單線程來處理,由於當時只須要處理咱們一個部門的郵件,因此單線程明顯夠用,整個過程是串行執行的.在一個線程裏,程序先抽取所有的郵件,轉化爲文章對象,而後添加所有的文章,最後刪除抽取過的郵件.

示例:

public void extract() {
        logger.debug("開始" + getExtractorName() + "..");
        //抽取郵件
        List<Article> articles = extractEmail();
        //添加文章
        for (Article article : articles) {
            addArticleOrComment(article);
        }
        //清空郵件
        cleanEmail();
        logger.debug("完成" + getExtractorName() + "..");
    }
Yuna工具在推廣後 ,愈來愈多的部門使用這個工具 ,處理的時間愈來愈慢 ,Yuna是每隔 5分鐘進行一次抽取的 ,而當郵件多的時候一次處理可能就花了幾分鐘 ,因而我在 Yuna2.0版本里使用了生產者消費者模式來處理郵件 ,首先生產者線程按必定的規則去郵件系統裏抽取郵件 ,而後存放在阻塞隊列裏 ,消費者從阻塞隊列裏取出文章後插入到 conflunce.

示例:

public class QuickEmailToWikiExtractor extends AbstractExtractor {

private ThreadPoolExecutor      threadsPool;

private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;

public QuickEmailToWikiExtractor() {
        emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000));
    
    }

public void extract() {
        logger.debug("開始" + getExtractorName() + "..");
        long start = System.currentTimeMillis();

        //抽取全部郵件放到隊列裏
        new ExtractEmailTask().start();

        // 把隊列裏的文章插入到Wiki
        insertToWiki();

        long end = System.currentTimeMillis();
        double cost = (end - start) / 1000;
        logger.debug("完成" + getExtractorName() + ",花費時間:" + cost + "秒");
    }

    

    /**
     * 把隊列裏的文章插入到Wiki
     */
    private void insertToWiki() {
        //登陸wiki,每間隔一段時間須要登陸一次
        confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);

        while (true) {
            //2秒內取不到就退出
            ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);
            if (email == null) {
                break;
            }
            threadsPool.submit(new insertToWikiTask(email));
        }
    }


     protected List<Article> extractEmail() {
        List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();
        if (allEmails == null) {
            return null;
        }
        for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {
            emailQueue.offer(exchangeEmailShallowDTO);
        }
        return null;
    }

    /**
     * 抽取郵件任務
     * 
     * @author tengfei.fangtf
     */
    public class ExtractEmailTask extends Thread {
        public void run() {
            extractEmail();
        }
    }
}

3.4多生產者和多消費者場景

在多核時代,多線程併發處理速度比單線程處理速度更快,因此咱們可使用多個線程來生產數據,一樣可使用多個消費線程來消費數據.而更復雜的狀況是,消費者消費的數據,有可能須要繼續處理,因而消費者處理完數據以後,它又要做爲生產者把數據放在新的隊列裏,交給其餘消費者繼續處理.以下圖:

咱們在一個長鏈接服務器中使用了這種模式,生產者1負責將全部客戶端發送的消息存放在阻塞隊列1,消費者1從隊列裏讀消息,而後經過消息ID進行hash獲得N個隊列中的一個,而後根據編號將消息存放在到不一樣的隊列裏,每一個阻塞隊列會分配一個線程來消費阻塞隊列裏的數據.若是消費者2沒法消費消息,就將消息再拋回到阻塞隊列1,交給其餘消費者處理.

如下是消息總隊列的代碼:

/**
 * 總消息隊列管理
 * 
 * @author tengfei.fangtf
 */
public class MsgQueueManager implements IMsgQueue{

    private static final Logger              LOGGER             
 = LoggerFactory.getLogger(MsgQueueManager.class);


    /**
     * 消息總隊列
     */
    public final BlockingQueue<Message> messageQueue;

    private MsgQueueManager() {
        messageQueue = new LinkedTransferQueue<Message>();
    }

    public void put(Message msg) {
        try {
            messageQueue.put(msg);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public Message take() {
        try {
            return messageQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }

}
啓動一個消息分發線程 .在這個線程裏子隊列自動去總隊列裏獲取消息 .

/**
     * 分發消息,負責把消息從大隊列塞到小隊列裏
     * 
     * @author tengfei.fangtf
     */
    static class DispatchMessageTask implements Runnable {
        @Override
        public void run() {
            BlockingQueue<Message> subQueue;
            for (;;) {
                //若是沒有數據,則阻塞在這裏
                Message msg = MsgQueueFactory.getMessageQueue().take();
                //若是爲空,則表示沒有Session機器鏈接上來,
須要等待,直到有Session機器鏈接上來
                while ((subQueue = getInstance().getSubQueue()) == null) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                //把消息放到小隊列裏
                try {
                    subQueue.put(msg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
使用 Hash算法獲取一個子隊列 .
/**
     * 均衡獲取一個子隊列.
     * 
     * @return
     */
    public BlockingQueue<Message> getSubQueue() {
        int errorCount = 0;
        for (;;) {
            if (subMsgQueues.isEmpty()) {
                return null;
            }
            int index = (int) (System.nanoTime() % subMsgQueues.size());
            try {
                return subMsgQueues.get(index);
            } catch (Exception e) {
                //出現錯誤表示,在獲取隊列大小以後,隊列進行了一次刪除操做
                LOGGER.error("獲取子隊列出現錯誤", e);
                if ((++errorCount) < 3) {
                    continue;
                }
            }
        }
    }

使用的時候咱們只須要往總隊列裏發消息.

//往消息隊列裏添加一條消息
        IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue();
        Packet msg = Packet.createPacket(Packet64FrameType.
TYPE_DATA, "{}".getBytes(), (short) 1);
        messageQueue.put(msg);
小結

本章講解了生產者消費者模式,並給出了實例.讀者能夠在平時的工做中思考下哪些場景可使用生產者消費者模式,我相信這種場景應該很是之多,特別是須要處理任務時間比較長的場景,好比上傳附件並處理,用戶把文件上傳到系統後,系統把文件丟到隊列裏,而後馬上返回告訴用戶上傳成功,最後消費者再去隊列裏取出文件處理.好比調用一個遠程接口查詢數據,若是遠程服務接口查詢時須要幾十秒的時間,那麼它能夠提供一個申請查詢的接口,這個接口把要申請查詢任務放數據庫中,而後該接口馬上返回.而後服務器端用線程輪詢並獲取申請任務進行處理,處理完以後發消息給調用方,讓調用方再來調用另一個接口拿數據.

另外Java中的線程池類其實就是一種生產者和消費者模式的實現方式,可是實現方法更高明.生產者把任務丟給線程池,線程池建立線程並處理任務,若是將要運行的任務數大於線程池的基本線程數就把任務扔到阻塞隊列裏,這種作法比只使用一個阻塞隊列來實現生產者和消費者模式顯然要高明不少,由於消費者可以處理直接就處理掉了,這樣速度更快,而生產者先存,消費者再取這種方式顯然慢一些.

咱們的系統也可使用線程池來實現多生產者消費者模式.好比建立N個不一樣規模的Java線程池來處理不一樣性質的任務,好比線程池1將數據讀到內存以後,交給線程池2裏的線程繼續處理壓縮數據.線程池1主要處理IO密集型任務,線程池2主要處理CPU密集型任務.

第3部分參考自:http://www.infoq.com/cn/articles/producers-and-consumers-mode/



20150419


JAVA學習筆記系列

--------------------------------------------

                    聯繫方式

--------------------------------------------

        Weibo: ARESXIONG

        E-Mail: aresxdy@gmail.com

------------------------------------------------
相關文章
相關標籤/搜索