記兩件「會門腳本語言真香」的小事 | Python 主題月

本文正在參加「Python主題月」,詳情查看 活動連接java

這兩件小事發生在幾周前了,一直想記錄下,卻又一直擱置,今天抽點時間寫一下~python

小事一

① 原由

最近兩週公司項目暫時沒啥大的新需求,有時間整下技術規劃,跟組長商議後決定:對APP作下 啓動優化,就是:應用啓動、頁面加載提速web

說到頁面加載提速,那麼問題來了 → 頁面那麼多,怎麼知道有哪些頁面須要優化呢?正則表達式

公司APP的設計是單Activity多Fragment,那有多少Fragment呢,這時會門腳本語言的好處就來了,直接寫個腳本遞歸遍歷文件夾統計下就知道了:chrome

import os


def search_all_fragment(path):
    os.chdir(path)
    items = os.listdir(os.curdir)
    for item in items:
        path = os.path.join(item)
        # 獲取路徑分割後的最後部分,即文件名
        path_split_last = path.split(os.path.sep)[-1]
        # 判斷是否爲目錄,是往下遞歸
        if os.path.isdir(path):
            print("[-]", path)
            search_all_fragment(path)
            os.chdir(os.pardir)
        # 由於項目裏用到了ARouter會生成對應的Fragment,不在統計範疇中,要過濾掉
        elif 'Fragment' in path_split_last and '$$' not in path_split_last:
            print("[!]", path)
            # 只保存.java和.kt的文件
            if path_split_last.endswith('.java') or path_split_last.endswith('.kt'):
                result_list.append(path)
        else:
            print('[+]', path)


if __name__ == '__main__':
    result_list = []
    search_all_fragment(r'項目文件路徑')
    print(result_list)
    print('共有Fragment:', len(result_list), "個")
複製代碼

腳本一跑就知有沒有:數據庫

行吧,一共412個Fragment,用腳本秀了一波,迴歸正題,怎麼知道哪些頁面須要優化。json

巧了,恰好自家全埋點上有作 渲染時間埋點 的日誌上報,且不關心具體的實現方案是否靠譜,開打Kibana,輸入下篩選條件(僅查看這類Event),部分日誌以下:瀏覽器

② 是頁面的路徑,取最後的Fragment就好,③ renderCost,這裏就是渲染時間了,拿到這兩個數據就行。服務器

接着,怎麼分析這些日誌得出有哪些待優化頁面?拍拍腦門跟組長定了個不怎麼靠譜的方案:markdown

統計全部Fragment的渲染時間,計算平均數,而後從長到短排序,優先優化渲染時間長的頁面。

早上討論完,看了下過去一年渲染類型的日誌共有 53068377 條記錄,下午去看病的路上就開始構思該怎麼搞了,三條思路:

  • 一、Kibana支持保存查詢結果導出成csv的,直接導出csv,調下csv庫或pandas讀取解析一波;
  • 二、有數據庫權限的話,直接查詢出全部日誌,導出json或scv;
  • 三、抓包或模擬訪問抓取數據保存到本地,而後再作批處理;

事實證實留多點後手是沒錯的,前兩個思路+第三個思路的前半段都GG了,聽我娓娓道來~

② 思路一 ×

次日一早到公司,準備導出csv,看別人發的教程很簡單,就三步:

輸入查詢條件查詢 → 得出查詢結果後Save → 生成CSV

可是,我死活都找不到Save,故生成CSV的按鈕一直是灰的(點不了):

em...難道是權限的問題?換上了組長給的有查數據庫權限的帳號,同樣不醒,難道是要 開啓這樣的配置 ,點擊Kibana的設置,各類沒權限,因而去找後臺大佬,獲得的回覆是:

不能開這個,導幾百條几千條還好,導個幾百萬條服務器頂不住直接就掛了,有風險,因此把這個功能禁用了。

思路一慘遭滑鐵盧...


③ 思路二 ×

上思路二,有數據庫權限的帳號,執行查詢語句後走腳本導出,寫兩句簡單SQL條件查詢語句還不是手到擒來!

現實跟個人認知出了點 誤差,原來這個數據庫權限只是:能夠用Kibana的Dev Tools,在上面拼接json字符串查詢而已:

em...也不是不能用,抓包發現bool塊的數據於篩選日誌請求提交的數據相同,複製粘貼一波,而後把size改爲1000000,執行下康康:

