python時間序列數據的對齊和數據庫的分批查詢

欲直接下載代碼文件,關注咱們的公衆號哦!查看歷史消息便可!python

0. 前言

在機器學習裏,咱們對時間序列數據作預處理的時候,常常會碰到一個問題:有多個時間序列存在多個表裏,每一個表的的時間軸不徹底相同,要如何把這些表在時間軸上進行對齊,從而合併成一個表呢?尤爲是當這些表都存在數據庫裏,並且超級超級大的時候,怎樣才能更高效地處理呢?mysql

上一篇文章中,已經介紹過了如何在Python中建立數據庫鏈接以及對數據庫進行增刪改查、分組聚合以及批量讀取和處理等操做。程序員

今天就以上面的問題爲導向,手把手教你如何用Python一步步實現相應的功能。講解的內容主要有:sql

  1. 如何實現兩個有序序列的合併;數據庫

  2. 延伸到兩個時間序列數據的對齊;express

  3. 從數據庫中自動循環分批讀取數據。編程

須要掌握的主要編程技巧包括:服務器

  • 用函數實現特定功能數據結構

  • 用類對功能進行封裝app

  • 實現基本的迭代器

使用的工具及版本:Python3.7,MySQL8.0, Jupyter Notebook

1. 有序序列的合併

本節主要介紹如何實現將2個有序(默認從小到大排序)序列合併成一個序列,同時介紹Python中基本的循環結構。

其實在Python中當然有相應的方法能夠很容易地作到(例如集合的set.union()方法),這裏之因此要本身實現,主要是要理解這種思想,爲後文的功能實現作鋪墊。

1.1 Python知識點之條件測試

if 語句的語法結構爲:

if boolean_expression1: #若是知足條件1,則執行suite1代碼塊
    suite1
elif boolean_expression2: #若是知足條件2,則執行suite2代碼塊
    suite2
else: #不然執行else_suite代碼塊
    else_suite

其中elifelse爲可選。

1.2 Python知識點之循環控制

1.2.1 while循環

(1) 循環機制及應用場景

  • 用於編寫通用迭代結構

  • 頂端測試爲真時執行循環體,並會重複屢次測試直到爲假後結束循環

(2) 語法格式

while boolean_expression: #若是測試爲真,則執行while_suite代碼塊(循環執行)
    while_suite
else: #直到測試爲假,則執行一遍else_suite代碼塊以後結束循環
    else_suit

其中else爲可選。

1.2.2 for 循環

(1)循環機制及應用場景

  • 通用的序列迭代器,用於遍歷任何有序的序列對象內的元素

  • 可用於字符串、元組、列表和其它的內置可迭代對象,以及經過類所建立的新對象

(2)語法格式

for expression in iterable:
    for_suite
else:
    else_suite

其中else爲可選。

🍎tips1: for循環比while循環執行速度快的多,能用for的儘可能使用for

1.3 Python知識點之函數

函數是python爲了代碼最大程度地重複利用最小化冗餘而提供的基本程序結構。

它可以將整塊代碼巧妙地隔離成易管理的一小塊,把重複代碼放在函數中,而不是進行大塊的複製,這是一個程序員應該具有的基本技能

1.3.1 建立函數

使用def語句定義函數,而且函數都會有一個返回值,默認爲None,也能夠用return語句明確指定返回值。

語法格式:

def funtionName(parameters): #定義函數名,設置函數的參數
    suite #函數體
    return something

1.3.2 調用函數

在Python中,函數是一個可調用對象,它有一個內置的方法,叫call

咱們在寫程序的時候,會碰到一類錯誤:"xxx" object is not callable,這就表示這個對象是不可調用的。

調用函數的方法也很簡單,在函數名後面加小括號(),有參數的時候在括號中傳入參數便可:funtionName(par1,..)

🍎tips2:python中定義函數名的時候,一般第一個單詞均小寫,第二個單詞開始一般首字母大寫,例如,printName,calculateSum

🍎tips3:寫函數的時候,儘可能寫得簡單,功能儘量單一,不要寫得又長又複雜

1.4 手動實現有序序列的合併

注:在 Python 中,list(列表)是最經常使用、最核心的數據結構之一,它是一種序列類型,能夠接收各類類型的元素,也能夠同時接收不一樣類型的元素。此外,list 仍是一個可迭代對象。本文的演示多采用 list 結構組織數據。

- 解題思路

假設有兩個序列:a = [1,3,7,9,11], b = [3,4,7,8],怎麼合併成一個序列?

