自動化運維工具

前言

本文主要記錄了一些比較實用的運維自動化小工具php

  

Oauth2.0認證(第三方受權登陸)

 

1.Oauth2.0協議是什麼?css

我去逛慕課網 若是不註冊帳號,我還能夠用QQ登陸,我在慕課網點擊QQ登陸,而後頁面彈出QQ的受權界面,我點擊受權慕課網就能夠拿到個人我的信息,頭像信息,完成快速登陸;省去了用戶註冊、輸入密碼的時間;html

 

 Oauth2.0驗證流程分析html5

 

 

def get_auth_url():
    weibo_authurl='https://api.weibo.com/oauth2/authorize'
    client_id=439057412
    redirect_uri='http://127.0.0.01:8001/comepele/weibo/'
    auth_url='%s?client_id=%s&redirect_uri=%s'%(weibo_authurl,client_id,redirect_uri)
    return auth_url


print(get_auth_url())
個人web應用受權url
def weibo_auth (request):  #1.用戶訪問進入該視圖https://api.weibo.com/oauth2/authorize?client_id=439057412&redirect_uri=http://127.0.0.01:8001/comepele/weibo/
    code=request.GET.get('code')
    if code:
        access_tocken_url='https://api.weibo.com/oauth2/access_token'
        data={"client_id":439057412,
              "client_secret":'d2b81e254e0dbf705f53d98ee5fe0fc9',
              'grant_type':'authorization_code',
              'code':code,
               'redirect_uri':'http://127.0.0.01:8001/comepele/weibo/'
               }

        #{'access_token': '2.009tC8pF0KtOiT2e83c56652wB8j3B', 'remind_in': '157679999', 'expires_in': 157679999, 'uid': '5340703278', 'isRealName': 'true'}
        response_info=requests.post(url=access_tocken_url,data=data)
        access_token=json.loads(response_info.text).get('access_token')
        uid=json.loads(response_info.text).get('uid')
        #https://api.weibo.com/2/users/show.json #微博獲取用戶信息接口
        get_user_info_api='https://api.weibo.com/2/users/show.json?access_token={0}&uid={1}'.format(access_token,uid)
        response_user_info=requests.get(url=get_user_info_api)
        print(response_user_info.text)




    return HttpResponse('OK')
python實現Oauth2.0協議
import requests
import json,datetime

def session_permission(request,obj,models,init_permission):
    if obj:
        username=obj.username
        request.session['username'] = username
        user_obj = models.UserInfo.objects.filter(username=username).first()
        imgsrc = str(user_obj.image)
        request.session['Avatar'] = imgsrc

        work_order_seach = Q(Q(initiator__icontains=username) | Q(agent__icontains=username))
        ret = models.Worker_order.objects.filter(work_order_seach).count()
        request.session['Wcount'] = ret  # 和當前用戶相關的工單數量

        work_order_seach1 = Q(Q(status=0) & Q(agent__icontains=username))
        ret1 = models.Worker_order.objects.filter(work_order_seach1).values_list("seach", "alarm_time", "initiator",
                                                                                 "title", "agent", "responsible_person",
                                                                                 "status", "fault_category",
                                                                                 )
        request.session['Wuntreated_count'] = ret1.count()  # 當前用戶待處理工單數量

        enddate_list = models.Worker_order.objects.filter(agent=username).filter(level__in=[1, 2]).values_list(
            'enddate')
        time_out_count = 0
        for date in enddate_list:
            enddate = datetime.datetime.strptime(date[0], '%Y年%m月%d日%H時%M分%S秒')
            if enddate < datetime.datetime.now():
                time_out_count += 1
        else:
            request.session['Work_list_timeout_Count'] = time_out_count  # 當前用戶超時工單數量

        department_obj = obj.department.all().first()
        # print(department_obj.title)
        ret2 = models.ZabbixMsg.objects.filter(department=department_obj.title).all()

        request.session['ZabbixMsg_Count'] = ret2.count()  # 當前用戶部門的zabbix MSG的總數量
        request.session['ZabbixMsg_Off-line_Count'] = ret2.filter(available=0).count()  # 當前用戶部門的zabbix MSG的離線數量

        if department_obj.title == 'OPS':
            ret2 = models.ZabbixMsg.objects.all().count()
            request.session['ZabbixMsg_Count'] = ret2

            ret3 = models.ZabbixMsg.objects.all().filter(available=0).count()
            request.session['ZabbixMsg_Off-line_Count'] = ret3

        request.session['user_info'] = {"id": models.UserInfo.objects.get(username=username).id,
                                        "username": username,
                                        "image": models.UserInfo.objects.get(username=username).image.url}
        department_list = [d.title for d in obj.department.all()]
        department = department_list[0]
        request.session['departments'] = department_list
        init_permission(obj.auth, department, request)
        return True
    else:
        return False





#用戶首先訪問,https://api.weibo.com/oauth2/authorize?client_id=3863976085&redirect_uri=http://127.0.0.1:8000/test/


class Oauth2_App_Info(object):  #生成APP信息
    sina_oauth_setings={
        'oauth_urls' :
                 {
                 'access_tocken_url': 'https://api.weibo.com/oauth2/access_token',
                    'user_info_api': 'https://api.weibo.com/2/users/show.json',
                },
        'oauth_apps':
                {
                '/login/': {'app_key':439057412,#439057412
                'callback_uri':'http://172.17.10.112:8001/login/',
                'secret_key':'d2b81e254e0dbf705f53d98ee5fe0fc9',
                             },
                '/oauth2/sina/':  {'app_key':334415239,
                'callback_uri':'http://172.17.10.112:8001/oauth2/sina/',
                'secret_key':'cfee03f62a9c5ccd4f10620fa06be3c6',
                             },

                '/test/': {'app_key':3863976085,
                                  'callback_uri': 'http://127.0.0.1:8000/test/',
                                  'secret_key': '213ebf44c86bc013aba5d3cf2ea47820',
                                  },

                           }

        }
    def __init__(self,request,oauth2_type='sina_oauth_setings'):
        self.request=request
        self.seting_dict=getattr(self,oauth2_type)
        self.app_info=self.seting_dict['oauth_apps'][self.request.path_info]
        self.oauth_urls_info=self.seting_dict['oauth_urls']
        self.app_key =self.app_info['app_key']
        self.callback_uri = self.app_info['callback_uri']
        self.secret_key = self.app_info['secret_key']
        self.code=self.request.GET.get('code')
        self.access_uri=self.oauth_urls_info['access_tocken_url']
        self.user_info_api = self.oauth_urls_info['user_info_api']



class Oauth2(object): #Oauth邏輯
    def __init__(self,Oauth2_App):
        self.outh2_app=Oauth2_App

    def get_tocken(self):
        data = {"client_id": self.outh2_app.app_key,
                "client_secret": self.outh2_app.secret_key,
                'grant_type': 'authorization_code',
                'code':self.outh2_app.code,
                'redirect_uri':self.outh2_app.callback_uri
                }
        response_info = requests.post(url=self.outh2_app.access_uri,data=data)
        data= json.loads(response_info.text)
# {'access_token': '2.009tC8pFvgqUNEf9bc678379M6CG3C', 'remind_in': '157679999', 'expires_in': 157679999, 'uid': '5340703278', 'isRealName': 'true'}
        return data


#{'id': 5340703278, 'idstr': '5340703278', 'class': 1,
    #  'screen_name': 'Martin--------', 'name': 'Martin--------', 'province': '11', 'city': '1',
    # 'location': '北京 東城區', 'description': '',
    # 'url': 'http://www.cnblogs.com/sss4/',
    #  'profile_image_url': 'http://tvax3.sinaimg.cn/crop.0.0.400.400.50/005Pr2Tsly8fwtkd9o55jj30b40b4qbg.jpg',
    #  'profile_url': 'u/5340703278', 'domain': '', 'weihao': '', 'gender': 'm', 'followers_count': 1, 'friends_count': 3, 'pagefriends_count': 0, 'statuses_count': 0, 'video_status_count': 0, 'favourites_count': 0, 'created_at': 'Mon Oct 20 20:31:53 +0800 2014', 'following': False, 'allow_all_act_msg': False, 'geo_enabled': True, 'verified': False, 'verified_type': -1, 'remark': '', 'insecurity': {'sexual_content': False}, 'ptype': 0, 'allow_all_comment': True, 'avatar_large': 'http://tvax3.sinaimg.cn/crop.0.0.400.400.180/005Pr2Tsly8fwtkd9o55jj30b40b4qbg.jpg', 'avatar_hd': 'http://tvax3.sinaimg.cn/crop.0.0.400.400.1024/005Pr2Tsly8fwtkd9o55jj30b40b4qbg.jpg', 'verified_reason': '', 'verified_trade': '', 'verified_reason_url': '', 'verified_source': '', 'verified_source_url': '', 'follow_me': False, 'like': False, 'like_me': False, 'online_status': 0, 'bi_followers_count': 0, 'lang': 'zh-cn', 'star': 0, 'mbtype': 0, 'mbrank': 0, 'block_word': 0,
    # 'block_app': 0, 'credit_score': 80, 'user_ability': 0, 'urank': 3, 'story_read_state': -1, 'vclub_member': 0}
    def get_user_info(self):
        '''
        user_info_api: 'https://api.weibo.com/2/users/show.json?access_token={0}&uid={1}'.format(access_token,uid)
        '''
        tocken_data=self.get_tocken()
        access_token=tocken_data.get('access_token')
        uid=tocken_data.get('uid')
        param='?access_token={0}&uid={1}'.format(access_token,uid)
        user_info_api=self.outh2_app.user_info_api+param
        response_user_info = requests.get(url=user_info_api)
        user_info = json.loads(response_user_info.text)
        return user_info

    def is_grant(self, models):
        sina_uid = self.get_tocken().get('uid')
        obj = models.UserInfo.objects.filter(sina_blog_name=sina_uid).first()
        if obj:
            return obj
        return False

    def grant(self, models):
        curent_sina_user = self.get_user_info()
        sina_uid = curent_sina_user['idstr']
        avatar_url = curent_sina_user['profile_image_url']
        curent_user = self.outh2_app.request.session.get('username')
        obj = models.UserInfo.objects.filter(username=curent_user).first()
        obj.sina_blog_name = sina_uid
        obj.image=avatar_url
        obj.save()
DjangoOauth2

 

