Python通過LDAP驗證、查找用戶(class,logging)

定義一個類,用於初始化LDAP連接,以及驗證、查找用戶等功能。

# -*- coding: UTF-8 -*-
# Python 2-only workaround: site.py removes sys.setdefaultencoding during
# interpreter start-up, so the module is reloaded to get the function back and
# the process-wide implicit str<->unicode codec is then switched to UTF-8.
# `reload` is not a builtin on Python 3, where these two lines must be dropped.
import sys 
reload(sys) 
sys.setdefaultencoding('utf-8') 

import ldap,logging,time
# File that receives the records emitted through the root logger.
logfile = 'e:\\a.txt'

# Configure the root logger once, at import time.  filemode is not passed, so
# the logging default ('a', append) applies.  A more verbose alternative
# format would be
# '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s'.
logging.basicConfig(
    filename=logfile,
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',  # e.g. 2016-05-26 15:22:29
)
# Example of the resulting log lines:
#   2016-05-26 15:22:29 INFO liu1 valid passed.
#   2016-05-26 15:22:37 INFO liu1 valid passed.


class ldapc:
    """Small wrapper around a python-ldap connection for finding and verifying users.

    The constructor binds with a service account given as ``DOMAIN\\user``.
    Users are located by their ``sAMAccountName`` below ``baseDN``.

    Return conventions (kept from the original implementation so existing
    callers keep working):

    * every public method returns ``None`` when the initial bind failed
      (``self.ldap_error`` then holds the message);
    * a lookup that matches nothing returns the string
      ``"<username> doesn't exist."``;
    * an LDAP failure is *returned* (not raised) as the ``ldap.LDAPError``
      instance.

    Because failures are returned as truthy objects, callers must compare with
    the success value explicitly, e.g. ``valid_user(...) is True``.
    """

    def __init__(self, ldap_path, baseDN, domainname, ldap_authuser, ldap_authpass):
        """Open the connection and bind as ``domainname\\ldap_authuser``.

        A failed bind does not raise: the message is stored in
        ``self.ldap_error`` and printed, and all other methods become no-ops.
        """
        self.baseDN = baseDN
        # Remembered so credential checks can use their own connection.
        self.ldap_path = ldap_path
        self.ldap_error = None
        # Escaped backslash: the original '%s\%s' relied on an invalid escape.
        ldap_authduser = '%s\\%s' % (domainname, ldap_authuser)
        self.l = ldap.initialize(ldap_path)
        self.l.protocol_version = ldap.VERSION3
        try:
            self.l.simple_bind_s(ldap_authduser, ldap_authpass)
        except ldap.LDAPError as err:
            self.ldap_error = 'Connect to %s failed, Error:%s.' % (
                ldap_path, self._describe(err))
            print(self.ldap_error)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _describe(self, err):
        """Return the ``desc`` text of an LDAP error, else ``str(err)``.

        python-ldap carries the details in a dict passed as the first
        exception argument; ``err.message`` only exists on Python 2.
        """
        info = err.args[0] if err.args else None
        if isinstance(info, dict) and 'desc' in info:
            return info['desc']
        return str(err)

    def _not_found(self, username):
        """Build the string returned when no entry matches *username*."""
        return "%s doesn't exist." % username

    def _account_filter(self, username, prefix=False):
        """Build a ``sAMAccountName`` filter with *username* safely escaped.

        Escaping stops characters such as ``*``, ``(`` and ``)`` in user input
        from changing the meaning of the filter.  With ``prefix=True`` a
        trailing wildcard is appended after escaping.
        """
        # Local import: `import ldap` alone does not guarantee that the
        # ldap.filter submodule has been loaded.
        from ldap.filter import escape_filter_chars
        pattern = escape_filter_chars(username)
        if prefix:
            pattern += '*'
        return '(sAMAccountName=%s)' % pattern

    def _search_entries(self, search_filter):
        """Run a subtree search and return the ``(dn, attrs)`` entries.

        Search references (tuples whose DN is ``None``) are dropped so that
        callers only ever see real entries.  Raises ``ldap.LDAPError``.
        """
        results = self.l.search_s(
            self.baseDN, ldap.SCOPE_SUBTREE, search_filter, None)
        return [entry for entry in results if entry[0] is not None]

    def _find_dn(self, username):
        """Return the DN of the exact match for *username*, or ``None``."""
        entries = self._search_entries(self._account_filter(username))
        if entries:
            return entries[0][0]
        return None

    def _bind_as(self, user_dn, password):
        """Return a new connection bound as *user_dn*; raises on failure.

        A dedicated connection is used so that the service-account bind held
        by ``self.l`` is not replaced (or lost after a failed bind).
        """
        conn = ldap.initialize(self.ldap_path)
        conn.protocol_version = ldap.VERSION3
        try:
            conn.simple_bind_s(user_dn, password)
        except ldap.LDAPError:
            conn.unbind_s()
            raise
        return conn

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def search_users(self, username):
        """Prefix search: list the entries whose account name starts with *username*.

        Returns a list of ``(dn, attrs)`` tuples, the "doesn't exist" string
        when nothing matches, or the ``ldap.LDAPError`` on failure.
        """
        if self.ldap_error is not None:
            return None
        try:
            entries = self._search_entries(self._account_filter(username, True))
        except ldap.LDAPError as err:
            return err
        if not entries:
            return self._not_found(username)
        return entries

    def search_user(self, username):
        """Exact search: return a one-element list holding the matching entry.

        Returns ``[(dn, attrs)]``, the "doesn't exist" string when nothing
        matches, or the ``ldap.LDAPError`` on failure.
        """
        if self.ldap_error is not None:
            return None
        try:
            entries = self._search_entries(self._account_filter(username))
        except ldap.LDAPError as err:
            return err
        if not entries:
            return self._not_found(username)
        return entries[:1]

    def search_userDN(self, username):
        """Exact search: return the DN of *username*.

        Returns the DN, the "doesn't exist" string when nothing matches, or
        the ``ldap.LDAPError`` on failure.
        """
        if self.ldap_error is not None:
            return None
        try:
            user_dn = self._find_dn(username)
        except ldap.LDAPError as err:
            return err
        if user_dn is None:
            return self._not_found(username)
        return user_dn

    def valid_user(self, username, userpassword):
        """Check *userpassword* for *username* by binding as that user.

        Returns ``True`` on success.  Otherwise returns a message string
        (empty password, unknown user) or the ``ldap.LDAPError`` that occurred.
        """
        if self.ldap_error is not None:
            return None
        # A simple bind with an empty password is an unauthenticated bind
        # (RFC 4513) that many servers accept, so it must never count as a
        # successful verification.
        if not userpassword:
            return 'Empty password is not allowed.'
        try:
            user_dn = self._find_dn(username)
        except ldap.LDAPError as err:
            # The original called .find() on the returned error and crashed.
            return err
        if user_dn is None:
            return self._not_found(username)
        try:
            self._bind_as(user_dn, userpassword).unbind_s()
        except ldap.LDAPError as err:
            return err
        logging.info('%s valid passed.\r', username)
        return True

    def update_pass(self, username, oldpassword, newpassword):
        """Change the password of *username* (marked as untested by the author).

        Binds as the user with *oldpassword* on a dedicated connection and
        issues a password-modify request.  Returns
        ``'Change password success.'``, a message string, or the
        ``ldap.LDAPError`` that occurred.
        """
        if self.ldap_error is not None:
            return None
        if not oldpassword:
            return 'Empty password is not allowed.'
        try:
            user_dn = self._find_dn(username)
        except ldap.LDAPError as err:
            return err
        if user_dn is None:
            return self._not_found(username)
        try:
            conn = self._bind_as(user_dn, oldpassword)
            try:
                conn.passwd_s(user_dn, oldpassword, newpassword)
            finally:
                conn.unbind_s()
        except ldap.LDAPError as err:
            return err
        return 'Change password success.'