思路:用第3個序列 c 記錄結果,同時對 a、b 進行遍歷,按必定的順序依次將 a、b 中的元素添加到 c 中;遍歷的方法是用指針進行索引。

  1. 初始狀態:c=,idx_a=0,idx_b=0;

  2. a[0]=1, b[0]=3, a[0]<b[0]
    → 將 a[0] 添加到 c ,idx_a=idx_a+1
    → 此時 c=[1], idx_a=1, idx_b=0

  3. a[1]=3, b[0]=3, a[0]=b[0]
    → 將 a[1]或b[0] 添加到 c ,idx_a=idx_a+1, idx_b=idx_b+1 
    → 此時 c=[1,3], idx_a=2, idx_b=1;

  4. a[2]=7, b[1]=4, a[0]>b[1]
    → 將b[1] 添加到 c , idx_b=idx_b+1
    → 此時 c=[1,3,4], idx_a=2, idx_b=2;

  5. a[2]=7, b[2]=7, a[2]=b[2]
    → 將a[2]或b[2] 添加到 c , idx_a=idx_a+1, idx_b=idx_b+1 
    → 此時 c=[1,3,4,7], idx_a=3, idx_b=3;

  6. a[3]=9, b[3]=8, a[3]>b[3]
    → 將b[3] 添加到 c , idx_b=idx_b+1 
    → 此時 c=[1,3,4,7,8], idx_a=3, idx_b=4;

  7. idx_b=4超出了b的索引範圍,及idx_b=len(b),但此時idx_a<len(a),因此將 a[idx_a:] 直接添加到 c 
    → 此時c=[1,3,4,7,8,9,11],結束,輸出結果c 。

- 實現代碼

def orderedListUnion(a, b):
    '''
    合併兩個按從小到大排好序的序列a,b
    '''

    # 設置循環初始值
    idx_a = 0
    idx_b = 0     
    c = []

    # 聲明變量len_a,len_b,指向序列a,b的長度,用來控制循環條件
    len_a = len(a) 
    len_b = len(b)        
    while (idx_a < len_a) and (idx_b < len_b):

        #若兩個元素相等,則將該元素添加到c,且兩個idx同時右移:
        if a[idx_a] == b[idx_b]:
            c.append(a[idx_a])
            idx_a += 1
            idx_b += 1

        #若不相等,取較小的元素,且較小元素的idx右移
        elif a[idx_a] < b[idx_b]:
            c.append(a[idx_a])
            idx_a += 1
        else:
            c.append(b[idx_b])
            idx_b += 1

    # 當一個序列遍歷結束後,跳出循環,將未遍歷完的序列的剩餘元素添加到c
    if idx_a == len_a:
        c = c + b[idx_b:]
    if idx_b == len_b:
        c = c + a[idx_a:]

    return c

# 測試
a = [1,3,7,9,11]
b = [3,4,7,8]
print(orderedListUnion(a,b))

輸出結果:

二、時間序列的對齊

2.1 問題場景

前面的練習僅僅做爲熱身,如今回到文章開頭的問題,假設一個更具體場景:

在醫院的ICU裏,須要持續觀察病人的各項生命指標。這些指標的採集頻率每每是不一樣的(例若有些指標隔幾秒採集一個,有些幾個小時採集一個,有些一天採集一個),並且有些是按期的,有些是不按期的,或者因爲某些緣由某些指標在某段時間上是缺失的,因此不一樣生命指標的時間序列數據在時間軸上的表現每每是不對齊的。

因此如今的問題是:

如何將存儲在不一樣數據表裏,且時間軸不一樣的兩個時間序列進行合併,對齊到同一個時間軸上?

舉例說明:

假設如今有2個數據表,分別記錄了某個病人某一天當中某些時刻的一些生命體徵指標:

表1:

表2:

能夠看到,兩張表的時間點有些是相同的,更多時候是不一樣的,如今咱們想把這兩條時間線併到一條時間軸上。

2.2 問題分析

這裏咱們將一張表的信息用一個 list 的形式來表示:

每一行記錄爲這個列表的一個元素,每行記錄用一個元組tuple (python中另外一個經常使用的數據結構,與list的區別在於list是可變的,而tuple是不可變的)來表示。例如表1的第一行,即列表的第0個元素表示爲('01:30', 128, 19)

每一個元組的第0個元素是這條記錄發生的時間點,也就是咱們用來索引的指針。

全部的時間點連起來就造成了一條時間軸,也就是表的第一列Time。每一個時間點上的多項生命指標能夠理解爲這個指針所帶的屬性。

ts1 = [('01:30',128,19),
       ('05:00',124,20),
       ('13:00',131,18),
       ('20:00',138,24),
       ('21:30',122,22)]

