Administrator
发布于 2024-09-24 / 34 阅读
0

PVE-win10-webhook-钉钉

1.创建钉钉机器人,打开钉钉开发平台

添加应用能力--机器人

开发配置--权限管理--群管理--钉钉群基础信息读权限

应用发布--版本管理与发布--立即发布

记录机器人信息中的client-id和client-secret

2.自定义群添加钉钉机器人

成功添加后,记录webhook

3.stream模式服务端部署

win10电脑采用的python部署,打开stream mode--事件订阅--python,主要步骤如下:

下载dingtalk-stream-sdk-python并保存到适当位置

创建 Python 应用

mkdir event_chat_update
cd event_chat_update

安装依赖

py -m pip install dingtalk-stream

开发事件订阅服务

在 event_chat_update 目录中,创建 event_handler.py 文件,文件内容如下:

相关代码如下:

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

import argparse

import logging

import time

import dingtalk_stream

def define_options():

    parser = argparse.ArgumentParser()

    parser.add_argument(

        '--client_id', dest='client_id', required=True,

        help='app_key or suite_key from https://open-dev.digntalk.com'

    )

    parser.add_argument(

        '--client_secret', dest='client_secret', required=True,

        help='app_secret or suite_secret from https://open-dev.digntalk.com'

    )

    options = parser.parse_args()

    return options

class MyEventHandler(dingtalk_stream.EventHandler):

    async def process(self, event: dingtalk_stream.EventMessage):

        if event.headers.event_type != 'chat_update_title':

            # ignore events not equals chat_update_title; 忽略`chat_update_title`之外的其他事件;

            # 该示例仅演示 chat_update_title 类型的事件订阅;

            return dingtalk_stream.AckMessage.STATUS_OK, 'OK'

        self.logger.info(

            'received event, delay=%sms, eventType=%s, eventId=%s, eventBornTime=%d, eventCorpId=%s, '

            'eventUnifiedAppId=%s, data=%s',

            int(time.time() * 1000) - event.headers.event_born_time,

            event.headers.event_type,

            event.headers.event_id,

            event.headers.event_born_time,

            event.headers.event_corp_id,

            event.headers.event_unified_app_id,

            event.data)

        # put your code here; 可以在这里添加你的业务代码,处理事件订阅的业务逻辑;

        return dingtalk_stream.AckMessage.STATUS_OK, 'OK'

def main():

    options = define_options()

    credential = dingtalk_stream.Credential(options.client_id, options.client_secret)

    client = dingtalk_stream.DingTalkStreamClient(credential)

    client.register_all_event_handler(MyEventHandler())

    client.start_forever()

if name == '__main__':

    main()

运行事件订阅服务

通过以下命令可以运行你的机器人服务,当看到这样的日志输出时候表示运行成功 INFO endpoint is {'endpoint': 'wss://wss-....

py event_handler.py --client_id="your-client-id" --client_secret="your-client-secret"

至此,stream模式服务端部署完成!

此外,win10电脑需要放行对应的端口,因为Flask服务需要公网访问,我采用的是动态ddns。

4.win10与钉钉api成功连接后,结合自定义脚本(webhook_service.py),进行相关测试。经过不知多少次的尝试,最终代码如下:

import subprocess

import os

import requests  # 新增导入requests库

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])

def webhook():

    try:

        # 获取并打印完整的JSON数据,便于调试

        data = request.get_json()

        print(f"Received data: {data}")

        

        # 获取钉钉的 sessionWebhook URL

        session_webhook = data.get('sessionWebhook')

        

        # 确保数据包含 'text' 和 'content' 字段

        if 'text' in data and 'content' in data['text']:

            command = data['text']['content'].strip()  # 去除前后空格

            print(f"Command received: {command}")

            

            # 针对 Notepad 命令特殊处理,使用 os.system

            if command.lower() == "notepad":

                try:

                    os.system("start notepad")  # 直接打开Notepad

                    response_message = "Notepad opened"

                except Exception as e:

                    response_message = f"Failed to open Notepad: {str(e)}"

            

            else:

                # 其他命令使用 subprocess 执行并返回输出结果

                try:

                    result = subprocess.run(command, shell=True, capture_output=True, text=True)

                    output = result.stdout if result.stdout else result.stderr

                    print(f"Command output: {output}")  # 输出日志以便调试

                    response_message = f"Command executed: {output}"

                except Exception as e:

                    response_message = f"Command failed: {str(e)}"

            # 将消息发送回钉钉群

            if session_webhook:

                response_data = {

                    "msgtype": "text",

                    "text": {

                        "content": response_message

                    }

                }

                headers = {'Content-Type': 'application/json'}

                response = requests.post(session_webhook, json=response_data, headers=headers)

                print(f"Response to DingTalk: {response.text}")

            return jsonify({'msg': 'Command processed', 'status': 'success'}), 200

        else:

            print("Invalid data format")

            return jsonify({'msg': 'Invalid data format', 'status': 'error'}), 400

    except Exception as e:

        print(f"Error: {str(e)}")

        return jsonify({'msg': 'Error occurred', 'status': 'error', 'error': str(e)}), 500

if name == '__main__':

    app.run(host='0.0.0.0', port=5000, debug=True)