node簡單分析基金定投是否靠譜(數據篇)

注:若是該文章侵犯數據來源網站數據安全,本人會當即刪除前端

當前環境,對於大多數人而言,理財渠道愈來愈窄,就連餘額寶的年化都降至2%如下,大多數狀況是沒法跑贏通脹的,並且市面其餘理財產品不是風險過高就是要求額度比較大亦或收益不達預期,所以,閒來無事,簡單的分析下基金定投。node

1. 市面常見理財產品對比

  • 股票:靠政策和市場吃飯,並且極度考驗人性,須要全方位知識體系去支撐,八成投資者被淪爲韭菜
  • 基金:定投須要耐得住時間,不用勞心傷神,達到預期收割便可(僅限指數基金),難點無非在選擇入場的時間點及其是否經得住時間的考驗
  • p2p: 近幾年暴雷頻發,年化預期10%,但須要承擔血本無歸的風險,資金不靈活
  • 銀行大額訂單:資金量大,收益不達預期(達預期的估計你也撈不到)
  • 餘額寶:市面同類產品京東小金庫,微信理財其實差很少都是貨幣基金等低風險基金,收益大多數 時間很難跑贏通脹

2. 前端爬蟲

分析數據以前固然離不開數據,固然,對於基金數據各大平臺也提供了專門的對外接口供數據分析,但大多數都是收費的,並且還要註冊帳號,折騰太煩人,仍是直接把數據搞進本身的數據庫舒坦。mysql

先簡單總結下前端幾種爬蟲方式sql

方式 簡介 優勢 缺點
瀏覽器dom操做 瀏覽器的控制檯直接dom操做獲取內容 方便簡潔 單頁面少許數據獲取
iframe 經過在網站插入iframe標籤,不斷更改src監聽iframe內容加載完成事件獲取數據 解決同源問題,速度快 數據收集困難,只能採集完一次性打印到控制檯
headless 經過無頭瀏覽器在node端模擬人爲操做,獲取數據 能夠僞造權限,來源,可以獲取頁面的幾乎全部數據 採集速度慢
接口請求 對於異步數據,抓取接口,尋找參數規律無腦請求 抓取速度快,數據易分析 部分網站須要僞造來源和身份,IP容易被封(不斷更換代理能夠解決)

3. 頁面分析

切記,分析以前必定要查看數據網站的robots.txt文件和申明,是否容許爬取chrome

3.1 獲取某基金網站的全部基金列表

咱們發現該基金列表能夠經過調用接口獲取(能經過接口獲取的絕對不去用headless,速度快,數據完整性高),惟一有效參數就是page=2,200表示第二頁200條

3.2 獲取基金概況

咱們發現基金概況頁面數據服務端渲染,所以須要經過headless獲取,並且每一個頁面對應的get參數爲該基金編碼,所以第一步獲取的基金列表的編碼能夠用來拼湊基金概況連接數據庫

3.3 獲取基金的歷史淨值

咱們發現能夠經過接口獲取,有效參數爲基金編碼,頁碼,每頁數以及時間段json

4. 數據表創建

經過對網站分析,創建sql表,基金列表和基金淨值表瀏覽器

分析完數據來源網站並建完數據表,接下來開始擼代碼,go!!!安全

5. 數據庫的簡單封裝

因爲須要將爬取的數據存入數據庫,因此對mysql進行了簡單的封裝bash

5.1 封裝簡單的打印

新建文件log.js封裝簡單的日誌打印

const log = console.log;
const chalk = require('chalk');
module.exports = {
    info(s){
        log(chalk.green(s));
    },

    warn(s){
        log(chalk.yellow(s));
    },

    err(s){
        log(chalk.red(s));
    }
}
複製代碼

5.2 事務簡單封裝

新建TransactionConnection.js對sql的事務進行async/await封裝

const log = require('./log');

class TransactionConnection{

    constructor(conn){
        this._connection = conn; //鏈接
        this._isReleased = false; //改鏈接是否已釋放
    }

    /**
     * @description:  query封裝
     * @param {type} 
     * @return: 
     */
    query(sql, values){
        if( ! this._connection ){
            return Promise.reject('當前MySQL鏈接已經釋放, 不能調用 query');
        }
        return new Promise( (resolve, reject) => {
            this._connection.query(sql, values, function(err, results, fields){
                if( err ){
                    return reject(err);
                }
                resolve({
                    results : results,
                    fields : fields
                });
            });
        });
    }

