單線程腳本
導入文件的行數：
# wc -l /data/logs/testlog/20120219/testlog1/*
1510503 total
- # -*- coding: utf-8 -*-
- #!/usr/bin/env python
- #create database pythondata
- #create table log (logline varchar(500));
- #grant all on pythondata.* to 'pyuser'@'localhost' identified by "pypasswd";
- import MySQLdb
- import os
- import time
-
def writeLinestoDb(sql, content):
    """Run ``sql`` once per parameter set in ``content`` and commit.

    Opens a fresh connection to the local ``pythondata`` database, runs
    ``cursor.executemany(sql, content)``, commits, then closes everything.

    Args:
        sql: A parameterised statement such as
            ``"INSERT INTO log(logline) VALUES(%s)"``.
        content: A sequence of parameter sets for ``executemany``
            (for a single ``%s`` placeholder, a list of 1-tuples).
    """
    from contextlib import closing

    # closing() guarantees the cursor and connection are released even if
    # executemany() or commit() raises; the close/commit order is unchanged.
    with closing(MySQLdb.connect(host="localhost", user="pyuser",
                                 passwd="pypasswd", db="pythondata")) as conn:
        with closing(conn.cursor()) as cur:
            cur.executemany(sql, content)
        conn.commit()
-
def readLinestoList(path):
    """Return every line of every file directly inside ``path`` as one list.

    Files are visited in ``os.listdir`` order. Lines keep their trailing
    newline. Each file is closed as soon as it has been read.
    """
    alllines = []
    for name in os.listdir(path):
        # `with` closes the handle deterministically; the old code left one
        # open file object per log file for the garbage collector.
        with open(os.path.join(path, name)) as handle:
            alllines.extend(handle)
    return alllines
-
def main():
    """Load every line under the test-log directory into the ``log`` table.

    All lines are read up front, stripped of surrounding whitespace, and
    inserted with a single ``writeLinestoDb`` call (one connection, one
    commit).
    """
    insertsql = "INSERT INTO log(logline) VALUES(%s)"
    alllines = readLinestoList('/data/logs/testlog/20120219/testlog1')
    # executemany() expects a sequence of parameter tuples. The old loop
    # passed one bare string per call, so executemany iterated over its
    # characters, and it also opened a new connection for every line. The
    # per-line print and 10 s sleep are dropped: they only slowed down the
    # import this script is timing.
    rows = [(line.strip(),) for line in alllines]
    writeLinestoDb(insertsql, rows)


if __name__ == "__main__":
    print('starting at:', time.ctime())
    main()
    print('ending at:', time.ctime())
('starting at:', 'Tue Mar 27 11:09:20 2012')
('ending at:', 'Tue Mar 27 11:13:20 2012')
耗時4分鐘
mysql> select count(*) from log ;
+----------+
| count(*) |
+----------+
| 1510551 |
+----------+
多線程腳本
- # -*- coding: utf-8 -*-
- #!/usr/bin/env python
- import MySQLdb
- import os
- from time import ctime
- from threading import Thread
- from Queue import Queue
# Worker counts for the file-reading and database-writing thread pools.
in_num_thread = out_num_thread = 10
# in_queue carries file paths to read; out_queue carries line lists to insert.
in_queue, out_queue = Queue(), Queue()
-
def listDir(path):
    """Put the full path of every entry directly inside ``path`` on ``in_queue``."""
    full_paths = (os.path.join(path, entry) for entry in os.listdir(path))
    for full_path in full_paths:
        in_queue.put(full_path)
def readFile(iq, in_queue):
    """Reader worker: turn each queued file path into a list of its lines.

    Loops forever (it is started as a daemon thread). For every path taken
    from ``in_queue``, the file's lines (newlines included) are put on the
    module-level ``out_queue`` as one list.

    Args:
        iq: Worker index; unused, kept for the ``Thread(args=...)`` call.
        in_queue: Queue of file paths to read.
    """
    while True:
        path = in_queue.get()
        try:
            # A fresh list per file. The old code created one list before the
            # loop and re-queued that same, ever-growing list object for every
            # file, so earlier files' lines were inserted several times
            # (3676015 rows instead of 1510551).
            with open(path) as handle:
                filelines = list(handle)
            out_queue.put(filelines)
        finally:
            # Always acknowledge the item so in_queue.join() in main() cannot
            # hang if a file fails to open.
            in_queue.task_done()
def writeLinestoDb(oq, out_queue):
    """Writer worker: bulk-insert each queued list of lines into ``log``.

    Loops forever (it is started as a daemon thread). Each item taken from
    ``out_queue`` is passed unchanged to ``executemany`` on a fresh
    connection, then committed.

    Args:
        oq: Worker index; unused, kept for the ``Thread(args=...)`` call.
        out_queue: Queue of line lists produced by ``readFile``.
    """
    from contextlib import closing

    sql = "INSERT INTO log(logline) VALUES(%s)"
    while True:
        content = out_queue.get()
        try:
            # closing() releases cursor and connection even when the insert
            # fails; the order (close cursor, commit, close conn) is unchanged.
            with closing(MySQLdb.connect(host="localhost", user="pyuser",
                                         passwd="pypasswd",
                                         db="pythondata")) as conn:
                with closing(conn.cursor()) as cur:
                    cur.executemany(sql, content)
                conn.commit()
        finally:
            # Without this, one failed insert leaves out_queue.join() in
            # main() blocked forever.
            out_queue.task_done()
def main():
    """Read all log files with reader threads, then insert with writer threads.

    Phase 1 queues every file path, starts ``in_num_thread`` ``readFile``
    daemons and waits on ``in_queue``. Phase 2 starts ``out_num_thread``
    ``writeLinestoDb`` daemons and waits on ``out_queue``.
    """
    listDir('/data/logs/testlog/20120219/testlog1')
    for iq in range(in_num_thread):
        # `args=` must be a keyword argument; the old `args(...)` form was a
        # SyntaxError (positional argument after a keyword argument).
        worker = Thread(target=readFile, args=(iq, in_queue))
        worker.daemon = True
        worker.start()
    print("Readfile Main Thread Waiting at %s" % ctime())
    in_queue.join()
    print("Readfile Done at, %s" % ctime())
    for oq in range(out_num_thread):
        worker = Thread(target=writeLinestoDb, args=(oq, out_queue))
        worker.daemon = True
        worker.start()
    print("Insert into mysql Main Thread at %s" % ctime())
    out_queue.join()
    print("Insert into mysql at, %s" % ctime())


if __name__ == "__main__":
    # Only `ctime` is imported in this script; the old `time.ctime()` raised
    # NameError.
    print('starting at:', ctime())
    main()
    print('ending at:', ctime())
- 數據庫位於本機
- ('starting at:', 'Tue Mar 27 10:57:01 2012')
Readfile Main Thread Waiting at Tue Mar 27 10:57:01 2012
Readfile Done at, Tue Mar 27 10:57:04 2012
Insert into mysql Main Thread at Tue Mar 27 10:57:04 2012
Insert into mysql at, Tue Mar 27 11:03:34 2012
('ending at:', 'Tue Mar 27 11:03:34 2012')
mysql> select count(*) from log ;
+----------+
| count(*) |
+----------+
| 3676015 |
+----------+
- 兩次的數據不一致，多線程的導入有問題。
服務器配置：4G 內存、8 核；MySQL 在本機，兩個腳本在同一臺機器上運行。
多線程腳本改進
- #!/usr/bin/env python
- #create table log ( logline varchar(300));
- #grant all on pythondata.* to 'pyuser'@'localhost' identified by "pypasswd"
- import MySQLdb
- import os
- import sys
- from time import ctime
- from threading import Thread
- from Queue import Queue
-
# Shared work queue: each item is the full list of lines of one log file.
queue = Queue()
# Number of database-writer threads started by main().
num_thread = 10
-
def listDir(path):
    """Return the full path of every entry directly inside ``path``.

    Order follows ``os.listdir``.
    """
    return [os.path.join(path, entry) for entry in os.listdir(path)]
-
def readFile(file):
    """Return all lines of the file at path ``file``, newlines included.

    The whole file is held in memory; very large files should be read in
    fixed-size chunks instead.
    """
    # `with` closes the handle as soon as the lines are read; the old code
    # left it open until garbage collection.
    with open(file) as handle:
        return list(handle)
-
def writeLinestoDb(q, queue):
    """Writer worker: bulk-insert each queued list of lines into ``log``.

    Loops forever (it is started as a daemon thread). Each item taken from
    ``queue`` is passed unchanged to ``executemany`` on a fresh connection,
    then committed.

    Args:
        q: Worker index; unused, kept for the ``Thread(args=...)`` call.
        queue: Queue of line lists produced by ``readFile``.
    """
    from contextlib import closing

    sql = "INSERT INTO log(logline) VALUES(%s)"
    while True:
        content = queue.get()
        try:
            # closing() releases cursor and connection even when the insert
            # fails; the order (close cursor, commit, close conn) is unchanged.
            with closing(MySQLdb.connect(host="localhost", user="pyuser",
                                         passwd="pypasswd",
                                         db="pythondata")) as conn:
                with closing(conn.cursor()) as cur:
                    cur.executemany(sql, content)
                conn.commit()
        finally:
            # Without this, one failed insert leaves queue.join() in main()
            # blocked forever.
            queue.task_done()
-
def main():
    """Read every log file into ``queue`` serially, then drain it in parallel.

    Reading happens on the main thread; ``num_thread`` daemon
    ``writeLinestoDb`` workers are started afterwards, and the function
    returns once every queued item has been acknowledged.
    """
    print("Readfile Start at, %s" % ctime())
    for path in listDir('/data/logs/testlog/20120219/testlog1'):
        queue.put(readFile(path))
    print("Readfile Done at, %s" % ctime())

    writers = [Thread(target=writeLinestoDb, args=(idx, queue))
               for idx in range(num_thread)]
    for writer in writers:
        writer.daemon = True  # do not keep the process alive after join()
        writer.start()
    print("Insert into mysql Main Thread at %s" % ctime())

    queue.join()
    print("Insert into mysql at, %s" % ctime())


if __name__ == "__main__":
    # Wall-clock timestamps around the whole run, for timing comparison.
    print('starting at:', ctime())
    main()
    print('ending at:', ctime())
結果
('starting at:', 'Tue Mar 27 14:32:05 2012')
Readfile Start at, Tue Mar 27 14:32:05 2012
Readfile Done at, Tue Mar 27 14:32:07 2012
Insert into mysql Main Thread at Tue Mar 27 14:32:08 2012
Insert into mysql at, Tue Mar 27 14:34:31 2012
('ending at:', 'Tue Mar 27 14:34:31 2012')
mysql> select count(*) from log;
+----------+
| count(*) |
+----------+
| 1510551 |
+----------+
讀取用了2秒鐘，插入用了2分23秒。
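第一個多線程腳本出錯的原因在於放入隊列的數據：readFile 中的 filelines 列表在 while 循環之外只創建一次，每讀完一個文件都把同一個不斷變長的列表再次放入 out_queue，於是前面文件的行被重複插入（3676015 行，而正確結果是 1510551 行）。還有一個問題：如果讀取的文件超過物理內存和虛擬內存的總量，會造成內存溢出，導致程序掛掉；解決辦法是每次只讀取指定數量的行，分批放入隊列。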