python3
python -m pip install --upgrade pip
1、基礎
(1)變量與賦值
變量命名規則:
一、顯示
二、shang_shan_yang、ShangShanYang br/>三、數字不能開頭
四、特殊字符不能有(!@¥%……&*())
五、不能有空格
六、不能是關鍵字html
a=2
b=a
a=3
print(a,b)#3,2python
a,b=1,2
id(a)#查看a的內存地址(不是實際的物理內存地址)
(2)用戶交互
name=raw_input('please input your name:')#python2
name=input('please input your name:')#python3
(3)條件判斷與縮進
a=1
if a>3 :
print('a>3')
elif a<3:
print('a<3')
else:
print('a=3')正則表達式
(4)while循環控制
import time
a=1
while a<10:
print(a)
time.sleep(1)
a+=1shell
#break終止本層循環
(5)for循環控制
for i in range(11):
print(i)json
(6)經常使用數據類型多線程
常數
布爾
字符串:移除空白('a'.strip())、分割(split())、長度(len())、索引(index())、切片
a='xkqx'
print(a.center(10,'*').center(20,'#'))
a.startswith('xk')
a.endswith('kq')
a.endswith('x',0,2)#判斷0到2位置的字符串是否以x結尾
a.find('x')#找不到會返回-1
a.index('x')#找不到會報錯
a.partition('k')#用k分割
a.replace('x','X')
a.replace('x','X',1)#替換1個
a.title()#全部單詞首字母大寫併發
a='zq'
b='dq'
print("a=%s\nb=%s"%(a,b))app
smg='''
a=%s
b=%s'''%(a,b)
print(smg)dom
列表:索引(下標)、切片、追加、刪除、長度、包含、循環
#a=[1,2,'3d','2','3d','3d']
a=[1,2,3,4,5]
b=a[0:4]
#a.append('sdd')
a.pop()
a.insert(1,5)
a.remove(2)
a.reverse()
#a.sort()
#print(a.index('3d'))
#print(a.count('3d'))
a.extend('xkq')
a in a
print(a)ssh
元組
字典
a={'a':1,'b':"ddddd"}
a['a']
a['c']='dfgh'刪除a
a.pop('a')#s
a.keys() a.values() a.items()
a={'k1':'v1','k2':2}
a['k1']
a['k2']
a['k3']#會報錯
a.get('k3','bucunzai')#當k3不存在時把bucunzai賦值給k3
集合
s1=set()
s1.add('xkq')
雙向隊列
import collections
q1=collections.deque()
q1.append('1')
q1.appendleft('2')
q1.appendleft('3')
q1.append('4')
q1.count('2')#2的個數
q1.extend([1,2])
q1.extendleft([2,1,3])
q1.pop()
q1.popleft()
print(q1)
單項隊列
import queue
q2=queue.Queue(2)#隊列長度爲2
q2=queue.Queue()
q2.put('12')
q2.put('12')
q2.put(23)
q2.qsize()
q2.get()
深淺拷貝
import copy
a={'k1':1,'k2':[1,2,3]}
b=copy.copy(a)#淺拷貝,只拷貝第一層
c=copy.deepcopy(a)#深拷貝,拷貝全部層
d=a#d指向a的第一層,修改d 的值a也會變
d['k3']=13
print(id(a['k2'][0]),a)
print(id(d['k2'][0]),d)
(7)文件操做
f=open('test.txt','a')
f.write('sfcsdsdwwdwd\n')
f.write('sfcsdsdwwdwd\n')
f.flush()
f.write('sfcsdsdwwdwd\n')
f.close()
'''
f=open('test.txt','r')
a=f.read()#讀取全部內容,當成一個字符串
b=f.readline()#只讀取一行
c=f.readlines()#讀取全部內容,將每一行當成一個字符串組成一個列表
for i in a:
print(i)
print(a)
f.close()
'''
with open('test.txt','r') as f:
a = f.read() # 讀取全部內容,當成一個字符串
b = f.readline() # 只讀取一行
c = f.readlines() # 讀取全部內容,將每一行當成一個字符串組成一個列表
for i in c:
print(i)
print(a)
for line in f:#迭代方式讀,速度快
print(line)
(8)函數
def fun(name,*args,*kwargs):
li=args
dic=kwargs
print(name,li,dic)
li=[1,2,3]
dic={'a':1,'b':2}
a=fun('xukeqiang','sd','ssss','123',a=1,s=3)
fun('xkq',12,34,li,**dic)
print(a)
(9)字符串格式化
a='my name is {0},come from {1}'
li=['xkq','anhui']
b=a.format(*li)
print(b)
dic={'name':'xkq','address':'anhui'}
a='my name is {name},come from {address}'
b=a.format(**dic)
print(b)
a='my name is %s,come from %s'%('xkq','anhui')
print(a)
(10)lambda表達式
func=lambda a,b:a+b
#a,b爲形參
#計算a+b並返回
ret=func(99,1)
print(ret)
a=[i for i in range(10)]
print(a)
(11)內置函數
chr(97) ord('A')
import random
a=chr(random.randint(1,99))
print(a)
a=['a','b','c','d']
for i,item in enumerate(a,1):
print(i,':',item)
a="{'a':1,'b':2}"
b=eval(a)
print(type(b))
#將a中的元素都加100
a=[1,2,3,4]
b=map(lambda x:x+100,a)
b=list(b)
print(b)
#選出a中大於2的元素
a=[1,2,3,4]
b=filter(lambda x:x>2,a)
b=list(b)
print(b)
import datetime
print(round(datetime.datetime.now().timestamp()))
x=[1,2,3]
y=[4,5,6]
z=zip(x,y)
print(list(z))
(12)迭代器,生成器
#迭代器
a=iter(['a',2,4,'sss'])
print(a.next())
#生成器:能返回迭代器的函數
def cash_money(account):
while account>0:
account-=100
print('取款100')
yield 'qu:100' #調用時返回的值
print('hello')
a=cash_money(300)
print(a.next())
print(a.next())
print('asdasdadadxaadxa')
print(a.next())
print(a.next())
(13)生產者消費者模型
import time
def consumer(name):#消費者
print('%s準備吃包子了。。。'%name)
while True:
baizi=yield#能夠接收值,並賦值給baozi
print('包子[%s]來了,被[%s]吃掉了'%(baizi,name))
def producter(name):#生產者
c1=consumer('A')
c2=consumer('B')
c1.next()
c2.next()
print('廚師【%s】開始作包子了。。。'%name)
for i in range(10):
print('作了2個包子')
c1.send(i)#將i傳遞給yield
c2.send(i)#將i傳遞給yield
time.sleep(1)
producter('xkq')
(14)裝飾器
def login(fun):
def wrapper(*args,*kwargs):
print('驗證。。。。')
return fun(args,**kwargs)
return wrapper
def home(*args,**kwargs):
print('歡迎訪問。。。')
@login
def home2(*args,**kwargs):
li=args
dic=kwargs
if li:
print('歡迎訪問%s。。。'%li)
else:
print('歡迎訪問。。。')
home()
home2()
home2('ssss')
(15)遞歸
def jiecheng(n):
if n==1:
return 1
else:
return n*jiecheng(n-1)
print(len(str(jiecheng(998))))
#斐波那契數列
def feibonaqi(n,n1=0,n2=1,):
if n<2:
exit()
if n1==0:
print(n1)
print(n2)
n3=n1+n2
if n3<n:
print(n3)
feibonaqi(n,n2,n3)
feibonaqi(20)
(16)模塊
import os
os.chdir('/tmp')#更改目錄
os.mkdir('/tmp/dir')#建立目錄
os.system('ls -lrt')#執行相關命令
os.getcwd()#查看當前目錄
a=os.popen('df -m'),a.read()#將執行結果保存到a中
import sys
sys.path()#查看python的PATH路徑
sys.path.append('/var')#將/var路徑增長到python的PATH路徑中
import subprocess
subprocess.call(['ls','-lrt'])
sys.argv[0]#腳本名稱
sys.argv[1]#第一個參數
#######################20170821
range(1,100)#輸出1到99的整數
for i in range(1,100):
print 'The number is: %d' %i
#######################20170822
下載網頁內容
import urllib
a=urllib.urlopen('http://qqran.blog.51cto.com/10014850/1957881').read().decode('gb2312')
f=open('url.txt','w')
f.write(a.encode('utf-8'))
f.flush()
f.close()
---------------------------或者
a=urllib.urlopen('http://qqran.blog.51cto.com/10014850/1957736')
a_docs=a.read().decode('gb2312')
a.close()
f=open('url3.txt','w')
f.write(a_docs.encode('utf-8'))
f.flush()
f.close()
#########################
(16)正則表達式
import restr="qwed123@qqqq.com1,1234567@qq.org,qwed123@qqqq.com1"
br/>a=re.match('ab','babsa')
print(a.group())
a=re.sub('[0-9]','||','12absd2321c1sab234ds')
a='qwedqw123dqwdqwd123'
a=a.replace('123','321')
a=re.compile("^[0-9]+")
str="qwed123@qqqq.com1,1234567@qq.org,qwed123@qqqq.com1"
a=re.findall("[a-zA-Z0-9]{6,10}[@][a-zA-Z0-9]{2,10}[.][a-zA-Z]{3}",str)br/>a=re.sub('@','||',str,count=2)
print(a)
m=re.findall('\S+','ss ss s')#匹配非空字符br/>m=re.findall('.','asca!@#$%^&*()_+cf24e2@3423!@#')
m=re.findall('.+','asca!@#$%^&()_+cf24e2@3423!@#')
m=re.findall('.','asca!@#$%^&*()_+cf24e2@3423!@#')
print(m)
m=re.match('abc','abcddd')#開頭匹配
m=re.match('abc','2abcddd')
m=re.search('abc','2abcl3abc')#找到第一個
m=re.search('a2bc','2abcl3abc')
print(m.group())
(17)Python-不一樣子目錄之間進行模塊調用時的路徑問題
在Python的不一樣子目錄中進行模塊調用時會出現「ModuleNotFoundError: No module named 'backend'」 之類的問題,由於Python在
執行時默認會將當前目錄加入到sys.path中,而後會根據sys.pyth中的路徑去找須要導入的模塊,若是找不到就會報錯,所以咱們須要將模塊
的最上級目錄(以下圖中的pathtest)的絕對路徑添加到sys.path中,而後就能夠按照「from backend.func import login」的方式將模塊正
常導入了,詳見以下事例:
#!/usr/bin/env python
#* coding:utf-8 *
#encoding=utf-8
#function:
#created by xkq
#date: 2018
import sys,os
base_dir=os.path.dirname(os.path.dirname(os.path.abspath(file)))#獲取pathtest的絕對路徑
#os.path.abspath(file)#獲取當前文件的絕對路徑
#os.path.dirname(os.path.abspath(file))#獲取當前文件所在目錄的絕對路徑
#print(sys.path)
sys.path.append(base_dir)#將pathtest的絕對路徑加入到sys.path中
from backend.func import login
login.login()
注意:寫文件是要絕對路徑,不然其餘子模塊調用文件是會報錯如:FileNotFoundError: [Errno 2] No such file or directory:
'user.pickle'。能夠用以下方法寫絕對路徑:
file=os.path.join(os.path.dirname(os.path.abspath(file)), 'user.pickle')#user.pickle的絕對路徑
#file='user.pickle'
f=open(file,'rb')
user_all=pickle.load(f)
f.close()
(18)序列化pickle json shelve
import pickle,json
a={'k1':1,'k2':'v2'}
with open('text.txt','wb') as f:
f.write(pickle.dumps(a))
#pickle.dump(a,f)
with open('text.txt','rb') as f:
a=pickle.loads(f.read())
#a=pickle.load(f)
print(a)
with open('text.txt','w') as f:
f.write(json.dumps(a))
#json.dump(a,f)
with open('text.txt','r') as f:
a=json.loads(f.read())
#a=json.load(f)
print(a)
import shelve
d = shelve.open('shelve_test') #打開一個文件
class Test(object):
def init(self,n):
self.n = n
t = Test(123)
t2 = Test(123334)
name = ["alex","rain","test"]
d["test"] = name #持久化列表
d["t1"] = t #持久化類
d["t2"] = t2
d.close()
d = shelve.open('shelve_test')
test=d["test"]#d.get("test")
t1=d["t1"]
t2=d["t2"]
d.close()
(19)經常使用模塊
http://www.cnblogs.com/wupeiqi/articles/4963027.html
http://www.cnblogs.com/alex3714/articles/5161349.html
shutil高級的 文件、文件夾、壓縮包 處理模塊
import shutil
ret=shutil.make_archive("/tmp/",'gztar',root_dir='/root/')#將將root下文件打包放到/tmp下
(20)類
http://www.cnblogs.com/wupeiqi/p/4766801.html
python2
經典類:深度優先
新式類:廣度優先
python3
都是廣度優先
class test(object):
def init(self):
pass
def talk(self):
raise NotImplementedError("sss")
class A(test):
def init(self):
pass
def talk(self): print('hello')
a=A()
a.talk()
(21)socket
http://www.cnblogs.com/wupeiqi/articles/5040823.html
####################################################
#server
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket()
sk.bind(ip_port)
sk.listen(5)
while True:
print('server waiting...')
conn,addr = sk.accept()
client_data = conn.recv(1024)
print(str(client_data,'utf8'))
conn.sendall(bytes('不要回答,不要回答,不要回答','utf8'))
conn.close()
#client
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket()
sk.connect(ip_port)
sk.sendall(bytes('請求佔領地球','utf8'))
server_reply = sk.recv(1024)
print(str(server_reply,'utf8'))
sk.close()
#########################################################
########################################
#ssh
#server
import socket,subprocess
ip_port = ('127.0.0.1',9999)
sk = socket.socket()
sk.bind(ip_port)
sk.listen(5)
while True:
print('server waiting...')
conn,addr = sk.accept()
while True:
try:
client_data=conn.recv(1024)
#print(len(client_data.strip()))
if not client_data:
break
print('recv:',str(client_data,'utf8'))
cmd=str(client_data,'utf8').strip()
print("--->",cmd)
cmd_res=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
cmd_ress=cmd_res.stdout.read()
if len(cmd_ress)==0:
cmd_ress=b'cmd has no output'
ack_msg=bytes('cmd_result_size|%s'%len(cmd_ress),'utf8')
print(ack_msg)
#conn.sendall(bytes(cmd_ress,'utf8'))
conn.sendall(ack_msg)
print("cmd_ress", cmd_ress)
client_ack=conn.recv(50)
print("client_ack",client_ack)
if client_ack.decode()=='ready_ok':
conn.sendall(cmd_ress)
except Exception:
break
conn.close()
#client
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket()
sk.connect(ip_port)
while True:
msg=input('please input:')
if len(msg.strip())==0:
continue
sk.sendall(bytes(msg,'utf8'))
ack_recv=sk.recv(50)
ack_recv_res=str(ack_recv.decode()).split("|")
#print(ack_recv_res)
if ack_recv_res[0]=='cmd_result_size':
recv_size=eval(ack_recv_res[1])
sk.sendall(b'ready_ok')
recv=b''
receved=0
print("recv_size:",recv_size)
while receved<int(recv_size):
server_reply = sk.recv(10)
recv += server_reply
receved+=len(server_reply)
#print(recv)
#print("receved",receved)
else:
print(str(recv,'gbk'))
#print("recv----dine---")
#print(str(server_reply,'utf8'))
sk.close()
###########################################
#####################
#多併發socket
#server
import socketserver
class MyTCPHandle(socketserver.BaseRequestHandler):
def handle(self):
print('new conn:',self.client_address)
while True:
data=self.request.recv(8096)
if not data:break
print('client say:',data.decode())
self.request.send(bytes('數據:%s'%data.decode(),'utf8'))
if name=="main":
HOST,PORT="localhost",9999
server=socketserver.ThreadingTCPServer((HOST,PORT),MyTCPHandle)
server.serve_forever()
#client
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket()
sk.connect(ip_port)
while True:
msg=input('please input:')
if len(msg.strip())==0:
continue
sk.sendall(bytes(msg,'utf8'))
recv=sk.recv(8096)
print(str(recv,'utf8'))
sk.close()
#####################
#######
#多併發ssh
#server
import socketserver,subprocess
class MyTCPHandle(socketserver.BaseRequestHandler):
def handle(self):
print('new conn:',self.client_address)
print('server waiting...')
while True:
try:
client_data = self.request.recv(8096)
#print(len(client_data.strip()))
if not client_data:
break
#print('recv:', str(client_data, 'utf8'))
cmd = str(client_data, 'utf8').strip()
#print("--->", cmd)
cmd_res = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
cmd_ress = cmd_res.stdout.read()
if len(cmd_ress) == 0:
cmd_ress = b'cmd has no output'
ack_msg = bytes('cmd_result_size|%s' % len(cmd_ress), 'utf8')
#print(ack_msg)
#conn.sendall(bytes(cmd_ress,'utf8'))
self.request.send(ack_msg)
#print("cmd_ress", cmd_ress)
client_ack = self.request.recv(50)
#print("client_ack", client_ack)
if client_ack.decode() == 'ready_ok':
self.request.send(cmd_ress)
except Exception:
break
if name=="main":
HOST,PORT="localhost",9999
server=socketserver.ThreadingTCPServer((HOST,PORT),MyTCPHandle)
server.serve_forever()
#client
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket()
sk.connect(ip_port)
while True:
msg=input('please input:')
if len(msg.strip())==0:
continue
sk.sendall(bytes(msg,'utf8'))
ack_recv=sk.recv(50)
ack_recv_res=str(ack_recv.decode()).split("|")
#print(ack_recv_res)
if ack_recv_res[0]=='cmd_result_size':
recv_size=eval(ack_recv_res[1])
sk.sendall(b'ready_ok')
recv=b''
receved=0
print("recv_size:",recv_size)
while receved<int(recv_size):
server_reply = sk.recv(10)
recv += server_reply
receved+=len(server_reply)
#print(recv)
#print("receved",receved)
else:
print(str(recv,'gbk'))
#print("recv----dine---")
#print(str(server_reply,'utf8'))
sk.close()
########
(22)多線程(IO密集型時能夠用多線程)
http://www.cnblogs.com/alex3714/articles/5230609.html
#直接調用
import threading,time,random
def do_func(n):
time.sleep(random.randint(1, 10))
print('線程%s'%n)
#time.sleep(10)
if name=='main':
res_list=[]
for i in range(10):
t=threading.Thread(target=do_func,args=[i,])#生成線程
t.start()
#print(t.getName())#打印線程名稱
res_list.append(t)
for i in res_list:
i.join()#等待線程結束
print("主線程")
#繼承調用
import threading,time
class MyThread(threading.Thread):
def init(self,n):
threading.Thread.init(self)
self.n=n
def run(self):#定義每一個線程要執行的函數,名稱必須是run
print('線程%s' %self.n)
#time.sleep(4)
if name=='main':
res_list=[]
for i in range(10):
t=MyThread(i)#生成線程
t.start()
res_list.append(t)
for i in res_list:
i.join()#等待線程結束
print('主線程')
#鎖
import threading,time,random
num=0
lock=threading.Lock()
def addsum(n):
global num
time.sleep(random.randint(1,10))
print('thread[%s]get the num:%s' % (n,num))
lock.acquire()
num+=1
lock.release()
if name=='main':
res_list=[]
for i in range(10):
t=threading.Thread(target=addsum,args=[i,])
t.start()
res_list.append(t)
for i in res_list:
i.join()
print('num:',num)
#信號量,線程池
import threading,time,random
def do_func(n):
semaphore.acquire()
time.sleep(3)
print('線程%s' % n)
semaphore.release()
if name=='main':
semaphore=threading.BoundedSemaphore(10)#同時最多容許10個線程運行
res_list=[]
for i in range(100):
t=threading.Thread(target=do_func,args=[i,])
t.start()
res_list.append(t)
for i in res_list:
i.join()
print("主線程")
#event
import threading,time
import random
def light():
if not event.isSet():
event.set() #wait就不阻塞 #綠燈狀態
count = 0
while True:
if count < 10:
print('\033[42;1m--green light on---\033[0m')
elif count <13:
print('\033[43;1m--yellow light on---\033[0m')
elif count <20:
if event.isSet():
event.clear()
print('\033[41;1m--red light on---\033[0m')
else:
count = 0
event.set() #打開綠燈
time.sleep(1)
count +=1
def car(n):
while 1:
time.sleep(random.randrange(10))
if event.isSet(): #綠燈
print("car [%s] is running.." % n)
else:
print("car [%s] is waiting for the red light.." %n)
event.wait()
if name == 'main':
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
for i in range(3):
t = threading.Thread(target=car,args=(i,))
t.start()
(23)多進程
######################################
#!/usr/bin/env python
#encoding=utf-8
from multiprocessing import Process,Lock
import time,os
def say(n):
print('nihao%s,進程[%s],個人父進程是[%s]'%(n,os.getpid(),os.getppid()))
if name=='main':
p_list=[]
for n in range(10):
p=Process(target=say,args=(n,))
p.start()
p_list.append(p)
for i in p_list:
p.join()
print('---main-----done------')
########################################多進程池Pool,可限制多進程數量
#!/usr/bin/env python
#encoding=utf-8
#進程池
from multiprocessing import Process,Pool
import time,os
def say(n):
time.sleep(2)
print('nihao%s,進程[%s],個人父進程是[%s]'%(n,os.getpid(),os.getppid()))
if name=='main':
p = Pool(processes=2)
p_list=[]
for n in range(10):
P=p.apply_async(say,(n,))
p_list.append(P)
p.close()
for i in p_list:
i.get()
print('---main-----done------主進程ID:',os.getpid())
##################################################
#進程間通訊multiprocessing.Queue
from multiprocessing import Process,Pool,Queue
def cmd_fun(q,n):
q.put([n,2,'sds'])
if name=='main':
q=Queue()#在主進程生成一個隊列
l_list=[]
for i in range(20):
p=Process(target=cmd_fun,args=(q,i))#生成一個子進程,並在子進程中向隊列中添加信息
p.start()
l_list.append(p)
for i in l_list:
i.join()
while q.qsize()>0:
print('在主進程中取隊列值:%s'%q.get())
#######################################
#進程間通訊multiprocessing.Manager
from multiprocessing import Process,Pool,Queue,Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.append(1)
print(l)
if name == 'main':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(5))
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)
######################################多進程pool,paramiko
import multiprocessing
import os,time
import paramiko
hosts=['192.168.1.11','192.168.1.10']
#,'192.168.1.11','192.168.1.12','192.168.1.13']
username='root'
password='123456'
port=22
name={}
d_usage={}
def cmd(hostname):
if os.system('ping %s -c 1 &>/dev/null'%hostname)==0: paramiko.util.log_to_file('paramiko.log') s = paramiko.SSHClient() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) s.connect(hostname,port,username,password) stdin,stdout,stderr=s.exec_command('df -kP') time.sleep(2) d_usage['%s'%hostname]= stdout.read() print '---------------------------------------------' print d_usage['%s'%hostname] print stdout.read() s.close() else: d_usage['%s'%hostname]='host Destination Host Unreachable' print 'host Destination Host Unreachable' name={'xk':[25,'male'],'zq':[23,'male'],}
p=multiprocessing.Pool(processes=10)
for hostname in hosts:
p.apply_async(cmd,('%s'%hostname,)) #time.sleep(10)
time.sleep(10)
p.close()
#p.join()
#print d_usage
(24)