服务端事件推送——HTTP协议的事件流(EventStream)

背景

最近由于工作要求需要使用Springboot搭建一个流式响应服务,即客户端发送一次请求,服务端需要多次响应才能返回完整的数据。使用场景就是与chatGPT对话,你问一个问题,页面会逐字将结果打印出来。

下面我在SpringBoot中可以简单的实现一下这种场景需求,即SSE(Server-Sent Events)模式

前端请求实现方式

目前前端的请求实现方式有两种,一个是采用EventSource实现,这种实现方式不支持自定义的请求头,也就没有办法再请求头部中增加Token这样的用户身份验证信息。并且该方式只支持GET请求方式。所以这种实现方式只适用于,不需要验证用户身份并且请求参数内容少的情况下。

若要传输更多的参数信息或者在请求头中增加自定义内容建议使用AbortController实现

若传输过程中链接断开,EventSource可以实现自动重新链接,AbortController不能实现自动重新链接。

使用EventSource实现

       // 建立连接
       let source = new EventSource('http://localhost:8080/sse/connect/' + userId);

        /**
         * 连接一旦建立,就会触发open事件
         * 另一种写法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            console.log("建立连接。。。");
        }, false);

        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
           console.log(e.data);
        });


        /**
         * 如果发生通信错误(比如连接中断),就会触发error事件
         * 或者:
         * 另一种写法:source.onerror = function (event) {}
         */
        source.addEventListener('error', function (e) {
            if (e.readyState === EventSource.CLOSED) {
                console.log("连接关闭");
            } else {
                console.log(e);
            }
        }, false);

使用AbortController实现

<template>
  <div>
    <input v-model="name" placeholder="Enter your name">
    <button @click="sendPost">Send POST request</button>
    <button @click="stopGenerating">Stop Generating</button>
    <button @click="restartGenerating">Restart Generating</button>
    <pre>{{ response }}</pre>
  </div>
</template>
 
<script>
export default {
  data() {
    return {
      name: '',
      response: '',
      controller: new AbortController(),
      isStopped: false
    }
  },
  methods: {
    async sendPost() {
  this.controller = new AbortController()
  this.response = ''
  this.isStopped = false
  const response = await fetch('http://127.0.0.1:5000/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ name: this.name }),
    signal: this.controller.signal
  })
  const reader = response.body.getReader()
  while (true) {
    if (this.isStopped) break
    const { done, value } = await reader.read()
    if (done) break
    this.response += new TextDecoder().decode(value)
  }
},
 
    stopGenerating() {
      this.controller.abort()
      this.isStopped = true
    },
    restartGenerating() {
      this.controller = new AbortController()
      this.sendPost()
    }
  }
}
</script>

后端响应实现方式

