全部刊發內容都可轉載可是須要註明出處。
html
本系列文檔主要介紹怎麼經過Streamr管理本身的DATA,整個系列包括三篇教程文檔,分別是:教你5分鐘上傳數據至Streamr、三種整合數據至Streamr的典型場景、教你在Streamr市場上發佈數據。全部文檔均參考Streamr blog。前兩篇主要偏向技術文檔,因此須要有必定的技術背景。第三篇不包含任何技術知識,大部分人均可以按照教程來完成相應的操做。node
第一篇文檔主要介紹如何經過調用API接口上傳數據至Streamr,本篇教程文檔主要介紹了三種整合數據至Streamr網絡的典型示例。包括: 1. 直接從數據源推送數據至Streamr網絡; 2. 數據源和Streamr之間創建橋接(數據源向橋接點實時推送數據); 3. 數據源和Streamr之間創建橋接(橋接點向數據源發送輪詢請求)。git
在Streamr網絡中,每一個數據點屬於一個Stream。每一個數據點(或者成爲事件、消息)包含了標有時間戳的一系列信息,好比,傳感器採集的數據、即時通信的消息等。根據使用場景的不一樣,一個Stream能夠包括一個源的數據或者是多個數據源的數據。Streamr網絡爲用戶上傳數據提供API接口,用戶調用該API接口最簡單的方式是使用Streamr客戶端庫。當前的Streamr客戶端庫是JavaScript編寫的,其餘語言的客戶端庫還在開發中。若是您不方便使用當前的Streamr客戶端庫,您也可使用HTTP庫來調用API接口,相關方法請參見上一篇說明文檔。程序員
const StreamrClient = require('streamr-client') const ruuvi = require('node-ruuvitag') const API_KEY = 'MY-API-KEY' const client = new StreamrClient({ apiKey: API_KEY }) const streams = {} console.log('Listening for RuuviTags...') ruuvi.on('found', tag => { // Create a Stream for this tag if it doesn't exist yet if (!streams[tag.id]) { streams[tag.id] = await client.getOrCreateStream({ name: 'Ruuvi ' + tag.id }) } tag.on('updated', async (data) => { const stream = streams[tag.id] try { // Produce the data point to the stream await stream.produce(data) } catch (err) { console.error(err) } }) })
示例1主要介紹如何使用Bitfinex客戶端庫和Streamr客戶端庫,將Bitfinex上DATA/USD交易數據和Streamr創建鏈接。github
const BFX = require('bitfinex-api-node') const StreamrClient = require('streamr-client') const STREAM_ID = 'MY-STREAM-ID' const API_KEY = 'MY-API-KEY' const bws = new BFX('', '', { version: 2, transform: true }).ws const client = new StreamrClient({ apiKey: API_KEY }) bws.on('open', () => { bws.subscribeTicker('DATUSD') }) bws.on('ticker', (pair, ticker) => { console.log('Ticker:', ticker) client.produceToStream(STREAM_ID, ticker) .then(() => console.log('Sent to Streamr!')) .catch((err) => console.error(err)) })
示例1主要從編程角度介紹如何創建相應的鏈接,若是您非程序員,那可使用本示例中的可視化編輯器Streamr Editor來創建相應的鏈接。可是,使用該可視化編輯器也須要用戶瞭解一些API接口和協議等技術知識。下面展現一個經過MQTT API監測電車實時位置的過程,該過程已得到赫爾辛基公共交通的受權。編程
# MQTT 模型參數 URL: mqtt://mqtt.hsl.fi Topic: /hfp/v1/journey/ongoing/tram/# MQTT 模型數據JSON格式的字符串,經過JSON解析器模型解析爲一個對象。從該對象中提取出'VP'字段,該字段包含的也是一個對象。從提取出的對象中獲取'desi','lat','long'和'veh'字段的值,而後通過數據處理以後經過SendToStream模型將該數據上傳至Streamr。
本示例將介紹如何使用OpenWeatherMap每隔15分鐘查詢瑞士楚格的天氣狀況。對於天氣等這種數據,使用輪詢方式獲取是可行的,由於天氣情況不會變化的太快。json
const fetch = require('node-fetch') const StreamrClient = require('streamr-client') const OPENWEATHERMAP_API_KEY = 'MY-OPENWEATHERMAP-KEY' const STREAMR_API_KEY = 'MY-STREAMR-KEY' const POLL_INTERVAL = 15 * 60 * 1000 // 5 minutes const location = 'Zug,Switzerland' const client = new StreamrClient({ apiKey: STREAMR_API_KEY }) // Query data from OWM and produce the result to Streamr function pollAndProduce(stream) { fetch(`https://api.openweathermap.org/data/2.5/weather?q=${location}&APPID=${OPENWEATHERMAP_API_KEY}&units=metric`) .then((response) => response.json()) .then((json) => { console.log(json) // Produce the data point to Streamr return stream.produce(json) }).catch((err) => { console.error(err) }) } // Create a Stream for this location, if it doesn't exist client.getOrCreateStream({ name: `Weather in ${location}`, description: 'From openweathermap.org, updated every 15 minutes' }).then((stream) => { console.log(`Target Stream id: ${stream.id}`) // Poll and produce now pollAndProduce(stream) // Keep repeating every 15 minutes setInterval(() => pollAndProduce(stream), POLL_INTERVAL) }).catch((err) => { console.log(err) })
示例2將介紹如何利用可視化編輯器完成上述功能。
canvas
圖中,Clock模型每隔15分鐘發出一次警報而且觸發HTTP Request模型向OpenWeatherMap發出獲取天氣數據的請求。而後,SendToStream模型將獲取到的數據上傳至Streamr。api