Spring集成rabbitmq消息中間件

一、消息生產者配置spring

<!-- rabbitmq服務配置-->
<rabbit:connection-factory id="connectionFactory" host="192.168.188.128" username="root" password="root" port="5672" />
<rabbit:admin connection-factory="connectionFactory" />

<!-- 聲明隊列 -->
<rabbit:queue name="myQueue" id="myQueue" durable="true" auto-delete="false" exclusive="false" />

<!-- 任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上 -->
<rabbit:direct-exchange name="my_mq_exchange" durable="true" auto-delete="false" id="my_mq_exchange">
   <rabbit:bindings>
      <rabbit:binding queue="myQueue" key="myQueue_key"></rabbit:binding>
   </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:template id="amqpTemplate" queue="myQueue" routing-key="myQueue_key" connection-factory="connectionFactory" exchange="my_mq_exchange"></rabbit:template>

發送消息app

@Autowired
private UserService userService;

@Autowired
private AmqpTemplate amqpTemplate;

@RequestMapping("/test.do")
public void test(){
   userService.test();
   //System.out.println("aaaaaaaaaaaaaaaaaaaaaaaaaaa");
   //System.out.println("使用rabbitmq發送消息啦啦啦啦啦啦啦");
   amqpTemplate.convertAndSend("我是rabbitmq生產者,我在發送消息啦啦啦啦啦啦啦");
}

 

二、消息消費者配置線程

<!-- 線程池配置 -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
   <!-- 線程池維護線程的最少數量 -->
   <property name="corePoolSize" value="5" />
   <!-- 線程池維護線程所容許的空閒時間 -->
   <property name="keepAliveSeconds" value="30000" />
   <!-- 線程池維護線程的最大數量 -->
   <property name="maxPoolSize" value="1000" />
   <!-- 線程池使用的緩衝隊列 -->
   <property name="queueCapacity" value="200" />
</bean>


<!-- rabbitmq服務配置 默認端口爲5672-->
<rabbit:connection-factory id="connectionFactory" host="192.168.188.128" username="root" password="root" port="5672" />

<rabbit:admin connection-factory="connectionFactory" />

<!-- 聲明隊列-->
<rabbit:queue id="myQueue" name="myQueue" durable="true" auto-delete="false" exclusive="false"/>

<!-- 任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上-->
<rabbit:direct-exchange name="my_mq_exchange" id="my_mq_exchange" durable="true" auto-delete="false">
   <rabbit:bindings>
      <rabbit:binding queue="myQueue" key="myQueue_key"></rabbit:binding>
   </rabbit:bindings>
</rabbit:direct-exchange>

<bean id="MyQueueListener" class="com.mall.listener.rabbitmq.MyQueueListener"></bean>

<!-- queue listener 觀察監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
   <rabbit:listener queues="myQueue" ref="MyQueueListener"></rabbit:listener>
</rabbit:listener-container>

 

接收消息對象

public class MyQueueListener implements MessageListener{


    public void onMessage(Message message){

        //System.out.println("我是rabbit消費者,我在消費生產者生產的消息啦啦啦啦啦啦啦");
        System.out.println(new String(message.getBody()));
    }

}
相關文章
相關標籤/搜索