1.用戶GET 請求訪問慕課網,點擊了QQ登陸,慕課網node

2.騰訊QQ認證服務器response用戶1個頁面,詢問用戶是否受權?python

3.假設用戶給予受權,認證服務器將用戶導向 客戶端事先指定的"重定向URI"(redirection URI),同時附上一個受權碼。mysql

4.客戶端收到受權碼,附上早先的"重定向URI",向認證服務器申請令牌。這一步是在客戶端的後臺的服務器上完成的,對用戶不可見。jquery

5.認證服務器覈對了受權碼和重定向URI,確認無誤後,向客戶端發送訪問令牌(access token)和更新令牌(refresh token)。ios

 

 

 

自動化運維工具Fabric

 

Fabric是什麼?nginx

Fabric是基於SSH協議和paramiko實現的快速自動化部署的運維工具python第三方模塊

咱們在大型企業裏面須要批量部署一些 web服務、mysql服務....若是是服務器數量少能夠1臺1臺部署,若是成千上萬服務器就須要 更加快捷、高效的部署工具了!

 

Fabric的功能?

上傳、下載遠程服務器文件

執行遠程命令

用戶提示

錯誤處理

pip install fabric3  #python3安裝fabric

 

使用Fabirc

使用fabric分爲2個步驟:

1.建立fabfile.py文件是 fabric在執行時默認讀取的文件,因此咱們要在這個文件中編寫執行的任務;

from fabric.api import *

hosts1 = 'root@172.17.10.112'
# hosts2 = 'root@172.16.22.1'

env.hosts=[hosts1]
env.password='xxxxxx1234' #設置密碼!

def hello():  #任務1
    run('echo hello world')

def check(): #任務2
    run('ls /Users/')

def remote(): #任務3
    run('ping www.baidu.com')

 

 2.執行fabfile文件中的任務內容

D:\版本\cmdb_rbac_arya>fab -l
Available commands: check hello remote D:\版本\cmdb_rbac_arya
>fab remote [root@172.17.10.112] Executing task 'remote' [root@172.17.10.112] run: ping www.baidu.com [root@172.17.10.112] out: PING www.a.shifen.com (220.181.112.244) 56(84) bytes of data. [root@172.17.10.112] out: 64 bytes from 220.181.112.244: icmp_seq=1 ttl=52 time=5.41 ms

 

 

Fabric使用參數

指定執行的主機

D:\版本\cmdb_rbac_arya>fab hello -H root@172.17.10.112:22
[root@172.17.10.112:22] Executing task 'hello'
[root@172.17.10.112:22] run: echo hello world
[root@172.17.10.112:22] out: hello world
[root@172.17.10.112:22] out:


Done.
Disconnecting from root@172.17.10.112... done.

 

指定文件

D:\版本\cmdb_rbac_arya>fab -f zhanggen.py hello -H root@172.17.10.112:22
[root@172.17.10.112:22] Executing task 'hello'
[root@172.17.10.112:22] run: echo hello world
[root@172.17.10.112:22] out: hello world
[root@172.17.10.112:22] out:


Done.
Disconnecting from root@172.17.10.112... done.

 

fab -p並行執行

[root@cmdb fabric]# fab -l
Available commands:

    hello
[root@cmdb fabric]# fab -p hello
Usage: fab [options] <command>[:arg1,arg2=val2,host=foo,hosts='h1;h2',...] ...

Options:
  -h, --help            show this help message and exit
  -d NAME, --display=NAME
                        print detailed info about command NAME
  -F FORMAT, --list-format=FORMAT
                        formats --list, choices: short, normal, nested
  -I, --initial-password-prompt
                        Force password prompt up-front
  --initial-sudo-password-prompt
                        Force sudo password prompt up-front
  -l, --list            print list of possible commands and exit
  --set=KEY=VALUE,...   comma separated KEY=VALUE pairs to set Fab env vars
  --shortlist           alias for -F short --list
  -V, --version         show program's version number and exit
  -a, --no_agent        don't use the running SSH agent
  -A, --forward-agent   forward local agent to remote end
  --abort-on-prompts    abort instead of prompting (for password, host, etc)
  -c PATH, --config=PATH
                        specify location of config file to use
  --colorize-errors     Color error output
  -D, --disable-known-hosts
                        do not load user known_hosts file
  -e, --eagerly-disconnect
                        disconnect from hosts as soon as possible
  -f PATH, --fabfile=PATH
                        python module file to import, e.g. '../other.py'
  -g HOST, --gateway=HOST
                        gateway host to connect through
  --gss-auth            Use GSS-API authentication
  --gss-deleg           Delegate GSS-API client credentials or not
  --gss-kex             Perform GSS-API Key Exchange and user authentication
  --hide=LEVELS         comma-separated list of output levels to hide
  -H HOSTS, --hosts=HOSTS
                        comma-separated list of hosts to operate on
  -i PATH               path to SSH private key file. May be repeated.
  -k, --no-keys         don't load private key files from ~/.ssh/
  --keepalive=N         enables a keepalive every N seconds
  --linewise            print line-by-line instead of byte-by-byte
  -n M, --connection-attempts=M
                        make M attempts to connect before giving up
  --no-pty              do not use pseudo-terminal in run/sudo
  -p PASSWORD, --password=PASSWORD
                        password for use with authentication and/or sudo
  -P, --parallel        default to parallel execution method
  --port=PORT           SSH connection port
  -r, --reject-unknown-hosts
                        reject unknown hosts
  --sudo-password=SUDO_PASSWORD
                        password for use with sudo only
  --system-known-hosts=SYSTEM_KNOWN_HOSTS
                        load system known_hosts file before reading user
                        known_hosts
  -R ROLES, --roles=ROLES
                        comma-separated list of roles to operate on
  -s SHELL, --shell=SHELL
                        specify a new shell, defaults to '/bin/bash -l -c'
  --show=LEVELS         comma-separated list of output levels to show
  --skip-bad-hosts      skip over hosts that can't be reached
  --skip-unknown-tasks  skip over unknown tasks
  --ssh-config-path=PATH
                        Path to SSH config file
  -t N, --timeout=N     set connection timeout to N seconds
  -T N, --command-timeout=N
                        set remote command timeout to N seconds
  -u USER, --user=USER  username to use when connecting to remote hosts
  -w, --warn-only       warn, instead of abort, when commands fail
  -x HOSTS, --exclude-hosts=HOSTS
                        comma-separated list of hosts to exclude
  -z INT, --pool-size=INT
                        number of concurrent processes to use in parallel mode
[root@cmdb fabric]# fab -P hello
[root@172.17.10.112:22] Executing task 'hello'
[root@172.17.10.112:22] run: ls
[root@172.17.10.112:22] out: anaconda-ks.cfg         ecdsa-0.11          nohup.out           root                模板
[root@172.17.10.112:22] out: ansible-2.6.2         ecdsa-0.11.tar.gz      paramiko-1.15.1       setuptools-7.0        視頻
[root@172.17.10.112:22] out: ansible-2.6.2.tar.gz     install.log          paramiko-1.15.1.tar.gz   setuptools-7.0.tar.gz    圖片
[root@172.17.10.112:22] out: ansible_api         install.log.syslog      pycrypto-2.6.1       simplejson-3.6.5        文檔
[root@172.17.10.112:22] out: ansible-cmdb-1.17     Jinja2-2.7.3          pycrypto-2.6.1.tar.gz    simplejson-3.6.5.tar.gz  下載
[root@172.17.10.112:22] out: ansible-cmdb-1.17.tar     Jinja2-2.7.3.tar.gz      pycrypto-2.6.1.tar.gz.1  untitled folder        音樂
[root@172.17.10.112:22] out: cmdb_rbac_arya         MarkupSafe-0.9.3      Python-3.6.1           v1.7.2.tar.gz        桌面
[root@172.17.10.112:22] out: cmdb_rbac_arya.4_28.bak  MarkupSafe-0.9.3.tar.gz  Python-3.6.1.tgz       web.sql
[root@172.17.10.112:22] out: cmdb_rbac_arya.51.bak     node-v0.10.30          PyYAML-3.11           zhanggen.txt
[root@172.17.10.112:22] out: cmdb_rbac_arya.bak     node-v0.10.30.tar.gz      PyYAML-3.11.tar.gz       公共的
[root@172.17.10.112:22] out: 


Done.
[root@cmdb fabric]# 

 

 

 

Fabric經常使用AIP

def hello():  #任務1
    lcd(path='/')                #切換本地目錄
    cd('/')                      #切換遠程目錄
    local(command='fuck')        #執行本地命令
    run('echo hello world')      #執行遠程命令
    sudo('rm -rf /')             #執行遠程sudo命令

 

Fabric錯誤處理 try 

判斷

def hello():  #任務1
    with cd('/sb'):#當cd('/sb')執行成功以後!
        run('ls')   #執行('ls')

捕捉異常

def hello():  #任務1
    with settings(warn_only=True): #忽略  cd('/SB')
        cd('/SB')
    run('ls')#執行run('ls')

 

Fabric的裝飾器

from fabric.api import *

@task
def nginx():
    run('hostname')
nginx.py
from nginx import nginx
#@hosts(hosts1) #指定被裝飾的任務函數在哪臺服務器上運行
#@parallel          #強制被裝飾的函數並行執行(只能在Linux執行)
#@serial             #強制串行執行
#@roles('db')      #指定執行的角色!
@task()              #導入其餘模塊中的任務(定義新任務)
def hello():  #任務
    run('ls')#執行run('ls')
from fabric.api import *  #加載fabric模塊
hosts1 = 'root@172.17.10.112:22'
hosts2 = 'root@192.168.1.18:22'

env.hosts=[hosts1,hosts2]
#env.password='xxxxxx1234' #全局設置密碼!
env.passwords={
    hosts1:'xxxxxx1234',
    hosts2:'xxxxxx123'}


## @hosts(hosts1) #指定被裝飾的任務函數在哪臺服務器上運行
#@parallel      #強制被裝飾的函數並行執行(只能在Linux執行)
#@serial         #強制串行執行
#@roles('db')     #指定執行的角色!

          #導入其餘模塊中的任務(定義新任務)
def hello():  #任務
    run('ls')#執行run('ls')
設置不一樣密碼登陸

 

 

 

 

覆蓋重寫Django的auth_user表

1.用戶表

建立Django項目能夠把APP所有放在1個文件夾裏,這樣的目錄結構會更加清晰;

 

 

