流式數據處理

 

一、 直接登錄服務器:ssh 2014210***@thumedia.org -p 6349python

建立streaming.py:   touch streaming.py,而且以下編輯:git

#! /usr/bin/python緩存

import loggingbash

import math服務器

import timeapp

pg2count={}ssh

t=1ide

while 1:測試

    fp=open('/tmp/hw3.log','r')spa

    for line in fp:

        line = line.strip()

        times, page, count = line.split()[0],line.split()[1],line.split()[2]

        if count.isdigit() & page.startswith('Page-'):

            try:

                pg2count[page] = [pg2count[page][0] + int(count),t]             

            except:

                pg2count[page] = [int(count),t]

    fp.close()

    a=sorted(pg2count.items(), key=lambda page:page[1][0], reverse = True)

    print '%s%s%s' % ('the page rank at current time ',times,' is:')

    for i in range(0,10):

        print '%s\t%d' % (a[i][0],a[i][1][0])

    logger = logging.getLogger()

    #set loghandler 

    file = logging.FileHandler("output.log")

    logger.addHandler(file)

    #set formater   

    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")

    file.setFormatter(formatter)

    #set log level 

    logger.setLevel(logging.NOTSET)

    logger.info('%s%s%s' % ('the page rank at current time ',times,' is:'))

    for i in range(0,10):

        logger.info('%s\t%d' % (a[i][0],a[i][1][0]))

        time.sleep(60)

二、 寫好代碼以後測試運行:python streaming.py輸出以下:

nohup: ignoring input and appending output to `nohup.out',則表示後臺運行成功,輸出顯示會保存到nohup.out中,

clip_image002

也能夠查看output.log文件裏的輸出:

clip_image004

最後咱們讓它在後臺一直執行:nohup python streaming.py &輸出:

[1] 8994

2014210***@cluster-3-1:~$ nohup: ignoring input and appending output to `nohup.out'

一天以後,咱們再次查看結果:

clip_image006

能夠看到,累計的結果已經和第一次不太同樣

三、 殺掉進程:ps -ef|grep 1020獲得以下輸出:

2014210***@cluster-3-1:~$ ps -ef|grep 1020

1020      7512  7471  0 Jan10 ?        00:00:00 sshd: 2014210***@pts/30

1020      7513  7512  0 Jan10 pts/30   00:00:00 -bash

1020      7574  7508  0 20:55 ?        00:00:00 sshd: 2014210***@pts/52

1020      7575  7574  0 20:55 pts/52   00:00:00 -bash

1020      8282  7575  0 21:04 pts/52   00:00:00 ps -ef

1020      8283  7575  0 21:04 pts/52   00:00:00 grep --color=auto 1020

1020      8994     1  0 13:20 ?        00:01:46 python streaming.py

1020     12260 12232  0 Jan10 ?        00:00:00 sshd: 2014210***@pts/35

1020     12261 12260  0 Jan10 pts/35   00:00:01 –bash

輸入kill 8994

2014210***@cluster-3-1:~$ kill 8994

2014210***@cluster-3-1:~$ ps -ef|grep 1020

1020      7512  7471  0 Jan10 ?        00:00:00 sshd: 2014210***@pts/30

1020      7513  7512  0 Jan10 pts/30   00:00:00 -bash

1020      7574  7508  0 20:55 ?        00:00:00 sshd: 2014210***@pts/52

1020      7575  7574  0 20:55 pts/52   00:00:00 -bash

1020      8335  7575  0 21:05 pts/52   00:00:00 ps -ef

1020      8336  7575  0 21:05 pts/52   00:00:00 grep --color=auto 1020

1020     12260 12232  0 Jan10 ?        00:00:00 sshd: 2014210***@pts/35

1020     12261 12260  0 Jan10 pts/35   00:00:01 –bash

至此,streaming.py運行結束。

 

Question

How can your design scale when the streaming is large and the calculation is complicated?

答:首先肯定每一個程序週期須要的時間,而後肯定這段時間內的流數據可以保存在一塊足夠大的緩存區域,等到下個程序週期處理前一個緩存的流數據便可。

相關文章
相關標籤/搜索