一、實現功能
但願使用一套API,實現兩種模式下的消息發送和接收功能,方便業務程序調用數組
一、發送Topic緩存
二、發送Queuesession
三、接收Topictcp
四、接收Queueide
二、接口設計
根據功能設計公共調用接口測試
-
-
-
-
-
-
-
public interface MsgDistributeInterface {
-
-
-
-
-
-
-
-
-
public boolean sendTopic(String topicName, byte[] data);
-
-
-
-
-
-
-
-
-
-
boolean sendTopic(String topicName, byte[] data, int offset, int length);
-
-
-
-
-
-
-
-
-
public boolean sendQueue(String queueName, byte[] data);
-
-
-
-
-
-
-
-
-
-
public boolean sendQueue(String queueName, byte[] data,int offset, int length);
-
-
-
-
-
-
-
-
void receiveQueue(String queueName, MessageListener listener) throws JMSException;
-
-
-
-
-
-
-
-
void receiveTopic(String topicName, MessageListener listener) throws JMSException;
-
三、基於ActiveMQ的接口實現
-
-
-
-
-
-
-
public class ActiveMQImpl implements MsgDistributeInterface {
-
-
-
-
private String brokerURL;
-
private boolean persistentMode;
-
-
ConnectionFactory connectionFactory;
-
-
-
-
-
-
-
ThreadLocal<MessageProducer> topicThreadLocal =
new ThreadLocal<MessageProducer>();
-
-
ThreadLocal<MessageProducer> queueThreadLocal =
new ThreadLocal<MessageProducer>();
-
-
public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
-
this(userName, password, brokerURL, true);
-
-
-
public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException {
-
this.userName = userName;
-
this.password = password;
-
this.brokerURL = brokerURL;
-
this.persistentMode=persistentMode;
-
-
-
-
public void init() throws JMSException {
-
-
-
connectionFactory =
new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
-
-
connection = connectionFactory.createConnection();
-
-
-
-
session = connection.createSession(
false, Session.AUTO_ACKNOWLEDGE);
-
}
catch (JMSException e) {
-
-
-
-
-
-
public boolean sendTopic(String topicName, byte[] data) {
-
return sendTopic(topicName, data, 0, data.length);
-
-
-
-
public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
-
return send(true, topicName, data, offset, length);
-
-
-
-
public boolean sendQueue(String queueName, byte[] data) {
-
return sendQueue(queueName, data, 0, data.length);
-
-
-
-
public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
-
return send(false, queueName, data, offset, length);
-
-
-
-
-
-
-
-
-
-
-
-
-
-
private boolean send(boolean type, String name, byte[] data, int offset, int length) {
-
-
MessageProducer messageProducer = getMessageProducer(name, type);
-
-
BytesMessage msg = createBytesMsg(data, offset, length);
-
System.err.println(Thread.currentThread().getName()+
"發送消息");
-
-
messageProducer.send(msg);
-
}
catch (JMSException e) {
-
-
-
-
-
-
public void receive(String topicName) throws JMSException {
-
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
Topic topic =session.createTopic(topicName);
-
MessageConsumer consumer=session.createConsumer(topic);
-
consumer.setMessageListener(
new MessageListener() {
-
-
public void onMessage(Message message) {
-
BytesMessage msg=(BytesMessage) message;
-
System.err.println(Thread.currentThread().getName()+
"收到消息:"+msg.toString());
-
-
-
-
-
-
-
-
-
-
-
-
-
-
private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
-
BytesMessage msg = session.createBytesMessage();
-
msg.writeBytes(data, offset, length);
-
-
-
-
-
-
-
-
-
-
private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
-
-
ObjectMessage msg = session.createObjectMessage(obj);
-
-
-
-
-
-
-
-
-
-
private TextMessage createTextMsg(String text) throws JMSException {
-
TextMessage msg = session.createTextMessage(text);
-
-
-
-
-
-
-
-
-
-
-
-
-
private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
-
return type?getTopicProducer(name):getQueueProducer(name);
-
-
-
-
-
-
-
-
-
private MessageProducer getQueueProducer(String queueName) throws JMSException {
-
MessageProducer messageProducer =
null;
-
if ((messageProducer = queueThreadLocal.get()) == null) {
-
Queue queue = session.createQueue(queueName);
-
messageProducer = session.createProducer(queue);
-
-
messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
-
queueThreadLocal.set(messageProducer);
-
-
-
-
-
-
-
-
-
-
-
private MessageProducer getTopicProducer(String topicName) throws JMSException {
-
MessageProducer messageProducer =
null;
-
if ((messageProducer = topicThreadLocal.get()) == null) {
-
Topic topic = session.createTopic(topicName);
-
messageProducer = session.createProducer(topic);
-
-
messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
-
topicThreadLocal.set(messageProducer);
-
-
-
-
-
public String getPassword() {
-
-
-
-
public void setPassword(String password) {
-
this.password = password;
-
-
-
-
public void receiveQueue(String queueName,MessageListener listener) throws JMSException {
-
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
Queue topic =session.createQueue(queueName);
-
MessageConsumer consumer=session.createConsumer(topic);
-
consumer.setMessageListener(listener);
-
-
-
-
-
public void receiveTopic(String topicName,MessageListener listener) throws JMSException {
-
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
Topic topic =session.createTopic(topicName);
-
MessageConsumer consumer=session.createConsumer(topic);
-
consumer.setMessageListener(listener);
-
四、測試一下Topic和Queue
-
public static void main(String[] args) throws JMSException{
-
-
MsgDistributeInterface producter =
new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");
-
Test testMq =
new Test();
-
-
-
}
catch (InterruptedException e) {
-
-
-
-
new Thread(testMq.new ProductorMq(producter)).start();
-
-
new Thread(testMq.new ProductorMq(producter)).start();
-
-
new Thread(testMq.new ProductorMq(producter)).start();
-
-
new Thread(testMq.new ProductorMq(producter)).start();
-
-
new Thread(testMq.new ProductorMq(producter)).start();
-
-
new Thread(testMq.new ProductorMq(producter)).start();
-
-
-
new Thread(new Runnable() {
-
-
-
-
producter.receiveTopic(
"eguid-topic",new MessageListener() {
-
-
public void onMessage(Message message) {
-
BytesMessage msg=(BytesMessage) message;
-
System.err.println(Thread.currentThread().getName()+
"訂閱主題消息:"+msg.toString());
-
-
-
}
catch (JMSException e) {
-
-
-
-
-
-
-
new Thread(new Runnable() {
-
-
-
-
producter.receiveTopic(
"eguid-topic",new MessageListener() {
-
-
public void onMessage(Message message) {
-
BytesMessage msg=(BytesMessage) message;
-
System.err.println(Thread.currentThread().getName()+
"訂閱主題消息:"+msg.toString());
-
-
-
}
catch (JMSException e) {
-
-
-
-
-
-
-
new Thread(testMq.new QueueProductor(producter)).start();
-
-
new Thread(testMq.new QueueProductor(producter)).start();
-
-
new Thread(new Runnable() {
-
-
-
-
producter.receiveQueue(
"eguid-queue",new MessageListener() {
-
-
public void onMessage(Message message) {
-
BytesMessage msg=(BytesMessage) message;
-
System.err.println(Thread.currentThread().getName()+
"收到隊列消息:"+msg.toString());
-
-
-
}
catch (JMSException e) {
-
-
-
-
-
-
-
new Thread(new Runnable() {
-
-
-
-
producter.receiveQueue(
"eguid-queue",new MessageListener() {
-
-
public void onMessage(Message message) {
-
BytesMessage msg=(BytesMessage) message;
-
System.err.println(Thread.currentThread().getName()+
"收到隊列消息:"+msg.toString());
-
-
-
}
catch (JMSException e) {
-
-
-
-
-
-
-
-
private class ProductorMq implements Runnable{
-
Jtt809MsgProducter producter;
-
public ProductorMq(Jtt809MsgProducter producter){
-
this.producter = producter;
-
-
-
-
-
-
-
String wang=Thread.currentThread().getName()+
"Hello eguid! This is topic.";
-
producter.sendTopic(
"eguid-topic",wang.getBytes());
-
-
-
}
catch (InterruptedException e) {
-
-
-
-
-
-
-
private class QueueProductor implements Runnable{
-
Jtt809MsgProducter producter;
-
public QueueProductor(Jtt809MsgProducter producter){
-
this.producter = producter;
-
-
-
-
-
-
-
String eguid=Thread.currentThread().getName()+
"Hello eguid! This is queue.";
-
producter.sendQueue(
"eguid-queue",eguid.getBytes());
-
-
}
catch (InterruptedException e) {
-
-
-
-
-
-------------------End--------------------ui