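In a previous job, our microservice framework was Spring Cloud and our message middleware was RocketMQ. Recently I noticed that Alibaba's Spring Cloud Alibaba integrates RocketMQ, so out of curiosity I wrote a demo.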
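Step 1: Download RocketMQ: https://www.apache.org/dyn/cl...
Step 2: Unpack the archive.
Step 3: Edit three scripts, runbroker.sh, runserver.sh and tools.sh, and change JAVA_HOME in each to match the Java installation on your own machine. After the change they look like this: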
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/path/to/your/jdk   # placeholder: use your own JDK path
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
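Step 4: Run the following commands in order from the distribution's bin directory. mqnamesrv and mqbroker both stay in the foreground, so give each its own terminal:
./mqnamesrv
./mqbroker -n localhost:9876
./mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic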
If all three commands run without errors, RocketMQ is up and the topic exists, so development can start.
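Before moving on, it can help to confirm that something is actually listening on the name server port. The following is a minimal sketch, assuming the address 127.0.0.1:9876 used in the commands above; the class and method names (NameServerCheck, isReachable) are made up for this illustration. A successful TCP connect only shows that the port is open, not that the broker has registered correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class NameServerCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host: all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed address: the name server started with ./mqnamesrv on this machine.
        boolean up = isReachable("127.0.0.1", 9876, 1000);
        System.out.println("name server reachable: " + up);
    }
}
```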
Step 1: Add the required dependencies to pom.xml. The first entry is a BOM import (type pom, scope import), so it belongs inside the dependencyManagement section; the remaining entries are ordinary dependencies.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>0.2.1.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- For viewing Endpoint information -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>3.2.6</version>
</dependency>
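Step 2: Create a Spring Boot project. The main class looks like this:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

@SpringBootApplication
@EnableBinding({ Source.class, Sink.class }) // registers the "output" and "input" channels
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}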
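Step 3: Create the provider. Note that this class sends through the native RocketMQ client rather than through the Spring Cloud Stream output binding.
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.stereotype.Service;

@Service
public class RocketmqProducer {
    public void send(String message) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("test_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try {
            Message msg = new Message("test-topic", "test-tag", message.getBytes());
            producer.send(msg);
        } finally {
            // Always release the producer; the original version never shut it down.
            producer.shutdown();
        }
    }
}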
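A RocketMQ Message carries its body as a byte[], so the String has to be encoded when sending and decoded when receiving. The producer's message.getBytes() relies on the platform default charset, which can differ between machines on older JDKs. The sketch below shows an explicit UTF-8 round trip using only the JDK; the class BodyEncoding and its two helpers are hypothetical names and are not part of the demo project.

```java
import java.nio.charset.StandardCharsets;

public class BodyEncoding {

    /** Encodes the payload the way a producer could before building a Message. */
    public static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body back into text, using the same charset. */
    public static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("test rocketmq message");
        System.out.println(body.length + " bytes -> " + decode(body));
    }
}
```

With this approach the call in the producer would become message.getBytes(StandardCharsets.UTF_8), so both sides agree on the encoding regardless of system settings.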
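Step 4: Create the consumer
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
public class ReceiveService {
    /**
     * The default channel name is "input", as declared in the Sink interface.
     * For more than one input, define your own binding interface modeled on Sink.
     * @param receiveMsg the received message payload
     */
    @StreamListener("input")
    public void receiveInput1(String receiveMsg) {
        System.out.println("input receive: " + receiveMsg);
    }
}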
Step 5: Add the configuration file:
server.port=8087
spring.application.name=spring-cloud-alibaba-rocketmq-demo
# Address of the RocketMQ name server
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
# Binding named "output"
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
# Binding named "input"
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=test-group
management.endpoint.health.show-details=always
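The consumer only sees the producer's messages because the input binding points at the same topic the producer writes to. As a rough illustration, the sketch below loads a trimmed copy of the settings above with java.util.Properties and compares the two destinations. The class BindingConfigCheck and its helpers are hypothetical; a real application lets Spring Boot load application.properties instead of parsing it by hand.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class BindingConfigCheck {

    static final String PREFIX = "spring.cloud.stream.bindings.";

    // Trimmed copy of the configuration shown above.
    public static final String SAMPLE = String.join("\n",
            "spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876",
            "spring.cloud.stream.bindings.output.destination=test-topic",
            "spring.cloud.stream.bindings.input.destination=test-topic",
            "spring.cloud.stream.bindings.input.group=test-group");

    /** Parses properties-format text into a Properties object. */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail.
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** Returns the destination (topic) configured for a binding, or null if absent. */
    public static String destinationOf(Properties props, String binding) {
        return props.getProperty(PREFIX + binding + ".destination");
    }

    public static void main(String[] args) {
        Properties props = parse(SAMPLE);
        String out = destinationOf(props, "output");
        String in = destinationOf(props, "input");
        System.out.println("output -> " + out + ", input -> " + in);
        System.out.println("same topic: " + out.equals(in));
    }
}
```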
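Step 6: Write a controller, start the project, and call the endpoint (GET /api/demo/test/send on port 8087)
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/api/demo/test")
public class TestController {
    @Autowired
    RocketmqProducer rocketmqProducer;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String send() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        rocketmqProducer.send("test rocketmq message");
        return "success";
    }
}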
The console then prints: input receive: test rocketmq message
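The endpoint can also be triggered from code instead of a browser. Here is a minimal sketch using the JDK's HttpURLConnection, assuming the application is running locally on port 8087 as configured above; the class name SendEndpointClient is invented for this example, and the timeouts are arbitrary.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SendEndpointClient {

    /** Performs an HTTP GET and returns the response body as UTF-8 text. */
    public static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(5000);
        try {
            int status = conn.getResponseCode();
            if (status != HttpURLConnection.HTTP_OK) {
                throw new IOException("unexpected HTTP status " + status);
            }
            try (InputStream in = conn.getInputStream()) {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[4096];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
                return new String(buf.toByteArray(), StandardCharsets.UTF_8);
            }
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        try {
            // Assumed URL: the demo's controller mapping on the configured port.
            System.out.println(get("http://127.0.0.1:8087/api/demo/test/send"));
        } catch (IOException e) {
            System.out.println("request failed: " + e.getMessage());
        }
    }
}
```

If the demo is running, each call should send one more message, which shows up as another "input receive" line in the application's console.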
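To see the binder's endpoint information, open http://127.0.0.1:8087/actuator/rocketmq-binder in a browser. Spring Boot 2 exposes only a few actuator endpoints over HTTP by default, so if this returns 404 the endpoint probably needs to be exposed through management.endpoints.web.exposure.include.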
This post only gets Spring Cloud Stream and RocketMQ running together. I am still learning both, and can only marvel at how broad and deep Spring Cloud is.
More posts are available at https://www.zplxjj.com or on the author's WeChat official account.