    /**
     * @description: 釋放鏈接
     * @param {type} 
     * @return: 
     */
    release(){
        if( ! this._isReleased && this._connection ){
            this._connection.release();
            this._isReleased = true;
            this._connection = null;
            log.info('鏈接已釋放')
        }
    }

    /**
     * @description: 銷燬鏈接
     * @param {type} 
     * @return: 
     */
    destroy(){
        if( this._connection ){
            this._connection.destroy();
            this._connection = null;
        }
    }

    /**
     * @description: 開始事務
     * @param {type} 
     * @return: 
     */
    beginTransaction(){
        if( ! this._connection ){
            return Promise.reject('當前MySQL鏈接已經釋放, 不能調用 beginTransaction');
        }
        let that = this;
        return new Promise( (resolve, reject) => {
            this._connection.beginTransaction( function(err){
                if( err ){
                    return reject(err);
                }
                resolve(that);
            } );
        });
    }

    /**
     * @description: 提交事務
     * @param {type} 
     * @return: 
     */
    commit(){
        if( ! this._connection ){
            return Promise.reject('當前MySQL鏈接已經釋放, 不能調用 commit');
        }
        let that = this;
        return new Promise( (resolve, reject) => {
            this._connection.commit( function(err){
                if( err ){
                    return reject(err);
                }
                resolve(that);
            } );
        });
    }

    /**
     * @description: 事務回滾
     * @param {type} 
     * @return: 
     */
    rollback(){
        if( ! this._connection ){
            return Promise.reject('當前MySQL鏈接已經釋放, 不能調用 rollback');
        }
        let that = this;
        return new Promise( (resolve, reject) => {
            this._connection.rollback( function(err){
                if( err ){
                    return reject(err);
                }
                resolve(that);
            } );
        });
    }

}
複製代碼

5.3 sql的簡單封裝

新建mysqlFactory.js對sql進行簡單封裝

'use strict';

const mysql = require('mysql');
const log = require('./log');
const TransactionConnection = require('./TransactionConnection');

module.exports = function mysqlFactory(config){
    const pool = mysql.createPool(config);
    let singleton = null;

    class MysqlClient{
        
        /**
         * @description: 獲取單例靜態方式
         * @param {type} 
         * @return: 
         */
        static getSingle(){
            if(! singleton){
                singleton = new MysqlClient();
                return singleton;
            }
        }

        /**
         * @description: 斷開數據庫鏈接
         * @param {type} 
         * @return: 
         */
        close(){
            return new Promise( function(resolve, reject){
                pool.end(err => {
                    if(err){
                        log.err('斷開數據庫鏈接失敗');
                        return reject(err);
                    }
                    resolve();
                })
            })
        }


        /**
         * @description: 獲取事務操做鏈接
         * @param {type} 
         * @return: 
         */
        getConnection(){
            return new Promise( function(resolve, reject){
                //當前配置的是 鏈接池 模式, 直接從池子中獲取
                pool.getConnection( function(err, connection){
                    if( err ){
                        log.err(`從MySQL鏈接池中獲取connection失敗: ${err}`);
                        return reject(err);
                    }
                    let conWrap = new TransactionConnection(connection);
                    resolve( conWrap );
                });
            });
        }


        /**
         * @description: query
         * @param {type} 
         * @return: 
         */
        query(sql, values){
            return new Promise( (resolve, reject) => {
                let finalSQL = sql;
                if( arguments.length === 2 ){
                    finalSQL = mysql.format(sql, values);
                }
                // log.info(`執行MySQL的SQL語句: ${finalSQL}`);
                pool.query(finalSQL, function(err, results, fields){
                    if( err ){
                        return reject(err);
                    }
                    resolve({
                        results : results,
                        fields : fields
                    });
                });
            });
        }

        /**
         * @description: 格式化字符串
         * @param {type} 
         * @return: 
         */
        format(sql, values){
            return mysql.format(sql, values);
        }

        /**
         * @description: 批量插入數據
         * @param {type} 
         * @return: 
         */
        async insert(table, rows){
            if (!Array.isArray(rows)) {
                rows = [rows];
            }

            const fields = Object.keys(rows[0]), len = rows.length;
            const template = `(${new Array(fields.length).fill('?').join(', ')})`;
            const sql = rows.reduce((str, obj, idx) => {
                const currentVal = fields.map(key => obj[key]);
                if(idx >= len - 1) return `${str} ${this.format(template, currentVal)}`
                return `${str} ${this.format(template, currentVal)}, `
            }, `INSERT INTO ${table} (${fields.join(', ')}) VALUES`)

            let out = await this.query(sql);
            return out.results;
        }     

        
    }

    return MysqlClient;
}
複製代碼