ts2 = [('01:30',7,129,60),
       ('09:30',6.5,112,63),
       ('12:00',8,135,74),
       ('13:00',8.5,110,60),
       ('20:32',7,78,49),
       ('22:00',7.5,96,55),
       ('23:30',6,124,59)]

這裏與1中 有序序列合併 的區別在於:

  • 前面的 list 的元素是個具體的數值,而這裏的 list 的元素是個 tuple;

  • 前面只須要比較當前指針所指的元素(數值)自己的大小,這裏須要比較的是當前指針所指的元素(tuple)的第0個元素的大小;

不過這只是形式上的區別,更重要、更核心的區別在於:

  • 前面的合併就是簡單地把元素拼在一塊兒,而這裏,進行對齊的時候,對於缺失的那一方,須要採起必定的措施。

例如,在01:30時刻,兩個表均有記錄,則合併的記錄爲('01:30',128,19,7,129,60);可是在05:00時刻,只有表1有記錄,表2沒有,那麼合併後的記錄應該是多少呢?

因此咱們須要採起一些策略,例如:

  • 直接用 None 表示,表示沒有:('05:00',124,20,None,None,None)

  • 用前一個時刻的記錄表示:('05:00',124,20,7,129,60)

  • 在前一條記錄和後一條記錄之間進行插值

具體應該根據實際的應用場景來選擇不一樣的處理方式。

2.3 代碼實現

本文先以第二個策略爲例:用前一個時刻的記錄表示。

因此這裏的代碼要比前面的,多設置一個變量pre,用來存儲上一條記錄。當指針移動到某個表沒有記錄的時刻時,就用pre來補上,而且pre也是跟着指針往前推移的。

因爲第一條記錄沒有上一條,因此把初始的pre設置爲None *(**這也是個小技巧,若沒有此項設置,則須要增長條件判斷)*

def tsAlign(x, y):
    '''
    x,y: lsit of tuple,
            每一個tuple表明一條記錄,tule的第0個元素爲這條記錄的id(eg,時間),
            list裏的tuple按照tuple的id從小到大進行排序

    把x和y在id上進行對齊:
        若x,y同時存在某個id,則將這兩個tuple進行合併;
        若x存在某個id而y不存在,則合併 x當前id對應的tuple 和 y小於當前id的最大id對應的tuple。

    return
        z: 對齊了x和y以後的list of tuple
    '''
    # 設置循環初始值
    i = 0 #index of x
    j = 0 #index of y
    z=[]  #store result of merging x and y   
    pre_x = (None,)*len(x)
    pre_y = (None,)*len(y)

    while (i<len(x)) and (j<len(y)):
        #若x當前的id小於y當前的id,則合併x當前的tuple和y的前一個tuple
        if x[i][0] < y[j][0]:
            z.append(x[i] + pre_y[1:])
            pre_x = x[i]
            i += 1
        #若x當前的id大於y當前的id,則合併x的前一個tuple和y當前的tuple
        elif x[i][0] > y[j][0]:
            z.append((y[j][0],) + pre_x[1:] + y[j][1:]) #注意:在定義tuple的時候,若只有一個元素,須要在元素後加個逗號
            pre_y = y[j]
            j += 1
        #若x當前的id等於y當前的id,則合併x當前的tuple和y當前的tuple
        else:
            z.append(x[i] + y[j][1:])
            pre_x = x[i]
            pre_y = y[j]
            i += 1
            j +=1

    while i < len(x):
        z.append(x[i] + pre_y[1:])
        i += 1

    while j < len(y):
            z.append((y[j][0],) + pre_x[1:] + y[j][1:])
            j += 1    

    return z

print(tsAlign(ts1, ts2))

輸出結果:

另外:用字典dict結構存儲的時候也很方便,dict 的 key 爲時間,value 爲各項生命指標組成的 list。須要注意的是,dict 是無序的,因此在進行遍歷的時候,須要將全部的key提取出來,先進行排列。(讀者可自行嘗試)

不過python裏其實還有個叫OrderDict的數據結構,就是個有序的字典,這裏也不作介紹。

3. 大型數據表的分批讀取

在前面的示例中,數據表的行和列都不多。若是當數據表很大的時候,直接把整張表讀進來,將會消耗巨大的內存,程序可能根本跑不起來。

一個很天然的想法是分批讀取並進行處理(前一篇文章中有相關的示例)。

也就是,能夠先把「讀取」+「處理」操做的功能封裝起來,再在外面套個循環,不斷地重複對應模塊的操做。

