Personal Website

因纽特·冬

ChatGPT接入微信公众号

发布于 # AI

把自己将chatgpt接入微信公众号的方法和步骤记录下来。

Background

  1. 由于受国内网络的限制,很多人并没有使用过chatgpt,能不能有一种简单的方式,使用户不需要配置任何网络,就可以快速感受ai的能力?
  2. 自己折腾玩。

Dev Requirements

  1. dif.ai https://dify.ai

    这是一个可以帮你快速开发AI Applications的平台,1min内就可以创建一个chat-base应用,可以通过浏览器访问,也提供其他嵌入的方式,方便集成到自己网页,因为同时提供API,所以也可以集成到自己的应用中。dif在模型上可以选择免费的chatgpt3.5,当然你也可以填写自己gpt key和model。dif还有很多好玩的设置,可以慢慢探索。总之,你不需要有chatgpt帐号,不需要任何任何基础设施,就可以快速免费的在dif部署一个ai app。

  2. laf https://laf.dev/

    laf是一个severless云开发平台,只需要创建一个函数,把我们的代码放进去,它就帮我们直接在某个服务器上运行起来,一个公网接口,并以RestAPI的方式提供服务。当然服务器不需要我们维护,我们只需要关注代码本身,但是在开发之前需要选择型号,选择最基础的配置就可以了,每个月大概需支持30左右。

  3. 微信公众号

    我们的目的是接入微信公众号,所以你需要申请一个公众号并启用开发设置,步骤自行搜索吧。

Architecture

Alt text

key flow

step1:当我们在微信公众号配置了我们服务器信息后,微信会把公众号的消息转发到我们的服务上。

step2/7:如果是在同一个会话中,我们需要把上下文的信息发给chatgpt,根据dify的api文档,我们需要在请求中带一个conversation_id,这个就是上一条消息的id,所以我们需要对每个消息进行保存,每次请求之前,先查一下这次对话是否在一个上下文中,若有,就把上一个消息的id带上。

step3:这里涉及到dify的API使用,详见 https://docs.dify.ai/user-guide/launching-dify-apps/developing-with-apis

Code

其中比较重要的有个token,这个token需要和公众号服务器设置里填写的token一致,这样才可以通过签名认证,相信消息的来源是我们的公众号。

// 引入crypto和cloud模块
import * as crypto from 'node:crypto'
import cloud from '@lafjs/cloud'

const DIFY_KEY = process.env.DIFY_KEY || ''

const WAIT_MESSAGE = `处理中 ... \n\n请稍等几秒后秒后发送 1 查看回复`
const NO_MESSAGE = `暂无内容,请稍后回复 1 再试`
const CLEAR_MESSAGE = `记忆已清除`
const MY_QRCODE = ``
const HELP_MESSAGE = `
我是一个基于chatgpt的公众号。
目前支持基于文本的智能问答,后面可能会升级为多模态。
WinterAI命令指南:
  1:获取上一次的回复
  /clear:清除上下文
  /help:获取更多帮助
WinterAI菜单指南:
  web:通过浏览器访问使用web winterAI
  help:获得帮助信息,若无法解决请通过coffee备注留言你的问题。
  coffee:如果觉得不错,点击菜单栏的coffee,buy me a coffee支持一下吧
`
// 不支持的消息类型
const UNSUPPORTED_MESSAGE_TYPES = {
  image: '暂不支持图片消息',
  voice: '暂不支持语音消息',
  video: '暂不支持视频消息',
  music: '暂不支持音乐消息',
  news: '暂不支持图文消息',
}

// 定义休眠函数
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

// 创建数据库连接并获取Message集合
const db = cloud.database()
const Message = db.collection('messages')

// 处理接收到的微信公众号消息,communicate with wechat server
export async function main(event) {
  console.log('event', event)
  const { signature, timestamp, nonce, echostr } = event.query
  const token = '****'

  // 验证消息是否合法,若不合法则返回错误信息
  if (!verifySignature(signature, timestamp, nonce, token)) {
    return 'Invalid signature'
  }

  // 如果是首次验证,则返回 echostr 给微信服务器
  if (echostr) {
    console.log('echostr', echostr)
    return String(echostr)
  }

  // 处理接收到的消息
  const payload = event.body.xml
  console.log('payload', payload)
  // 暂不支持的消息类型
  if (payload.MsgType in UNSUPPORTED_MESSAGE_TYPES) {
    const responseText = UNSUPPORTED_MESSAGE_TYPES[payload.MsgType]
    return toXML(payload, responseText)
  }
  // 文本消息
  if (payload.msgtype[0] === 'text') {
    const newMessage = {
      msgid: payload.msgid[0],
      question: payload.content[0].trim(),
      username: payload.fromusername[0],
      sessionId: payload.fromusername[0],
      createdAt: Date.now()
    }
    // TODO:other message type
    // 修复请求响应超时问题:如果 5 秒内 AI 没有回复,则返回等待消息
    const responseText = await Promise.race([
      replyText(newMessage),
      sleep(300).then(() => WAIT_MESSAGE),
    ])
    console.log('need to reply')
    return toXML(payload, responseText)
  }

  // 公众号事件
  if (payload.msgtype[0] === 'event') {
    // 公众号订阅
    if (payload.event[0] === 'subscribe') {
      return toXML(payload, HELP_MESSAGE)
    }
  }

  return 'success'
}

