python調用阿里雲產品接口



由於有些人使用爬蟲訪問web,浪費服務器資源不說還會影響正經常使用戶,因此須要限制爬蟲ip,本能夠經過nginx或者防火牆限制,但須要從新編譯nginx,開啓防火牆怕又不知道會影響到哪些程序,因此

想用/etc/hosts.deny文件限制某些ip訪問web的,因而配置了http:124.90.*.* 發現根本沒用,但我以前使用該文件是能夠限制ip ssh的,爲何如今不能限制http了呢,

因而上網搜了下,發現以下內容:html

hosts.allow和hosts.deny規則的執行者爲TCP wrappers,對應守護進程爲tcpd;而tcpd執行依賴於程序使用了libwrap庫。python

也就是說:hosts.allow和hosts.deny支持且只支持使用了libwrap庫的服務。nginx

那如何查看程序是否使用libwarp?有倆種方法:web

方法1、查看hosts_access字段串正則表達式

查看應用程序是否支持 wrapper,能夠使用 strings 程序而後 grep 字符串 hosts_access:shell

方法2、使用ldd,json

ldd /usr/sbin/sshd | grep libwrap

查測發現使用xinetd的均可以、sshd能夠、vsftpd能夠,nginx httpd則不能夠。



繼續尋找方法時發現能夠調用負載均衡相關api結果,因而乎採用了此方法

'''
先安裝模塊,主要有兩個,一個是核心模塊,必須安裝 pip install aliyun-python-sdk-core
另外一個是你要修改哪一個產品,本次是更改負載均衡,因此安裝  pip install aliyun-python-sdk-slb  ,其餘產品參考 https://developer.aliyun.com/tools/sdk#/python
功能:每隔10分鐘分析各個服務器日誌,查看是否有異常IP調用短信接口(單個IP一分鐘內調用短信接口10次視爲異常IP),若有則把IP加入黑名單,2個小時後解封
'''

#!/usr/bin/python
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest

import smtplib
from email.mime.text import MIMEText
from email.header import Header
import paramiko,re,logging,os
import time,datetime,json

#過濾日誌,最近一分鐘調用短信接口超過10次的ip
def check_log(host,file):
    # 建立SSH對象
    ssh = paramiko.SSHClient()
    # 容許鏈接不在know_hosts文件中的主機
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 鏈接服務器

    try:
        ssh.connect(host, port=22, username='root', password='******$%^')
        # 執行命令, 過濾日誌,最近一分鐘調用短信接口超過10次的ip
        one_min_time = (datetime.datetime.now()-datetime.timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")  #一分鐘前時間
        shell = "sed -n '/%s/,$p' %s | grep regip | awk '{print $5}' | cut -d : -f2 | sort | uniq -c | sort -r | head -5 | awk '{if ($1 >9)print $2}'" % (one_min_time,file)
        #print(shell)
        stdin, stdout, stderr = ssh.exec_command(shell)
        # 獲取命令結果
        result = stdout.read().decode(encoding = "utf-8")

        deny_host_re = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')  #待拒絕ip的正則表達式
        deny_host_list = deny_host_re.findall(result)
        if deny_host_list:
            for deny_host in deny_host_list:
                now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
                shell = 'cat %s  | grep "%s" | wc -l' % (deny_file,deny_host)
                if int(os.popen(shell).read()) == 0:      #判斷當前IP是否已存在黑名單,避免重複操做
                    f = open(deny_file, "a", encoding='utf-8')
                    str_info = now_time + "  " + deny_host + "  " + "服務器地址:" + host + "\n"  #文本格式:封禁時間  待封IP  待封IP所在服務器地址
                    logging.info("str_info: %s" % str_info)
                    f.write(str_info)
                    f.close()
                    slb_add_host(deny_host,accessKeyId,accessSecret,acl_id)   #執行添加黑名單動做
                else:
                    logging.info("IP:%s 已存在黑名單中" % deny_host)

        else:
            logging.info("主機:%s 本次檢測無異常~" % host)

    except Exception as e:
        logging.error("主機:%s 鏈接失敗,緣由:%s" % ( host,e ) )
    # 關閉鏈接
    ssh.close()

#ip加入黑名單時發送郵件提醒
#這種發郵件是使用服務器上郵件服務器,須要在服務器/etc/mail.rc裏進行相應配置
# def send_mail(host,str):
#     try:
#         shell = '''
#                 echo -e "短信接口異常調用IP:%s\n狀態:%s"|mail -s "短信接口異常調用"   zhangpan@tan66.com
#                 ''' % (host,str)
#         os.system(shell)
#         logging.info("封禁通知郵件發送成功")
#     except smtplib.SMTPException as e:
#         logging.error("發送郵件失敗,緣由:%s" % e)

#這種更靈活方便,無需在服務器上配置
def send_mail(host,str):
    # 第三方 SMTP 服務
    mail_host = "smtp.exmail.qq.com"  # 設置服務器
    mail_user = "********@tan66.com"  # 用戶名
    mail_pass = "********"  # 密碼

    sender = '******@tan66.com'
    receivers = ['*******@tan66.com']  # 接收郵件,可設置爲你的QQ郵箱或者其餘郵箱

    message = MIMEText('短信接口異常調用IP:%s\n狀態:%s!'% (host,str), 'plain', 'utf-8')
    message['From'] = Header("短信接口", 'utf-8')
    message['To'] = Header("研發組", 'utf-8')

    subject = '短信接口異常調用'
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtpObj = smtplib.SMTP_SSL(mail_host,465) # 465 爲騰訊企業郵箱SMTP SSL端口號
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, message.as_string())
        logging.info("郵件發送成功")
    except smtplib.SMTPException as e:
        logging.error("發送郵件失敗,緣由:%s" % e)



