本文主要研究一下rocketmq-client-go的transactionProducer異步
rocketmq-client-go-v2.0.0/producer/producer.gocode
type transactionProducer struct { producer *defaultProducer listener primitive.TransactionListener }
rocketmq-client-go-v2.0.0/producer/producer.gorem
func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) { producer, err := NewDefaultProducer(opts...) if err != nil { return nil, errors.Wrap(err, "NewDefaultProducer failed.") } return &transactionProducer{ producer: producer, listener: listener, }, nil }
rocketmq-client-go-v2.0.0/producer/producer.gostring
func (tp *transactionProducer) Start() error { go primitive.WithRecover(func() { tp.checkTransactionState() }) return tp.producer.Start() }
rocketmq-client-go-v2.0.0/producer/producer.goit
func (tp *transactionProducer) checkTransactionState() { for ch := range tp.producer.callbackCh { switch callback := ch.(type) { case *internal.CheckTransactionStateCallback: localTransactionState := tp.listener.CheckLocalTransaction(callback.Msg) uniqueKey := callback.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) if uniqueKey == "" { uniqueKey = callback.Msg.MsgId } header := &internal.EndTransactionRequestHeader{ CommitLogOffset: callback.Header.CommitLogOffset, ProducerGroup: tp.producer.group, TranStateTableOffset: callback.Header.TranStateTableOffset, FromTransactionCheck: true, MsgID: uniqueKey, TransactionId: callback.Header.TransactionId, CommitOrRollback: tp.transactionState(localTransactionState), } req := remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil) req.Remark = tp.errRemark(nil) err := tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req, tp.producer.options.SendMsgTimeout) if err != nil { rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{ "callback": callback.Addr.String(), "request": req.String(), rlog.LogKeyUnderlayError: err, }) } default: rlog.Error(fmt.Sprintf("unknown type %v", ch), nil) } } }
rocketmq-client-go-v2.0.0/producer/producer.goio
func (tp *transactionProducer) Shutdown() error { return tp.producer.Shutdown() }
rocketmq-client-go-v2.0.0/producer/producer.gocli
func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg *primitive.Message) (*primitive.TransactionSendResult, error) { msg.WithProperty(primitive.PropertyTransactionPrepared, "true") msg.WithProperty(primitive.PropertyProducerGroup, tp.producer.options.GroupName) rsp, err := tp.producer.SendSync(ctx, msg) if err != nil { return nil, err } localTransactionState := primitive.UnknowState switch rsp.Status { case primitive.SendOK: if len(rsp.TransactionID) > 0 { msg.WithProperty("__transactionId__", rsp.TransactionID) } transactionId := msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) if len(transactionId) > 0 { msg.TransactionId = transactionId } localTransactionState = tp.listener.ExecuteLocalTransaction(msg) if localTransactionState != primitive.CommitMessageState { rlog.Error("executeLocalTransaction but state unexpected", map[string]interface{}{ "localState": localTransactionState, "message": msg, }) } case primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout, primitive.SendSlaveNotAvailable: localTransactionState = primitive.RollbackMessageState default: } tp.endTransaction(*rsp, err, localTransactionState) transactionSendResult := &primitive.TransactionSendResult{ SendResult: rsp, State: localTransactionState, } return transactionSendResult, nil }
transactionProducer定義了producer及listener屬性;它提供了NewTransactionProducer、Start、Shutdown、SendMessageInTransaction方法List