「平常研究」之 respage01 :動態刷新百度熱力圖

本來要成爲一個小系列了

由於這個DashBoard本身會一直開發維護下去,本來會演變成一個《試用 vue-admin-template 寫一個本身的dashboard》+ 一、二、三、、N。可是今天仍是把標題改掉了,由於其實重點並非dashboard自己,真正的重點是我平常想去作的業餘研究,順便研究些華麗的先後端技術。javascript

respage01 再說明

Respage01的原由是老婆大人對周邊各類早教和教育培訓機構春筍般出現的現象感到費解,而後向我提了一個問題,「若是咱們入股相似的一個小教育培訓機構,會不會有前途?」。這是一個很難回答的問題,由於咱們都沒有在培訓教育行業裏待過,可是既然問題已經出現了,本身也以爲這個問題有些意思,便想要作一個平常研究。Respage01會從宏觀角度去分析並監測教育培訓機構(在金華地區)相關的數據,數據都來自互聯網。html

預加功能

上一次已經留下了一個殼子,相似這樣:vue

可是從代碼上也看到了,我是把某一天的座標數據硬編碼到了這個頁面中,因此今天就是要來作成動態,恰好結合《部署Django REST Framework服務(Nginx + uWSGI + Django)》,將接口部署到線上。java

既然作了接口化,那咱們就能夠定時作好天天的數據爬取 -> 數據清洗 -> 數據入庫 -> 接口吐出。那麼頁面即可天天獲取到最新的數據。node

問題又來了,既然天天有了最新的數據,那最好是有方式能夠看到天天的變化。可能在必定的時間跨度內,能夠看到一些明顯的變化,因此我打算作一個數據回看功能,作法可能很簡單: 觸發了某個按鈕後,定時刷新全部的數據,或者作成一個短視頻形式python

在一個較短的時間跨度內,有可能很難從熱力圖上看到變化,因此打算加一個折線圖來標識天天的數據。mysql

部署自動爬取和數據清洗併入庫

目前的爬蟲目錄很簡單:redis

.
├── config.json
├── log
│   └── train-2018-09-03.log
├── result
│   └── train-2018-09-03.txt
└── spiker.py
複製代碼

那完成這個事情定時運行相似這樣的腳本就能夠:sql

python spiker.py | python format.py | python writeToRedis.py && python redisToMysql.py
複製代碼

雖然有人會建議能夠直接在爬取時完成清洗、去重和入庫的動做,可是我仍是喜歡用這種流式的方法來處理,這樣更加清晰,功能也更加解耦。並且瞭解管道的同窗能夠看出來,這其實就是同時完成了爬取、清洗和入庫動做,只不過是每條數據串行完成了這系列動做。這裏的writeToRedis.py是爲了利用redis自然的去重功能,redis的讀寫性能也會讓效率更高些。數據庫

修改spiker.py

修改就只有兩個:

  • 將原先的查詢關鍵字等配置信息寫到config.json中,方便各管道節點獲取到統一的信息
  • 在原先寫文件的地方,直接加個print,將數據標準輸出。
""" 查詢關鍵字:移到config.json """
FileKey = 'train'
KeyWord = u"早教$培訓"
複製代碼
## 設置標準輸出的編碼格式
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')

for r in res['results']:
  file.writelines(str(r).strip() + '\n')
  # 增長標準輸出
  print(str(r).strip(), flush=True)
複製代碼

新增format.py 過濾無用信息

首先讀取上一級管道的標準輸出做爲輸入,使用fileinput即可實現:

def main():
    for line in fileinput.input():
         format(line)
複製代碼

分析一條的數據的結構,只保留感興趣的數據(不用擔憂丟失,由於在第一個節點上已經保存了原始數據),並儘可能把json結構拉平成只有一級:

{'name': '英倫藝術培訓', 'lat': 29.109614, 'lng': 119.662018, 'address': '解放東路238號福蓮匯8層', 'province': '浙江省', 'city': '金華市', 'area': '婺城區', 'street_id': '15ca1ce6773a95f7a2a9343c', 'detail': 1, 'uid': '15ca1ce6773a95f7a2a9343c', 'detail_info': {'tag': '教育培訓;培訓機構', 'type': 'education', 'detail_url': 'http://api.map.baidu.com/place/detail?uid=15ca1ce6773a95f7a2a9343c&output=html&source=placeapi_v2', 'overall_rating': '0.0', 'children': []}}
複製代碼

