import requests
#基於urllib封裝
# url = 'http://aliuwmp3.changba.com/userdata/userwork/1128447787.mp3'
# d = {'stu_name':'礦泉水'}
# r = requests.get(url) # get 請求
# print('json',r.json()) #返回一個字典
# print('text',r.text) #返回一個字符串
# print('content',r.content) #返回一個bytes二進制結果 主要用來下載
# fw = open('ldh.mp3','wb')
# fw.write(r.content)
# fw.close()
#post請求
# url = 'http://api.nnzhp.cn/api/user/login'
# data = {'username':'niuhanyang','passwd':'aA123456'}
# r = requests.post(url,data)
# print(r.text)
#文件上傳
# http://q4.qlogo.cn/g?b=qq&nk=921375025&s=140 獲取qq頭像
#http://api.nnzhp.cn/api/file/file_upload 上傳路徑
# url = 'http://api.nnzhp.cn/api/file/file_upload'
# r = requests.post(url,files={'file':open('ldh.mp3','rb')})
# print(r.text)
# r = requests.session()
# # r.get()
# result = r.post('http://api.nnzhp.cn/api/user/login',data={'username':'niuhanyang','passwd':'aA123456'})
# result = r.post('http://api.nnzhp.cn/api/user/login',params={'username':'niuhanyang','passwd':'aA123456'})
# print(result.text)
redis操做
# redis是一個數據庫 數據存在內存裏面 每秒支持10w次讀寫 性能高
import redis
r = redis.Redis(host='118.24.3.40',password='HK139bc&*',db=0,decode_responses=True)
# r.set('user','小白')新增
# print(r.get('name').decode()) 查詢
# string 類型數據
# r.delete('key')#刪除
# r.set('session','jkjkjkljkl',60)#設置session失效時間
# r.flushdb() #清除當前redis數據庫中的key
# r.flushall() #清除全部數據庫中的全部的key
# r.keys() #獲取當前數據庫裏面全部的key
# r.keys('xx*') 獲取以xx開頭的全部的key
# r.exists('key') #判斷這個key是否存在 存在返回1 不存在返回0
#hash類型數據
# r.hset('cnz_l','ldh','12345')
# r.hset('cnz_l','ldh2','12345') #插入修改hash類型數據
# print(r.hget('cnz_l','ldh')) #獲取hash數據
# r.hdel('cnz_l','ldh') #刪除hash中的某個key
# r.hgetall('cnz_l') #獲取hash中全部的key 和value
# r.hmset('cnz_l',{'key':'val','key1':'val1'}) #批量set數據
# r.type('key') #查看數據類型
redis遷移
import redis
a = redis.Redis(host='118.24.3.40',password='HK139bc&*',
db=15,decode_responses=True) #0-16
b = redis.Redis(host='118.24.3.40',password='HK139bc&*',
db=10,decode_responses=True) #0-16
for k in a.keys():
if a.type(k) == 'string':
value = a.get(k)
b.set(k,value)
elif a.type(k) == 'hash':
all_data = a.hgetall(k)
b.hmset(k,all_data)
else:
print('其餘類型不支持!')
接口開發
import flaskimport json#一、mock接口#二、給別人提供數據#flask web開發框架server = flask.Flask(__name__)import pymysqldef op_mysql(sql,many_tag=False): conn = pymysql.connect(host='118.24.3.40',user='jxz',password='123456', db='jxz',port=3306,charset='utf8', autocommit=True) cur = conn.cursor(pymysql.cursors.DictCursor) cur.execute(sql) if many_tag: result = cur.fetchall() # [{"id":1,"name":"xxx"},{"id":2,"name":"xxx"}] else: result = cur.fetchone() cur.close() conn.close() return result@server.route('/index')def login(): d = {"code":0,"msg":"登陸成功 niuhanyangq111!"} return json.dumps(d,ensure_ascii=False)@server.route('/get_data')def table_data(): table_list = ['app_myuser','app_product','app_student'] table_name = flask.request.args.get('table_name') limit = flask.request.args.get('limit',10) if table_name: if table_name in table_list: sql='select * from %s limit %s;'%(table_name,limit) print(sql) data = op_mysql(sql,True) else: data = {"code": -2, 'msg': "你沒有權限查看這個表裏面的數據!"} else: data = {"code":-1,'msg':"必填參數未填,請查看接口文檔"} return json.dumps(data,ensure_ascii=False)@server.route('/post_data',methods=['post'])def post(): username = flask.request.args.get('username')#參數在url裏面的話,用它 username2 = flask.request.values.get('username2')#參數在body裏面,用它 file = flask.request.files.get('f')#上傳文件 cookies = flask.request.cookies.get('f')#cookie headers = flask.request.headers.get('f')#headers # headers = flask.request.json.get('username')#json格式的 file.save(file.filename) return 'abc'server.run(host='0.0.0.0',port=8989,debug=True)#app_user,app_product,app_student