做爲一位剛進公司的小白,參與到項目的第一個任務是爲操做記錄的存儲增長消息隊列,爲何咱們要這麼作呢?緣由以下:在現有系統中咱們直接將用戶的操做記錄增長到mongodb數據庫中,可是在咱們的系統出現峯值的時候,發現mongodb受不了,爲此咱們要作到削峯這個功能,按照慣例咱們想到了使用消息隊列,同時因爲咱們在項目中廣泛採用aws的雲服務,爲此咱們採用了aws的消息隊列。html
使用aws sqs和使用其餘的消息隊列基本步驟一致,aws sqs的官方已經給出了很是詳盡的使用說明,儘可能參考官方文檔,下面給出簡單的操做步驟,以及示例代碼,代碼是用go寫的,其餘的語言能夠參考go的官方文檔git
awsSqs := AwsSQS{} creds := credentials.NewStaticCredentials("key", "secret", "") sess := session.Must(session.NewSession(&aws.Config{ Region: aws.String("region"), Credentials: creds, })) awsSqs.svc = sqs.New(sess)
// 將消息發送給隊列 func (awsSqs *AwsSQS) SendMessage(record string, qURL string) *Error { _, err := awsSqs.svc.SendMessage(&sqs.SendMessageInput{ MessageBody: aws.String(record), QueueUrl: &qURL, }) if err != nil { Errorf("Error Send Message to sqs: err = %v", err) return NewError(ErrorCodeInnerError, err.Error()) } return nil }
// 從隊列中獲取消息 func (awsSqs *AwsSQS) ReserveMessage(qURL string) (*sqs.ReceiveMessageOutput, *Error) { result, err := awsSqs.svc.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: &qURL, MaxNumberOfMessages: aws.Int64(10), WaitTimeSeconds: aws.Int64(10), }) if err != nil { Errorf("Error aws sqs ReceiveMessage : err=%v ", err) return nil, NewError(ErrorCodeInnerError, err.Error()) } return result, nil }
deleteMessageList := make([]*sqs.DeleteMessageBatchRequestEntry, 0) deleteMessage := sqs.DeleteMessageBatchRequestEntry{Id: message.MessageId, ReceiptHandle: message.ReceiptHandle} deleteMessageList = append(deleteMessageList, &deleteMessage) // 將隊列中的消息刪除(批量刪除) func (awsSqs *AwsSQS) DeleteMessage(list []*sqs.DeleteMessageBatchRequestEntry, qURL string) *Error { // delete message _, err := awsSqs.svc.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{ QueueUrl: &qURL, Entries: list, }) if err != nil { Errorf("Delete Message error:error =%v", err) return NewError(ErrorCodeInnerError, err.Error()) } return nil }
// 自定義mongodb的_id,使用mongodb的庫來生成id id := bson.NewObjectId().Hex() entity.id = id
type entity struct { Id string `bson:"_id,omitempty"` }