因爲該數據一級比較簡單,因此format也只是作了很小的處理,另外,這樣的好處時,不一樣的數據結構能夠寫不一樣的format就能夠。

# coding: utf-8
import fileinput
import io
import sys
import chardet
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')

def format(line):
    """ :param line: :return: """
    result = {}
    tmp = eval(line.decode('utf-8'))
    try:
        result = {
            "name": str(tmp["name"]),
            "lat": tmp["location"]["lat"],
            "lng": tmp["location"]["lng"],
            "address": str(tmp["address"]),
            "tag": str(tmp["detail_info"]["tag"]),
        }

        # 部分數據可能缺失字段
        if "detail_url" in tmp["detail_info"]:
            result["detail_url"] = tmp["detail_info"]["detail_url"]
        else:
            result["detail_url"] = ""

        if "overall_rating" in tmp["detail_info"]:
            result["rate"] = tmp["detail_info"]["overall_rating"]
        else:
            result["rate"] = "0.0"

        print(str(result).strip(), flush=True)

    except Exception as e:
        print(e)
        pass


def main():
    try:
        for line in fileinput.input(mode='rb'):
            format(line)
        sys.stderr.close()
    except Exception as e:
        print(e)
        pass


if __name__ == '__main__':
    main()

複製代碼

若是數據量大,能夠用相似的方法來調試:

cat node1.txt | head -n 1 | python format.py
複製代碼

新增writeToRedis.py

其實使用python的set也能夠完成去重的事情,代碼中也能夠嘗試這樣的操做。關於去重的方式,在不一樣場景下有各式的方案,咱們這屬於簡單場景,由於數據量不大。

系統安裝redis服務,並配置密碼:

sudo apt-get install redis-server
複製代碼

在虛擬環境下安裝redis庫:

pip install redis
vi /etc/redis/redis.conf
# 打開 requirepass 配置項,並後面跟上密碼
requirepass xxxx
複製代碼

登陸測試:

redis-cli -a xxxx
複製代碼

redis有String、List、Set、Hash、Sort Hash幾種類型,因爲咱們只是要作去重,那就用Set結構就能夠:

train_2018_09_07(key) -> (數據1,數據2 ... 數據n)
複製代碼

writeToRedis的簡單實現:

# coding: utf-8
import fileinput
import redis
import time
from tool.tool import tool
import io
import sys
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')


def connectRedis():
    re = redis.Redis(
        host=tool.getRedisHost(),
        port=tool.getRedisPort(),
        password=tool.getRedisPass(),
        decode_responses=True)
    return re


def main():
    today = time.strftime("%Y_%m_%d")
    setName = tool.getFileKey() + "_" + today
    try:
        re = connectRedis()
        for line in fileinput.input(mode='rb'):
            re.sadd(setName, line.decode('utf-8').strip())
        exit(0)
    except Exception as e:
        print(e)
        exit(-1)

if __name__ == '__main__':
    main()

複製代碼

使用redis的set還有一個好處就是,能夠理由set的差集等功能,快速獲取天天發生變化的數據。這個需求打算後面加上

執行:

python spiker.py | python format.py | python writeToRedis.py
複製代碼

運行後,原始數據文件條目:

cat train-2018-09-07.txt | wc -l                                            663
複製代碼

Redis Set 內的條目數:

127.0.0.1:6379> SCARD train_2018_09_07
(integer) 640
複製代碼

說明確實仍是有重複的數據,緣由多是咱們使用了10*10小矩形掃描的方式,不可避免地會有交界處重疊的問題。

新增redisToMysql.py

關於使用了redis後仍是否須要Mysql的討論也有不少,你們能夠去參與討論。我我的的考慮是Django上能夠更好地支持Mysql來作序列化和反序列化,畢竟Mysql的查詢功能也更加溫馨一些。

