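I first touched ActiveMQ in a hurry a long time ago, when I was short on time. Later I found that there is far more to understand about ActiveMQ than I had expected.
I have long wanted to write an article about ActiveMQ, but never had a view of my own to offer. Perhaps there are already too many such articles online, or perhaps my own knowledge is still insufficient.
In practice, its purpose is to free developers from multithreading and multi-message communication so that they can focus on application logic.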
CMS (stands for C++ Messaging Service) is a JMS-like API for C++ for interfacing with Message Brokers such as Apache ActiveMQ. CMS helps to make your C++ client code much neater and easier to follow. To get a better feel for CMS try the API Reference. ActiveMQ-CPP is a client only library, a message broker such as Apache ActiveMQ is still needed for your clients to communicate.
Our implementation of CMS is called ActiveMQ-CPP, which has an architecture that allows for pluggable transports and wire formats. Currently we support the OpenWire and Stomp protocols, both over TCP and SSL, we also now support a Failover Transport for more reliable client operation. In addition to CMS, ActiveMQ-CPP also provides a robust set of classes that support platform independent constructs such as threading, I/O, sockets, etc. You may find many of these utilities very useful, such as a Java like Thread class or the "synchronized" macro that lets you use a Java-like synchronization on any object that implements the activemq::concurrent::Synchronizable interface. ActiveMQ-CPP is released under the Apache 2.0 License.
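In summary:
CMS (C++ Messaging Service) is a C++ interface to message brokers such as Apache ActiveMQ.
The implementation of CMS is called ActiveMQ-CPP. It currently supports only the OpenWire and Stomp wire formats, over TCP and SSL. It now also supports a failover transport (this is the key point; I did not understand it at the time and ended up taking a detour).
In other words, the amqp, mqtt, and ws connectors in activemq\conf\activemq.xml cannot be used from ActiveMQ-CPP; only the openwire and stomp connectors can.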
<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
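Since the failover transport is the part that is easy to miss, here is a minimal sketch of how a failover broker URI for the openwire connector above could be assembled. The helper name makeFailoverUri is hypothetical and not part of ActiveMQ-CPP; it only joins broker URIs into the failover:(uri1,uri2,...) form that the example code later in this article passes to the connection factory. The second broker address used in the usage note is made up for illustration.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper (not part of ActiveMQ-CPP): joins broker URIs into a
// failover URI of the form failover:(uri1,uri2,...).
std::string makeFailoverUri(const std::vector<std::string>& brokers) {
    std::string uri = "failover:(";
    for (std::size_t i = 0; i < brokers.size(); ++i) {
        if (i != 0) {
            uri += ",";  // brokers are separated by commas inside the parentheses
        }
        uri += brokers[i];
    }
    uri += ")";
    return uri;
}
```

For example, passing tcp://127.0.0.1:61616 and a second, assumed broker at tcp://127.0.0.1:61617 yields failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617), which can then be handed to ConnectionFactory::createCMSConnectionFactory.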
Reference: "Implementing communication with ActiveMQ in C++" http://blog.csdn.net/lee353086/article/details/6777261
ActiveMQ-CPP download page:
http://activemq.apache.org/cms/download.html
Dependencies
These are described at http://activemq.apache.org/cms/building.html, although only in English.
I will go over them again anyway, since my own English is also quite poor.
"With versions of ActiveMQ-CPP 2.2 and later, we have a dependency on the Apache Portable Runtime project. You'll need to install APR on your system before you'll be able to build ActiveMQ-CPP."
"The package contains a complete set of CppUnit tests. In order for you to build and run the tests, you will need to download and install the CppUnit library. See http://cppunit.sourceforge.net/cppunit-wiki"
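So the dependencies are: apr, apr-iconv, apr-util, and cppunit.
http://mirrors.hust.edu.cn/apache/apr/
is where apr, apr-iconv, and apr-util can be downloaded (pick the latest version of each; do not mix a newer release of one with an older release of another, or the build will run into problems).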
apr-1.5.1-win32-src.zip,
apr-iconv-1.2.1-win32-src-r2.zip,
apr-util-1.5.4-win32-src.zip.
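After extracting, remember to rename the folders to remove the version numbers (for example, apr-1.5.1 becomes apr); otherwise the default [Additional Include Directories] of the projects will not find them.
Put all the folders under one root directory.
Open activemq-cpp-library\vs2008-build\activemq-cpp.sln
Add the following as [Existing Project], in order: libapr.vcproj, libapriconv.vcproj, libaprutil.vcproj. Only the lib projects are needed.
[Figure: the final solution layout]
libapriconv.vcproj and libaprutil.vcproj both need libapr in their [Project Dependencies].
activemq-cpp needs libapriconv, libaprutil, and libapr in its [Project Dependencies].
The [Additional Include Directories] of activemq-cpp must contain the include directories of all three.
After a long compile,
the large lib file is produced.
The activemq-cpp-example project contains the hello world code.
Working through this project gives a better understanding of how ActiveMQ-CPP is structured.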
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// START SNIPPET: demo

#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>

using namespace activemq::core;
using namespace decaf::util::concurrent;
using namespace decaf::util;
using namespace decaf::lang;
using namespace cms;
using namespace std;

class HelloWorldProducer : public Runnable {
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    int numMessages;
    bool useTopic;
    bool sessionTransacted;
    std::string brokerURI;

private:

    HelloWorldProducer(const HelloWorldProducer&);
    HelloWorldProducer& operator=(const HelloWorldProducer&);

public:

    HelloWorldProducer(const std::string& brokerURI, int numMessages,
                       bool useTopic = false, bool sessionTransacted = false) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        producer(NULL),
        numMessages(numMessages),
        useTopic(useTopic),
        sessionTransacted(sessionTransacted),
        brokerURI(brokerURI) {
    }

    virtual ~HelloWorldProducer() {
        cleanup();
    }

    void close() {
        this->cleanup();
    }

    virtual void run() {

        try {

            // Create a ConnectionFactory
            auto_ptr<ConnectionFactory> connectionFactory(
                ConnectionFactory::createCMSConnectionFactory(brokerURI));

            // Create a Connection
            connection = connectionFactory->createConnection();
            connection->start();

            // Create a Session
            if (this->sessionTransacted) {
                session = connection->createSession(Session::SESSION_TRANSACTED);
            } else {
                session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
            }

            // Create the destination (Topic or Queue)
            if (useTopic) {
                destination = session->createTopic("TEST.FOO");
            } else {
                destination = session->createQueue("TEST.FOO");
            }

            // Create a MessageProducer from the Session to the Topic or Queue
            producer = session->createProducer(destination);
            producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

            // Create the Thread Id String
            string threadIdStr = Long::toString(Thread::currentThread()->getId());

            // Create a messages
            string text = (string) "Hello world! from thread " + threadIdStr;

            for (int ix = 0; ix < numMessages; ++ix) {
                std::auto_ptr<TextMessage> message(session->createTextMessage(text));
                message->setIntProperty("Integer", ix);
                printf("Sent message #%d from thread %s\n", ix + 1, threadIdStr.c_str());
                producer->send(message.get());
            }

        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

private:

    void cleanup() {

        if (connection != NULL) {
            try {
                connection->close();
            } catch (cms::CMSException& ex) {
                ex.printStackTrace();
            }
        }

        // Destroy resources.
        try {
            delete destination;
            destination = NULL;
            delete producer;
            producer = NULL;
            delete session;
            session = NULL;
            delete connection;
            connection = NULL;
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
};

class HelloWorldConsumer : public ExceptionListener,
                           public MessageListener,
                           public Runnable {
private:

    CountDownLatch latch;
    CountDownLatch doneLatch;
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    long waitMillis;
    bool useTopic;
    bool sessionTransacted;
    std::string brokerURI;

private:

    HelloWorldConsumer(const HelloWorldConsumer&);
    HelloWorldConsumer& operator=(const HelloWorldConsumer&);

public:

    HelloWorldConsumer(const std::string& brokerURI, int numMessages,
                       bool useTopic = false, bool sessionTransacted = false,
                       int waitMillis = 30000) :
        latch(1),
        doneLatch(numMessages),
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        waitMillis(waitMillis),
        useTopic(useTopic),
        sessionTransacted(sessionTransacted),
        brokerURI(brokerURI) {
    }

    virtual ~HelloWorldConsumer() {
        cleanup();
    }

    void close() {
        this->cleanup();
    }

    void waitUntilReady() {
        latch.await();
    }

    virtual void run() {

        try {

            // Create a ConnectionFactory
            auto_ptr<ConnectionFactory> connectionFactory(
                ConnectionFactory::createCMSConnectionFactory(brokerURI));

            // Create a Connection
            connection = connectionFactory->createConnection();
            connection->start();
            connection->setExceptionListener(this);

            // Create a Session
            if (this->sessionTransacted == true) {
                session = connection->createSession(Session::SESSION_TRANSACTED);
            } else {
                session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
            }

            // Create the destination (Topic or Queue)
            if (useTopic) {
                destination = session->createTopic("TEST.FOO");
            } else {
                destination = session->createQueue("TEST.FOO");
            }

            // Create a MessageConsumer from the Session to the Topic or Queue
            consumer = session->createConsumer(destination);

            consumer->setMessageListener(this);

            std::cout.flush();
            std::cerr.flush();

            // Indicate we are ready for messages.
            latch.countDown();

            // Wait while asynchronous messages come in.
            doneLatch.await(waitMillis);

        } catch (CMSException& e) {
            // Indicate we are ready for messages.
            latch.countDown();
            e.printStackTrace();
        }
    }

    // Called from the consumer since this class is a registered MessageListener.
    virtual void onMessage(const Message* message) {

        static int count = 0;

        try {
            count++;
            const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message);
            string text = "";

            if (textMessage != NULL) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }

            printf("Message #%d Received: %s\n", count, text.c_str());

        } catch (CMSException& e) {
            e.printStackTrace();
        }

        // Commit all messages.
        if (this->sessionTransacted) {
            session->commit();
        }

        // No matter what, tag the count down latch until done.
        doneLatch.countDown();
    }

    // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
        printf("CMS Exception occurred.  Shutting down client.\n");
        ex.printStackTrace();
        exit(1);
    }

private:

    void cleanup() {

        if (connection != NULL) {
            try {
                connection->close();
            } catch (cms::CMSException& ex) {
                ex.printStackTrace();
            }
        }

        // Destroy resources.
        try {
            delete destination;
            destination = NULL;
            delete consumer;
            consumer = NULL;
            delete session;
            session = NULL;
            delete connection;
            connection = NULL;
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
};

int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

    activemq::library::ActiveMQCPP::initializeLibrary();
    {
        std::cout << "=====================================================\n";
        std::cout << "Starting the example:" << std::endl;
        std::cout << "-----------------------------------------------------\n";

        // Set the URI to point to the IP Address of your broker.
        // add any optional params to the url to enable things like
        // tightMarshalling or tcp logging etc.  See the CMS web site for
        // a full list of configuration options.
        //
        //  http://activemq.apache.org/cms/
        //
        // Wire Format Options:
        // =========================
        // Use either stomp or openwire, the default ports are different for each
        //
        // Examples:
        //    tcp://127.0.0.1:61616                      default to openwire
        //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
        //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
        //
        // SSL:
        // =========================
        // To use SSL you need to specify the location of the trusted Root CA or the
        // certificate for the broker you want to connect to.  Using the Root CA allows
        // you to use failover with multiple servers all using certificates signed by
        // the trusted root.  If using client authentication you also need to specify
        // the location of the client Certificate.
        //
        //     System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" );
        //     System::setProperty( "decaf.net.ssl.keyStorePassword", "password" );
        //     System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" );
        //
        // The you just specify the ssl transport in the URI, for example:
        //
        //     ssl://localhost:61617
        //
        std::string brokerURI =
            "failover:(tcp://localhost:61616"
//          "?wireFormat=openwire"
//          "&transport.useInactivityMonitor=false"
//          "&connection.alwaysSyncSend=true"
//          "&connection.useAsyncSend=true"
//          "?transport.commandTracingEnabled=true"
//          "&transport.tcpTracingEnabled=true"
//          "&wireFormat.tightEncodingEnabled=true"
            ")";

        //============================================================
        // set to true to use topics instead of queues
        // Note in the code above that this causes createTopic or
        // createQueue to be used in both consumer an producer.
        //============================================================
        bool useTopics = true;
        bool sessionTransacted = false;
        int numMessages = 2000;

        long long startTime = System::currentTimeMillis();

        HelloWorldProducer producer(brokerURI, numMessages, useTopics);
        HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted);

        // Start the consumer thread.
        Thread consumerThread(&consumer);
        consumerThread.start();

        // Wait for the consumer to indicate that its ready to go.
        consumer.waitUntilReady();

        // Start the producer thread.
        Thread producerThread(&producer);
        producerThread.start();

        // Wait for the threads to complete.
        producerThread.join();
        consumerThread.join();

        long long endTime = System::currentTimeMillis();
        double totalTime = (double)(endTime - startTime) / 1000.0;

        consumer.close();
        producer.close();

        std::cout << "Time to completion = " << totalTime << " seconds." << std::endl;
        std::cout << "-----------------------------------------------------\n";
        std::cout << "Finished with the example." << std::endl;
        std::cout << "=====================================================\n";
    }
    activemq::library::ActiveMQCPP::shutdownLibrary();
}

// END SNIPPET: demo
Let's start from main().
In this way the application logic is implemented simply and quickly.
See also:
http://shmilyaw-hotmail-com.iteye.com/blog/1897635
What I currently need is the request-response pattern with ActiveMQ-CPP:
the server and the client exchange data and acknowledge it.
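Before the CMS code, the bookkeeping behind request-response can be sketched without any broker. The requester tags each request with a correlation ID (setCMSCorrelationID) and a reply destination (setCMSReplyTo); the responder copies the correlation ID onto its reply; the requester can then match the reply to the request it belongs to. The class PendingRequests below is a hypothetical illustration of that matching step using only the standard library; it is not part of CMS or ActiveMQ-CPP.

```cpp
#include <cstddef>
#include <map>
#include <string>

// Hypothetical bookkeeping class (not part of CMS): remembers which request
// each correlation ID belongs to so that a reply can be matched to it.
class PendingRequests {
public:
    // Record a request that is waiting for a reply.
    void add(const std::string& correlationId, const std::string& requestText) {
        pending_[correlationId] = requestText;
    }

    // Match a reply by its correlation ID. On success, copies the original
    // request text into requestText, removes the entry and returns true.
    // Returns false for an unknown or already answered ID.
    bool complete(const std::string& correlationId, std::string& requestText) {
        std::map<std::string, std::string>::iterator it = pending_.find(correlationId);
        if (it == pending_.end()) {
            return false;
        }
        requestText = it->second;
        pending_.erase(it);
        return true;
    }

    // Number of requests still waiting for a reply.
    std::size_t size() const {
        return pending_.size();
    }

private:
    std::map<std::string, std::string> pending_;
};
```

In a real client, add() would be called just before producer->send() and complete() inside onMessage() with message->getCMSCorrelationID(); if several threads share one instance, access would additionally need a lock.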
Below is my modified, simple code. It may contain bugs; please point them out.
Make two copies, define USE_COMSUMER in one and USE_PRODUCER in the other, and build each.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// START SNIPPET: demo

// Define exactly one of these in each copy of the file before building:
// #define USE_COMSUMER
// #define USE_PRODUCER

#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <decaf/util/Random.h>

using namespace activemq::core;
using namespace decaf::util::concurrent;
using namespace decaf::util;
using namespace decaf::lang;
using namespace cms;
using namespace std;

#define QUEUE_NAME    "eventQueue"
#define NAME_BYTE_LEN 16

// Requester: sends requests to QUEUE_NAME and listens for the replies.
class HelloWorldProducer : public ExceptionListener,
                           public MessageListener,
                           public Runnable {
private:

    CountDownLatch latch;
    CountDownLatch doneLatch;
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    int numMessages;
    bool useTopic;
    bool sessionTransacted;
    std::string brokerURI;
    bool bReciveMessage;
    long waitMillis;

private:

    HelloWorldProducer(const HelloWorldProducer&);
    HelloWorldProducer& operator=(const HelloWorldProducer&);

public:

    HelloWorldProducer(const std::string& brokerURI, int numMessages,
                       bool useTopic = false, bool sessionTransacted = false,
                       long waitMillis = 3000) :
        latch(1),
        doneLatch(numMessages),
        connection(NULL),
        session(NULL),
        destination(NULL),
        producer(NULL),
        numMessages(numMessages),
        useTopic(useTopic),
        sessionTransacted(sessionTransacted),
        brokerURI(brokerURI),
        bReciveMessage(false),
        waitMillis(waitMillis) {
    }

    virtual ~HelloWorldProducer() {
        cleanup();
    }

    void close() {
        this->cleanup();
    }

    void waitUntilReady() {
        latch.await();
    }

    virtual void run() {

        try {

            // Create a ConnectionFactory
            auto_ptr<ConnectionFactory> connectionFactory(
                ConnectionFactory::createCMSConnectionFactory(brokerURI));

            // Create a Connection
            connection = connectionFactory->createConnection();
            connection->start();

            // Create a Session. It is created exactly once: calling
            // createSession() a second time here would leak the first
            // session and ignore the sessionTransacted setting.
            if (this->sessionTransacted) {
                session = connection->createSession(Session::SESSION_TRANSACTED);
            } else {
                session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
            }

            // Create the destination (Topic or Queue)
            if (useTopic) {
                destination = session->createTopic(QUEUE_NAME);
            } else {
                destination = session->createQueue(QUEUE_NAME);
            }

            // Create a MessageProducer from the Session to the Topic or Queue
            producer = session->createProducer(destination);
            producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

            // Create the Thread Id String
            string threadIdStr = Long::toString(Thread::currentThread()->getId());

            // Create a messages
            string text = (string) "Hello world! from thread " + threadIdStr;

            for (int ix = 0; ix < numMessages; ++ix) {
                std::auto_ptr<TextMessage> message(session->createTextMessage(text));

                // Key part: create a temporary queue for the reply and
                // listen on it. Both objects are owned by auto_ptr so they
                // are released at the end of each iteration; replies that
                // arrive after that point are no longer delivered.
                std::auto_ptr<Destination> tempDest(session->createTemporaryQueue());
                std::auto_ptr<MessageConsumer> responseConsumer(
                    session->createConsumer(tempDest.get()));
                responseConsumer->setMessageListener(this);

                message->setCMSReplyTo(tempDest.get());

                // Build a correlation ID from NAME_BYTE_LEN random bytes,
                // rendered as uppercase hex.
                Random random;
                char buffer[NAME_BYTE_LEN] = {0};
                random.nextBytes((unsigned char*)buffer, NAME_BYTE_LEN);
                string correlationId = "";
                for (int i = 0; i < NAME_BYTE_LEN; ++i) {
                    char ch[NAME_BYTE_LEN * 2] = {0};
                    sprintf(ch, "%02X", (unsigned char)buffer[i]);
                    string str(ch);
                    correlationId += str;
                }
                message->setCMSCorrelationID(correlationId);

                message->setIntProperty("Integer", ix);
                printf("Producer Sent message #%d from thread %s\n", ix + 1, threadIdStr.c_str());
                producer->send(message.get());

                // Indicate we are ready for messages.
                latch.countDown();

                // Wait while asynchronous replies come in. This returns
                // early only once all numMessages replies have arrived;
                // otherwise it waits the full waitMillis.
                doneLatch.await(waitMillis);
            }

        } catch (CMSException& e) {
            printf("Producer run() CMSException \n");
            // Indicate we are ready for messages.
            latch.countDown();
            e.printStackTrace();
        }
    }

    // Called for each reply since this class is a registered MessageListener.
    virtual void onMessage(const Message* message) {

        static int count = 0;

        try {
            count++;
            const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message);
            string text = "";

            if (textMessage != NULL) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }

            printf("Producer Message #%d Received: %s\n", count, text.c_str());

        } catch (CMSException& e) {
            printf("Producer onMessage() CMSException \n");
            e.printStackTrace();
        }

        // Commit all messages.
        if (this->sessionTransacted) {
            session->commit();
        }

        // No matter what, tag the count down latch until done.
        doneLatch.countDown();
    }

    // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
        printf("Producer onException() CMS Exception occurred.  Shutting down client. \n");
        ex.printStackTrace();
        exit(1);
    }

private:

    void cleanup() {

        if (connection != NULL) {
            try {
                connection->close();
            } catch (cms::CMSException& ex) {
                ex.printStackTrace();
            }
        }

        // Destroy resources.
        try {
            delete destination;
            destination = NULL;
            delete producer;
            producer = NULL;
            delete session;
            session = NULL;
            delete connection;
            connection = NULL;
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
};

// Responder: receives requests from QUEUE_NAME and sends a reply to the
// destination named in each request's CMSReplyTo header.
class HelloWorldConsumer : public ExceptionListener,
                           public MessageListener,
                           public Runnable {
private:

    CountDownLatch latch;
    CountDownLatch doneLatch;
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    MessageProducer* producer;
    long waitMillis;
    bool useTopic;
    bool sessionTransacted;
    std::string brokerURI;

private:

    HelloWorldConsumer(const HelloWorldConsumer&);
    HelloWorldConsumer& operator=(const HelloWorldConsumer&);

public:

    HelloWorldConsumer(const std::string& brokerURI, int numMessages,
                       bool useTopic = false, bool sessionTransacted = false,
                       int waitMillis = 30000) :
        latch(1),
        doneLatch(numMessages),
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        producer(NULL),
        waitMillis(waitMillis),
        useTopic(useTopic),
        sessionTransacted(sessionTransacted),
        brokerURI(brokerURI) {
    }

    virtual ~HelloWorldConsumer() {
        cleanup();
    }

    void close() {
        this->cleanup();
    }

    void waitUntilReady() {
        latch.await();
    }

    virtual void run() {

        try {

            // Create a ConnectionFactory
            auto_ptr<ConnectionFactory> connectionFactory(
                ConnectionFactory::createCMSConnectionFactory(brokerURI));

            // Create a Connection
            connection = connectionFactory->createConnection();
            connection->start();
            connection->setExceptionListener(this);

            // Create a Session
            if (this->sessionTransacted == true) {
                session = connection->createSession(Session::SESSION_TRANSACTED);
            } else {
                session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
            }

            // Create the destination (Topic or Queue)
            if (useTopic) {
                destination = session->createTopic(QUEUE_NAME);
            } else {
                destination = session->createQueue(QUEUE_NAME);
            }

            // Producer without a fixed destination; the reply destination
            // is supplied on each send().
            producer = session->createProducer();
            producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

            // Create a MessageConsumer from the Session to the Topic or Queue
            consumer = session->createConsumer(destination);

            consumer->setMessageListener(this);

            std::cout.flush();
            std::cerr.flush();

            // Indicate we are ready for messages.
            latch.countDown();

            // Wait while asynchronous messages come in. doneLatch is never
            // counted down in onMessage(), so the responder keeps serving
            // requests until the process is stopped.
            doneLatch.await();

        } catch (CMSException& e) {
            printf("Consumer run() CMSException \n");
            // Indicate we are ready for messages.
            latch.countDown();
            e.printStackTrace();
        }
    }

    // Called from the consumer since this class is a registered MessageListener.
    virtual void onMessage(const Message* message) {

        static int count = 0;

        try {
            count++;

            // Create the Thread Id String
            string threadIdStr = Long::toString(Thread::currentThread()->getId());

            static bool bPrintf = true;
            if (bPrintf) {
                bPrintf = false;
                printf("consumer Message threadid: %s\n", threadIdStr.c_str());
            }

            string strReply = "consumer return  xxx,ThreadID=" + threadIdStr;
            const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message);
            if (NULL == textMessage) {
                // Other message types (for example cms::MapMessage) are not
                // handled here; see ActiveMQMessageTransformation below for
                // how each type can be read.
                printf("NULL==textMessage, CMSType=%s\n", message->getCMSType().c_str());
                return;
            }

            // Build the reply and copy the correlation ID from the request.
            std::auto_ptr<TextMessage> responsemessage(session->createTextMessage(strReply));
            responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID());

            string text = textMessage->getText();
            int nProPerty = textMessage->getIntProperty("Integer");
            printf("consumer Message #%d Received: %s,nProPerty[%d]\n", count, text.c_str(), nProPerty);

            // Send the reply to the destination requested by the sender.
            const cms::Destination* destSend = textMessage->getCMSReplyTo();
            if (destSend) {
                this->producer->send(destSend, responsemessage.get());
                printf("consumer Message #%d send: %s\n", count, strReply.c_str());
            }

        } catch (CMSException& e) {
            printf("Consumer onMessage() CMSException \n");
            e.printStackTrace();
        }

        // Commit all messages.
        if (this->sessionTransacted) {
            session->commit();
        }

        // No matter what, tag the count down latch until done.
        //doneLatch.countDown();
    }

    // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
        printf("Consumer onException() CMS Exception occurred.  Shutting down client. \n");
        ex.printStackTrace();
        exit(1);
    }

private:

    void cleanup() {

        if (connection != NULL) {
            try {
                connection->close();
            } catch (cms::CMSException& ex) {
                ex.printStackTrace();
            }
        }

        // Destroy resources (including the reply producer).
        try {
            delete destination;
            destination = NULL;
            delete consumer;
            consumer = NULL;
            delete producer;
            producer = NULL;
            delete session;
            session = NULL;
            delete connection;
            connection = NULL;
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
};

int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

    activemq::library::ActiveMQCPP::initializeLibrary();
    {
        std::cout << "=====================================================\n";
        std::cout << "Starting the example:" << std::endl;
        std::cout << "-----------------------------------------------------\n";

        // Set the URI to point to the IP Address of your broker.
        // add any optional params to the url to enable things like
        // tightMarshalling or tcp logging etc.  See the CMS web site for
        // a full list of configuration options.
        //
        //  http://activemq.apache.org/cms/
        //
        // Wire Format Options:
        // =========================
        // Use either stomp or openwire, the default ports are different for each
        //
        // Examples:
        //    tcp://127.0.0.1:61616                      default to openwire
        //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
        //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
        //
        // SSL:
        // =========================
        // To use SSL you need to specify the location of the trusted Root CA or the
        // certificate for the broker you want to connect to.  Using the Root CA allows
        // you to use failover with multiple servers all using certificates signed by
        // the trusted root.  If using client authentication you also need to specify
        // the location of the client Certificate.
        //
        //     System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" );
        //     System::setProperty( "decaf.net.ssl.keyStorePassword", "password" );
        //     System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" );
        //
        // The you just specify the ssl transport in the URI, for example:
        //
        //     ssl://localhost:61617
        //
        std::string brokerURI =
            "failover:(tcp://192.168.10.143:61616"
//          "?wireFormat=openwire"
//          "&transport.useInactivityMonitor=false"
//          "&connection.alwaysSyncSend=true"
//          "&connection.useAsyncSend=true"
//          "?transport.commandTracingEnabled=true"
//          "&transport.tcpTracingEnabled=true"
//          "&wireFormat.tightEncodingEnabled=true"
            ")";

        //============================================================
        // set to true to use topics instead of queues
        // Note in the code above that this causes createTopic or
        // createQueue to be used in both consumer an producer.
        //============================================================
        bool useTopics = false;
        bool sessionTransacted = true;
        int numMessages = 1;

        long long startTime = System::currentTimeMillis();

#ifdef USE_PRODUCER
        printf("Current mode: USE_PRODUCER \r\n");

        int numProducerMessages = 30;
        int nThreadNumber = 10;
        vector<HelloWorldProducer*> vHelloWorldProducer;
        for (int i = 0; i < nThreadNumber; ++i) {
            HelloWorldProducer* producerTemp =
                new HelloWorldProducer(brokerURI, numProducerMessages, useTopics);
            vHelloWorldProducer.push_back(producerTemp);
        }
#endif

#ifdef USE_COMSUMER
        printf("Current mode: USE_COMSUMER \r\n");
        HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted);

        // Start the consumer thread.
        Thread consumerThread(&consumer);
        consumerThread.start();

        // Wait for the consumer to indicate that its ready to go.
        consumer.waitUntilReady();
#endif

#ifdef USE_PRODUCER
        // Start the producer threads.
        vector<Thread*> vThread;
        for (int i = 0; i < nThreadNumber; ++i) {
            HelloWorldProducer& ProducerTemp = *vHelloWorldProducer[i];
            Thread* threadTemp = new Thread(&ProducerTemp);
            vThread.push_back(threadTemp);
            threadTemp->start();
            ProducerTemp.waitUntilReady();
        }

        // Wait for the producer threads to complete, then release them.
        // (Joining replaces an endless sleep loop, so the cleanup code
        // below is actually reached.)
        for (size_t i = 0; i < vThread.size(); ++i) {
            vThread[i]->join();
            delete vThread[i];
        }
        vThread.clear();
#endif

#ifdef USE_COMSUMER
        consumerThread.join();
#endif

        long long endTime = System::currentTimeMillis();
        double totalTime = (double)(endTime - startTime) / 1000.0;

#ifdef USE_PRODUCER
        for (size_t i = 0; i < vHelloWorldProducer.size(); ++i) {
            HelloWorldProducer* ProducerTemp = vHelloWorldProducer[i];
            if (ProducerTemp) {
                ProducerTemp->close();
                delete ProducerTemp;
            }
        }
        vHelloWorldProducer.clear();
#endif

#ifdef USE_COMSUMER
        consumer.close();
#endif

        std::cout << "Time to completion = " << totalTime << " seconds." << std::endl;
        std::cout << "-----------------------------------------------------\n";
        std::cout << "Finished with the example." << std::endl;
        std::cout << "=====================================================\n";
    }
    activemq::library::ActiveMQCPP::shutdownLibrary();
    return 0;
}

// END SNIPPET: demo
[Figure: program output]
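The correlation ID in the producer above is built from NAME_BYTE_LEN (16) random bytes printed as two hex digits each, giving a 32-character string. As a standalone illustration, the same idea can be sketched with only the standard library. This is an assumption-laden variant, not the code used above: it relies on C++11 std::random_device instead of decaf::util::Random, and the function names toHex and makeCorrelationId are hypothetical.

```cpp
#include <cstddef>
#include <random>
#include <string>

// Render each byte as two uppercase hex digits, matching the "%02X"
// formatting used in the producer above.
std::string toHex(const unsigned char* data, std::size_t len) {
    static const char digits[] = "0123456789ABCDEF";
    std::string out;
    out.reserve(len * 2);
    for (std::size_t i = 0; i < len; ++i) {
        out += digits[data[i] >> 4];    // high nibble
        out += digits[data[i] & 0x0F];  // low nibble
    }
    return out;
}

// Hypothetical helper: 16 random bytes -> 32-character correlation ID.
std::string makeCorrelationId() {
    std::random_device rd;
    unsigned char bytes[16];
    for (std::size_t i = 0; i < sizeof(bytes); ++i) {
        bytes[i] = static_cast<unsigned char>(rd() & 0xFF);
    }
    return toHex(bytes, sizeof(bytes));
}
```

The result could be passed straight to message->setCMSCorrelationID(). Uniqueness is only probabilistic, which is usually acceptable for matching replies within one client.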
On Message conversion in ActiveMQ-CPP.
ActiveMQ-CPP already implements this conversion in ActiveMQMessageTransformation::transformMessage.
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQMessageTransformation::transformMessage(cms::Message* message, ActiveMQConnection* connection, Message** amqMessage) {

    if (message == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Provided source cms::Message pointer was NULL");
    }

    if (amqMessage == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Provided target commands::Message pointer was NULL");
    }

    // Already a native ActiveMQ message: hand it back unchanged and report "no copy made".
    *amqMessage = dynamic_cast<Message*>(message);

    if (*amqMessage != NULL) {
        return false;
    } else {

        if (dynamic_cast<cms::BytesMessage*>(message) != NULL) {
            cms::BytesMessage* bytesMsg = dynamic_cast<cms::BytesMessage*>(message);
            bytesMsg->reset();
            ActiveMQBytesMessage* msg = new ActiveMQBytesMessage();
            msg->setConnection(connection);
            try {
                for (;;) {
                    // Reads a byte from the message stream until the stream is empty
                    msg->writeByte(bytesMsg->readByte());
                }
            } catch (cms::MessageEOFException& e) {
                // if an end of message stream as expected
            } catch (cms::CMSException& e) {
            }

            *amqMessage = msg;
        } else if (dynamic_cast<cms::MapMessage*>(message) != NULL) {
            cms::MapMessage* mapMsg = dynamic_cast<cms::MapMessage*>(message);
            ActiveMQMapMessage* msg = new ActiveMQMapMessage();
            msg->setConnection(connection);

            // Copy every map entry using the setter that matches its value type.
            std::vector<std::string> elements = mapMsg->getMapNames();
            std::vector<std::string>::iterator iter = elements.begin();
            for(; iter != elements.end() ; ++iter) {
                std::string key = *iter;
                cms::Message::ValueType elementType = mapMsg->getValueType(key);

                switch(elementType) {
                    case cms::Message::BOOLEAN_TYPE:
                        msg->setBoolean(key, mapMsg->getBoolean(key));
                        break;
                    case cms::Message::BYTE_TYPE:
                        msg->setByte(key, mapMsg->getByte(key));
                        break;
                    case cms::Message::BYTE_ARRAY_TYPE:
                        msg->setBytes(key, mapMsg->getBytes(key));
                        break;
                    case cms::Message::CHAR_TYPE:
                        msg->setChar(key, mapMsg->getChar(key));
                        break;
                    case cms::Message::SHORT_TYPE:
                        msg->setShort(key, mapMsg->getShort(key));
                        break;
                    case cms::Message::INTEGER_TYPE:
                        msg->setInt(key, mapMsg->getInt(key));
                        break;
                    case cms::Message::LONG_TYPE:
                        msg->setLong(key, mapMsg->getLong(key));
                        break;
                    case cms::Message::FLOAT_TYPE:
                        msg->setFloat(key, mapMsg->getFloat(key));
                        break;
                    case cms::Message::DOUBLE_TYPE:
                        msg->setDouble(key, mapMsg->getDouble(key));
                        break;
                    case cms::Message::STRING_TYPE:
                        msg->setString(key, mapMsg->getString(key));
                        break;
                    default:
                        break;
                }
            }

            *amqMessage = msg;
        } else if (dynamic_cast<cms::ObjectMessage*>(message) != NULL) {
            cms::ObjectMessage* objMsg = dynamic_cast<cms::ObjectMessage*>(message);
            ActiveMQObjectMessage* msg = new ActiveMQObjectMessage();
            msg->setConnection(connection);
            msg->setObjectBytes(objMsg->getObjectBytes());
            *amqMessage = msg;
        } else if (dynamic_cast<cms::StreamMessage*>(message) != NULL) {
            cms::StreamMessage* streamMessage = dynamic_cast<cms::StreamMessage*>(message);
            streamMessage->reset();
            ActiveMQStreamMessage* msg = new ActiveMQStreamMessage();
            msg->setConnection(connection);

            try {
                // Copy element by element until MessageEOFException signals the end of the stream.
                while(true) {
                    cms::Message::ValueType elementType = streamMessage->getNextValueType();
                    int result = -1;
                    std::vector<unsigned char> buffer(255);

                    switch(elementType) {
                        case cms::Message::BOOLEAN_TYPE:
                            msg->writeBoolean(streamMessage->readBoolean());
                            break;
                        case cms::Message::BYTE_TYPE:
                            // NOTE: reproduced as in the listing; readByte()/writeByte() would be
                            // the expected pair for BYTE_TYPE, so this line looks like a bug.
                            msg->writeBoolean(streamMessage->readBoolean());
                            break;
                        case cms::Message::BYTE_ARRAY_TYPE:
                            while ((result = streamMessage->readBytes(buffer)) != -1) {
                                msg->writeBytes(&buffer[0], 0, result);
                                buffer.clear();
                            }
                            break;
                        case cms::Message::CHAR_TYPE:
                            msg->writeChar(streamMessage->readChar());
                            break;
                        case cms::Message::SHORT_TYPE:
                            msg->writeShort(streamMessage->readShort());
                            break;
                        case cms::Message::INTEGER_TYPE:
                            msg->writeInt(streamMessage->readInt());
                            break;
                        case cms::Message::LONG_TYPE:
                            msg->writeLong(streamMessage->readLong());
                            break;
                        case cms::Message::FLOAT_TYPE:
                            msg->writeFloat(streamMessage->readFloat());
                            break;
                        case cms::Message::DOUBLE_TYPE:
                            msg->writeDouble(streamMessage->readDouble());
                            break;
                        case cms::Message::STRING_TYPE:
                            msg->writeString(streamMessage->readString());
                            break;
                        default:
                            break;
                    }
                }
            } catch (cms::MessageEOFException& e) {
                // if an end of message stream as expected
            } catch (cms::CMSException& e) {
            }

            *amqMessage = msg;
        } else if (dynamic_cast<cms::TextMessage*>(message) != NULL) {
            cms::TextMessage* textMsg = dynamic_cast<cms::TextMessage*>(message);
            ActiveMQTextMessage* msg = new ActiveMQTextMessage();
            msg->setConnection(connection);
            msg->setText(textMsg->getText());
            *amqMessage = msg;
        } else {
            // Unknown message type: fall back to a plain ActiveMQMessage with no body.
            *amqMessage = new ActiveMQMessage();
            (*amqMessage)->setConnection(connection);
        }

        // Headers and properties are copied for every newly created message.
        ActiveMQMessageTransformation::copyProperties(message, dynamic_cast<cms::Message*>(*amqMessage));
    }

    return true;
}
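The core pattern in transformMessage (pass a native message through, otherwise copy into a new native one and tell the caller via the return value) can be tried without the library. The following is only a minimal sketch: Message, TextMessage, NativeTag, NativeMessage, NativeTextMessage, ForeignTextMessage and transform are all hypothetical stand-ins invented for illustration, not activemq-cpp types.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for the cms:: interfaces (NOT the real library types).
struct Message {
    virtual ~Message() = default;
};

struct TextMessage : Message {
    virtual std::string getText() const = 0;
};

// Hypothetical marker for "native" messages; plays the role of commands::Message.
struct NativeTag {
    virtual ~NativeTag() = default;
};

// Hypothetical native implementations (roles of ActiveMQMessage / ActiveMQTextMessage).
struct NativeMessage : Message, NativeTag {};

struct NativeTextMessage : TextMessage, NativeTag {
    std::string text;
    std::string getText() const override { return text; }
};

// Hypothetical message coming from some other provider.
struct ForeignTextMessage : TextMessage {
    std::string getText() const override { return "hello"; }
};

// Returns false when `source` is already native (*out aliases source, nothing is allocated).
// Returns true when a new native copy was allocated; the caller then owns *out.
bool transform(Message* source, Message** out) {
    if (source == nullptr || out == nullptr) {
        throw std::invalid_argument("source and out must be non-null");
    }

    // Cross-cast to the marker type: succeeds only for native messages.
    if (dynamic_cast<NativeTag*>(source) != nullptr) {
        *out = source;
        return false;
    }

    if (TextMessage* text = dynamic_cast<TextMessage*>(source)) {
        NativeTextMessage* copy = new NativeTextMessage();
        copy->text = text->getText();
        *out = copy;
    } else {
        // Unknown type: fall back to an empty native message.
        *out = new NativeMessage();
    }
    return true;
}
```

The boolean result matters for ownership: when it is true the caller received a freshly allocated copy and must delete it, when it is false the output pointer is just the input pointer under another type.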
For reference:
http://bh-keven.iteye.com/blog/1617788
http://blog.csdn.net/jason5186/article/details/18702523
http://activemq.apache.org/nms/activemq-uri-configuration.html
http://activemq.apache.org/tcp-transport-reference.html
These links cover the topic, but they take some time to read through.
//7. The wireFormat options (e.g. wireFormat=openwire) and their pros and cons.
//openwire,amqp,stomp,mqtt,ws
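As a small illustration of where wireFormat shows up on the client side, here is a sketch that assembles broker URIs as plain strings. The helper names tcpBrokerUri and failoverUri are hypothetical and exist only for this example; the URI shapes (tcp://host:port?wireFormat=..., failover:(uri1,uri2)) follow the URI configuration pages linked above. The host names in the usage note are placeholders.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: builds one TCP broker URI,
// e.g. "tcp://localhost:61616?wireFormat=openwire".
std::string tcpBrokerUri(const std::string& host, int port, const std::string& wireFormat) {
    return "tcp://" + host + ":" + std::to_string(port) + "?wireFormat=" + wireFormat;
}

// Hypothetical helper: wraps one or more broker URIs in the failover
// transport syntax, e.g. "failover:(uri1,uri2)".
std::string failoverUri(const std::vector<std::string>& uris) {
    std::string result = "failover:(";
    for (std::size_t i = 0; i < uris.size(); ++i) {
        if (i > 0) {
            result += ",";
        }
        result += uris[i];
    }
    result += ")";
    return result;
}
```

A string such as failoverUri({tcpBrokerUri("master", 61616, "openwire"), tcpBrokerUri("backup", 61616, "openwire")}) is what would then be handed to cms::ConnectionFactory::createCMSConnectionFactory in a real client.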