Streamr助你掌控本身的數據(2)——三種整合數據至Streamr的典型場景

博客說明

全部刊發內容都可轉載可是須要註明出處。
html

三種整合數據至Streamr的典型場景

本系列文檔主要介紹怎麼經過Streamr管理本身的DATA,整個系列包括三篇教程文檔,分別是:教你5分鐘上傳數據至Streamr三種整合數據至Streamr的典型場景教你在Streamr市場上發佈數據。全部文檔均參考Streamr blog。前兩篇主要偏向技術文檔,因此須要有必定的技術背景。第三篇不包含任何技術知識,大部分人均可以按照教程來完成相應的操做。node

簡介

第一篇文檔主要介紹如何經過調用API接口上傳數據至Streamr,本篇教程文檔主要介紹了三種整合數據至Streamr網絡的典型示例。包括: 1. 直接從數據源推送數據至Streamr網絡; 2. 數據源和Streamr之間創建橋接(數據源向橋接點實時推送數據); 3. 數據源和Streamr之間創建橋接(橋接點向數據源發送輪詢請求)。git

相關概念說明

在Streamr網絡中,每一個數據點屬於一個Stream。每一個數據點(或者成爲事件、消息)包含了標有時間戳的一系列信息,好比,傳感器採集的數據、即時通信的消息等。根據使用場景的不一樣,一個Stream能夠包括一個源的數據或者是多個數據源的數據。Streamr網絡爲用戶上傳數據提供API接口,用戶調用該API接口最簡單的方式是使用Streamr客戶端庫。當前的Streamr客戶端庫是JavaScript編寫的,其餘語言的客戶端庫還在開發中。若是您不方便使用當前的Streamr客戶端庫,您也可使用HTTP庫來調用API接口,相關方法請參見上一篇說明文檔。程序員

場景一:直接從數據源推送數據至Streamr網絡

  • 複雜度:低
  • 延遲:低
  • 可用性:中等
    在這種模式下,數據源一產生新的數據,將被當即直接推送至Streamr網絡。這種模式是官方推薦的,可是受環境影響,這種模式有可能會不可用。這種模式須要用戶可以控制產生數據的系統而且可以決定該數據發送的地方。舉例來講,若是您是IoT設備生產商,您能夠直接將Streamr客戶端直接植入這些IoT設備中,而且容許設備使用者上傳數據至Streamr網絡。大部分的工業設備具備良好的可配置特性,能夠隨時將產生的數據接入Streamr網絡。然而,諸如汽車、手機、智能手環等消費級設備常常強制將用戶數據發送至設備製造商的雲端,而後向用戶提供訪問該數據的API接口。在這種場景下,用戶可使用橋接模式將數據接入到Streamr網絡(具體參見下一小節)。
    下面以一個實際應用爲例說明如何直接從數據源推送數據至Streamr網絡。這裏主要介紹如何將IoT傳感器Ruuvi產生的測量數據實時上傳至Streamr。該設備經過低功耗藍牙將數據傳輸至一臺裝有Streamr客戶端庫的網關計算機。其中,網關計算機爲每一個設備單獨創建一個Stream,而且將設備產生的測量數據實時上傳至所建立的Stream。其示例代碼以下:
const StreamrClient = require('streamr-client')
const ruuvi = require('node-ruuvitag')
const API_KEY = 'MY-API-KEY'
const client = new StreamrClient({
    apiKey: API_KEY
})
const streams = {}
console.log('Listening for RuuviTags...')
ruuvi.on('found', tag => {
    // Create a Stream for this tag if it doesn't exist yet
    if (!streams[tag.id]) {
        streams[tag.id] = await client.getOrCreateStream({
            name: 'Ruuvi ' + tag.id
        })
    }
    tag.on('updated', async (data) => {
        const stream = streams[tag.id]
        try {
            // Produce the data point to the stream
            await stream.produce(data)
        } catch (err) {
            console.error(err)
        }
    })
})

場景二:數據源和Streamr之間創建橋接(數據源向橋接點實時推送數據)

  • 複雜度:中等
  • 延遲:低
  • 可用性:中等
    這種模式適用於您沒法直接控制數據源的場景,用戶能夠經過一系列API接口實時獲取數據。具體地,在這種模式下,數據源實時將新產生的數據推送給用戶,用戶收到通知後當即將該數據上傳至Streamr。

示例1

示例1主要介紹如何使用Bitfinex客戶端庫和Streamr客戶端庫,將Bitfinex上DATA/USD交易數據和Streamr創建鏈接。github

