API驗證說明html
API驗證: a. 發令牌: 靜態 PS: 隱患 key被別人獲取 b. 動態令牌 PS: (問題越嚴重)用戶生成的每一個令牌被黑客獲取到,都會破解 c. 高級版本 PS: 黑客網速快,會竊取, so要對數據加密 d. 終極版本 特色: 爲何要用API ? - 數據在傳輸過程當中,保證數據安全 你是如何設計的 ? - Tornado 中的加密Cookie相似 - 建立動態key md5(key + time)|time (Tornado中也是這麼作) - 限制 - 第一關: 時間 - 第二關: 算法規則 - 第三關: 已訪問的記錄 PS: 黑客網速快,會竊取, so要對數據加密
一、發令牌: 靜態(服務器、客戶端有相同的靜態key)算法
###客戶端 import requests key = "asdfasdfasdfasdf098712sdfs" response = requests.get("http://127.0.0.1:8000/api/asset.html",headers={'OpenKey':key}) print(response.text)
###服務端 def api_check(request): if request.method == "GET": #################靜態################ server_key = "adsfasdfzxcvzw241wdsaddsfsdf" # 應放在settings client_key = request.META.get("HTTP_OPENKEY") # META裏的Openkey字段會自動修改,需提早看META裏的字段 if server_key != client_key: return HttpResponse("key錯誤,你是非法用戶") return HttpResponse("歡迎光臨")
二、動態令牌
### 客戶端 import requests import time import hashlib ctime = time.time() key = "adsfasdfzxcvzw241wdsaddsfsdf" new_key = "%s|%s" % (key,ctime,) m = hashlib.md5() m.update(bytes(new_key,encoding="utf-8")) md5_key = m.hexdigest() # key進行hash new_time_key = "%s|%s" % (md5_key,ctime,) response = requests.get("http://127.0.0.1:8000/api/asset.html",headers={"openkey":new_time_key}) print(response.text)
#### 服務端 def api_check(request): if request.method == "GET": server_key = "adsfasdfzxcvzw241wdsaddsfsdf" # 應放在settings client_md5_time_key = request.META.get("HTTP_OPENKEY") client_md5_key, client_ctime = client_md5_time_key.split("|") temp = "%s|%s" % (server_key, client_ctime) m = hashlib.md5() m.update(bytes(temp, encoding='utf-8')) server_md5_key = m.hexdigest() if server_md5_key != client_md5_key: return HttpResponse("驗證失敗") return HttpResponse("歡迎光臨,訪問成功")
三、高級版本django
(1)、時間限制,超過自定義時間(自定義10s)不讓訪問json
(2)、hash限制,比對server和client的hash值api
(3)、記錄自定義時間內已訪問的hash_key,禁止重複訪問安全
### 客戶端 import requests import time import hashlib ctime = time.time() key = "adsfasdfzxcvzw241wdsaddsfsdf" new_key = "%s|%s" % (key,ctime,) m = hashlib.md5() m.update(bytes(new_key,encoding="utf-8")) md5_key = m.hexdigest() # key進行hash new_time_key = "%s|%s" % (md5_key,ctime,) response = requests.get("http://127.0.0.1:8000/api/asset.html",headers={"openkey":new_time_key}) print(response.text)
### 服務端(裝飾器版本) api_key_record = {} # 建立全局的字段,記錄自定義時間內已訪問的hash_key def API_check(func): def wrapper(request,*args,**kwarg): if request.method == "GET": import hashlib # ####################API認證################## client_md5_time_key = request.META.get("HTTP_OPENKEY") client_md5_key, client_ctime = client_md5_time_key.split("|") client_ctime = float(client_ctime) server_time = time.time() # 第一關 if server_time - client_ctime > 10: return HttpResponse("【第一關】時間有點長") # 第二關 tmp = "%s|%s" % (settings.AUTH_KEY, str(client_ctime)) m = hashlib.md5() m.update(bytes(tmp, encoding="utf-8")) server_md5_key = m.hexdigest() if server_md5_key != client_md5_key: return HttpResponse("【第二關】規則不正確,驗證碼錯誤") for k in list(api_key_record.keys()): v = api_key_record[k] if server_time > v: api_key_record.pop(k) # 第三關 if client_md5_time_key in api_key_record: return HttpResponse("【第三關】有人已經來過了") else: api_key_record[client_md5_time_key] = client_ctime + 10 # ####################API認證################## ret = func(request,*args,**kwarg) return ret return wrapper @API_check def asset(request): 。。。。。。 # 主函數
四、終極版本(加密)服務器
from Crypto.Cipher import AES from lib.conf.config import settings def encrypt(message): """ 數據加密 :param message: :return: """ key = settings.DATA_KEY cipher = AES.new(key, AES.MODE_CBC, key) ba_data = bytearray(message,encoding='utf-8') v1 = len(ba_data) v2 = v1 % 16 if v2 == 0: v3 = 16 else: v3 = 16 - v2 for i in range(v3): ba_data.append(v3) final_data = ba_data.decode('utf-8') msg = cipher.encrypt(final_data) # 要加密的字符串,必須是16個字節或16個字節的倍數 return msg def decrypt(msg): """ 數據解密 :param message: :return: """ from Crypto.Cipher import AES key = settings.DATA_KEY cipher = AES.new(key, AES.MODE_CBC, key) result = cipher.decrypt(msg) # result = b'\xe8\xa6\x81\xe5\x8a\xa0\xe5\xaf\x86\xe5\x8a\xa0\xe5\xaf\x86\xe5\x8a\xa0sdfsd\t\t\t\t\t\t\t\t\t' data = result[0:-result[-1]] return str(data,encoding='utf-8') def auth(): # hash(key+時間) """ API驗證 :return: """ import time import requests import hashlib ctime = time.time() key = "asdfasdfasdfasdf098712sdfs" new_key = "%s|%s" %(key,ctime,) m = hashlib.md5() m.update(bytes(new_key,encoding='utf-8')) #裏面是字節數據 md5_key = m.hexdigest() #返回值是字符竄類型 md5_time_key = "%s|%s" %(md5_key,ctime) return md5_time_key
from Crypto.Cipher import AES import requests import json from lib.utils import encrypt from lib.utils import auth #對數據加密字典 v1 = encrypt(json.dumps({"k1":"v1"})) #獲取的是加密後的字節 print(v1) response = requests.post( url="http://127.0.0.1:8000/api/asset.html", headers={'OpenKey':auth(),'content-type':'application/json'}, data=v1 ) print(response.text)
import json import hashlib from django.shortcuts import render,HttpResponse from repository import models from django.conf import settings from api.service import PluginManager import time import json from Crypto.Cipher import AES api_key_record ={ "76942d662d98ebe3b920a7b791bf5040|1501510243.92804":1501510243.92804, } def decrypt(msg): key = b'dfdsdfsasdfdsdfs' cipher = AES.new(key, AES.MODE_CBC, key) result = cipher.decrypt(msg) # 把加密後的字節解密成不加密的字節 data = result[0:-result[-1]] return str(data, encoding='utf-8') def outer(func): def wrapper(request): client_md5_time_key = request.META.get("HTTP_OPENKEY") client_md5_key, client_ctime = client_md5_time_key.split("|") client_ctime = float(client_ctime) server_ctime = time.time() # 第一關 時間關 if server_ctime - client_ctime > 30: return HttpResponse("第一關 小夥子,別虎我,太長了") # 第二關 客戶端時間和服務端key加密和 客戶端的密鑰對比 temp = "%s|%s" % (settings.AUTH_KEY, client_ctime) m = hashlib.md5() m.update(bytes(temp, encoding='utf-8')) server_md5_key = m.hexdigest() if server_md5_key != client_md5_key: return HttpResponse("第二關 規則正確") # 之後基於memcache,目前先寫入內存刪除超過10s的值 for k in list(api_key_record.keys()): v = api_key_record[k] if server_ctime > v: del api_key_record[k] # 第三關 判斷字典裏是否有以前訪問的key,若是有不經過,沒有加入字典 if client_md5_time_key in api_key_record: return HttpResponse("第三關 已經有人來過了") else: api_key_record[client_md5_time_key] = client_ctime + 10 obj = func(request) return obj return wrapper @outer def asset(request): if request.method == 'GET': ys = '重要的不能被閒雜人等看的數據' return HttpResponse(ys) elif request.method == 'POST': server_info = decrypt(request.body) server_info = json.loads(server_info) # # 新資產信息 # server_info = json.loads(request.body.decode('utf-8')) hostname = server_info['basic']['data']['hostname'] # 老資產信息 server_obj = models.Server.objects.filter(hostname=hostname).first() if not server_obj: return HttpResponse('當前主機名在資產中未錄入') PluginManager(server_info,server_obj,hostname).exec_plugin() return HttpResponse("...")