欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > Java版Flink使用指南——分流导出

Java版Flink使用指南——分流导出

2025/6/17 20:31:39 来源:https://blog.csdn.net/breaksoftware/article/details/140246914  浏览:    关键词:Java版Flink使用指南——分流导出

大纲

  • 新建工程
  • 编码
    • Pom.xml
    • 自定义无界流
    • 分流
  • 测试
  • 工程代码

在之前的案例中,我们一直使用的是单个Sink来做数据的输出。实际上,Flink是支持多个输出流的。本文我们就来讲解如何在Flink数据输出时做分流处理。
我们将基于《Java版Flink使用指南——自定义无界流生成器》的输入流,按生成数字的奇偶性,将其分流输出到不同的RabbitMQ队列中。

新建工程

我们新建一个名字叫MultiSinkTo的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

编码

Pom.xml

因为我们要往RabbitMQ中输出,所以需要引入相关连接组件。

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
这块的代码可以见《Java版Flink使用指南——自定义无界流生成器》
它会每隔1秒钟生成一个递增的数字

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

分流

我们通过下面的代码生成数据流

		DataStreamSource<Long> longDataStreamSource = env.addSource(new UnBoundedStreamGenerator());

然后奇数发布到odd.data.to.rbtmq队列;偶数发布到even.data.to.rbtmq。
分流主要是通过filter来区分数据,然后针对不同的数据addSink来发布到不同的队列。
如果不需要区分数据,只是将相同的数据发布到不同的目的地,则可以直接多次addSink来达成。

		String host = "172.25.103.252"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();int parallelism = 1;String oddSinkQueueName = "odd.data.to.rbtmq"; RMQSink<String> oddRMQSink = new RMQSink<>(rmqConnectionConfig, oddSinkQueueName, new SimpleStringSchema());longDataStreamSource.filter(value -> value % 2 != 0).map(Object::toString).addSink(oddRMQSink).setParallelism(parallelism).name("oddSink");String evenSinkQueueName = "even.data.to.rbtmq";RMQSink<String> evenRMQSink = new RMQSink<>(rmqConnectionConfig, evenSinkQueueName, new SimpleStringSchema());longDataStreamSource.filter(value -> value % 2 == 0).map(Object::toString).addSink(evenRMQSink).setParallelism(parallelism).name("evenSink");

测试

执行一段时间后,我们看到两个队列相序增加
在这里插入图片描述
奇数队列
在这里插入图片描述
偶数队列
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词