6. 工具函數的簡單封裝

因爲抓取數據牽扯到頁面抓取和接口抓取,決定使用urllib進行接口請求,對頁面用puppeteer-core進行瀏覽器模擬抓取

const urllib = require('urllib');
const path = require('path');
const puppeteer = require('puppeteer-core');
const {chromePath} = require('../config');

class Request{
    
    constructor(){
        this.browser = null;
        this.page = null;
    }

    /**
     * @description: 請求封裝
     * @param {type} 
     * @return: 
     */
    curl(url, options){
        return new Promise((resolve, reject)=>{
            urllib.request(url, options, (err, data, res)=>{
                if(err) return reject(err);
                resolve({
                    status: res.statusCode,
                    data: data.toString()
                })
            })
        })
    }

    /**
     * @description: 打開瀏覽器並新建窗口
     * @param {type} 
     * @return: 
     */
    async initBrowser(){
        if(this.browser && this.page) return this.page;
        this.browser = await puppeteer.launch({
            executablePath: chromePath,
            headless: true,
        })

        this.page = await this.browser.newPage();
    }

    /**
     * @description: 打開頁面獲取數據
     * @param {type} 
     * @return: 
     */
    async goPage(url, config, callback){
        if(!this.page){
            await this.initBrowser();
        }
        config = Object.assign({
            timeout: 0,
            waitUntil: 'domcontentloaded'
        }, config);

        await this.page.goto(url, config);
        const result = await this.page.evaluate(callback);
        return result;
    }

    /**
     * @description: 關閉瀏覽器
     * @param {type} 
     * @return: 
     */
    async close(){
        if(this.browser || this.page){
            await this.browser.close();
        }
    }

}

module.exports = new Request();
複製代碼

7.增長配置文件

新建config/index.js增長配置文件,因爲數據比較敏感,因此將配置文件中的有關連接部分隱去

const path = require('path')


module.exports = {

    // mysql配置
    mysql: {
        connectionLimit: 10,
        host: '127.0.0.1',
        user: 'root',
        password: '123456',
        database: 'fund',
        charset: 'UTF8_GENERAL_CI',
        timezone: 'local',
        connectTimeout: 10000
    },

    // headless配置
    chromePath: path.resolve(__dirname, '../Chromium.app/Contents/MacOS/Chromium'),

    //數據接口配置
    fund: {
        fundList: {
            url: '基金列表請求連接',
            config: {
                method: 'GET',
                headers: {
                    referer: '請求來源僞造'
                },
                data: {
                    page: '1,50000'
                }
            }
        },
        fundDetail: (code) => '基金詳情頁面連接',
        dayValue: {
            url: '基金每日淨值連接',
            getConfig: function (fundCode) {
                return {
                    method: 'GET',
                    headers: {
                        referer: '請求來源僞造'
                    },
                    data: {
                        fundCode,
                        pageIndex: 1,
                        pageSize: 365 * 10,
                        startDate: '',
                        endDate: '',
                    }
                }
            }
        }
    }

}
複製代碼

8. 主代碼編寫

8.1 獲取基金列表

將基金列表先寫入json文件,後續和詳情一塊兒寫入基金列表數據表

const mysqlFactory = require('./lib/mysqlFactory');
const request = require('./lib/Request');
const fs = require('fs-extra');
const moment = require('moment');
const {
    mysql,
    fund
} = require('./config');
const log = require('./lib/log');

// 獲取基金列表
async function getFundList() {
    const {
        url,
        config
    } = fund.fundList;
    const {
        data,
        status
    } = await request.curl(url, config);
    if (status === 200) {
        eval(data); // 該接口是jsonp直接執行獲得變量db
        const {
            datas
        } = db;
        log.info(`基金列表總共${datas.length}條數據,最後一條的名字爲${datas[datas.length-1][1]}`);
        const result = datas.map(e => {
            return {
                code: e[0],
                name: e[1]
            };
        })
        await fs.outputJSON('data.json', JSON.stringify(result));
        log.info(`基金列表寫入完成!!!`);
        return result;
    } else {
        log.err('獲取基金列表出錯')
        process.exit(1);
    }

}
複製代碼

