redis的發佈訂閱小結-websocket 多臺tomcat集羣解決方案

項目裏用到了 websocket,可是後臺是 tomcat 集羣,有時候就會發現前臺沒收到推送消息。原因是 websocket 用一個靜態變量保存前後臺鏈接,每臺 tomcat 只持有連到自己的鏈接,消息若在另一臺 tomcat 上產生就推送不到。

百度發現解決方案有好多,選了個簡單的:使用 redis 發佈訂閱方式實現。就是後臺收到需要推送的消息時就往 redis 發佈一下,然後所有的 tomcat 都訂閱它,就可以解決集羣漏發的問題了。

以下是一些實現的代碼和邏輯。

websocket 代碼:

package com.asdc.jbp.hisLogin.websocket;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.ContextLoader;



/**
 * WebSocket endpoint for outpatient doctors, one connection per user id.
 *
 * <p>Connections opened on this JVM are kept in a static map keyed by user id.
 * Business messages are published to Redis ({@link #SendRegisterMessage}) and
 * delivered to the local connections when the Redis subscription calls back
 * ({@link #sendMessageHandle}). Each newly opened connection is announced on a
 * second Redis channel so that registrations of the same user with a different
 * connection uuid can be dropped ({@link #onOpenRemoveHandle}).
 *
 * <p>NOTE(review): the class is both a Spring {@code @Component} and a
 * {@code @ServerEndpoint}. Endpoint instances created per connection are
 * presumably not Spring-injected, which would explain the explicit bean lookup
 * used when a connection opens - confirm against the deployment.
 */
@Component("OutpatientDocWebSocket")
@ServerEndpoint("/outpatientDoc/{userId}")
public class OutpatientDocWebSocket {

	private static final Logger log = LoggerFactory.getLogger(OutpatientDocWebSocket.class);

	// Connections registered on this JVM, keyed by user id (at most one per user).
	private static final ConcurrentHashMap<String, OutpatientDocWebSocket> outpatientDocWebSocket = new ConcurrentHashMap<String, OutpatientDocWebSocket>();

	// Session of this client connection; used to send data to the client.
	private Session session;

	// Id of the user that owns this connection.
	private String userId = "";

	// Random id of this connection; tells connections of the same user apart.
	private String uniqueUuid = "";

	@Autowired
	private OutpatientRedisDao outpatientRedisDao;

	/**
	 * Called when a connection has been established: registers this connection
	 * under the user id and broadcasts "userId|uuid" on the open channel.
	 *
	 * @param paramUserId user id taken from the endpoint path
	 * @param session     session of the new client connection
	 */
	@OnOpen
	public void onOpen(@PathParam(value = "userId") String paramUserId, Session session) {
		// Fill in the instance state before the instance becomes visible through the map.
		this.session = session;
		this.userId = paramUserId;
		this.uniqueUuid = UUID.randomUUID().toString();
		outpatientDocWebSocket.put(paramUserId, this);
		// Announce the new connection so stale registrations of this user are dropped.
		redisDao().sendMessageByOpen(OutpatientRedisDao.outpatientSocketOpenChannel, userId + "|" + uniqueUuid);
	}

	/**
	 * Called when the connection is closed: unregisters this connection.
	 */
	@OnClose
	public void onClose() {
		if (userId != null && !userId.isEmpty()) {
			// Remove only if the entry still maps to this instance; a newer
			// connection of the same user must not be evicted by a stale one closing.
			outpatientDocWebSocket.remove(userId, this);
		}
	}

	/**
	 * Called when a client message arrives. Only heartbeat messages are expected,
	 * so the message is ignored.
	 *
	 * @param message message sent by the client
	 * @param session session the message arrived on
	 */
	@OnMessage
	public void onMessage(String message, Session session) {
		// Intentionally empty: heartbeats need no reply.
	}

	/**
	 * Called when an error occurs on the connection; the error is logged.
	 *
	 * @param session session the error occurred on
	 * @param error   the error
	 */
	@OnError
	public void onError(Session session, Throwable error) {
		log.error("websocket發生錯誤:", error);
	}

