java多線程和長鏈接,三方轉換通訊的實踐(1)——轉換端程序

        因爲工做網絡安全關係,咱們一個項目的公共服務網站部署在外網,而數據庫(並不是只供外網訪問,內網也要使用數據庫)部署在內網,重要的是外網不能直接訪問內網,這就打破原來網站的訪問順序了,原本網站頁面查詢由後臺直接訪問數據庫,如今只能經過多線程+轉換平臺+數據庫端服務程序組成了。java

    首先由數據庫端服務程序創建socket訪問部署在外網的「轉換平臺」(Server)端創建長鏈接,並定時檢查長鏈接狀況,開啓斷線重連。網站端每一次查詢也是向「轉換平臺」(Server)端創建短鏈接,而後由「轉換平臺」(Server)端轉發至數據庫端服務程序並接收數據庫端服務程序查詢結果轉發給網站端。數據庫

下面首先展現「轉換平臺」(Server)端代碼:(服務端由1/2/3/4/5/6組成)

一、安全

public class serverSocketForApp {
	 //記錄日誌
	 Log log = LogFactory.getLog(this .getClass()); 
	 CommonTool commonTool=new CommonTool();
	 private static MyBlockingQueue myBlockingQueue=new MyBlockingQueue(10);
	 public static  Socket socketWZ;
	 
	 
	 public static void main(String[] args) {
		try{
			final int portWZ=8888;
	        final int portYW = 9999; 
	        ServerSocket serverSocketYW= new ServerSocket(portYW);
	        //創建線程一直監聽服務端鏈接
	       // System.out.println("創建線程一直監聽服務端鏈接");
	        AcceptFromYW acceptFromYW =new AcceptFromYW(serverSocketYW,myBlockingQueue);
	        Thread threadYW=new Thread(acceptFromYW);
	        threadYW.setName("監聽服務端鏈接線程");
	        threadYW.start();
	       // System.out.println("監聽服務端鏈接線程開啓...");
	        Thread checkSocket=new Thread(new checkSocket(myBlockingQueue));
	        checkSocket.setName("檢查線程+++");
	        checkSocket.start();
	        System.out.println("檢查線程開啓。。。。");
	        ServerSocket serverSocketWZ= new ServerSocket(portWZ);
			while(true){
				 //System.out.println("==================監聽網站端鏈接開始=====================");
				 socketWZ=serverSocketWZ.accept();//監聽網站端鏈接
				 System.out.println("myBlockingQueue.size():"+myBlockingQueue.size()+";ListElement:"+myBlockingQueue.getListElement());
				 while(myBlockingQueue.size()<=0){
					// System.out.println("myBlockingQueue.size():"+myBlockingQueue.size()+";休眠1秒");
					 Thread.sleep(1000);
				 }
			  TranFromWZToYW tranFromWZToYW=new TranFromWZToYW(socketWZ,myBlockingQueue);//轉換端轉發線程
			  new Thread(tranFromWZToYW).start();
				 
			}
			
		}catch (Exception e) {
			// TODO: handle exception
		}
			
	}

}

上面代碼很簡單,主要是開啓三個線程:一、threadYW(監聽線程,用於監聽數據庫端鏈接請求),二、checkSocket(檢查線程,用於定時檢查線程安全隊列myBlockingQueue存儲的數據庫端發起的socket長鏈接是否正常,並剔除非正常鏈接),三、TranFromWZToYW(轉發線程,用於接收網站端請求——>轉發至數據庫端——>接收數據庫端相應結果服務器

——>轉發至網站端)。網絡

二、下面首先介紹的是線程安全隊列:myBlockingQueue,這個是我本身封裝的一個arraylist。多線程

public class MyBlockingQueue {

	@SuppressWarnings("rawtypes")
	private List list=new ArrayList<String>();
	
	private int MaxSize;
	//自定義對象鎖
	private Object lock=new Object();
	
	public MyBlockingQueue(int MaxSize){
		this.MaxSize=MaxSize;
		//System.out.println("線程"+Thread.currentThread().getName()+"已經完成初始化。。。大小是:"+MaxSize);
	}
	
	public int size(){
		return this.list.size();
	}
	