到這裏就須要跟你們講講Python中另外一個很是重要的概念——

3.1 Python知識點之類與對象

咱們常說有兩種程序設計的方式:面向過程和麪向對象。在Python中,這兩種編程方式均可實現。而面向對象封裝、繼承、多態性的三大特性,可使系統更加靈活、更加易於維護 。

3.1.1 對象的基本認識

首先須要理解 對象 的概念。

對象通常都由屬性+方法組成。你能夠這麼理解:屬性表示是什麼(變量或數據),方法表示能幹什麼(函數或功能)。

面向對象的一個特色就在於,把操做同一組數據的各類功能集成在一塊兒;對象的屬性就表示我要操做的這組數據,對象的方法就是我要怎麼操做這些數據。

而在有對象以前,必需要有類。

,就是具備同類屬性的對象,是個抽象的概念。而對象是由類實例化而來的。

同一個類實例化出來的不一樣對象,具備相同的方法和相同的屬性,但屬性的值不同。

好比,貓是一個類,是一個抽象的概念;而中華田園貓是實例化出來的具體對象,英國短毛貓是實例化出來的另外一個對象,這兩個對象都有本身的屬性(體型,毛長,毛色等),和相同的方法(會吃,會跑,會喵喵喵)

在咱們的問題中,也定義了這樣一個類,提供給它一個數據庫鏈接(屬性),它就能夠對這個數據庫的表進行增刪查改等各類操做(功能)。提供給它另外一個數據庫鏈接,又能夠對另外一個數據庫進行操做。每提供一個數據庫鏈接,就至關於實例化出一個對象。當數據存在多個數據庫中時,咱們就能夠實例化出多個對象,同時進行操做。

3.1.2 建立類

Python使用class關鍵字建立類,通常形式爲:

class ClassName(bases): # bases表示這個類是從哪一個類繼承而來的,即父類,爲可選
    'class documentation string' # 文檔字符串,爲可選
    data = value # 定義類變量
    def method(self, ...): # 定義類方法
        self.member = value

主要包括兩個部分:定義類變量 和 定義類方法

注意,在這裏定義類方法的方式比較獨特。

雖然說其實類方法就是函數,可是這個函數的首個參數必須爲self

由於類自己不能對方法進行調用,必需要實例化成對象了,對象才能調用方法。

因此這裏就意味着,這個方法的目的是對實例化對象進行操做,也就是說,self 的屬性只能被實例化對象本身使用,是私有的,咱們稱之爲「實例變量」。

相比之下,在方法外面定義的屬性,則是能夠被全部實例化對象共同使用,是公共的,咱們稱之爲「類變量」

在class語句內,任何賦值語句都會建立類屬性。

定義實例變量則須要採用一種特殊的方式,稱爲類的構造器初始化方法:__init__(),後面會舉例說明

🍎tips4:python中定義類名的時候,一般從第一個單詞開始,每一個單詞開始首字母大寫,例如,Animal,TableReader

3.1.3 對象實例化

建立實例化對象在其餘編程語言中通常使用關鍵字new,可是python裏面沒有這個關鍵字,而是用相似函數調用的方式:ClassName(args...)

這裏傳進去的參數會被__init__方法接收,成爲實例變量,也就是這個實例化對象私有的屬性。

3.1.4 代碼示例:類的建立和使用

'''代碼示例:類的建立和使用'''
# 定義類
class MyClass1():
    say = 'Hello' #定義類變量

    def __init__(self, name): # 類的構造函數
        self.name = name #定義實例變量

    def show(self): #定義類方法
        print(self.say, self.name)

#把類實例化成對象
obj1 = MyClass1('Tom') #傳入實例變量參數
print('obj1的變量say爲:', obj1.say) #self.say='Hello'
print('obj1的變量name爲:', obj1.name) #此時self.name='Tom'
print('obj1執行show()方法:')
obj1.show() #調用show()方法

print('--------------')

obj2 = MyClass1('Lisa') #傳入實例變量參數
print('obj2的變量say爲:', obj2.say) #self.say='Hello'
print('obj2的變量name爲:', obj2.name) #此時self.name='Lisa'
print('obj2執行show()方法:')
obj2.show() #調用show()方法

輸出結果:

在這裏,咱們定義了一個類,叫 MyClass1 ,而且由這個類實例化出來兩個對象,叫 obj1 和 obj2 。

從輸出結果咱們能夠看到,obj1 和 obj2 都有兩個變量:

  • 變量say:是相同的。它是定義在類方法外面的變量,是全部對象公共的,屬於類變量;

  • 變量name:是不一樣的。它是定義在__init__方法內的,是每一個對象私有的,屬於實例變量。

