从官方文档可以看到,Flink支持的数据源有如下几个:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
对于其他的源,它也提供了接口给我们实现,扩展性非常好,今天我们就实现一个从RocketMQ取数据的实现。
打开项目,转到Flink处理类,可以看到:
1 | object FlinkKafkaConsumerDemo { |
我们按住Command,点击FlinkKafkaConsumer
查看其结构:
1 | public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T>{} |
可以看到,要实现自定义的Source,只需要实现接口SourceFunction
即可,如果需要并行消费,可以实现ParallelSourceFunction
。
创建类RocketMQSourceFunction
,继承SourceFunction
:
1 | public class RocketMQSourceFunction implements SourceFunction<String> { |
首先我们需要准备一个RocketMQ的消费者客户端,打开pom.xml
,添加如下依赖:
1 | <dependency> |
对于RocketMQSourceFunction
来说,我们需要初始化一个Consumer,所以添加代码如下:
1 | public class RocketMQSourceFunction implements SourceFunction<String> { |
这样,当类在加载的时候,系统会创建一个consumer。对于一个Consumer来说,还需要知道我们要消费的nameSrvAddr和Topic是什么,所以我们添加字段:
1 | public class RocketMQSourceFunction implements SourceFunction<String> { |
重写Run方法:
1 |
|
consumer会在接收到消息时,发送消息到sourceContext中,这样Flink的流就可以接收到消息了。同时不要忘了重写cancal方法:
1 |
|
这样,一个完整的RocketMQ的数据源接收器我们已经实现好了,在需要用到的Flink代码中加入:
1 | object FlinkRocketMQConsumerDemo { |
这样,Flink即可接收到RocketMQ的消息了。