SQL UDF transform 插件

描述

使用 UDF SPI 来扩展 SQL transform 的函数库。

UDF API

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package org.apache.seatunnel.transform.sql.zeta; public interface ZetaUDF { /** * Function name * * @return function name */ String functionName(); /** * The type of function result * * @param argsType input arguments type * @return result type */ SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType); /** * Evaluate * * @param args input arguments * @return result value */ Object evaluate(List<Object> args); }

SQL UDF transform 插件

描述

使用 UDF SPI 扩展 SQL transform 的函数库。

UDF 实现示例

在 Maven 项目中添加 transform-v2 依赖,并将其作为 provided 作用域:

  [xml]
1
2
3
4
5
6
7
8
9
10
11
12
13
<dependency> <groupId>org.apache.seatunnel</groupId> <artifactId>seatunnel-transforms-v2</artifactId> <version>2.3.x</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.seatunnel</groupId> <artifactId>seatunnel-api</artifactId> <version>2.3.x</version> <scope>provided</scope> </dependency>

创建一个实现 ZetaUDF 接口的 Java 类,示例代码如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.google.auto.service.AutoService; import org.apache.seatunnel.api.data.SeaTunnelDataType; import org.apache.seatunnel.api.udf.ZetaUDF; import org.apache.seatunnel.api.udf.types.BasicType; import java.util.List; @AutoService(ZetaUDF.class) public class ExampleUDF implements ZetaUDF { @Override public String functionName() { return "EXAMPLE"; } @Override public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) { return BasicType.STRING_TYPE; } @Override public Object evaluate(List<Object> args) { String arg = (String) args.get(0); if (arg == null) return null; return "UDF: " + arg; } }

打包 UDF 项目并将 JAR 文件复制到路径:${SEATUNNEL_HOME}/lib

示例:

从源数据中读取的数据表如下:

  [plaintext]
1
2
3
4
5
id name age 1 Joy Ding 20 2 May Ding 21 3 Kin Dom 24 4 Joy Dom 22

我们使用 SQL 查询的 UDF 来转换源数据,如下所示:

  [yaml]
1
2
3
4
5
6
7
transform { Sql { source_table_name = "fake" result_table_name = "fake1" query = "select id, example(name) as name, age from fake" } }

然后,结果表 fake1 中的数据将被更新为:

  [plaintext]
1
2
3
4
5
id name age 1 UDF: Joy Ding 20 2 UDF: May Ding 21 3 UDF: Kin Dom 24 4 UDF: Joy Dom 22

参考资料

https://seatunnel.apache.org/docs/2.3.3/transform-v2/sql-udf