# coding=utf-8 from ldap3 import Server, Connection, ALL, MODIFY_REPLACE from random import choice import string from ldap3.extend.microsoft.addMembersToGroups import ad_add_members_to_groups from ldap3.extend.microsoft.removeMembersFromGroups import ad_remove_members_from_groups class Adoper(object): def __init__(self): self.host = "****" # ad host self.user = '****' # 管理員 self.password = '****' # 密碼 self.server, self.conn = self._connect() def _connect(self): #鏈接AD server = Server(self.host, use_ssl=True, get_info=ALL) conn = Connection(server, user=self.user, password=self.password, auto_bind=True) return server, conn @staticmethod def _make_password(): #生成隨機密碼 special_char = '&!@#^*-=' length = 8 chars = string.ascii_letters + string.digits + special_char password = ''.join([choice(chars) for i in range(length)]) return password def search(self, base_dn=None): #獲取全部AD帳戶 attr_list = ['distinguishedName', 'cn', 'uid', 'displayName', 'mail', 'userAccountControl', 'sAMAccountName', 'pwdLastSet', 'department', 'employeeID'] res = self.conn.search(base_dn, search_filter='(objectclass=user)', attributes=attr_list, paged_size=10000) if not res: return None for user in self.conn.entries: user_infos = {} if user["userAccountControl"] == 66050: #禁用帳戶不顯示 continue user_infos["email"] = str(user["mail"]) user_infos["name"] = str(user["SAMAccountName"]) user_infos["department"] = str(user["department"]) user_infos["dn"] = str(user["distinguishedName"]) user_infos["employeeId"] = str(user["employeeID"]) if user["employeeID"] else "" yield user_infos def search_user_dn(self, user): #獲取指定用戶CN """ return str """ dn = None for i in self.search(): if i["name"] == user: dn = i["dn"] break return dn def is_forbidden(self, user, flag=True): #是否禁用帳戶,flag=Flase表示激活 """ return bool """ dn = self.search_user_dn(user) if dn is None: print("search user dn is none") return False value = ['66050'] if flag else ['66048'] res = self.conn.modify(dn, {'userAccountControl': [(MODIFY_REPLACE, value)]}) print(self.conn.result) return res def mdf_uid(self, user, uid): # 修改指定用戶uid值 """ return bool """ dn = self.search_user_dn(user) if dn is None: print("search user dn is none") return False res = self.conn.modify(dn, {"uid": [(MODIFY_REPLACE, uid)]}) print(self.conn.result) return res def modify_passwd(self, user, new_pwd, old_pwd=None): # 修改用戶密碼 """ return bool old_pwd 爲None時,強制修改密碼,用戶密碼找回 """ dn = self.search_user_dn(user) if old_pwd is None: res = self.conn.extend.microsoft.modify_password(dn, new_pwd) else: res = self.conn.extend.microsoft.modify_password(dn, new_pwd, old_pwd) print(self.conn.result) return res def add_user(self, user, uid, employeeId, dn=None): #增長用戶 password = self._make_password() dn = "CN={},OU=User,OU=**,DC=**,DC=com".format(user) if dn is None else dn mail = user+'@**.com' attr = { "sAMAccountName": user, "mail": mail, "name": user, "department": "ccc", "uid": uid, "userAccountControl": 66048, "employeeID": employeeId } res = self.conn.add(dn=dn, object_class=['organizationalPerson', 'person', 'top', 'user'], attributes=attr) if not res: print(self.conn.result) return False, self.conn.result # 設置密碼 self.conn.extend.microsoft.modify_password(dn, password) return True, password def delete_user(self, user=None): # 刪除用戶 dn = self.search_user_dn(user) res = self.conn.delete(dn) if not res: print(self.conn.result) return False return True def group_add_user(self, user, dn): # 增長用戶到某個AD group下 cn = self.search_user_dn(user) res = ad_add_members_to_groups(self.conn, cn, dn) return res def group_remove_user(self, user, dn, fix=True): # 從某個組中移除用戶 cn = self.search_user_dn(user) return ad_remove_members_from_groups(self.conn, cn, dn, fix) if __name__ == "__main__": ad = Adoper() for i in ad.search(): print(i)