uniapp 开发微信小程序,接入AI接口, 使用异步迭代器(Async Iterator)优雅实现 AI 打字机效果
...大约 7 分钟
uniapp 开发微信小程序,接入AI接口, 使用异步迭代器(Async Iterator)优雅实现 AI 打字机效果
背景
在上一篇博客中,我介绍了如何使用 callback 回调函数的方式对接讯飞星火大模型,实现打字机效果。虽然这种方式可以工作,但代码分散在回调函数中,可读性和可维护性都不够理想。
本篇将介绍如何使用 异步迭代器(Async Iterator) 来重构整个流程,让代码更加线性、直观,就像写同步代码一样处理异步流式数据。
一、什么是异步迭代器(Async Iterator)
在深入代码之前,我们先来理解一下异步迭代器的核心概念。
1.1 从同步迭代器说起
你可能已经用过 for...of 循环遍历数组:
const arr = [1, 2, 3];
for (const item of arr) {
console.log(item); // 1, 2, 3
}数组之所以能被遍历,是因为它实现了 迭代器协议(Iterator Protocol),即拥有 [Symbol.iterator] 方法,返回一个包含 next() 方法的对象。
1.2 异步迭代器的核心区别
| 特性 | 同步迭代器 | 异步迭代器 |
|---|---|---|
| 协议 | [Symbol.iterator] | [Symbol.asyncIterator] |
next() 返回值 | { value, done } | Promise<{ value, done }> |
| 遍历语法 | for...of | for await...of |
| 适用场景 | 内存中的同步数据 | 异步获取的数据(网络请求、流式数据等) |
1.3 async generator 函数
ES2018 引入了 async generator 函数,让我们可以用声明式的方式创建异步迭代器:
// 函数前加 async,function 后加 *
async function* asyncGenerator() {
yield 1;
yield 2;
yield 3;
}
// 使用 for await...of 遍历
async function main() {
for await (const num of asyncGenerator()) {
console.log(num); // 1, 2, 3
}
}关键点:for await...of 会自动处理 Promise,每次迭代都会等待异步操作完成后再继续。
二、为什么要用异步迭代器改造
2.1 callback 方式的问题
回顾之前的 callback 实现:
sendToSpark(
'请分析这道题目',
(answer, seq, status) => {
// 处理逻辑分散在回调里
if (seq == 0) {
// 第一帧处理
} else {
// 中间帧处理
}
if (status == 2) {
// 最后一帧处理
}
}
);缺点:
- 逻辑分散,代码难以阅读
- 错误处理困难,需要额外的错误回调
- 无法使用
break提前终止 - 状态管理复杂(需要在回调外维护变量)
2.2 异步迭代器方式的优势
// 线性、直观的代码
for await (const chunk of sendToSparkAI('请分析这道题目')) {
console.log(chunk.content);
if (chunk.done) {
console.log('接收完成');
}
}优势:
- ✅ 代码线性化,像写同步代码一样
- ✅ 可以用
try/catch统一处理错误 - ✅ 可以用
break随时终止 - ✅ 状态管理更简洁
三、核心挑战:事件驱动 vs 拉取式
WebSocket 是事件驱动的(服务器推送),而异步迭代器是拉取式的(消费者主动请求)。这两者节奏不同,需要一个"适配器"来协调。
WebSocket 推送时机 Generator 请求时机
↓ ↓
随时可能到来 await next()
(被动接收) (主动拉取)
↓ ↓
"我有消息了!" "我要数据!"
└──────────┬──────────┘
↓
【需要协调机制】四、改造后的完整代码
4.1 核心实现:ai-async-iterator.js
import CryptoJS from '../src/static/crypto-js';
const httpUrl = 'https://spark-api.xf-yun.com/v1.1/chat';
const modelDomain = 'lite';
const APPID = ''; // 控制台获取填写
const APISecret = '';
const APIKey = '';
let socketTask = null;
// base64编码(保持不变)
function weBtoa(string) {
var b64 = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=';
string = String(string);
var bitmap, a, b, c, result = '', i = 0, rest = string.length % 3;
for (; i < string.length; ) {
if ((a = string.charCodeAt(i++)) > 255 || (b = string.charCodeAt(i++)) > 255 || (c = string.charCodeAt(i++)) > 255)
throw new TypeError("Failed to execute 'btoa' on 'Window': The string to be encoded contains characters outside of the Latin1 range.");
bitmap = (a << 16) | (b << 8) | c;
result += b64.charAt((bitmap >> 18) & 63) + b64.charAt((bitmap >> 12) & 63) + b64.charAt((bitmap >> 6) & 63) + b64.charAt(bitmap & 63);
}
return rest ? result.slice(0, rest - 3) + '==='.substring(rest) : result;
}
// 鉴权(保持不变)
async function getWebSocketUrl() {
var httpUrlHost = httpUrl.substring(8, 28);
var httpUrlPath = httpUrl.substring(28);
return new Promise((resolve, reject) => {
var url = 'wss://' + httpUrlHost + httpUrlPath;
var host = 'spark-api.xf-yun.com';
var apiKeyName = 'api_key';
var date = new Date().toGMTString();
var algorithm = 'hmac-sha256';
var headers = 'host date request-line';
var signatureOrigin = `host: ${host}\ndate: ${date}\nGET ${httpUrlPath} HTTP/1.1`;
var signatureSha = CryptoJS.HmacSHA256(signatureOrigin, APISecret);
var signature = CryptoJS.enc.Base64.stringify(signatureSha);
var authorizationOrigin = `${apiKeyName}="${APIKey}", algorithm="${algorithm}", headers="${headers}", signature="${signature}"`;
var authorization = weBtoa(authorizationOrigin);
url = `${url}?authorization=${authorization}&date=${encodeURI(date)}&host=${host}`;
resolve(url);
});
}
/**
* 使用异步迭代器调用 AI 接口
* @param {string} questionTitle - 用户问题
* @param {number} temperature - 随机性参数
* @param {array} historyTextList - 历史对话记录
* @returns {AsyncGenerator} 异步迭代器
*/
async function* sendToSparkAI(
questionTitle,
temperature = 0.5,
historyTextList = []
) {
// 用于缓存 WebSocket 提前到达的消息
const messageQueue = [];
// 用于在 Generator 等待时,从外部唤醒它
let resolveNext = null;
let isCompleted = false;
let hasError = null;
// 将消息推入队列或直接交给等待中的迭代器
const pushMessage = (content, seq, status) => {
const message = { content, seq, status, done: status === 2 };
if (resolveNext) {
// 情况1:Generator 正在 await 等待数据
// 直接交付,并清空 resolveNext
resolveNext({ value: message, done: false });
resolveNext = null;
} else {
// 情况2:Generator 还没来要数据,先存起来
messageQueue.push(message);
}
if (status === 2) {
isCompleted = true;
}
};
// 创建一个可以被外部 resolve 的 Promise
const waitForMessage = () => new Promise((resolve) => {
// 把 resolve 函数保存到外部变量
// 这样 WebSocket 回调里可以调用它来唤醒 Generator
resolveNext = resolve;
});
// 建立 WebSocket 连接
let myUrl = await getWebSocketUrl();
await new Promise((resolve, reject) => {
socketTask = uni.connectSocket({
url: myUrl,
method: 'GET',
success: resolve,
fail: reject
});
socketTask.onError((res) => {
hasError = res;
reject(res);
});
socketTask.onOpen((res) => {
historyTextList.push({
role: 'user',
content: questionTitle,
});
let params = {
header: {
app_id: APPID,
uid: 'aef9f963-7',
},
parameter: {
chat: {
domain: modelDomain,
temperature: temperature,
max_tokens: 4096,
},
},
payload: {
message: {
text: historyTextList,
},
},
};
socketTask.send({
data: JSON.stringify(params),
});
});
// 接收到消息时,推送到队列
socketTask.onMessage((res) => {
let obj = JSON.parse(res.data);
let dataArray = obj.payload.choices.text;
for (let i = 0; i < dataArray.length; i++) {
pushMessage(
dataArray[i].content,
obj.payload.choices.seq,
obj.payload.choices.status
);
}
let temp = JSON.parse(res.data);
if (temp.header.code !== 0) {
socketTask.close({ success() {}, fail() {} });
}
if (temp.header.code === 0 && temp.header.status === 2) {
setTimeout(() => {
socketTask.close({ success() {}, fail() {} });
}, 1000);
}
});
});
// 生成器核心循环
try {
// 循环直到完成且队列为空
while (!isCompleted || messageQueue.length > 0) {
if (messageQueue.length > 0) {
// 队列有消息,直接产出
const msg = messageQueue.shift();
yield msg;
} else {
// 队列空,挂起等待下一条消息
const result = await waitForMessage();
yield result.value;
}
}
} finally {
// 确保资源清理(即使循环被 break)
if (socketTask) {
socketTask.close();
}
}
}
export default sendToSparkAI;4.2 在 Vue 中使用
export default {
data() {
return {
answerContent: '',
talkList: [],
aiModelShow: false
};
},
methods: {
// 调用 AI 接口
async getToken() {
this.answerContent = '';
this.aiModelShow = true;
let tempAnswer = '';
try {
// 使用 for await...of 优雅地处理流式数据
for await (const chunk of sendToSparkAI(
'请分析这道题目...',
0.5,
this.talkList
)) {
const { content, seq, status, done } = chunk;
// 实时更新显示(打字机效果)
this.answerContent += content;
// 处理第一条消息
if (seq === 0) {
tempAnswer += content;
this.talkList.push({
role: 'user',
content: '请分析这道题目...'
});
} else {
tempAnswer += content;
}
// 处理结束
if (status === 2 || done) {
this.talkList.push({
role: 'assistant',
content: tempAnswer
});
}
}
console.log('AI 回复完成');
} catch (error) {
console.error('AI 调用出错:', error);
uni.showToast({
title: 'AI 服务异常',
icon: 'none'
});
}
},
// 重新解析
async aiCancel() {
this.answerContent = '';
let tempAnswer = '';
try {
for await (const chunk of sendToSparkAI(
'重新详细解析一下!',
0.5,
this.talkList
)) {
const { content, seq, status } = chunk;
this.answerContent += content;
if (seq === 0) {
tempAnswer += content;
this.talkList.push({
role: 'user',
content: '重新详细解析一下!'
});
} else {
tempAnswer += content;
}
if (status === 2) {
this.talkList.push({
role: 'assistant',
content: tempAnswer + content
});
}
}
} catch (error) {
console.error('重新解析出错:', error);
}
}
}
};五、核心机制
5.1 pushMessage 的双分支逻辑
const pushMessage = (content, seq, status) => {
const message = { content, seq, status, done: status === 2 };
if (resolveNext) {
// 分支1:Generator 正在等着要数据
resolveNext({ value: message, done: false });
resolveNext = null;
} else {
// 分支2:Generator 还没来要,先存队列
messageQueue.push(message);
}
};5.2 waitForMessage 的巧妙之处
const waitForMessage = () => new Promise((resolve) => {
resolveNext = resolve; // 把 resolve "暴露"出去
});普通 Promise 的 resolve 在内部决定:
const p = new Promise((resolve) => {
setTimeout(() => resolve('ok'), 1000); // 1秒后 resolve
});我们的做法是把 resolve "保存"到外部变量,让 WebSocket 回调可以调用:
// Generator 里:创建 Promise,保存 resolve
const result = await waitForMessage(); // 挂起
// WebSocket 回调里:唤醒 Promise
resolveNext({ value: message, done: false }); // Generator 继续执行六、异步迭代器的其他用法
6.1 提前终止迭代
for await (const chunk of sendToSparkAI('问题')) {
this.answerContent += chunk.content;
// 如果用户点击了停止按钮,可以提前终止
if (this.userCancelled) {
break; // ✅ 可以直接 break
}
}
// 由于 finally 块,socket 会自动关闭6.2 配合其他异步操作
async function processWithAI() {
// 先显示加载状态
this.loading = true;
// 调用 AI
for await (const chunk of sendToSparkAI('问题')) {
this.answerContent += chunk.content;
// 可以配合其他异步操作
if (chunk.seq % 5 === 0) {
await this.autoSave(); // 每5帧自动保存
}
}
this.loading = false;
}6.3 错误处理
async function safeCallAI() {
try {
for await (const chunk of sendToSparkAI('问题')) {
this.answerContent += chunk.content;
}
} catch (error) {
// 统一处理所有错误(WebSocket 错误、网络错误等)
console.error('AI 调用失败:', error);
uni.showToast({ title: '服务异常,请重试', icon: 'none' });
} finally {
// 清理工作
this.loading = false;
}
}提示
本文的完整代码可以直接在 uniapp 项目中使用,只需替换你的 APPID、APISecret 和 APIKey 即可。