MySQL Binary Log connector

This is @osheroff’s fork of @shiyko’s project and is probably the “official” version of it now. It is maintained with help from the Debezium devs.

Installation

<dependency>
    <groupId>com.zendesk</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.25.0</version>
</dependency>

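The project originally started as a fork of open-replicator, but ended up as a complete rewrite.

Key differences/features:

  • automatic resolution of binlog filename/position and GTID

  • resumable disconnects

  • pluggable failover strategies

  • binlog_checksum=CRC32 support (for MySQL 5.6.2+ users)

  • secure communication over TLS

  • JMX-friendly

  • real-time stats

  • availability in Maven Central

  • no third-party dependencies

  • test suite run against different MySQL releases

If you are looking for something similar in other languages, check out siddontang/go-mysql (Go) and noplay/python-mysql-replication (Python).

Instead of declaring the Maven dependency above, you can also download the latest JAR directly.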
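As an illustration of what the binlog_checksum=CRC32 item in the feature list refers to, here is a minimal stdlib-only sketch. It assumes the checksum is the standard CRC-32 of all preceding event bytes, stored little-endian in the last four bytes of the event. `BinlogChecksumSketch` and `checksumMatches` are hypothetical names and are not part of the connector.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32;

// Hypothetical illustration class, not part of mysql-binlog-connector-java.
final class BinlogChecksumSketch {
    // Size of the trailing checksum in bytes.
    static final int CHECKSUM_LENGTH = 4;

    // Returns true if the last four bytes of the event (little-endian)
    // equal the CRC-32 of everything that precedes them.
    static boolean checksumMatches(byte[] event) {
        if (event.length < CHECKSUM_LENGTH) {
            return false;
        }
        int payloadLength = event.length - CHECKSUM_LENGTH;
        CRC32 crc = new CRC32();
        crc.update(event, 0, payloadLength);
        long stored = ByteBuffer.wrap(event, payloadLength, CHECKSUM_LENGTH)
                .order(ByteOrder.LITTLE_ENDIAN)
                .getInt() & 0xFFFFFFFFL; // treat the stored value as unsigned
        return crc.getValue() == stored;
    }
}
```

A check like this only makes sense when the server actually writes checksums, so it would have to be skipped for servers running with binlog_checksum=NONE.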

Reading binary log file

File binlogFile = ...
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
try {
    for (Event event; (event = reader.readEvent()) != null; ) {
        ...
    }
} finally {
    reader.close();
}
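To make the reading loop above less of a black box, here is a minimal stdlib-only sketch of the on-disk layout it walks: the 4-byte magic prefix at the start of every binlog file and the 19-byte common event header that follows, with all integers in little-endian order. This is not the connector's code. `BinlogHeaderSketch`, `Header`, `hasMagic` and `parseHeader` are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration class, not part of mysql-binlog-connector-java.
final class BinlogHeaderSketch {
    // Every binlog file starts with the magic bytes 0xFE 'b' 'i' 'n'.
    static final byte[] MAGIC = {(byte) 0xFE, 'b', 'i', 'n'};
    // Length of the common (v4) event header in bytes.
    static final int HEADER_LENGTH = 19;

    static final class Header {
        final long timestamp;    // seconds since the Unix epoch
        final int typeCode;      // numeric event type
        final long serverId;     // id of the server that wrote the event
        final long eventLength;  // total event size in bytes, header included
        final long nextPosition; // file offset of the following event
        final int flags;

        Header(long timestamp, int typeCode, long serverId,
               long eventLength, long nextPosition, int flags) {
            this.timestamp = timestamp;
            this.typeCode = typeCode;
            this.serverId = serverId;
            this.eventLength = eventLength;
            this.nextPosition = nextPosition;
            this.flags = flags;
        }
    }

    // Checks that the buffer begins with the binlog magic bytes.
    static boolean hasMagic(byte[] file) {
        if (file.length < MAGIC.length) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (file[i] != MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    // Decodes the 19-byte event header that starts at the given offset.
    static Header parseHeader(byte[] file, int offset) {
        ByteBuffer buf = ByteBuffer.wrap(file, offset, HEADER_LENGTH)
                .order(ByteOrder.LITTLE_ENDIAN);
        long timestamp = buf.getInt() & 0xFFFFFFFFL;
        int typeCode = buf.get() & 0xFF;
        long serverId = buf.getInt() & 0xFFFFFFFFL;
        long eventLength = buf.getInt() & 0xFFFFFFFFL;
        long nextPosition = buf.getInt() & 0xFFFFFFFFL;
        int flags = buf.getShort() & 0xFFFF;
        return new Header(timestamp, typeCode, serverId, eventLength, nextPosition, flags);
    }
}
```

Because of the magic prefix, the first event in a file starts at offset 4, which is why 4 is the usual starting value for a binlog position.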

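Tapping into the MySQL replication stream

Prerequisites: whichever user you plan to use for BinaryLogClient must have the REPLICATION SLAVE privilege.

Unless you specify binlogFilename/binlogPosition yourself (in which case automatic resolution does not kick in), you also need to grant REPLICATION CLIENT.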

BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "password");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(new EventListener() {

    @Override
    public void onEvent(Event event) {
        ...
    }
});
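// connect() blocks the calling thread while events are streamed;
// connect(timeoutInMillis) runs the client in a separate thread instead
client.connect();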
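Since the client can start from an explicit binlogFilename/binlogPosition, one way to pick up where a previous run left off is to persist those two values yourself. The following is a minimal stdlib-only sketch of such a checkpoint, stored as a properties file. `BinlogCheckpointSketch` and the property keys are hypothetical and not part of the connector.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical illustration class, not part of mysql-binlog-connector-java.
final class BinlogCheckpointSketch {
    final String filename; // binlog file name, e.g. "mysql-bin.000003"
    final long position;   // offset within that file

    BinlogCheckpointSketch(String filename, long position) {
        this.filename = filename;
        this.position = position;
    }

    // Writes the coordinates to the given file, replacing earlier content.
    void save(Path path) throws IOException {
        Properties props = new Properties();
        props.setProperty("binlog.filename", filename);
        props.setProperty("binlog.position", Long.toString(position));
        try (OutputStream out = Files.newOutputStream(path)) {
            props.store(out, "last processed binlog coordinates");
        }
    }

    // Reads the coordinates back; returns null if no checkpoint exists yet.
    static BinlogCheckpointSketch load(Path path) throws IOException {
        if (!Files.exists(path)) {
            return null;
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(path)) {
            props.load(in);
        }
        return new BinlogCheckpointSketch(
                props.getProperty("binlog.filename"),
                Long.parseLong(props.getProperty("binlog.position")));
    }
}
```

On startup, a non-null checkpoint could be passed to client.setBinlogFilename(...) and client.setBinlogPosition(...) before connecting, and the listener could save client.getBinlogFilename() and client.getBinlogPosition() after handling an event. How often to save, and whether to save only at transaction boundaries, is a design choice this sketch leaves open.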

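Controlling event deserialization

You might need this for several reasons:

  • you do not want to waste time deserializing events you do not need;

  • there is no EventDataDeserializer defined for the event type you are interested in (or there is one, but it contains a bug);

  • you want certain types of events to be deserialized differently (perhaps *RowsEventData should contain the table name instead of the id?), and so on.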

EventDeserializer eventDeserializer = new EventDeserializer();

// do not deserialize EXT_DELETE_ROWS event data, return it as a byte array
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
    new ByteArrayEventDataDeserializer());

// skip EXT_WRITE_ROWS event data altogether
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
    new NullEventDataDeserializer());

// use custom event data deserializer for EXT_DELETE_ROWS
// (registering again for the same event type replaces the byte-array deserializer set above)
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
    new EventDataDeserializer() {
        ...
    });

BinaryLogClient client = ...
client.setEventDeserializer(eventDeserializer);
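For the "table name instead of id" case mentioned above, a custom deserializer is not the only option: row events carry a numeric table id, and a preceding TABLE_MAP event says which database and table that id stands for. Below is a minimal stdlib-only sketch of a lookup built on that idea. `TableNameResolverSketch` and its methods are hypothetical names, not part of the connector.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class, not part of mysql-binlog-connector-java.
final class TableNameResolverSketch {
    // Latest known "database.table" name for each table id.
    private final Map<Long, String> namesById = new HashMap<>();

    // Call for every TABLE_MAP event. A later mapping for the same id
    // overwrites the earlier one, since ids are not guaranteed to be stable.
    void onTableMap(long tableId, String database, String table) {
        namesById.put(tableId, database + "." + table);
    }

    // Call for every rows event. Returns null if no TABLE_MAP event
    // has been seen for this id yet.
    String resolve(long tableId) {
        return namesById.get(tableId);
    }
}
```

In an event listener, this could be fed from TableMapEventData (getTableId(), getDatabase(), getTable()) and queried with the getTableId() of the rows event data. It only works if TABLE_MAP events are still being deserialized, so they should not be mapped to NullEventDataDeserializer.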

Exposing BinaryLogClient through JMX

MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();

BinaryLogClient binaryLogClient = ...
ObjectName objectName = new ObjectName("mysql.binlog:type=BinaryLogClient");
mBeanServer.registerMBean(binaryLogClient, objectName);

// following bean accumulates various BinaryLogClient stats
// (e.g. number of disconnects, skipped events)
BinaryLogClientStatistics stats = new BinaryLogClientStatistics(binaryLogClient);
ObjectName statsObjectName = new ObjectName("mysql.binlog:type=BinaryLogClientStatistics");
mBeanServer.registerMBean(stats, statsObjectName);

Using SSL

System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore.jks");
System.setProperty("javax.net.ssl.trustStorePassword", "truststore.password");
System.setProperty("javax.net.ssl.keyStore", "/path/to/keystore.jks");
System.setProperty("javax.net.ssl.keyStorePassword", "keystore.password");

BinaryLogClient client = ...
client.setSSLMode(SSLMode.VERIFY_IDENTITY);

open-replicator

Open Replicator is a high-performance MySQL binlog parser written in Java.

It lets you parse, filter and broadcast binlog events in real time.

Maven

<dependency>
        <groupId>open-replicator</groupId>
        <artifactId>open-replicator</artifactId>
        <version>1.0.7</version>
</dependency>

Usage

final OpenReplicator or = new OpenReplicator();
or.setUser("root");
or.setPassword("123456");
or.setHost("localhost");
or.setPort(3306);
or.setServerId(6789);
or.setBinlogPosition(4);
or.setBinlogFileName("mysql_bin.000001");
or.setBinlogEventListener(new BinlogEventListener() {
    public void onEvents(BinlogEventV4 event) {
        // your code goes here
    }
});
or.start();

System.out.println("press 'q' to stop");
final BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
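for (String line = br.readLine(); line != null; line = br.readLine()) {
    if ("q".equals(line)) {
        // stop(timeout, unit) needs java.util.concurrent.TimeUnit; waits up to 10 seconds
        or.stop(10, TimeUnit.SECONDS);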
        break;
    }
}

References

https://github.com/osheroff/mysql-binlog-connector-java

https://github.com/whitesock/open-replicator