# -*- coding: utf-8 -*-
# author: qinshixu
import logging

import requests


class Dingding(object):
    """Minimal client for the DingTalk (dingding) open API.

    On construction the client requests an access token using the corp id
    and corp secret.  It can then list departments, list the users of a
    department, enumerate all users, and look a user up by the local part
    of their e-mail address.

    API failures are logged on the ``"dingding"`` logger rather than raised;
    the affected call then returns ``None`` or yields nothing.
    """

    ding_api = "https://oapi.dingtalk.com/"
    # Seconds to wait on each HTTP call.  ``requests`` has no default timeout,
    # so without this a stalled connection would block forever.
    request_timeout = 10

    def __init__(self, copy_id=None, copy_secret=None, agent_id=None):
        """Store the credentials and fetch an access token.

        :param copy_id: value sent as ``corpid`` when requesting the token.
        :param copy_secret: value sent as ``corpsecret`` when requesting
            the token.
        :param agent_id: stored on the instance; not used by the methods
            of this class.
        """
        self.logger = logging.getLogger("dingding")
        self.copy_id = copy_id
        self.copy_secret = copy_secret
        self.agent_id = agent_id
        # Always defined, so later calls never hit AttributeError even when
        # the token request fails.
        self.access_token = None
        self._init_token()

    def _get_json(self, path, params, what):
        """GET ``ding_api + path`` and return the decoded JSON dict.

        :param path: API path relative to ``ding_api``.
        :param params: query parameters; ``requests`` URL-encodes them and
            omits entries whose value is ``None``.
        :param what: prefix used in error log messages.
        :return: the response payload as a dict, or ``None`` when the
            request fails, the HTTP status is not 200, the body is not a
            JSON object, or ``errcode`` is not 0.  Every failure is logged.
        """
        url = "%s%s" % (self.ding_api, path)
        try:
            res = requests.get(url, params=params,
                               timeout=self.request_timeout)
        except requests.RequestException as e:
            self.logger.error("%s error: msg:%s", what, e)
            return None

        if res.status_code != 200:
            self.logger.error("%s error: msg:%s", what, res.text)
            return None

        try:
            payload = res.json()
        except ValueError as e:
            # Every JSON decode error raised by ``res.json()`` is a
            # ValueError subclass.
            self.logger.error("%s error: msg:%s", what, e)
            return None

        if not isinstance(payload, dict):
            self.logger.error("%s error: msg:%s", what, res.text)
            return None
        if payload.get("errcode") != 0:
            self.logger.error("%s error: msg:%s", what, payload.get("errmsg"))
            return None
        return payload

    def _init_token(self):
        """Request an access token and store it in ``self.access_token``.

        ``self.access_token`` is ``None`` when the request fails.
        """
        self.access_token = None
        payload = self._get_json(
            "gettoken",
            {"corpid": self.copy_id, "corpsecret": self.copy_secret},
            "Get dingding access_token")
        if payload is not None:
            self.access_token = payload.get("access_token")

    def department_list(self):
        """Yield one dict per department with ``name``, ``id``, ``parentid``.

        ``parentid`` is ``None`` for departments whose ``createDeptGroup``
        flag is truthy.  Nothing is yielded when the API call fails.
        """
        payload = self._get_json(
            "department/list",
            {"access_token": self.access_token},
            "Get dingding department")
        if payload is None:
            return

        for dept in payload.get("department") or []:
            # ``.get`` keeps one incomplete entry from aborting the listing.
            if dept.get("createDeptGroup"):
                parentid = None
            else:
                parentid = dept.get("parentid")
            yield {
                "name": dept.get("name"),
                "id": dept.get("id"),
                "parentid": parentid,
            }

    def get_department_users(self, dep_id=None):
        """Return the user records of department ``dep_id``.

        :param dep_id: department id sent as ``department_id``.
        :return: list of user dicts, or ``None`` when the API call fails.
        """
        # NOTE(review): only the first page (offset 0, size 100) is fetched,
        # so larger departments are truncated -- confirm the paging
        # semantics of this endpoint before adding a loop.
        payload = self._get_json(
            "user/listbypage",
            {
                "access_token": self.access_token,
                "department_id": dep_id,
                "offset": 0,
                "size": 100,
            },
            "Get depart users")
        if payload is None:
            return None
        return payload.get("userlist", [])

    def get_all_users(self):
        """Yield a summary dict for every user of every department.

        Users without any e-mail address are skipped with a warning.  A
        user that is returned for several departments is yielded once per
        department.
        """
        for dept in self.department_list():
            # A failed lookup returns None; treat it as an empty department.
            for user in self.get_department_users(dept["id"]) or []:
                # Prefer the organisation e-mail; fall back to the personal
                # one when it is missing or empty.
                email = user.get("orgEmail") or user.get("email")
                if not email:
                    self.logger.warning("%s no config email",
                                        user.get("name"))
                    continue

                departments = user.get("department") or []
                yield {
                    "mobile": user.get("mobile"),
                    "position": user.get("position"),
                    "userid": user.get("userid"),
                    "name": user.get("name"),
                    "isLeader": user.get("isLeader"),
                    "openId": user.get("openId"),
                    # Largest department id, as in sorted(...)[-1]; None
                    # when the user has no department list.
                    "department": max(departments) if departments else None,
                    "email": email,
                }

    def get_user_info(self, username):
        """Return the first user whose e-mail local part equals ``username``.

        :param username: text expected before the ``@`` of the e-mail.
        :return: the user summary dict, or ``None`` when nobody matches.
        """
        for userinfo in self.get_all_users():
            # ``userinfo`` is a dict, so the e-mail is read by key.
            if userinfo["email"].split("@")[0] == username:
                return userinfo
        return None


# example
# if __name__ == "__main__":
#     ding = Dingding("dingk*************", "****", "****")
#     for i in ding.get_all_users():
#         print(i)