使用aws sqs 作緩衝隊列(go)

背景

做爲一位剛進公司的小白,參與到項目的第一個任務是爲操做記錄的存儲增長消息隊列,爲何咱們要這麼作呢?緣由以下:在現有系統中咱們直接將用戶的操做記錄增長到mongodb數據庫中,可是在咱們的系統出現峯值的時候,發現mongodb受不了,爲此咱們要作到削峯這個功能,按照慣例咱們想到了使用消息隊列,同時因爲咱們在項目中廣泛採用aws的雲服務,爲此咱們採用了aws的消息隊列。html

注意事項

  1. aws sqs 收費是按照請求次數收費因此要儘可能使用批量操做
  2. aws sqs 的消費上線是12000次,最多容許12000個在傳遞的數據
  3. aws sqs 容量無限大
  4. aws sqs 的批量操做的上限是10條數據(畢竟是按次數收費)
  5. aws sqs並行取數據的過程當中可能會出現重複,咱們利用數據庫的ID來去重,注意咱們在生產id的時候使用mongodb本身的庫來生成,緣由是依照mongodb生成的id比較均勻,存入的數據庫中的樹形結構也比較平衡,效率比較高

操做步驟

使用aws sqs和使用其餘的消息隊列基本步驟一致,aws sqs的官方已經給出了很是詳盡的使用說明,儘可能參考官方文檔,下面給出簡單的操做步驟,以及示例代碼,代碼是用go寫的,其餘的語言能夠參考go的官方文檔git

  1. 配置aws sqs的鏈接信息
awsSqs := AwsSQS{}

creds := credentials.NewStaticCredentials("key", "secret", "")
sess := session.Must(session.NewSession(&aws.Config{
    Region:      aws.String("region"),
    Credentials: creds,
}))

awsSqs.svc = sqs.New(sess)
  1. 向aws sqs發送數據
// 將消息發送給隊列
func (awsSqs *AwsSQS) SendMessage(record string, qURL string) *Error {
    _, err := awsSqs.svc.SendMessage(&sqs.SendMessageInput{
        MessageBody: aws.String(record),
        QueueUrl:    &qURL,
    })
    if err != nil {
        Errorf("Error Send Message to sqs: err = %v", err)
        return NewError(ErrorCodeInnerError, err.Error())
    }
    return nil
}
  1. 從aws sqs 獲取數據
// 從隊列中獲取消息
func (awsSqs *AwsSQS) ReserveMessage(qURL string) (*sqs.ReceiveMessageOutput, *Error) {
    result, err := awsSqs.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
        QueueUrl:            &qURL,
        MaxNumberOfMessages: aws.Int64(10),
        WaitTimeSeconds:     aws.Int64(10),
    })

    if err != nil {
        Errorf("Error aws sqs ReceiveMessage : err=%v ", err)
        return nil, NewError(ErrorCodeInnerError, err.Error())
    }

    return result, nil
}
  1. 從aws sqs 刪除數據
deleteMessageList := make([]*sqs.DeleteMessageBatchRequestEntry, 0)
deleteMessage := sqs.DeleteMessageBatchRequestEntry{Id: message.MessageId, ReceiptHandle: message.ReceiptHandle}
deleteMessageList = append(deleteMessageList, &deleteMessage)
// 將隊列中的消息刪除(批量刪除)
func (awsSqs *AwsSQS) DeleteMessage(list []*sqs.DeleteMessageBatchRequestEntry, qURL string) *Error {
    // delete message
    _, err := awsSqs.svc.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
        QueueUrl: &qURL,
        Entries:  list,
    })

    if err != nil {
        Errorf("Delete Message error:error =%v", err)
        return NewError(ErrorCodeInnerError, err.Error())
    }
    return nil
}
  1. 在存儲到moongodb的過程當中防止重複
// 自定義mongodb的_id,使用mongodb的庫來生成id
id := bson.NewObjectId().Hex()
entity.id = id
type entity struct {
    Id                 string `bson:"_id,omitempty"`
}

參考資料

官方代碼示例
aws 限制github

相關文章
相關標籤/搜索