从官方文档可以看到,Flink支持的数据源有如下几个:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
对于其他的源,它也提供了接口给我们实现,扩展性非常好,今天我们就实现一个从RocketMQ取数据的实现。
打开项目,转到Flink处理类,可以看到:
| 1 | object FlinkKafkaConsumerDemo { | 
我们按住Command,点击FlinkKafkaConsumer查看其结构:
| 1 | public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T>{} | 
可以看到,要实现自定义的Source,只需要实现接口SourceFunction即可,如果需要并行消费,可以实现ParallelSourceFunction。
创建类RocketMQSourceFunction,继承SourceFunction:
| 1 | public class RocketMQSourceFunction implements SourceFunction<String> { | 
首先我们需要准备一个RocketMQ的消费者客户端,打开pom.xml,添加如下依赖:
| 1 | <dependency> | 
对于RocketMQSourceFunction来说,我们需要初始化一个Consumer,所以添加代码如下:
| 1 | public class RocketMQSourceFunction implements SourceFunction<String> { | 
这样,当类在加载的时候,系统会创建一个consumer。对于一个Consumer来说,还需要知道我们要消费的nameSrvAddr和Topic是什么,所以我们添加字段:
| 1 | public class RocketMQSourceFunction implements SourceFunction<String> { | 
重写Run方法:
| 1 | 
 | 
consumer会在接收到消息时,发送消息到sourceContext中,这样Flink的流就可以接收到消息了。同时不要忘了重写cancal方法:
| 1 | 
 | 
这样,一个完整的RocketMQ的数据源接收器我们已经实现好了,在需要用到的Flink代码中加入:
| 1 | object FlinkRocketMQConsumerDemo { | 
这样,Flink即可接收到RocketMQ的消息了。