概述html
1、線程池python
上下文管理mysql
終止線程redis
2、redissql
發佈訂閱數據庫
鏈接池express
服務器列表操做編程
事物api
3、rebbitMQ安全
基礎
4、MySQL
一、建立文件夾和文件
二、文件內部數據
表內部數據
inner join
left join
right join
5、python pymysql
6、sqlachemy
7、python ORM
8、Paramiko
線程池裏的with
import contextlib @contextlib.contextmanager #加上這個裝飾器 下面with就能夠調用 worker-state函數 管理上下文 def worker_state(state_list,worker_thread): """ 用於記錄線程中正在等待的線程數 :param state_list: 至關於free_list :param worker_thread: 當前線程 :return: """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) free_list = [] current_thread = "alex" with worker_state(free_list,current_thread): print(123) print(456) #執行順序:
#先執行with一行-->而後執行函數worker_state裏的state_list.append-->yield-->出去執行with裏的print-->再回到函數裏執行finally下的state_list.remove
自定義用with管理socket,不用關閉
import contextlib import socket @contextlib.contextmanager def context_socket(host,port): sk = socket.socket() sk.bind((host,port)) sk.listen(5) try: yield sk #至關於把sk返回給as sock的sock創建了鏈接 finally: sk.close() with context_socket('127.0.0.1',8888) as sock: print(sock)
之後若是涉及到須要close就能夠用with不用關閉
終止線程
設置空標識
上面把self.terminal 設置爲False,若是想中止線程,就設置爲True,讓event爲空。
import queue import threading import contextlib import time StopEvent = object() #能夠設置爲空,是爲了終止進程 class ThreadPool(): def __init__(self, max_num, max_task_num = None): if max_task_num: #若是max_task_num有值就建立隊列 self.q = queue.Queue(max_task_num) #建立隊列,最多放max_task_num個任務 else: #若max_task_num無值,則不定義最大長度 self.q = queue.Queue() #建立先進先出隊列 self.max_num = max_num #表示最多有多少個線程 self.cancel = False self.terminal = False self.generate_list = [] #表示當前已經建立多少個線程 self.free_list = [] #當前還空閒多少個線程 def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數) :return: 若是線程池已經終止,則返回True不然None """ if self.cancel: return # 判斷 沒有空閒線程而且已經建立的線程小於最大線程數 if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() #調用建立線程的方法 w = (func, args, callback,) #把傳過來的三個參數放到一個元組裏 self.q.put(w) #把任務放到隊列中 def generate_thread(self): """ 建立一個線程 """ t = threading.Thread(target=self.call)#建立的線程執行call方法 t.start() #執行 def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread #返回當前線程對象 self.generate_list.append(current_thread) #把當前建立的線程放到列表裏 event = self.q.get() #獲取隊列中的任務 while event != StopEvent: #任務不等於後面的值就會一直循環 func, arguments, callback = event try: #捕捉異常 result = func(*arguments) #執行action函數 success = True except Exception as e: success = False result = None if callback is not None: try: #捕捉異常 callback(success, result) except Exception as e: pass #使線程變爲空閒,以便下次再取任務 #等待着,等待着...... #有任務來了 #當前線程狀態,再也不空閒 with self.worker_state(self.free_list, current_thread): if self.terminal: ###########讓線程終止############ event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) #移除當前線程 終止線程 def close(self): """ 執行完全部的任務後,全部線程中止 """ self.cancel = True full_size = len(self.generate_list) #獲取建立的線程個數 while full_size: self.q.put(StopEvent) #往隊列裏放入空值,空值的個數和建立線程的個數相等 full_size -= 1 def terminate(self): """ 不管是否還有任務,終止線程 """ self.terminal = True while self.generate_list: #若列表還有建立的線程 self.q.put(StopEvent) #把空值放入隊列中 self.q.empty() #隊列爲空返回True,不然返回False @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用於記錄線程中正在等待的線程數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) pool = ThreadPool(5) #線程池裏只能放5個 def callback(status, result): # status, execute action status # result, execute action return value pass def action(i): print(i) for i in range(300): ret = pool.run(action, (i,), callback) #執行run方法,參數(函數,i,函數名)
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。
RabbitMQ安裝
安裝配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安裝erlang $ yum -y install erlang 安裝RabbitMQ $ yum -y install rabbitmq-server
啓動中止服務:service rabbitmq-server start/stop
安裝API
pip install pika or easy_install pika or 源碼 https://pypi.python.org/pypi/pika
使用API操做RabbitMQ
能夠基於Queue實現生產者消費者模型
對於RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.queue_declare(queue='hello2') #ch=頻道,method=隊列名字,properties=屬性,body=取到的內容 def callback(ch,method,properties,body): print("[x]Received%r"%body) #到hello2中取數據,取完調用callback函數 channel.basic_consume(callback,queue='hello2',no_ack=True) print('[*]Waiting for messages. To exit press CTRL+C') channel.start_consuming()
import pika #建立鏈接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) #建立頻道 channel = connection.channel() #定義隊列,若是存在忽略,不存在就建立 channel.queue_declare(queue='hello2') #往hello2隊列中發內容hello world! channel.basic_publish(exchange='',routing_key='hello2',body='hello world!') print("[x]Sent 'hello world!'") connection.close()
acknowledgment 消息不丟失
no-ack = False,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='logs_fanout',type='fanout') # 隨機建立隊列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # exchange和隊列作綁定 channel.queue_bind(exchange='logs_fanout',queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
狀況一:no_ack=true (此時爲自動應答)
在這種狀況下,consumer 會在接收到 Basic.Deliver + Content-Header + Content-Body 以後,當即回覆 Ack 。而這個 Ack 是 TCP 協議中的 Ack 。此 Ack 的回覆不關心 consumer 是否對接收到的數據進行了處理,固然也不關心處理數據所須要的耗時。
狀況二:no_ack=false (此時爲手動應答)
在這種狀況下,要求 consumer 在處理完接收到的 Basic.Deliver + Content-Header + Content-Body 以後纔回復 Ack 。而這個 Ack 是 AMQP 協議中的 Basic.Ack 。此 Ack 的回覆是和業務處理相關的,因此具體的回覆時間應該要取決於業務處理的耗時。
隨機建立隊列
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='logs_fanout',type='fanout') # 隨機建立隊列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # exchange和隊列作綁定 channel.queue_bind(exchange='logs_fanout',queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
import pika #建立鏈接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) #建立頻道 channel = connection.channel() #建立exchange channel.exchange_declare(exchange='logs_fanout',type='fanout') message = '123' #發佈 channel.basic_publish(exchange='logs_fanout',routing_key='',body=message) print("[x] Sent %r"%message) connection.close()
消息獲取順序
默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。
channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
發佈訂閱
發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。
exchange type = fanout
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
關鍵字發送
exchange type = direct
以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs_test',type='direct') result = channel.queue_declare(exclusive=True) queque_name = result.method.queue severities = ['error','info','warning'] for severity in severities: channel.queue_bind(exchange='direct_logs_test',queue=queque_name,routing_key=severity) print('[*] Waiting for logs. To exit press CTRL+C') def calback(ch,method,properties,body): print("[x] %r:%r"%(method.routing_key,body)) channel.basic_consume(calback,queue=queque_name,no_ack=True) channel.start_consuming()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channet = connection.channel() channet.exchange_declare(exchange='direct_logs_test',type='direct') result = channet.queue_declare(exclusive=True) queque_name = result.method.queue severities = ['error',] for severity in severities: channet.queue_bind(exchange='direct_logs_test',queue=queque_name,routing_key=severity) print('[*] Waiting for logs. To exit press CTRL+C') def calback(ch,method,properties,body): print("[x] %r:%r"%(method.routing_key,body)) channet.basic_consume(calback,queue=queque_name,no_ack=True) channet.start_consuming()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs_test',type='direct') severity = 'error' message = '123' channel.basic_publish(exchange='direct_logs_test',routing_key=severity,body=message) print("[x] Sent %r:%r"%(severity,message)) connection.close()
模糊匹配
exchange type = topic
在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入」路由值「和 」關鍵字「進行匹配,匹配成功,則將數據發送到指定隊列。
發送者路由值 隊列中 old.boy.python old.* -- 不匹配 old.boy.python old.# -- 匹配
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
Demo:
##############producer.py################ import pika import sys #建立鏈接connection到localhost con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #建立虛擬鏈接channel cha = con.channel() #建立隊列anheng,durable參數爲真時,隊列將持久化;exclusive爲真時,創建臨時隊列 result=cha.queue_declare(queue='anheng',durable=True,exclusive=False) #建立名爲yanfa,類型爲fanout的exchange,其餘類型還有direct和topic,若是指定durable爲真,exchange將持久化 cha.exchange_declare(durable=False, exchange='yanfa', type='direct',) #綁定exchange和queue,result.method.queue獲取的是隊列名稱 cha.queue_bind(exchange='yanfa', queue=result.method.queue, routing_key='',) #公平分發,使每一個consumer在同一時間最多處理一個message,收到ack前,不會分配新的message cha.basic_qos(prefetch_count=) #發送信息到隊列‘anheng' message = ' '.join(sys.argv[:]) #消息持久化指定delivery_mode=; cha.basic_publish(exchange='', routing_key='anheng', body=message, properties=pika.BasicProperties( delivery_mode = , )) print '[x] Sent %r' % (message,) #關閉鏈接 con.close() ################consumer.py################## # 博客 http://www.cnblogs.com/duanv/ import pika #創建鏈接connection到localhost con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #建立虛擬鏈接channel cha = con.channel() #建立隊列anheng result=cha.queue_declare(queue='anheng',durable=True) #建立名爲yanfa,類型爲fanout的交換機,其餘類型還有direct和topic cha.exchange_declare(durable=False, exchange='yanfa', type='direct',) #綁定exchange和queue,result.method.queue獲取的是隊列名稱 cha.queue_bind(exchange='yanfa', queue=result.method.queue, routing_key='',) #公平分發,使每一個consumer在同一時間最多處理一個message,收到ack前,不會分配新的message cha.basic_qos(prefetch_count=) print ' [*] Waiting for messages. To exit press CTRL+C' #定義回調函數 def callback(ch, method, properties, body): print " [x] Received %r" % (body,) ch.basic_ack(delivery_tag = method.delivery_tag) cha.basic_consume(callback, queue='anheng', no_ack=False,) cha.start_consuming()
1、概念:
Connection: 一個TCP的鏈接。Producer和Consumer都是經過TCP鏈接到RabbitMQ Server的。程序的起始處就是創建這個TCP鏈接。
Channels: 虛擬鏈接。創建在上述的TCP鏈接中。數據流動都是在Channel中進行的。通常狀況是程序起始創建TCP鏈接,第二步就是創建這個Channel。
2、隊列:
首先創建一個Connection,而後創建Channels,在channel上創建隊列
創建時指定durable參數爲真,隊列將持久化;指定exclusive爲真,隊列爲臨時隊列,關閉consumer後該隊列將再也不存在,通常狀況下創建臨時隊列並不指定隊列名稱,rabbitmq將隨機起名,經過result.method.queue來獲取隊列名:
result = channel.queue_declare(exclusive=True)
result.method.queue
區別:durable是隊列持久化與否,若是爲真,隊列將在rabbitmq服務重啓後仍存在,若是爲假,rabbitmq服務重啓前不會消失,與consumer關閉與否無關;
而exclusive是創建臨時隊列,當consumer關閉後,該隊列就會被刪除
3、exchange和bind
Exchange中durable參數指定exchange是否持久化,exchange參數指定exchange名稱,type指定exchange類型。Exchange類型有direct,fanout和topic。
Bind是將exchange與queue進行關聯,exchange參數和queue參數分別指定要進行bind的exchange和queue,routing_key爲可選參數。
Exchange的三種模式:
Direct:
任何發送到Direct Exchange的消息都會被轉發到routing_key中指定的Queue 1.通常狀況可使用rabbitMQ自帶的Exchange:」」(該Exchange的名字爲空字符串); 2.這種模式下不須要將Exchange進行任何綁定(bind)操做; 3.消息傳遞時須要一個「routing_key」,能夠簡單的理解爲要發送到的隊列名字; 4.若是vhost中不存在routing_key中指定的隊列名,則該消息會被拋棄。 Demo中雖然聲明瞭一個exchange='yanfa'和queue='anheng'的bind,可是在後面發送消息時並無使用該exchange和bind,而是採用了direct的模式,沒有指定exchange,而是指定了routing_key的名稱爲隊列名,消息將發送到指定隊列。 若是一個exchange 聲明爲direct,而且bind中指定了routing_key,那麼發送消息時須要同時指明該exchange和routing_key.
Fanout:
任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上 1.能夠理解爲路由表的模式 2.這種模式不須要routing_key 3.這種模式須要提早將Exchange與Queue進行綁定,一個Exchange能夠綁定多個Queue,一個Queue能夠同多個Exchange進行綁定。 4.若是接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。 Demo中建立了一個將一個exchange和一個queue進行fanout類型的bind.可是發送信息時沒有用到它,若是要用到它,只要在發送消息時指定該exchange的名稱便可,該exchange就會將消息發送到全部和它bind的隊列中。在fanout模式下,指定的routing_key是無效的 。
Topic:
任何發送到Topic Exchange的消息都會被轉發到全部關心routing_key中指定話題的Queue上 1.這種模式較爲複雜,簡單來講,就是每一個隊列都有其關心的主題,全部的消息都帶有一個「標題」(routing_key),Exchange會將消息轉發到全部關注主題能與routing_key模糊匹配的隊列。 2.這種模式須要routing_key,也許要提早綁定Exchange與Queue。 3.在進行綁定時,要提供一個該隊列關心的主題,如「#.log.#」表示該隊列關心全部涉及log的消息(一個routing_key爲」MQ.log.error」的消息會被轉發到該隊列)。 4.「#」表示0個或若干個關鍵字,「*」表示一個關鍵字。如「log.*」能與「log.warn」匹配,沒法與「log.warn.timeout」匹配;可是「log.#」能與上述二者匹配。 5.一樣,若是Exchange沒有發現可以與routing_key匹配的Queue,則會拋棄此消息。
4、任務分發
1.Rabbitmq的任務是循環分發的,若是開啓兩個consumer,producer發送的信息是輪流發送到兩個consume的。 2.在producer端使用cha.basic_publish()來發送消息,其中body參數就是要發送的消息,properties=pika.BasicProperties(delivery_mode = 2,)啓用消息持久化,能夠防止RabbitMQ Server 重啓或者crash引發的數據丟失。 3.在接收端使用cha.basic_consume()無限循環監聽,若是設置no-ack參數爲真,每次Consumer接到數據後,而無論是否處理完成,RabbitMQ Server會當即把這個Message標記爲完成,而後從queue中刪除了。爲了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。爲了保證數據能被正確處理而不只僅是被Consumer收到,那麼咱們不能採用no-ack。而應該是在處理完數據後發送ack。 在處理數據後發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ能夠去安全的刪除它了。若是Consumer退出了可是沒有發送ack,那麼RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的狀況下數據也不會丟失。 這裏並無用到超時機制。RabbitMQ僅僅經過Consumer的鏈接中斷來確認該Message並無被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來作數據處理。 Demo的callback方法中ch.basic_ack(delivery_tag = method.delivery_tag)告訴rabbitmq消息已經正確處理。若是沒有這條代碼,Consumer退出時,Message會從新分發。而後RabbitMQ會佔用愈來愈多的內存,因爲RabbitMQ會長時間運行,所以這個「內存泄漏」是致命的。去調試這種錯誤,能夠經過一下命令打印un-acked Messages: sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 4.公平分發:設置cha.basic_qos(prefetch_count=1),這樣RabbitMQ就會使得每一個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
5、注意:
生產者和消費者都應該聲明創建隊列,網上教程上說第二次建立若是參數和第一次不同,那麼該操做雖然成功,可是queue的屬性並不會被修改。
可能由於版本問題,在個人測試中若是第二次聲明創建的隊列屬性和第一次不徹底相同,將報相似這種錯406, "PRECONDITION_FAILED - parameters for queue 'anheng' in vhost '/' not equivalent"
若是是exchange第二次建立屬性不一樣,將報這種錯406, "PRECONDITION_FAILED - cannot redeclare exchange 'yanfa' in vhost '/' with different type, durable, internal or autodelete value"
若是第一次聲明創建隊列也出現這個錯誤,說明以前存在名字相同的隊列且本次聲明的某些屬性和以前聲明不一樣,可經過命令sudo rabbitmqctl list_queues查看當前有哪些隊列。解決方法是聲明創建另外一名稱的隊列或刪除原有隊列,若是原有隊列是非持久化的,可經過重啓rabbitmq服務刪除原有隊列,若是原有隊列是持久化的,只能刪除它所在的vhost,而後再重建vhost,再設置vhost的權限(先確認該vhost中沒有其餘有用隊列)。
用戶受權
用戶管理:
建立用戶 create user '用戶名'@'IP地址' identified by '密碼'; 刪除用戶 drop user '用戶名'@'IP地址'; 修改用戶 rename user '用戶名'@'IP地址'; to '新用戶名'@'IP地址';; 修改密碼 set password for '用戶名'@'IP地址' = Password('新密碼') PS:用戶權限相關數據保存在mysql數據庫的user表中,因此也能夠直接對其進行操做(不建議)
受權管理:
show grants for '用戶'@'IP地址' -- 查看權限 grant 權限 on 數據庫.表 to '用戶'@'IP地址' -- 受權 revoke 權限 on 數據庫.表 from '用戶'@'IP地址' -- 取消權限
all privileges 除grant外的全部權限 select 僅查權限 select,insert 查和插入權限 ... usage 無訪問權限 alter 使用alter table alter routine 使用alter procedure和drop procedure create 使用create table create routine 使用create procedure create temporary tables 使用create temporary tables create user 使用create user、drop user、rename user和revoke all privileges create view 使用create view delete 使用delete drop 使用drop table execute 使用call和存儲過程 file 使用select into outfile 和 load data infile grant option 使用grant 和 revoke index 使用index insert 使用insert lock tables 使用lock table process 使用show full processlist select 使用select show databases 使用show databases show view 使用show view update 使用update reload 使用flush shutdown 使用mysqladmin shutdown(關閉MySQL) super 使用change master、kill、logs、purge、master和set global。還容許mysqladmin調試登錄 replication client 服務器位置的訪問 replication slave 由複製從屬使用
對於目標數據庫以及內部其餘: 數據庫名.* 數據庫中的全部 數據庫名.表 指定數據庫中的某張表 數據庫名.存儲過程 指定數據庫中的存儲過程 *.* 全部數據庫
用戶名@IP地址 用戶只能在改IP下才能訪問 用戶名@192.168.1.% 用戶只能在改IP段下才能訪問(通配符%表示任意) 用戶名@% 用戶能夠再任意IP下訪問(默認IP地址爲%)
grant all privileges on db1.tb1 TO '用戶名'@'IP' grant select on db1.* TO '用戶名'@'IP' grant select,insert on *.* TO '用戶名'@'IP' revoke select on db1.tb1 from '用戶名'@'IP'
表操做
一、建立表
是否可空,null表示空,非字符串 not null - 不可空 null - 可空
默認值,建立列時能夠指定默認值,當插入數據時若是未主動設置,則自動添加默認值 create table tb1( nid int not null defalut 2, num int not null )
自增,若是爲某列設置自增列,插入數據時無需設置此列,默認將自增(表中只能有一個自增列) create table tb1( nid int not null auto_increment primary key, num int null ) 或 create table tb1( nid int not null auto_increment, num int null, index(nid) ) 注意:1、對於自增列,必須是索引(含主鍵)。 2、對於自增能夠設置步長和起始值 show session variables like 'auto_inc%'; set session auto_increment_increment=2; set session auto_increment_offset=10; shwo global variables like 'auto_inc%'; set global auto_increment_increment=2; set global auto_increment_offset=10;
主鍵,一種特殊的惟一索引,不容許有空值,若是主鍵使用單個列,則它的值必須惟一,若是是多列,則其組合必須惟一。 create table tb1( nid int not null auto_increment primary key, num int null ) 或 create table tb1( nid int not null, num int not null, primary key(nid,num) )
外鍵,一個特殊的索引,只能是指定內容 creat table color( nid int not null primary key, name char(16) not null ) create table fruit( nid int not null primary key, smt char(32) null , color_id int not null, constraint fk_cc foreign key (color_id) references color(nid) )
二、刪除表
drop table 表名
三、清空表
delete from 表名 truncate table 表名
四、修改表
添加列:alter table 表名 add 列名 類型 刪除列:alter table 表名 drop column 列名 修改列: alter table 表名 modify column 列名 類型; -- 類型 alter table 表名 change 原列名 新列名 類型; -- 列名,類型 添加主鍵: alter table 表名 add primary key(列名); 刪除主鍵: alter table 表名 drop primary key; alter table 表名 modify 列名 int, drop primary key; 添加外鍵:alter table 從表 add constraint 外鍵名稱(形如:FK_從表_主表) foreign key 從表(外鍵字段) references 主表(主鍵字段); 刪除外鍵:alter table 表名 drop foreign key 外鍵名稱 修改默認值:ALTER TABLE testalter_tbl ALTER i SET DEFAULT 1000; 刪除默認值:ALTER TABLE testalter_tbl ALTER i DROP DEFAULT;
五、基本數據類型
MySQL的數據類型大體分爲:數值、時間和字符串
http://www.runoob.com/mysql/mysql-data-types.html
Windows能夠安裝Navicat for mysql操做數據庫。
insert into 表 (列名,列名...) values (值,值,值...) insert into 表 (列名,列名...) values (值,值,值...),(值,值,值...) insert into 表 (列名,列名...) select (列名,列名...) from 表
delete from 表 delete from 表 where id=1 and name='alex'
update 表 set name = 'alex' where id>1
select * from 表 select * from 表 where id > 1 select nid,name,gender as gg from 表 where id > 1
1、條件 select * from 表 where id > 1 and name != 'alex' and num = 12; select * from 表 where id between 5 and 16; select * from 表 where id in (11,22,33) select * from 表 where id not in (11,22,33) select * from 表 where id in (select nid from 表) 2、通配符 select * from 表 where name like 'ale%' - ale開頭的全部(多個字符串) select * from 表 where name like 'ale_' - ale開頭的全部(一個字符) 3、限制 select * from 表 limit 5; - 前5行 select * from 表 limit 4,5; - 從第4行開始的5行 select * from 表 limit 5 offset 4 - 從第4行開始的5行 4、排序 select * from 表 order by 列 asc - 根據 「列」 從小到大排列 select * from 表 order by 列 desc - 根據 「列」 從大到小排列 select * from 表 order by 列1 desc,列2 asc - 根據 「列1」 從大到小排列,若是相同則按列2從小到大排序 5、分組 select num from 表 group by num select num,nid from 表 group by num,nid select num,nid from 表 where nid > 10 group by num,nid order nid desc select num,nid,count(*),sum(score),max(score),min(score) from 表 group by num,nid select num from 表 group by num having max(id) > 10 特別的:group by 必須在where以後,order by以前 6、連表 無對應關係則不顯示 select A.num, A.name, B.name from A,B Where A.nid = B.nid 無對應關係則不顯示 select A.num, A.name, B.name from A inner join B on A.nid = B.nid A表全部顯示,若是B中無對應關係,則值爲null select A.num, A.name, B.name from A left join B on A.nid = B.nid B表全部顯示,若是B中無對應關係,則值爲null select A.num, A.name, B.name from A right join B on A.nid = B.nid 7、組合 組合,自動處理重合 select nickname from A union select name from B 組合,不處理重合 select nickname from A union all select name from B
pymsql是Python中操做MySQL的模塊,其使用方法和MySQLdb幾乎相同。
1、下載安裝:
pip3 install pymysql
2、使用
一、執行SQL
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql # 建立鏈接 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') # 建立遊標 cursor = conn.cursor() # 執行SQL,並返回收影響行數 effect_row = cursor.execute("update hosts set host = '1.1.1.2'") # 執行SQL,並返回受影響行數 #effect_row = cursor.execute("update hosts set host = '1.1.1.2' where nid > %s", (1,)) # 執行SQL,並返回受影響行數 #effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) # 提交,否則沒法保存新建或者修改的數據 conn.commit() # 關閉遊標 cursor.close() # 關閉鏈接 conn.close()
二、獲取新建立數據自增ID
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') cursor = conn.cursor() cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) conn.commit() cursor.close() conn.close() # 獲取最新自增ID new_id = cursor.lastrowid
三、獲取查詢數據
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') cursor = conn.cursor() cursor.execute("select * from hosts") # 獲取第一行數據 row_1 = cursor.fetchone() # 獲取前n行數據 # row_2 = cursor.fetchmany(3) # 獲取全部數據 # row_3 = cursor.fetchall() conn.commit() cursor.close() conn.close()
注:在fetch數據時按照順序進行,可使用cursor.scroll(num,mode)來移動遊標位置,如:
四、fetch數據類型
關於默認獲取的數據是元祖類型,若是想要或者字典類型的數據,即:
#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') # 遊標設置爲字典類型 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) r = cursor.execute("call p1()") result = cursor.fetchone() conn.commit() cursor.close() conn.close()
SQLAlchemy是Python編程語言下的一款ORM框架,該框架創建在數據庫API之上,使用關係對象映射進行數據庫操做,簡言之即是:將對象轉換成SQL,而後使用數據API執行SQL並獲取執行結果。
Dialect用於和數據API進行交流,根據配置文件的不一樣調用不一樣的數據庫API,從而實現對數據庫的操做,如:
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多詳見:http://docs.sqlalchemy.org/en/latest/dialects/index.html
步驟一:
使用 Engine/ConnectionPooling/Dialect 進行數據庫操做,Engine使用ConnectionPooling鏈接數據庫,而後再經過Dialect執行SQL語句。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) engine.execute( "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')" ) engine.execute( "INSERT INTO ts_test (a, b) VALUES (%s, %s)", ((555, "v1"),(666, "v1"),) ) engine.execute( "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)", id=999, name="v1" ) result = engine.execute('select * from ts_test') result.fetchall()
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) # 事務操做 with engine.begin() as conn: conn.execute("insert into table (x, y, z) values (1, 2, 3)") conn.execute("my_special_procedure(5)") conn = engine.connect() # 事務操做 with conn.begin(): conn.execute("some statement", {'x':5, 'y':10})
注:查看數據庫鏈接:show status like 'Threads%';
步驟二:
使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 進行數據庫操做。Engine使用Schema Type建立一個特定的結構對象,以後經過SQL Expression Language將該對象轉換成SQL語句,而後經過 ConnectionPooling 鏈接數據庫,再而後經過 Dialect 執行SQL,並獲取結果。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table('user', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) color = Table('color', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) metadata.create_all(engine) # metadata.clear() # metadata.remove()
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table('user', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) color = Table('color', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) conn = engine.connect() # 建立SQL語句,INSERT INTO "user" (id, name) VALUES (:id, :name) conn.execute(user.insert(),{'id':7,'name':'seven'}) conn.close() # sql = user.insert().values(id=123, name='wu') # conn.execute(sql) # conn.close() # sql = user.delete().where(user.c.id > 1) # sql = user.update().values(fullname=user.c.name) # sql = user.update().where(user.c.name == 'jack').values(name='ed') # sql = select([user, ]) # sql = select([user.c.id, ]) # sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id) # sql = select([user.c.name]).order_by(user.c.name) # sql = select([user]).group_by(user.c.name) # result = conn.execute(sql) # print result.fetchall() # conn.close()
更多內容詳見:
http://www.jianshu.com/p/e6bba189fcbd
http://docs.sqlalchemy.org/en/latest/core/expression_api.html
注:SQLAlchemy沒法修改表結構,若是須要可使用SQLAlchemy開發者開源的另一個軟件Alembic來完成。
步驟三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 全部組件對數據進行操做。根據類建立對象,對象轉換成SQL,執行SQL。
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(50)) # 尋找Base的全部子類,按照子類的結構在數據庫中生成對應的數據表信息 # Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() # ########## 增 ########## # u = User(id=2, name='sb') # session.add(u) # session.add_all([ # User(id=3, name='sb'), # User(id=4, name='sb') # ]) # session.commit() # ########## 刪除 ########## # session.query(User).filter(User.id > 2).delete() # session.commit() # ########## 修改 ########## # session.query(User).filter(User.id > 2).update({'cluster_id' : 0}) # session.commit() # ########## 查 ########## # ret = session.query(User).filter_by(name='sb').first() # ret = session.query(User).filter_by(name='sb').all() # print ret # ret = session.query(User).filter(User.name.in_(['sb','bb'])).all() # print ret # ret = session.query(User.name.label('name_label')).all() # print ret,type(ret) # ret = session.query(User).order_by(User.id).all() # print ret # ret = session.query(User).order_by(User.id)[1:3] # print ret # session.commit()
2、ORM功能使用
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 全部組件對數據進行操做。根據類建立對象,對象轉換成SQL,執行SQL。
一、建立表
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) Base = declarative_base() # 建立單表 class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), Index('ix_id_name', 'name', 'extra'), ) # 一對多 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 多對多 class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine)
二、操做表
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) Base = declarative_base() # 建立單表 class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), Index('ix_id_name', 'name', 'extra'), ) def __repr__(self): return "%s-%s" %(self.id, self.name) # 一對多 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) def __repr__(self): return "%s-%s" %(self.nid, self.caption) class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 與生成表結構無關,僅用於查詢方便 favor = relationship("Favor", backref='pers') # 多對多 class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) group = relationship("Group", backref='s2g') server = relationship("Server", backref='s2g') class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) # group = relationship('Group',secondary=ServerToGroup,backref='host_list') class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) Session = sessionmaker(bind=engine) session = Session()
增
obj = Users(name="alex0", extra='sb') session.add(obj) session.add_all([ Users(name="alex1", extra='sb'), Users(name="alex2", extra='sb'), ]) session.commit()
刪
session.query(Users).filter(Users.id > 2).delete()
session.commit()
改
session.query(Users).filter(Users.id > 2).update({"name" : "099"}) session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False) session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate") session.commit()
查
ret = session.query(Users).all() ret = session.query(Users.name, Users.extra).all() ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter_by(name='alex').first()
其餘
# 條件 ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分組 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 連表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 組合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
paramiko模塊,基於SSH用於鏈接遠程服務器並執行相關操做。
1、安裝
pip3 install paramiko
2、使用
SSHClient
用於鏈接遠程服務器並執行基本命令
基於用戶名密碼鏈接:
import paramiko # 建立SSH對象 ssh = paramiko.SSHClient() # 容許鏈接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 鏈接服務器 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123') # 執行命令 stdin, stdout, stderr = ssh.exec_command('ls') # 獲取命令結果 result = stdout.read() # 關閉鏈接 ssh.close()