贡献转换指南

本文档描述了如何理解、开发和贡献一个转换。

我们还提供了转换端到端测试,以验证转换对数据的输入和输出。

概念

使用SeaTunnel,您可以通过连接器读取或写入数据,但如果您需要在读取之后或写入之前处理数据,则需要使用转换。

使用转换对数据行或字段进行简单的编辑,例如拆分字段、更改字段值、添加或删除字段。

数据类型转换

转换从上游(源或转换)接收数据类型输入,并将新的数据类型输出到下游(接收器或转换),这个过程称为数据类型转换。

例子

示例1:移除字段

| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   |

| A         | B         |
|-----------|-----------|
| STRING    | INT       |

Example 2:Sort fields

| B         | C         | A         |
|-----------|-----------|-----------|
| INT       | BOOLEAN   | STRING    |

| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   |

Example 3:Update fields datatype

| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   |


| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | STRING    | STRING    |

Example 4:Add new fields

| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   |


| A         | B         | C         | D         |
|-----------|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   | DOUBLE    |

数据转换

在数据类型转换后,转换将从上游(源或转换)接收数据行输入,编辑成具有新数据类型的数据行,然后输出到下游(接收器或转换)。这个过程称为数据转换。

翻译

转换与执行引擎解耦,任何转换实现都可以在所有引擎上运行而无需更改代码和配置,这需要翻译层来适应转换和执行引擎。

示例:翻译数据类型和数据

Original:

| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   |

Datatype translation:

| A                 | B                 | C                 |
|-------------------|-------------------|-------------------|
| ENGINE<STRING>    | ENGINE<INT>       | ENGINE<BOOLEAN>   |

Data translation:

| A                 | B                 | C                 |
|-------------------|-------------------|-------------------|
| ENGINE<"test">    | ENGINE<1>         |  ENGINE<false>    |

core apis

SeaTunnelTransform核心API

SeaTunnelTransform提供了所有主要和基本的API,您可以通过继承它来执行任何转换操作。

从上游接收数据类型的输入。

/**
 * 设置输入数据的数据类型信息。
 *
 * @param inputDataType 上游输入的数据类型信息。
 */
void setTypeInfo(SeaTunnelDataType<T> inputDataType);

向下游输出新的数据类型。

/**
 * 获取此转换生成的记录的数据类型。
 *
 * @return 生成的数据类型。
 */
SeaTunnelDataType<T> getProducedType();

编辑输入数据并将新数据输出到下游。

/**
 * 将输入数据转换为{@link this#getProducedType()}类型的数据。
 *
 * @param row 需要转换的数据。
 * @return 转换后的数据。
 */
T map(T row);

SingleFieldOutputTransform

SingleFieldOutputTransform是一个抽象的单字段更改操作符。

定义输出字段

/**
 * 输出新字段
 *
 * @return
 */
protected abstract String getOutputFieldName();

定义输出字段的数据类型

/**
 * 输出新字段的数据类型
 *
 * @return
 */
protected abstract SeaTunnelDataType getOutputFieldDataType();

定义输出字段的值

/**
 * 输出新字段的值
 *
 * @param inputRow 上游输入的inputRow。
 * @return
 */
protected abstract Object getOutputFieldValue(SeaTunnelRowAccessor inputRow);

MultipleFieldOutputTransform

MultipleFieldOutputTransform是一个抽象的多字段更改操作符。

定义输出字段

/**
 * 输出新字段
 *
 * @return
 */
protected abstract String[] getOutputFieldNames();

定义输出字段的数据类型

/**
 * 输出新字段的数据类型
 *
 * @return
 */
protected abstract SeaTunnelDataType[] getOutputFieldDataTypes();

定义输出字段的值

/**
 * 输出新字段的值
 *
 * @param inputRow 上游输入的inputRow。
 * @return
 */
protected abstract Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow);

AbstractSeaTunnelTransform

AbstractSeaTunnelTransform是一个抽象的数据类型和字段更改操作符。

转换输入行类型并输出新行类型

/**
 * 输出转换后的行类型。
 *
 * @param inputRowType 上游输入的行类型
 * @return
 */
protected abstract SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType);

转换输入行数据并输出新行数据

/**
 * 输出转换后的行数据。
 * 
 * @param inputRow 上游输入的行数据
 * @return
 */
protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);

开发一个转换

必须实现以下API之一:

  • SeaTunnelTransform
  • AbstractSeaTunnelTransform
  • SingleFieldOutputTransform
  • MultipleFieldOutputTransform

将实现的子类添加到seatunnel-transforms-v2模块中。

例如:将字段复制到新字段

@AutoService(SeaTunnelTransform.class)
public class CopyFieldTransform extends SingleFieldOutputTransform {

    private String srcField;
    private int srcFieldIndex;
    private SeaTunnelDataType srcFieldDataType;
    private String destField;

    @Override
    public String getPluginName() {
        return "Copy";
    }

    @Override
    protected void setConfig(Config pluginConfig) {
        this.srcField = pluginConfig.getString("src_field");
        this.destField = pluginConfig.getString("dest_fields");
    }

    @Override
    protected void setInputRowType(SeaTunnelRowType inputRowType) {
        srcFieldIndex = inputRowType.indexOf(srcField);
        srcFieldDataType = inputRowType.getFieldType(srcFieldIndex);
    }

    @Override
    protected String getOutputFieldName() {
        return destField;
    }

    @Override
    protected SeaTunnelDataType getOutputFieldDataType() {
        return srcFieldDataType;
    }

    @Override
    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
        return inputRow.getField(srcFieldIndex);
    }
}
  • getPluginName方法用于标识转换名称。
  • @AutoService用于自动生成META-INF/services/org.apache.seatunnel.api.transform.SeaTunnelTransform文件。
  • setConfig方法用于注入用户配置。

转换测试工具

一旦添加了新的插件,建议为其添加端到端测试。我们有一个seatunnel-e2e/seatunnel-transforms-v2-e2e模块来帮助您执行这项任务。

例如,如果您想为CopyFieldTransform添加一个端到端测试,可以在seatunnel-e2e/seatunnel-transforms-v2-e2e模块中创建一个新的测试,并在测试中扩展TestSuiteBase类。

public class TestCopyFieldTransformIT extends TestSuiteBase {

    @TestTemplate
    public void testCopyFieldTransform(TestContainer container) {
        Container.ExecResult execResult = container.executeJob("/copy_transform.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
    }
}

一旦您的测试用例实现了TestSuiteBase接口并使用@TestTemplate注解启动,它将在所有引擎上运行作业,您只需要使用SeaTunnel配置文件执行executeJob方法,它将提交SeaTunnel作业。

参考资料

https://seatunnel.apache.org/docs/2.3.3/contribution/contribute-transform-v2-guide