em...返回了錯誤信息,大概意思是說一次最多隻能查10000條,若是一次查更多隻能去改配置。

臥槽,直接裂開,我還想着複製粘貼保存下Json還好,5306w條數據啊,手動複製粘貼,來算算要多久:

  • 修改查詢條件(起始日期時間和結束日期時間) → 10s
  • 點擊查詢等待查詢結果顯示 → 20s之內
  • 新建文件,複製粘貼,保存輸入文件名 → 30s

每存1w條數據,我要花至少1分鐘,換算成時,獲取完這些數據要多少小時:5306/60≈88.5h,換算成標準工做日(8h),須要11天多一點,這還沒算休息時間呢,要 花兩個多星期 重複作這樣重複的事,任誰也頂不住啊!!!

方案二也跪了...


④ 方案三 × √

em...抓下包?分析參數,而後寫爬蟲抓下,抓了幾回請求後我就放棄了,Cookie裏有個sid每請求一次變一次:

並且跟響應頭Set-Cookie返回的不同,短期內搗鼓怎麼構造的顯然不可能,唉只能上看上去最low的模擬用戶訪問瀏覽器了。

Tips:後面閒下來發現,Set-Cookie返回的只是Cookies裏的一部分,登陸後拿到cookies,而後本身每次請求後替換這部分就好~

把模擬步驟拆分下:

  • 一、打開登陸頁 → 等待加載完 → 填充帳號密碼 → 點擊登陸
  • 二、等待頁面跳轉主頁加載完 → 點擊左側Dev Tools圖標
  • 三、等待頁面加載完 → 清空左側查詢Json → 填充新的查詢Json
  • 四、點擊發送請求 → 等待右側查詢結果 → 選中查詢結果 → 保存到本地文件

怎麼方便怎麼來,筆者直接把查詢結果存txt裏了,模擬訪問用Selenium,直接開搞:

import time

from selenium import webdriver

base_url = 'http://kibana.xxx.xxx'
login_url = base_url + '/login'
login_data = {
    'password': 'xxx',
    'username': 'xxx',
}


# 初始化瀏覽器
def init_browser():
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument(r'--start-maximized')    # 開始就全屏
    return webdriver.Chrome(options=chrome_options)


# 模擬登陸
def login():
    browser.get(login_url)
    time.sleep(10)
    inputs = browser.find_elements_by_class_name('euiFieldText')
    inputs[0].send_keys(login_data['username'])
    inputs[1].send_keys(login_data['password'])
    submit = browser.find_element_by_xpath('//button[1]')
    submit.click()


if __name__ == '__main__':
    browser = init_browser()
    login()
複製代碼

用過selenium的朋友可能或說:添加下述配置設置下用戶數據目錄,下次打開瀏覽器訪問處於登陸態,就不用從新登陸了:

chrome_options.add_argument(r'--user-data-dir=D:\ChromeUserData')   
複製代碼

但實際的狀況是設置了沒用,仍是跳轉了登陸頁,我也不知道爲何,索性每次跑腳本都登陸下吧...

登陸成功後,稍待片刻會跳轉到主頁,等待加載完畢,點擊左側這個圖標,這沒有采用顯式或隱式等待的方式,而是笨方法休眠死等~

login()函數最後調下下面這個方法:

# 訪問主頁點擊Tab
def click_tab():
    time.sleep(8)   # 假死等待頁面加載完畢
    browser.find_element_by_xpath('//ul[3]/li[2]').click()
複製代碼

接着到輸入區域寫入文字:

Elements定位到目標位置:

臥槽,好像有點難搞啊,不是普通的文本輸入框,獲取外層ace_content的div,嘗試send_keys:

def set_left_text():
    inputs = browser.find_element_by_xpath('//div[@class="ace_content"]')
    inputs.send_keys('測試文本')
複製代碼

果真報錯:

不能直接設置文本就只能另闢蹊徑了,心生一計

點擊最後的遊標,而後一直按backspace鍵清空,接着模擬鍵盤輸入一個個字母敲進去

改動後的代碼

