Striveonger

vuePress-theme-reco Mr.Lee    2015 - 2025
Striveonger Striveonger
主页
分类
  • 文章
  • 笔记
  • 工具
标签
时间轴
author-avatar

Mr.Lee

264

Article

134

Tag

主页
分类
  • 文章
  • 笔记
  • 工具
标签
时间轴

SpringAi --- HelloWorld

vuePress-theme-reco Mr.Lee    2015 - 2025

SpringAi --- HelloWorld

Mr.Lee 2025-04-09 11:16:23 SpringAiSpringWebFlux

SpringAi 最近发布的 1.0 的快照版本, 今天我们来体验一下.

我这里使用本地的 Ollama, 做为示例程序. 模型选用了 llama3.1:8b 和 gemma3:1b.

import cn.hutool.core.util.StrUtil;
import com.striveonger.common.core.thread.ThreadKit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.chat.messages.Message;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.Generation;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.ollama.OllamaChatModel;
import org.springframework.ai.ollama.api.OllamaApi;
import org.springframework.ai.ollama.api.OllamaOptions;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @author Mr.Lee
 * @since 2025-04-08 17:51
 */
public class TestLLM {
    private final Logger log = LoggerFactory.getLogger(TestLLM.class);

    private OllamaChatModel model;

    @BeforeEach
    public void setUp() {
        String s = "gemma3:1b";
        // String s = "llama3.1:8b";
        model = OllamaChatModel
                .builder()
                .ollamaApi(new OllamaApi("http://localhost:11434"))
                .defaultOptions(OllamaOptions
                        .builder()
                        .model(s)
                        .temperature(0.95) // 思维活跃度
                        .build()
                ).build();
    }

    @Test
    public void testHello() {
        log.info("hello");
        Prompt prompt = new Prompt(List.of(new UserMessage("你好")));
        ChatResponse response = model.call(prompt);
        log.info(response.getResult().getOutput().getText());
    }

    @Test
    public void testAsync() {
        var ref = new Object() {
            long start = 0L;
            String id = "";
        };
        List<String> messages = List.of(
                "你是一个运维开发工程师, 你需要根据以下规则来回答用户的问题",
                "k8s中污点和容忍度的概念是什么?"
        );
        // messages = List.of("以下请用中文回答问题", "你好~");
        // 构建提示词
        Prompt prompt = new Prompt(messages.stream().map(UserMessage::new).map(Message.class::cast).toList());
        // 流式响应体
        Flux<ChatResponse> flux = model.stream(prompt);
        // 订阅流
        Disposable subscribe = flux
                .doFirst(() -> {
                    log.info("Start....");
                    ref.start = System.currentTimeMillis();
                })
                .doOnNext(response -> {
                    if (StrUtil.isBlank(ref.id)) {
                        ref.id = response.getMetadata().getId();
                        log.info("id: {}", ref.id);
                    }
                    Generation result = response.getResult();
                    log.info(result.getOutput().getText());
                })
                .doOnComplete(() -> {
                    log.info("Done....");
                    ThreadKit.queue().offer("done");
                })
                .doOnCancel(() -> {
                    ThreadKit.queue().offer("cancel");
                    log.info("Cancel....");
                })
                .doOnError(e -> {
                    log.error("Error....", e);
                    ThreadKit.queue().offer("error");
                })
                .subscribe();

        ThreadKit.run(() -> {
            // 阻塞 5 秒
            ThreadKit.sleep(5, TimeUnit.SECONDS);
            // 取消订阅
            subscribe.dispose();
        });

        Object status = ThreadKit.queue().take();
        log.info("status: {}", status);
        log.info("Time-consuming: {}", System.currentTimeMillis() - ref.start);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107

非阻塞式(流式)响应, 让我想起了 webflux......

Flux 和 Mono 都是 reactor-core 包中核心类

在 Spirng-Webflux 中Flux 类是响应式编程框架的核心组件, 主要用于处理异步, 非阻塞的流式数据序列. 是Spring WebFlux模块的基础, 适用于高并发, 实时数据推送等场景.

  • **Mono: ** 表示单值或空值的异步序列
  • **Flux: ** 处理多值异步数据流

Flux的核心作用

  1. 处理多元素数据流 Flux表示一个包含0到N个元素的异步序列, 适用于需要返回多个结果的场景, 例如数据库查询返回多条记录, 实时消息推送或流式API响应.
  2. 非阻塞式编程 基于响应式编程模型, Flux通过事件驱动机制实现异步处理, 避免传统Servlet模型的线程阻塞问题, 提升系统吞吐量和资源利用率.
  3. 背压(Backpressure)支持 能够动态控制数据流速, 防止生产者过快导致消费者过载. 例如: 通过onBackpressureBuffer设置缓冲区大小或通过limitRate限制请求速率.