本文主要介绍LDAP SDK(python),对ldap正删改查操作。

LDAP一般公司都会用到,用来做统一用户管理,虽然有专门的管理软件,但是一般企业都有自己的管理平台,不如把他集成进自己的系统里。我自己把他集成在运维平台里,打通了堡垒机、VPN、等各种支持ldap的系统,用户在运维平台修改,其他地方都会生效。这里只是提炼出来的一个SDK,感谢jumpserver,对其代码修改了下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
from ldap3 import Server, Connection
from passlib.hash import ldap_salted_sha1 as ssha
from ldap3 import MODIFY_REPLACE


def pass_encrypt(passwd):
return ssha.encrypt(passwd, salt_size=16)


class LDAPUtil:

def __init__(self, use_settings_config=True, server_uri=None, bind_dn=None,
password=None, use_ssl=None, search_ougroup=None,
search_filter=None, attr_map=None, auth_ldap=None):

# config
if use_settings_config:
self._load_config_from_settings()
else:
self.server_uri = server_uri
self.bind_dn = bind_dn
self.password = password
self.use_ssl = use_ssl
self.search_ougroup = search_ougroup
self.search_filter = search_filter
self.attr_map = attr_map
self.auth_ldap = auth_ldap
self.conn = self.get_connection()

def _load_config_from_settings(self):
self.server_uri = AUTH_LDAP_SERVER_URI
self.bind_dn = AUTH_LDAP_BIND_DN
self.password = AUTH_LDAP_BIND_PASSWORD
self.use_ssl = AUTH_LDAP_START_TLS
self.search_ougroup = AUTH_LDAP_SEARCH_OU
self.search_filter = AUTH_LDAP_SEARCH_FILTER
self.attr_map = AUTH_LDAP_USER_ATTR_MAP
self.auth_ldap = AUTH_LDAP

def get_user_by_username(self, username):
"""
通过用户名获取ldap用户,
:param username: 用户名
:return: dict {}
"""
conn = self.conn
search_ougroup = str(self.search_ougroup).split("|")
for search_ou in search_ougroup:
ok = conn.search(
search_ou, self.search_filter % ({"user": username}),
attributes=list(self.attr_map.values())
)
if not ok:
error = "Search no entry matched in ou {}".format(search_ou)
return False, error
return True, self.entry_to_dict(conn.entries[0])

def entry_to_dict(self, entry):
"""
entry 转 dict对象
:param entry: ldap entry对象
:return:
"""
user_item = {}
for attr, mapping in self.attr_map.items():
user_item[mapping] = getattr(entry, mapping).value or ''
return user_item

def user_dict_to_entry_dict(self, user_dict):
"""
用户对象dict转成entry对象dict
:param user_dict: 用户对象
:return:
"""
user_item = {}
for attr, mapping in self.attr_map.items():
value = user_dict.get(attr, '')
if value == True or value == False or value == 'true' or value == 'false':
value = str(value).capitalize()
user_item[mapping] = value
if not user_item.get('uid'):
user_item['uid'] = user_item['cn']
user_item['sn'] = user_item['cn']
if not user_item.get('cn'):
user_item['cn'] = user_item['uid']
user_item['sn'] = user_item['uid']
if user_item.get('userPassword'):
user_item['userPassword'] = pass_encrypt(user_item['userPassword'])
return user_item

def update_user(self, username, attr):
"""
更新dn的entry属性
:param username: ldap username
:param attr: ldap 用户 dict
:return:
"""
changes_dic = {}
dn = "{},{}".format(str(self.search_filter % ({"user": username})).strip('()'), self.search_ougroup)
for k, v in attr.items():
if not self.conn.compare(dn=dn, attribute=k, value=v):
# password hash will be changed all time
if k == 'userPassword':
continue
changes_dic.update({k: [(MODIFY_REPLACE, [v])]})
if changes_dic:
self.conn.modify(dn=dn, changes=changes_dic)
return True, self.conn.result['description']
else:
return False, "no changed"

def update_user_password(self, username, passwd):
"""
更新用户密码
:param dn: ldap username
:param passwd: 密码
:return: bool, msg
"""
dn = "{},{}".format(str(self.search_filter % ({"user": username})).strip('()'), self.search_ougroup)
changes_dic = {'userPassword': [(MODIFY_REPLACE, [pass_encrypt(passwd)])]}
self.conn.modify(dn=dn, changes=changes_dic)
return self.conn.result

