<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.17.0</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring</artifactId> <version>2.17.0</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring-redis</artifactId> <version>2.17.0</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-stream</artifactId> <version>2.17.0</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.11</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-camel_2.11</artifactId> <version>2.4.0</version> </dependency> </dependencies>
/** * Created by sam on 5/9/16. */ public class MyRedisProducer extends UntypedProducerActor { public void preStart() { super.preStart(); } @Override public String getEndpointUri() { return "spring-redis://localhost:9999?connectionFactory=#connectionFactory&serializer=#serializer"; } @Override public void onRouteResponse(Object message) { System.out.println("response from route:{}" + message); } @Override public Object onTransformOutgoingMessage(Object message) { if (message instanceof CamelMessage) { CamelMessage camelMessage = (CamelMessage) message; return camelMessage; } else { Map<String, Object> headers = new HashMap<String, Object>(); headers.put("CamelRedis.Command", "PUBLISH"); headers.put("CamelRedis.Channel", "testChannel"); headers.put("CamelRedis.Message", message.toString()); CamelMessage camelMessage = new CamelMessage(message, headers); return camelMessage; } } }
/** * Created by sam on 5/9/16. */ public class MyRedisConsumer extends UntypedConsumerActor { @Override public String getEndpointUri() { return "spring-redis://localhost:9999?connectionFactory=#connectionFactory&serializer=#serializer&channels=testChannel&command=SUBSCRIBE"; } @Override public void onReceive(Object o) throws Exception { System.out.println(o); if (o instanceof CamelMessage) { CamelMessage msg = (CamelMessage) o; System.out.println(msg.getBodyAs(String.class, getCamelContext())); } } }
/** * Created by sam on 5/9/16. */ public class RedisTest { public static void main(String[] args) throws Exception { ActorSystem system = ActorSystem.create("redis-actor"); Camel camel = CamelExtension.get(system); // 獲取Camel對象,該對象能夠直接操做Camel,好比獲取CamelContext對象等。 PropertyPlaceholderDelegateRegistry delegateRegistry = (PropertyPlaceholderDelegateRegistry) camel.context().getRegistry(); JndiRegistry registry = (JndiRegistry) delegateRegistry.getRegistry(); // Apache Camel默認使用JndiRegistry來註冊類信息。 if (registry.lookup("connectionFactory") == null && registry.lookup("serializer") == null) { // 添加beans JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName("localhost"); connectionFactory.setPassword("1234567890"); connectionFactory.setPort(9999); // call this method to initialize connection factory connectionFactory.afterPropertiesSet(); registry.bind("connectionFactory", connectionFactory); registry.bind("serializer", new StringRedisSerializer()); } // 建立producer和consumer ActorRef producer = system.actorOf(Props.create(MyRedisProducer.class), "redisProducer"); ActorRef consumer = system.actorOf(Props.create(MyRedisConsumer.class), "redisConsumer"); while (true) { Thread.sleep(1000); producer.tell(new Date().toString(), ActorRef.noSender()); } } }
java.naming.factory.initial = org.apache.camel.util.jndi.CamelInitialContextFactory
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. response from route:{}CamelMessage(null, Map()) CamelMessage(Thu May 12 09:38:04 CST 2016, Map(MessageExchangeId -> ID-localhost-localdomain-33258-1463017082575-0-2, breadcrumbId -> ID-localhost-localdomain-33258-1463017082575-0-1, CamelRedis.Channel -> testChannel, CamelRedis.Pattern -> [B@5910162e)) Thu May 12 09:38:04 CST 2016 response from route:{}CamelMessage(null, Map()) CamelMessage(Thu May 12 09:38:05 CST 2016, Map(MessageExchangeId -> ID-localhost-localdomain-33258-1463017082575-0-4, breadcrumbId -> ID-localhost-localdomain-33258-1463017082575-0-3, CamelRedis.Channel -> testChannel, CamelRedis.Pattern -> [B@4154c5ce)) Thu May 12 09:38:05 CST 2016 response from route:{}CamelMessage(null, Map()) CamelMessage(Thu May 12 09:38:06 CST 2016, Map(MessageExchangeId -> ID-localhost-localdomain-33258-1463017082575-0-6, breadcrumbId -> ID-localhost-localdomain-33258-1463017082575-0-5, CamelRedis.Channel -> testChannel, CamelRedis.Pattern -> [B@2cb03be0)) Thu May 12 09:38:06 CST 2016