本文主要研究一下openmessaging-javajava
<dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-api</artifactId> <version>0.3.1-alpha</version> </dependency>
maven最新的是0.3.1-alpha,這裏直接用源碼的0.3.2-alpha-SNAPSHOT
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.javaios
/** * A {@code Producer} is a simple object used to send messages on behalf * of a {@code MessagingAccessPoint}. An instance of {@code Producer} is * created by calling the {@link MessagingAccessPoint#createProducer()} method. * <p> * It provides various {@code send} methods to send a message to a specified destination, * which is a {@code Queue} in OMS. * <p> * {@link Producer#send(Message)} means send a message to the destination synchronously, * the calling thread will block until the send request complete. * <p> * {@link Producer#sendAsync(Message)} means send a message to the destination asynchronously, * the calling thread won't block and will return immediately. Since the send call is asynchronous * it returns a {@link Future} for the send result. * * @version OMS 1.0.0 * @since OMS 1.0.0 */ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Returns the attributes of this {@code Producer} instance. * Changes to the return {@code KeyValue} are not reflected in physical {@code Producer}. * <p> * There are some standard attributes defined by OMS for {@code Producer}: * <ul> * <li> {@link OMSBuiltinKeys#PRODUCER_ID}, the unique producer id for a producer instance. * <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code Producer}. * </ul> * * @return the attributes */ KeyValue attributes(); /** * Sends a message to the specified destination synchronously, the destination should be preset to * {@link Message#sysHeaders()}, other header fields as well. * * @param message a message will be sent * @return the successful {@code SendResult} * @throws OMSMessageFormatException if an invalid message is specified. * @throws OMSTimeOutException if the given timeout elapses before the send operation completes * @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error. */ SendResult send(Message message); /** * Sends a message to the specified destination synchronously, using the specified attributes, the destination * should be preset to {@link Message#sysHeaders()}, other header fields as well. * * @param message a message will be sent * @param attributes the specified attributes * @return the successful {@code SendResult} * @throws OMSMessageFormatException if an invalid message is specified. * @throws OMSTimeOutException if the given timeout elapses before the send operation completes * @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error. */ SendResult send(Message message, KeyValue attributes); /** * Sends a transactional message to the specified destination synchronously, using the specified attributes, * the destination should be preset to {@link Message#sysHeaders()}, other header fields as well. * <p> * A transactional message will be exposed to consumer if and only if the local transaction * branch has been committed, or be discarded if local transaction has been rolled back. * * @param message a transactional message will be sent * @param branchExecutor local transaction executor associated with the message * @param attributes the specified attributes * @return the successful {@code SendResult} * @throws OMSMessageFormatException if an invalid message is specified. * @throws OMSTimeOutException if the given timeout elapses before the send operation completes * @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error. */ SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes); /** * Sends a message to the specified destination asynchronously, the destination should be preset to * {@link Message#sysHeaders()}, other header fields as well. * <p> * The returned {@code Promise} will have the result once the operation completes, and the registered * {@code FutureListener} will be notified, either because the operation was successful or because of an error. * * @param message a message will be sent * @return the {@code Promise} of an asynchronous message send operation. * @see Future * @see FutureListener */ Future<SendResult> sendAsync(Message message); /** * Sends a message to the specified destination asynchronously, using the specified attributes, the destination * should be preset to {@link Message#sysHeaders()}, other header fields as well. * <p> * The returned {@code Promise} will have the result once the operation completes, and the registered * {@code FutureListener} will be notified, either because the operation was successful or because of an error. * * @param message a message will be sent * @param attributes the specified attributes * @return the {@code Promise} of an asynchronous message send operation. * @see Future * @see FutureListener */ Future<SendResult> sendAsync(Message message, KeyValue attributes); /** * Sends a message to the specified destination in one way, the destination should be preset to * {@link Message.BuiltinKeys}, other header fields as well. * <p> * There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't * care about the send result and also have no context to get the result. * * @param message a message will be sent */ void sendOneway(Message message); /** * Sends a message to the specified destination in one way, using the specified attributes, the destination * should be preset to {@link Message.BuiltinKeys}, other header fields as well. * <p> * There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't * care about the send result and also have no context to get the result. * * @param message a message will be sent * @param properties the specified userHeaders */ void sendOneway(Message message, KeyValue properties); /** * Creates a {@code BatchMessageSender} to send message in batch manner. * * @return a {@code BatchMessageSender} instance */ BatchMessageSender createBatchMessageSender(); /** * Adds a {@code ProducerInterceptor} to intercept send operations of producer. * * @param interceptor a producer interceptor */ void addInterceptor(ProducerInterceptor interceptor); /** * Removes a {@code ProducerInterceptor} * * @param interceptor a producer interceptor will be removed */ void removeInterceptor(ProducerInterceptor interceptor); }
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/Message.javagit
/** * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is * {@link BytesMessage}. * <p> * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of * header and body and is used by separate applications to exchange a piece of information, * like <a href="http://rocketmq.apache.org/">Apache RocketMQ</a>. * <p> * The header contains fields used by the messaging system that describes the message's meta information, * like QoS level, origin, destination, and so on, while the body contains the application data being transmitted. * <p> * As for the header, OMS defines two kinds types: System Header and User Header, * with respect to flexibility in vendor implementation and user usage. * <ul> * <li> * System Header, OMS defines some standard attributes that represent the characteristics of the message. * </li> * <li> * User Header, some OMS vendors may require enhanced extra attributes of the message * or some users may want to clarify some customized attributes to draw the body. * OMS provides the improved scalability for these scenarios. * </li> * </ul> * The body contains the application data being transmitted, * which is generally ignored by the messaging system and simply transmitted to its destination. * <p> * In BytesMessage, the body is just a byte array, may be compressed and uncompressed * in the transmitting process by the messaging system. * The application is responsible for explaining the concrete content and format of the message body, * OMS is never aware of that. * * The body part is placed in the implementation classes of {@code Message}. * * @version OMS 1.0.0 * @since OMS 1.0.0 */ public interface Message { /** * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}. * * @return the system headers of a {@code Message} * @see BuiltinKeys */ KeyValue sysHeaders(); /** * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. * * @return the user headers of a {@code Message} */ KeyValue userHeaders(); /** * Puts a {@code String}-{@code int} {@code KeyValue} entry to the system headers of a {@code Message}. * * @param key the key to be placed into the system headers * @param value the value corresponding to <tt>key</tt> */ Message putSysHeaders(String key, int value); /** * Puts a {@code String}-{@code long} {@code KeyValue} entry to the system headers of a {@code Message}. * * @param key the key to be placed into the system headers * @param value the value corresponding to <tt>key</tt> */ Message putSysHeaders(String key, long value); /** * Puts a {@code String}-{@code double} {@code KeyValue} entry to the system headers of a {@code Message}. * * @param key the key to be placed into the system headers * @param value the value corresponding to <tt>key</tt> */ Message putSysHeaders(String key, double value); /** * Puts a {@code String}-{@code String} {@code KeyValue} entry to the system headers of a {@code Message}. * * @param key the key to be placed into the system headers * @param value the value corresponding to <tt>key</tt> */ Message putSysHeaders(String key, String value); /** * Puts a {@code String}-{@code int} {@code KeyValue} entry to the user headers of a {@code Message}. * * @param key the key to be placed into the user headers * @param value the value corresponding to <tt>key</tt> */ Message putUserHeaders(String key, int value); /** * Puts a {@code String}-{@code long} {@code KeyValue} entry to the user headers of a {@code Message}. * * @param key the key to be placed into the user headers * @param value the value corresponding to <tt>key</tt> */ Message putUserHeaders(String key, long value); /** * Puts a {@code String}-{@code double} {@code KeyValue} entry to the user headers of a {@code Message}. * * @param key the key to be placed into the user headers * @param value the value corresponding to <tt>key</tt> */ Message putUserHeaders(String key, double value); /** * Puts a {@code String}-{@code String} {@code KeyValue} entry to the user headers of a {@code Message}. * * @param key the key to be placed into the user headers * @param value the value corresponding to <tt>key</tt> */ Message putUserHeaders(String key, String value); /** * Get message body * * @param type Message body type * @param <T> Generic type * @return message body * @throws OMSMessageFormatException if the message body cannot be assigned to the specified type */ <T> T getBody(Class<T> type) throws OMSMessageFormatException; interface BuiltinKeys { /** * The {@code MESSAGE_ID} header field contains a value that uniquely identifies * each message sent by a {@code Producer}. * <p> * When a message is sent, MESSAGE_ID is assigned by the producer. */ String MESSAGE_ID = "MESSAGE_ID"; /** * The {@code DESTINATION} header field contains the destination to which the message is being sent. * <p> * When a message is sent this value is set to the right {@code Queue}, then the message will be sent to * the specified destination. * <p> * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in. */ String DESTINATION = "DESTINATION"; //...... } }
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/KeyValue.javagithub
/** * The {@code KeyValue} class represents a persistent set of attributes, * which supports method chaining. * <p> * A {@code KeyValue} object only allows {@code String} keys and can contain four primitive type * as values: {@code int}, {@code long}, {@code double}, {@code String}. * <p> * The {@code KeyValue} is a replacement of {@code Properties}, with simpler * interfaces and reasonable entry limits. * <p> * A {@code KeyValue} object may be used in concurrent scenarios, so the implementation * of {@code KeyValue} should consider concurrent related issues. * * @version OMS 1.0.0 * @since OMS 1.0.0 */ public interface KeyValue { /** * Inserts or replaces {@code int} value for the specified key. * * @param key the key to be placed into this {@code KeyValue} object * @param value the value corresponding to <tt>key</tt> */ KeyValue put(String key, int value); /** * Inserts or replaces {@code long} value for the specified key. * * @param key the key to be placed into this {@code KeyValue} object * @param value the value corresponding to <tt>key</tt> */ KeyValue put(String key, long value); /** * Inserts or replaces {@code double} value for the specified key. * * @param key the key to be placed into this {@code KeyValue} object * @param value the value corresponding to <tt>key</tt> */ KeyValue put(String key, double value); /** * Inserts or replaces {@code String} value for the specified key. * * @param key the key to be placed into this {@code KeyValue} object * @param value the value corresponding to <tt>key</tt> */ KeyValue put(String key, String value); /** * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, zero is returned. * * @param key the property key * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, int) */ int getInt(String key); /** * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, the default value argument is returned. * * @param key the property key * @param defaultValue a default value * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, int) */ int getInt(String key, int defaultValue); /** * Searches for the {@code long} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, zero is returned. * * @param key the property key * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, long) */ long getLong(String key); /** * Searches for the {@code long} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, the default value argument is returned. * * @param key the property key * @param defaultValue a default value * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, long) */ long getLong(String key, long defaultValue); /** * Searches for the {@code double} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, zero is returned. * * @param key the property key * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, double) */ double getDouble(String key); /** * Searches for the {@code double} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, the default value argument is returned. * * @param key the property key * @param defaultValue a default value * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, double) */ double getDouble(String key, double defaultValue); /** * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, {@code null} is returned. * * @param key the property key * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, String) */ String getString(String key); /** * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. * If the key is not found in this property list, the default value argument is returned. * * @param key the property key * @param defaultValue a default value * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, String) */ String getString(String key, String defaultValue); /** * Returns a {@link Set} view of the keys contained in this {@code KeyValue} object. * <p> * The set is backed by the {@code KeyValue}, so changes to the set are * reflected in the @code KeyValue}, and vice-versa. * * @return the key set view of this {@code KeyValue} object. */ Set<String> keySet(); /** * Tests if the specified {@code String} is a key in this {@code KeyValue}. * * @param key possible key * @return <code>true</code> if and only if the specified key is in this {@code KeyValue}, <code>false</code> * otherwise. */ boolean containsKey(String key); }
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/SendResult.javaapache
/** * The result of sending a OMS message to server * with the message id and some attributes. * * @version OMS 1.0.0 * @since OMS 1.0.0 */ public interface SendResult { /** * The unique message id related to the {@code SendResult} instance. * * @return the message id */ String messageId(); }
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/BatchMessageSender.javaapi
/** * A message sender created through {@link Producer#createBatchMessageSender()}, to send * messages in batch manner, and commit or roll back at the appropriate time. * * @version OMS 1.0.0 * @since OMS 1.0.0 */ public interface BatchMessageSender { /** * Submits a message to this sender * * @param message a message to be sent * @return this batch sender */ BatchMessageSender send(Message message); /** * Commits all the uncommitted messages in this sender. * * @throws OMSRuntimeException if the sender fails to commit the messages due to some internal error. */ void commit(); /** * Discards all the uncommitted messages in this sender. */ void rollback(); /** * Close this sender. */ void close(); }
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/PullConsumer.javaapp
/** * A {@code PullConsumer} pulls messages from the specified queue, * and supports submit the consume result by acknowledgement. * * @version OMS 1.0.0 * @see MessagingAccessPoint#createPullConsumer() * @since OMS 1.0.0 */ public interface PullConsumer extends ServiceLifecycle { /** * Returns the attributes of this {@code PullConsumer} instance. * Changes to the return {@code KeyValue} are not reflected in physical {@code PullConsumer}. * <p> * There are some standard attributes defined by OMS for {@code PullConsumer}: * <ul> * <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance. * <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PullConsumer}. * </ul> * * @return the attributes */ KeyValue attributes(); /** * Attaches the {@code PullConsumer} to a specified queue. * * @param queueName a specified queue * @return this {@code PullConsumer} instance */ PullConsumer attachQueue(String queueName); /** * Attaches the {@code PullConsumer} to a specified queue with some specified attributes.. * * @param queueName a specified queue * @param attributes some specified attributes * @return this {@code PullConsumer} instance */ PullConsumer attachQueue(String queueName, KeyValue attributes); /** * Detaches the {@code PullConsumer} from a specified queue. * <p> * After the success call, this consumer won't receive new message * from the specified queue any more. * * @param queueName a specified queue * @return this {@code PullConsumer} instance */ PullConsumer detachQueue(String queueName); /** * Receives the next message from the attached queues of this consumer. * <p> * This call blocks indefinitely until a message is arrives, the timeout expires, * or until this {@code PullConsumer} is shut down. * * @return the next message received from the attached queues, or null if the consumer is * concurrently shut down or the timeout expires * @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error. */ Message receive(); /** * Receives the next message from the attached queues of this consumer, using the specified attributes. * <p> * This call blocks indefinitely until a message is arrives, the timeout expires, * or until this {@code PullConsumer} is shut down. * * @param attributes the specified attributes * @return the next message received from the attached queues, or null if the consumer is * concurrently shut down or the timeout expires * @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error. */ Message receive(KeyValue attributes); /** * Acknowledges the specified and consumed message with the unique message receipt handle. * <p> * Messages that have been received but not acknowledged may be redelivered. * * @param receiptHandle the receipt handle associated with the consumed message * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. */ void ack(String receiptHandle); /** * Acknowledges the specified and consumed message with the specified attributes. * <p> * Messages that have been received but not acknowledged may be redelivered. * * @param receiptHandle the receipt handle associated with the consumed message * @param attributes the specified attributes * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. */ void ack(String receiptHandle, KeyValue attributes); }
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/PushConsumer.javaasync
/** * A {@code PushConsumer} receives messages from multiple queues, these messages are pushed from * MOM server to {@code PushConsumer} client. * * @version OMS 1.0.0 * @see MessagingAccessPoint#createPushConsumer() * @since OMS 1.0.0 */ public interface PushConsumer extends ServiceLifecycle { /** * Returns the attributes of this {@code PushConsumer} instance. * Changes to the return {@code KeyValue} are not reflected in physical {@code PushConsumer}. * <p> * There are some standard attributes defined by OMS for {@code PushConsumer}: * <ul> * <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance. * <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PushConsumer}. * </ul> * * @return the attributes */ KeyValue attributes(); /** * Resumes the {@code PushConsumer} after a suspend. * <p> * This method resumes the {@code PushConsumer} instance after it was suspended. * The instance will not receive new messages between the suspend and resume calls. * * @throws OMSRuntimeException if the instance has not been suspended. * @see PushConsumer#suspend() */ void resume(); /** * Suspends the {@code PushConsumer} for later resumption. * <p> * This method suspends the consumer until it is resumed. * The consumer will not receive new messages between the suspend and resume calls. * <p> * This method behaves exactly as if it simply performs the call {@code suspend(0)}. * * @throws OMSRuntimeException if the instance is not currently running. * @see PushConsumer#resume() */ void suspend(); /** * Suspends the {@code PushConsumer} for later resumption. * <p> * This method suspends the consumer until it is resumed or a * specified amount of time has elapsed. * The consumer will not receive new messages during the suspended state. * <p> * This method is similar to the {@link #suspend()} method, but it allows finer control * over the amount of time to suspend, and the consumer will be suspended until it is resumed * if the timeout is zero. * * @param timeout the maximum time to suspend in milliseconds. * @throws OMSRuntimeException if the instance is not currently running. */ void suspend(long timeout); /** * This method is used to find out whether the {@code PushConsumer} is suspended. * * @return true if this {@code PushConsumer} is suspended, false otherwise */ boolean isSuspended(); /** * Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener}. * <p> * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new * delivered message is coming. * * @param queueName a specified queue * @param listener a specified listener to receive new message * @return this {@code PushConsumer} instance */ PushConsumer attachQueue(String queueName, MessageListener listener); /** * Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener} and some * specified attributes. * <p> * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new * delivered message is coming. * * @param queueName a specified queue * @param listener a specified listener to receive new message * @param attributes some specified attributes * @return this {@code PushConsumer} instance */ PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes); /** * Detaches the {@code PushConsumer} from a specified queue. * <p> * After the success call, this consumer won't receive new message * from the specified queue any more. * * @param queueName a specified queue * @return this {@code PushConsumer} instance */ PushConsumer detachQueue(String queueName); /** * Adds a {@code ConsumerInterceptor} instance to this consumer. * * @param interceptor an interceptor instance */ void addInterceptor(ConsumerInterceptor interceptor); /** * Removes an interceptor from this consumer. * * @param interceptor an interceptor to be removed */ void removeInterceptor(ConsumerInterceptor interceptor); }
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.javamaven
/** * A message listener must implement this {@code MessageListener} interface and register * itself to a consumer instance to asynchronously receive messages. * * @version OMS 1.0.0 * @since OMS 1.0.0 */ public interface MessageListener { /** * Callback method to receive incoming messages. * <p> * A message listener should handle different types of {@code Message}. * * @param message the received message object * @param context the context delivered to the consume thread */ void onReceived(Message message, Context context); interface Context { /** * Returns the attributes of this {@code MessageContext} instance. * * @return the attributes */ KeyValue attributes(); /** * Acknowledges the specified and consumed message, which is related to this {@code MessageContext}. * <p> * Messages that have been received but not acknowledged may be redelivered. * * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. */ void ack(); } }
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/StreamingConsumer.javaide
/** * A {@code StreamingConsumer} provides low level APIs to open multiple streams * from a specified queue and then retrieve messages from them through @{code StreamingIterator}. * * A {@code Queue} is consists of multiple streams, the {@code Stream} is an abstract concept and * can be associated with partition in most messaging systems. * * @version OMS 1.0.0 * @since OMS 1.0.0 */ public interface StreamingConsumer extends ServiceLifecycle { /** * Returns the attributes of this {@code StreamingConsumer} instance. * Changes to the return {@code KeyValue} are not reflected in physical {@code StreamingConsumer}. * <p> * There are some standard attributes defined by OMS for {@code StreamingConsumer}: * <ul> * <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance. * <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code * StreamingConsumer}. * </ul> * * @return the attributes */ KeyValue attributes(); /** * Creates a {@code StreamingIterator} from the end position of the specified stream. * * @param streamName the specified stream * @return a message iterator at the begin position. */ StreamingIterator seekToEnd(String streamName); /** * Creates a {@code StreamingIterator} from the begin position of the specified stream. * * @param streamName the specified stream * @return a message iterator at the begin position. */ StreamingIterator seekToBeginning(String streamName); /** * Creates a {@code StreamingIterator} from the fixed position of the specified stream. * <p> * Creates a {@code StreamingIterator} from the begin position if the given position * is earlier than the first message's store position in this stream. * <p> * Creates a {@code StreamingIterator} from the end position, if the given position * is later than the last message's store position in this stream. * <p> * The position is a {@code String} value, may represented by timestamp, offset, cursor, * even a casual key. * * @param streamName the specified stream * @param position the specified position * @return a message iterator at the specified position */ StreamingIterator seek(String streamName, String position); }
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/StreamingIterator.java
/** * A {@code StreamingIterator} is provided by {@code Stream} and is used to * retrieve messages a specified stream like a read-only iterator. * * @version OMS 1.0.0 * @since OMS 1.0.0 */ public interface StreamingIterator { /** * Returns the attributes of this {@code StreamingIterator} instance. * <p> * There are some standard attributes defined by OMS for {@code Stream}: * <ul> * <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code * Stream}. * </ul> * * @return the attributes */ KeyValue attributes(); /** * Returns {@code true} if this iterator has more messages when * traversing the iterator in the forward direction. * * @return {@code true} if the iterator has more messages */ boolean hasNext(); /** * Returns the next message in the iteration and advances the offset position. * <p> * This method may be called repeatedly to iterate through the iteration, * or intermixed with calls to {@link #previous} to go back and forth. * * @return the next message in the list * @throws OMSRuntimeException if the iteration has no more message, or * the the consumer fails to receive the next message */ Message next(); /** * Returns {@code true} if this partition iterator has more messages when * traversing the iterator in the reverse direction. * * @return {@code true} if the partition iterator has more messages when * traversing the iterator in the reverse direction */ boolean hasPrevious(); /** * Returns the previous message in the iteration and moves the offset * position backwards. * <p> * This method may be called repeatedly to iterate through the iteration backwards, * or intermixed with calls to {@link #next} to go back and forth. * * @return the previous message in the list * @throws OMSRuntimeException if the iteration has no previous message, or * the the consumer fails to receive the previous message */ Message previous(); /** * Returns the position of the message that would be returned by a * subsequent call to {@link #next}. * * @return the position of the next message * @throws OMSRuntimeException if the iteration has no next message */ String nextPosition(); /** * Returns the position of the message that would be returned by a * subsequent call to {@link #previous}. * * @return the position of the previous message * @throws OMSRuntimeException if the iteration has no previous message */ String previousPosition(); }
openmessaging-java的定義java實現OpenMessaging的api規範,其中producer提供了單個發送也提供了批量發送的方法,而consumer則提供了pull、push以及stream三類消費方式。