Python構建企業微信自動消息轉發服務端

1、背景

目前有在項目分組,就小組成員中,微信羣消息回覆較多的狀況下,想根據組來轉發特定消息,包含文字、圖片、語言等。在此只是本身實現僅供參考,能夠根據自身需求修改更多功能。python

2、代碼

2.1 企業微信相關信息git

  • 企業ID:corpid

Python構建企業微信自動消息轉發服務端

  • 自建應用appid
  • 自建應用secret

Python構建企業微信自動消息轉發服務端

2.2 服務端部署
運行環境:
python 版本 2.7github

git clone https://github.com/redhatxl/wechatmsg.git
nohup python2.7 wechatmsg/wx_msg_server.py &

2.3 參考RUL:數據庫

獲取access_token
發送消息json

2.4 代碼flask

# flask 框架後臺
  def server_run(self):
        app = Flask(__name__)
        @app.route('/index', methods=['GET', 'POST'])
        def index():

            wxcpt = WXBizMsgCrypt(self.sToken, self.sEncodingAESKey, self.sCorpID)
            # 獲取url驗證時微信發送的相關參數
            sVerifyMsgSig = request.args.get('msg_signature')
            sVerifyTimeStamp = request.args.get('timestamp')
            sVerifyNonce = request.args.get('nonce')
            sVerifyEchoStr = request.args.get('echostr')

            # 驗證url
            if request.method == 'GET':
                ret, sEchoStr = wxcpt.VerifyURL(sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)
                print type(ret)
                print type(sEchoStr)

                if (ret != 0):
                    print "ERR: VerifyURL ret:" + str(ret)
                    sys.exit(1)
                return sEchoStr

            # 接收客戶端消息
            if request.method == 'POST':
                sReqMsgSig = sVerifyMsgSig
                sReqTimeStamp = sVerifyTimeStamp
                sReqNonce = sVerifyNonce
                sReqData = request.data
                print(sReqData)

                ret, sMsg = wxcpt.DecryptMsg(sReqData, sReqMsgSig, sReqTimeStamp, sReqNonce)
                print ret, sMsg
                if (ret != 0):
                    print "ERR: DecryptMsg ret: " + str(ret)
                    sys.exit(1)
                # 解析發送的內容並打印

                xml_tree = ET.fromstring(sMsg)
                print('xml_tree is ', xml_tree)
  • 消息內容發送
def _send_text_msg(self, content):
        data = {
            "touser": ('|').join(self.userid.split(',')),
            "toparty": ('|').join(self.partid.split(',')),
            # "toparty":int(self.partid),
            "msgtype": "text",
            "agentid": self.agent_id,
            "text": {
                "content": content
            },
            "safe": 0
        }
        try:
            response = requests.post(self.send_msg_url.format(self.access_token), json.dumps(data))
            self.logoper.info(response.text)
            print(response.text)
            result_msg = json.loads(response.content)['errmsg']
            return result_msg
        except Exception as e:
            self.logoper.info(e)
  • 日誌
def create_dir(self):
        """
        建立目錄
        :return: 文件名稱
        """
        _LOGDIR = os.path.join(os.path.dirname(__file__), self.logdir_name)
        _TIME = time.strftime('%Y-%m-%d', time.gmtime()) + '-'
        _LOGNAME = _TIME + self.logfile_name
        LOGFILENAME = os.path.join(_LOGDIR, _LOGNAME)
        if not os.path.exists(_LOGDIR):
            os.mkdir(_LOGDIR)
        return LOGFILENAME

    def create_logger(self, logfilename):
        """
        建立logger對象
        :param logfilename:
        :return: logger對象
        """
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
        handler = logging.FileHandler(logfilename)
        handler.setLevel(logging.INFO)
        formater = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formater)
        logger.addHandler(handler)
        return logger

配置文件c#

# 定義微信公衆號信息
[common]
# 企業微信企業ID
corpid = wxe23xxxxxxxxxxx

# 接收消息服務器配置
[recmsg]

Token = mVNAAw3xxxxxxxxxxxxxxxxx
EncodingAESKey = vwbKImxc3WPeE073xxxxxxxxxxxxxxxxxx

# 自建應用信息
[appconfig]
# 自建應用agentid
agentid = 1000002
# 自建應用secret
secret = 6HAGX7Muw36pv5anxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

# 消息接收信息
# 消息接收用戶id,若是多個用戶用英文','隔開
userid = xuel|yaoy

# 消息接收部門id,若是多個用英文','隔開
partid = 11

[urlconfig]
# 獲取應用token的api接口
get_access_token_url = https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}
# 發送消息api接口
send_msg_url = https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}
# 上傳媒體api接口,獲取mediaid
upload_media_url = https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={}&type=image
# 上傳高清語音接口
upload_video_url = https://qyapi.weixin.qq.com/cgi-bin/media/get/jssdk?access_token={}&media_id={}

[loginfo]
#日誌目錄
logdir_name = logdir
#日誌文件名稱
logfile_name = wechat_server.log

3、測試

在企業微信發送消息,能夠修改配置文件制定轉發到特定的羣組,從而避免消息分流。
啓用應用API,設置回調地址
Python構建企業微信自動消息轉發服務端
Python構建企業微信自動消息轉發服務端
測試發送消息
Python構建企業微信自動消息轉發服務端
查看接受消息
Python構建企業微信自動消息轉發服務端api

4、優化

  • 後期能夠配合數據庫將每次獲取的access_token 保存至數據庫,待2小時過時後,再從新獲取新的。
  • 更多內容轉發
相關文章
相關標籤/搜索