ldap3 操做 AD 域帳號

# coding=utf-8

from ldap3 import Server, Connection, ALL, MODIFY_REPLACE
from random import choice
import string
from ldap3.extend.microsoft.addMembersToGroups import ad_add_members_to_groups
from ldap3.extend.microsoft.removeMembersFromGroups import ad_remove_members_from_groups

class Adoper(object):
    def __init__(self):
        self.host = "****"    # ad host
        self.user = '****'     # 管理員
        self.password = '****' # 密碼 
        self.server, self.conn = self._connect()

    def _connect(self):   #鏈接AD
        server = Server(self.host, use_ssl=True, get_info=ALL)
        conn = Connection(server, user=self.user, password=self.password, auto_bind=True)
        return server, conn

    @staticmethod
    def _make_password():        #生成隨機密碼
        special_char = '&!@#^*-='
        length = 8
        chars = string.ascii_letters + string.digits + special_char
        password = ''.join([choice(chars) for i in range(length)])
        return password

    def search(self, base_dn=None):  #獲取全部AD帳戶
        attr_list = ['distinguishedName', 'cn', 'uid', 'displayName', 'mail', 'userAccountControl', 'sAMAccountName', 'pwdLastSet', 'department', 'employeeID']
        res = self.conn.search(base_dn, search_filter='(objectclass=user)', attributes=attr_list, paged_size=10000)
        if not res:
            return None
        for user in self.conn.entries:
            user_infos = {}
            if user["userAccountControl"] == 66050:  #禁用帳戶不顯示
                continue
            user_infos["email"] = str(user["mail"])
            user_infos["name"] = str(user["SAMAccountName"])
            user_infos["department"] = str(user["department"])
            user_infos["dn"] = str(user["distinguishedName"])
            user_infos["employeeId"] = str(user["employeeID"]) if user["employeeID"] else ""
            yield user_infos

    def search_user_dn(self, user):  #獲取指定用戶CN
        """
            return str
        """
        dn = None
        for i in self.search():
            if i["name"] == user:
                dn = i["dn"]
                break
        return dn

    def is_forbidden(self, user, flag=True):  #是否禁用帳戶,flag=Flase表示激活
        """
        return bool
        """
        dn = self.search_user_dn(user)
        if dn is None:
            print("search user dn is none")
            return False
        value = ['66050'] if flag else ['66048']
        res = self.conn.modify(dn, {'userAccountControl': [(MODIFY_REPLACE, value)]})
        print(self.conn.result)
        return res

    def mdf_uid(self, user, uid):  # 修改指定用戶uid值
        """
        return bool
        """
        dn = self.search_user_dn(user)
        if dn is None:
            print("search user dn is none")
            return False
        res = self.conn.modify(dn, {"uid": [(MODIFY_REPLACE, uid)]})
        print(self.conn.result)
        return res

    def modify_passwd(self, user, new_pwd, old_pwd=None): # 修改用戶密碼
        """
            return bool
            old_pwd 爲None時,強制修改密碼,用戶密碼找回
        """
        dn = self.search_user_dn(user)
        if old_pwd is None:
            res = self.conn.extend.microsoft.modify_password(dn, new_pwd)
        else:
            res = self.conn.extend.microsoft.modify_password(dn, new_pwd, old_pwd)
        print(self.conn.result)
        return res

    def add_user(self, user, uid, employeeId, dn=None): #增長用戶
        password = self._make_password()
        dn = "CN={},OU=User,OU=**,DC=**,DC=com".format(user) if dn is None else dn
        mail = user+'@**.com'
        attr = {
            "sAMAccountName": user,
            "mail": mail,
            "name": user,
            "department": "ccc",
            "uid": uid,
            "userAccountControl": 66048,
            "employeeID": employeeId
        }
        res = self.conn.add(dn=dn, object_class=['organizationalPerson', 'person', 'top', 'user'], attributes=attr)
        if not res:
            print(self.conn.result)
            return False, self.conn.result
        # 設置密碼
        self.conn.extend.microsoft.modify_password(dn, password)
        return True, password

    def delete_user(self, user=None): # 刪除用戶
        dn = self.search_user_dn(user)
        res = self.conn.delete(dn)
        if not res:
            print(self.conn.result)
            return False
        return True

    def group_add_user(self, user, dn):  # 增長用戶到某個AD group下
        cn = self.search_user_dn(user)
        res = ad_add_members_to_groups(self.conn, cn, dn)
        return res

    def group_remove_user(self, user, dn, fix=True): # 從某個組中移除用戶
        cn = self.search_user_dn(user)
        return ad_remove_members_from_groups(self.conn, cn, dn, fix)

if __name__ == "__main__":
    ad = Adoper()
    for i in ad.search():
        print(i)
相關文章
相關標籤/搜索