# 設置左側文字
def set_left_text():
    time.sleep(5)
    cursor_div = browser.find_element_by_xpath('//div[@class="ace_cursor"]')
    cursor_div.click()
    action_chains = ActionChains(browser)
    for i in range(0, 500):
        action_chains.context_click(cursor_div).send_keys(Keys.BACKSPACE).perform()
    action_chains.context_click(cursor_div).send_keys('GET _search' + str(search_dict)).perform()
複製代碼

清空後輸入,有點鬼畜:

這當中其實作了不少無效操做,按回退鍵500下,實際上字符沒那麼多,還有得等它把字敲完,得辦法改進下。

又心生一計粘貼複製,實現起來就是:

往剪切板寫入本次要查詢的字符串 → 點擊遊標或內容得到焦點 → Ctrl+A全選內容 → 回退 → Ctrl+V粘貼內容

代碼實現一波:

def set_left_text():
    time.sleep(5)
    input_div = browser.find_element_by_xpath('//div[@class="ace_content"]')
    input_div.click()
    action = ActionChains(browser)
    action.key_down(Keys.CONTROL).key_down('a').key_up('a').key_up(Keys.CONTROL).perform()
    action.key_down(Keys.BACKSPACE).key_up(Keys.BACKSPACE).perform()
    action.key_down(Keys.CONTROL).key_down('v').key_up('v').key_up(Keys.CONTROL).perform()
複製代碼

看看效果:

能夠的,模擬點擊運行的小按鈕了:

# 點擊查詢按鈕
def click_submit():
    submit_button = browser.find_element_by_xpath('//button[@data-test-subj="sendRequestButton"]')
    submit_button.click()
複製代碼

接着到右側查詢結果,直接處理有些麻煩,獲取內容結點,遞歸遍歷全部子節點,提取文本去空格換行等,最後拼接輸出。

又又心生一計

能不能攔截selenium瀏覽器接收的請求,對特定請求,直接拿響應結果寫入

還真能夠,經過中間人代理的方式,此處使用 browsermob-proxy,下載完把庫拷貝到項目中:

# 開啓代理
server = Server(os.path.join(os.getcwd(), r'browsermob-proxy-2.1.4\bin\browsermob-proxy'))
server.start()
proxy = server.create_proxy()

# chrome加下配置
chrome_options.add_argument('--proxy-server={0}'.format(proxy.proxy))

# 抓包前:
proxy.new_har(options={
    'captureContent': True,
    'captureHeaders': True
})

# 抓包後過濾特定請求,並把內容保存到本地文件中:
def save_log(date_str, index):
    for entry in proxy.har['log']['entries']:
        if entry['request']['url'].endswith('path=_search&method=GET'):
            log_file_path = os.path.join(out_dir, date_str + '_' + str(index) + '.txt')
            with open(log_file_path, "w+", encoding='utf-8') as f:
                f.write(str(entry['response']['content'])
                    .replace("\n", '').replace("\\n", "").replace(' ', ''))
            print("日期日誌保存完畢:", log_file_path)
複製代碼

嘔吼,完美,接着補齊剪貼板寫入,以及查詢日期的構造了:

def set_copy_text(content):
    w.OpenClipboard()
    w.EmptyClipboard()
    w.SetClipboardData(win32con.CF_UNICODETEXT, content)
    w.CloseClipboard()

# 構造生成一個從20200709到今天的日期
def init_date_list(begin_date, end_date):
    date_list = []
    begin_date = datetime.datetime.strptime(begin_date, "%Y%m%d")
    end_date = datetime.datetime.strptime(end_date, "%Y%m%d")
    while begin_date <= end_date:
        date_str = begin_date.strftime("%Y-%m-%d")
        date_list.append(date_str)
        begin_date += datetime.timedelta(days=1)
    return date_list
複製代碼

最後,就是每次請求時更新請求參數寫入剪貼板,打開代理抓包:

def input_query_content():
    try:
        for pos, date in enumerate(str_date_list[]):
            for index in range(1, 3):
                input_div = browser.find_element_by_xpath('//div[@class="ace_content"]')
                input_div.click()
                action = ActionChains(browser)
                print(str(pos + 1) + "、請求日期:" + date + "-" + ("上半天" if (index == 1) else "下半天"))
                update_dict_and_proxy(date, index)
                action.key_down(Keys.CONTROL).key_down('a').key_up('a').key_up(Keys.CONTROL).perform()
                set_copy_text('GET _search' + '\n' + str(search_dict).replace("'", '"'))
                time.sleep(1)
                action.key_down(Keys.BACKSPACE).key_up(Keys.BACKSPACE).perform()
                action.key_down(Keys.CONTROL).key_down('v').key_up('v').key_up(Keys.CONTROL).perform()
                submit_button = browser.find_element_by_xpath('//button[@data-test-subj="sendRequestButton"]')
                submit_button.click()
                time.sleep(20)
                save_log(date, index)
    except Exception as e:
        print(e)
        proxy.close()
        browser.close()

