利用京東雲Serverless服務快速構建5G時代的IoT應用

10月31日,在2019年中國國際信息通訊展覽會上,工信部宣佈:5G商用正式啓動。5G商用時代來了!json

5G的商用,使得數據傳輸速度、響應速度、鏈接數據、數據傳輸量、傳輸可靠性等方面都有了顯著的提高,這一技術的突破才使得不少領域的應用場景得以真正的落地實施,走進普通人的生活,而這其中就包括物聯網。服務器

雖然物聯網的概念很簡明,就是把物品與互聯網相鏈接並進行信息交換和通信,從而實現萬物智能化,可是因爲各類緣由,商業化的應用並無大規模的落地,其中一方面是因爲網絡傳輸等緣由的限制。現在,5G的到來使得物聯網應用在網絡層面再也不受限,更好地促進了物聯網的發展。對於企業來講,快速構建一個商用的物聯網服務,搶佔先機就顯得尤其重要。網絡

京東雲Serverless服務正適應了現在企業的這種需求。Serverless一系列產品生態可讓用戶以業務爲導向,無需關心底層服務器部署以及承載能力,實施週期短,無預付價格按使用量付費。這些特性是適應互聯網5G快速時代、快速構建的不二選擇,爲企業節省成本的同時解決了技術上的難題。session

如下面一個車聯網數據爲例子,咱們來看一下如何用隊列服務來構建一個高可用高可靠的無服務應用。在車聯網應用中客戶端負載可能會在24小時內擴展/縮減三、4個數量級,這些特性自然適合Serverless服務動態擴縮,按量付費。app

案例場景:車聯網中搭載數據收集傳感器的汽車向雲端傳輸行駛數據,雲端利用隊列服務削峯填谷、動態擴縮的能力接收數據,而後轉存到Elasticsearch進行更好的數據檢索和應用,爲用戶提供更好的服務或者爲公司提供更多的商業價值。
需求:隊列服務SDK,隊列服務接入點地址,Elasticsearch接入點地址(已經建立好ES實例),京東雲用戶AK/SK
代碼語言: Go

Step1

建立隊列服務客戶端以及資源建立less

1func CreateClient() *sqs.SQS {
 2    ses, _ := session.NewSession(&aws.Config{
 3        Region: aws.String("cn-north-1"),
 4        Credentials: credentials.NewStaticCredentials("your AccessKey",
 5            "your SecretKey", ""),
 6        Endpoint:   aws.String("http://jqs.cn-north-1.jdcloud.com"),
 7        DisableSSL: aws.Bool(true),
 8    })
 9    _, err := ses.Config.Credentials.Get()
10    if err != nil {
11        log.Fatal("憑據建立失敗", err)
12    }
13    client := sqs.New(ses)
14    return client
15}
16
17func CreateQueueTask(name string) string {
18    resp, err := sqsClient.CreateQueue(&sqs.CreateQueueInput{
19        QueueName: aws.String(name),
20    })
21    if err != nil {
22        log.Println("Create Queue Failed:", err)
23        return ""
24    }
25    return *resp.QueueUrl
26}

Step2

每一個車輛設備發送消息運維

1func SendTask(url string, message interface{}) {
 2    body, _ := json.Marshal(message)
 3    _, err := sqsClient.SendMessage(&sqs.SendMessageInput{
 4        MessageBody: aws.String(string(body)),
 5        QueueUrl:    aws.String(url),
 6    })
 7    if err != nil {
 8        log.Println("Send Message Failed:", err)
 9        return
10    }
11    return
12}

測試數據:函數

測試機器的配置:CPU6四、內存256G、帶寬100Mbps(京東云云主機)post

場景:公網;send-單條(JQS)測試

Step3

從隊列服務中拉取消息轉存到Elasticsearch中

1func ReceiveMessageTask(url string) interface{} {
 2    result, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{
 3        AttributeNames:        aws.StringSlice([]string{"All"}),
 4        MaxNumberOfMessages:   aws.Int64(1),
 5        MessageAttributeNames: aws.StringSlice([]string{"All"}),
 6        QueueUrl:              aws.String(url),
 7        VisibilityTimeout:     aws.Int64(30),
 8        WaitTimeSeconds:       aws.Int64(0),
 9    })
10    log.Println(*result.Messages[0].Body)
11    if err != nil {
12        log.Println("Receive Message Failed:", err)
13        return ""
14    }
15
16    if len(result.Messages) > 0 {
17        _, delErr := sqsClient.DeleteMessage(&sqs.DeleteMessageInput{
18            QueueUrl:      aws.String(url),
19            ReceiptHandle: result.Messages[0].ReceiptHandle,
20        })
21        if err != nil {
22            log.Println("Delete Message Failed:", delErr)
23        }
24
25        message := new(gpsMessage)
26        Err := json.Unmarshal([]byte(*result.Messages[0].Body), message)
27        if Err != nil {
28            log.Println("Receive Message Unmarshal Failed", Err)
29            return ""
30        }
31        return message
32    }
33    return ""
34}
35
36func Build(url string, method string, body interface{}) []byte {
37    client := &http.Client{}
38    //向服務端發送get請求
39    bodyData, _ := json.Marshal(body)
40    request, _ := http.NewRequest(method, url, bytes.NewReader(bodyData))
41
42    request.Header.Set("Accept", "application/json")
43    request.Header.Set("Content-Type", "application/json")
44    //接收服務端返回給客戶端的信息
45    response, _ := client.Do(request)
46    r, _ := ioutil.ReadAll(response.Body)
47    return r
48}
49
50func PostMessageToES(p string, body interface{}) string {
51    postReturn := new(postRes)
52    postResponse := Build(p, "POST", body)
53    err := json.Unmarshal(postResponse, postReturn)
54    if err != nil {
55        log.Println("Unmarshal Failed", err)
56    }
57    jsonS, _ := json.Marshal(postReturn)
58    log.Println("postResult:", string(jsonS))
59    return postReturn.Id
60}

Step4

從Elasticsearch按照需求索引搜索信息

1func GetMessageFromES(p string) {
 2    message := new(esMessage)
 3    getResponse := Build(p, "GET", nil)
 4    log.Println("getPath:", p)
 5    Err := json.Unmarshal(getResponse, message)
 6    if Err != nil {
 7        log.Println("Unmarshal Failed", Err)
 8    }
 9    jsonM, _ := json.Marshal(message)
10    log.Println("getResult:", string(jsonM))
11}

結果示例:

能夠按照車輛信息進行數據的檢索,繪製出汽車動態地圖,利用這些數據來幫助自動駕駛、動態導航、車輛健康度分析等多種場景進行實現。

京東雲的隊列服務做爲Serverless開發中的BaaS服務,實現了中間件服務的無運維和毫秒級擴縮能力,支持京東雲的合做夥伴零成本啓動業務和按使用量付費的模式,幫助用戶解決資源擴縮和閾值監控等複雜問題。結合函數服務FaaS使用,能夠知足更豐富的場景,而且調用整個京東雲Serverless生態,打造基於雲原生21世紀的開放式的全新應用。 點擊「 瞭解 」,快來進行體驗吧!

歡迎點擊「京東雲」瞭解更多精彩內容

相關文章
相關標籤/搜索