# Demo settings.  NOTE(review): credentials are hard-coded for illustration
# only; load them from configuration or the environment in real use.
ldap_authuser='liu1'   # service account used for the initial bind
ldap_authpass='pass'
domainname='uu'        # combined with ldap_authuser as DOMAIN\user
ldappath='ldap://192.168.200.25:389'

# baseDN is not needed for the service-account bind itself; it is the search
# base used when other users are looked up or verified.
baseDN='OU=優優,DC=uu,DC=yuu,DC=com'
username = 'liu1'  # user to look up / verify (not referenced by the call below)
p=ldapc(ldappath,baseDN,domainname,ldap_authuser,ldap_authpass)
# valid_user() yields True only on success; anything else is an error value.
# print(...) with a single argument behaves the same on Python 2 and 3.
print(p.valid_user('Lily','lpass'))

 

遍歷OU下用戶的函數:

def search_OU(self):
    """List the person entries found below ``self.baseDN``.

    Runs a subtree search with the filter ``(&(objectClass=person))`` and
    returns one dict per entry with the keys ``'sAMAccountName'``,
    ``'displayName'`` and ``'department'``.  Each value is the first value of
    that attribute, or ``None`` when the entry does not carry the attribute.

    Returns ``None`` when ``self.ldap_error`` is set, and returns (does not
    raise) the ``ldap.LDAPError`` if the search fails.

    Side effect: once a search has been attempted the connection is unbound
    and ``self.l`` is deleted, so the object cannot be reused afterwards.
    """
    if self.ldap_error is not None:
        return None

    def first_value(attrs, name):
        # An attribute without a value is simply absent from the dict.
        values = attrs.get(name)
        return values[0] if values else None

    try:
        ldap_result = self.l.search_s(
            self.baseDN, ldap.SCOPE_SUBTREE, '(&(objectClass=person))', None)
        if ldap_result is None:
            return None
        usersinfor = []
        for pinfor in ldap_result:
            # Each result is (dn, attrs).  Search references come back as
            # (None, [urls]); their second element is a list, so skip them.
            attrs = pinfor[1]
            if not attrs or not hasattr(attrs, 'get'):
                continue
            # Build a new dict per entry: the original reused one dict, so
            # every list element ended up describing the last user.
            usersinfor.append({
                'sAMAccountName': first_value(attrs, 'sAMAccountName'),
                'displayName': first_value(attrs, 'displayName'),
                'department': first_value(attrs, 'department'),
            })
        return usersinfor
    except ldap.LDAPError as err:
        return err
    finally:
        self.l.unbind_s()
        del self.l
 

 

