Python多進程與多線程html
Python使用Hadoop分佈式計算庫mrjobpython
Python使用Spark分佈式計算庫PySpark正則表達式
例子:分別使用MapReduce和Spark實現wordcount算法
正則表達式簡介shell
日期和時間編程
經常使用內建模塊: collections; itertools數組
進程:程序的一次執行(程序裝載入內存,系統分配資源運行)安全
每一個進程有本身的內存空間、數據棧等,只能使用進程間通信,而不能直接共享信息多線程
線程:全部線程運行在同一個進程中,共享相同的運行環境併發
每一個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口
線程的運行能夠被搶佔(中斷),或暫時被掛起(睡眠),讓其餘線程運行(讓步)
一個進程中的各個線程間共享同一片數據空間
GIL全稱全局解釋器鎖Global Interpreter Lock, GIL並不
是Python的特性,它是在實現Python解析器(CPython)時
所引入的一個概念
GIL是一把全局排他鎖,同一時刻只有一個線程在運行
毫無疑問全局鎖的存在會對多線程的效率有不小影響。甚至就幾乎等於Python是個單線程的程序。
multiprocessing庫的出現很大程度上是爲了彌補thread庫由於
GIL而低效的缺陷。它完整的複製了一套thread所提供的接口方
便遷移。惟一的不一樣就是它使用了多進程而不是多線程。每一個
進程有本身的獨立的GIL,所以也不會出現進程之間的GIL爭搶。
join阻塞進程直到線程執行完畢
fork操做:
調用一次,返回兩次。由於操做系統自動把當前進程(稱爲父
進程)複製了一份(稱爲子進程), 而後分別在父進程和子進
程內返回。子進程永遠返回0,而父進程返回子進程的ID。子
進程只須要調用getppid()就能夠拿到父進程的ID。
因爲Windows沒有fork調用,上面的代碼在Windows上沒法運行。
multiprocessing是跨平臺版本的多進程模塊,它提供了
一個Process類來表明一個進程對象,下面是示例代碼:
這個程序若是用單進程寫則須要執行10秒以上的時間,而用多進程則啓動10個進程並行執行,只須要用1秒多的時間。
Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞
用於批量建立子進程,能夠靈活控制子進程的數量
在通常狀況下多個進程的內存資源是相互獨立的,而多線程能夠共享同一個進程中的內存資源
三大特性:
immutable data 不可變數據
first class functions:函數像變量同樣使用
尾遞歸優化:每次遞歸都重用stack
好處:
parallelization 並行
lazy evaluation 惰性求值
determinism 肯定性
函數式編程http://coolshell.cn/articles/10822.html
技術:
map & reduce
pipeline
recursing 遞歸
currying
higher order function 高階函數
lambda:快速定義單行的最小函數, inline的匿名函數
map(function, sequence) :對sequence中的item依次執行function(item),執行結果組成一個List返回
filter(function, sequence):對sequence中的item依次執行function(item),將執行結果爲
True的item組成一個List/String/Tuple(取決於sequence的類型)返回
reduce(function, sequence, starting_value):對sequence中的item順序迭代調用function,
若是有starting_value,還能夠做爲初始值調用
正常寫法:
函數式編程:
這樣的代碼是在描述要幹什麼,而不是怎麼幹
Hadoop是Apache開源組織的一個分佈式計算開源框架。
核心的設計就是: MapReduce和HDFS( HadoopDistributed File System)
思想:任務的分解與結果的彙總
import sys for line in sys.stdin: ls = line.split() for word in ls: if len(word.strip()) != 0: print word + ',' + str(1)
import sys word_dict = {} for line in sys.stdin: ls = line.split(',') word_dict.setdefault(ls[0], 0) word_dict[ls[0]] += int(ls[1]) for word in word_dict: print word, word_dict[word]
$ cat wordcount.input | python mapper.py | python reducer.py | sort -k 2r
Output:
n world 3
n hello 2
n hi 1
Hadoop有Java和Streaming兩種方式來編寫MapReduce任務。
Java的優勢是計算效率高,而且部署方便,直接打包成一個jar文件就好了。
Hadoop Streaming是Hadoop提供的一個編程工具,它容許用戶使用任何可執行文件或者腳本文件做爲Mapper和Reducer。
Streaming單機測試:
cat input | mapper | sort | reducer > output
mrjob實質上就是在Hadoop Streaming的命令行上包了一層,有了統一的Python界面,無需你再去直接調用Hadoop Streaming命令。
from mrjob.job import MRJob class MRWordFrequencyCount(MRJob): def mapper(self, _, line): yield "chars", len(line) yield "words", len(line.split()) yield "lines", 1 def reducer(self, key, values): yield key, sum(values) if __name__ == '__main__': MRWordFrequencyCount.run()
Spark是基於map reduce算法實現的分佈式計算框架:
Spark的中間輸出和結果輸出能夠保存在內存中,從而再也不須要讀寫HDFS。
Spark能更好地用於數據挖掘與機器學習等須要迭代的map reduce的算法中。
Spark能夠直接對HDFS進行數據的讀寫,一樣支持Spark on YARN。Spark能夠與MapReduce運行於同集羣中,共享存儲資源與計算。
本地模式
Standalone模式
Mesoes模式
yarn模式
彈性分佈式數據集Resilient Distributed Datasets:
集羣節點上不可變、已分區對象
可序列化
能夠控制存儲級別(內存、磁盤等)來進行重用。
計算特性:
血統lineage
惰性計算lazy evaluation
生成方式:
文件讀取
來自父RDD
兩種模式匹配:搜索search()和匹配match()
判斷一個字符串是不是合法的Email地址
做業1:電話號碼正則匹配
例子:
+008613112345678
+861795101023231212
+8608715432231
01023459764
06346046499
010120
time模塊和datetime模塊
import time print time.time() print time.localtime() for i in range(3): time.sleep(0.5) print "Tick!"
1479487832.06 time.struct_time(tm_year=2016, tm_mon=11, tm_mday=19, tm_hour=0, tm_min=50, tm_sec=32, tm_wday=5, tm_yday=324, tm_isdst=0) Tick! Tick! Tick!
import datetime print "today is: ", datetime.date.today() print "now is: ", datetime.datetime.now() print datetime.date(2016,6,4) print datetime.time(14,00)
today is: 2016-11-19 now is: 2016-11-19 00:50:38.551905 2016-06-04 14:00:00
# 計算昨天和明天的日期 import datetime today = datetime.date.today() yesterday = today - datetime.timedelta(days=1) tomorrow = today + datetime.timedelta(days=1) print yesterday,today,tomorrow
2016-11-18 2016-11-19 2016-11-20
做業2:計算日期之間的工做日
enumerate函數
# 對一個列表或數組既要遍歷索引又要遍歷元素時 l = [1,2,3] for i in range (len(l)): print i ,l[i]
0 1 1 2 2 3
# enumerate會將數組或列表組成一個索引序列。使咱們再獲取索引和索引內容的時候更加方便以下: for index,text in enumerate(l): print index ,text
0 1 1 2 2 3
collections是Python內建的一個集合模塊,提供了許多有用的集合類。
deque是爲了高效實現插入和刪除操做的雙向列表,適合用於隊列和棧。
OrderedDict的Key會按照插入的順序排列。 Counter是一個簡單的計數器,也是dict的一個子類。
from collections import namedtuple Point = namedtuple('Point', ['x', 'y']) p = Point(1, 2) print p.x print p.y
from collections import deque q = deque(['a', 'b', 'c']) q.append('x') q.appendleft('y') print q deque(['y', 'a', 'b', 'c', 'x'])
from collections import defaultdict dd = defaultdict(lambda: 'N/A') dd['key1'] = 'abc' print dd['key1'] # key1存在 print dd['key2'] # key2不存在,返回默認值 abc N/A
from collections import OrderedDict d = dict([('a', 1), ('b', 2), ('c', 3)]) print d # dict的Key是無序的,{'a': 1, 'c': 3, 'b': 2} od = OrderedDict([('a', 1), ('b', 2), ('c', 3)]) print od # OrderedDict的Key是有序的,OrderedDict([('a', 1), ('b', 2), ('c', 3)]) {'a': 1, 'c': 3, 'b': 2} OrderedDict([('a', 1), ('b', 2), ('c', 3)])
from collections import Counter c = Counter() for ch in 'programming': c[ch] = c[ch] + 1 print c #Counter({'g': 2, 'm': 2, 'r': 2, 'a': 1, 'i': 1, 'o': 1, 'n': 1, 'p': 1}) Counter({'g': 2, 'm': 2, 'r': 2, 'a': 1, 'i': 1, 'o': 1, 'n': 1, 'p': 1})
爲類序列對象提供了一個類序列接口
無限迭代器:
在最短輸入序列終止的迭代器:
組合生成器:
import itertools for i in itertools.izip(itertools.count(1), ['a', 'b', 'c']): print i
(1, 'a') (2, 'b') (3, 'c')
參考:http://python.usyiyi.cn/python_278/library/itertools.html