# 更新請求字典及新建抓包
def update_dict_and_proxy(date_str, index):
    gte_str = date_str + 'T00:00:00.000Z' if (index == 1) else date_str + 'T12:00:00.000Z'
    lte_str = date_str + 'T12:00:01.000Z' if (index == 1) else date_str + 'T23:59:59.000Z'
    search_dict['query']['bool']['filter'][20]['range']['time']['gte'] = gte_str
    search_dict['query']['bool']['filter'][21]['range']['time']['lte'] = lte_str
    proxy.new_har(options={
        'captureContent': True,
        'captureHeaders': True
    })
複製代碼

腳本一跑,就能夠開始掛機了,建議找一臺空閒的電腦掛着,由於腳本會 佔用剪切板,會影響正常工做哦!另外,這裏把查詢時間劃分紅上跟下,是想盡量多的查詢到所需數據。

會門腳本語言真香啊!把本來的工做委託給了自動化腳本,效率也高一倍,2w條數據只要1分鐘,採集完全部數據的耗時驟降至少44個小時,機器還能24小時跑,因此其實只須要兩天,還不影響我作 工(mo)做(yu),固然還能夠再優化,將腳本部署到多臺機子上同時執行,一減再減小,原本兩個多星期的活,一天不到幹完。還不香?

更幸運的是,實際上有效數據只有600w條,原來渲染事件的埋點是前年10月份才加的,因此單機跑了5個鐘就把數據爬完了。

早上寫腳本,下午就跑完,中途把統計腳本也寫下,這一Part就很Easy了,正則表達式 yyds

data_pattern = re.compile('pagePath":"(.*?)".*?"renderCost","value":(.*?)}', re.S)
複製代碼

讀取文件內容,全文匹配,遍歷匹配結果,依次對兩個分組作處理,而後把渲染時間寫入頁面.txt文件中:

for log in log_list:
    print("解析文件:", log)
    with open(log, 'r', encoding='utf8') as f:
        content = f.read()
    data_result = data_pattern.findall(content)
    if data_result is not None:
        for data in data_result:
            page_name = ''
            page_render_time = 0
            page_split = data[0].split('-')
            if page_split is not None and len(page_split) > 0:
                other_page_split = page_split[-1].split(",")
                if other_page_split is not None and len(other_page_split) > 0:
                    page_name = other_page_split[-1]
                else:
                    page_name = page_split[-1]
            else:
                other_page_split = data[0].split(",")
                if other_page_split is not None and len(other_page_split) > 0:
                    page_name = other_page_split[-1]
                else:
                    page_name = data[0]
            page_render_time = data[1].replace('"', '')
            if page_name == 'i':
                print(data)
                break
            cp_utils.write_str_data(page_render_time, os.path.join(page_dir, page_name + ".txt"))
複製代碼

寫入示例以下:

再接着又是遍歷文件夾子,字典存儲數據 (頁面:平均值),保存統計數據:

def average_time(file_path):
    page_dir_split = file_path.split(os.path.sep)
    if page_dir_split is not None and len(page_dir_split) > 0:
        result_average = 0
        render_time_list = cp_utils.load_list_from_file(file_path)
        for render_time in render_time_list:
            if render_time != '0':
                result_average = int((result_average + int(render_time)) / 2)
        print(page_dir_split[-1] + "結果計算完畢...")
        cp_utils.write_str_data(page_dir_split[-1] + "-" + str(result_average), result_file)
    else:
        print("異常退出")

複製代碼

最後按照倒序輸出到文件中:

def order_list(file_path):
    time_info_list = cp_utils.load_list_from_file(file_path)
    order_list_result = sorted(time_info_list, key=lambda x: int(x.split('-')[-1]), reverse=True)
    cp_utils.write_list_data(order_list_result, result_order_file)
複製代碼

