由於有些人使用爬蟲訪問web,浪費服務器資源不說還會影響正經常使用戶,因此須要限制爬蟲ip,本能夠經過nginx或者防火牆限制,但須要從新編譯nginx,開啓防火牆怕又不知道會影響到哪些程序,因此
想用/etc/hosts.deny文件限制某些ip訪問web的,因而配置了http:124.90.*.* 發現根本沒用,但我以前使用該文件是能夠限制ip ssh的,爲何如今不能限制http了呢,
因而上網搜了下,發現以下內容:html
hosts.allow和hosts.deny規則的執行者爲TCP wrappers,對應守護進程爲tcpd;而tcpd執行依賴於程序使用了libwrap庫。python
也就是說:hosts.allow和hosts.deny支持且只支持使用了libwrap庫的服務。nginx
那如何查看程序是否使用libwarp?有倆種方法:web
方法1、查看hosts_access字段串正則表達式
查看應用程序是否支持 wrapper,能夠使用 strings 程序而後 grep 字符串 hosts_access:shell
方法2、使用ldd,json
ldd /usr/sbin/sshd | grep libwrap
查測發現使用xinetd的均可以、sshd能夠、vsftpd能夠,nginx httpd則不能夠。
繼續尋找方法時發現能夠調用負載均衡相關api結果,因而乎採用了此方法
''' 先安裝模塊,主要有兩個,一個是核心模塊,必須安裝 pip install aliyun-python-sdk-core 另外一個是你要修改哪一個產品,本次是更改負載均衡,因此安裝 pip install aliyun-python-sdk-slb ,其餘產品參考 https://developer.aliyun.com/tools/sdk#/python 功能:每隔10分鐘分析各個服務器日誌,查看是否有異常IP調用短信接口(單個IP一分鐘內調用短信接口10次視爲異常IP),若有則把IP加入黑名單,2個小時後解封 ''' #!/usr/bin/python from aliyunsdkcore.client import AcsClient from aliyunsdkcore.acs_exception.exceptions import ClientException from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest import smtplib from email.mime.text import MIMEText from email.header import Header import paramiko,re,logging,os import time,datetime,json #過濾日誌,最近一分鐘調用短信接口超過10次的ip def check_log(host,file): # 建立SSH對象 ssh = paramiko.SSHClient() # 容許鏈接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 鏈接服務器 try: ssh.connect(host, port=22, username='root', password='******$%^') # 執行命令, 過濾日誌,最近一分鐘調用短信接口超過10次的ip one_min_time = (datetime.datetime.now()-datetime.timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M") #一分鐘前時間 shell = "sed -n '/%s/,$p' %s | grep regip | awk '{print $5}' | cut -d : -f2 | sort | uniq -c | sort -r | head -5 | awk '{if ($1 >9)print $2}'" % (one_min_time,file) #print(shell) stdin, stdout, stderr = ssh.exec_command(shell) # 獲取命令結果 result = stdout.read().decode(encoding = "utf-8") deny_host_re = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}') #待拒絕ip的正則表達式 deny_host_list = deny_host_re.findall(result) if deny_host_list: for deny_host in deny_host_list: now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") shell = 'cat %s | grep "%s" | wc -l' % (deny_file,deny_host) if int(os.popen(shell).read()) == 0: #判斷當前IP是否已存在黑名單,避免重複操做 f = open(deny_file, "a", encoding='utf-8') str_info = now_time + " " + deny_host + " " + "服務器地址:" + host + "\n" #文本格式:封禁時間 待封IP 待封IP所在服務器地址 logging.info("str_info: %s" % str_info) f.write(str_info) f.close() slb_add_host(deny_host,accessKeyId,accessSecret,acl_id) #執行添加黑名單動做 else: logging.info("IP:%s 已存在黑名單中" % deny_host) else: logging.info("主機:%s 本次檢測無異常~" % host) except Exception as e: logging.error("主機:%s 鏈接失敗,緣由:%s" % ( host,e ) ) # 關閉鏈接 ssh.close() #ip加入黑名單時發送郵件提醒 #這種發郵件是使用服務器上郵件服務器,須要在服務器/etc/mail.rc裏進行相應配置 # def send_mail(host,str): # try: # shell = ''' # echo -e "短信接口異常調用IP:%s\n狀態:%s"|mail -s "短信接口異常調用" zhangpan@tan66.com # ''' % (host,str) # os.system(shell) # logging.info("封禁通知郵件發送成功") # except smtplib.SMTPException as e: # logging.error("發送郵件失敗,緣由:%s" % e) #這種更靈活方便,無需在服務器上配置 def send_mail(host,str): # 第三方 SMTP 服務 mail_host = "smtp.exmail.qq.com" # 設置服務器 mail_user = "********@tan66.com" # 用戶名 mail_pass = "********" # 密碼 sender = '******@tan66.com' receivers = ['*******@tan66.com'] # 接收郵件,可設置爲你的QQ郵箱或者其餘郵箱 message = MIMEText('短信接口異常調用IP:%s\n狀態:%s!'% (host,str), 'plain', 'utf-8') message['From'] = Header("短信接口", 'utf-8') message['To'] = Header("研發組", 'utf-8') subject = '短信接口異常調用' message['Subject'] = Header(subject, 'utf-8') try: smtpObj = smtplib.SMTP_SSL(mail_host,465) # 465 爲騰訊企業郵箱SMTP SSL端口號 smtpObj.login(mail_user, mail_pass) smtpObj.sendmail(sender, receivers, message.as_string()) logging.info("郵件發送成功") except smtplib.SMTPException as e: logging.error("發送郵件失敗,緣由:%s" % e) #阿里雲slb訪問控制裏添加ip def slb_add_host(host,accessKeyId,accessSecret,acl_id): client = AcsClient(accessKeyId, accessSecret, 'cn-hangzhou') # 阿里雲帳號的key request = AddAccessControlListEntryRequest() # 本次api調用的接口,接口有不少,可參考https://help.aliyun.com/document_detail/27570.html?spm=a2c4g.11186623.6.622.3b735682J2ldXT request.set_accept_format('json') try: host = host + "/32" request.set_AclEntrys([{"entry": host, "comment": "deny"}]) # 須要拒絕的ip request.set_AclId(acl_id) # 負載均衡安全組id response = client.do_action_with_exception(request) # 執行操做 logging.info(str(response, encoding='utf-8')) # 查看調用接口結果 logging.info("slb添加異常IP:%s成功!" % host) send_mail(host, str="添加至黑名單") except BaseException as e: logging.error("添加黑名單失敗,緣由:%s" % e) #slb刪除ip def slb_del_host(host,accessKeyId,accessSecret,acl_id): host = str(host) + "/32" logging.info("正在解封IP:%s" % host) try: client = AcsClient(accessKeyId, accessSecret, 'cn-hangzhou') request = RemoveAccessControlListEntryRequest() request.set_accept_format('json') request.set_AclEntrys([{"entry": host, "comment": "deny"}]) request.set_AclId(acl_id) client.do_action_with_exception(request) logging.info("slb刪除IP:%s成功" % host) # 查看調用接口結果 send_mail(host, str="移出黑名單") except BaseException as e: logging.error("移出黑名單失敗,緣由:%s" % e) # 刪除slb裏黑名單ip def del_text(deny_file): with open(deny_file, "r", encoding='utf-8') as f: data = f.read() logging.info("黑名單文件:%s" % deny_file) if data: d_list = [] # 解封ip清單 with open(deny_file, "r", encoding='utf-8') as f: lines = f.readlines() # logging.info("lines:%s" % lines) for line in lines: if line.strip().lstrip(): time1 = line.split(" ")[0] # 獲取文本中被封ip的時間 time2 = time.mktime(time.strptime(time1, '%Y-%m-%d %H:%M')) # 將時間轉換給時間戳 time3 = int(now_time - time2) # 解封倒計時 host = line.split(" ")[1].strip().lstrip() if time3 > 7200: slb_del_host(host, accessKeyId, accessSecret, acl_id) d_list.append(host) else: logging.info("%-15s 預計%s 秒解封" % (host, 7200 - time3)) for deny in d_list: shell = "sed -i '/%s/d' %s" % (deny, deny_file) os.system(shell) logging.info("文本中刪除黑名單ip:%s" % deny) if __name__ == "__main__": accessKeyId = '*********' #key 須要登陸阿里雲帳號獲取 accessSecret = '********' acl_id = 'acl-**********' #負載均衡安全組id log_file = '/var/log/msg-api.log' BASE_DIR = os.path.dirname(os.path.abspath(__file__)) deny_file = BASE_DIR + "/deny.hosts" if not os.path.exists(log_file): os.mknod(log_file) if not os.path.exists(deny_file): os.mknod(deny_file) now_time = time.time() logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', filename=log_file, filemode='a+') logging.info("starting") host_dict = {"10.253.168.*":"api", "10.253.208.*":"api", "10.253.210.*":"cs", "10.253.210.*":"cs", } api_log_file = '/data1/application/api/tomcat/logs/catalina.out' cs_log_file = '/data1/application/cs/tomcat-1/logs/catalina.out' oss_log_file = '/data1/application/server/tomcat-2/logs/catalina.out' for host in host_dict: if host_dict[host] == "api": check_log(host,api_log_file) else: check_log(host,cs_log_file) check_log(host,oss_log_file) del_text(deny_file)
若是日誌使用了es收集,則更方便操做
''' 先安裝模塊,主要有兩個,一個是核心模塊,必須安裝 pip install aliyun-python-sdk-core 另外一個是你要修改哪一個產品,本次是更改負載均衡,因此安裝 pip install aliyun-python-sdk-slb ,其餘產品參考 https://developer.aliyun.com/tools/sdk#/python 功能:每隔10分鐘分析各個服務器日誌,查看是否有異常IP調用短信接口(單個IP一分鐘內調用短信接口10次視爲異常IP),若有則把IP加入黑名單,2個小時後解封 ''' #!/usr/bin/python from aliyunsdkcore.client import AcsClient from aliyunsdkcore.acs_exception.exceptions import ClientException from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest from elasticsearch import Elasticsearch import smtplib from email.mime.text import MIMEText from email.header import Header import paramiko,re,logging,os import time,datetime,json class slb: accessKeyId = '********' # key 須要登陸阿里雲帳號獲取 accessSecret = '********' acl_id = 'acl-****' # 負載均衡安全組id log_file = '/var/log/msg-api.log' BASE_DIR = os.path.dirname(os.path.abspath(__file__)) deny_file = BASE_DIR + "/deny.hosts" logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', filename=log_file, filemode='a+') logging.info("starting") def __init__(self,index): self.index = index def start(self): if not os.path.exists(slb.log_file): os.mknod(slb.log_file) if not os.path.exists(slb.deny_file): os.mknod(slb.deny_file) def chack_log(self): self.start() es=Elasticsearch("10.253.**.**",port=9200) body = { "query": { "bool": { "must": [ { "match": { "message": "regip" } }, { "range": { "@timestamp": { "gte": "now-1m", "lte": "now", "format": "epoch_millis" } } } ], "must_not": [] } } } logging.info(f"開始檢查{index}日誌") result = es.search(index=self.index, body=body) #返回字典形式 result1 = json.dumps(result, indent=2, ensure_ascii=False) #返回格式化形式,方便查看,但不是字典 if result["hits"]["total"] > 9: #es查詢結果大於9 表示可能有用戶異常調用短信接口 ip_list = [] ip_dict = {} try: #獲取一分鐘內調用短信接口的全部IP for info in result["hits"]["hits"]: log_info = info["_source"]["log_info"] deny_host_re = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}') # ip的正則表達式 deny_host = re.search(deny_host_re,log_info).group() ip_list.append(deny_host) # 統計各個IP調用結果次數 for deny_ip in ip_list: if deny_ip in ip_dict: ip_dict[deny_ip] += 1 else: ip_dict[deny_ip] = 1 #判斷異常IP是否已存在,不存在則執行添加黑名單操做 for deny_host,count in ip_dict.items(): #遍歷字典,deny_host爲調用短信接口的ip count爲調用次數 if count > 2: now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") shell = f'cat {slb.deny_file} | grep "{deny_host}" | wc -l' if int(os.popen(shell).read()) == 0: # 判斷當前IP是否已存在黑名單,避免重複操做 f = open(slb.deny_file, "a", encoding='utf-8') str_info = now_time + " " + deny_host + " " + f"web:{index}" + "\n" # 文本格式:封禁時間 待封IP web服務器 logging.info(f"{str_info}str_info: %s") f.write(str_info) f.close() self.slb_add_host(deny_host) # 執行添加黑名單動做 logging.info("執行添加黑名單操做") else: logging.info(f"IP:{deny_host} 已存在黑名單中") except BaseException as e: logging.error(f"檢查{index}日誌時發現錯誤,錯誤信息:{e}") else: logging.info(f"{index} 本次檢測無異常") #添加ip至黑名單 def slb_add_host(self,host): client = AcsClient(self.accessKeyId, self.accessSecret, 'cn-hangzhou') # 阿里雲帳號的key request = AddAccessControlListEntryRequest() # 本次api調用的接口,接口有不少,可參考https://help.aliyun.com/document_detail/27570.html?spm=a2c4g.11186623.6.622.3b735682J2ldXT request.set_accept_format('json') try: host = host + "/32" request.set_AclEntrys([{"entry": host, "comment": "deny"}]) # 須要拒絕的ip request.set_AclId(self.acl_id) # 負載均衡安全組id response = client.do_action_with_exception(request) # 執行操做 logging.info(str(response, encoding='utf-8')) # 查看調用接口結果 self.send_mail(host, str="添加至黑名單") logging.info(f"slb添加異常IP:{host}成功!") except BaseException as e: logging.error(f"添加黑名單失敗,緣由:{e}" ) #slb黑名單裏移出ip def slb_del_host(self,host): host = str(host) + "/32" logging.info(f"正在解封IP:{host}" ) try: client = AcsClient(self.accessKeyId, self.accessSecret, 'cn-hangzhou') request = RemoveAccessControlListEntryRequest() request.set_accept_format('json') request.set_AclEntrys([{"entry": host, "comment": "deny"}]) request.set_AclId(self.acl_id) client.do_action_with_exception(request) self.send_mail(host, str="移出黑名單") logging.info(f"slb刪除IP:{host}成功" ) # 查看調用接口結果 except BaseException as e: logging.error("移出黑名單失敗,緣由:%s" % e) #發送郵件 def send_mail(self,host, str): # 第三方 SMTP 服務 mail_host = "smtp.exmail.qq.com" # 設置服務器 mail_user = "****@tan66.com" # 用戶名 mail_pass = "*****" # 密碼 sender = '******@tan66.com' receivers = ['*****@tan66.com'] # 接收郵件,可設置爲你的QQ郵箱或者其餘郵箱 message = MIMEText(f'短信接口異常調用IP:{host}\n狀態:{str}!', 'plain', 'utf-8') message['From'] = Header("短信接口", 'utf-8') message['To'] = Header("研發組", 'utf-8') subject = '短信接口異常調用' message['Subject'] = Header(subject, 'utf-8') try: smtpObj = smtplib.SMTP_SSL(mail_host, 465) # 465 爲騰訊企業郵箱SMTP SSL端口號 smtpObj.login(mail_user, mail_pass) smtpObj.sendmail(sender, receivers, message.as_string()) logging.info("郵件發送成功") except smtplib.SMTPException as e: logging.error("發送郵件失敗,緣由:%s" % e) # 刪除slb文本里黑名單ip def del_text(self): with open(self.deny_file, "r", encoding='utf-8') as f: data = f.read() #logging.info("黑名單文件:%s" % self.deny_file) if data: d_list = [] # 解封ip清單 with open(self.deny_file, "r", encoding='utf-8') as f: lines = f.readlines() # logging.info("lines:%s" % lines) for line in lines: if line.strip().lstrip(): now_time = time.time() time1 = line.split(" ")[0] # 獲取文本中被封ip的時間 time2 = time.mktime(time.strptime(time1, '%Y-%m-%d %H:%M')) # 將時間轉換給時間戳 time3 = int(now_time - time2) # 解封倒計時 host = line.split(" ")[1].strip().lstrip() if time3 > 7200: self.slb_del_host(host) d_list.append(host) else: logging.info("%-15s 預計%s 秒解封" % (host, 7200 - time3)) for deny in d_list: shell = "sed -i '/%s/d' %s" % (deny, self.deny_file) os.system(shell) logging.info("文本中刪除黑名單ip:%s" % deny) if __name__ == "__main__": index_list = ["**-tomcat","**-tomcat","*****"] for index in index_list: s = slb(index) s.chack_log() s.del_text()