8.2 獲取列表詳情

unction _callback() {
    const tr = document.querySelectorAll('.info tr td');
    const [establishDate, total] = tr[5].textContent && tr[5].textContent.split('/');
    return {
        type: tr[3].textContent || '暫無',
        publicDate: tr[4].textContent || '暫無',
        establishDate: establishDate.trim() || '暫無',
        total: total.trim() || '暫無',
        company: tr[8].textContent || '暫無',
        trusteeship: tr[9].textContent || '暫無',
        manager: tr[10].textContent || '暫無',
    }
}

// 獲取基金詳情
async function getFundDetail(list = [], sql) {
    try {
        for (let [index, {
                code,
                name
            }] of list.entries()) {
            log.info(`開始獲取第${index}條數據,code爲${code}`)
            const url = fund.fundDetail(code);
            const result = await request.goPage(url, {}, _callback);
            await sql.insert('fund_list', Object.assign({
                name,
                code
            }, result))
        }
    } catch (err) {
        log.err(`插入基金列表發生錯誤:${err}`);
        process.exit(1);
    }
}

複製代碼

8.3 獲取基金歷史的日淨值

因爲數據量比較大,作了簡單的併發50個請求和一次插入2萬條數據的sql處理,固然你們也能夠採用worker_threadschild_process以及proxy處理

async function getConcurrencyData(arrs, logs) {
    log.info(`開始獲取第${logs.index.join(', ')}條數據的歷史淨值,code爲${logs.code.join(', ')}`);
    try {
        const result = await Promise.all(arrs);
        if (result.every(res => res.status === 200)) {
            let allData = [];
            result.forEach((single, idx) => {
                const {
                    Data
                } = JSON.parse(single.data);
                const {
                    LSJZList
                } = Data;
                
                log.info(`第${logs.index[idx]}條的數據總量爲${LSJZList.length}`);

                const dealResult = LSJZList.map(item => {
                    return {
                        code: logs.code[idx],
                        FSRQ: item.FSRQ ? moment(item.FSRQ).valueOf() : '暫無', //淨值日期
                        DWJZ: item.DWJZ || '暫無', // 單位淨值
                        LJJZ: item.LJJZ || '暫無', // 累計淨值
                        JZZZL: item.JZZZL || '暫無', //日增加率
                        SGZT: item.SGZT || '暫無', //申購狀態
                        SHZT: item.SHZT || '暫無', //贖回狀態
                        FHSP: item.FHSP || '暫無', //分成
                    }
                });

                allData = [...allData, ...dealResult];
            })
            return allData;
        } else {
            throw new Error('獲取基金淨值失敗')
        }
    } catch (err) {
        log.err(err);
        process.exit(1);
    }
}

async function getDayValue(list = [], sql) {
    try {
        const {
            url,
            getConfig
        } = fund.dayValue, arrs = [], logs = {
            index: [],
            code: []
        }, len = list.length;
        for (let [index, {
                code
            }] of list.entries()) {
            // if(index < 8450) continue;
            if (arrs.length >= 50 || index >= len-1) {
                const allData = await getConcurrencyData(arrs, logs);
                // 每次插入500條
                while (allData.length > 0) {
                    console.log(allData.length)
                    await sql.insert('fund_value', allData.splice(0, 20000));
                };
                // 清空
                arrs.splice(0, arrs.length, request.curl(url, getConfig(code)));
                logs.index.splice(0, logs.index.length, index);
                logs.code.splice(0, logs.code.length, code);
                
            } else {
                logs.index.push(index);
                logs.code.push(code);
                arrs.push(request.curl(url, getConfig(code)));

            }

        }

    } catch (err) {
        log.err(`插入基金淨值發生錯誤:${err}`);
        process.exit(1);
    }
}

複製代碼

8.4 運行程序

async function run() {
    const sql = new(mysqlFactory(mysql));
    const data = await getFundList();
    await getFundDetail(data);
    await getDayValue(data, sql);
    await sql.close();
}
run()
複製代碼

若是對代碼要求嚴格的能夠採用爬去出錯process.exit()監聽,重啓程序來從斷開的數據繼續爬去(這也是當時爲啥將基金列表保存一份在json文件中的緣由,以便於出錯重啓程序繼續存斷開的條數繼續爬去)

9. 最終結果

下篇文章【node簡單分析基金定投是否靠譜(分析篇)】待寫...

相關文章
相關標籤/搜索