// 处理文本回复消息
async function replyText(message) {
  const { question, sessionId } = message
  // 检查是否是重试操作,如果是重试操作,返回上一次的回复
  if (question === '1') {
    const lastMessage = await Message.where({
      sessionId
    }).orderBy('createdAt', 'desc').get()
    if (lastMessage.data[0]) {
      return `${lastMessage.data[0].question}\n------------\n${lastMessage.data[0].answer}`
    }

    return NO_MESSAGE
  }
  // 发送指令
  if (question.startsWith('/')) {
    return await processCommandText(message)
  }

  // 获取上下文 id
  const res = await Message.where({
    sessionId
  }).orderBy('createdAt', 'desc').getOne()

  const parentId = res?.data?.parentMessageId
  // 获取 dify 回复内容
  const { error, answer, parentMessageId } = await sendDifyMessage(question, sessionId, parentId)
  if (error) {
    console.error(`sessionId: ${sessionId}; question: ${question}; error: ${error}`)
    return error
  }

  // 将消息保存到数据库中
  const token = question.length + answer.length
  const result = await Message.add({ token, answer, parentMessageId, ...message })
  console.debug(`[save message] result: ${result}`)

  return answer
}

// 获取 dify API 的回复
async function sendDifyMessage(message, user, parentId) {
  console.log('[sendDifyMessage] called', parentId)
  let result = ''
  let pid = ''
  if (!parentId) {
    parentId = ''
  }
  try {
    const response = await cloud.fetch({
      url: 'https://api.dify.ai/v1/chat-messages',
      method: 'POST',
      headers: {
        Authorization: `Bearer ${DIFY_KEY}`
      },
      data: {
        inputs: {},
        response_mode: 'streaming',
        query: message,
        user,
        conversation_id: parentId
      },
      responseType: 'stream'
    })
    return new Promise((resolve, reject) => {
      let buffer = '' // 缓存数据
      response.data.on('data', (data) => {
        buffer += data.toString()
        const chunks = buffer.split('\n\n') // 使用 \n\n 分割数据
        for (let i = 0; i < chunks.length - 1; i++) {
          const message = chunks[i]
          try {
            // 检查是不是sse协议
            if (!message.startsWith('data: '))
              continue
            console.log('[sendDifyMessage]before:', message, message.substring(6))
            const parsedChunk = JSON.parse(message.substring(6))
            console.log('[sendDifyMessage]after parsedChunk:', parsedChunk)
            const { answer, event, conversation_id } = parsedChunk
            if (event === 'message') {
              result += answer
              pid = conversation_id
            }
          }
          catch (e) {
            console.error('[sendDifyMessage]parsedChunk error', message)
            reject({ error: e })
          }
        }
        buffer = chunks[chunks.length - 1] // 更新剩余的未处理数据
      })
      // stream结束时把剩下的消息全部发出去
      response.data.on('end', () => {
        resolve({ answer: result.replace('\n\n', ''), parentMessageId: pid })
      })
    })
  }
  catch (e) {
    console.log(e)
    if (e.statusCode === 429) {
      return Promise.reject({
        error: '问题太多了,我有点眩晕,请稍后再试。可能是额度已耗尽或已过期,请检查额度'
      })
    }
    return Promise.reject({
      error: '问题太难了 出错了. (uДu〃).',
    })
  }
}

// 校验微信服务器发送的消息是否合法
function verifySignature(signature, timestamp, nonce, token) {
  const arr = [token, timestamp, nonce].sort()
  const str = arr.join('')
  const sha1 = crypto.createHash('sha1')
  sha1.update(str)
  return sha1.digest('hex') === signature
}

// 返回组装 xml
function toXML(payload, content) {
  const timestamp = Date.now()
  const { tousername: fromUserName, fromusername: toUserName } = payload
  return `
  <xml>
    <ToUserName><![CDATA[${toUserName}]]></ToUserName>
    <FromUserName><![CDATA[${fromUserName}]]></FromUserName>
    <CreateTime>${timestamp}</CreateTime>
    <MsgType><![CDATA[text]]></MsgType>
    <Content><![CDATA[${content}]]></Content>
  </xml>
  `
}

async function processCommandText({ sessionId, question }) {
  // 清理历史会话
  if (question === '/clear') {
    const res = await Message.where({ sessionId }).remove({ multi: true })
    return CLEAR_MESSAGE
  }
  else {
    return HELP_MESSAGE
  }
}

Issues

  1. 在配置公众号的服务器设置时,如果使用laf给的地址,提交会失败,说这个地址有风险。所以我们要在laf中将它给的地址,托管到我们自己的域名(支持https)下面,所以你还需要有个域名hhh。 Alt text

    Alt text

  2. 为什么公众号不能stream的方式给出答案,而是要回复1查看。

    众所周知,推荐的chatgpt回复方式是流式的,就是一个字一个字输出。dify也是推荐用stream的方式读取chatgpt的response,因为你无法确定chatgpt的回道要多久,如果用一次读取的方式,就要等在那里,甚至受一些限制链接会断开,相比之下,使用流式读取的方式可以更早地获取生成的部分响应并将其返回给用户,从而减少用户等待的时间,高用户体验。所以回答这个问题:

    a. 为什么不是stream: 我们在调用dify其实也是流的方式读取返回,但是在微信公众号这一端不支持stream的方式,所以我们是流式读取chatgpt的回复,但是一次性的返回给微信公众号。这是微信公众号这边的限制。

    b.为什么要先返回1: 这是微信的第二个限制。微信公众号有一个超时时间5s,也就是如果5s内没有回复就要重试请求,一共三次,如果三次都没有返回请求的话,那么就会提示微信公众号不可用。​而通过实践,chatgpt很大概率上不能在5s内完成回复。所以我们先在5s内给个回复,避免不必要的重试,也让微信知道我们是active的。

Demo

Alt text

Alt text

Alt text