API驗證

 

API驗證說明html

API驗證:

    a. 發令牌: 靜態
        PS: 隱患 key被別人獲取
    b. 動態令牌
        PS: (問題越嚴重)用戶生成的每一個令牌被黑客獲取到,都會破解
    c. 高級版本
        PS: 黑客網速快,會竊取, so要對數據加密
    d. 終極版本
 
 
特色:
   爲何要用API ?
       - 數據在傳輸過程當中,保證數據安全
   你是如何設計的 ?
       - Tornado 中的加密Cookie相似
       - 建立動態key  md5(key + time)|time (Tornado中也是這麼作)
       - 限制
         - 第一關: 時間
         - 第二關: 算法規則
         - 第三關: 已訪問的記錄
       PS: 黑客網速快,會竊取, so要對數據加密

 

一、發令牌: 靜態(服務器、客戶端有相同的靜態key)算法

###客戶端
import requests
 
key = "asdfasdfasdfasdf098712sdfs"
 
response = requests.get("http://127.0.0.1:8000/api/asset.html",headers={'OpenKey':key})
print(response.text)
 
###服務端
 
def api_check(request):
    if request.method == "GET":
        #################靜態################
        server_key = "adsfasdfzxcvzw241wdsaddsfsdf"   # 應放在settings
        client_key = request.META.get("HTTP_OPENKEY")  # META裏的Openkey字段會自動修改,需提早看META裏的字段

        if server_key != client_key:
            return HttpResponse("key錯誤,你是非法用戶")

        return HttpResponse("歡迎光臨")

二、動態令牌
### 客戶端
import requests
import time
import hashlib

ctime = time.time()
key = "adsfasdfzxcvzw241wdsaddsfsdf"
new_key = "%s|%s" % (key,ctime,)

m = hashlib.md5()
m.update(bytes(new_key,encoding="utf-8"))
md5_key = m.hexdigest()  # key進行hash

new_time_key = "%s|%s" % (md5_key,ctime,)

response = requests.get("http://127.0.0.1:8000/api/asset.html",headers={"openkey":new_time_key})
print(response.text)
#### 服務端
def api_check(request):
    if request.method == "GET":
        server_key = "adsfasdfzxcvzw241wdsaddsfsdf"   # 應放在settings

        client_md5_time_key = request.META.get("HTTP_OPENKEY")
        client_md5_key, client_ctime = client_md5_time_key.split("|")

        temp = "%s|%s" % (server_key, client_ctime)
        m = hashlib.md5()
        m.update(bytes(temp, encoding='utf-8'))
        server_md5_key = m.hexdigest()

        if server_md5_key != client_md5_key:
            return HttpResponse("驗證失敗")

        return HttpResponse("歡迎光臨,訪問成功")

 

三、高級版本django

(1)、時間限制,超過自定義時間(自定義10s)不讓訪問json

(2)、hash限制,比對server和client的hash值api

(3)、記錄自定義時間內已訪問的hash_key,禁止重複訪問安全

### 客戶端
import requests
import time
import hashlib

ctime = time.time()
key = "adsfasdfzxcvzw241wdsaddsfsdf"
new_key = "%s|%s" % (key,ctime,)

m = hashlib.md5()
m.update(bytes(new_key,encoding="utf-8"))
md5_key = m.hexdigest()  # key進行hash

new_time_key = "%s|%s" % (md5_key,ctime,)

response = requests.get("http://127.0.0.1:8000/api/asset.html",headers={"openkey":new_time_key})
print(response.text)
### 服務端(裝飾器版本)

api_key_record = {}  # 建立全局的字段,記錄自定義時間內已訪問的hash_key
def API_check(func): 
    def wrapper(request,*args,**kwarg):
        if request.method == "GET":
            import hashlib
            # ####################API認證##################
            client_md5_time_key = request.META.get("HTTP_OPENKEY")
            client_md5_key, client_ctime = client_md5_time_key.split("|")
            client_ctime = float(client_ctime)
            server_time = time.time()
            # 第一關
            if server_time - client_ctime > 10:
                return HttpResponse("【第一關】時間有點長")

            # 第二關
            tmp = "%s|%s" % (settings.AUTH_KEY, str(client_ctime))
            m = hashlib.md5()
            m.update(bytes(tmp, encoding="utf-8"))
            server_md5_key = m.hexdigest()
            if server_md5_key != client_md5_key:
                return HttpResponse("【第二關】規則不正確,驗證碼錯誤")

            for k in list(api_key_record.keys()):
                v = api_key_record[k]
                if server_time > v:
                    api_key_record.pop(k)

            # 第三關
            if client_md5_time_key in api_key_record:
                return HttpResponse("【第三關】有人已經來過了")
            else:
                api_key_record[client_md5_time_key] = client_ctime + 10

        # ####################API認證##################
        ret = func(request,*args,**kwarg)
        return ret
    return wrapper


@API_check
def asset(request):
    。。。。。。
     # 主函數

 四、終極版本(加密)服務器