從輸出結果還能夠看到,obj1 和 obj2 都有相同的方法 show()``。

3.2 Python知識點之迭代器

再次如今回到咱們前面的需求:在數據庫中讀取併合並兩個超級大的數據表並進行必定的處理。

分解一下任務流程:

  1. 從數據庫中讀取一批數據

  2. 對該批數據進行處理

  • 2.1 對當前行進行處理

  • 2.2 判斷是否存在下一行:

  • 存在:跳到下一行,回到2.1

  • 不存在:回到1

發現了嗎,這裏存在兩個循環的過程:1是經過循環遍歷整個數據庫,2是經過循環遍歷每一個批次中的每一行。

這種遍歷咱們稱爲迭代(Iteration)——能夠說這是Python最強大的功能之一了。

3.2.1 迭代器的基本認識

迭代器是一個能夠記住當前遍歷位置的對象。(python裏面一切皆對象)

迭代器對象從集合的第一個元素開始訪問,直到全部的元素被訪問完結束。

有個專門的 iter() 函數,傳入一個可迭代對象,便可建立一個迭代器。舉個例子:

'''示例:迭代器測建立及調用'''
l = [1,2,3,4] #列表是可迭代對象
it = iter(l) #建立迭代器對象

print('經過for循環調用:')
for i in it: print(i)

it = iter(l)
print('經過next()方法調用:')
print(next(it)) #pythonz2中是it.next()
print(next(it))
print(next(it))
print(next(it))
print(next(it)) #遍歷結束,觸發StopIteration異常

輸出結果:

在這裏,咱們建立了一個迭代器,叫it,迭代器的調用方式通常有兩種:

  1. 經過for…in結構進行調用,對迭代器裏的元素逐個進行讀取

  2. 全部的迭代器都有個next()方法,就是用來逐個訪問迭代器內的元素的,調用一次就讀一個出來,直到結束。

從上面的結果咱們能夠看到,當迭代器內的元素所有遍歷完以後,繼續調用next()方法會觸發 StopIteration 異常。

因此這裏須要特別注意:迭代器只能往前不會後退。若是遍歷完想再遍歷一遍,就須要從新再建立一個迭代器。

3.2.2 在類中實現迭代器

若是要把一個類做爲一個迭代器使用的話,須要在類中實現兩個方法 __iter__() 與 __next__()

  • __iter__():返回一個特殊的迭代器對象,這個迭代器對象實現了 __next__()方法並經過 StopIteration 異常標識迭代的完成。

  • __next__():會返回下一個迭代器對象,每一次for循環都調用該方法(必須存在)

'''示例:在類中實現迭代器'''
class MyClass2():
    def __init__(self, start, end): 
        self.s = start
        self.e = end

    def __iter__(self): 
        '''
        @summary: 生成迭代對象時調用,返回值必須是對象本身,而後for能夠循環調用next方法

        '''
        return self

    def __next__(self): 
        '''
        @summary: 每一次for循環都調用該方法(必須存在)
        '''
        if self.s < self.e:
            x = self.s
            self.s += 1
            return x
        else:
            raise StopIteration

# 實例化出來的對象是個可迭代對象
it = MyClass2(1,5)
for i in it: print(i)

輸出結果:

3.3 代碼實現

知識點講的差很少了,如今就一步步來實現解決問題所須要的功能。

3.3.1 BufferTableReader版本

  1. 首先,咱們定義一個叫 BufferTableReader 的類(pass 爲佔位符,表示什麼也不作,這裏的做用是爲了演示代碼的完整性):
class BufferTableReader():
    pass

2. 在類中應該包含哪些參數呢?
簡單起見,先假設數據表不是在數據庫中,而是已經存在於當前工做空間中了:

# 初始化
    def __init__(self, data, batch_size ):
        self.data = data # 數據源
        self.bs = batch_size # 每次讀取的批次大小 
        self.buf = [] # 用來存儲當前批次的數據,初始化爲空 
        self.idx = 0 # 當前批次數據(self.buf)指針,初始化爲0
        self.offset = 0 # 數據源(data)指針,初始化爲0

3. 接着,實現從 data 中讀取一個batch_size 的數據的方法:

  • 判斷何時須要從data中讀取數據:

  • 當前批次數據已經處理完的時候,即self.idx==len(self.buf)時;

  • 讀取數據,需判斷從哪裏讀到哪裏:

  • 若是self.offset+batch_size沒有超出data的範圍,則讀取data[self.offset:self.offset+batch_size];

  • 若是self.offset+batch_size已經超出data的範圍,即data剩下的數據量已經小於一個batch_size,則直接讀取剩下的所有數據,即data[self.offset:];

  • 把讀取的batch數據存在self.buf中。

