SpringAi --- HelloWorld
Mr.Lee 2025-04-09 11:16:23 SpringAiSpringWebFlux
SpringAi 最近发布的 1.0 的快照版本, 今天我们来体验一下.
我这里使用本地的 Ollama, 做为示例程序. 模型选用了 llama3.1:8b
和 gemma3:1b
.
import cn.hutool.core.util.StrUtil;
import com.striveonger.common.core.thread.ThreadKit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.chat.messages.Message;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.Generation;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.ollama.OllamaChatModel;
import org.springframework.ai.ollama.api.OllamaApi;
import org.springframework.ai.ollama.api.OllamaOptions;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Lee
* @since 2025-04-08 17:51
*/
public class TestLLM {
private final Logger log = LoggerFactory.getLogger(TestLLM.class);
private OllamaChatModel model;
@BeforeEach
public void setUp() {
String s = "gemma3:1b";
// String s = "llama3.1:8b";
model = OllamaChatModel
.builder()
.ollamaApi(new OllamaApi("http://localhost:11434"))
.defaultOptions(OllamaOptions
.builder()
.model(s)
.temperature(0.95) // 思维活跃度
.build()
).build();
}
@Test
public void testHello() {
log.info("hello");
Prompt prompt = new Prompt(List.of(new UserMessage("你好")));
ChatResponse response = model.call(prompt);
log.info(response.getResult().getOutput().getText());
}
@Test
public void testAsync() {
var ref = new Object() {
long start = 0L;
String id = "";
};
List<String> messages = List.of(
"你是一个运维开发工程师, 你需要根据以下规则来回答用户的问题",
"k8s中污点和容忍度的概念是什么?"
);
// messages = List.of("以下请用中文回答问题", "你好~");
// 构建提示词
Prompt prompt = new Prompt(messages.stream().map(UserMessage::new).map(Message.class::cast).toList());
// 流式响应体
Flux<ChatResponse> flux = model.stream(prompt);
// 订阅流
Disposable subscribe = flux
.doFirst(() -> {
log.info("Start....");
ref.start = System.currentTimeMillis();
})
.doOnNext(response -> {
if (StrUtil.isBlank(ref.id)) {
ref.id = response.getMetadata().getId();
log.info("id: {}", ref.id);
}
Generation result = response.getResult();
log.info(result.getOutput().getText());
})
.doOnComplete(() -> {
log.info("Done....");
ThreadKit.queue().offer("done");
})
.doOnCancel(() -> {
ThreadKit.queue().offer("cancel");
log.info("Cancel....");
})
.doOnError(e -> {
log.error("Error....", e);
ThreadKit.queue().offer("error");
})
.subscribe();
ThreadKit.run(() -> {
// 阻塞 5 秒
ThreadKit.sleep(5, TimeUnit.SECONDS);
// 取消订阅
subscribe.dispose();
});
Object status = ThreadKit.queue().take();
log.info("status: {}", status);
log.info("Time-consuming: {}", System.currentTimeMillis() - ref.start);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
非阻塞式(流式)响应, 让我想起了 webflux
......
Flux 和 Mono 都是 reactor-core 包中核心类
在 Spirng-Webflux
中Flux
类是响应式编程框架的核心组件, 主要用于处理异步, 非阻塞的流式数据序列. 是Spring WebFlux模块的基础, 适用于高并发, 实时数据推送等场景.
- **Mono: ** 表示单值或空值的异步序列
- **Flux: ** 处理多值异步数据流
Flux的核心作用
- 处理多元素数据流
Flux
表示一个包含0到N个元素的异步序列, 适用于需要返回多个结果的场景, 例如数据库查询返回多条记录, 实时消息推送或流式API响应. - 非阻塞式编程
基于响应式编程模型,
Flux
通过事件驱动机制实现异步处理, 避免传统Servlet模型的线程阻塞问题, 提升系统吞吐量和资源利用率. - 背压(Backpressure)支持
能够动态控制数据流速, 防止生产者过快导致消费者过载. 例如: 通过
onBackpressureBuffer
设置缓冲区大小或通过limitRate
限制请求速率.