Overall module directory

The overall directory layout of the seatunnel-transforms-v2 transform module is as follows:

├─src
│  ├─main
│  │  └─java
│  │      └─org
│  │          └─apache
│  │              └─seatunnel
│  │                  └─transform
│  │                      ├─common
│  │                      ├─contains
│  │                      ├─copy
│  │                      ├─exception
│  │                      ├─fieldmapper
│  │                      ├─filter
│  │                      ├─filterrowkind
│  │                      ├─replace
│  │                      ├─split
│  │                      └─sql
│  │                          └─zeta
│  │                              └─functions
│  │                                  └─udf

Custom plugin: Contains

Introduction

Plugin description: you specify a field and a string, and a row is passed on to the sink only if that field contains the string.

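Use case: when consuming from Kafka, for example, we sometimes only care about records whose field contains "Exception" and want to process just those.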

Implementation classes

The implementation consists of three files:

Mode                 LastWriteTime         Length Name
----                 -------------         ------ ----
-a----         2024/1/31     10:16           3815 ContainsTransform.java
-a----         2024/1/31      9:47           1606 ContainsTransformConfig.java
-a----         2024/1/31      9:52           1976 ContainsTransformFactory.java

It was written by using the existing FilterRowKindTransform directly as a template.
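ContainsTransform extends FilterRowTransform, just as FilterRowKindTransform does. Its transformRow either returns the row, which keeps it, or returns null, which drops it. The dependency-free sketch below illustrates that filter pattern. SimpleFilterTransform, ContainsFilter and the Object[] row representation are hypothetical stand-ins for illustration. They are not SeaTunnel's FilterRowTransform or SeaTunnelRow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a filter-style transform: subclasses return the
// row to keep it, or null to drop it. This is NOT SeaTunnel's FilterRowTransform.
abstract class SimpleFilterTransform {

    protected abstract Object[] transformRow(Object[] row);

    // Applies transformRow to every row and keeps only the non-null results
    public List<Object[]> apply(List<Object[]> rows) {
        List<Object[]> out = new ArrayList<>();
        for (Object[] row : rows) {
            Object[] result = transformRow(row);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }
}

// Hypothetical "contains" filter mirroring the logic of ContainsTransform
class ContainsFilter extends SimpleFilterTransform {
    private final int fieldPos;
    private final String containsValue;

    ContainsFilter(int fieldPos, String containsValue) {
        this.fieldPos = fieldPos;
        this.containsValue = containsValue;
    }

    @Override
    protected Object[] transformRow(Object[] row) {
        if (row == null) {
            return null;
        }
        Object value = row[fieldPos];
        if (value == null) {
            return null; // a null field can never match, so the row is dropped
        }
        // Substring match on the field's string form, as in ContainsTransform
        return value.toString().contains(containsValue) ? row : null;
    }
}
```

For example, new ContainsFilter(0, "Exception").apply(rows) would keep only the rows whose first field contains "Exception", which corresponds to the Kafka use case described above.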

Source code

ContainsTransform

package org.apache.seatunnel.transform.contains;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.transform.common.FilterRowTransform;

@Slf4j
@AutoService(SeaTunnelTransform.class)
@NoArgsConstructor
public class ContainsTransform extends FilterRowTransform {
    public static final String PLUGIN_NAME = "Contains";

    private Integer fieldPos = null;
    private String containsValue = null;

    public ContainsTransform(
            @NonNull ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) {
        super(inputCatalogTable);
        initConfig(config);
    }

    @Override
    public String getPluginName() {
        return PLUGIN_NAME;
    }

    private void initConfig(ReadonlyConfig config) {
        this.fieldPos = config.get(ContainsTransformConfig.KEY_FIELD_POS);
        this.containsValue = config.get(ContainsTransformConfig.KEY_CONTAINS_VALUE);

        if (fieldPos == null || StringUtils.isEmpty(containsValue)) {
            throw new SeaTunnelRuntimeException(
                    CommonErrorCode.ILLEGAL_ARGUMENT,
                    String.format(
                            "These options (%s, %s) must be configured.",
                            ContainsTransformConfig.KEY_FIELD_POS.key(),
                            ContainsTransformConfig.KEY_CONTAINS_VALUE.key()));
        }
    }

    @Override
    protected void setConfig(Config pluginConfig) {
        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
                .validate(new ContainsTransformFactory().optionRule());
        initConfig(ReadonlyConfig.fromConfig(pluginConfig));
    }

    @Override
    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
        if (inputRow == null) {
            return null;
        }

        // Returning null drops the row, so it never reaches the sink
        Object value = inputRow.getField(fieldPos);
        if (value == null) {
            log.debug("[ContainsTransform] value of fieldPos={} is null, ignore", fieldPos);
            return null;
        }

        String fieldValueString = value.toString();
        if (fieldValueString.contains(containsValue)) {
            return inputRow;
        }

        // Per-row logging at debug level, so a streaming job does not flood the log
        log.debug(
                "[ContainsTransform] value of fieldPos={} is {}, not match contains={}, ignore",
                fieldPos,
                fieldValueString,
                containsValue);
        return null;
    }
}

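The remaining classes, ContainsTransformConfig and ContainsTransformFactory, only deal with the fieldPos and containsValue options: defining them and reading their values from the configuration.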
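The post does not show ContainsTransformConfig or ContainsTransformFactory. The sketch below gives a rough, dependency-free illustration of what that option handling amounts to. It reads the two keys used by the test configuration (fieldPos and containsValue) from a plain Map and enforces the same "both are required" rule as initConfig. ContainsOptions and the Map-based input are hypothetical, since the real classes go through SeaTunnel's typed options and ReadonlyConfig. Rejecting a negative position is an extra assumption that the original code does not make.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, dependency-free stand-in for the option handling in
// ContainsTransform.initConfig; the real plugin uses SeaTunnel's ReadonlyConfig.
final class ContainsOptions {
    static final String KEY_FIELD_POS = "fieldPos";
    static final String KEY_CONTAINS_VALUE = "containsValue";

    final int fieldPos;
    final String containsValue;

    private ContainsOptions(int fieldPos, String containsValue) {
        this.fieldPos = fieldPos;
        this.containsValue = containsValue;
    }

    // Both options are required, as in initConfig; the negative-position
    // check is an extra assumption not present in the original plugin.
    static ContainsOptions from(Map<String, Object> raw) {
        Object pos = raw.get(KEY_FIELD_POS);
        Object value = raw.get(KEY_CONTAINS_VALUE);
        if (!(pos instanceof Integer)
                || (Integer) pos < 0
                || value == null
                || value.toString().isEmpty()) {
            throw new IllegalArgumentException(
                    String.format(
                            "These options (%s, %s) must be configured.",
                            KEY_FIELD_POS, KEY_CONTAINS_VALUE));
        }
        return new ContainsOptions((Integer) pos, value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put(KEY_FIELD_POS, 1);
        raw.put(KEY_CONTAINS_VALUE, "u3");
        ContainsOptions options = from(raw);
        System.out.println(options.fieldPos + " " + options.containsValue); // prints 1 u3
    }
}
```

Validating in one place means a missing option fails fast at startup, instead of surfacing later as a NullPointerException inside transformRow.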

Testing

Database preparation

mysql> select * from user_info;
+----+----------+---------------------+---------------------+
| id | username | create_time         | update_time         |
+----+----------+---------------------+---------------------+
|  1 | u1       | 2024-01-29 15:31:03 | 2024-01-29 15:31:03 |
|  2 | u2       | 2024-01-29 15:31:03 | 2024-01-29 15:31:03 |
|  3 | u3       | 2024-01-29 15:31:03 | 2024-01-29 15:31:03 |
|  4 | u4       | 2024-01-29 15:31:03 | 2024-01-29 15:31:03 |
+----+----------+---------------------+---------------------+

The data in the user_info table exists only for this test.

fieldPos = 1 refers to the username column, because field positions start at 0 (id is position 0).

containsValue = "u3", so only rows for which username.contains("u3") is true are passed to the sink.
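The sketch below makes the expected result concrete. It applies the same substring check to the four user_info rows, keeping only id and username (positions 0 and 1). UserInfoContainsDemo is a hypothetical helper written for illustration and is not part of the plugin.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper reproducing the Contains check on the user_info test data
class UserInfoContainsDemo {

    // Keeps rows whose field at fieldPos, converted to a string, contains containsValue
    static List<Object[]> filter(List<Object[]> rows, int fieldPos, String containsValue) {
        List<Object[]> kept = new ArrayList<>();
        for (Object[] row : rows) {
            Object value = row[fieldPos];
            if (value != null && value.toString().contains(containsValue)) {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Columns: id, username (create_time and update_time omitted for brevity)
        List<Object[]> userInfo = Arrays.asList(
                new Object[] {1, "u1"},
                new Object[] {2, "u2"},
                new Object[] {3, "u3"},
                new Object[] {4, "u4"});

        // fieldPos = 1 selects username, containsValue = "u3"
        for (Object[] row : filter(userInfo, 1, "u3")) {
            System.out.println(Arrays.toString(row)); // prints [3, u3]
        }
    }
}
```

Only [3, u3] is printed, which is consistent with the single record the job writes to the console. Note that contains performs a substring match, so a username such as u30 would also pass. An exact-match filter would need equals instead.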

Configuration file

# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}
source {
    MySQL-CDC {
        base-url = "jdbc:mysql://localhost:3306/migrate_source?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.cj.jdbc.Driver"
        username = "admin"
        password = "123456"
        table-names = ["migrate_source.user_info"]

        startup.mode = "initial"
    }
}

transform {
    Contains {
        fieldPos = 1
        containsValue = "u3"
    }
}

sink {
    ConsoleBinlog {
    }

}

Test result

Only one record ends up on the console:

2024-01-31 10:14:04.122 ~~~~~~~~~~~~ SeaTunnelRow{tableId=migrate_source.user_info, kind=+I, fields=[3, u3, 2024-01-29T15:31:03, 2024-01-29T15:31:03]}