使用SseEmitter实现

 @RequestMapping(value = "/talkeAbouttestSseEmitter")
    public SseEmitter talkeAbouttestSseEmitter(HttpServletResponse response, @RequestBody JSONObject object) throws IOException {
        SseEmitter emitter = new SseEmitter();
        logger.info("【prompt内容】:{}", object.getString("prompt"));
        String str = "       什么是爱而不得? \n" +
                "东边日出西边雨,道是无晴却有晴。\n" +
                "他朝若是同淋雪,此生也算共白头。\n" +
                "我本将心向明月,奈何明月照沟渠。\n" +
                "此时相望不相闻,愿逐月华流照君。\n" +
                "衣带渐宽终不悔,为伊消得人憔悴。\n" +
                "此情可待成追忆,只是当时已惘然。\n" +
                "人生若只如初见,何事西风悲画扇。\n" +
                "曾经沧海难为水,除却巫山不是云。\n" +
                "何当共剪西窗烛,却话巴山夜雨时。\n" +
                "天长地久有时尽,此恨绵绵无绝期。\n" +
                "\n";
        response.setHeader("Content-Type", "text/event-stream");
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("UTF-8");
        response.setHeader("Pragma", "no-cache");
        new Thread(() -> {
//            // 响应流
            try {
                for (int i = 0; i < str.length(); i++) {
                    // 指定事件标识  event: 这个为固定格式
                    emitter.send(String.valueOf(str.charAt(i)));
                    Thread.sleep(100);
                }
                emitter.send("stop");
                emitter.complete(); // Complete the SSE connection
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        return emitter;
    }

使用HttpServlet实现

    @RequestMapping(value = "/talkeAbouttestEvent")
    public void talkeAbouttestEvent(HttpServletResponse response, @Param("prompt") String prompt) throws IOException {
        logger.info("【prompt内容】:{}", prompt);
        String str = "       什么是爱而不得? \n" +
                "东边日出西边雨,道是无晴却有晴。\n" +
                "他朝若是同淋雪,此生也算共白头。\n" +
                "我本将心向明月,奈何明月照沟渠。\n" +
                "此时相望不相闻,愿逐月华流照君。\n" +
                "衣带渐宽终不悔,为伊消得人憔悴。\n" +
                "此情可待成追忆,只是当时已惘然。\n" +
                "人生若只如初见,何事西风悲画扇。\n" +
                "曾经沧海难为水,除却巫山不是云。\n" +
                "何当共剪西窗烛,却话巴山夜雨时。\n" +
                "天长地久有时尽,此恨绵绵无绝期。\n" +
                "\n";
        // 响应流
        response.setHeader("Content-Type", "text/event-stream");
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("UTF-8");
        response.setHeader("Pragma", "no-cache");
        try {
            // 指定事件标识  event: 这个为固定格式
            response.getWriter().write("event:open\n");
            response.getWriter().flush();

            for (int i = 0; i < str.length(); i++) {
                // 指定事件标识  event: 这个为固定格式
//                response.getWriter().write("event:msg\n");
                // 格式:data: + 数据 + 2个回车
                response.getWriter().write("data:{\"content\":\""+ String.valueOf(str.charAt(i)).getBytes(StandardCharsets.UTF_8) + "\"}\n\n");
                response.getWriter().flush();
                Thread.sleep(100);
            }
            // 指定事件标识  event: 这个为固定格式
            response.getWriter().write("event:error\n");
            response.getWriter().flush();
//            response.getWriter().close();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
        }
    }

后端请求实现方式

 /**
     * *
     * @param url
     * @param json
     * @return
     */
    public static BufferedReader sendJsonPostResveEventStream(String url, String json) {
        PrintWriter out = null;
        BufferedReader in = null;
        BufferedReader reader = null;
        try {
            log.info("sendPost - {}", url);
            log.info("json - {}", json);
            URL realUrl = new URL(url);
            HttpURLConnection conn = (HttpURLConnection) realUrl.openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setDoInput(true);
            conn.setUseCaches(false);
            conn.setRequestProperty("Connection", "Keep-Alive");
            conn.setRequestProperty("Charset", "UTF-8");
            conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
            conn.setRequestProperty("accept", "application/json");
            if (json != null && !json.equals("")) {
                byte[] writebytes = json.getBytes();
                conn.setRequestProperty("Content-Length", String.valueOf(writebytes.length));
                OutputStream outwritestream = conn.getOutputStream();
                outwritestream.write(json.getBytes());
                outwritestream.flush();
                outwritestream.close();
                conn.getResponseCode();
            }
            if (conn.getResponseCode() == 200) {
                reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                return reader;
            }
        } catch (ConnectException e) {
            log.error("调用HttpUtils.sendPost ConnectException, url=" + url + ",param=" + json, e);
        } catch (SocketTimeoutException e) {
            log.error("调用HttpUtils.sendPost SocketTimeoutException, url=" + url + ",param=" + json, e);
        } catch (IOException e) {
            log.error("调用HttpUtils.sendPost IOException, url=" + url + ",param=" + json, e);
        } catch (Exception e) {
            log.error("调用HttpsUtil.sendPost Exception, url=" + url + ",param=" + json, e);
        } finally {
            try {
                if (out != null) {
                    out.close();
                }
                if (in != null) {
                    in.close();
                }
            } catch (IOException ex) {
                log.error("调用in.close Exception, url=" + url + ",param=" + json, ex);
            }
        }
        return null;
    }

后端请求然后以事件流的方式发送给前端

    @PostMapping(value = "/talkeAbout", produces = "text/event-stream")
    public void talkeAbout(HttpServletResponse response, @RequestBody JSONObject object) throws IOException {
        response.setHeader("Content-Type", "text/event-stream");
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("UTF-8");
        response.setHeader("Pragma", "no-cache");
        talkeAboutToXinference(object.getString("prompt"), response);
    }

    public void talkeAboutToXinference(String msg, HttpServletResponse response) throws IOException {
        String json = CHAT_PRARAM.replace("user_talke_about", msg);
        BufferedReader reader = HttpUtils.sendJsonPostResveEventStream("http://localhost/chat" + CHAT_CHAT_COMPLETIONS, json);
        if (reader == null) return;
        String line = "";
        while ((line = reader.readLine()) != null) {
            response.getWriter().write(line +"\n");
            response.getWriter().flush();
        }
        response.getWriter().close();
    }


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/767414.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

win10下Python的安装和卸载

前言 之前电脑上安装了python3.9版本&#xff0c;因为工作需要使用3.6版本的Python&#xff0c;需要将3.9版本卸载&#xff0c;重新安装3.6版本。下面就是具体的操作步骤: 1. 卸载 在我的电脑中搜索到3.9版本的安装文件&#xff0c;如下图&#xff1a; 双击该应用程序&#xf…

数据结构历年考研真题对应知识点(树的基本概念)

目录 5.1树的基本概念 5.1.2基本术语 【森林中树的数量、边数和结点数的关系&#xff08;2016&#xff09;】 5.1.3树的性质 【树中结点数和度数的关系的应用&#xff08;2010、2016&#xff09;】 【指定结点数的三叉树的最小高度分析&#xff08;2022&#xff09;】 5.1…

win10显示毫秒-上午-下午及星期几,24小时制

关于毫秒 winr regedit 计算机\HKEY_CURRENT_USER\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Advanced 新建ShowSecondsInSystemClock&#xff0c;编辑1显示&#xff0c;不显示就删了它 然后重启 资源管理器可能有多个全部重启&#xff0c;就可以啦 根据自己喜好…

【MySQL】表的操作{创建/查看/修改/删除}

文章目录 1.创建表1.1comment&#xff1a;注释信息1.2存储引擎 2.查看表3.修改表3.1add添加列&#xff0c;对原数据无影响3.2drop删除列3.3modify修改列类型3.4change修改列名3.5rename [to]修改表名 4.删除表5.总结 1.创建表 CREATE TABLE table_name (field1 datatype,field…

昇思MindSpore学习笔记3-02热门LLM及其他AI应用--K近邻算法实现红酒聚类

摘要&#xff1a; 介绍了K近邻算法&#xff0c;记录了MindSporeAI框架使用部分wine数据集进行KNN实验的步聚和方法。包括环境准备、下载红酒数据集、加载数据和预处理、搭建模型、进行预测等。 一、KNN概念 1. K近邻算法K-Nearest-Neighbor(KNN) 用于分类和回归的非参数统计…

2024年下半年系统集成项目管理工程师使用新版教材,该如何备考?

2024年下半年&#xff0c;新版的《系统集成项目管理工程师教程》&#xff08;第3版&#xff09;将被系统集成项目管理工程师使用。许多考生可能会感到迷茫&#xff0c;不知道该如何复习。毕竟教材更新后&#xff0c;以往的资料和真题都变得无用&#xff0c;重点内容和考试方向也…

llm学习-3(向量数据库的使用)

1&#xff1a;数据读取和加载 接着上面的常规操作 加载环境变量---》获取所有路径---》加载文档---》切分文档 代码如下&#xff1a; import os from dotenv import load_dotenv, find_dotenvload_dotenv(find_dotenv()) # 获取folder_path下所有文件路径&#xff0c;储存在…

OV SSL证书年度成本概览:为企业安全护航的经济之选

在当今数字化时代&#xff0c;企业网站不仅是品牌展示的窗口&#xff0c;更是与客户沟通的桥梁。然而&#xff0c;随着网络威胁的不断升级&#xff0c;保护网站安全成为了企业不可忽视的任务。SSL证书&#xff0c;特别是OV SSL证书&#xff0c;因其对企业身份的严格验证&#x…

Halcon OCR字符识别(极坐标转换,字符识别)

Halcon OCR字符识别&#xff08;极坐标转换&#xff0c;字符识别&#xff09; 代码 * 1.加载图片 *************************************************** dev_close_window () read_image (Image, ./img) get_image_size (Image, Width, Height) dev_get_window (WindowHandle…

聚鼎贸易:装饰画开店教程新手入门

当艺术遇上商业&#xff0c;每一幕交易皆是文化的交流。本篇将引领有志于开设装饰画店铺的新手们&#xff0c;迈入创业的门槛&#xff0c;以独特的视角和步骤&#xff0c;探索如何成功经营一家装饰画店。 精选货源乃基石。货源的选取不仅反映店主的品味&#xff0c;更直接影响到…

NPDP|产品经理的沟通协调能力:塑造产品成功的核心力量

在快速发展的商业环境中&#xff0c;产品经理的角色愈发重要。他们不仅要负责产品的战略规划、需求管理、项目管理&#xff0c;更要与团队内外各方进行有效的沟通协调。那么&#xff0c;产品经理的沟通协调能力到底有多重要呢&#xff1f;本文将深入探讨这一话题。 沟通是产品成…

使用css做一个旋转的八卦图

使用css做一个旋转的八卦图 1, html部分 <div class"tai"><div class"bai"></div><div class"hei"></div> </div>2, css部分 .tai{width: 200px;height: 200px;border: 1px solid #000;background: linea…

【Python机器学习】模型评估与改进——回归指标

对于回归问题&#xff0c;可以像分类问题一样进行详细评估&#xff0c;例如&#xff0c;对目标值估计过高与目标值估计过低进行对比分析。 但是&#xff0c;对于我们见过的大多数应用来说&#xff0c;使用默认就足够了&#xff0c;它由所有回归器的score方法给出。业务决策有时…

全面详解菲律宾slots游戏本土网盟广告CPI流量效果分析

全面详解菲律宾slots游戏本土网盟广告CPI流量效果分析 一、引言 随着互联网的普及和移动设备的广泛应用&#xff0c;网络游戏行业迅速崛起&#xff0c;成为全球娱乐市场的一大热门。菲律宾作为东南亚地区的重要国家&#xff0c;其网络游戏市场也呈现出蓬勃的发展势头。在这样的…

AI数字人直播源码出售价格公布!

随着数字人行业的兴起&#xff0c;以数字人直播为代表的应用场景逐渐成为人们日常生活中不可分割的一部分&#xff0c;再加上艾媒研究数据显示&#xff0c;超五成以上的被调查群体的企业使用过虚拟人技术&#xff0c;超三成被调查群体的企业计划使用虚拟人技术等结论的公布&…

网友炸锅:这款iPhone壳竟能直接放保时捷车钥匙?

在当今这个个性化消费时代&#xff0c;高端智能手机及其配件已经成为了展示个人身份和品味的重要途径。最近&#xff0c;一款专为保时捷车主量身定制的iPhone手机壳&#xff0c;在互联网上引发了广泛的关注和讨论。 这款手机壳不仅在设计上凸显了保时捷的品牌logo&#xff0c;…

游戏工作室如何巧妙应对IP封禁风险?

游戏工作室在使用IP时&#xff0c;面临着封号的风险&#xff0c;因此需要采取一些防封技巧来保护自己的运营。以下是一些游戏工作室常用的防封技巧。 1. 多IP轮换 游戏工作室可以使用多个代理IP&#xff0c;并定期轮换它们。这样做可以减少单个IP被频繁访问同一游戏服务器而被…

for循环中list触发fast-fail或不触发的原理和方法

Iterable和Iterator Iterator接口位于的位置是java.util.Iterator&#xff0c;它主要有两个抽象方法供子类实现。hasNext()用来判断还有没有数据可供访问&#xff0c;next()用来访问下一个数据。 集合Collection不是直接去实现Iterator接口&#xff0c;而是去实现Iterable接口…

Pycharm设置远程解释器(本地代码+远程服务器环境)

Pycharm设置远程解释器(本地代码+服务器环境) Pycharm设置远程开发环境: 1.点击IDE左上角“文件”-》点击其中的设置 2:点击项目->点击添加解释器 3: 4根据提示填入远程服务器的IP地址,端口,和用户名: 5:连接上后选择现有环境: 6:选择刚才建立好的conda环境…

ubnutu20.04-vscode安装leetcode插件流程

1.在vscode插件商城选择安装leetcode 2.安装node.js 官网下载一个版本安装流程&#xff1a; ①tar -xvf node-v14.18.0-linux-x64.tar.xz ①sudo ln -s /app/software/nodejs/bin/npm /usr/local/bin/ ②ln -s /app/software/nodejs/bin/node /usr/local/bin/ 查看版本&…