首先從redis中讀出數據的形式,可使用迭代器的方式,好處是稍微省內存些,可是問題是若是單條數據單獨寫入mysql的話,IO上估計也不太合算,因此我使用pandas的DataFrame寫入mysql的方式來分批寫入,使用pandas的好處是字典數據寫入數據操做比通常的mysql庫要簡潔不少。

虛擬環境下安裝pandas:

pip install pandas sqlalchemy
複製代碼
# coding: utf-8
import redis
from tool.tool import tool
import time
import pandas as pd
from sqlalchemy import create_engine
import pymysql


def connectRedis():
    """ 鏈接Redis :return: redis connect """
    re = redis.Redis(
        host=tool.getRedisHost(),
        port=tool.getRedisPort(),
        password=tool.getRedisPass(),
        decode_responses=True)
    return re


def connectMysql():
    """ 鏈接mysql數據庫 :return: engine connect """
    config = tool.getMysqlConfig()
    engine = create_engine(str(r"mysql+pymysql://%s:%s@%s/%s?charset=utf8") %
                           (config['User'],
                            config['Pass'],
                            config['Host'],
                            config['Name']))
    return engine


def redisToMysql(re, en):
    """ :param re: redis connect :param en: mysql engine connect :return: """
    today = time.strftime("%Y_%m_%d")
    tableName = tool.getFileKey() + '_' + today
    res = []
    index = 0
    for item in re.sscan_iter(tool.getFileKey() + '_' + today):
        tmp = eval(item.encode('utf-8').decode('utf-8'))
        tmp['time'] = today
        res.append(tmp)
        index += 1
        if index >= 100:
            df = pd.DataFrame(res)
            df.to_sql('respage01', con=en, if_exists='append', index=False,)
            index = 0
            res = []
    if index != 0:
        df = pd.DataFrame(res)
        df.to_sql(name='respage01', con=en, if_exists='append', index=False)

    # 添加主鍵
    # print("xxxxxxxx")
    # with en.connect() as con:
    # con.execute("alter table respage01 add COLUMN id INT NOT NULL AUTO_INCREMENT primary key first")


def main():
    re = connectRedis()
    en = connectMysql()
    redisToMysql(re, en)


if __name__ == '__main__':
    main()

複製代碼

爲了後面django處理方便,我後面臨時加入了一個自增id做爲主鍵。方法能夠是:

alter table respage01 add COLUMN id INT NOT NULL AUTO_INCREMENT  primary key first;
複製代碼

編寫相關apis

咱們設計兩個api: * 獲取某個時間段內的全部座標數據 * 獲取某個時間段內天天的數量值

model實現

class Respage01Info(models.Model):
    """ respage 01 相關的數據 """
    time = models.CharField(max_length=100)
    name = models.CharField(max_length=200)
    address = models.CharField(max_length=500)
    detail_url = models.URLField(max_length=500)
    rate = models.FloatField()
    lat = models.FloatField()
    lng = models.FloatField()

    class Meta:
			# 指定數據表
        db_table = "respage01"
複製代碼

須要注意的是,咱們已經擁有了數據庫,而且表裏已經有了數據,因此在執行migrate的時候,須要指明fake掉該項目的數據遷移:

python manage.py migrate --fake rouboapi
複製代碼

Serializer實現

因爲咱們的計數接口是須要使用聚合類查詢功能,簡單說,就是須要返回數據庫字段之外的字段給客戶端,因此須要使用serializers的Field方法。

class Respage01Serializer(serializers.HyperlinkedModelSerializer):
    """ 序列化Respage01相關的數據 """

    class Meta:
        model = Respage01Info
        fields = ('time', 'lat', 'lng', 'name', 'address', 'detail_url', 'rate')


class Respage01CountSerializer(serializers.HyperlinkedModelSerializer):
    """ 序列化計數數據,用於序列化聚合類查詢的結果 """
    time = serializers.StringRelatedField()
    count = serializers.IntegerField()

    class Meta:
        model = Respage01Info
        fields = ('time', 'count')
複製代碼

view實現

這裏須要用到django的數據庫查詢相關的知識,咱們這裏用到了fiter、values、annotate幾個函數,具體的能夠參考官方文檔,基本用法仍是比較簡單。

