大數據實戰之環境搭建(十)

  1. Html5, 雲計算,移動互聯網,大數據你知道多少,若是不知道多少,請抓緊時間學習。html


今天要說的是消息隊列ActiveMQ,這個和cassandra同樣也是apache的產品,開源的,支持不少客戶端,好比java,C#,ruby,python等。ActiveMQ 是一個徹底支持JMS和J2EE規範的 JMS Provider實現。在介紹jms以前,先要介紹一下它的規範,俗話說無規矩不成方圓,就像序列化和反序列化同樣,你們要遵循必定的規則才能交互。java

JMS的基本構件有python

(1).鏈接工廠, 是客戶端用來建立鏈接的對象,ActiveMQConnectionFactory類。linux

(2). 鏈接,JMS Connection封裝了客戶端和JMS提供者之間的一個虛擬鏈接,ActiveMQConnectio類。web

(3). 會話,JMS Session是生產者和消費者之間Produce message和Consume message的上下文。macos

會話用於建立消息的生產者,消費者和消息。這個上下文保持了會話的事務性,即發送消息和接受消息被組合到了一個事務中。apache

(4). 目的地,這個目的地代表了生產消息的目標和消費消息的目標,Destination對象。c#

JMS規範中定義了兩種消息傳遞域,一種是點對點,一種是發佈/訂閱。點對點的特色是一個消息只能有一個消費者,消費者和生產者之間沒有時間上的相關性,不管消費者在生產者發送消息的時候是不是出於運行狀態,他均可以提取到消息。訂閱發佈模式的特色是一個生產者能夠有多個消費者,這種模式存在時間上的相關性,消費者只能消費自從他訂閱以後的消息,以前的消息是不能消費的。可是JMS規範容許客戶建立持久訂閱,即便是在生產者在消費者發送消息的時候處於一個未激活的狀態,等他激活之後,依然能夠消費未激活以前生產者發送的消息瀏覽器

(5).生產者,是由會話建立的一個對象,它主要職責是將消息發送到目的地ruby

(6).消費者,是由會話建立的一個對象,它主要是接收生產者發送到目的地的消息

消費者能夠同步消費和異步消費。


OK,概念就先講到這裏,咱們看一下環境

首先從apache 網站上下載ActiveMQ linux版本

在這裏我下載的版本是5.8.0。OK咱們把它放到FTP Server上,在CentOS上拷貝至opt文件夾。


ok咱們用命令解壓

解壓完成後,直接運行啓動腳本

[root@bogon opt]# ls
apache-activemq-5.8.0  apache-activemq-5.8.0-bin.tar.gz
[root@bogon opt]# cd apache-activemq-5.8.0
[root@bogon apache-activemq-5.8.0]# ls
activemq-all-5.8.0.jar  docs     NOTICE           webapps-demo
bin                     example  README.txt       WebConsole-README.txt
conf                    lib      user-guide.html
data                    LICENSE  webapps
[root@bogon apache-activemq-5.8.0]# cd bin
[root@bogon bin]# ls
activemq        activemq.jar  linux-x86-32  macosx
activemq-admin  diag          linux-x86-64  wrapper.jar
[root@bogon bin]# sh activemq

OK,開始啓動,到這一步啓動成功

咱們看一下它的管理界面,默認端口是8161,在conf目錄下的jetty.xml中

<property name="connectors">
<list>
<bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector">
<property name="port" value="8161" />
</bean>
<!--
Enable this connector if you wish to use https with web console
-->
<!--
<bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
<property name="port" value="8162" />
<property name="keystore" value="file:${activemq.conf}/broker.ks" />
<property name="password" value="password" />
</bean>
-->
</list>
</property>

OK,咱們在瀏覽器中輸入Http://localhost:8161/admin,提示輸入用戶名和密碼,用戶名:admin,密碼admin,

這個用戶名和密碼能夠在conf/jetty-realm.properties文件中去修改。

OK,管理界面以下

由於哥一直作.net開發,因此仍是以.net來作個demo,來作個消息的發送可接收。首先須要下載.net客戶端。去網站上找到下載.net 客戶端的地方

在這裏我下載的是1.5.3版本,由於1.6.0版本鏈接有問題。下載下來以後,咱們使用.net 4.0版本

OK,新建一個solution,建立兩個WinForm的工程,以下

注意要引用上面的兩個dll,這兩個dll分別在build和lib下

咱們看一下代碼,沒什麼好解釋的,先看生產者代碼

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
namespace ActiveMQSender
{
public partial class FrmMsgSender : Form
{
IConnectionFactory factory;
IConnection connection;
ISession session;
IDestination destination;
IMessageProducer prod;
ITextMessage streamMessage;
~FrmMsgSender()
{
session.Close();
connection.Close();
}
public FrmMsgSender()
{
InitializeComponent();
this.Init();
}
private void Init()
{
factory = new ConnectionFactory("tcp://192.168.192.128:61616/");
connection = factory.CreateConnection("admin", "admin");
session = connection.CreateSession();
destination = new ActiveMQTopic("Bruce Test");
prod = session.CreateProducer(destination);
streamMessage = prod.CreateTextMessage();
}
private void btnSend_Click(object sender, EventArgs e)
{
streamMessage.Text=txtSendMsg.Text.Trim();
prod.Send(streamMessage,MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
}
}
}

再看消費者代碼

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
namespace FrmMsgReceiver
{
public partial class FrmMsgReceiver : Form
{
IConnectionFactory factory;
IConnection connection;
ISession session;
IDestination destination;
IMessageConsumer consumer;
~FrmMsgReceiver()
{
session.Close();
connection.Close();
}
public FrmMsgReceiver()
{
InitializeComponent();
}
private void Init()
{
factory = new ConnectionFactory("tcp://192.168.192.128:61616/");
connection = factory.CreateConnection("admin", "admin");
session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
destination = new ActiveMQTopic("Bruce Test");
consumer = session.CreateConsumer(destination);
}
private void FrmMsgReceiver1_Load(object sender, EventArgs e)
{
this.Init();
IMessage streamMessage = consumer.Receive();
this.txtReceiveMsg.Text = (streamMessage as ActiveMQStreamMessage).ReadString();
}
}
}

OK,就是這麼簡單,我也不寫運行過程了,啓動VMPlayer後,機器的內存已經使用了90%了,可憐的dell 1420。

若是你們有興趣的話,看一下這篇文章

http://blog.csdn.net/bodybo/article/details/5647968

相關文章
相關標籤/搜索