python學習 網絡編程(文件上傳下載) -- redis操做----接口開發

import requests
#基於urllib封裝
# url = 'http://aliuwmp3.changba.com/userdata/userwork/1128447787.mp3'
# d = {'stu_name':'礦泉水'}
# r = requests.get(url) # get 請求
# print('json',r.json()) #返回一個字典
# print('text',r.text) #返回一個字符串
# print('content',r.content) #返回一個bytes二進制結果 主要用來下載
# fw = open('ldh.mp3','wb')
# fw.write(r.content)
# fw.close()

#post請求
# url = 'http://api.nnzhp.cn/api/user/login'
# data = {'username':'niuhanyang','passwd':'aA123456'}
# r = requests.post(url,data)
# print(r.text)

#文件上傳
# http://q4.qlogo.cn/g?b=qq&nk=921375025&s=140 獲取qq頭像
#http://api.nnzhp.cn/api/file/file_upload 上傳路徑
# url = 'http://api.nnzhp.cn/api/file/file_upload'
# r = requests.post(url,files={'file':open('ldh.mp3','rb')})
# print(r.text)

# r = requests.session()
# # r.get()
# result = r.post('http://api.nnzhp.cn/api/user/login',data={'username':'niuhanyang','passwd':'aA123456'})
# result = r.post('http://api.nnzhp.cn/api/user/login',params={'username':'niuhanyang','passwd':'aA123456'})

# print(result.text)



redis操做

# redis是一個數據庫 數據存在內存裏面  每秒支持10w次讀寫  性能高
import redis
r = redis.Redis(host='118.24.3.40',password='HK139bc&*',db=0,decode_responses=True)
# r.set('user','小白')新增
# print(r.get('name').decode()) 查詢


# string 類型數據
# r.delete('key')#刪除
# r.set('session','jkjkjkljkl',60)#設置session失效時間
# r.flushdb() #清除當前redis數據庫中的key
# r.flushall() #清除全部數據庫中的全部的key
# r.keys() #獲取當前數據庫裏面全部的key
# r.keys('xx*') 獲取以xx開頭的全部的key
# r.exists('key') #判斷這個key是否存在 存在返回1 不存在返回0

#hash類型數據
# r.hset('cnz_l','ldh','12345')
# r.hset('cnz_l','ldh2','12345') #插入修改hash類型數據
# print(r.hget('cnz_l','ldh')) #獲取hash數據
# r.hdel('cnz_l','ldh') #刪除hash中的某個key
# r.hgetall('cnz_l') #獲取hash中全部的key 和value
# r.hmset('cnz_l',{'key':'val','key1':'val1'}) #批量set數據
# r.type('key') #查看數據類型

redis遷移
import redis

a = redis.Redis(host='118.24.3.40',password='HK139bc&*',
db=15,decode_responses=True) #0-16
b = redis.Redis(host='118.24.3.40',password='HK139bc&*',
db=10,decode_responses=True) #0-16

for k in a.keys():
if a.type(k) == 'string':
value = a.get(k)
b.set(k,value)
elif a.type(k) == 'hash':
all_data = a.hgetall(k)
b.hmset(k,all_data)
else:
print('其餘類型不支持!')

接口開發

import flaskimport json#一、mock接口#二、給別人提供數據#flask web開發框架server = flask.Flask(__name__)import pymysqldef op_mysql(sql,many_tag=False):    conn = pymysql.connect(host='118.24.3.40',user='jxz',password='123456',                           db='jxz',port=3306,charset='utf8',                           autocommit=True)    cur = conn.cursor(pymysql.cursors.DictCursor)    cur.execute(sql)    if many_tag:        result = cur.fetchall() # [{"id":1,"name":"xxx"},{"id":2,"name":"xxx"}]    else:        result = cur.fetchone()    cur.close()    conn.close()    return result@server.route('/index')def login():    d = {"code":0,"msg":"登陸成功 niuhanyangq111!"}    return json.dumps(d,ensure_ascii=False)@server.route('/get_data')def table_data():    table_list = ['app_myuser','app_product','app_student']    table_name = flask.request.args.get('table_name')    limit = flask.request.args.get('limit',10)    if table_name:        if table_name in table_list:            sql='select * from %s limit %s;'%(table_name,limit)            print(sql)            data = op_mysql(sql,True)        else:            data = {"code": -2, 'msg': "你沒有權限查看這個表裏面的數據!"}    else:        data = {"code":-1,'msg':"必填參數未填,請查看接口文檔"}    return json.dumps(data,ensure_ascii=False)@server.route('/post_data',methods=['post'])def post():    username = flask.request.args.get('username')#參數在url裏面的話,用它    username2 = flask.request.values.get('username2')#參數在body裏面,用它    file = flask.request.files.get('f')#上傳文件    cookies = flask.request.cookies.get('f')#cookie    headers = flask.request.headers.get('f')#headers    # headers = flask.request.json.get('username')#json格式的    file.save(file.filename)    return 'abc'server.run(host='0.0.0.0',port=8989,debug=True)#app_user,app_product,app_student
相關文章
相關標籤/搜索