def readBatch(self):
        if self.idx==len(self.buf):
            slice_size=min(len(self.data)-self.offset,self.bs)#讀多少
            self.buff=self.data[self.offset: self.offset+slice_size]

            #更新指針
            self.offset+=slice_size
            self.idx=0

4.定義一個判斷data中是否還有下一行的方法:

  • 該方法首先會調用 readBatch() 方法

  • 若是當前的self.buf中還沒讀完,則顯然self.idx<len(self.buf)爲真,此時readBatch()中什麼也不作,且該方法返回 True;

  • 若是當前的 self.buf 恰好讀完,則self.idx==len(self.buf),此時readBatch()會讀取下一批次,更新self.idx=0:

  • 若是data中還有數據,則len(self.buf)>0,self.idx<len(self.buf)爲真,返回True;

  • 若是data中沒有數據了,則len(self.buf)=0,self.idx<len(self.buf)爲假,返回False。

def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

5.定義一個在當前批次self.buf中讀取行的方法:

def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line

到這裏,已經能夠實現一些功能了,你能夠嘗試一下:

'''類BufferTableReader(未實現迭代功能)'''
class BufferTableReader():
    # 初始化
    def __init__(self, data, batch_size ):
        self.data = data # 數據源
        self.bs = batch_size # 每次讀取的批次大小 
        self.buf = [] # 用來存儲當前批次的數據,初始化爲空 
        self.idx = 0 # 當前批次數據(self.buf)指針,初始化爲0
        self.offset = 0 # 數據源(data)指針,初始化爲0

    def readBatch(self):
        if self.idx == len(self.buf):
            slice_size=min(len(self.data)-self.offset,self.bs) #讀多少
            self.buf=self.data[self.offset:self.offset+slice_size]

            #更新指針
            self.offset+=slice_size
            self.idx=0    

            # 爲了便於觀察,每讀完一個批次作一個標註
            print('-------')

    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line

#實例化
ts1 = [('01:30',128,19),
       ('05:00',124,20),
       ('13:00',131,18),
       ('20:00',138,24),
       ('21:30',122,22)]

btr = BufferTableReader(ts1, 2)

while btr.hasNext():
    print(btr.readNext())

輸出結果:

從結果來看,類BufferTableReader的實例化對象btr已經可以實現從list中分批讀取數據了。(在這裏,每次讀取2條,到最後不足2條的時候,則把剩下的一次讀出)

但到目前爲止,這還不能算是一個Iter,只能說是個Reader,你會發現用for...in循環對沒法對其進行遍歷,由於它不是一個可迭代對象。

btr = BufferTableReader(ts1, 2)
for l in btr: print(l)

輸出結果:

3.3.2 BufferTableIter版本

  • 版本1.0:實現通常迭代功能

前面說了,可迭代對象須要在類中實現兩個方法 __iter__() 與__next__()。

因此接下來咱們在BufferTableReader的基礎上,定義一個新的類BufferTableIter,增長上述兩個方法(因爲後面的代碼都比較長,爲了方便讀者閱讀,我會在相較於上一個版本新增或修改部分作特別的標註):

'''類BufferTableIter 1.0 - 實現通常迭代功能'''
class BufferTableIter():
    # 初始化
    def __init__(self, data, batch_size ):
        self.data = data # 數據源
        self.bs = batch_size # 每次讀取的批次大小 
        self.buf = [] # 用來存儲當前批次的數據,初始化爲空 
        self.idx = 0 # 當前批次數據(self.buf)指針,初始化爲0
        self.offset = 0 # 數據源(data)指針,初始化爲0

    def readBatch(self):
        if self.idx == len(self.buf):
            slice_size=min(len(self.data)-self.offset,self.bs) #讀多少
            self.buf=self.data[self.offset:self.offset+slice_size]

            #更新指針
            self.offset += slice_size
            self.idx = 0    

            # 爲了便於觀察,每讀完一個批次作一個標註
            print('-------')

    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line

    ############新增部分#############
    def __iter__(self):
        print('iter called')
        return self

    def __next__(self):
        if self.hasNext():
            return self.readNext()
        else:
            raise StopIteration

    ##################################

#實例化
ts1 = [('01:30',128,19),
       ('05:00',124,20),
       ('13:00',131,18),
       ('20:00',138,24),
       ('21:30',122,22)]