class Respage01(APIView):
    """ 獲取respage01相關的數據 """

    authentication_classes = []
    permission_classes = []

    def rangeTime(self, start_time, end_time):
        """ 獲取時間區間 :param start_time: :param end_time: :return: """
        print("------------")
        dateList = [datetime.strftime(x, "%Y_%m_%d")
                    for x in list(pd.date_range(start=start_time.replace('_',''), end=end_time.replace('_','')))]
        return dateList

    def get(self, request, format=None):
        req = request.query_params
        if 'type' not in req or 'start_time' not in req or 'end_time' not in req:
            return Response({}, status=status.HTTP_400_BAD_REQUEST)
        if req['type'] == 'location':
            dateList = self.rangeTime(start_time=req['start_time'], end_time=req['end_time'])
            queryset = Respage01Info.objects.filter(time__in=dateList)
            serializer = Respage01Serializer(queryset, many=True)
        elif req['type'] == 'count':
            dateList = self.rangeTime(start_time=req['start_time'], end_time=req['end_time'])
            queryset = Respage01Info.objects.filter(time__in=dateList).values('time').annotate(count=Count('id'))
            serializer = Respage01CountSerializer(queryset, many=True)
        return Response(serializer.data, status=status.HTTP_200_OK)
複製代碼

接口上線後發現的異常

在接口上線後測試過程當中,發現接口極其不穩定,查了一下發現mysql會異常地退出,查看了日誌發現是內存不足致使。

個人vps是1G內存的基礎配置,雖然小,可是不至於這麼緊張。經過top【M】排序後驚奇地發現uwsgi開了10個進程,每一個進程佔用了7%左右的內存。修改uwsgi ini文件重啓後故障排除(咱們這種小服務,兩個進程足夠了)。

# mysite_uwsgi.ini file
[uwsgi]

# Django-related settings
# the base directory (full path)
chdir           = /data/django/rouboApi
# Django's wsgi file
module          = rouboinfo.wsgi
# the virtualenv (full path)
home            = /data/django/env3

# process-related settings
# master
master          = true
# maximum number of worker processes
processes       = 2
# the socket (use the full path to be safe
socket          = /data/django/rouboApi/rouboapi.scok
# ... with appropriate permissions - may be needed
chmod-socket    = 666
# clear environment on exit
vacuum          = true
複製代碼

修改respage01頁面

roubo’s dashboard 主要是增長了兩個接口請求,並將v-charts的數據動態化。這裏也簡單加了一個「覆盤」按鈕,定時刷新數據,能夠大概看到一些變化。

<template>
  <div>
    <div style="height: 100%">
      <button @click="onPlay">覆盤</button>
      <ve-heatmap :data="chartDataMap" :settings="chartSettingsMap" height="600px"/>
    </div>
    <div>
      <ve-line :data="chartDataChart" :settings="chartSettingsChart"/>
    </div>
  </div>
</template>
複製代碼
/** * 獲取某個時間區間的位置信息 * @param start_time * @param end_time */
getLocations: function(start_time, end_time) {
  this.rouboapis.getRespage01Info('location', start_time, end_time, {
    success: (res) => {
      this.chartDataMap.rows = res
    },
    fail: (err) => {
      console.log(err)
    }
  })
},
/** * 獲取某個時間段的統計數據 * @param start_time * @param end_time */
getCount: function(start_time, end_time) {
  this.rouboapis.getRespage01Info('count', start_time, end_time, {
    success: (res) => {
      this.chartDataChart.rows = res
    }
  })
},

/** * 點擊覆盤按鈕事件 */
onPlay: function() {
  const dateList = this.getDateList('2018_09_13', this.today('_'))
  let index = 0
  const timer = setInterval(() => {
    this.getLocations(dateList[index], dateList[index])
    this.getCount('2018_09_13', dateList[index])
    index = index + 1
    if (index >= dateList.length) {
      clearInterval(timer)
      return
    }
  }, 5000)
}

複製代碼

完成

頁面仍是很醜,下一步抽時間美化下。

相關文章
相關標籤/搜索