Python中如何实现同步企业微信和AD组织架构的脚本分享

https://github.com/huangchaosysu/optools/tree/master/sync_wxwork_2_ad

支持双向同步和纯检查两种模式

以企业的组织结构为主


Python中如何实现同步企业微信和AD组织架构的脚本分享
4 回复

你的 token 不怕泄漏吗


我最近正好搞过一个同步AD组织架构到企业微信的脚本,核心思路就是通过LDAP协议读取AD数据,然后调用企业微信的API进行同步。

import ldap
import requests
import json
from typing import List, Dict

class ADToWeChatSync:
    def __init__(self, ad_server: str, ad_user: str, ad_password: str, 
                 corp_id: str, corp_secret: str):
        # AD连接配置
        self.ad_conn = ldap.initialize(f'ldap://{ad_server}')
        self.ad_conn.simple_bind_s(ad_user, ad_password)
        
        # 企业微信配置
        self.corp_id = corp_id
        self.corp_secret = corp_secret
        self.access_token = self._get_wechat_token()
    
    def _get_wechat_token(self) -> str:
        """获取企业微信access_token"""
        url = f'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
        params = {
            'corpid': self.corp_id,
            'corpsecret': self.corp_secret
        }
        resp = requests.get(url, params=params).json()
        return resp['access_token']
    
    def fetch_ad_users(self, base_dn: str, filter_str: str = '(objectClass=user)') -> List[Dict]:
        """从AD获取用户信息"""
        users = []
        search_scope = ldap.SCOPE_SUBTREE
        attrs = ['sAMAccountName', 'displayName', 'mail', 'department', 'title']
        
        results = self.ad_conn.search_s(base_dn, search_scope, filter_str, attrs)
        
        for dn, entry in results:
            if entry:
                user = {
                    'userid': entry.get('sAMAccountName', [b''])[0].decode('utf-8'),
                    'name': entry.get('displayName', [b''])[0].decode('utf-8'),
                    'email': entry.get('mail', [b''])[0].decode('utf-8'),
                    'department': [int(entry.get('department', [b'1'])[0].decode('utf-8'))],
                    'position': entry.get('title', [b''])[0].decode('utf-8')
                }
                users.append(user)
        return users
    
    def sync_user_to_wechat(self, user_data: Dict) -> bool:
        """同步单个用户到企业微信"""
        url = f'https://qyapi.weixin.qq.com/cgi-bin/user/create'
        params = {'access_token': self.access_token}
        
        # 企业微信要求的字段
        wechat_user = {
            'userid': user_data['userid'],
            'name': user_data['name'],
            'department': user_data['department'],
            'position': user_data['position'],
            'email': user_data['email'],
            'mobile': '',  # 根据实际情况填充
            'enable': 1
        }
        
        resp = requests.post(url, params=params, json=wechat_user).json()
        return resp['errcode'] == 0
    
    def batch_sync(self, ad_base_dn: str):
        """批量同步所有用户"""
        users = self.fetch_ad_users(ad_base_dn)
        success_count = 0
        
        for user in users:
            if self.sync_user_to_wechat(user):
                success_count += 1
                print(f"成功同步用户: {user['name']}")
            else:
                print(f"同步失败: {user['name']}")
        
        print(f"同步完成,成功{success_count}个,失败{len(users)-success_count}个")

# 使用示例
if __name__ == '__main__':
    # 初始化同步器
    sync = ADToWeChatSync(
        ad_server='ad.yourcompany.com',
        ad_user='admin@yourcompany.com',
        ad_password='your_password',
        corp_id='your_corp_id',
        corp_secret='your_corp_secret'
    )
    
    # 执行同步
    sync.batch_sync('DC=yourcompany,DC=com')

这个脚本的核心逻辑:

  1. 通过python-ldap库连接AD服务器,查询用户信息
  2. 使用企业微信的/cgi-bin/user/create接口创建用户
  3. 字段映射:AD的sAMAccountName映射为企业微信的useriddisplayName映射为name

需要注意的几个关键点:

  • AD查询需要正确的base_dn和过滤条件
  • 企业微信的department需要是整数数组
  • 建议先同步部门结构,再同步用户

实际使用中你可能还需要处理部门同步、用户更新(使用update接口)、错误重试等逻辑。根据你的AD结构调整查询过滤条件。

总结:核心就是LDAP查询+企业微信API调用。

内网的测试机器

老哥牛逼,请问下支持增量同步吗

回到顶部