	/**
	 * Sends a text message to the client of this connection.
	 * Created: 2018-04-26 by xxx
	 *
	 * @param message text to send
	 * @throws IOException if sending fails
	 */
	public void sendMessage(String message) throws IOException {
		this.session.getBasicRemote().sendText(message);
	}

	/**
	 * Entry point for business code: publishes the message on the outpatient
	 * Redis channel.
	 * Created: 2018-05-07 by xxx
	 *
	 * @param regMsg message to publish
	 */
	public void SendRegisterMessage(String regMsg) {
		redisDao().sendMessage(OutpatientRedisDao.outpatientChannel, regMsg);
	}

	/**
	 * Redis callback for business messages: sends the message to every
	 * connection registered on this JVM.
	 * Created: 2018-05-07 by xxx
	 *
	 * @param callBackMsg message received from Redis
	 */
	public void sendMessageHandle(String callBackMsg) {
		for (OutpatientDocWebSocket item : outpatientDocWebSocket.values()) {
			try {
				item.sendMessage(callBackMsg);
			} catch (IOException | RuntimeException e) {
				// One failing connection (I/O error, or a runtime failure such as an
				// already closed session) must not stop delivery to the others.
				log.error("websocket發送出錯:", e);
			}
		}
	}

	/**
	 * Redis callback for "connection opened" announcements: drops the local
	 * registration of the announced user when it belongs to a different
	 * connection, keeping one registered connection per user.
	 * Created: 2018-05-07 by xxx
	 *
	 * @param callBackMsg announcement in the form "userId|uuid"
	 */
	public void onOpenRemoveHandle(String callBackMsg) {
		// The uuid part never contains '|', so the last separator splits the two fields.
		int separator = (callBackMsg == null) ? -1 : callBackMsg.lastIndexOf('|');
		if (separator < 0) {
			log.warn("Ignoring malformed websocket open announcement: {}", callBackMsg);
			return;
		}
		String openedUserId = callBackMsg.substring(0, separator);
		String openedUuid = callBackMsg.substring(separator + 1);

		// Entries are keyed by user id, so a direct lookup finds the only candidate.
		OutpatientDocWebSocket registered = outpatientDocWebSocket.get(openedUserId);
		if (registered != null && !openedUuid.equals(registered.uniqueUuid)) {
			// Conditional remove: leave the entry alone if it was replaced meanwhile.
			outpatientDocWebSocket.remove(openedUserId, registered);
		}
	}

	/**
	 * Returns the injected DAO when present, otherwise looks the bean up in the
	 * current web application context.
	 */
	private OutpatientRedisDao redisDao() {
		if (outpatientRedisDao != null) {
			return outpatientRedisDao;
		}
		return (OutpatientRedisDao) ContextLoader.getCurrentWebApplicationContext().getBean("OutpatientRedisDao");
	}

}

監聽 redis 訂閱:

package com.asdc.jbp.hisLogin.redisUtil;

import org.springframework.beans.factory.annotation.Autowired;



/**
 * Delegate that receives Redis subscription callbacks (invoked through a
 * MessageListenerAdapter) and forwards them to the WebSocket endpoint.
 * Created: 2018-04-26 by xxx
 */
public class OutpatientDocMessageDelegateListenerImpl {

	/** WebSocket endpoint that the callbacks are forwarded to. */
	@Autowired
	private OutpatientDocWebSocket outpatientDocWebSocket;

	/**
	 * Callback for a subscribed business message; forwards it to
	 * {@code sendMessageHandle} of the endpoint.
	 * Created: 2018-04-26 by xxx
	 *
	 * @param message received message
	 */
	public void handleMessage(final String message) {
		final OutpatientDocWebSocket endpoint = this.outpatientDocWebSocket;
		endpoint.sendMessageHandle(message);
	}

	/**
	 * Callback for a "connection opened" message; forwards it to
	 * {@code onOpenRemoveHandle} of the endpoint.
	 * Created: 2018-05-09 by xxx
	 *
	 * @param message received message
	 */
	public void handleMessageByOpen(final String message) {
		final OutpatientDocWebSocket endpoint = this.outpatientDocWebSocket;
		endpoint.onOpenRemoveHandle(message);
	}
}

 

往 redis 發佈消息:

/**
 * Publishes messages to Redis channels and holds the channel names.
 */
public interface OutpatientRedisDao {

	/** Name of the channel for outpatient business messages. */
	String outpatientChannel = "outpatientDoc_Channel";

	/** Name of the channel for "websocket connection opened" messages. */
	String outpatientSocketOpenChannel = "outpatientSocketOpen_Channel";

	/**
	 * Sends a message to a channel; documented by the author as sending the
	 * medical-order list information on registration.
	 * Created: 2018-04-26 by xxx
	 *
	 * @param channel channel to send to
	 * @param message message to send
	 */
	void sendMessage(String channel, String message);

	/**
	 * Sends a message to a channel; the counterpart used for connection-open
	 * messages.
	 *
	 * @param channel channel to send to
	 * @param message message to send
	 */
	void sendMessageByOpen(String channel, String message);
}


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

/**
 * {@link OutpatientRedisDao} implementation that sends through a
 * {@link StringRedisTemplate}.
 */
@Component("OutpatientRedisDao")
public class OutpatientRedisDaoImpl implements OutpatientRedisDao {

	@Autowired
	private StringRedisTemplate redisTemplate;

	/**
	 * Sends {@code message} to {@code channel}.
	 */
	@Override
	public void sendMessage(String channel, String message) {
		publish(channel, message);
	}

	/**
	 * Sends {@code message} to {@code channel}; same operation as
	 * {@link #sendMessage}.
	 */
	@Override
	public void sendMessageByOpen(String channel, String message) {
		publish(channel, message);
	}

	/** Hands the payload to the template's {@code convertAndSend}. */
	private void publish(String topic, String payload) {
		this.redisTemplate.convertAndSend(topic, payload);
	}

}

spring-redis 配置:

<!--
	Redis template bean (StringRedisTemplate) bound to jedisConnectionFactory.
	Keys and hash keys use StringRedisSerializer; values and hash values are
	overridden to JdkSerializationRedisSerializer.
	NOTE(review): the pub/sub listeners in this configuration also use a
	JdkSerializationRedisSerializer; presumably both sides must match for the
	payload to be decoded - confirm before changing either one.
-->
<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
		<property name="connectionFactory" ref="jedisConnectionFactory" />
		<property name="keySerializer">
			<bean
				class="org.springframework.data.redis.serializer.StringRedisSerializer" />
		</property>
		<property name="valueSerializer">
			<bean
				class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
		</property>
		<property name="hashKeySerializer">
			<bean
				class="org.springframework.data.redis.serializer.StringRedisSerializer" />
		</property>
		<property name="hashValueSerializer">
			<bean
				class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
		</property>
	</bean>

<!-- Registration / medical-order list: Redis publish-subscribe wiring -->
    <!-- Delegate that receives the subscription callbacks. The class attribute is the
         fully qualified name, matching the package the class is declared in. -->
    <bean id="outpatientMessageDelegateListener" class="com.asdc.jbp.hisLogin.redisUtil.OutpatientDocMessageDelegateListenerImpl" />

    <!-- Adapter around the delegate. NOTE(review): the listener container below references
         the delegate directly, so this bean looks unused here - confirm before removing. -->
    <bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="outpatientMessageDelegateListener" />
        <property name="serializer" ref="serializer" />
    </bean>

    <!-- Author's note: using org.springframework.data.redis.serializer.StringRedisSerializer
         here produced garbled text, hence the JDK serializer. -->
    <bean id="serializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />

    <redis:listener-container connection-factory="jedisConnectionFactory">
        <!-- the method attribute can be skipped as the default method name is "handleMessage" -->
        <redis:listener ref="outpatientMessageDelegateListener" serializer="serializer" method="handleMessage" topic="outpatientDoc_Channel" />
        <redis:listener ref="outpatientMessageDelegateListener" serializer="serializer" method="handleMessageByOpen" topic="outpatientSocketOpen_Channel" />
    </redis:listener-container>

代碼還不是很完善,基本思路就是這樣。水平有限。

相關文章
相關標籤/搜索