	//使用自定義鎖同步代碼塊
	@SuppressWarnings("unchecked")
	public void put(Socket socket) {//Socket socket
		synchronized (lock) {
			try {
				//判斷是否滿了
				if(this.list.size()==MaxSize){
					//System.out.println("線程"+Thread.currentThread().getName()+"當前隊列已滿,put等待中。。。");
					lock.wait();//當隊列滿時阻塞隊列,釋放鎖
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
			this.list.add(socket);
			//System.out.println("線程"+Thread.currentThread().getName()+"向隊列中添加了元素:"+socket);
			lock.notifyAll();//當隊列未滿時喚醒阻塞隊列其餘線程
		}
	}
	//獲取socket
	public Socket get() {
		Socket socket=null;
		synchronized (lock) {
			try {
				if(this.list.size()==0){
					//System.out.println("線程"+Thread.currentThread().getName()+"當前隊列已經空了,get等待中。。。");
					lock.wait();
				}else{
					 socket=(Socket)list.get(0);
					//String socket=list.get(0).toString();
					list.remove(0);//移出list
					//System.out.println("線程"+Thread.currentThread().getName()+"取到了元素:"+socket);
					lock.notifyAll();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
			return socket;
		}
	}
   //獲取全部socket,測試用
	public String getListElement(){
		String strtemp="";
		for(int i=0;i<this.list.size();i++){
			strtemp+=list.get(i)+";";
		}
		return strtemp;
	}
}

三、再次介紹的是CommonTool 類,用於接收、發送、檢查的方法都放在裏面,因爲我這邊發送的都是message(封裝的XML格式),因此後面轉發什麼的都是這個類,若是想測試的話,能夠用這個類註釋掉的代碼發送字符串(String)。app

public class CommonTool {
	//記錄日誌
	Log log = LogFactory.getLog(this .getClass()); 
	//PublicAppService publicAppService=new PublicAppService();
	/**
	* 判斷是否斷開鏈接,斷開返回true,沒有返回false
	* @param socket
	* @return
	*/
	public  Boolean isServerClose(Socket socket){
		try{
			socket.sendUrgentData(0);//發送1個字節的緊急數據,默認狀況下,服務器端沒有開啓緊急數據處理,不影響正常通訊
			return false;
		}catch(Exception se){
			return true;
		}
	}
	
	/**
	* 發送數據,發送失敗返回false,發送成功返回true
	* @param csocket
	* @param message
	* @return
	*/
	public synchronized  Boolean Send(Socket csocket,Message message){
		ObjectOutputStream os=null;
		XmlEntity appxmlentity=new XmlEntity();
		try{
			 if(null!=message){
             	os=new ObjectOutputStream(csocket.getOutputStream());
					//發數據到服務器端    
					 os.writeObject(message);
					 os.flush();
 			}
			 return true;
		}catch(Exception se){
			return false;
		}
	}
	/**
	* 讀取數據,返回字符串類型
	* @param csocket
	* @return
	*/
	public  synchronized  Message ReadText(Socket csocket){
		ObjectInputStream ois = null;
		Object obj=null;
		String strObj="";
		boolean flag=true;
		Message  message=new Message();
		try{
			while(flag){
				ois = new ObjectInputStream(new BufferedInputStream(csocket.getInputStream()));
				obj=ois.readObject();
				message=(Message)obj;
    			strObj=obj.toString();
    			if(strObj.indexOf("維持鏈接包")!=-1 || strObj.isEmpty() || strObj==""){
              		flag=true;
              	}else{
              		break;
              	}
			}
			
			//csocket.setSoTimeout(sotimeout);
			/*InputStream input = csocket.getInputStream();
			BufferedReader in = new BufferedReader(new InputStreamReader(input));
			char[] sn = new char[1000];
			in.read(sn);
			String sc = new String(sn);
			return sc;*/
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}catch(IOException se){
			return null;
		}
		return message;
	} 
}

  四、監聽線程:threadYW,很簡單的一個實現Runnable接口的類socket

public class AcceptFromYW implements Runnable {
	int portYW;
	ServerSocket serverSocket;
	Socket socket;
	MyBlockingQueue myBlockingQueue;
	public AcceptFromYW(){
		
	}
	public AcceptFromYW(ServerSocket serverSocket,MyBlockingQueue myBlockingQueue){
		this.serverSocket=serverSocket;
		this.myBlockingQueue=myBlockingQueue;
	}
	public synchronized void run() {
		try {
			while(true){
				 //調用accept()方法開始監聽,等待客戶端的鏈接
				 socket=serverSocket.accept();
				 socket.setKeepAlive(true);
				 if(socket!=null){
					 myBlockingQueue.put(socket);
					 try {
						Thread.sleep(50);
					} catch (InterruptedException e1) {
						e1.printStackTrace();
					}
					 //System.out.println("服務端接收到業務端鏈接請求:"+socket);
				 }
			}
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}

 五、檢查線程:checkSocket,也是實現Runnable接口的類。測試

public class checkSocket implements Runnable {
	CommonTool commonTool=new CommonTool();
	Socket socket;
	MyBlockingQueue myBlockingQueue;
	/**
	 * @param socket
	 */
	public checkSocket(MyBlockingQueue myBlockingQueue) {
		super();
		this.myBlockingQueue = myBlockingQueue;
	}

	public void run() {
		while(true){
			try {
				if(myBlockingQueue.size()>0){//線程安全隊列存在socket纔會去檢查
					for(int i=0;i<=myBlockingQueue.size();i++){
						socket=myBlockingQueue.get();
						if(null!=socket && !commonTool.isServerClose(socket)){
							myBlockingQueue.put(socket);
						}else if(null!=socket){
							socket.close();
						}
						Thread.sleep(50);
					}
					System.out.println("myBlockingQueue.size:"+myBlockingQueue.size()+";ListElement:"+myBlockingQueue.getListElement());
				}
				Thread.sleep(2000);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

}

六、轉發線程:TranFromWZToYW也是一個實現Runnable接口的類。網站

public class TranFromWZToYW implements Runnable {
	//記錄日誌
	Log log = LogFactory.getLog(this .getClass()); 
	CommonTool commonTool=new CommonTool();
	private boolean close;
	Socket socketYW;
	Socket socketWZ;
	ObjectInputStream ois = null;
	ObjectOutputStream oos = null;
  	Message readMessageWZ=null;
  	Message readMessageYW=null;
  	MyBlockingQueue myBlockingQueue;
  	
  	/**
	 * @param socketYW
	 * @param socketWZ
	 */
	public TranFromWZToYW(Socket socketWZ,MyBlockingQueue myBlockingQueue) {
		super();
		this.socketWZ = socketWZ;
		this.myBlockingQueue=myBlockingQueue;
	}
	
	public synchronized void run() {
		try{
			
			close = commonTool.isServerClose(socketWZ);//判斷是否斷開
			if(!close){//沒有斷開,開始讀數據
				readMessageWZ = commonTool.ReadText(socketWZ);
				if(null!=readMessageWZ){
					System.out.println("讀取WZ數據:"+readMessageWZ);
					//log.info("讀取WZ數據:"+readMessageWZ);
				}
			}
			 if(null!=readMessageWZ){
			 	   while(myBlockingQueue.size()<=0){
			 		   System.out.println("服務端鏈接已用完,等待新鏈接...");
			 		  // log.info("服務端鏈接已用完,等待新鏈接...");
			 		   Thread.sleep(1000);
			 	   }
			 	  System.out.println("轉換平臺開始寫數據至業務端。。。");
			 	  while(null==socketYW){
			 		 socketYW=myBlockingQueue.get();
			 	  }
			 	  boolean a=commonTool.Send(socketYW, readMessageWZ);
			 	  if(a){
			 		 System.out.println("轉發成功,轉換平臺開始從業務端讀數據+++");
			 		 int i=0;
			 		 while(null==readMessageYW && i<3){
			 			 readMessageYW=commonTool.ReadText(socketYW);
			 			 Thread.sleep(50);
			 			 i++;
			 			 if(commonTool.isServerClose(socketYW) && null==readMessageYW){//用於判斷socketYW是否失效
			 				socketYW=myBlockingQueue.get();
			 			 }
			 		 }
				 	 if(null!=readMessageYW){
				 		 System.out.println("轉換平臺開始從業務端讀到數據,並開始轉發數據至客戶端網站.......");
				 		 close = commonTool.isServerClose(socketWZ);//判斷是否斷開
				 		 if(!close){
				 			 boolean b=commonTool.Send(socketWZ, readMessageYW);
					 		 if(b){
					 			 System.out.println("轉發至網站端成功+++readMessageYW:"+readMessageYW);
					 			// log.info("轉發至網站端成功+++readMessageYW:"+readMessageYW);
					 			 myBlockingQueue.put(socketYW);
					 			 System.out.println("socketYW放入myBlockingQueue成功!");
					 		 }
				 		 }else{
				 			 System.out.println("網站端鏈接關閉....轉發至網站端失敗!!!!   readMessageYW:"+readMessageYW);
				 			 //log.info("網站端鏈接關閉....轉發至網站端失敗!!!!    readMessageYW:"+readMessageYW);
				 			 myBlockingQueue.put(socketYW);
				 			// System.out.println("socketYW放入myBlockingQueue成功!");
				 		 }
				 		
				 	 }
			 	  }
			 	 
			 	 
			 }
		
		}catch (Exception e) {
			System.out.println("系統出現異常。。。。");
			try{
				if(ois!=null){
					ois.close();
				}
				if(oos!=null){
					oos.close();
				}
				if(socketWZ!=null){
					socketWZ.close();
				}
			}catch (Exception e1) {
				// TODO: handle exception
				System.err.println(e1);
			}
			
		}finally{
			try{
				if(ois!=null){
					ois.close();
				}
				if(oos!=null){
					oos.close();
				}
				if(socketWZ!=null){
					socketWZ.close();
				}
			}catch (Exception e) {
				// TODO: handle exception
				System.err.println(e);
			}
		}
	}
}

上面介紹的是狀態服務程序的相應代碼,下面一篇將會介紹數據庫服務端程序。

相關文章
相關標籤/搜索