覆蓋擴展Django的auth_user表

AUTH_USER_MODEL='users.UserProfile' #須要在setting中重載AUTH_USER_MODE
setings.py
from django.contrib.auth.models import AbstractUser #自定製擴展Django自帶表格

class UserProfile(AbstractUser):
    pass
models.py

 

設置media上傳

 

image=models.FileField(verbose_name='頭像', upload_to='upload/avatar/')
models.py
from django.views.static import serve
urlpatterns = [
    url(r'^media/(?P<path>.*)$',serve,{'document_root': settings.MEDIA_ROOT}),
]
urls.py
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cmdb_rbac_arya.settings")
django.setup()  # 在Django視圖以外,調用Django功能設置環境變量!

from cmdb import models
import xlwt

# 建立一個文件對象
wb = xlwt.Workbook(encoding='utf8')
# 建立一個sheet對象
sheet = wb.add_sheet('order-sheet',cell_overwrite_ok=True)

# 設置文件頭的樣式,這個不是必須的能夠根據本身的需求進行更改
style_heading = xlwt.easyxf("""
        font:
            name Arial,
            colour_index white,
            bold on,
            height 0xA0;
        align:
            wrap off,
            vert center,
            horiz center;
        pattern:
            pattern solid,
            fore-colour 0x19;
        borders:
            left THIN,
            right THIN,
            top THIN,
            bottom THIN;
        """
    )
style_body = xlwt.easyxf("""
    font:
        name Arial,
        bold off,
        height 0XA0;
    align:
        wrap on,
        vert center,
        horiz left;
    borders:
        left THIN,
        right THIN,
        top THIN,
        bottom THIN;
    """
)
style_green = xlwt.easyxf(" pattern: pattern solid,fore-colour 0x11;")
style_red = xlwt.easyxf(" pattern: pattern solid,fore-colour 0x0A;")
fmts = [
    'M/D/YY',
    'D-MMM-YY',
    'D-MMM',
    'MMM-YY',
    'h:mm AM/PM',
    'h:mm:ss AM/PM',
    'h:mm',
    'h:mm:ss',
    'M/D/YY h:mm',
    'mm:ss',
    '[h]:mm:ss',
    'mm:ss.0',
]
style_body.num_format_str = fmts[0]

# 寫入文件標題
sheet.write(0,0,'工單標題',style_heading)
sheet.write(0,1,'工單發送方',style_heading)
sheet.write(0,2,'工單處理方',style_heading)
sheet.write(0,3,'相關責任人',style_heading)
sheet.write(0,4,'辦理日期',style_heading)
sheet.write(0,5,'結束日期',style_heading)


# 寫入數據
data_row = 1
# UserTable.objects.all()這個是查詢條件,能夠根據本身的實際需求作調整.
for i in models.Worker_order.objects.all() :
    # 格式化datetime
    sheet.write(data_row,0,i.title,style_body)
    sheet.write(data_row,1,i.initiator,style_body)
    sheet.write(data_row,2,i.agent,style_body)
    sheet.write(data_row,3,i.responsible_person,style_body)
    sheet.write(data_row,4,i.startdate,style_body)
    sheet.write(data_row,5,i.enddate,style_body)

# 寫出到IO

wb.save('張根.xls')
# 從新定位到開始
使用xlwt導出數據庫數據

 

 

 

psutil模塊

psutil模塊是什麼?

psutil能夠跨平臺獲取系統運行的進程和系統利用率(CP、內存、硬盤、網絡信息的python擴展庫),跨平臺這就意味着Windows系統主機的信息也能夠被採集到!

一專多能:實現了ps、top、lsof、netstat、ifconfig、who、df、kill、free、nice、ionice、iostat、iotop、uptime、pidof、tty、taskset、pmap等命令工具的功能;

pip install psutil

 

1. 使用用psutil獲取系統相關信息

import psutil
#-----------------------獲取CPU相關-------------------------------------
print(psutil.cpu_times())               #查看CPU的完整信息,獲取單項信息 .user .system .idel.....
print(psutil.cpu_count())               #查看CPU的邏輯個數
print(psutil.cpu_count(logical=False)) #查看CPU的物理個數
print(psutil.cpu_percent(interval=0))   #查看單個CPU的使用率
print(psutil.cpu_percent(percpu=True))  #查看每一個CPU的使用率

#------------------------獲取內存相關的信息---------------------------------
print(psutil.virtual_memory())          #查看內存的完整信息,獲取單項信息 .total .available .percent .used .free
print(psutil.virtual_memory().percent)  #查看內存使用百分比
print(psutil.swap_memory())             #查看swap分區的完整信息sswap(total=16075837440, used=4880826368, free=11195011072, percent=30.4, sin=0, sout=0)

#-----------------------獲取硬盤分區相關信息--------------------------------------
print(psutil.disk_usage('D:\\'))         #獲取硬盤的使用狀況
print(psutil.disk_io_counters(perdisk=True)) #查看硬盤的讀寫情況

#----------------------獲取網絡相關信息-----------------------
print(psutil.net_io_counters())           #獲取網絡的整體信息
print(psutil.net_io_counters(pernic=True))#獲取每一個網卡的IO信息
print(psutil.net_if_addrs())              #獲取網卡相關地址信息
print(psutil.net_connections())            #獲取全部接口的鏈接信息

#-------------------獲取系統其餘相關信息-------------------------
print(psutil.users())                      #獲取當前登陸信息
print(datetime.datetime.fromtimestamp(psutil.boot_time()).strftime('%Y-%m-%d %H:%M:%S')) #獲取開機時間

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-

try:
    import psutil
except ImportError:
    print('錯誤: psutil模塊沒有發現!')
    exit()
import platform
import datetime
import time

def get_osinfo():
    '''獲取操做系統相關信息'''
    osType = platform.system()
    osVersion = platform.version()
    osArchitecture = platform.architecture()
    hostName = platform.node()
    return osType, osVersion, osArchitecture,hostName

def get_processor():
    '''獲取物理CPU個數'''
    return psutil.cpu_count(logical=False)


def get_cores():
    '''獲取邏輯CPU個數'''
    return psutil.cpu_count()


def get_boot_time():
    '''獲取開機時間'''
    return datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S")


def get_disk_root():
    '''獲取根分區磁盤空間'''
    return psutil.disk_usage('D:')


def get_mem_total():
    '''獲取內存容量'''
    return psutil.virtual_memory()[0] / 1024 / 1024


def get_mem_free():
    '''獲取可用內存大小'''
    return psutil.virtual_memory()[4] / 1024 / 1024


def get_key():
    '''函數獲取各網卡發送、接收字節數'''

    key_info = psutil.net_io_counters(pernic=True).keys()  # 獲取網卡名稱

    recv = {}
    sent = {}

    for key in key_info:
        recv.setdefault(key, psutil.net_io_counters(
            pernic=True).get(key).bytes_recv)  # 各網卡接收的字節數
        sent.setdefault(key, psutil.net_io_counters(
            pernic=True).get(key).bytes_sent)  # 各網卡發送的字節數

    return key_info, recv, sent


def get_rate(func):
    '''函數計算每1秒網卡速率'''
    key_info, old_recv, old_sent = func()  # 上1秒收集的數據

    time.sleep(1)

    key_info, now_recv, now_sent = func()  # 當前所收集的數據

    net_in = {}
    net_out = {}

    for key in key_info:
        net_in.setdefault(key, (now_recv.get(key) - old_recv.get(key)) / 1024)  # 每秒接收速率
        net_out.setdefault(key, (now_sent.get(key) - old_sent.get(key)) / 1024)  # 每秒發送速率

    return key_info, net_in, net_out


def main():
    '''程序入口函數'''
    ostype, osversion, osarchitecture,hostname = get_osinfo()

    print('操做系統類型:',ostype)
    print('操做系統版本:', osversion)
    print('操做系統位數:', osarchitecture[0])
    print('主機名:', hostname)
    print('物理CPU個數:', get_processor())
    print('邏輯CPU個數:', get_cores())
    print('開機時間:', get_boot_time())
    print('根分區可用空間(單位爲MB):', get_disk_root()[2] / 1024 / 1024)
    print('內存總量(單位爲MB):', get_mem_total())
    print('可用內存大小(單位爲MB):', get_mem_free())

    i = 0
    while i < 3:                                                            #去獲取每秒每塊網卡的 速率
        key_info, net_in, net_out = get_rate(get_key)
        for key in key_info:
            print('%s\nInput:\t %-5sKB/s\nOutput:\t %-5sKB/s\n' %
                  (key, net_in.get(key), net_out.get(key)))
        i += 1


if __name__ == '__main__':
    main()
獲取本機的基本信息和 3秒的網卡速率

 

2.使用psutil管理系統進程

A.process類的做用

依據進程ID來獲取單個進程的名稱、執行路徑、執行狀態、系統資源利用率等信息;

查看系統中全部運行的進程

import psutil
pid_list=psutil.pids() #獲取當前系統進程ID列表
for i in pid_list:
    p=psutil.Process(i)
    print('我是進程:{0},個人進程ID:{1} 俺爹的進程ID:{2}'.format(p.name(),str(p.pid),str(p.ppid()  )))#獲取全部進程的名稱

pidList = psutil.pids()
for pid in pidList: #查看系統中進行的相關信息
proc = psutil.Process(pid)
pidDictionary=proc.as_dict(attrs=['pid', 'name', 'username', 'exe', 'create_time', 'status', 'num_threads'])#獲取全部進程的相關信息以字典顯示
tempText = ''
for keys in pidDictionary.keys():
tempText += keys + ':' + str(pidDictionary[keys]) + '\n'
print(tempText)
print('*********************\n')
 

查看單個進程的詳細信息

p = psutil.Process(pid=7160)
print(p.exe())           #獲取進程的工做目錄
print(p.cwd())           #獲取進程的工做目錄的絕對路徑
print(p.cpu_times())     #c查看進程CPU時間信息
print(p.cpu_affinity())  #查看CPU佔用狀況
print(p.username())            #開啓該進程的用戶
print(p.num_threads())         #查看進程包含的線程數量
print(p.memory_percent())      #查看進程的內存使用率
print(p.memory_info())         #查看進程rss、vms信息
print(p.connections())         #獲取進程的namedutples列表、fs、family、laddr等信息
print(p.io_counters())         #查看進程的IO
print(p.status())              #查看進程狀態
print(datetime.datetime.fromtimestamp(p.create_time()).strftime('%Y-%m-%d %H:%M:%S') )#查看進程的建立時間
p.suspend()                    #掛起進程
p.resume()                     #恢復
p.kill()                       #殺死進程 

 

