背景
最近由于工作要求需要使用Springboot搭建一个流式响应服务,即客户端发送一次请求,服务端需要多次响应才能返回完整的数据。使用场景就是与chatGPT对话,你问一个问题,页面会逐字将结果打印出来。
下面我在SpringBoot中可以简单的实现一下这种场景需求,即SSE(Server-Sent Events)模式
前端请求实现方式
目前前端的请求实现方式有两种,一个是采用EventSource实现,这种实现方式不支持自定义的请求头,也就没有办法再请求头部中增加Token这样的用户身份验证信息。并且该方式只支持GET请求方式。所以这种实现方式只适用于,不需要验证用户身份并且请求参数内容少的情况下。
若要传输更多的参数信息或者在请求头中增加自定义内容建议使用AbortController实现
若传输过程中链接断开,EventSource可以实现自动重新链接,AbortController不能实现自动重新链接。
使用EventSource实现
// 建立连接
let source = new EventSource('http://localhost:8080/sse/connect/' + userId);
/**
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
console.log("建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
console.log(e.data);
});
/**
* 如果发生通信错误(比如连接中断),就会触发error事件
* 或者:
* 另一种写法:source.onerror = function (event) {}
*/
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
console.log("连接关闭");
} else {
console.log(e);
}
}, false);
使用AbortController实现
<template>
<div>
<input v-model="name" placeholder="Enter your name">
<button @click="sendPost">Send POST request</button>
<button @click="stopGenerating">Stop Generating</button>
<button @click="restartGenerating">Restart Generating</button>
<pre>{{ response }}</pre>
</div>
</template>
<script>
export default {
data() {
return {
name: '',
response: '',
controller: new AbortController(),
isStopped: false
}
},
methods: {
async sendPost() {
this.controller = new AbortController()
this.response = ''
this.isStopped = false
const response = await fetch('http://127.0.0.1:5000/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ name: this.name }),
signal: this.controller.signal
})
const reader = response.body.getReader()
while (true) {
if (this.isStopped) break
const { done, value } = await reader.read()
if (done) break
this.response += new TextDecoder().decode(value)
}
},
stopGenerating() {
this.controller.abort()
this.isStopped = true
},
restartGenerating() {
this.controller = new AbortController()
this.sendPost()
}
}
}
</script>
后端响应实现方式
使用SseEmitter实现
@RequestMapping(value = "/talkeAbouttestSseEmitter")
public SseEmitter talkeAbouttestSseEmitter(HttpServletResponse response, @RequestBody JSONObject object) throws IOException {
SseEmitter emitter = new SseEmitter();
logger.info("【prompt内容】:{}", object.getString("prompt"));
String str = " 什么是爱而不得? \n" +
"东边日出西边雨,道是无晴却有晴。\n" +
"他朝若是同淋雪,此生也算共白头。\n" +
"我本将心向明月,奈何明月照沟渠。\n" +
"此时相望不相闻,愿逐月华流照君。\n" +
"衣带渐宽终不悔,为伊消得人憔悴。\n" +
"此情可待成追忆,只是当时已惘然。\n" +
"人生若只如初见,何事西风悲画扇。\n" +
"曾经沧海难为水,除却巫山不是云。\n" +
"何当共剪西窗烛,却话巴山夜雨时。\n" +
"天长地久有时尽,此恨绵绵无绝期。\n" +
"\n";
response.setHeader("Content-Type", "text/event-stream");
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Pragma", "no-cache");
new Thread(() -> {
// // 响应流
try {
for (int i = 0; i < str.length(); i++) {
// 指定事件标识 event: 这个为固定格式
emitter.send(String.valueOf(str.charAt(i)));
Thread.sleep(100);
}
emitter.send("stop");
emitter.complete(); // Complete the SSE connection
} catch (IOException e) {
e.printStackTrace();
}
}).start();
return emitter;
}
使用HttpServlet实现
@RequestMapping(value = "/talkeAbouttestEvent")
public void talkeAbouttestEvent(HttpServletResponse response, @Param("prompt") String prompt) throws IOException {
logger.info("【prompt内容】:{}", prompt);
String str = " 什么是爱而不得? \n" +
"东边日出西边雨,道是无晴却有晴。\n" +
"他朝若是同淋雪,此生也算共白头。\n" +
"我本将心向明月,奈何明月照沟渠。\n" +
"此时相望不相闻,愿逐月华流照君。\n" +
"衣带渐宽终不悔,为伊消得人憔悴。\n" +
"此情可待成追忆,只是当时已惘然。\n" +
"人生若只如初见,何事西风悲画扇。\n" +
"曾经沧海难为水,除却巫山不是云。\n" +
"何当共剪西窗烛,却话巴山夜雨时。\n" +
"天长地久有时尽,此恨绵绵无绝期。\n" +
"\n";
// 响应流
response.setHeader("Content-Type", "text/event-stream");
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Pragma", "no-cache");
try {
// 指定事件标识 event: 这个为固定格式
response.getWriter().write("event:open\n");
response.getWriter().flush();
for (int i = 0; i < str.length(); i++) {
// 指定事件标识 event: 这个为固定格式
// response.getWriter().write("event:msg\n");
// 格式:data: + 数据 + 2个回车
response.getWriter().write("data:{\"content\":\""+ String.valueOf(str.charAt(i)).getBytes(StandardCharsets.UTF_8) + "\"}\n\n");
response.getWriter().flush();
Thread.sleep(100);
}
// 指定事件标识 event: 这个为固定格式
response.getWriter().write("event:error\n");
response.getWriter().flush();
// response.getWriter().close();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
}
}
后端请求实现方式
/**
* *
* @param url
* @param json
* @return
*/
public static BufferedReader sendJsonPostResveEventStream(String url, String json) {
PrintWriter out = null;
BufferedReader in = null;
BufferedReader reader = null;
try {
log.info("sendPost - {}", url);
log.info("json - {}", json);
URL realUrl = new URL(url);
HttpURLConnection conn = (HttpURLConnection) realUrl.openConnection();
conn.setRequestMethod("POST");
conn.setDoOutput(true);
conn.setDoInput(true);
conn.setUseCaches(false);
conn.setRequestProperty("Connection", "Keep-Alive");
conn.setRequestProperty("Charset", "UTF-8");
conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
conn.setRequestProperty("accept", "application/json");
if (json != null && !json.equals("")) {
byte[] writebytes = json.getBytes();
conn.setRequestProperty("Content-Length", String.valueOf(writebytes.length));
OutputStream outwritestream = conn.getOutputStream();
outwritestream.write(json.getBytes());
outwritestream.flush();
outwritestream.close();
conn.getResponseCode();
}
if (conn.getResponseCode() == 200) {
reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
return reader;
}
} catch (ConnectException e) {
log.error("调用HttpUtils.sendPost ConnectException, url=" + url + ",param=" + json, e);
} catch (SocketTimeoutException e) {
log.error("调用HttpUtils.sendPost SocketTimeoutException, url=" + url + ",param=" + json, e);
} catch (IOException e) {
log.error("调用HttpUtils.sendPost IOException, url=" + url + ",param=" + json, e);
} catch (Exception e) {
log.error("调用HttpsUtil.sendPost Exception, url=" + url + ",param=" + json, e);
} finally {
try {
if (out != null) {
out.close();
}
if (in != null) {
in.close();
}
} catch (IOException ex) {
log.error("调用in.close Exception, url=" + url + ",param=" + json, ex);
}
}
return null;
}
后端请求然后以事件流的方式发送给前端
@PostMapping(value = "/talkeAbout", produces = "text/event-stream")
public void talkeAbout(HttpServletResponse response, @RequestBody JSONObject object) throws IOException {
response.setHeader("Content-Type", "text/event-stream");
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Pragma", "no-cache");
talkeAboutToXinference(object.getString("prompt"), response);
}
public void talkeAboutToXinference(String msg, HttpServletResponse response) throws IOException {
String json = CHAT_PRARAM.replace("user_talke_about", msg);
BufferedReader reader = HttpUtils.sendJsonPostResveEventStream("http://localhost/chat" + CHAT_CHAT_COMPLETIONS, json);
if (reader == null) return;
String line = "";
while ((line = reader.readLine()) != null) {
response.getWriter().write(line +"\n");
response.getWriter().flush();
}
response.getWriter().close();
}