# OU whose users should be listed.
baseDN='OU=Admin,DC=u,DC=y,DC=com'
p=ldapc(ldappath,baseDN,domainname,ldap_authuser,ldap_authpass)
# NOTE(review): this call requires search_OU to be defined as a method of
# ldapc; confirm it has been added to the class.
users = p.search_OU()
# search_OU may hand back None, an error object or an empty list, so only
# index into the result when at least one user was found.
if isinstance(users, list) and users:
    print(users[0]['department'])
else:
    print(users)

 

# Fragment: request only selected attributes instead of passing None.
# NOTE(review): it references self and searchScope, which are not defined at
# this level; presumably it is meant to replace the search call inside a
# method such as search_OU. Confirm before use.
#retrieveAttributes = None
searchFilter = '(&(objectClass=person))'
attrs = ['cn','uid','mail']
# Only the attributes named in attrs are returned; per the original note, '*' returns all attributes.
ldap_result =self.l.search_s(self.baseDN, searchScope, searchFilter, attrs)

 

分頁返回LDAP查詢結果:

import ldap  
from ldap.controls import SimplePagedResultsControl  
  
# Paged search: fetch all enabled user accounts below baseDN, PAGE_SIZE at a time.
l = ldap.initialize('ldap://1.1.1.16')
l.simple_bind_s('user', 'Password')
# NOTE(review): unicode() is a Python 2-only builtin; on Python 3 a plain str
# literal would be used instead.
baseDN=unicode('OU=集團,DC=uxin,DC=youxinpai,DC=com','utf8')
PAGE_SIZE = 500  # number of entries requested per page
ATTRLIST = ['sAMAccountName','name', 'mail','department','title']  # attributes to return
# ATTRLIST = None  # None returns every attribute of each entry
# Users whose userAccountControl does not have bit 2 set (per the original
# note: enabled accounts only).
searchFilter =  '(&(objectCategory=person)(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))'
pg_ctrl = SimplePagedResultsControl(True, size=PAGE_SIZE, cookie="")
userdata = []

try:
    while True:
        msgid = l.search_ext(baseDN,ldap.SCOPE_SUBTREE, searchFilter, ATTRLIST, serverctrls=[pg_ctrl])
        _a, res_data, _b, srv_ctrls = l.result3(msgid)
        # Search references are reported with a DN of None; they are not users.
        userdata.extend(entry for entry in res_data if entry[0] is not None)
        # Pick the paged-results response control by type rather than assuming
        # it is the first (or only) control the server sent back.
        page_ctrls = [_ctrl for _ctrl in srv_ctrls
                      if _ctrl.controlType == SimplePagedResultsControl.controlType]
        if not page_ctrls:
            # The server ignored the paging request; what we have is all there is.
            break
        cookie = page_ctrls[0].cookie
        if not cookie:
            # An empty cookie marks the last page.
            break
        # Send the cookie back to ask for the next page.
        pg_ctrl.cookie = cookie
finally:
    # Release the connection even if a page request fails.
    l.unbind_s()

print('totalnum: %d' % len(userdata))
if userdata:
    print(userdata[0])

 

 

ObjectClass類型如下。詳細參考:http://www.cnblogs.com/dreamer-fish/p/5832735.html (LDAP查詢過濾語法)、http://www.morgantechspace.com/2013/05/ldap-search-filter-examples.html

objectClass: person
objectClass: organizationalPerson
objectClass: inetOrgPerson

相關文章
相關標籤/搜索