Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

如何开发自定义数据处理插件 #13

Open
zhangkewei opened this issue Jan 21, 2019 · 0 comments
Open

如何开发自定义数据处理插件 #13

zhangkewei opened this issue Jan 21, 2019 · 0 comments
Labels
question Further information is requested

Comments

@zhangkewei
Copy link
Collaborator

zhangkewei commented Jan 21, 2019

假设我们要将mysql表T_USER同步到目标端Oracle T_USER_2,源端表T_USER表结构与目标端表T_USER_2一致。我们的需求是只保留FLAG字段等于0的用户数据。

需求有了,接下来我们就要实现EventProcessor接口做自定义数据过滤

	package cn.vbill.middleware.porter.plugin;
	public class UserFilter implements cn.vbill.middleware.porter.core.event.s.EventProcessor {
    @Override
    public void process(ETLBucket etlBucket) {
        List<ETLRow> rows = etlBucket.getRows().stream().filter(r -> {
            //第一步 找到表名为T_USER的记录
            boolean tableMatch = r.getFinalTable().equalsIgnoreCase("T_USER");
            if (!tableMatch) return tableMatch;
            //第二步 找到字段FLAG的值不等于0的记录
            boolean columnMatch = r.getColumns().stream().filter(c -> c.getFinalName().equalsIgnoreCase("FLAG")
            && (null == c.getFinalValue() || !c.getFinalValue().equals("0"))).count() > 0;
            return tableMatch && columnMatch;
        }).collect(Collectors.toList());
        //第三步 清除不符合条件的集合
        etlBucket.getRows().removeAll(rows);
    }
}

在任务中指定自定义数据处理插件:

以下配置文件格式适用配置管理后台"同步管理->高级任务配置(原菜单名:本地任务)->新增"
如果是本地任务配置文件需要增加前缀"porter.task[任务下标,从0开始]"

taskId=任务ID
nodeId=节点1,节点2,节点3
consumer.consumerName=CanalFetch
consumer.converter=canalRow
consumer.source.sourceType=CANAL
consumer.source.slaveId=0
consumer.source.address=127.0.0.1:3306
consumer.source.database=数据库
consumer.source.username=账号
consumer.source.password=密码
consumer.source.filter=*.\.t_user
consumer.eventProcessor.className=cn.vbill.middleware.porter.plugin.UserFilter
consumer.eventProcessor.content=/path/UserFilter.class(xxx.jar包,xxx.java类)

loader.loaderName=JdbcBatch #目标端插件
loader.source.sourceType=JDBC
loader.source.dbType=ORACLE
loader.source.url=jdbc:oracle:thin:@//127.0.0.1:1521/oracledb
loader.source.userName=demo
loader.source.password=demo

mapper[0].auto=false
mapper[0].table=T_USER,T_USER_2

@zhangkewei zhangkewei added the question Further information is requested label Jan 21, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

1 participant