现象

希望实现 mysql 到 neo4j 的批量模式.

版本 v2.3.4

配置文件

env {
  execution.parallelism = 1
}

source {
  # 使用 MySQL 作为数据源
  Jdbc {
    url = "jdbc:mysql://localhost:3306/your_database"
    user = "your_username"
    password = "your_password"
    # 可以通过 query 自定义 SQL 查询语句(可选)
    query = "SELECT id, name, age FROM your_table WHERE age > 18"
    fetch_size = 500  # 每次拉取的大小
  }
}

transform {
}

sink {
  # 使用 Neo4j 作为目标数据存储
  Neo4j {
    # Neo4j Bolt URL
    uri = "bolt://localhost:7687"
    user = "neo4j"
    password = "neo4j_password"
    write_mode = "Batch"  # 指定批量模式
    max_batch_size = 100 # 批量处理大小
    # 节点定义,指定数据如何映射为图形中的节点
    query = "UNWIND $batch AS event MERGE (n:User {id: event.user_id}) SET n.name = event.username, n.age = event.user_age"
  }
}

注意:这里的 $batch 才是默认值。

如何过滤空置

有时候会发现 比如 event.user_id 为空,会导致 neo4j 直接匹配报错。

如何解决?

UNWIND $batch AS event with event where event.user_id IS NOT NULL MERGE (n:User {id: event.user_id}) SET n.name = event.username, n.age = event.user_age

参考资料

https://seatunnel.apache.org/docs/2.3.4/connector-v2/sink/Neo4j