const BFX = require('bitfinex-api-node')
const StreamrClient = require('streamr-client')
const STREAM_ID = 'MY-STREAM-ID'
const API_KEY = 'MY-API-KEY'
const bws = new BFX('', '', {
    version: 2,
    transform: true
}).ws
const client = new StreamrClient({
    apiKey: API_KEY
})
bws.on('open', () => {
    bws.subscribeTicker('DATUSD')
})
bws.on('ticker', (pair, ticker) => {
    console.log('Ticker:', ticker)
    client.produceToStream(STREAM_ID, ticker)
        .then(() => console.log('Sent to Streamr!'))
        .catch((err) => console.error(err))
})

示例2

示例1主要從編程角度介紹如何創建相應的鏈接,若是您非程序員,那可使用本示例中的可視化編輯器Streamr Editor來創建相應的鏈接。可是,使用該可視化編輯器也須要用戶瞭解一些API接口和協議等技術知識。下面展現一個經過MQTT API監測電車實時位置的過程,該過程已得到赫爾辛基公共交通的受權。編程

# MQTT 模型參數
URL: mqtt://mqtt.hsl.fi
Topic: /hfp/v1/journey/ongoing/tram/#

MQTT 模型數據JSON格式的字符串,經過JSON解析器模型解析爲一個對象。從該對象中提取出'VP'字段,該字段包含的也是一個對象。從提取出的對象中獲取'desi','lat','long'和'veh'字段的值,而後通過數據處理以後經過SendToStream模型將該數據上傳至Streamr。

場景三:數據源和Streamr之間創建橋接(橋接點向數據源發送輪詢請求)

  • 複雜度:中等
  • 延遲:中等
  • 可用性:好
    大多數雲服務提供商都會爲用戶提供訪問數據的API接口,這種場景正是使用該API接口完成用戶數據上傳至Streamr。在這種場景下,當數據源產生新的數據時,用戶不會收到任何通知,這就意味着用戶須要重複向該API接口發送數據請求,這種方式成爲輪詢。顯而易見,這種方式並非一個獲取實時數據的最佳方式,緣由以下:1. 在用戶請求該數據以前,該數據可能已經變化屢次,意味着用戶會丟失一些數據信息;2. 會給API服務器端帶來沒必要要的負載,由於用戶請求的時機是不固定的,不必定恰巧在數據值發送變化的時候;3. 增長了時延(輪詢週期的一半時間)。

示例1

本示例將介紹如何使用OpenWeatherMap每隔15分鐘查詢瑞士楚格的天氣狀況。對於天氣等這種數據,使用輪詢方式獲取是可行的,由於天氣情況不會變化的太快。json

const fetch = require('node-fetch')
const StreamrClient = require('streamr-client')
const OPENWEATHERMAP_API_KEY = 'MY-OPENWEATHERMAP-KEY'
const STREAMR_API_KEY = 'MY-STREAMR-KEY'
const POLL_INTERVAL = 15 * 60 * 1000 // 5 minutes
const location = 'Zug,Switzerland'
const client = new StreamrClient({
    apiKey: STREAMR_API_KEY
})
// Query data from OWM and produce the result to Streamr
function pollAndProduce(stream) {
    fetch(`https://api.openweathermap.org/data/2.5/weather?q=${location}&APPID=${OPENWEATHERMAP_API_KEY}&units=metric`)
        .then((response) => response.json())
        .then((json) => {
            console.log(json)
            // Produce the data point to Streamr
            return stream.produce(json)
        }).catch((err) => {
            console.error(err)
        })
}
// Create a Stream for this location, if it doesn't exist
client.getOrCreateStream({
    name: `Weather in ${location}`,
    description: 'From openweathermap.org, updated every 15 minutes'
}).then((stream) => {
    console.log(`Target Stream id: ${stream.id}`)
    // Poll and produce now
    pollAndProduce(stream)
    // Keep repeating every 15 minutes
    setInterval(() => pollAndProduce(stream), POLL_INTERVAL)
}).catch((err) => {
    console.log(err)
})

示例2

示例2將介紹如何利用可視化編輯器完成上述功能。
canvas

圖中,Clock模型每隔15分鐘發出一次警報而且觸發HTTP Request模型向OpenWeatherMap發出獲取天氣數據的請求。而後,SendToStream模型將獲取到的數據上傳至Streamr。api

參考文獻

  1. https://medium.com/streamrblog/three-patterns-for-integrating-your-data-to-streamr-2-of-3-38027dc91f9e
相關文章
相關標籤/搜索