#阿里雲slb訪問控制裏添加ip
def slb_add_host(host,accessKeyId,accessSecret,acl_id):
    client = AcsClient(accessKeyId, accessSecret, 'cn-hangzhou')  # 阿里雲帳號的key
    request = AddAccessControlListEntryRequest()  # 本次api調用的接口,接口有不少,可參考https://help.aliyun.com/document_detail/27570.html?spm=a2c4g.11186623.6.622.3b735682J2ldXT
    request.set_accept_format('json')

    try:
        host = host + "/32"
        request.set_AclEntrys([{"entry": host, "comment": "deny"}])  # 須要拒絕的ip
        request.set_AclId(acl_id)  # 負載均衡安全組id
        response = client.do_action_with_exception(request)  # 執行操做
        logging.info(str(response, encoding='utf-8'))  # 查看調用接口結果
        logging.info("slb添加異常IP:%s成功!" % host)
        send_mail(host, str="添加至黑名單")
    except BaseException as e:
        logging.error("添加黑名單失敗,緣由:%s" % e)



#slb刪除ip
def slb_del_host(host,accessKeyId,accessSecret,acl_id):
    host = str(host) + "/32"
    logging.info("正在解封IP:%s" % host)
    try:
        client = AcsClient(accessKeyId, accessSecret, 'cn-hangzhou')
        request = RemoveAccessControlListEntryRequest()
        request.set_accept_format('json')
        request.set_AclEntrys([{"entry": host, "comment": "deny"}])
        request.set_AclId(acl_id)

        client.do_action_with_exception(request)
        logging.info("slb刪除IP:%s成功" % host)  # 查看調用接口結果
        send_mail(host, str="移出黑名單")
    except BaseException as e:
        logging.error("移出黑名單失敗,緣由:%s" % e)

# 刪除slb裏黑名單ip
def del_text(deny_file):
    with open(deny_file, "r", encoding='utf-8') as f:
        data = f.read()
    logging.info("黑名單文件:%s" % deny_file)
    if data:
        d_list = []  # 解封ip清單
        with open(deny_file, "r", encoding='utf-8') as f:
            lines = f.readlines()
            # logging.info("lines:%s" % lines)
            for line in lines:
                if line.strip().lstrip():
                    time1 = line.split("  ")[0]  # 獲取文本中被封ip的時間
                    time2 = time.mktime(time.strptime(time1, '%Y-%m-%d %H:%M'))  # 將時間轉換給時間戳
                    time3 = int(now_time - time2)  # 解封倒計時
                    host = line.split("  ")[1].strip().lstrip()
                    if time3 > 7200:
                        slb_del_host(host, accessKeyId, accessSecret, acl_id)
                        d_list.append(host)
                    else:
                        logging.info("%-15s 預計%s 秒解封" % (host, 7200 - time3))
        for deny in d_list:
            shell = "sed -i '/%s/d' %s" % (deny, deny_file)
            os.system(shell)
            logging.info("文本中刪除黑名單ip:%s" % deny)