B.使用Popen方法啓動進程;

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psutil
from subprocess import PIPE
p=psutil.Popen(['python','nginx.py'])  #開啓1個進程
p.communicate()                        #打印開啓進程的輸出

 

 

ipy模塊

ipy模塊主要對IP地址進制處理和轉換;

from IPy import IP
private_ipaddr=IP('192.168.0.0/24')
public_ipaddr=IP('123.150.204.166')
print(public_ipaddr.make_net('255.255.0.0')) #查詢IP所在的網段
print(public_ipaddr.reverseNames())       #查看該IP地址的反向解析
print(private_ipaddr.version())          #查看IP地址的版本
print(public_ipaddr.iptype())            #查看是 私網地址?公網地址?
########################ip地址格式轉換#####################

print(private_ipaddr.int()) #轉換成整型格式 3232235520 print(private_ipaddr.strBin()) #把IP地址轉換成二進制格式 11000000101010000000000000000000 print(private_ipaddr.strHex()) #把IP地址轉換成16進制格式 0xc0a80000

 

        kafka消息中間件

kafka簡介

kafka是一款高性能:以O(1)系統開銷下把消息寫到磁盤)、高吞吐、分佈式(方便架構擴展)的發佈訂閱消息系統,數據流(區別於redis隊列《---------》能夠重複消費 消費以後數據不銷燬)處理平臺;

 

 

 

安裝kafka

wget https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
tar -zxf kafka_2.11-0.10.2.0.tgz 
cd kafka_2.11-0.10.2.0/
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &           #後臺啓動zookeeper
nohup bin/kafka-server-start.sh config/server.properties &          #後臺啓動kafka服
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1              #前臺啓動生產者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning #前臺啓動消費者

 

kafka的配置文件

broker.id=0                    #在集羣中的惟一標識
num.network.threads=3          #處理忘了請求的最大線程(CPU個數)
num.io.threads=8          #處理磁盤IO的線程數
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs       #日誌路徑
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168         #保存消息的時間
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181     #集羣地址多個,分割
zookeeper.connection.timeout.ms=6000
advertised.host.name=192.168.226.137 #切記不要寫0.0.0.0
advertised.listeners=PLAINTEXT://192.168.226.137:9092
 

 

kafka的python API (pykafka)

pip install pykafka

from pykafka import KafkaClient
import json

client = KafkaClient(hosts='192.168.226.137:9092')
print(client.topics)
topic = client.topics[b'test']
producer = topic.get_producer(sync=True)        #建立1個生產者
producer.start()
data=json.dumps({"topic0":"Zhang Gen is very handsome."}).encode('utf-8')

with topic.get_sync_producer() as producer:  #在某個topic中建立1個生產者
    producer.produce(message=data)            #生產者 生產消息
    print('插入成功')
pykafka生產者
from pykafka import KafkaClient
import json

client = KafkaClient(hosts="192.168.226.137:9092")
print(client.topics)
topic = client.topics[b'test']    #topic名稱
consumer = topic.get_simple_consumer()
for record in consumer:
    if record is not None:
        valuestr = record.value.decode()   #從bytes轉爲string類型
        print(valuestr)
pykafka消費者

 

 

安裝logstash

wget https://artifacts.elastic.co/downloads/logstash/logstash-5.5.2.zip
unzip logstash-5.5.2.zip
cd logstash-5.5.2/
bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebu}'}

 json格式輸出

{
    "@timestamp" => 2018-11-15T03:16:14.366Z,
      "@version" => "1",
          "host" => "localhost.localdomain",
       "message" => ""
}

 

 

Django項目結合極驗滑動驗證

 

 

簡介

下載極驗pythonSDK

pip install geetest #安裝jeetest包

 

from django.conf.urls import url
from django.contrib import admin
from app01 import views
urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^login/', views.login,name='login'),
    url(r'^index/', views.index,name='index'),
    url(r'^pc-geetest/register', views.get_geetest, name='get_geetest'),#極驗獲取驗證碼url
]
url.py
from django.shortcuts import render,HttpResponse,redirect
import json
from app01 import models
from geetest import GeetestLib

pc_geetest_id = "254cb3cd9d98d2635352609157931960"
pc_geetest_key = "2d3dc172309e33998c5eba085be1cf48"

def login(request):
    if request.method=='GET':
        return render(request,'login.html')
    else:
        response_info={'status':200,'msg':'success'}
        gt = GeetestLib(pc_geetest_id, pc_geetest_key)
        challenge = request.POST.get(gt.FN_CHALLENGE, '')
        validate = request.POST.get(gt.FN_VALIDATE, '')
        seccode = request.POST.get(gt.FN_SECCODE, '')
        status = request.session[gt.GT_STATUS_SESSION_KEY]
        user_id = request.session["user_id"]
        username = request.POST.get("username")
        password = request.POST.get("password")
        if status:
            result = gt.success_validate(challenge, validate, seccode, user_id)
        else:
            result = gt.failback_validate(challenge, validate, seccode)
        if result:
            user_obj=models.User_info.objects.filter(username=username,password=password).first()
            if not user_obj:
                response_info = {'status': 404, 'msg': 'not find it'}
        print(response_info)
        return HttpResponse(json.dumps(response_info,ensure_ascii=False))


def index(request):
    return render(request,'index.html')


def get_geetest(request): #極驗驗證
    user_id = 'test'
    gt = GeetestLib(pc_geetest_id, pc_geetest_key)
    status = gt.pre_process(user_id)
    request.session[gt.GT_STATUS_SESSION_KEY] = status
    request.session["user_id"] = user_id
    response_str = gt.get_response_str()
    return HttpResponse(response_str)