這樣就能夠獲得頁面的平均渲染時間了~固然,後面以爲看這個平均數不靠譜,由於影響變量太多了:

設備硬件不一樣,加載速度確定是有差距的,有些用戶活躍,有些不活躍,還有版本等等...

不靠譜但又想依賴這個全埋點的數據作點什麼,想了想又定了另一個方案:

按照頁面使用頻度排序,優先針對用戶經常使用的頁面進行優化,好比這個版本優化2個經常使用頁面,挑幾個典型特定設備進行跟蹤,發版一段時間後那這兩個頁面的新數據跟舊數據作對比,就能夠對優化的收益作量化了~

固然,這些是後話了,不敢想象,若是我不是Python玩得還能夠的話,該怎麼解決這些問題...


小事二

第二次小事比起第一件來講就小巫見大巫了,基友小A,讓我幫忙給他搞點行業報告,一會兒給了好幾個網址,開頭幾個還好,就是模擬請求,解析頁面,拿個ID啥的拼接,得出真實的PDF下載連接,而後下載。

後面的幾個網站,就很雞賊了,直接把PDF的每一頁做爲圖片貼出來,如:

想把圖片轉成PDF,若是不會腳本語言,須要一張張圖片右鍵保存到本地,最後用合成工具把圖片合成成PDF。

不過巧了,我恰好 會點Python,因此這件事就變成了爬圖片,找個圖片轉PDF的庫了,找到個 img2pdf庫,API簡單,用着還行,不過若是圖片有Alpha通道,會直接報錯,因此須要本身去下,簡單,用 pillow庫 就能夠作:

from PIL import Image, ImageFont, ImageDraw

# 批量對RGBA圖片進行轉換,同時刪除無效文件
def remove_pic_alpha(pic):
    try:
        img = Image.open(pic)
        if img.mode == "RGBA":
            img = img.convert("RGB")
            img.save(pic)
            print("轉換圖片:", pic)
    except Exception as e:
        print("文件異常,移除:" + pic)
        os.remove(pic)
複製代碼

簡單轉換代碼以下:

import img2pdf

try:
    with open(pdf_path, "wb+") as f:
        f.write(img2pdf.convert(pic_path_list))
    print("輸出PDF文件:", pdf_path)
except Exception as e:
    print("發生異常:", pdf_path)
複製代碼

後面發現了一個大塊頭,851頁,總共13950個有效的報告,有一些報告的頁面結構不是純圖片,而是相似於:文字-圖片-文字-圖片這樣,不想把文字漏掉,能夠把它轉換成圖片,就是利用 pillow庫,按照必定的規則,把文字繪製到一個白色的背景上。

def font2pic(content, pic_path):
    # 先轉換爲列表
    content_list = list(content)
    i = 30
    while i < len(content_list):
        content_list.insert(i, "\n")
        i += 30
    content_after = ''.join(content_list)
    im = Image.new("RGB", (960, 720), (255, 255, 255))
    dr = ImageDraw.Draw(im)
    font = ImageFont.truetype(os.path.join("fonts", "msyh.ttf"), 24)
    dr.text((50, 50), content_after, font=font, fill="#000000")
    im.save(pic_path)
複製代碼

爬取處理頁面是,還得記錄順序,而且把它做爲圖片名,一個爬取的臨時文件示例以下:

而後就是遍歷文件每一行,文字生成圖片,圖片連接執行下載(也能夠批量下載後替換url),得出的PDF樣例以下:

文本的渲染比較無腦,不是很美觀,具體的渲染規則還得從長計議下,不過這些也是後話了,數據到手,你想怎麼處理,均可以~


小結

當咱們須要作一些重複性任務,且量比較大的時候,腳本的優點就出來了:只要程序足夠穩健24小時不間斷跑還不會累,把腳本部署到多臺機子上,還能夠縮短完成時間。固然,腳本是死的,人是活的,有些問題沒考慮到,腳本跑到中途就掛了,因此大型任務還須要引入 告警及日誌系統,以便及時跟進及對錯誤進行排查能夠快速定位到問題。

腳本語言除了Python還有不少:Windows的.batLinux的.shC ShellJavaScriptLua 等。而筆者偏心Python的緣由主要仍是由於它的 類庫豐富,你能想到的基本都能找到對應的第三方庫。

人生苦短,我用Python~

相關文章
相關標籤/搜索