上篇文章我们分析了Otter的代码结构:
《改造阿里巴巴otter框架,使其支持RocketMQ配置和输出(一)》:快速链接
这篇我们开始正式进行改造。
改造清单
- 定义RocketMQ实体,定义一个RocketMQ的目标源。
- 改造管理系统的逻辑以及前端,支持RocketMQ的配置。
- 改造Transerformer模块,支持将数据转换成RocketMQ需要的格式。
- 改造Load模块,支持将数据向RocketMQ发送。
定义RocketMQ
otter 中定义一个数据源是由DataMediaSource
决定的。
1 | public class DataMediaSource implements Serializable { |
查看这个表的继承类,可以看到DbMediaSource
,定义了url
、username
、password
、driver
等连接数据库必须要的参数。于是我就照着他的实现方式来进行编写支持RocketMQ的新实现。
RocketMQMediaSource
编写类RocketMQMediaSource
,定义连接到RocketMQ的必要参数:
1 | public class RocketMqMediaSource extends DataMediaSource { |
同时,在枚举类DataMediaType
添加名称为ROCKETMQ
的枚举。
RocketMQDataMedia
实现DataMedia类DataMedia<Source extends DataMediaSource>
,泛型定义为RocketMQMediaSource
1 | public class RocketMqDataMedia extends DataMedia<RocketMQMediaSource> {} |
此类定义了一种数据媒介为RocketMQ。
改造管理系统
添加数据源页面
打开addDataSource.vm
可以看到添加数据源的前端代码,此代码是由模板引擎velocity编写的,Java会根据指定的格式渲染HTML页面展示给客户端。找到数据源下拉框Select,添加RocketMQ选项。
1 | <td> |
对于sourceType这个参数,后端可以识别为枚举类DataMediaType.ROCKETMQ
,所以后端可以不需要修改其他的地方。
但是RocketMQ还需要填写额外的参数gourpName
,我们需要对前端进行一定的改造,由于作者在这里预留了changeFrom()
函数,触发条件为select
下拉框被改变的时候,那么我们就可以判断当其改变时,作出相应的表单内容调整。
首先我们定义需要填写的groupName:
1 | <tr id="group_name_tr" style="display: none;"> <!-- 注意第一行,很重要 --> |
重写changeForm()
函数:
1 | function changeform() { |
这样我们就可以根据下拉框自动展示需要填写的表单了:
验证连接数据源改造
同时,需要将验证连接数据源,这个按钮所请求的后端逻辑进行改造,否则无法通过提交:
根据DEBUG我们得知,该验证逻辑定义在DataSourceChecker
类的check()方法中,所以我们对该方法进行改造:
1 | public String check(String url, String username, String password, String encode, String sourceType, String groupName) { |
改造后,系统可以根据入参sourceType执行不同的验证逻辑。对于RocketMQ的验证逻辑目前我只是将连接打开再关闭,如果没有报错则返回成功,但是目前看下来,好像这个逻辑并不严谨,希望讨论获得更好的方案:
1 | private String checkMQ(String url, String username, String password, String encode, String sourceType, String groupName) { |
实现RocketMQTransformer
Otter对从Canal拉取到的数据进行了四步处理,分别是:Select、Extract、Transform、Load。
对于新的输出来说,只需要改造Transformer和Load即可。
查看源代码,找到node模块下的包com.alibaba.otter.node.etl.transform
,可以看到基于MySQL和Oracle的实现:
该实现继承自类com.alibaba.otter.node.etl.transform.transformer.OtterTransformer
:
1 | public interface OtterTransformer<S, T> { |
所以我们只需要实现该接口即可:
1 | public class RocketMQTransformer implements OtterTransformer<EventData, EventData> { |
RocketMQTransformer
是OtterTransformer
的RocketMQ版本实现,主要是根据使用方的需求进行数据的整理封装,返回合理的数据。这里我们将每个字段的改变前和改变后作为值存入数据集。
实现RocketMQLoader
实现Loader也是相同的,找到Loader接口com.alibaba.otter.node.etl.load.loader.OtterLoader
:
1 | public interface OtterLoader<P, R> { |
进行实现:
1 | public class RocketMQLoader implements OtterLoader<DbBatch, List<LoadContext>>, BeanFactoryAware { |
由于篇幅限制,我省略了一些实现,包括管道配置、RocketMQClient工具类编写。
更多代码请参考我Fork出来的github项目:https://github.com/lzx2005/otter,我修改的代码在分支mq-extend-try-1
中。
接下来的第三篇文章就是对我们新改造的模块进行各方面的压测,确保其可以继承Otter优秀的性能,同时也能完美做到组件扩展。
未完待续。