if __name__ == "__main__":

    accessKeyId = '*********'               #key  須要登陸阿里雲帳號獲取
    accessSecret = '********'
    acl_id = 'acl-**********'           #負載均衡安全組id
    log_file = '/var/log/msg-api.log'
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    deny_file = BASE_DIR + "/deny.hosts"
    if not os.path.exists(log_file):
        os.mknod(log_file)
    if not os.path.exists(deny_file):
        os.mknod(deny_file)
    now_time = time.time()
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s',
                        filename=log_file,
                        filemode='a+')

    logging.info("starting")
    host_dict = {"10.253.168.*":"api",
                 "10.253.208.*":"api",
                 "10.253.210.*":"cs",
                 "10.253.210.*":"cs",
                 }
    api_log_file = '/data1/application/api/tomcat/logs/catalina.out'
    cs_log_file = '/data1/application/cs/tomcat-1/logs/catalina.out'
    oss_log_file = '/data1/application/server/tomcat-2/logs/catalina.out'
    for host in host_dict:
        if host_dict[host] == "api":
            check_log(host,api_log_file)
        else:
            check_log(host,cs_log_file)
            check_log(host,oss_log_file)

    del_text(deny_file)

 



若是日誌使用了es收集,則更方便操做
'''
先安裝模塊,主要有兩個,一個是核心模塊,必須安裝 pip install aliyun-python-sdk-core
另外一個是你要修改哪一個產品,本次是更改負載均衡,因此安裝  pip install aliyun-python-sdk-slb  ,其餘產品參考 https://developer.aliyun.com/tools/sdk#/python
功能:每隔10分鐘分析各個服務器日誌,查看是否有異常IP調用短信接口(單個IP一分鐘內調用短信接口10次視爲異常IP),若有則把IP加入黑名單,2個小時後解封
'''

#!/usr/bin/python
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest
from elasticsearch import Elasticsearch
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import paramiko,re,logging,os
import time,datetime,json