print('經過next()方法調用:')
btr = BufferTableIter(ts1, 2)
while btr.hasNext():
    print(btr.readNext())
    ##或者:
    #print(next(btr))

print('經過for循環調用:')
btr = BufferTableIter(ts1, 2)    
for l in btr: print(l)

輸出結果:

此時既能夠經過next()方法調用,也能夠經過 for 循環進行調用。

固然仍是須要注意,迭代不能重複,即遍歷結束後不能從頭再遍歷一次(再執行以下代碼,結果爲空;雖然此時也調用了__iter__函數,但迭代器自己如今已經爲空了),須要從新建立一個實例化對象才行。

for l in btr: print(l)

輸出結果:

  • 版本2.0:實現重複迭代功能

可是若是我就是想可以重複遍歷,而又不想從新建立實例化對象怎麼辦呢?

也是能夠的,修改一下__iter__()函數的返回值,讓它從新生成一個實例化對象。也就是說,每for一次,就會調用__iter__從新建立一個迭代器。

以下,把__iter__()的返回值return self改成return BufferedTableIter(self.data, self.bs)

'''類BufferTableIter 2.0 - 實現重複迭代功能'''
class BufferTableIter():
    # 初始化
    def __init__(self, data, batch_size ):
        self.data = data # 數據源
        self.bs = batch_size # 每次讀取的批次大小 
        self.buf = [] # 用來存儲當前批次的數據,初始化爲空 
        self.idx = 0 # 當前批次數據(self.buf)指針,初始化爲0
        self.offset = 0 # 數據源(data)指針,初始化爲0

    def readBatch(self):
        if self.idx == len(self.buf):
            slice_size=min(len(self.data)-self.offset,self.bs) #讀多少
            self.buf=self.data[self.offset:self.offset+slice_size]

            #更新指針
            self.offset += slice_size
            self.idx = 0    

    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line

    ############修改部分#############
    def __iter__(self):
        print('iter called')
        return BufferTableIter(self.data, self.bs)
    ##################################

    def __next__(self):
        if self.hasNext():
            return self.readNext()
        else:
            raise StopIteration


#實例化
ts1 = [('01:30',128,19),
       ('05:00',124,20),
       ('13:00',131,18),
       ('20:00',138,24),
       ('21:30',122,22)]

btr = BufferTableIter(ts1, 2)   
for l in btr: print(l)
for l in btr: print(l)
for l in btr: print(l) #任你for多少次

輸出結果:

  • 版本2.1:實現重複迭代功能(拆分紅兩個類)

或者不修改__iter__()函數的返回值,而是將每for一次就實例化一次的這部分功能抽離出來,定義成另外一個類,就命名爲DBTable。說白了這個類就是專門用來初始化迭代器的:

class DBTable:
    def __init__(self, data, batch_size):
        self.data = data
        self.bs = batch_size
    def __iter__(self):
        print("__iter__ called")
        return BufferTableIter(self)

原來的類BufferTableIter則修改成:

'''類BufferTableIter 2.1 - 實現重複迭代功能'''
class BufferTableIter():
    # 初始化
    def __init__(self, dbTable):
        ############修改部分#############
        self.data = dbTable.data 
        self.bs = dbTable.bs 
        ##################################
        self.buf = [] 
        self.idx = 0 
        self.offset = 0 

    def readBatch(self):
        if self.idx == len(self.buf):
            slice_size=min(len(self.data)-self.offset,self.bs) 
            self.buf=self.data[self.offset:self.offset+slice_size]

            #更新指針
            self.offset += slice_size
            self.idx = 0    

    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line

    def __iter__(self):
        return self

    def __next__(self):
        if self.hasNext():
            return self.readNext()
        else:
            raise StopIteration
#實例化ts1 = [('01:30',128,19),       ('05:00',124,20),       ('13:00',131,18),       ('20:00',138,24),       ('21:30',122,22)]dbTable = DBTable(ts1, 2)btr = BufferTableIter(dbTable)

實例化以後進行調用,見下面代碼的運行結果能夠發現:DBTable實例化出來的對象能夠任意屢次重複遍歷;而BufferTableIter則不行。

#實例化
ts1 = [('01:30',128,19),
       ('05:00',124,20),
       ('13:00',131,18),
       ('20:00',138,24),
       ('21:30',122,22)]

dbTable = DBTable(ts1, 2)
btr = BufferTableIter(dbTable)```

輸出結果:

![](https://upload-images.jianshu.io/upload_images/10386940-8155824bfb9a6aa1?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

for l in btr: print(l)
print('--------------')
for l in btr: print(l)```

輸出結果:

image

  • 版本3.0:實現數據庫鏈接

