业务需求

定时执行 http 请求任务?

source

可以根据 http source,内置了一些工具,我们只测试最基本的,如果有特别的需求,可以自己定义。

实际测试笔记

引入依赖包

<!--        引入基础的 http,看的出来,可以自己自定义 http-->
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-http-base</artifactId>
    <version>${project.version}</version>
</dependency>

配置文件

我们简单测试下,GET 请求 https://www.baidu.com/。

5 秒一次

# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}
source{
    Http {
      url = "https://www.baidu.com/"
      method = "GET"
      format = "text"
      poll_interval_millis = 5000
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    # 使用自定义的控台输出,避免 console 异常。
    ConsoleBinlog {
    }
}

输出 sink ConsoleBinlog

这个是我们自定义的简单 console 输出,改自 Console。原来的 console 比较严格,可能会报错。这里仅用于测试。

@Override
@SuppressWarnings("checkstyle:RegexpSingleline")
public void write(SeaTunnelRow element) {
    Date date = new Date();
    String dateStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(date);
    log.info("ConsoleBinlogSinkWriter ================= " + dateStr + " >>>>>>>>>>> " + element.toString());
}

测试效果

2024-01-18 10:15:15,060 INFO  org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - ConsoleBinlogSinkWriter ================= 2024-01-18 10:15:15.060 >>>>>>>>>>> SeaTunnelRow{tableId=, kind=+I, fields=[<!DOCTYPE html>

2024-01-18 10:15:20,080 INFO  org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - ConsoleBinlogSinkWriter ================= 2024-01-18 10:15:20.080 >>>>>>>>>>> SeaTunnelRow{tableId=, kind=+I, fields=[<!DOCTYPE html>

2024-01-18 10:15:25,101 INFO  org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - ConsoleBinlogSinkWriter ================= 2024-01-18 10:15:25.101 >>>>>>>>>>> SeaTunnelRow{tableId=, kind=+I, fields=[<!DOCTYPE html>

...

5000ms 请求一次。

其他

这里也就解答了我上一篇中的疑惑,其实一个请求,所谓的流批一体,应该直接参考这种方式实现。

有一些 source 比如 ES,只支持 batch,想实现流式会比较麻烦。

所以可以自己定义一个跑批实现,然后同时支持流批即可。

这个能力感觉可以直接放在 job 层,而不是每一个组件。

参考资料

[Bug] [Connector-V2 JDBC] source读取数据为空时,java.lang.NullPointerException