1.创建钉钉机器人,打开钉钉开发平台
添加应用能力--机器人
开发配置--权限管理--群管理--钉钉群基础信息读权限
应用发布--版本管理与发布--立即发布
记录机器人信息中的client-id和client-secret
2.自定义群添加钉钉机器人
成功添加后,记录webhook
3.stream模式服务端部署
win10电脑采用的python部署,打开stream mode--事件订阅--python,主要步骤如下:
下载dingtalk-stream-sdk-python,并保存到适当位置
创建 Python 应用
mkdir event_chat_update
cd event_chat_update
安装依赖
py -m pip install dingtalk-stream
开发事件订阅服务
在 event_chat_update 目录中,创建 event_handler.py 文件,文件内容如下:
相关代码如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import logging
import time
import dingtalk_stream
def define_options():
parser = argparse.ArgumentParser()
parser.add_argument(
'--client_id', dest='client_id', required=True,
help='app_key or suite_key from https://open-dev.digntalk.com'
)
parser.add_argument(
'--client_secret', dest='client_secret', required=True,
help='app_secret or suite_secret from https://open-dev.digntalk.com'
)
options = parser.parse_args()
return options
class MyEventHandler(dingtalk_stream.EventHandler):
async def process(self, event: dingtalk_stream.EventMessage):
if event.headers.event_type != 'chat_update_title':
# ignore events not equals chat_update_title; 忽略`chat_update_title`之外的其他事件;
# 该示例仅演示 chat_update_title 类型的事件订阅;
return dingtalk_stream.AckMessage.STATUS_OK, 'OK'
self.logger.info(
'received event, delay=%sms, eventType=%s, eventId=%s, eventBornTime=%d, eventCorpId=%s, '
'eventUnifiedAppId=%s, data=%s',
int(time.time() * 1000) - event.headers.event_born_time,
event.headers.event_type,
event.headers.event_id,
event.headers.event_born_time,
event.headers.event_corp_id,
event.headers.event_unified_app_id,
event.data)
# put your code here; 可以在这里添加你的业务代码,处理事件订阅的业务逻辑;
return dingtalk_stream.AckMessage.STATUS_OK, 'OK'
def main():
options = define_options()
credential = dingtalk_stream.Credential(options.client_id, options.client_secret)
client = dingtalk_stream.DingTalkStreamClient(credential)
client.register_all_event_handler(MyEventHandler())
client.start_forever()
if name == '__main__':
main()
运行事件订阅服务
通过以下命令可以运行你的机器人服务,当看到这样的日志输出时候表示运行成功 INFO endpoint is {'endpoint': 'wss://wss-....
py event_handler.py --client_id="your-client-id" --client_secret="your-client-secret"
至此,stream模式服务端部署完成!
此外,win10电脑需要放行对应的端口,因为Flask服务需要公网访问,我采用的是动态ddns。
4.win10与钉钉api成功连接后,结合自定义脚本(webhook_service.py),进行相关测试。经过不知多少次的尝试,最终代码如下:
import subprocess
import os
import requests # 新增导入requests库
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def webhook():
try:
# 获取并打印完整的JSON数据,便于调试
data = request.get_json()
print(f"Received data: {data}")
# 获取钉钉的 sessionWebhook URL
session_webhook = data.get('sessionWebhook')
# 确保数据包含 'text' 和 'content' 字段
if 'text' in data and 'content' in data['text']:
command = data['text']['content'].strip() # 去除前后空格
print(f"Command received: {command}")
# 针对 Notepad 命令特殊处理,使用 os.system
if command.lower() == "notepad":
try:
os.system("start notepad") # 直接打开Notepad
response_message = "Notepad opened"
except Exception as e:
response_message = f"Failed to open Notepad: {str(e)}"
else:
# 其他命令使用 subprocess 执行并返回输出结果
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
output = result.stdout if result.stdout else result.stderr
print(f"Command output: {output}") # 输出日志以便调试
response_message = f"Command executed: {output}"
except Exception as e:
response_message = f"Command failed: {str(e)}"
# 将消息发送回钉钉群
if session_webhook:
response_data = {
"msgtype": "text",
"text": {
"content": response_message
}
}
headers = {'Content-Type': 'application/json'}
response = requests.post(session_webhook, json=response_data, headers=headers)
print(f"Response to DingTalk: {response.text}")
return jsonify({'msg': 'Command processed', 'status': 'success'}), 200
else:
print("Invalid data format")
return jsonify({'msg': 'Invalid data format', 'status': 'error'}), 400
except Exception as e:
print(f"Error: {str(e)}")
return jsonify({'msg': 'Error occurred', 'status': 'error', 'error': str(e)}), 500
if name == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)