def compare_attr(self, dn, attr, value):
"""
比较员工指定的某个属性
:param dn:
:param attr:
:param value:
:return:
"""
res = self.conn.compare(dn=dn, attribute=attr, value=value)
return res

def create_user(self, user_item):
"""
创建用户
:param user_item: 用户字典
"""
if user_item.get('uid') or user_item.get('cn'):
cn_user = "uid={},{}".format(user_item.get('uid'), self.search_ougroup)
ok = self.conn.add(
cn_user, ['inetOrgPerson', 'top', 'extensibleObject'], user_item
)
if not ok:
error = "Add entry {} error: {}".format(cn_user, self.conn.last_error)
return False, error
return True, ""
error = "Add entry {} error: User entry must have uid/cn.".format(user_item)
return False, error

@staticmethod
def get_or_construct_email(user_item):
if not user_item.get('email', None):
if '@' in user_item['username']:
email = user_item['username']
else:
email = '{}@{}'.format(
user_item['username'], EMAIL_SUFFIX)
else:
email = user_item['email']
return email

def create_or_update_users(self, user_items):
"""
创建或更新用户list
:param user_items: 用户字典
:return:
"""
succeed = failed = 0
failed_list = []
succeed_list = []
for user_item in user_items:
username = user_item.get('uid') or user_item.get('cn')
if username:
exist, data = self.get_user_by_username(username)
if exist:
ok, error = self.update_user(username, user_item)
else:
ok, error = self.create_user(user_item)
if not ok:
failed += 1
failed_list.append(username)
else:
succeed += 1
succeed_list.append(username)
else:
pass
result = {'total': len(user_items), 'succeed': succeed, 'failed': failed, 'failed_list': failed_list, 'succeed_list': succeed_list}
return result

def _ldap_entry_to_user_item(self, entry):
user_item = {}
for attr, mapping in self.attr_map.items():
if not hasattr(entry, mapping):
continue
user_item[attr] = getattr(entry, mapping).value or ''
return user_item

def get_connection(self):
server = Server(self.server_uri, use_ssl=self.use_ssl)
conn = Connection(server, self.bind_dn, self.password, read_only=False)
conn.bind()
return conn

def get_search_user_items(self):
conn = self.get_connection()
user_items = []
search_ougroup = str(self.search_ougroup).split("|")
for search_ou in search_ougroup:
ok = conn.search(
search_ou, self.search_filter % ({"user": "*"}),
attributes=list(self.attr_map.values())
)
if not ok:
error = "Search no entry matched in ou {}".format(search_ou)
raise Exception(error)

for entry in conn.entries:
user_item = self._ldap_entry_to_user_item(entry)
user = self.get_user_by_username(user_item['username'])
user_item['existing'] = bool(user)
user_items.append(user_item)

return user_items

def del_user(self, username):
dn = "uid={},{}".format(username, self.search_ougroup)
ok = self.conn.delete(dn)
if ok:
return True, ''
return False, self.conn.last_error


if __name__ == '__main__':
AUTH_LDAP = False
AUTH_LDAP_SERVER_URI = 'ldap://oneops.top:389'
AUTH_LDAP_BIND_DN = 'cn=Manager,dc=oneops,dc=com'
AUTH_LDAP_BIND_PASSWORD = 'xxxx'
AUTH_LDAP_SEARCH_OU = 'ou=People,dc=oneops,dc=com'
AUTH_LDAP_SEARCH_FILTER = '(uid=%(user)s)'
AUTH_LDAP_START_TLS = False
AUTH_LDAP_USER_ATTR_MAP = {"username": "uid", "name": "displayName",
"email": "mail", "phone": "mobile",
"is_active": "olcAllows", "password": "userPassword"}
EMAIL_SUFFIX = "oneops.top"
ldap_tool = LDAPUtil()
# print(ldap_tool.get_user_by_username(username='xx'))
print(ldap_tool.get_search_user_items())
entry_dict = ldap_tool.user_dict_to_entry_dict({'username': 'xiaomao', 'name': '薛', 'email': 'xiaomao@oneops.top', 'phone': '136xx', 'is_active': 'true'})
# print(entry_dict)
# print(ldap_tool.create_user(entry_dict))

# dn = "uid=test,ou=People,dc=kingxunlian,dc=com"
# print(ldap_tool.update_user('xxx', entry_dict))
# print(ldap_tool.del_user('test'))