neo4j

Neo4j sink connector

描述

将数据写入 Neo4j。

neo4j-java-driver 版本为 4.4.9

选项

名称 类型 必需 默认值
uri 字符串 -
username 字符串 -
password 字符串 -
max_batch_size 整数 -
write_mode 字符串 OneByOne
bearer_token 字符串 -
kerberos_ticket 字符串 -
database 字符串 -
query 字符串 -
queryParamPosition 对象 -
max_transaction_retry_time 长整数 30
max_connection_timeout 长整数 30
common-options config -

这是关于 Neo4j 接收器连接器选项的配置列表。

根据您的需求,您可以适当配置这些选项。

uri [string] Neo4j 数据库的 URI。参考格式:neo4j://localhost:7687

username [string] Neo4j 的用户名

password [string] Neo4j 的密码。如果提供了用户名,则为必需项。

max_batch_size [Integer] max_batch_size 指的是在写入数据库时,可以在单个事务中写入的最大数据条目数。

write_mode 默认值为 oneByOne,或者将其设置为 “Batch”,如果希望能够批量写入

unwind $ttt as row create (n:Label) set n.name = row.name,n.age = rw.age

“ttt” 表示一批数据,”ttt” 可以是任意字符串,只要它与配置的 “batch_data_variable” 匹配。

bearer_token [string] Neo4j 的 base64 编码的承载令牌,用于身份验证。

kerberos_ticket [string] Neo4j 的 base64 编码的 Kerberos 票据,用于身份验证。

database [string] 数据库名称。

query [string] 查询语句。包含在运行时用相应值替换的参数占位符。

queryParamPosition [object] 查询参数的位置映射信息。

键名是参数占位符名称。

关联值是输入数据行中字段的位置。

max_transaction_retry_time [long] 最大事务重试时间(秒)。如果超过这个时间,事务将失败。

max_connection_timeout [long] 等待建立 TCP 连接的最长时间(秒)。

common options Sink 插件的通用参数,请参阅 Sink Common Options 了解详细信息。

WriteOneByOneExample

sink {
  Neo4j {
    uri = "neo4j://localhost:7687"
    username = "neo4j"
    password = "1234"
    database = "neo4j"

    max_transaction_retry_time = 10
    max_connection_timeout = 10

    query = "CREATE (a:Person {name: $name, age: $age})"
    queryParamPosition = {
        name = 0
        age = 1
    }
  }
}

WriteBatchExample

您提供的 Neo4j Sink 连接器配置示例使用了 unwind 关键字,支持批量写入。默认情况下,一批数据的默认变量名为 batch

如果您要编写批量写入语句,则应声明 cypher:unwind $batch as row 进行相应操作。

以下是您提供的配置示例:

sink {
    Neo4j {
        uri = "bolt://localhost:7687"
        username = "neo4j"
        password = "neo4j"
        database = "neo4j"
        max_batch_size = 1000
        write_mode = "BATCH"
        max_transaction_retry_time = 3
        max_connection_timeout = 10

        query = "unwind $batch as row create(n:MyLabel) set n.name = row.name, n.age = row.age"
    }
}

该配置指定了 Neo4j 数据库的 URI、用户名、密码等参数,并使用了 BATCH 写模式,设置了最大批量大小、最大事务重试时间和最大连接超时。

查询语句使用了 unwind $batch as row 进行批量写入。请确保您的实际数据和数据模型与配置中的查询语句相匹配。

参考资料

https://seatunnel.apache.org/docs/2.3.3/connector-v2/sink/Neo4j