赞
踩
基于 PC 企业微信的 api 接口,支持收发文本、群@、名片、图片、文件、视频、链接卡片等。
pip install ntwork
国内源安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple ntwork
官方下载:https://dldir1.qq.com/wework/work_weixin/WeCom_4.0.8.6027.exe
import sys import ntwork wework = ntwork.WeWork() # 打开pc企业微信, smart: 是否管理已经登录的企业微信`在这里插入代码片` wework.open(smart=True) # 等待登录 wework.wait_login() # 向文件助手发送一条消息 wework.send_text(conversation_id="FILEASSIST", content="hello, NtWork") try: while True: pass except KeyboardInterrupt: ntwork.exit_() sys.exit()
# -*- coding: utf-8 -*- import sys import ntwork wework = ntwork.WeWork() # 打开pc企业微信, smart: 是否管理已经登录的微信 wework.open(smart=True) # 等待登录 wework.wait_login() # 获取内部(同事)列表并输出 contacts = wework.get_inner_contacts() print("内部(同事)联系人列表: ") print(contacts) # 获取外部(客户)列表并输出 contacts = wework.get_external_contacts() print("外部(客户)联系人列表: ") print(contacts) # 获取群列表并输出 rooms = wework.get_rooms() print("群列表: ") print(rooms) try: while True: pass except KeyboardInterrupt: ntwork.exit_() sys.exit()
import sys import ntwork wework = ntwork.WeWork() # 打开pc企业微信, smart: 是否管理已经登录的微信 wework.open(smart=True) # 注册消息回调 @wework.msg_register(ntwork.MT_RECV_TEXT_MSG) def on_recv_text_msg(wework_instance: ntwork.WeWork, message): data = message["data"] sender_user_id = data["sender"] self_user_id = wework_instance.get_login_info()["user_id"] conversation_id: str = data["conversation_id"] # 判断消息不是自己发的并且不是群消息时,回复对方 if sender_user_id != self_user_id and not conversation_id.startswith("R:"): wework_instance.send_text(conversation_id=conversation_id, content=f"你发送的消息是: {data['content']}") try: while True: pass except KeyboardInterrupt: ntwork.exit_() sys.exit()
通过 fastapi 的 swagger 在线文档可以很方便的管理 ntwork 接口:案例
如果出现ntwork.exception.WeWorkVersionNotMatchError异常
异常, 请确认是否安装github上指定的企业微信版本,如果确认已经安装,还是报错,可以在代码中添加以下代码,跳过微信版本检测
import ntwork
ntwork.set_wework_exe_path(wework_version='4.0.8.6027')
新建一些 ntwork.wework 实例,然后调用 open 方法。
import ntwork
# 多开3个微信
for i in range(3):
wework = ntwork.WeWork()
wework.open(smart=False)
更完善的多实例管理查看 fastapi_example例子
# 注册监听所有消息回调
import ntwork
@wework.msg_register(ntwork.MT_ALL)
def on_recv_text_msg(wework_instance: ntwork.WeWork, message):
print("########################")
print(message)
完全例子查看examples/msg_register_all.py
os.environ['NTWORK_LOG'] = "ERROR"
要在import ntwork
前执行
# -*- coding: utf-8 -*-
import os
import sys
import time
os.environ['NTWORK_LOG'] = "ERROR"
import ntwork
先使用pip install pywin32
安装pywin32模块, 然后在代码中添加以下代码, 完整例子查看examples/cmd_close_event.py
import sys
import ntwork
import win32api
def on_exit(sig, func=None):
ntwork.exit_()
sys.exit()
# 当关闭cmd窗口时
win32api.SetConsoleCtrlHandler(on_exit, True)
使用 pyinstaller 打包 ntwork 项目,需要添加 --collect-data=ntwork
选项
打包成单个 exe 程序
pyinstaller -F --collect-data=ntwork main.py
将所有的依赖文件打包到一个目录中
pyinstaller -y --collect-data=ntwork main.py
打包 fastapi_example 示例,需要添加 --paths=. --collect-data=ntchat
pyinstaller -F --paths=. --collect-data=ntchat main.py
2024-04-12 00:45:21,949 - WeWorkManager - INFO - initialize wework, version: 4.0.8.6027
2024-04-12 00:45:21,949 - WeWorkManager - DEBUG - new wework instance
2024-04-12 00:45:21,982 - WeWorkInstance - INFO - open wework pid: 57024
2024-04-12 00:45:23,671 - WeWorkManager - DEBUG - accept client_id: 1
2024-04-12 00:45:26,225 - WeWorkInstance - DEBUG - on recv message: {'data': {'account': '', 'acctid': 'WangPaiGeGe', 'avatar': 'https://wework.qpic.cn/wwpic3az/457956_UJkq5TJkTfCnA5x_1712341231/0', 'corp_id': '1970325052533916', 'document_root': 'C:\\Users\\Administrator\\Documents\\WXWork\\1688856738534700', 'email': '', 'job_name': '', 'mobile': '19561983930', 'nickname': '', 'pid': 57024, 'position': '', 'sex': 1, 'user_id': '1688856738534700', 'username': '王牌哥哥'}, 'type': 11026}
2024-04-12 00:45:26,241 - WeWorkInstance - INFO - login success, name: 王牌哥哥
私聊自动回复案例
import sys import ntwork wework = ntwork.WeWork() # 打开 pc 企业微信, smart: 是否管理已经登录的微信 wework.open(smart=True) # 注册消息回调 @wework.msg_register(ntwork.MT_RECV_TEXT_MSG) def on_recv_text_msg(wework_instance: ntwork.WeWork, message): data = message["data"] sender_user_id = data["sender"] self_user_id = wework_instance.get_login_info()["user_id"] conversation_id: str = data["conversation_id"] # 判断消息不是自己发的并且不是群消息时,回复对方 if sender_user_id != self_user_id and not conversation_id.startswith("R:"): wework_instance.send_text(conversation_id=conversation_id, content=f"你发送的消息是: {data['content']}") try: while True: pass except KeyboardInterrupt: ntwork.exit_() sys.exit()
{
'appinfo': 'CAQQiZrgsAYYpMv0mZmAgAMgyIKKUw==',
'at_list': [],
'content': '我喜欢你',
'content_type': 0,
'conversation_id': 'S:1688856625489316_1688856738534700',
'is_pc': 0,
'local_id': '34',
'receiver': '1688856738534700',
'send_time': '1712852233',
'sender': '1688856625489316',
'sender_name': '唤醒手腕',
'server_id': '1000235'
}
{
'appinfo': 'CAQQiKTgsAYYpMv0mZmAgAMg99ak/Ak=',
'at_list': [],
'content': '我喜欢你',
'content_type': 0,
'conversation_id': 'R:1970325052533916',
'is_pc': 0,
'local_id': '41',
'receiver': '1688856738534700',
'send_time': '1712853512',
'sender': '1688856625489316',
'sender_name': '唤醒手腕',
'server_id': '1000258'
}
import re # 假设这是你的消息体字符串 test_message = """ 【故障位置】化州西收费站97车道 【故障情况】不能缴费 【填报人】唤醒手腕 """ def analysis_repair_data(message): # 定义正则表达式模式,用于匹配每个字段 pattern = r"【故障位置】(.*)\n【故障情况】(.*)\n【填报人】(.*)" # 使用re.search来查找匹配项 match = re.search(pattern, message, re.DOTALL) if match: fault_location = match.group(1).strip() # 提取故障位置 fault_situation = match.group(2).strip() # 提取故障情况 reporter = match.group(3).strip() # 提取填报人 return {"fault_location": fault_location, "fault_situation": fault_situation, "reporter": reporter } else: return False
import re # 假设这是你的消息体字符串 test_message = """ 【故障编号】 【维修情况】已修复 【维修人】xxx """ def analysis_solve_data(message): # 定义正则表达式模式,用于匹配每个字段 pattern = r""" 【故障编号】(.*) 【维修情况】(.*) 【维修人】(.*) """ pattern = re.compile(pattern, re.DOTALL | re.VERBOSE) # 使用re.search来查找匹配项 match = pattern.search(message) if match: serial = match.group(1).strip() # 提取故障编号 repair_situation = match.group(2).strip() # 提取维修情况 repair_person = match.group(3).strip() # 提取维修人 return {"serial": serial, "repair_situation": repair_situation, "repair_person": repair_person } else: return False
from datetime import datetime from openpyxl.reader.excel import load_workbook # 加载工作簿 wb = load_workbook(filename='故障维修登记表.xlsx') # 选择活动工作表或者通过名字选择工作表 ws = wb.active # 使用活动工作表 def insert_new_note(repair_data): # 获取行数 row_count = ws.max_row now = datetime.now() # 使用f-string格式化时间 formatted_time = f"{now:%Y%m%d-%H%M%S}" data = { "index": row_count, "serial": formatted_time, "fault_location": repair_data['fault_location'], "fault_situation": repair_data['fault_situation'], "reporter": repair_data['reporter'], "repair_situation": "", "repair_person": "" } row_num = row_count + 1 ws.cell(row=row_num, column=1).value = data['index'] ws.cell(row=row_num, column=2).value = data['serial'] ws.cell(row=row_num, column=3).value = data['fault_location'] ws.cell(row=row_num, column=4).value = data['fault_situation'] ws.cell(row=row_num, column=5).value = data['reporter'] ws.cell(row=row_num, column=6).value = data['repair_situation'] ws.cell(row=row_num, column=7).value = data['repair_person'] # 保存工作簿到文件 wb.save("故障维修登记表.xlsx") return data['serial'] def finish_old_note(solve_data): target_name = solve_data['serial'] # 遍历所有行,查找目标值 state = False for index, row in enumerate(ws.iter_rows(values_only=True)): if row[1] == "编号": continue if row[1] == target_name: # 假设"Name"在第一列 # 找到目标行后,修改该行的数据 # 这里假设我们要修改的是第二列的数据,将其改为"New Value" ws.cell(row=(index + 1), column=6).value = solve_data['repair_situation'] ws.cell(row=(index + 1), column=7).value = solve_data['repair_person'] state = True wb.save("故障维修登记表.xlsx") break # 找到后就退出循环 if state: return solve_data['serial'] else: return False
# -*- coding: utf-8 -*- import sys from datetime import datetime import ntwork from read_repair_data import analysis_repair_data from read_solve_data import analysis_solve_data from update_excel import insert_new_note, finish_old_note wework = ntwork.WeWork() # 打开 pc 企业微信, smart: 是否管理已经登录的微信 wework.open(smart=True) # 注册消息回调 @wework.msg_register(ntwork.MT_RECV_TEXT_MSG) def on_recv_text_msg(wework_instance: ntwork.WeWork, message): data = message["data"] if data['content'][0:1] != "【": return sender_user_id = data["sender"] self_user_id = wework_instance.get_login_info()["user_id"] conversation_id: str = data["conversation_id"] # 判断消息不是自己发的并且不是群消息时,回复对方 if sender_user_id != self_user_id and conversation_id.startswith("R:"): insert_data = analysis_repair_data(data['content']) if insert_data: serial = insert_new_note(insert_data) now = datetime.now() # 使用f-string格式化时间 formatted_time = f"{now:%Y-%m-%d %H:%M:%S}" print(f"【反馈结果】【{formatted_time}】请注意存在新的反馈:{serial}") wework_instance.send_text(conversation_id=conversation_id, content=f"【反馈结果】反馈成功\n【故障编号】{serial}") insert_data = analysis_solve_data(data['content']) if insert_data: serial = finish_old_note(insert_data) if serial: # 使用f-string格式化时间 now = datetime.now() formatted_time = f"{now:%Y-%m-%d %H:%M:%S}" print(f"【维修结果】【{formatted_time}】请注意存在新的维修:{serial}") wework_instance.send_text(conversation_id=conversation_id, content=f"【维修结果】维修成功\n【故障编号】{str(serial)}") else: wework_instance.send_text(conversation_id=conversation_id, content=f"【操作警告】故障编号不存在!") try: while True: pass except KeyboardInterrupt: ntwork.exit_() sys.exit()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。