from Crypto.Cipher import AES
from lib.conf.config import settings
def encrypt(message):
    """
    數據加密
    :param message:
    :return:
    """
    key = settings.DATA_KEY
    cipher = AES.new(key, AES.MODE_CBC, key)
    ba_data = bytearray(message,encoding='utf-8')
    v1 = len(ba_data)
    v2 = v1 % 16
    if v2 == 0:
        v3 = 16
    else:
        v3 = 16 - v2
    for i in range(v3):
        ba_data.append(v3)
    final_data = ba_data.decode('utf-8')
    msg = cipher.encrypt(final_data) # 要加密的字符串,必須是16個字節或16個字節的倍數
    return msg

def decrypt(msg):
    """
    數據解密
    :param message:
    :return:
    """
    from Crypto.Cipher import AES
    key = settings.DATA_KEY
    cipher = AES.new(key, AES.MODE_CBC, key)
    result = cipher.decrypt(msg) # result = b'\xe8\xa6\x81\xe5\x8a\xa0\xe5\xaf\x86\xe5\x8a\xa0\xe5\xaf\x86\xe5\x8a\xa0sdfsd\t\t\t\t\t\t\t\t\t'
    data = result[0:-result[-1]]
    return str(data,encoding='utf-8')


def auth():  # hash(key+時間)
    """
    API驗證
    :return:
    """
    import time
    import requests
    import hashlib

    ctime = time.time()
    key = "asdfasdfasdfasdf098712sdfs"
    new_key = "%s|%s" %(key,ctime,)

    m = hashlib.md5()
    m.update(bytes(new_key,encoding='utf-8'))  #裏面是字節數據
    md5_key = m.hexdigest()                    #返回值是字符竄類型

    md5_time_key = "%s|%s" %(md5_key,ctime)

    return md5_time_key
lib/utils.py
from Crypto.Cipher import AES
import requests
import json
from lib.utils import encrypt
from lib.utils import auth

#對數據加密字典
v1 = encrypt(json.dumps({"k1":"v1"}))      #獲取的是加密後的字節
print(v1)

response = requests.post(
    url="http://127.0.0.1:8000/api/asset.html",
    headers={'OpenKey':auth(),'content-type':'application/json'},
    data=v1
)


print(response.text)
客戶端
import json
import hashlib
from django.shortcuts import render,HttpResponse
from repository import models
from django.conf import settings
from api.service import PluginManager
import time
import json
from Crypto.Cipher import AES


api_key_record ={
    "76942d662d98ebe3b920a7b791bf5040|1501510243.92804":1501510243.92804,
}


def decrypt(msg):
    key = b'dfdsdfsasdfdsdfs'
    cipher = AES.new(key, AES.MODE_CBC, key)
    result = cipher.decrypt(msg)  # 把加密後的字節解密成不加密的字節
    data = result[0:-result[-1]]
    return str(data, encoding='utf-8')


def outer(func):
    def wrapper(request):
        client_md5_time_key = request.META.get("HTTP_OPENKEY")

        client_md5_key, client_ctime = client_md5_time_key.split("|")
        client_ctime = float(client_ctime)
        server_ctime = time.time()

        # 第一關 時間關
        if server_ctime - client_ctime > 30:
            return HttpResponse("第一關  小夥子,別虎我,太長了")

        # 第二關 客戶端時間和服務端key加密和 客戶端的密鑰對比
        temp = "%s|%s" % (settings.AUTH_KEY, client_ctime)
        m = hashlib.md5()
        m.update(bytes(temp, encoding='utf-8'))
        server_md5_key = m.hexdigest()
        if server_md5_key != client_md5_key:
            return HttpResponse("第二關   規則正確")

        # 之後基於memcache,目前先寫入內存刪除超過10s的值
        for k in list(api_key_record.keys()):
            v = api_key_record[k]
            if server_ctime > v:
                del api_key_record[k]

        # 第三關 判斷字典裏是否有以前訪問的key,若是有不經過,沒有加入字典
        if client_md5_time_key in api_key_record:
            return HttpResponse("第三關  已經有人來過了")
        else:
            api_key_record[client_md5_time_key] = client_ctime + 10
            obj = func(request)
            return obj

    return wrapper



@outer
def asset(request):


    if request.method == 'GET':
        ys = '重要的不能被閒雜人等看的數據'
        return HttpResponse(ys)

    elif request.method == 'POST':

        server_info = decrypt(request.body)
        server_info = json.loads(server_info)



        # # 新資產信息
        # server_info = json.loads(request.body.decode('utf-8'))
        hostname = server_info['basic']['data']['hostname']
        # 老資產信息
        server_obj = models.Server.objects.filter(hostname=hostname).first()
        if not server_obj:
            return HttpResponse('當前主機名在資產中未錄入')


        PluginManager(server_info,server_obj,hostname).exec_plugin()

        return HttpResponse("...")
服務端
相關文章
相關標籤/搜索