Bottled Water: 实时集成postgresql与kafka
Bottled Water简介Bottled Water是Confluent公司开发的一款可以将postgresql数据库转换为kafka events的工具。 Bottled Water可以实时地将postgresql的变化推送至kafka中。有以下几个主要特性:
为什么使用Bottled Water在某些场景下,可能需要将数据存储在不止一个数据存储中。比如,为了搜索方便快速,可能会考虑冗余存储一份数据到elasticsearch或某些缓存数据库中;为了迁移到不同的数据库,在迁移过程中也可能会同时存储多种不同的数据库。 这种需求下,常见的一种实现方式是“dual writes”,即应用程序内部做这样的处理 —— 将变更的数据同时并行写入不同的数据库中。如下图所示: "dual writes"的做法是有隐患的,会受到竞态条件(race conditions)和可靠性问题的影响,造成不同的数据库之间的差异越来越大,最终变得难以修复。 从数据库快照中重建缓存或索引有利于消除从dump重建的不一致性。但是,在一个大型数据库这个过程将是十分缓慢低效的,需要找到一种方案使其更快。 如果能模仿数据库主从之间复制的过程,将数据库的修改变成流,携带每一次修改的消息,并且完全按照写入顺序重放这些修改,那么将得到一份一模一样的数据库,并且这种做法比dual writes效率好,可靠性更高。 一个更加理想的工作流程如下图所示: Bottled Water就是按照上述工作流程设计的工具,将postgresql的变更存入kafka中。 Bottled Water工作原理Bottled Water就是模拟数据库主从复制的过程,将数据库的变化写入kafka中。 其中,这个过程使用了postgresql 9.4版本引入的新特性logical decoding,可以从一个数据库中提取出consistent snapshot和连续的change events流,提取出的数据按照行级别推送至kafka中。默认情况下,会在kafka中对每个表创建一个同名topic,以存储change events。
Bottled Water项目在使用中包含了两个部分:
Bottled Water使用方法安装按照官方文档的说明,有三种方法可以上手使用: docker、源码编译、ubuntu ppa。本文将使用第三种方案,使用的环境为Ubuntu 14.04,postgresql 9.4讲解。 添加Bottled Water的ppa源,然后安装两个软件包即可: $ sudo apt-add-repository -y ppa:stub/bottledwater $ sudo apt-get update $ sudo apt-get install postgresql-9.4-bottledwater bottledwater 使用上述命令安装 使用前配置Bottled Water会连接到postgresql获取相关数据,连接的账户需要有 需要修改WAL的日志级别为 打开 wal_level = logical # 默认WAL级别minimal,不符合要求,需要调整为logical max_wal_senders = 8 wal_keep_segments = 4 max_replication_slots = 4 打开 local replication repl trust host replication repl 127.0.0.1/32 trust host replication repl ::1/128 trust 重启 sudo service postgresql restart 以管理员身份(postgres)登录psql,进行创建角色操作: $ sudo -u postgres psql psql (9.4.8) Type "help" for help. postgres=# CREATE ROLE repl WITH REPLICATION PASSWORD 'password' LOGIN; 至此使用前的配置已全部完成。 Bottled Water使用演示我们创建一个数据库 CREATE DATABASE mybw; c mybw CREATE EXTENSION bottledwater; -- 创建bottledwater扩展 CREATE TABLE test (id serial primary key,value text); 在另一个终端,启动 $ bottledwater -d postgres://repl:password@127.0.0.1/mybw -b 172.16.250.10:9092 -f json 解释一下上述命令行:
执行成功后,终端输出如下,应该无任何报错: Writing messages to Kafka in JSON format Created replication slot "bottledwater",capturing consistent snapshot "0049D495-1". INFO: bottledwater_export: Table public.test is keyed by index test_pkey Snapshot complete,streaming changes from 2/44DAB970. 现在向 INSERT INTO test (value) values('hello world!'); 此时从kafka中可以看到如下的消息: $ kafka-console-consumer --zookeeper localhost --topic test --from-beginning --property print.key=true {"id": {"int": 1}} {"id": {"int": 1},"value": {"string": "hello world!"}} 继续执行以下会修改数据库的sql: INSERT INTO test (value) values('hello'); UPDATE test SET value = 'world' WHERE id = 2; DELETE FROM test WHERE id = 2; 那么可以观察到kafka中多了以下几条消息: {"id": {"int": 2}} {"id": {"int": 2},"value": {"string": "hello"}} {"id": {"int": 2}} {"id": {"int": 2},"value": {"string": "world"}} {"id": {"int": 2}} null 官方文档中,对kafka中消息格式的描述如下:
最后,只需要使用一个kafka consumer连接kafka就可以实时获取到数据库表的变更了。 总结Bottled Water总体来说使用还是很方便的,并且响应速度十分灵敏。即使停掉bottledwater进程,下次启动时也会继续上次的断点,不会丢失数据。需要使用postgresql与kafka结合方案的可以考虑。 参考资料
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |