ActiveMQ 攔截客戶端建立/接收消息隊列(Spring)
1. 建立插件(BrokerPlugin 實現類,注入數據庫操做類 JdbcTemplate)
public class AuthPlugin implements BrokerPlugin{ private String mqName;//本MQ服務器名稱 private JdbcTemplate jdbcTemplate;//數據庫操做類 public AuthPlugin(JdbcTemplate jdbcTemplate,String mqName) { this.jdbcTemplate=jdbcTemplate; this.mqName=mqName; } @Override public Broker installPlugin(Broker broker) throws Exception { return new AuthBroker(broker,jdbcTemplate,mqName); } }
2. 修改 apache-activemq\conf\activemq.xml
<!-- Place this inside the <broker> element -->
<plugins>
    <bean xmlns="http://www.springframework.org/schema/beans"
          id="ehlPlugin"
          class="com.ehl.plugin.AuthPlugin">
        <!-- First constructor argument: the jdbcTemplate bean -->
        <constructor-arg index="0" ref="jdbcTemplate"/>
        <!-- Second constructor argument: the name of this message queue server -->
        <constructor-arg index="1" value="MQName"/>
    </bean>
</plugins>
3. 建立插件類(服務器端 Broker 攔截器)
public class AuthBroker extends AbstractAuthenticationBroker{ private static Log log = LogFactory.getLog(AuthBroker.class); //用戶 對應的權限 private Map<String, Map<String,ViewProjectMqQueuesCom>> powers=new HashMap<String,Map<String,ViewProjectMqQueuesCom>>();//權限 private static final String ACTIVEMQ_ADVISORY_PRODUCER_QUEUE="ActiveMQ.Advisory.Producer.Queue.";//消息生產者前綴 private static final String ACTIVEMQ_ADVISORY_CONSUMER_QUEUE="ActiveMQ.Advisory.Consumer.Queue.";//消息消費者前綴 private JdbcTemplate jdbcTemplate;//數據庫操做 private String mqName;//MQ服務器名稱 public AuthBroker(Broker next,JdbcTemplate jdbcTemplate,String mqName) { super(next); this.jdbcTemplate=jdbcTemplate; this.mqName=mqName; } /** * 鏈接攔截器 */ @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { log.info("用戶["+info.getUserName()+"]請求鏈接["+mqName+"]!"); SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { securityContext = authenticate(info.getUserName(), info.getPassword(), null); context.setSecurityContext(securityContext); securityContexts.add(securityContext); } try { super.addConnection(context, info); } catch (Exception e) { securityContexts.remove(securityContext); context.setSecurityContext(null); throw e; } } /** * 認證 * <p>Title: authenticate</p> */ @Override public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException { SecurityContext securityContext = null; Com com=getCom(username,password); //驗證用戶信息 if(com!=null&&com.getId()!=null){ securityContext = new SecurityContext(username) { @Override public Set<Principal> getPrincipals() { Set<Principal> groups = new HashSet<Principal>(); groups.add(new GroupPrincipal("users"));//默認加入了users的組 return groups; } }; // log.info("用戶:"+username+"驗證成功!"); }else{ log.error("用戶:"+username+"驗證失敗!"); throw new SecurityException("驗證失敗"); } return securityContext; } /** * 添加一個目標 * <p>Title: addDestination</p> * @see 
org.apache.activemq.broker.BrokerFilter#addDestination(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.ActiveMQDestination, boolean) */ @Override public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { boolean destStats = destination.getPhysicalName().regionMatches(true, 0, ACTIVEMQ_ADVISORY_PRODUCER_QUEUE, 0,ACTIVEMQ_ADVISORY_PRODUCER_QUEUE.length()); //發送消息者 if(destStats){ if(context.getSecurityContext()!=null){ //判斷不是默認用戶 if(!context.getSecurityContext().getUserName().equals(SecurityContext.BROKER_SECURITY_CONTEXT.getUserName())){ String queuesName=destination.getPhysicalName().replace(ACTIVEMQ_ADVISORY_PRODUCER_QUEUE, "");//獲得消息隊列名 if(powers.containsKey(context.getSecurityContext().getUserName())){//判斷該用戶是否有權限 Map<String,ViewProjectMqQueuesCom> map=powers.get(context.getSecurityContext().getUserName()); if(map!=null&&map.containsKey(queuesName)){//判斷是否有發送的權限 if(map.get(queuesName).getBindId()!=null&&map.get(queuesName).getComQueuesType()!=null){ if(map.get(queuesName).getComQueuesType().intValue()==QueuesComType.BOTH.getValue().intValue()||map.get(queuesName).getComQueuesType().intValue()==QueuesComType.SEND.getValue().intValue()){ return super.addDestination(context, destination, createIfTemporary); } } } throw new Exception("["+mqName+"-"+context.getUserName()+"]對消息隊列["+queuesName+"]沒有發送消息的權限"); }else{ throw new Exception("請登陸後再操做!"); } } } }else{ boolean consumerStats = destination.getPhysicalName().regionMatches(true, 0, ACTIVEMQ_ADVISORY_CONSUMER_QUEUE, 0,ACTIVEMQ_ADVISORY_CONSUMER_QUEUE.length()); //消息接收者 if(consumerStats){ if(context.getSecurityContext()!=null){ //判斷不是默認用戶 if(!context.getSecurityContext().getUserName().equals(SecurityContext.BROKER_SECURITY_CONTEXT.getUserName())){ String queuesName=destination.getPhysicalName().replace(ACTIVEMQ_ADVISORY_CONSUMER_QUEUE, "");//獲得消息隊列名稱 
if(powers.containsKey(context.getSecurityContext().getUserName())){//判斷用戶是否有對應的權限 Map<String,ViewProjectMqQueuesCom> map=powers.get(context.getSecurityContext().getUserName()); if(map!=null&&map.containsKey(queuesName)){ if(map.get(queuesName).getBindId()!=null&&map.get(queuesName).getComQueuesType()!=null){ if(map.get(queuesName).getComQueuesType().intValue()==QueuesComType.BOTH.getValue().intValue()||map.get(queuesName).getComQueuesType().intValue()==QueuesComType.RECEIVE.getValue().intValue()){ return super.addDestination(context, destination, createIfTemporary); } } } throw new Exception("["+mqName+"-"+context.getUserName()+"]對消息隊列["+queuesName+"]沒有獲取消息的權限"); }else{ throw new Exception("請登陸後再操做!"); } } } } } return super.addDestination(context, destination, createIfTemporary); } /** * 監控發送消息 * <p>Title: send</p> * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange, org.apache.activemq.command.Message) */ @Override public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { String userName=producerExchange.getConnectionContext().getUserName(); ActiveMQDestination msgDest = messageSend.getDestination(); String physicalName = msgDest.getPhysicalName(); } /** * 監控消息接收者 * <p>Title: acknowledge</p> * @see org.apache.activemq.broker.BrokerFilter#acknowledge(org.apache.activemq.broker.ConsumerBrokerExchange, org.apache.activemq.command.MessageAck) */ @Override public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { String userName=consumerExchange.getConnectionContext().getUserName(); String queues=ack.getDestination().getPhysicalName(); } }