views.py
{% load staticfiles %}
<!DOCTYPE html>
<html lang="en" class="no-js">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>綾致-運維管理平臺</title>
    <link rel="icon" href="{% static 'img/demo-1-bg.jpg' %}">

    <link rel="stylesheet" type="text/css" href="{% static 'bootstrap/css/normalize.css' %}">
    <link rel="stylesheet" type="text/css" href="{% static 'bootstrap/css/demo.css' %}">
    <!--必要樣式-->
    <link rel="stylesheet" type="text/css" href="{% static 'bootstrap/css/component.css' %}">
    <link rel="stylesheet" href="/static/bootstrap/css/style.css" media="screen" type="text/css"/>
    <script src="http://static.geetest.com/static/tools/gt.js"></script>
    <!--[if IE]>
{#<script src="js/html5.js"></script>#}
{#<script src="{% static 'bootstrap/js/html5.js' %}"></script>#}

<![endif]-->
    <style>
        .error_div {
            width: 300px;
            height: 20px;
            color: white;
            border-color: #ebccd1;
            margin-left: 10px;

        }

        .error_div1 {
            margin-top: 45px;
            width: 300px;
            height: 20px;
            color: white;
            border-color: #ebccd1;
            margin-left: 10px;

        }

        .error_div2 {
            width: 300px;
            height: 20px;
            color: red;
            border-color: #c9302c;
            margin-left: 10px;
            text-align: center;

        }
    </style>
</head>
<body>
<div class="container demo-1">
    <div class="content">
        <div id="large-header" class="large-header" style="height: 1094px;">
            <canvas id="demo-canvas" width="1360" height="1094"
                    style="background-image:url('{{ request.session.url_photo }}')"></canvas>
            <div class="logo_box">

                <p style="height: 30px"><a href="#"
                                           style="font-size: 40px;vertical-align: 80px;color: yellow;font-family:Microsoft YaHei"><strong>IT運維管理平臺</strong></a>
                </p>
                <p style="height: 30px"><a href="#"
                                           style="font-size: 20px;vertical-align: 80px;color: white;font-family: 'Microsoft YaHei'">IT
                    Management Center</a></p>
                <div class="input_outer">
                    <span class="u_user"></span>
                    <input id='username' name="user" class="text" style="color: #FFFFFF !important" type="text"
                           placeholder="請輸入帳戶">
                </div>
                <div class="input_outer">
                    <span class="us_uer"></span>
                    <input id='password' name="pass" class="text"
                           style="color: #FFFFFF !important; position:absolute; z-index:100;" value="" type="password"
                           placeholder="請輸入密碼">
                </div>
                {#                <div class="input_outer"  style="width: 200px">#}
                {#                    <span class="us_uer"></span>#}
                {#                    <input id='code' name="code" class="text"#}
                {#                           style="color: #FFFFFF !important; position:absolute; z-index:100;width: 100px" value=""#}
                {#                           type="text" placeholder="請輸入驗證碼">#}
                {#                    <img src="/check_code/" alt="驗證碼" onclick="changeImg(this)"#}
                {#                         style="position: absolute;top:1px;right: 0;margin-right: -125px">#}
                {#                </div>#}
                <div id="captcha-box">
                    <div id="loading-tip" style="color:red">正在爲您火速加載中....</div>
                </div>
                <div id="popup-captcha">
                    <!--存放極驗驗證碼區域!! -->
                </div>

                <div class="mb2" id="login_zone" hidden="hidden">
                    <button id='mybtn' class="act-but submit"
                            style="color: #FFFFFF;width: 330px;border: none;opacity:0.9;font-size: 26px; ">登陸
                    </button>
                </div>
                <a title="首次使用該功能須要提早在運維平臺作用戶認證"
                   href="https://api.weibo.com/oauth2/authorize?client_id=439057412&redirect_uri=http://172.17.10.112:8001/login/">
                    <img src="/static/weibo.png" alt="">
                </a>
            </div>
        </div>
    </div>
</div><!-- /container -->

<script src="{% static "bootstrap/js/TweenLite.min.js" %}"></script>
<script src="{% static "bootstrap/js/EasePack.min.js" %}"></script>
<script src="{% static "bootstrap/js/rAF.js" %}"></script>
<script src="{% static "bootstrap/js/demo-1.js" %}"></script>
<script src="{% static "bootstrap/js/jquery.js" %}"></script>

<script>
    function changeImg(ths) {
        ths.src = ths.src + "?";
    }

    {#    $('#mybtn').click(function () {#}
    {#        $('#username').next().remove();#}
    {#        $('#password').next().remove();#}
    {#        $('#code').siblings('.error_div1').remove();#}
    {#        $('#username').parent().parent().find(".error_div2").remove();#}
    {#        var flag = true;#}
    {#        var user_name = $('#username').val();#}
    {#        var user_pwd = $('#password').val();#}
    {#        var check_code = $('#code').val();#}
    {#        if (flag) {#}
    {#            $.ajaxSetup({#}
    {#                data: {csrfmiddlewaretoken: '{{ csrf_token }}'}#}
    {#            });#}
    {#            $.ajax({#}
    {#                url: '/login/',#}
    {#                type: 'POST',#}
    {#                data: {"username": user_name, "password": user_pwd, "code": check_code},#}
    {#                dataType: "JSON",#}
    {#                success: function (data) {#}
    {#                    if (data.status) {#}
    {#                        location.href = '/login_first/'#}
    {#                    }#}
    {#    else#}
    {#    {#}
    {#        console.log(data);#}
    {#        $.each(data.error, function (k, v) {#}
    {#            if (k == 'username') {#}
    {#                var $msg = "<div class='error_div'>" + v[0] + '</div>';#}
    {#                $('#' + k).after($msg)#}
    {#            } else if (k == 'password') {#}
    {#                var $msg = "<div class='error_div1'>" + v[0] + '</div>';#}
    {#                $('#' + k).after($msg)#}
    {#            } else if (k == 'code') {#}
    {#                var $msg = "<div class='error_div1'>" + v[0] + '</div>';#}
    {#                $('#' + k).after($msg)#}
    {#            }#}
    {#            else {#}
    {#                var $msg = "<div class='error_div2'>" + v + '</div>';#}
    {#                $('#username').parent().before($msg);#}
    {#            }#}
    {##}
    {#        });#}
    {#    }#}
    {#                }#}
    {#            })#}
    {#        }#}
    {##}
    {#    });#}


    var handlerPopup = function (captchaObj) {
        captchaObj.onReady(function () {        //1.極驗加載/準備時回調函數
            $("#loading-tip").hide();
            $('#login_zone').show();
        });
        
        captchaObj.onSuccess(function () {   //2.極驗加載成功的回調
            $('#login_zone').hide();
            var validate = captchaObj.getValidate();
            $.ajax({                          //3.發送用戶輸入到後臺
                url: "{% url 'login'%}", 
                type: "post",
                dataType: "json",
                data: {
                    username: $('#username').val(),
                    password: $('#password').val(),
                    csrfmiddlewaretoken: '{{ csrf_token }}',
                    geetest_challenge: validate.geetest_challenge,
                    geetest_validate: validate.geetest_validate,
                    geetest_seccode: validate.geetest_seccode
                },
                success: function (data) {
                    if (data.status) {           //4.後端驗證成功跳轉
                        location.href = '/login_first/'
                    }
                    else {
                        $('#login_zone').show();
                        captchaObj.reset();         //4.後端驗證失敗,重新加載極致驗證插件;
                        $('#popup-captcha').hide();
                        $('#username').next().remove();
                        $('#password').next().remove();
                        $('#code').siblings('.error_div1').remove();
                        $('#username').parent().parent().find(".error_div2").remove();
                        $.each(data.error, function (k, v) {
                            if (k == 'username') {
                                var $msg = "<div class='error_div'>" + v[0] + '</div>';
                                $('#' + k).after($msg)
                            } else if (k == 'password') {
                                var $msg = "<div class='error_div1'>" + v[0] + '</div>';
                                $('#' + k).after($msg)
                            } else if (k == 'code') {
                                var $msg = "<div class='error_div1'>" + v[0] + '</div>';
                                $('#' + k).after($msg)
                            }
                            else {
                                var $msg = "<div class='error_div2'>" + v + '</div>';
                                $('#username').parent().before($msg);
                            }

                        });
                    }
                }
            });
        });

        $("#mybtn").click(function () {
            captchaObj.appendTo("#popup-captcha");
            $('#popup-captcha').show()
        });
    };
 
    $.ajax({   //0.加載極驗插件
        url: "/pc-geetest/register?t=" + (new Date()).getTime(), // 加隨機數防止緩存
        type: "get",
        dataType: "json",
        success: function (data) {
            // 使用initGeetest接口
            // 參數1:配置參數
            // 參數2:回調,回調的第一個參數驗證碼對象,以後可使用它作appendTo之類的事件
            initGeetest({
                gt: data.gt,
                challenge: data.challenge,
                product: "popup", // 產品形式,包括:float,embed,popup。注意只對PC版驗證碼有效
                offline: !data.success // 表示用戶後臺檢測極驗服務器是否宕機,通常不須要關注
                // 更多配置參數請參見:http://www.geetest.com/install/sections/idx-client-sdk.html#config
            }, handlerPopup);
        }
    });


</script>


</body>
</html>
login.html

 

百度語音提示

安裝百度文字裝語音模塊

pip install baidu-aip 

播放audio的標籤

 <embed height="1" width="1" src="/media/audio/{{ request.session.username }}/audio.mp3">
import datetime
import os,time
from aip import AipSpeech
from django.conf import settings
import threading

class Speak_timeout(threading.Thread):
    APP_ID = '14895770'
    API_KEY = 'PDpuxPZ3pLo9Lx7wsjFXZlAB'
    SECRET_KEY = '1OpDz7xyc6xnWSWUUIGnxT87zEkWtj73'
    client = AipSpeech(APP_ID, API_KEY, SECRET_KEY)
    member = {'van': '劉浩', 'markguo': '郭光', 'owen.wu': '吳詩萌',
              'benson': '曹立林', 'zhanggen': '張根', 'lvhongfang': '呂洪芳'}

    def __init__(self, request, models):
        super(Speak_timeout,self).__init__()
        self.request = request
        self.curent_user = request.session.get('username')  # session
        self.audio_path = os.path.join(settings.MEDIA_ROOT, 'audio', self.curent_user)
        self.file_name = os.path.join(self.audio_path, 'audio.mp3')
        self.models = models

    def timeout_count(self):
        temp = self.models.Worker_order.objects.filter(level__in=[1, 2, 3], agent=self.curent_user,
                                                       status__in=[0,1,3])

        if temp:  # 就是有未處理的工單
            self.count = 0
            for w in temp:
                enddate = datetime.datetime.strptime(w.enddate, '%Y年%m月%d日%H時%M分%S秒')
                if enddate < datetime.datetime.now():  # 表明:未處理&超時 工單
                    self.count += 1

            if self.count:
                chinese_name = self.member.get(self.curent_user)
                self.curent_user = chinese_name if chinese_name else self.curent_user
                content = 'HI:%s 你有%s條超時工單!' % (self.curent_user,self.count)
                self.speak(content=content)


        if os.path.exists(self.audio_path) is False:
            os.makedirs(self.audio_path)

        else:
            if self.count==0 and os.path.isfile(self.file_name):
                os.remove(self.file_name)

    def speak(self, content):
        if not os.path.exists(self.audio_path):
            os.makedirs(self.audio_path)
        result = self.client.synthesis(content, 'zh', 1, {
            'vol': 5, 'per': 4
        })
        # 識別正確返回語音二進制 錯誤則返回dict 參照下面錯誤碼
        if not isinstance(result,dict):
            with open(self.file_name,'wb') as f:
                f.write(result)

    def run(self):
        self.timeout_count()
百度文字轉語音
class WorOrder_Middleware(RbacMiddleware,MiddlewareMixin):
    def process_request(self,request):
        if request.path_info not in ['/login/','/pc-geetest/register','/arya/cmdb/worker_order/handle/']:
            speak_obj=Speak_timeout(request,models) #工單語言
            speak_obj.start()
Django中間件

 

 

python3調用zabbix3API

zabbix提供着豐富的API功能,但在使用它們的前提是你須要獲得1個受權碼;

zabbix文檔

 

import requests
import json
class Smart_select(object):
    members = {
        'owen.wu': {
            "mediatypeid": "10", #原來 'mediatypeid': '5', 'type': '1', 'description': '微信_DB'
            "sendto": "owen.wu@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        'zhanggen': {
            "mediatypeid": "10", #
            "sendto": "zhanggen@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        'markguo': {
            "mediatypeid": "10",
            "sendto": "markguo@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        'lvhongfang': {
            "mediatypeid": "10",
            "sendto": "lvhongfang@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
    }
    zabbix_api= 'http://10.150.22.211/zabbix/api_jsonrpc.php'
    headers = {'content-type':'application/json-rpc'}
    auth_data = json.dumps(
        {"jsonrpc": "2.0",'method': 'user.login', 'params': {"user": 'admin', "password": '123123xxx'}, "auth": None,
         'id': 0})

    def __init__(self,current_user):
        self.current_user=current_user
        self.auth = requests.post(url=self.zabbix_api, headers=self.headers, data=self.auth_data).json()['result']

    def select(self):
        self.updatemedia_data = json.dumps({
            "jsonrpc": "2.0",
            "method": "user.updatemedia",
            "params": {
                "users": [
                    {
                        "userid": "1"  #admin組
                    }
                ],
                "medias":self.members[self.current_user]
            },
            "auth": self.auth,
            "id": 1
        })
        response=requests.post(url=self.zabbix_api,data=self.updatemedia_data,headers=self.headers).json()
        print(response)
        return response
調用zabbix api 每週自動更新admin用戶的媒介

 

 

 觸發報警

 

 

動做發送給用戶

 

用戶關聯媒介

 zabbix目前使用Admin 用戶調用了zabbix報警推送到運維平臺  媒介       

Zabbix報警推送到運維平臺 markguo@bestseller.com.cn 1-7,00:00-24:00 已啓用

 

 

記念2次zabbix報警沒法調用微信接口的問題

多個media媒介同時調用微信接口衝突

解決:zabbix報警流程分析,從新建立報警流程

 新用戶--->關聯新媒介-->調用不一樣文件名python腳本 

新動做---->發送新用戶

 

 

class Smart_select(object):
    members = {
        'owen.wu': {
            "mediatypeid": "13",  # 原來 'mediatypeid': '5', 'type': '1', 'description': '微信_DB'
            "sendto": "owen.wu@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        'zhanggen': {
            "mediatypeid": "13",  #
            "sendto": "zhanggen@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        'markguo': {
            "mediatypeid": "13",
            "sendto": "markguo@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
        'lvhongfang': {
            "mediatypeid": "13",
            "sendto": "lvhongfang@bestseller.com.cn",
            "active": 0,
            "severity": 63,
            "period": "1-7,00:00-24:00",
        },
    }
    zabbix_api = 'http://10.150.22.211/zabbix/api_jsonrpc.php'
    headers = {'content-type': 'application/json-rpc'}
    auth_data = json.dumps(
        {"jsonrpc": "2.0", 'method': 'user.login', 'params': {"user": 'admin', "password": '123123xxx'}, "auth": None,
         'id': 0})

    def __init__(self, current_user):
        self.current_user = current_user
        self.auth = requests.post(url=self.zabbix_api, headers=self.headers, data=self.auth_data).json()['result']

    def select(self):
        self.updatemedia_data = json.dumps({
            "jsonrpc": "2.0",
            "method": "user.updatemedia",
            "params": {
                "users": [
                    {
                        "userid": "21"  # admin組
                    }
                ],
                "medias": self.members[self.current_user]
            },
            "auth": self.auth,
            "id": 1
        })
        response = requests.post(url=self.zabbix_api, data=self.updatemedia_data, headers=self.headers).json()['result']
        return response

    def get_media_type(self):
        self.get_media_type_data = json.dumps({
            "jsonrpc": "2.0",
            "method":"mediatype.get",
            "params": {
                "output": "extend"
            },
            "auth": self.auth,
            "id": 1
        })
        response = requests.get(url=self.zabbix_api, data=self.get_media_type_data, headers=self.headers).json()['result']
        for i in response:
            print(i)
        return response
    def get_users(self):
        self.get_user_data = json.dumps({
            "jsonrpc": "2.0",
            "method": "user.get",
            "params": {
                "output": "extend"
            },
            "auth": self.auth,
            "id": 1
        })
        response = requests.get(url=self.zabbix_api, data=self.get_user_data, headers=self.headers).json()['result']
        return response

    def get_usermedia(self):
        self.get_user_data = json.dumps(
            {
                "jsonrpc": "2.0",
                "method": "usermedia.get",
                "params": {
                    "output": "extend",
                    "userids": "1"
                },
                "auth": self.auth,
                "id": 1
            }

        )
        response = requests.get(url=self.zabbix_api, data=self.get_user_data, headers=self.headers).json()['result']
        return response
python 調用zabbix api示例代碼

 

 

https://blog.csdn.net/weixin_33875839/article/details/88226753

 

 

Django-jet

 

Django-jet的功能就是對Django的 Django-admin後臺管理插件進行美化

 

參考博客

GitHub地址

                        yagmail發送郵件

 

yagmail模塊是1個極其好用的郵件發送模塊

 

0.安裝yagmail模塊

[root@cmdb /]# pip install yagmail

 

1.配置郵箱

 

 2.設置受權碼

 

 

 3.使用yagmail模塊

注意 password="受權碼」,若是按照以上的步驟操做以後仍然報錯

smtplib.SMTPDataError: (554, b'DT:SPM 163 smtp1,C9GowAD32Bi_sxhcdfRjAw--.682S2 1545122751,please see http://mail.163.com/help/help_spam_16.htm?ip=219.142.140.226&hostid=smtp1&time=1545122751')

Process finished with exit code 1

就換個IP地址!!

import yagmail

# 鏈接郵箱服務器
yag = yagmail.SMTP(user="13220198866@163.com", password="步驟2設置的受權碼", host="smtp.163.com")

# 郵箱正文
contents = ['韋哥,我正在用yagmail這個庫給你發郵件,感受賊雞巴簡單']

# # 發送郵件,給一我的
# yag.send('645172205@qq.com', 'subject', contents)

#給多我的發送郵件,併發送附件
yag.send(['645172205@qq.com', '454381958@qq.com'], '發送附件', conte

 

python-nmap模塊

nmap是一款功能強大網絡掃描、嗅探工具,正好用它去自動發現主機,作CMDB信息採集工做;

 

0.Linux安裝

 wget http://nmap.org/dist/nmap-6.40-1.x86_64.rpm 
 rpm -Uvh nmap-6.40-1.x86_64.rpm

 

2.pip安裝python-nmap模塊

pip3.6 install python-nmap

 

3.簡單使用

import nmap
nm=nmap.PortScanner()
result=nm.scan('192.168.1.0/24',arguments="-sP").get('scan') # 

for k,v in result.items(): #result返回1個大字典{'nmap': '本次掃描元信息描述','scan':'掃描結果'}
    print(k,v)
'''
('192.168.1.0',
 {'nmap': {'command_line': 'nmap -oX - -p 22,80,8888,8080,443 -sS 192.168.1.0',           #本次掃描元信息描述
           'scaninfo': {'tcp': {'method': 'syn', 'services': '22,80,443,8080,8888'}},
           'scanstats': {'timestr': 'Thu Dec 20 09: 49:40 2018',
           'elapsed': '9.16',
           'uphosts': '0',
           'downhosts': '1',
           'totalhosts': '1'}
 },
 'scan': {}})                                                                        #掃描結果

 

192.168.112.1 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.1', 'mac': '1C:6A:7A:0B:1D:FF'}, 'vendor': {'1C:6A:7A:0B:1D:FF': 'Cisco Systems'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.2 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.2', 'mac': '00:6B:F1:08:30:65'}, 'vendor': {'00:6B:F1:08:30:65': 'Cisco Systems'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.4 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.4', 'mac': '90:60:F1:39:DC:03'}, 'vendor': {'90:60:F1:39:DC:03': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.5 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.5', 'mac': 'B0:19:C6:A8:25:50'}, 'vendor': {'B0:19:C6:A8:25:50': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.8 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.8', 'mac': 'E4:B3:18:39:6C:0A'}, 'vendor': {'E4:B3:18:39:6C:0A': 'Intel Corporate'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.9 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.9', 'mac': '48:45:20:47:07:66'}, 'vendor': {'48:45:20:47:07:66': 'Intel Corporate'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.10 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.10', 'mac': '24:F6:77:23:A3:C1'}, 'vendor': {'24:F6:77:23:A3:C1': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.12 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.12', 'mac': 'D8:1D:72:81:C3:72'}, 'vendor': {'D8:1D:72:81:C3:72': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.13 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.13', 'mac': '30:B4:9E:C7:63:0B'}, 'vendor': {'30:B4:9E:C7:63:0B': 'Tp-link Technologies'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.15 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.15', 'mac': 'A0:D7:95:62:95:4B'}, 'vendor': {'A0:D7:95:62:95:4B': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.16 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.16', 'mac': '38:53:9C:3B:C0:74'}, 'vendor': {}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.21 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.21', 'mac': '3C:2E:F9:88:08:86'}, 'vendor': {'3C:2E:F9:88:08:86': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.22 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.22', 'mac': '20:F7:7C:73:D4:29'}, 'vendor': {}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.23 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.23', 'mac': '5C:1D:D9:46:46:88'}, 'vendor': {}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.24 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.24', 'mac': '70:77:81:3B:85:51'}, 'vendor': {'70:77:81:3B:85:51': 'Hon Hai Precision Ind.'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.25 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.25', 'mac': '70:48:0F:4D:53:47'}, 'vendor': {'70:48:0F:4D:53:47': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.26 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.26', 'mac': '90:B0:ED:69:07:94'}, 'vendor': {'90:B0:ED:69:07:94': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.27 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.27', 'mac': 'E4:A7:A0:34:E3:9C'}, 'vendor': {'E4:A7:A0:34:E3:9C': 'Intel Corporate'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.29 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.29', 'mac': '90:DD:5D:82:C5:D9'}, 'vendor': {}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.30 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.30', 'mac': 'EC:D0:9F:97:D1:4D'}, 'vendor': {'EC:D0:9F:97:D1:4D': 'Xiaomi Communications'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.31 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.31', 'mac': '38:89:2C:65:26:47'}, 'vendor': {}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.32 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.32', 'mac': '24:F0:94:C0:EB:61'}, 'vendor': {'24:F0:94:C0:EB:61': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.33 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.33', 'mac': '04:B1:67:60:79:B0'}, 'vendor': {}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.35 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.35', 'mac': '34:AB:37:F1:6F:06'}, 'vendor': {'34:AB:37:F1:6F:06': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.38 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.38', 'mac': 'E0:33:8E:AE:60:62'}, 'vendor': {}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.40 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.40', 'mac': '38:53:9C:EC:9F:FF'}, 'vendor': {}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.42 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.42', 'mac': '90:94:97:16:50:35'}, 'vendor': {}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.46 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.46', 'mac': 'E4:9A:DC:B6:BF:B8'}, 'vendor': {'E4:9A:DC:B6:BF:B8': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.47 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.47', 'mac': '70:48:0F:37:A0:71'}, 'vendor': {'70:48:0F:37:A0:71': 'Apple'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.49 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.49', 'mac': 'B0:E5:ED:B0:E5:11'}, 'vendor': {'B0:E5:ED:B0:E5:11': 'Huawei Technologies'}, 'status': {'state': 'up', 'reason': 'arp-response'}}
192.168.112.50 {'hostnames': [{'name': '', 'type': ''}], 'addresses': {'ipv4': '192.168.112.50', 'mac': '30:B4:9E:A9:25:D3'}, 'vendor': {'30:B4:9E:A9:25:D3': 'Tp-link Technologies'}, 'status': {'state': 'up', 'reason': 'arp-response'}}

4.nmap使用參數

-sP :進行ping掃描 這個命令能夠用於探測局域網有哪些機器 打印出對ping掃描作出響應的主機,不作進一步測試(如端口掃描或者操做系統探測):
-sn:  Ping Scan - disable port scan  #ping探測掃描主機,不進行端口掃描 (測試過對方主機把icmp包都丟棄掉,依然能檢測到對方開機狀態)
-sA (發送tcp的ack包進行探測,能夠探測主機是否存活)

 

5.高級用法

-sS :半開放掃描(非3次握手的tcp掃描)

使用頻率最高的掃描選項:SYN掃描,又稱爲半開放掃描,它不打開一個徹底的TCP鏈接,執行得很快,效率高

一個完整的tcp鏈接須要3次握手,而-sS選項不須要3次握手) 缺點:它須要root/administrator權限執行

 

 sT:3次握手方式tcp的掃描

 

sU:udp端口的掃描

Udp scan(sU) 顧名思義,這種掃描技術用來尋找目標主機打開的UDP端口.它不須要發送任何的SYN包,由於這種技術是針對UDP端口的。UDP掃描發送UDP數據包到目標主機,並等待響應,

 

sW:窗口掃描  

 

sV:版本檢測(sV)
版本檢測是用來掃描目標主機和端口上運行的軟件的版本,以下掃描,多出了ssh的版本信息 

 

更多nmap 參數參考

 

telnetlib模塊

 

telnet是Linux中測試遠程主機端口是否開啓、登陸遠程主機、交互機的工具,python也是有模塊與之對應的,它就是telnetlib模塊;

其實telnetlib模塊 經過telnet協議登陸服務器,然並卵 不是有paramiko模塊嗎?實際上是有telnetlib作一個登陸交換機的程序還能夠哈哈哈~;

 

我只是用telnetlib模塊 過濾主機列表中的服務器IP 是否爲Linux服務器?在ssh服務開啓在22號端口的前提下;

import  telnetlib
#斷定是否爲Linux系統(約定開啓22號端口)
tn=telnetlib.Telnet(host='172.17.10.112',port='22',timeout=4)
result=tn.read_until(b"\n") #讀取截止到 /n的返回結果; b'SSH-2.0-OpenSSH_5.3\r\n'
print(result)

 

        pexpect模塊      

1.pexpect簡介

pexpect是經過

  • 0.啓動子進程
  • 1.去鏈接Linux主機
  • 2.執行shell相關命令
  • 3.經過正則匹配 緩衝區執行結果
  • 4.返回不一樣響應結果

的方式實現了於Linux主機自動交互的 Python 模塊。

 

2.簡單使用

#!/usr/bin/python3
import pexpect
#spawn類啓動一個子程序,有豐富的方法實現對 子程序的控制
ssh_k=pexpect.spawn('ssh root@172.16.22.1 -p22')#生成子程序,去鏈接192.168.1.18

#ssh_k.expect('[p,P]assword:')
#expect 已正則表達式的方式:匹配緩衝區執行結果包含password:的內容;
# 正則匹配成功 返回0,正則匹配不成功一直等待返回,直到子程序到達超時時間(30秒);

ret=ssh_k.expect([pexpect.EOF,pexpect.TIMEOUT,'password'])
print(ret)
#匹配3種狀況:錯誤輸出:pexpect.EOF 超時:pexpect.TIMEOUT 正確:password
#正則匹配pexpect.EOF返回0 ,pexpect.TIMEOUT 返回1 , password成功 返回2,

 

3.向子程序發送指令

send():            發送執行指令時會 自動加回車符 /n
sendline():        發送執行指令 不會自動加回車符
sendcontrol(char):  發送控制符 ctrl c

 

4.自動登陸Linux主機並切換到Oracle用戶;

import pexpect
import sys
password='EC_history&LINGzhi'
ssh=pexpect.spawn('ssh root@10.102.6.38 -p 22')
status=ssh.expect(['password:','continue connecting (yes/no)?',pexpect.TIMEOUT,pexpect.EOF],timeout=50)
if status==0:
    ssh.sendline(password)  #注意輸入密碼 不要使用send 不須要\n回車!
elif status ==1:
    ssh.send('yes')
    ssh.expect('password: ')
    ssh.sendline(password)
elif status==2:
    print('賤婢作了很久~作不到')
elif status==3:
    print('賤婢死爹了,您請回!')
    sys.exit('Bye...')
index=ssh.expect(['#',pexpect.EOF,pexpect.TIMEOUT])
if index==0:
    print('You were logged in as root.')
    ssh.sendline('su - oracle')
    ssh.sendline('whoami')
    status=ssh.expect(['oracle', pexpect.EOF, pexpect.TIMEOUT])
    if status==0:
        print('切換到oracle用戶!')

 

5.pexpect的nteract() 和遠程主機交互

import pexpect
import sys
password='EC_history&LINGzhi'
ssh=pexpect.spawn('ssh root@10.102.6.38 -p 22')
status=ssh.expect(['password:','continue connecting (yes/no)?',pexpect.TIMEOUT,pexpect.EOF],timeout=50)
if status==0:
    ssh.sendline(password)  #注意輸入密碼 不要使用send 不須要\n回車!
elif status ==1:
    ssh.send('yes')
    ssh.expect('password: ')
    ssh.sendline(password)
elif status==2:
    print('賤婢作了很久~作不到')
elif status==3:
    print('賤婢死爹了,您請回!')
    sys.exit('Bye...')
index=ssh.expect(['#',pexpect.EOF,pexpect.TIMEOUT])
if index==0:
    print('You were logged in as root.')
    ssh.interact()  #直接 進入遠程主機交互式模式

 

6.python遠程登陸Linux各模塊小結

fabric:方便與shell腳本結合,擅長批量部署,任務管理。

paramiko:方便嵌套系統平臺中,擅長遠程執行命令,文件傳輸,自身不執行多進程;

pexpect擅長自動交互,好比ssh、ftp、telnet,原生支持多線程

參考

 

 

ASE加密解密

有時候咱們不能把一些敏感信息直接存儲到文件或者數據庫裏,全部就須要加密以後再存儲;

ASE對稱加密算法

加密/解密雙方 約定好 對稱的加密/解密規則 (以什麼規則加密的 就以什麼規則解密)

 

pip install pycryptodome  #安裝pycryptodome模塊
import base64
from Crypto.Cipher import AES

# str不是16的倍數那就補足爲16的倍數
def add_to_16(text):
    while len(text) % 16 != 0:
        text += '\0' #\0 就是加1個長度,直到加到長度爲16
    return str.encode(text)  # 返回bytes

key = 'woshinibaba!'  # 密碼
text = 'zhanggen'  # 待加密文本


#對稱加密算法:加密/解密雙方 約定好 對稱的加密/解密規則 (以什麼規則加密的 就以什麼規則解密)
aes = AES.new(add_to_16(key), AES.MODE_ECB)  # 初始化加密器接收 1個16位長度的字符串
#使用add_to_16  加密
encrypted_text = str(base64.encodebytes(aes.encrypt(add_to_16(text))), encoding='utf8').replace('\n', '')
#使用add_to_16  解密
text_decrypted = str(aes.decrypt(base64.decodebytes(bytes(encrypted_text, encoding='utf8'))).rstrip(b'\0').decode("utf8"))

print('加密值:', encrypted_text)
print('解密值:', text_decrypted)

在線AES加密解密

 

ssh無密碼登陸

若是你想不經過密碼的方式登陸1個Linux主機,A------>B ;

A:本機  B:目標主機

1.本機生成一對公、私鑰

ssh-keygen -t rsa            #一組公私鑰在~/.ssh/

2.本機將公鑰拷貝到 目標主機

 scp ~/.ssh/id_rsa.pub root@192.168.1.18:~/.ssh/

#-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------#

3.目標主機建立authorized_keys文件

cat id_rsa.pub >> ~/.ssh/authorized_keys

4.設置目標主機權限 

sudo chmod 700 ~/.ssh
sudo chmod 700 /home/當前用戶
sudo chmod 600 ~/.ssh/authorized_keys

5.目標主機修改sshd服務配置文件

vi /etc/ssh/sshd_config
StrictModes no

6.目標主機重啓sshd服務

[root@svnnew .ssh]# /etc/init.d/sshd restart
中止 sshd:                                                [肯定]
正在啓動 sshd:                                            [肯定]

 

 

 

Python的ConfigParser模塊

 

 

Parser模塊用於讀取配置文件內容

from configparser import ConfigParser
'''
[db]
db_port = 3306
db_user = root
db_host = 127.0.0.1
db_pass = xgmtest

[concurrent]
processor = 20
thread = 10


[zabbix]
host= 3306
port = root
username=admin
password = 127.0.0.1

'''
cp = ConfigParser()
cp.read("test.config")#讀取配置文件
section = cp.sections() #獲取全部的secetions         ['db', 'concurrent', 'zabbix']
items=cp.items('zabbix')#獲取當個secetion中的item  [('host', '3306'), ('port', 'root'), ('username', 'admin'), ('password', '127.0.0.1')]
str_port=cp.get('zabbix',"host") #獲取item中的值(str類型)   '3306'
int_port=cp.get('zabbix',"host")#獲取item中的值(int類型)  3306
print(int_port)
from configparser import ConfigParser

 


Python操做crontab

crontab是運維工做裏不可缺乏的一環,如何經過自動化的方式讓它使用起來更加快捷,簡單呢?Python中有1個模塊對其進行了封裝;

pip install python-crontab

 

PS:

當我下載完了python-crontab模塊以後,想獲取1個job的執行日誌,死活獲取不到,還好我沒有死心,

發現了python-crontab模塊源碼中的正則表達式 和 個人crontable日誌 match不上,居然少了1個D

MATCHER = r'(?P<date>\w+ +\d+ +\d\d:\d\d:\d\d) (?P<host>\w+) ' + \
        r'CRON\[(?P<pid>\d+)\]: \((?P<user>\w+)\) CMD \((?P<cmd>.*)\)'

                                         查詢每一個job的日誌就是經過這種正則匹配出來的

Jun  6 15:43:01 cmdb CROND[11986]: (root) CMD (python3 /root/cmdb_rbac_arya/check_late_work_order.py)
Jun  6 15:44:01 cmdb CROND[11997]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:44:01 cmdb CROND[11998]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:44:01 cmdb CROND[12001]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:44:01 cmdb CROND[11999]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:44:01 cmdb CROND[12000]: (root) CMD (python3 /root/cmdb_rbac_arya/check_late_work_order.py)
Jun  6 15:45:01 cmdb CROND[12009]: (root) CMD (python3 /root/cmdb_rbac_arya/check_late_work_order.py)
Jun  6 15:46:01 cmdb CROND[12021]: (root) CMD (python3 /root/cmdb_rbac_arya/check_late_work_order.py)
Jun  6 15:46:01 cmdb CROND[12022]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:46:01 cmdb CROND[12023]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:46:01 cmdb CROND[12024]: (root) CMD (echo date >> ~/time.log # zhanggen)
Jun  6 15:46:01 cmdb CROND[12025]: (root) CMD (echo date >> ~/time.log # zhanggen)
from crontab import CronTab

cron  = CronTab(user=True,log='/var/log/cron') # 表明 crontab -l 中的全部 job ,log爲crontable 的日誌路徑


##建立job
# job = my_user_cron.new(command='echo date >> ~/time.log')#在crontab -l 列表中建立 job
# job.setall('*/2 * * * *')                                  #設置執行時間
# job.set_comment("zhanggen")                             #設置任務描述
# job.enable()                                            #啓用/禁用
# my_user_cron.write()

# #經過_comment、_command、_time 3種形式查找job
# job1=list(cron.find_comment('zhanggen'))[0] #獲取1條crontable記錄的最近1條日誌,注意日誌排列爲倒序
# job2=list(cron.find_command('echo date'))[0] # matches foobar1
# job3=list(cron.find_time('*/2'))[0]
#
# #查看job的日誌記錄
# latest_log1=list(job1.log)[1]
# latest_log2=list(job2.log)[1]
# latest_log3=list(job3.log)[1]


#刪除job對象
# cron.remove(job)
# cron.remove_all(command= 'echo')
cron.remove_all(comment='zhanggen')
# cron.remove_all(time='*/2')

#提交執行結果
cron.write() #最後寫入纔會生效
代碼 

                   定時任務框架APScheduler

1.APScheduler結合Django

from django.shortcuts import render,HttpResponse
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")
scheduler.start()
def time_task(**task):
    print(task)
from django_apscheduler import models
from datetime import datetime

register_events(scheduler)

def task(request):
    # obj=scheduler.get_job(job_id='zhanggen')

    #
    # scheduler.remove_job(job_id='zhanggen')#s刪除
    scheduler.add_job(name='您好',func=time_task,trigger="interval",seconds=3,id='七點',kwargs={'name':'zhanggen'})
    scheduler.add_job(name='您好', func=time_task, trigger="interval", seconds=3, id='八點', kwargs={'name': '張根'})
    # scheduler.add_job(name='您好',id='六點',func=time_task,trigger='date',run_date=datetime(2019,6,27,17,58,5),kwargs={'name':'zhanggen'})
    objs = models.DjangoJob.objects.filter(name__endswith='六點')
    print(objs)
    # obj = scheduler.get_job(job_id='benson')
    # obj.remove()#刪除
    # obj.pause()#暫停
    #obj.resume()#恢復
    return HttpResponse('OK')
views.py

 

 

ldap3

使用ldap用戶認證:

0.讓管理ldap服務器的運維人員給你開通帳號

2.使用Python登陸驗證

from ldap3 import Server, Connection
def Ldap_auth(username,pasword):
    '''

    :param username:你的用戶名
    :param pasword: 你的密碼
    :return: 驗證成功:success 驗證失敗:invalidCredentials
    '''
    ldaphost = ("ldap://10.10.82.222:10389")
    s = Server(ldaphost)
    conn2 = Connection(s,user='uid={0},ou=people,dc=example,dc=com'.format(username) , password=pasword,check_names=True, lazy=False,raise_exceptions=False)
    conn2.bind()
    return conn2.result["description"]

ret=Ldap_auth('zhanggen','zhanggen123.com')
print(ret)
ldap3驗證

 ldap獲取全部用戶信息

import ldap
def get_userinfo_from_ldap():
    try:
        server = "ldap://10.10.82.222:10389"
        membersDN ="ou=people,dc=example,dc=com"
        groups_DN="ou=groups,dc=example,dc=com"

        conn = ldap.initialize(server)
        conn.set_option(ldap.OPT_REFERRALS, 0)
        conn.protocol_version = ldap.VERSION3
        filterstr = '(objectClass=organizationalPerson)'
        # 獲取的字段
        attrlist = ['uid', 'displayName']
        conn.search(membersDN, ldap.SCOPE_ONELEVEL,
                    filterstr=filterstr, attrlist=attrlist)
        code, members = conn.result(timeout=3)

        name_map = {}
        for member in members:
            data = member[1]
            uid = data.get('uid', [''])[0].decode(
                encoding='utf-8')
            displayName = data.get('displayName', [''])[
                0].decode(encoding='utf-8')
            if uid:
                name_map[uid] = displayName
        conn.unbind_s()

        return True, name_map
    except ldap.LDAPError as e:

        return False
    pass

print(get_userinfo_from_ldap())
獲取全部用戶信息

 

 

 調用Ucloud API

import base64
import hashlib
import json
import os
import sys
import urllib.parse
import urllib.request

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import conf


class UCloudHelper():
    def __init__(self):
        pass

    def get_token(self, uhost_id, uhost_name):
        return hashlib.md5(('%s%s' % (uhost_id, uhost_name)).encode('utf8')).hexdigest()[:16]

    def __generate_url(self, params):
        # 1. 把排好序的k,v+private key做爲明文
        params['PublicKey'] = conf.ucloud_public_key
        s = ''.join([''.join((k, params[k])) for k in sorted(params.keys())])
        s = '%s%s' % (s, conf.ucloud_private_key)
        # 2. sha1加密
        params['Signature'] = hashlib.sha1(s.encode('utf8')).hexdigest()
        # 3. 請求ucloud
        url = 'https://api.ucloud.cn/?%s' % urllib.parse.urlencode(params)
        # print('params: %s' % params)
        # print('url: %s' % url)
        return url

    def describe_all_uhosts(self, limit):
        '''返回一坨實例信息'''
        params = {
            'Action': 'DescribeUHostInstance',
            'Region': conf.ucloud_region,
            'Tag': conf.ucloud_tag,
            'Limit': str(limit),
        }
        return self.__generate_url(params)

    def describe_uhost_instance(self, uhost_id):
        '''返回一個實例的信息'''
        params = {
            'Action': 'DescribeUHostInstance',
            'Region': conf.ucloud_region,
            'Tag': conf.ucloud_tag,
            'UHostIds.0': uhost_id,
        }
        return self.__generate_url(params)

    def describe_image(self):
        '''返回鏡像'''
        params = {
            'Action': 'DescribeImage',
            'Region': 'cn-bj2',
            # 'Zone':'cn-bj2',
            'ImageType': 'Base'
        }
        return self.__generate_url(params)

    def parse_image(self, result):
        '''describe_image結果解析'''
        prefix = 'sensors-analytics-saas-'
        # prefix = 'sensors-analytics-standalone'
        ret = []
        for image in result['ImageSet']:
            if image['ImageName'].startswith(prefix):
                image['Version'] = tuple([int(x) for x in image['ImageName'].replace(prefix, '').split('.')])
                ret.append(image)
        if not ret:
            raise Exception('cannot find %s image: %s!' % (prefix, str(result)))
        return sorted(ret, key=lambda x: x['Version'])[-1]

    #def create_uhost_instance(self, name, cpu, memory, disk, host_type, image_id,MachineType=False):
    def create_uhost_instance(self, image_id):
        '''建立一個新的實例'''
        # image = self.describe_image() -> image['ImageId']
        params = {
            'Action': 'CreateUHostInstance',
            'Region': conf.ucloud_region,
            'Zone': conf.ucloud_zone,
            'Tag': conf.ucloud_tag,
            'ImageId': 'uimage-5fhmo0',#image_id,
            'LoginMode': 'Password',
            'Password': base64.b64encode(conf.ucloud_default_password.encode('utf8')).decode('utf8'),
            'CPU': str(4),
            'Memory': str(1024 * 16),
            'StorageType': 'LocalDisk',
            'DiskSpace': str(200),
            'Name': 'cloud-TEST',
            "MachineType": 'O',
            # 'UHostType': host_type,
            # 'HostType': 'N2',
            'ChargeType': 'Month',
        }
        if image_id=='111':
           pass

        print('建立主機的參數:',params,'\r\n')
        return self.__generate_url(params)

    def delete_uhost_instance(self, uhost_id):
        '''關閉實例'''
        params = {
            'Action': 'TerminateUHostInstance',
            'UHostId': uhost_id,
            'Region': conf.ucloud_region,
        }
        return self.__generate_url(params)

    def stop_uhost_instance(self, uhost_id):
        '''暫停實例'''
        params = {
            'Action': 'StopUHostInstance',
            'UHostId': uhost_id,
            'Region': conf.ucloud_region,
        }
        return self.__generate_url(params)

    def start_uhost_instance(self, uhost_id):
        '''啓動實例'''
        params = {
            'Action': 'StartUHostInstance',
            'UHostId': uhost_id,
            'Region': conf.ucloud_region,
        }
        return self.__generate_url(params)

    def reinstall_uhost_instance(self, image_id, uhost_id, reserve_disk=False):
        '''重裝實例 返回重裝後的版本號'''
        # image = self.describe_image() -> image['ImageId']
        params = {
            'Action': 'ReinstallUHostInstance',
            'UHostId': uhost_id,
            'Region': conf.ucloud_region,
            'Password': base64.b64encode(conf.ucloud_default_password.encode('utf8')).decode('utf8'),
            'ReserveDisk': {True: 'Yes', False: 'No'}[reserve_disk],
            'ImageId': image_id,
        }
        return self.__generate_url(params)
if __name__ == '__main__':
    h=UCloudHelper()




'''
建立主機的參數: {'Action': 'CreateUHostInstance', 'Region': 'cn-bj2', 'Zone': 'cn-bj2-05', 'Tag': '雲試用版本', 'ImageId': 'uimage-5fhmo0', 'LoginMode': 'Password', 'Password': 'TWh4ektobDIwMTU=', 'CPU': '4', 'Memory': '16384', 'StorageType': 'LocalDisk', 'DiskSpace': '200', 'Name': 'cloud-TEST', 'MachineType': 'O', 'ChargeType': 'Month'} 

https://api.ucloud.cn/?Action=CreateUHostInstance&Region=cn-bj2&Zone=cn-bj2-05&Tag=%E4%BA%91%E8%AF%95%E7%94%A8%E7%89%88%E6%9C%AC&ImageId=uimage-5fhmo0&LoginMode=Password&Password=TWh4ektobDIwMTU%3D&CPU=4&Memory=16384&StorageType=LocalDisk&DiskSpace=200&Name=cloud-TEST&MachineType=O&ChargeType=Month&PublicKey=ucloudsangwf%40qq.com14284190000002005024391&Signature=ed8dad59f0dbf70bc5c5575854f2a3b77d174d51
{'RetCode': 0, 'Action': 'CreateUHostInstanceResponse', 'UHostIds': ['uhost-snd4rh2n'], 'IPs': ['10.42.175.77']}

'''
ucloud_helper.py
相關文章
相關標籤/搜索