欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > IT业 > Reactor编程模型中,阻塞上传文件FilePart的3中方式

Reactor编程模型中,阻塞上传文件FilePart的3中方式

2025/9/22 23:53:19 来源:https://blog.csdn.net/m0_37859032/article/details/145159872  浏览:    关键词:Reactor编程模型中,阻塞上传文件FilePart的3中方式

1. 使用非阻塞的方式处理数据

将 Flux 转换为 Mono,并确保整个链式调用是非阻塞的。你可以使用 Mono.fromCallable 或其他非阻塞的方式来处理资源转换。

public Mono<Void> addDocumentsByFile(FilePart file) {String suffix = FileNameUtil.getSuffix(file.filename());return file.content().reduce(DataBuffer::write).map(DataBuffer::toByteBuffer).map(ByteBuffer::array).map(ByteArrayResource::new).flatMap(resource -> {TikaDocumentReader tikaDocumentReader = new TikaDocumentReader(resource);List<Document> splitDocuments;if ("csv".equalsIgnoreCase(suffix)) {splitDocuments = new CsvTextSplitter().apply(tikaDocumentReader.read());} else {splitDocuments = new TokenTextSplitter().apply(tikaDocumentReader.read());}// 将文档存入向量数据库return Mono.fromRunnable(() -> {for (Document doc : splitDocuments) {vectorStore.add(doc); // 假设vectorStore是一个异步接口}});}).then();
}

2. 使用 publishOn 切换到阻塞线程池

如果你确实需要使用阻塞操作,可以使用 publishOn 将操作切换到一个允许阻塞的线程池(例如 (Schedulers.boundedElastic()))。这会确保阻塞操作不会影响到非阻塞线程。

public Mono<Void> addDocumentsByFile(FilePart file) {String suffix = FileNameUtil.getSuffix(file.filename());return file.content().reduce(DataBuffer::write).map(DataBuffer::toByteBuffer).map(ByteBuffer::array).map(ByteArrayResource::new).publishOn(Schedulers.boundedElastic()).flatMap(resource -> {TikaDocumentReader tikaDocumentReader = new TikaDocumentReader(resource);List<Document> splitDocuments;if ("csv".equalsIgnoreCase(suffix)) {splitDocuments = new CsvTextSplitter().apply(tikaDocumentReader.read());} else {splitDocuments = new TokenTextSplitter().apply(tikaDocumentReader.read());}// 将文档存入向量数据库for (Document doc : splitDocuments) {vectorStore.add(doc); // 假设vectorStore是一个同步接口}return Mono.empty();});
}

3. 使用 Mono.fromCallable 和 subscribeOn

如果你需要在一个特定的线程上执行阻塞操作,可以使用 Mono.fromCallable 并结合 subscribeOn 来确保操作在正确的线程上执行。

public Mono<Void> addDocumentsByFile(FilePart file) {String suffix = FileNameUtil.getSuffix(file.filename());return file.content().reduce(DataBuffer::write).map(DataBuffer::toByteBuffer).map(ByteBuffer::array).map(ByteArrayResource::new).flatMap(resource -> Mono.fromCallable(() -> {TikaDocumentReader tikaDocumentReader = new TikaDocumentReader(resource);List<Document> splitDocuments;if ("csv".equalsIgnoreCase(suffix)) {splitDocuments = new CsvTextSplitter().apply(tikaDocumentReader.read());} else {splitDocuments = new TokenTextSplitter().apply(tikaDocumentReader.read());}// 将文档存入向量数据库for (Document doc : splitDocuments) {vectorStore.add(doc); // 假设vectorStore是一个同步接口}return null;}).subscribeOn(Schedulers.boundedElastic())).then();
}

总结

为了避免阻塞操作导致的问题,建议尽量使用非阻塞的方式处理数据流。如果必须使用阻塞操作,可以通过 publishOn 或 subscribeOn 将操作切换到允许阻塞的线程池。这样可以确保你的应用程序在非阻塞环境中正常工作。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词