class slb:
    accessKeyId  =  '********'                  # key  須要登陸阿里雲帳號獲取
    accessSecret =  '********'
    acl_id       =  'acl-****'         # 負載均衡安全組id
    log_file     =  '/var/log/msg-api.log'
    BASE_DIR     =  os.path.dirname(os.path.abspath(__file__))
    deny_file    =  BASE_DIR + "/deny.hosts"

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s',
                        filename=log_file,
                        filemode='a+')

    logging.info("starting")
    def __init__(self,index):
        self.index = index

    def start(self):
        if not os.path.exists(slb.log_file):
            os.mknod(slb.log_file)
        if not os.path.exists(slb.deny_file):
            os.mknod(slb.deny_file)


    def chack_log(self):
        self.start()
        es=Elasticsearch("10.253.**.**",port=9200)
        body = {
                  "query": {
                    "bool": {
                      "must": [
                        {
                          "match": {
                            "message": "regip"
                          }
                        },
                        {
                          "range": {
                            "@timestamp": {
                              "gte": "now-1m",
                              "lte": "now",
                              "format": "epoch_millis"
                            }
                          }
                        }
                      ],
                      "must_not": []
                    }
                  }
                }
        logging.info(f"開始檢查{index}日誌")
        result = es.search(index=self.index, body=body)  #返回字典形式
        result1 = json.dumps(result, indent=2, ensure_ascii=False) #返回格式化形式,方便查看,但不是字典
        if result["hits"]["total"] > 9:   #es查詢結果大於9  表示可能有用戶異常調用短信接口
            ip_list = []
            ip_dict = {}
            try:
                #獲取一分鐘內調用短信接口的全部IP
                for info in result["hits"]["hits"]:
                    log_info = info["_source"]["log_info"]
                    deny_host_re = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')  # ip的正則表達式
                    deny_host = re.search(deny_host_re,log_info).group()
                    ip_list.append(deny_host)
                # 統計各個IP調用結果次數
                for deny_ip in ip_list:
                    if deny_ip in ip_dict:
                        ip_dict[deny_ip] += 1
                    else:
                        ip_dict[deny_ip] = 1
                #判斷異常IP是否已存在,不存在則執行添加黑名單操做
                for deny_host,count in ip_dict.items():  #遍歷字典,deny_host爲調用短信接口的ip  count爲調用次數
                    if count > 2:
                        now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
                        shell = f'cat {slb.deny_file}  | grep "{deny_host}" | wc -l'
                        if int(os.popen(shell).read()) == 0:  # 判斷當前IP是否已存在黑名單,避免重複操做
                            f = open(slb.deny_file, "a", encoding='utf-8')
                            str_info = now_time + "  " + deny_host + "  " + f"web:{index}" + "\n"  # 文本格式:封禁時間  待封IP  web服務器
                            logging.info(f"{str_info}str_info: %s")
                            f.write(str_info)
                            f.close()
                            self.slb_add_host(deny_host)  # 執行添加黑名單動做
                            logging.info("執行添加黑名單操做")
                        else:
                            logging.info(f"IP:{deny_host} 已存在黑名單中")
            except BaseException as e:
                logging.error(f"檢查{index}日誌時發現錯誤,錯誤信息:{e}")
        else:
            logging.info(f"{index} 本次檢測無異常")



     #添加ip至黑名單
    def slb_add_host(self,host):
        client = AcsClient(self.accessKeyId, self.accessSecret, 'cn-hangzhou')  # 阿里雲帳號的key
        request = AddAccessControlListEntryRequest()  # 本次api調用的接口,接口有不少,可參考https://help.aliyun.com/document_detail/27570.html?spm=a2c4g.11186623.6.622.3b735682J2ldXT
        request.set_accept_format('json')
        try:
            host = host + "/32"
            request.set_AclEntrys([{"entry": host, "comment": "deny"}])  # 須要拒絕的ip
            request.set_AclId(self.acl_id)  # 負載均衡安全組id
            response = client.do_action_with_exception(request)  # 執行操做
            logging.info(str(response, encoding='utf-8'))  # 查看調用接口結果
            self.send_mail(host, str="添加至黑名單")
            logging.info(f"slb添加異常IP:{host}成功!")
        except BaseException as e:
            logging.error(f"添加黑名單失敗,緣由:{e}" )


    #slb黑名單裏移出ip
    def slb_del_host(self,host):
        host = str(host) + "/32"
        logging.info(f"正在解封IP:{host}" )
        try:
            client = AcsClient(self.accessKeyId, self.accessSecret, 'cn-hangzhou')
            request = RemoveAccessControlListEntryRequest()
            request.set_accept_format('json')
            request.set_AclEntrys([{"entry": host, "comment": "deny"}])
            request.set_AclId(self.acl_id)

            client.do_action_with_exception(request)
            self.send_mail(host, str="移出黑名單")
            logging.info(f"slb刪除IP:{host}成功" )  # 查看調用接口結果
        except BaseException as e:
            logging.error("移出黑名單失敗,緣由:%s" % e)

    #發送郵件
    def send_mail(self,host, str):
        # 第三方 SMTP 服務
        mail_host = "smtp.exmail.qq.com"  # 設置服務器
        mail_user = "****@tan66.com"  # 用戶名
        mail_pass = "*****"  # 密碼

        sender = '******@tan66.com'
        receivers = ['*****@tan66.com']  # 接收郵件,可設置爲你的QQ郵箱或者其餘郵箱

        message = MIMEText(f'短信接口異常調用IP:{host}\n狀態:{str}!', 'plain', 'utf-8')
        message['From'] = Header("短信接口", 'utf-8')
        message['To'] = Header("研發組", 'utf-8')

        subject = '短信接口異常調用'
        message['Subject'] = Header(subject, 'utf-8')

        try:
            smtpObj = smtplib.SMTP_SSL(mail_host, 465)  # 465 爲騰訊企業郵箱SMTP SSL端口號
            smtpObj.login(mail_user, mail_pass)
            smtpObj.sendmail(sender, receivers, message.as_string())
            logging.info("郵件發送成功")
        except smtplib.SMTPException as e:
            logging.error("發送郵件失敗,緣由:%s" % e)


    # 刪除slb文本里黑名單ip
    def del_text(self):
        with open(self.deny_file, "r", encoding='utf-8') as f:
            data = f.read()
        #logging.info("黑名單文件:%s" % self.deny_file)
        if data:
            d_list = []  # 解封ip清單
            with open(self.deny_file, "r", encoding='utf-8') as f:
                lines = f.readlines()
                # logging.info("lines:%s" % lines)
                for line in lines:
                    if line.strip().lstrip():
                        now_time = time.time()
                        time1 = line.split("  ")[0]  # 獲取文本中被封ip的時間
                        time2 = time.mktime(time.strptime(time1, '%Y-%m-%d %H:%M'))  # 將時間轉換給時間戳
                        time3 = int(now_time - time2)  # 解封倒計時
                        host = line.split("  ")[1].strip().lstrip()
                        if time3 > 7200:
                            self.slb_del_host(host)
                            d_list.append(host)
                        else:
                            logging.info("%-15s 預計%s 秒解封" % (host, 7200 - time3))
            for deny in d_list:
                shell = "sed -i '/%s/d' %s" % (deny, self.deny_file)
                os.system(shell)
                logging.info("文本中刪除黑名單ip:%s" % deny)


if __name__ == "__main__":
    index_list = ["**-tomcat","**-tomcat","*****"]
    for index in index_list:
        s = slb(index)
        s.chack_log()
    s.del_text()
相關文章
相關標籤/搜索