A look at transactionProducer in rocketmq-client-go

This article takes a look at the transactionProducer in rocketmq-client-go.

transactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

type transactionProducer struct {
    producer *defaultProducer
    listener primitive.TransactionListener
}
  • transactionProducer defines two fields: producer (a *defaultProducer) and listener (a primitive.TransactionListener)
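
To make the listener's role concrete, here is a minimal sketch of a type that offers the two methods the code in this article calls on tp.listener: ExecuteLocalTransaction and CheckLocalTransaction. The types message and localState, the state constants, and orderListener are stand-ins invented for this example so that it compiles without the library. In the real client the arguments are primitive message types and the return value is a primitive.LocalTransactionState.

```go
package main

import (
	"fmt"
	"sync"
)

// localState is a stand-in for primitive.LocalTransactionState (values are arbitrary).
type localState int

const (
	commitState localState = iota + 1
	rollbackState
	unknownState
)

// message is a stand-in for the library's message types; only one field is modeled.
type message struct {
	TransactionID string
}

// orderListener is a hypothetical listener that records the outcome of each
// local transaction so that a later check-back can be answered.
type orderListener struct {
	mu     sync.Mutex
	states map[string]localState
}

func newOrderListener() *orderListener {
	return &orderListener{states: make(map[string]localState)}
}

// ExecuteLocalTransaction runs the local work after the prepared message was sent.
func (l *orderListener) ExecuteLocalTransaction(msg *message) localState {
	l.mu.Lock()
	defer l.mu.Unlock()
	// Assumption: the local work (for example a database write) succeeded.
	l.states[msg.TransactionID] = commitState
	return commitState
}

// CheckLocalTransaction answers a check-back by looking up the recorded outcome.
func (l *orderListener) CheckLocalTransaction(msg *message) localState {
	l.mu.Lock()
	defer l.mu.Unlock()
	if s, ok := l.states[msg.TransactionID]; ok {
		return s
	}
	return unknownState
}

func main() {
	l := newOrderListener()
	msg := &message{TransactionID: "tx-1"}
	fmt.Println("before:", l.CheckLocalTransaction(msg))
	fmt.Println("execute:", l.ExecuteLocalTransaction(msg))
	fmt.Println("after:", l.CheckLocalTransaction(msg))
}
```

Recording the outcome under a lock matters because the check-back is handled on a different goroutine than the one that sends the message.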

NewTransactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) {
    producer, err := NewDefaultProducer(opts...)
    if err != nil {
        return nil, errors.Wrap(err, "NewDefaultProducer failed.")
    }
    return &transactionProducer{
        producer: producer,
        listener: listener,
    }, nil
}
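  • NewTransactionProducer creates a defaultProducer via NewDefaultProducer(opts...) and wraps it, together with the given listener, in a new transactionProducer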

Start

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) Start() error {
    go primitive.WithRecover(func() {
        tp.checkTransactionState()
    })
    return tp.producer.Start()
}
  • Start first runs tp.checkTransactionState() asynchronously in a goroutine wrapped by primitive.WithRecover, then calls tp.producer.Start()
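
The "goroutine wrapped in a recover helper" pattern used by Start can be sketched as follows. This is not the library's primitive.WithRecover. The helper withRecover, its onPanic parameter, and startGuarded are hypothetical names chosen for this example, which only shows how a panic inside a background loop can be caught instead of crashing the process.

```go
package main

import "fmt"

// withRecover is a hypothetical helper: it runs fn and, if fn panics,
// passes the recovered value to onPanic instead of letting it propagate.
func withRecover(fn func(), onPanic func(v interface{})) {
	defer func() {
		if v := recover(); v != nil {
			onPanic(v)
		}
	}()
	fn()
}

// startGuarded runs fn in a goroutine guarded by withRecover. The returned
// channel receives nil when fn returns normally, or the panic value otherwise.
func startGuarded(fn func()) <-chan interface{} {
	done := make(chan interface{}, 1) // buffered so the goroutine never blocks
	go withRecover(func() {
		fn()
		done <- nil
	}, func(v interface{}) {
		done <- v
	})
	return done
}

func main() {
	fmt.Println(<-startGuarded(func() {}))
	fmt.Println(<-startGuarded(func() { panic("boom") }))
}
```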

checkTransactionState

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) checkTransactionState() {
    for ch := range tp.producer.callbackCh {
        switch callback := ch.(type) {
        case *internal.CheckTransactionStateCallback:
            localTransactionState := tp.listener.CheckLocalTransaction(callback.Msg)
            uniqueKey := callback.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
            if uniqueKey == "" {
                uniqueKey = callback.Msg.MsgId
            }
            header := &internal.EndTransactionRequestHeader{
                CommitLogOffset:      callback.Header.CommitLogOffset,
                ProducerGroup:        tp.producer.group,
                TranStateTableOffset: callback.Header.TranStateTableOffset,
                FromTransactionCheck: true,
                MsgID:                uniqueKey,
                TransactionId:        callback.Header.TransactionId,
                CommitOrRollback:     tp.transactionState(localTransactionState),
            }

            req := remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
            req.Remark = tp.errRemark(nil)

            err := tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req,
                tp.producer.options.SendMsgTimeout)
            if err != nil {
                rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
                    "callback":               callback.Addr.String(),
                    "request":                req.String(),
                    rlog.LogKeyUnderlayError: err,
                })
            }
        default:
            rlog.Error(fmt.Sprintf("unknown type %v", ch), nil)
        }
    }
}
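  • checkTransactionState ranges over tp.producer.callbackCh and handles each item according to its type. Currently only *internal.CheckTransactionStateCallback is supported: it calls tp.listener.CheckLocalTransaction, builds an EndTransactionRequestHeader from the result, and sends it to the broker with tp.producer.client.InvokeOneWay. Any other type is logged as an error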
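
The loop above combines two Go idioms: ranging over a channel and dispatching on the dynamic type of each item with a type switch. The sketch below isolates that control flow. checkCallback and dispatch are stand-ins invented for this example, and the check function plays the role of the listener. Nothing here talks to a broker.

```go
package main

import "fmt"

// checkCallback is a stand-in for *internal.CheckTransactionStateCallback.
type checkCallback struct {
	MsgID string
}

// dispatch drains callbacks until the channel is closed. Known callback types
// are answered via check; anything else is reported as unknown.
func dispatch(callbacks <-chan interface{}, check func(msgID string) string) []string {
	var results []string
	for cb := range callbacks {
		switch c := cb.(type) {
		case *checkCallback:
			results = append(results, c.MsgID+":"+check(c.MsgID))
		default:
			results = append(results, fmt.Sprintf("unknown type %T", cb))
		}
	}
	return results
}

func main() {
	ch := make(chan interface{}, 2)
	ch <- &checkCallback{MsgID: "m1"}
	ch <- 42
	close(ch) // a range loop over a channel only ends once the channel is closed

	for _, r := range dispatch(ch, func(string) string { return "commit" }) {
		fmt.Println(r)
	}
}
```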

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) Shutdown() error {
    return tp.producer.Shutdown()
}
  • Shutdown simply delegates to tp.producer.Shutdown()

SendMessageInTransaction

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg *primitive.Message) (*primitive.TransactionSendResult, error) {
    msg.WithProperty(primitive.PropertyTransactionPrepared, "true")
    msg.WithProperty(primitive.PropertyProducerGroup, tp.producer.options.GroupName)

    rsp, err := tp.producer.SendSync(ctx, msg)
    if err != nil {
        return nil, err
    }
    localTransactionState := primitive.UnknowState
    switch rsp.Status {
    case primitive.SendOK:
        if len(rsp.TransactionID) > 0 {
            msg.WithProperty("__transactionId__", rsp.TransactionID)
        }
        transactionId := msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
        if len(transactionId) > 0 {
            msg.TransactionId = transactionId
        }
        localTransactionState = tp.listener.ExecuteLocalTransaction(msg)
        if localTransactionState != primitive.CommitMessageState {
            rlog.Error("executeLocalTransaction but state unexpected", map[string]interface{}{
                "localState": localTransactionState,
                "message":    msg,
            })
        }

    case primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout, primitive.SendSlaveNotAvailable:
        localTransactionState = primitive.RollbackMessageState
    default:
    }

    tp.endTransaction(*rsp, err, localTransactionState)

    transactionSendResult := &primitive.TransactionSendResult{
        SendResult: rsp,
        State:      localTransactionState,
    }

    return transactionSendResult, nil
}
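  • SendMessageInTransaction marks the message as prepared (PropertyTransactionPrepared set to "true"), sets its producer group, and calls tp.producer.SendSync(ctx, msg). It then branches on rsp.Status: for primitive.SendOK it calls tp.listener.ExecuteLocalTransaction and uses the result as localTransactionState; for primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout, and primitive.SendSlaveNotAvailable it sets localTransactionState to primitive.RollbackMessageState; for any other status the state stays primitive.UnknowState. Finally it calls tp.endTransaction and returns a TransactionSendResult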
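
The status-to-state decision in SendMessageInTransaction can be isolated into a small pure function, sketched below. sendStatus, localState, their constants, and resolveState are stand-ins invented for this example and are not the library's types. The point is that the local transaction runs only when the prepared message was sent successfully.

```go
package main

import "fmt"

// sendStatus and localState are stand-ins for the primitive package's types.
type sendStatus int

const (
	sendOK sendStatus = iota
	sendFlushDiskTimeout
	sendFlushSlaveTimeout
	sendSlaveNotAvailable
	sendOther // hypothetical catch-all for any status not listed above
)

type localState int

const (
	commitState localState = iota + 1
	rollbackState
	unknownState
)

// resolveState mirrors the switch in SendMessageInTransaction: execute is
// called only for sendOK, the three timeout/unavailable statuses force a
// rollback, and everything else stays unknown.
func resolveState(status sendStatus, execute func() localState) localState {
	state := unknownState
	switch status {
	case sendOK:
		state = execute()
	case sendFlushDiskTimeout, sendFlushSlaveTimeout, sendSlaveNotAvailable:
		state = rollbackState
	}
	return state
}

func main() {
	execute := func() localState { return commitState }
	fmt.Println(resolveState(sendOK, execute) == commitState)
	fmt.Println(resolveState(sendFlushDiskTimeout, execute) == rollbackState)
	fmt.Println(resolveState(sendOther, execute) == unknownState)
}
```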

Summary

transactionProducer defines the producer and listener fields, and provides NewTransactionProducer as its constructor along with the Start, Shutdown, and SendMessageInTransaction methods.