最後,因爲實際中咱們的數據是存在數據庫中的,因此初始化函數__init__readBatch()函數須要作些修改(python鏈接數據庫請參考上一篇),具體再也不贅述,最終的代碼以下:

'''類BufferTableIter 3.0 - 實現數據庫鏈接'''
class DBTable:
    def __init__(self, con, sql, batch_size):
        ############修改部分#############
        self.con = con #建立鏈接
        self.sql = sql #須要執行的sql語句
        ################################
        self.bs = batch_size

    def __iter__(self):
        return BufferTableIter(self)

class BufferTableIter():
    # 初始化
    def __init__(self, dbTable):
        ############修改部分#############
        self.cursor = dbTable.con.cursor(buffered=True) #建立遊標
        self.cursor.execute(dbTable.sql) #執行sql語句
        self.readCount = 0
        ################################
        self.bs = dbTable.bs 
        self.buf = [] 
        self.idx = 0 

    def readBatch(self):
        if self.idx == len(self.buf):
            ############修改部分#############
            self.buf = self.cursor.fetchmany(size=self.bs) #從數據庫中讀取批次數據

            #更新指針
            self.readCount += len(self.buf)
            self.idx = 0   
            ################################

    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line
        return None

    def __iter__(self):
        return self

    def __next__(self):
        if self.hasNext():
            return self.readNext()
        raise StopIteration

如今結合上一篇的內容,往數據庫中添加一些數據:

'''在mysql數據庫中添加測試數據'''

import mysql.connector 

con = mysql.connector.connect(
  host="127.0.0.1",
  user="WXT",
  passwd="12345",
  database="wxt_db",
  use_pure="False"
)

mycursor=con.cursor(buffered=True)

# 建立一個表     
mycursor.execute("CREATE TABLE patient (time VARCHAR(255), hr INT, hxpl INT)")

# 往表裏插入一些記錄
mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('01:30',128,19)")
mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('05:00',124,20)")
mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('13:00',131,18)")
mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('20:00',138,24)")
mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('21:30',122,22)")

con.commit()
con.close()

進行測試:

'''鏈接mysql數據庫進行功能測試:分批讀取數據'''

import mysql.connector 

con = mysql.connector.connect(
  host="127.0.0.1",
  user="WXT",
  passwd="12345",
  database="wxt_db",
  use_pure="False"
)

# 檢查一個表是否存在
def tableExists(mycursor, name):
    stmt="SHOW TABLES LIKE '"+name+"'"
    mycursor.execute(stmt)
    return mycursor.fetchone()

try:    
    mycursor=con.cursor(buffered=True)

    if tableExists(mycursor, 'patient'):
        print('process table:', 'patient')
        print("--------")

        #查詢表裏的記錄
        sql = "SELECT * FROM patient"

        dbTbl = DBTable(con,sql,2)
        btr = BufferTableIter(dbTbl)

        for rec in dbTbl:
            print("read record:", rec)

finally:    
    con.close() #關閉數據

輸出結果:

再補充一個小知識點,這裏使用了try..finally結構,這是一種檢驗和處理異常的機制。一般狀況下,若是程序在某個位置出現異常,整個程序會被直接中斷,後面的語句不會再執行。

try..finally中,try 語句塊裏的代碼會被監測,不論這部分有沒有發生異常,finally 裏的語句都會執行,這樣就能夠對異常作一些收尾工做,好比這裏的關閉數據庫鏈接操做。

由於若是前面一旦發生異常,數據庫沒可以被關閉,會存在必定的危險性。

🍎tips5:try-finally結果能夠對異常進行檢測和處理,若是try語句塊中出現了異常,finally後面能夠作一些必要的清理工做(如關閉文件或斷開服務器鏈接等)

4. 結語

總結一下,本文實現了有序序列的合併、時間序列數據表的對齊、以及對數據庫中的數據表進行分批查詢,主要使用的Pyhton編程技巧有循環、函數、類和迭代器

但其實尚未徹底解決問題,目前只是把數據從數據庫給讀出來了,尚未對其進行處理,因此以後還會再寫後半部分的內容,計劃有:

  1. 把從數據庫中讀取出來的、來自不一樣數據表的時間序列進行合併對齊
  • 嘗試不一樣的對齊方式,如插值
  1. 對齊後的時間序列作分組(例如每小時,天天)聚合(例如每組作計數,求平均等)

  2. 用生成器機制(yield)對迭代器的功能進行優化。

(注:本文是由團隊內部培訓的筆記整理而來,若有問題,歡迎交流指正!)

相關文章
相關標籤/搜索