注:若是該文章侵犯數據來源網站數據安全,本人會當即刪除前端
當前環境,對於大多數人而言,理財渠道愈來愈窄,就連餘額寶的年化都降至2%如下,大多數狀況是沒法跑贏通脹的,並且市面其餘理財產品不是風險過高就是要求額度比較大亦或收益不達預期,所以,閒來無事,簡單的分析下基金定投。node
分析數據以前固然離不開數據,固然,對於基金數據各大平臺也提供了專門的對外接口供數據分析,但大多數都是收費的,並且還要註冊帳號,折騰太煩人,仍是直接把數據搞進本身的數據庫舒坦。mysql
先簡單總結下前端幾種爬蟲方式sql
方式 | 簡介 | 優勢 | 缺點 |
---|---|---|---|
瀏覽器dom操做 | 瀏覽器的控制檯直接dom操做獲取內容 | 方便簡潔 | 單頁面少許數據獲取 |
iframe | 經過在網站插入iframe標籤,不斷更改src監聽iframe內容加載完成事件獲取數據 | 解決同源問題,速度快 | 數據收集困難,只能採集完一次性打印到控制檯 |
headless | 經過無頭瀏覽器在node端模擬人爲操做,獲取數據 | 能夠僞造權限,來源,可以獲取頁面的幾乎全部數據 | 採集速度慢 |
接口請求 | 對於異步數據,抓取接口,尋找參數規律無腦請求 | 抓取速度快,數據易分析 | 部分網站須要僞造來源和身份,IP容易被封(不斷更換代理能夠解決) |
切記,分析以前必定要查看數據網站的robots.txt文件和申明,是否容許爬取chrome
咱們發現基金概況頁面數據服務端渲染,所以須要經過headless獲取,並且每一個頁面對應的get參數爲該基金編碼,所以第一步獲取的基金列表的編碼能夠用來拼湊基金概況連接數據庫
咱們發現能夠經過接口獲取,有效參數爲基金編碼,頁碼,每頁數以及時間段json
經過對網站分析,創建sql表,基金列表和基金淨值表瀏覽器
分析完數據來源網站並建完數據表,接下來開始擼代碼,go!!!安全
因爲須要將爬取的數據存入數據庫,因此對mysql進行了簡單的封裝bash
新建文件log.js封裝簡單的日誌打印
const log = console.log;
const chalk = require('chalk');
module.exports = {
info(s){
log(chalk.green(s));
},
warn(s){
log(chalk.yellow(s));
},
err(s){
log(chalk.red(s));
}
}
複製代碼
新建TransactionConnection.js對sql的事務進行async/await封裝
const log = require('./log');
class TransactionConnection{
constructor(conn){
this._connection = conn; //鏈接
this._isReleased = false; //改鏈接是否已釋放
}
/**
* @description: query封裝
* @param {type}
* @return:
*/
query(sql, values){
if( ! this._connection ){
return Promise.reject('當前MySQL鏈接已經釋放, 不能調用 query');
}
return new Promise( (resolve, reject) => {
this._connection.query(sql, values, function(err, results, fields){
if( err ){
return reject(err);
}
resolve({
results : results,
fields : fields
});
});
});
}
/**
* @description: 釋放鏈接
* @param {type}
* @return:
*/
release(){
if( ! this._isReleased && this._connection ){
this._connection.release();
this._isReleased = true;
this._connection = null;
log.info('鏈接已釋放')
}
}
/**
* @description: 銷燬鏈接
* @param {type}
* @return:
*/
destroy(){
if( this._connection ){
this._connection.destroy();
this._connection = null;
}
}
/**
* @description: 開始事務
* @param {type}
* @return:
*/
beginTransaction(){
if( ! this._connection ){
return Promise.reject('當前MySQL鏈接已經釋放, 不能調用 beginTransaction');
}
let that = this;
return new Promise( (resolve, reject) => {
this._connection.beginTransaction( function(err){
if( err ){
return reject(err);
}
resolve(that);
} );
});
}
/**
* @description: 提交事務
* @param {type}
* @return:
*/
commit(){
if( ! this._connection ){
return Promise.reject('當前MySQL鏈接已經釋放, 不能調用 commit');
}
let that = this;
return new Promise( (resolve, reject) => {
this._connection.commit( function(err){
if( err ){
return reject(err);
}
resolve(that);
} );
});
}
/**
* @description: 事務回滾
* @param {type}
* @return:
*/
rollback(){
if( ! this._connection ){
return Promise.reject('當前MySQL鏈接已經釋放, 不能調用 rollback');
}
let that = this;
return new Promise( (resolve, reject) => {
this._connection.rollback( function(err){
if( err ){
return reject(err);
}
resolve(that);
} );
});
}
}
複製代碼
新建mysqlFactory.js對sql進行簡單封裝
'use strict';
const mysql = require('mysql');
const log = require('./log');
const TransactionConnection = require('./TransactionConnection');
module.exports = function mysqlFactory(config){
const pool = mysql.createPool(config);
let singleton = null;
class MysqlClient{
/**
* @description: 獲取單例靜態方式
* @param {type}
* @return:
*/
static getSingle(){
if(! singleton){
singleton = new MysqlClient();
return singleton;
}
}
/**
* @description: 斷開數據庫鏈接
* @param {type}
* @return:
*/
close(){
return new Promise( function(resolve, reject){
pool.end(err => {
if(err){
log.err('斷開數據庫鏈接失敗');
return reject(err);
}
resolve();
})
})
}
/**
* @description: 獲取事務操做鏈接
* @param {type}
* @return:
*/
getConnection(){
return new Promise( function(resolve, reject){
//當前配置的是 鏈接池 模式, 直接從池子中獲取
pool.getConnection( function(err, connection){
if( err ){
log.err(`從MySQL鏈接池中獲取connection失敗: ${err}`);
return reject(err);
}
let conWrap = new TransactionConnection(connection);
resolve( conWrap );
});
});
}
/**
* @description: query
* @param {type}
* @return:
*/
query(sql, values){
return new Promise( (resolve, reject) => {
let finalSQL = sql;
if( arguments.length === 2 ){
finalSQL = mysql.format(sql, values);
}
// log.info(`執行MySQL的SQL語句: ${finalSQL}`);
pool.query(finalSQL, function(err, results, fields){
if( err ){
return reject(err);
}
resolve({
results : results,
fields : fields
});
});
});
}
/**
* @description: 格式化字符串
* @param {type}
* @return:
*/
format(sql, values){
return mysql.format(sql, values);
}
/**
* @description: 批量插入數據
* @param {type}
* @return:
*/
async insert(table, rows){
if (!Array.isArray(rows)) {
rows = [rows];
}
const fields = Object.keys(rows[0]), len = rows.length;
const template = `(${new Array(fields.length).fill('?').join(', ')})`;
const sql = rows.reduce((str, obj, idx) => {
const currentVal = fields.map(key => obj[key]);
if(idx >= len - 1) return `${str} ${this.format(template, currentVal)}`
return `${str} ${this.format(template, currentVal)}, `
}, `INSERT INTO ${table} (${fields.join(', ')}) VALUES`)
let out = await this.query(sql);
return out.results;
}
}
return MysqlClient;
}
複製代碼
因爲抓取數據牽扯到頁面抓取和接口抓取,決定使用urllib
進行接口請求,對頁面用puppeteer-core
進行瀏覽器模擬抓取
const urllib = require('urllib');
const path = require('path');
const puppeteer = require('puppeteer-core');
const {chromePath} = require('../config');
class Request{
constructor(){
this.browser = null;
this.page = null;
}
/**
* @description: 請求封裝
* @param {type}
* @return:
*/
curl(url, options){
return new Promise((resolve, reject)=>{
urllib.request(url, options, (err, data, res)=>{
if(err) return reject(err);
resolve({
status: res.statusCode,
data: data.toString()
})
})
})
}
/**
* @description: 打開瀏覽器並新建窗口
* @param {type}
* @return:
*/
async initBrowser(){
if(this.browser && this.page) return this.page;
this.browser = await puppeteer.launch({
executablePath: chromePath,
headless: true,
})
this.page = await this.browser.newPage();
}
/**
* @description: 打開頁面獲取數據
* @param {type}
* @return:
*/
async goPage(url, config, callback){
if(!this.page){
await this.initBrowser();
}
config = Object.assign({
timeout: 0,
waitUntil: 'domcontentloaded'
}, config);
await this.page.goto(url, config);
const result = await this.page.evaluate(callback);
return result;
}
/**
* @description: 關閉瀏覽器
* @param {type}
* @return:
*/
async close(){
if(this.browser || this.page){
await this.browser.close();
}
}
}
module.exports = new Request();
複製代碼
新建config/index.js增長配置文件,因爲數據比較敏感,因此將配置文件中的有關連接部分隱去
const path = require('path')
module.exports = {
// mysql配置
mysql: {
connectionLimit: 10,
host: '127.0.0.1',
user: 'root',
password: '123456',
database: 'fund',
charset: 'UTF8_GENERAL_CI',
timezone: 'local',
connectTimeout: 10000
},
// headless配置
chromePath: path.resolve(__dirname, '../Chromium.app/Contents/MacOS/Chromium'),
//數據接口配置
fund: {
fundList: {
url: '基金列表請求連接',
config: {
method: 'GET',
headers: {
referer: '請求來源僞造'
},
data: {
page: '1,50000'
}
}
},
fundDetail: (code) => '基金詳情頁面連接',
dayValue: {
url: '基金每日淨值連接',
getConfig: function (fundCode) {
return {
method: 'GET',
headers: {
referer: '請求來源僞造'
},
data: {
fundCode,
pageIndex: 1,
pageSize: 365 * 10,
startDate: '',
endDate: '',
}
}
}
}
}
}
複製代碼
將基金列表先寫入json文件,後續和詳情一塊兒寫入基金列表數據表
const mysqlFactory = require('./lib/mysqlFactory');
const request = require('./lib/Request');
const fs = require('fs-extra');
const moment = require('moment');
const {
mysql,
fund
} = require('./config');
const log = require('./lib/log');
// 獲取基金列表
async function getFundList() {
const {
url,
config
} = fund.fundList;
const {
data,
status
} = await request.curl(url, config);
if (status === 200) {
eval(data); // 該接口是jsonp直接執行獲得變量db
const {
datas
} = db;
log.info(`基金列表總共${datas.length}條數據,最後一條的名字爲${datas[datas.length-1][1]}`);
const result = datas.map(e => {
return {
code: e[0],
name: e[1]
};
})
await fs.outputJSON('data.json', JSON.stringify(result));
log.info(`基金列表寫入完成!!!`);
return result;
} else {
log.err('獲取基金列表出錯')
process.exit(1);
}
}
複製代碼
unction _callback() {
const tr = document.querySelectorAll('.info tr td');
const [establishDate, total] = tr[5].textContent && tr[5].textContent.split('/');
return {
type: tr[3].textContent || '暫無',
publicDate: tr[4].textContent || '暫無',
establishDate: establishDate.trim() || '暫無',
total: total.trim() || '暫無',
company: tr[8].textContent || '暫無',
trusteeship: tr[9].textContent || '暫無',
manager: tr[10].textContent || '暫無',
}
}
// 獲取基金詳情
async function getFundDetail(list = [], sql) {
try {
for (let [index, {
code,
name
}] of list.entries()) {
log.info(`開始獲取第${index}條數據,code爲${code}`)
const url = fund.fundDetail(code);
const result = await request.goPage(url, {}, _callback);
await sql.insert('fund_list', Object.assign({
name,
code
}, result))
}
} catch (err) {
log.err(`插入基金列表發生錯誤:${err}`);
process.exit(1);
}
}
複製代碼
因爲數據量比較大,作了簡單的併發50個請求和一次插入2萬條數據的sql處理,固然你們也能夠採用worker_threads
和child_process
以及proxy
處理
async function getConcurrencyData(arrs, logs) {
log.info(`開始獲取第${logs.index.join(', ')}條數據的歷史淨值,code爲${logs.code.join(', ')}`);
try {
const result = await Promise.all(arrs);
if (result.every(res => res.status === 200)) {
let allData = [];
result.forEach((single, idx) => {
const {
Data
} = JSON.parse(single.data);
const {
LSJZList
} = Data;
log.info(`第${logs.index[idx]}條的數據總量爲${LSJZList.length}`);
const dealResult = LSJZList.map(item => {
return {
code: logs.code[idx],
FSRQ: item.FSRQ ? moment(item.FSRQ).valueOf() : '暫無', //淨值日期
DWJZ: item.DWJZ || '暫無', // 單位淨值
LJJZ: item.LJJZ || '暫無', // 累計淨值
JZZZL: item.JZZZL || '暫無', //日增加率
SGZT: item.SGZT || '暫無', //申購狀態
SHZT: item.SHZT || '暫無', //贖回狀態
FHSP: item.FHSP || '暫無', //分成
}
});
allData = [...allData, ...dealResult];
})
return allData;
} else {
throw new Error('獲取基金淨值失敗')
}
} catch (err) {
log.err(err);
process.exit(1);
}
}
async function getDayValue(list = [], sql) {
try {
const {
url,
getConfig
} = fund.dayValue, arrs = [], logs = {
index: [],
code: []
}, len = list.length;
for (let [index, {
code
}] of list.entries()) {
// if(index < 8450) continue;
if (arrs.length >= 50 || index >= len-1) {
const allData = await getConcurrencyData(arrs, logs);
// 每次插入500條
while (allData.length > 0) {
console.log(allData.length)
await sql.insert('fund_value', allData.splice(0, 20000));
};
// 清空
arrs.splice(0, arrs.length, request.curl(url, getConfig(code)));
logs.index.splice(0, logs.index.length, index);
logs.code.splice(0, logs.code.length, code);
} else {
logs.index.push(index);
logs.code.push(code);
arrs.push(request.curl(url, getConfig(code)));
}
}
} catch (err) {
log.err(`插入基金淨值發生錯誤:${err}`);
process.exit(1);
}
}
複製代碼
async function run() {
const sql = new(mysqlFactory(mysql));
const data = await getFundList();
await getFundDetail(data);
await getDayValue(data, sql);
await sql.close();
}
run()
複製代碼
若是對代碼要求嚴格的能夠採用爬去出錯process.exit()
監聽,重啓程序來從斷開的數據繼續爬去(這也是當時爲啥將基金列表保存一份在json文件中的緣由,以便於出錯重啓程序繼續存斷開的條數繼續爬去)