什么年代了还在用传统划水网站?来试试这款chatGPT钉钉机器人吧。chatGPT已经火了几个月了,github上面各种GPT插件和机器人🤖️也是层出不穷,俺随大流也做了一个python版本的钉钉机器人,最近感觉服务差不多稳定了,所以在这里分享给大家
★目录 ★前提 ★效果展示 ★创建钉钉机器人 ★加入chatGPT ★结束
★前提
有钉钉管理员权限,没有的话自己建一个测试公司拉小伙伴进来一起划水
有服务器,python3.9以上的环境或者使用docker
有chatGPT的session
有一些python和服务器的基础知识
俗话说巧妇难为无米之炊,钉钉管理员、服务器、chatGPT的session这三个是必须滴,如果不了解python的话,也可以用下面提到的nodejs或者.NET库
★效果展示 如果你已经满足了上面的条件,想立即体验的话可以直接克隆到服务器 dingtalk-chatgpt-bot ,修改config.js配置后就可以使用了
★创建钉钉机器人 ◇什么是钉钉机器人 官方文档 说:在钉钉,机器人是独立存在的一个应用类型,可以开箱即用,也可以进行二次开发,无需和微应用或者群等场景进行强制绑定。 官方说的有点绕,在俺的理解中,钉钉机器人就是一个代理服务,可以把你的消息转发给第三方,也可以从第三方再由机器人转发回来。机器人通常用来做消息推送或者资料查询 我当时是跟着 老表 的教程来的,改了一部分东西
◇创建机器人
创建公司
登录开发者后台,按照如下图示顺序创建应用,提示选择新版和旧版的话选择旧版,注意应用名不能有chatGPT
更改配置,这时候保存不了,ip对应的服务还没有启动,我们等下面服务启动之后再来进行这个
点击调试,会创建测试群,测试通过之后上线
在群聊里找到智能群助手,添加机器人,然后@机器人就可以进行玩耍了
◇开启服务
安装quart(类似flask,不过可以进行异步处理)
创建index.py,写入如下代码 1 2 3 4 5 6 7 8 9 10 from quart import Quart app = Quart(__name__)@app.route('/' , methods=['GET' , 'POST' ] ) async def get_data (): return 'Hello world' if __name__ == '__main__' : app.run(host='127.0.0.1' , port=8083 )
开启服务,打开 http://127.0.0.1:8083/ 就能看到熟悉的hello world了,很简单对吧?
需要注意的是,127.0.0.1是本地开发调试用的,如果部署到服务器,需要改成0.0.0.0端口,并开启网络防火墙,这部分我也不多说了,需要的这看 老表 的原文吧
全部代码如下, 更改app_secret
为机器人应用信息里的app_secret
展开查看完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 import base64import hmacimport hashlibimport requestsimport datetimefrom quart import Quart, request app = Quart(__name__)@app.route('/' , methods=['GET' , 'POST' ] ) async def get_data (): if request.method == "POST" : try : req_data = await request.get_json() timestamp = request.headers.get('Timestamp' ) sign = request.headers.get('Sign' ) print ('request.data-----\n' , req_data) if check_sig(timestamp) == sign: print ('签名验证成功-----' ) await handle_info(req_data) return str (req_data) else : result = '签名验证失败-----' print (result) return result except Exception as e: result = '出错啦~~' print ('error' , repr (e)) return str (result) return '钉钉机器人:' + str (datetime.datetime.now())async def handle_info (req_data ): text_info = req_data['text' ]['content' ].strip() webhook_url = req_data['sessionWebhook' ] senderid = req_data['senderId' ] answer = '测试成功:' + text_info send_md_msg(senderid, answer, webhook_url)def send_md_msg (userid, message, webhook_url ): ''' userid: @用户 钉钉id title : 消息标题 message: 消息主体内容 webhook_url: 通讯url ''' message = '<font color=#008000>@%s </font> \n\n %s' % (userid, message) title = '大聪明说' data = { "msgtype" : "markdown" , "markdown" : { "title" :title, "text" : message }, "at" : { "atDingtalkIds" : [ userid ], } } req = requests.post(webhook_url, json=data)def check_sig (timestamp ): app_secret = 'BIQ7O8AqNMRiHrW....' app_secret_enc = app_secret.encode('utf-8' ) string_to_sign = '{}\n{}' .format (timestamp, app_secret) string_to_sign_enc = string_to_sign.encode('utf-8' ) hmac_code = hmac.new(app_secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = base64.b64encode(hmac_code).decode('utf-8' ) return signif __name__ == '__main__' : app.run(host='0.0.0.0' , port=8083 )
◇测试效果 部署成功后再回到机器人配置页面,这时候配置应该就能保存成功了,回到版本管理与发布中点击调试,会创建调试群,这时候@机器人就能收到消息了,结果如下
★加入chatGPT 如果你测试机器人能收到消息之后,下一步需要做的就是把handle_info的回复改成chatGPT的回复。
◇请求代理库PyGPT 这里使用的是 PawanOsman 开发的一个python库,他似乎突破了openAI的某些限制,可以代理我们的请求到 https://chat.openai.com/chat ,看起来就像是在使用网页请求一样,并且请求的历史也可以在官网上看到。所以不像是openAI的官方库那么笨,包括GPT3.5。如果你不是一个python开发者,你也可以使用他的 nodeJs库 或者 .Net库 自行开发非python的机器人
库的使用很简单,如demo所示,把pyGPT的参数修改成自己的session就可以了
1 2 3 4 5 6 7 8 9 10 11 12 13 import asynciofrom pygpt import PyGPTasync def main (): chat_gpt = PyGPT('eyJhbGciOiJkaXIiLCJlbmMiOiJBMR0NN....' ) await chat_gpt.connect() await chat_gpt.wait_for_ready() answer = await chat_gpt.ask('What is the capital of France?' ) print (answer) await chat_gpt.disconnect()if __name__ == '__main__' : asyncio.run(main())
修改handle_info中的answer为chatGPT的回复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 async def handle_info (req_data ): text_info = req_data['text' ]['content' ].strip() webhook_url = req_data['sessionWebhook' ] senderid = req_data['senderId' ] retry_count = 0 max_retry_count = 3 while retry_count < max_retry_count: try : chat_gpt = PyGPT('eyJhbGciOiJkaXIiLCJlbmMiOiJBMR0NN....' ) await chat_gpt.connect() await chat_gpt.wait_for_ready() answer = await chat_gpt.ask(text_info) await chat_gpt.disconnect() print ('answer:\n' , answer) print ('--------------------------' ) break except Exception as e: retry_count = retry_count + 1 print ('retry_count' , retry_count) print ('error\n' , repr (e)) continue if not answer: answer = '请求接口失败,请稍后重试' send_md_msg(senderid, answer, webhook_url)
有一点需要注意的是,如果我们在钉钉转发过来的http请求里不断的执行上面的代码,每次调用PyGPT都会产生一个新的连接,作者的代理服务器会hold住连接,超过50个socket连接或者短时间内请求太频繁,会被拉黑1~5分钟。所以像这样修改一下代码,在http循环外部创建chat_gpt对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 app = Quart(__name__) chat_gpt = None ...async def handle_info (req_data ): ... global chat_gpt while retry_count < max_retry_count: try : if chat_gpt is None : connect_task = asyncio.create_task(init_connect()) await connect_task answer = await chat_gpt.ask(text_info, senderid) print ('answer:\n' , answer) print ('--------------------------' ) break except Exception as e: retry_count = retry_count + 1 print ('retry_count' , retry_count) print ('error\n' , repr (e)) answer = '' if retry_count == 2 : connect_task = asyncio.create_task(init_connect()) await connect_task continue if not answer: answer = '请求接口失败,请稍后重试'
init_connect函数内容如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 async def init_connect (): retry_count = 0 max_retry_count = 3 while retry_count < max_retry_count: try : global chat_gpt chat_gpt = PyGPT('eyJhbGciOiJkaXIiLCJlbmMiOiJBMR0NN....' ) await chat_gpt.connect() await chat_gpt.wait_for_ready() break except Exception as e: retry_count = retry_count + 1 print ('retry_count' , retry_count) print ('error\n' , repr (e)) continue
为了以后修改配置方便,我们可以把GPT_SESSION和APP_SECRET放到一个config.py文件里并导出
1 2 3 4 5 6 7 8 GPT_SESSION = '' APP_SECRET = '' __all__ = [ GPT_SESSION, APP_SECRET, ]
此时index.py的完整代码如下,功能已经可以正常使用了!
展开查看完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 import base64import hmacimport hashlibimport requestsfrom pygpt import PyGPTimport datetimefrom quart import Quart, requestimport asyncioimport config app = Quart(__name__) chat_gpt = None @app.route('/' , methods=['GET' , 'POST' ] ) async def get_data (): if request.method == "POST" : try : req_data = await request.get_json() timestamp = request.headers.get('Timestamp' ) sign = request.headers.get('Sign' ) print ('request.data-----\n' , req_data) if check_sig(timestamp) == sign: print ('签名验证成功-----' ) await handle_info(req_data) return str (req_data) else : result = '签名验证失败-----' print (result) return result except Exception as e: result = '出错啦~~' print ('error' , repr (e)) return str (result) return '钉钉机器人:' + str (datetime.datetime.now())async def handle_info (req_data ): text_info = req_data['text' ]['content' ].strip() webhook_url = req_data['sessionWebhook' ] senderid = req_data['senderId' ] retry_count = 0 max_retry_count = 3 global chat_gpt while retry_count < max_retry_count: try : if chat_gpt is None : connect_task = asyncio.create_task(init_connect()) await connect_task answer = await chat_gpt.ask(text_info, senderid) print ('answer:\n' , answer) print ('--------------------------' ) break except Exception as e: retry_count = retry_count + 1 print ('retry_count' , retry_count) print ('error\n' , repr (e)) answer = '' if retry_count == 2 : connect_task = asyncio.create_task(init_connect()) await connect_task continue if not answer: answer = '请求接口失败,请稍后重试' send_md_msg(senderid, answer, webhook_url)def send_md_msg (userid, message, webhook_url ): ''' userid: @用户 钉钉id title : 消息标题 message: 消息主体内容 webhook_url: 通讯url ''' message = '<font color=#008000>@%s </font> \n\n %s' % (userid, message) title = '大聪明说' data = { "msgtype" : "markdown" , "markdown" : { "title" :title, "text" : message }, "at" : { "atDingtalkIds" : [ userid ], } } req = requests.post(webhook_url, json=data)def check_sig (timestamp ): app_secret = config.APP_SECRET app_secret_enc = app_secret.encode('utf-8' ) string_to_sign = '{}\n{}' .format (timestamp, app_secret) string_to_sign_enc = string_to_sign.encode('utf-8' ) hmac_code = hmac.new(app_secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = base64.b64encode(hmac_code).decode('utf-8' ) return signasync def init_connect (): retry_count = 0 max_retry_count = 3 while retry_count < max_retry_count: try : global chat_gpt chat_gpt = PyGPT(config.GPT_SESSION) await chat_gpt.connect() await chat_gpt.wait_for_ready() break except Exception as e: retry_count = retry_count + 1 print ('retry_count' , retry_count) print ('error\n' , repr (e)) continue if __name__ == '__main__' : app.run(host='0.0.0.0' , port=8083 )
◇增加上下文功能 经过使用俺发现此时每次聊天都相当于在官网上重新打开一个聊天窗口,没有上下文的功能。经过调试发现chatGPT的接口和pygpt的源码有一些联系,pygpt的self.socket.call返回对象包括conversationId,messageId,answer,而conversationId正是 openai 地址后面的某个对话的id,messageId是对话内上一条回复的parentId,把官网的参数替换到socket.call的参数里,可以完美衔接上一条对话,有了这个关系做上下文语境就简单多了
这里俺用的是python自带的轻量级数据库sqlite3,
pygpt请求之前的时候带上senderid参数
pygpt响应之前看数据库有没有这个用户,有的话就socket.call使用用户的conversation_id、parent_id,没有就使用pygpt默认的随机数。
获取pygpt响应后,新用户的话就以senderid为主键保存一条数据(id、conversation_id、parent_id)。已经存在的话就把响应的messageId更新到parent_id。
新建一个sql.py,代码如下,用来导出sql函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import sqlite3DATABASE = 'database.db' # 查询结果元组转字典 def dict_factory(cursor , row ): d = {} for idx, col in enumerate(cursor .description): d[col[0 ]] = row [idx] return d # 初始化数据库 def init_db(): db = sqlite3.connect (DATABASE , check_same_thread=False ) cursor = db.cursor () create_table_query = ''' CREATE TABLE IF NOT EXISTS user( id TEXT PRIMARY KEY NOT NULL, name TEXT , conversation_id TEXT NOT NULL, parent_id TEXT NOT NULL, create_at timestamp NOT NULL); ''' cursor .execute (create_table_query) cursor .close () db.close () print('数据库初始化成功' ) # 获取数据库 def get_db(): db = sqlite3.connect (DATABASE , check_same_thread=False ) db.row_factory = dict_factory return db # 执行sql 语句 def query_db(query, args=(), one=False ): db = get_db() cur = db.cursor () cur.execute (query, args) rv = cur.fetchall() db.commit () cur.close () db.close () return (rv[0 ] if rv else None ) if one else rv __all__ = [ init_db, query_db ]
初始化数据库
1 2 3 4 5 6 app = Quart (__name__ )init_db ()chat_gpt = None ...
传递 query_db senderid 参数
1 2 3 4 5 if chat_gpt is None : connect_task = asyncio.create_task(init_connect()) await connect_task answer = await chat_gpt.ask(text_info, query_db, senderid) print ('answer:\n' , answer)
把pygpt的源码复制到本地,使用sqlite3保存、更新数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 async def ask (self, prompt, query_db, id ='default' ): if not self.auth or not self.validate_token(self.auth): await self.get_tokens() conversation = self.get_conversation_by_id(id ) sqlite_get_data_query = """ SELECT * FROM user WHERE id = ? """ user_record = query_db(sqlite_get_data_query, (id ,), True ) print ('user_record' , user_record) data = await self.socket.call(event='askQuestion' , data={ 'prompt' : prompt, 'parentId' : user_record['parent_id' ] if user_record else str (conversation['parent_id' ]), 'conversationId' : user_record["conversation_id" ] if user_record else str (conversation['conversation_id' ]), 'auth' : self.auth }, timeout=self.timeout) print ('ask data---\n' , data) if 'error' in data: print (f'Error: {data["error" ]} ' ) return f'Error: {data["error" ]} ' try : if user_record is None : sqlite_insert_data_query = """ INSERT INTO user ('id', 'name', 'conversation_id', 'parent_id', 'create_at') VALUES (?,?,?,?,?); """ query_db(sqlite_insert_data_query, (id , None , data['conversationId' ], data['messageId' ], datetime.datetime.now())) print ('插入数据' ) else : sqlite_update_data_query = """ UPDATE user SET id = ?, name = ?, conversation_id = ?, parent_id = ?, create_at = ? WHERE id = ? """ query_db(sqlite_update_data_query, (id , None , data['conversationId' ], data['messageId' ], datetime.datetime.now(), id )) print ('更新数据' ) except Exception as e: print ('database error\n' , repr (e)) conversation['parent_id' ] = data['messageId' ] conversation['conversation_id' ] = data['conversationId' ] return data['answer' ]
要是增加一条新的对话怎么办呢,就增加一个/reset命令,删掉那个用户的数据,下次他请求就会打开新聊天窗口了
1 2 3 4 5 6 7 8 9 async def handle_info (req_data ): ... if (text_info == '/reset' ): sqlite_delete_data_query = """ DELETE FROM 'user' WHERE id = ? """ query_db(sqlite_delete_data_query, (senderid,)) send_md_msg(senderid, '聊天上下文已重置' , webhook_url) return
◇后台运行 注意: 我们服务此时在前台运行,如果我们关闭命令行窗口,服务就停止了,要想服务在后台运行并且方便的查看日志,我们可以使用nohup命令,输出的日志保存在nohup.out文件里
1 nohup python3 -u index .py > nohup.out 2 >&1 &
查看最新30条日志使用tail命令,ctrl+c退出查看日志
★结束 以上就是俺划水踩坑的全部内容了,完整代码在 dingtalk-chatgpt-bot ,第一次发文,才疏学浅,要是有不足之处还请多多指正