基于OGG Datahub插件将Oracle数据同步上云
一、背景介绍随着数据规模的不断扩大,传统的RDBMS难以满足OLAP的需求, OGG(Oracle GoldenGate)是一个基于日志的结构化数据备份工具,一般用于Oracle数据库之间的主从备份以及Oracle数据库到其他数据库(DB2,MySQL等)的同步。下面是Oracle官方提供的一个OGG的整体架构图,从图中可以看出OGG的部署分为源端和目标端两部分组成,主要有Manager,Extract,Pump,Collector,Replicat这么一些组件。
本文介绍的Oracle数据同步是通过OGG的Datahub插件实现的,该Datahub插件在架构图中处于Replicat的位置,会分析Trail文件,将数据的变化记录写入Datahub中,可以使用流计算对datahub中的数据进行实时分析,也可以将数据归档到MaxCompute中进行离线处理。 二、安装步骤0. 环境要求
(下面将介绍Oracle/OGG相关安装和配置过程,Oracle的安装将不做介绍,另外需要注意的是:Oracle/OGG相关参数配置以熟悉Oracle/OGG的运维人员配置为准,本示例只是提供一个可运行的样本,Oracle所使用的版本为ORA11g) 1. 源端OGG安装 drwxr-xr-x install drwxrwxr-x response -rwxr-xr-x runInstaller drwxr-xr-x stage 目前oracle一般采取response安装的方式,在response/oggcore.rsp中配置安装依赖,具体如下: oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2 # 需要目前与oracle版本对应 INSTALL_OPTION=ORA11g # goldegate主目录 SOFTWARE_LOCATION=/home/oracle/u01/ggate # 初始不启动manager START_MANAGER=false # manger端口 MANAGER_PORT=7839 # 对应oracle的主目录 DATABASE_LOCATION=/home/oracle/u01/app/oracle/product/11.2.0/dbhome_1 # 暂可不配置 INVENTORY_LOCATION= # 分组(目前暂时将oracle和ogg用同一个账号ogg_test,实际可以给ogg单独账号) UNIX_GROUP_NAME=oinstall 执行命令: runInstaller -silent -responseFile {YOUR_OGG_INSTALL_FILE_PATH}/response/oggcore.rsp 本示例中,安装后OGG的目录在/home/oracle/u01/ggate,安装日志在/home/oracle/u01/ggate/cfgtoollogs/oui目录下,当silentInstall{时间}.log文件里出现如下提示,表明安装成功: The installation of Oracle GoldenGate Core was successful. 执行/home/oracle/u01/ggate/ggsci命令,并在提示符下键入命令:CREATE SUBDIRS,从而生成ogg需要的各种目录(dir打头的那些)。 2. 源端Oracle配置 # 创建独立的表空间 create tablespace ATMV datafile '/home/oracle/u01/app/oracle/oradata/uprr/ATMV.dbf' size 100m autoextend on next 50m maxsize unlimited; # 创建ogg_test用户,密码也为ogg_test create user ogg_test identified by ogg_test default tablespace ATMV; # 给ogg_test赋予充分的权限 grant connect,resource,dba to ogg_test; # 检查附加日志情况 Select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI,SUPPLEMENTAL_LOG_DATA_FK,SUPPLEMENTAL_LOG_DATA_ALL from v$database; # 增加数据库附加日志及回退 alter database add supplemental log data; alter database add supplemental log data (primary key,unique,foreign key) columns; # rollback alter database drop supplemental log data (primary key,foreign key) columns; alter database drop supplemental log data; # 全字段模式,注意:在该模式下的delete操作也只有主键值 ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; # 开启数据库强制日志模式 alter database force logging; # 执行marker_setup.sql 脚本 @marker_setup.sql # 执行@ddl_setup.sql @ddl_setup.sql # 执行role_setup.sql @role_setup.sql # 给ogg用户赋权 grant GGS_GGSUSER_ROLE to ogg_test; # 执行@ddl_enable.sql,开启DDL trigger @ddl_enable.sql # 执行优化脚本 @ddl_pin ogg_test # 安装sequence support @sequence.sql # alter table sys.seq$ add supplemental log data (primary key) columns; 3. OGG源端mgr配置 配置mgr PORT 7839 DYNAMICPORTLIST 7840-7849 USERID ogg_test,PASSWORD ogg_test PURGEOLDEXTRACTS ./dirdat/*,USECHECKPOINTS,MINKEEPDAYS 7 LAGREPORTHOURS 1 LAGINFOMINUTES 30 LAGCRITICALMINUTES 45 PURGEDDLHISTORY MINKEEPDAYS 3,MAXKEEPDAYS 7 PURGEMARKERHISTORY MINKEEPDAYS 3,MAXKEEPDAYS 7 启动mgr(运行日志在ggate/dirrpt中) start mgr 查看mgr状态 info mgr 查看mgr配置 view params mgr 4. OGG源端extract配置 配置extract(名字可任取,extract是组名) EXTRACT extract SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8") DBOPTIONS ALLOWUNUSEDCOLUMN USERID ogg_test,PASSWORD ogg_test REPORTCOUNT EVERY 1 MINUTES,RATE NUMFILES 5000 DISCARDFILE ./dirrpt/ext_test.dsc,APPEND,MEGABYTES 100 DISCARDROLLOVER AT 2:00 WARNLONGTRANS 2h,CHECKINTERVAL 3m EXTTRAIL ./dirdat/st,MEGABYTES 200 DYNAMICRESOLUTION TRANLOGOPTIONS CONVERTUCS2CLOBS TRANLOGOPTIONS RAWDEVICEOFFSET 0 DDL & INCLUDE MAPPED OBJTYPE 'table' & INCLUDE MAPPED OBJTYPE 'index' & INCLUDE MAPPED OBJTYPE 'SEQUENCE' & EXCLUDE OPTYPE COMMENT DDLOPTIONS NOCROSSRENAME REPORT TABLE OGG_TEST.*; SEQUENCE OGG_TEST.*; GETUPDATEBEFORES 增加extract进程(ext后的名字要跟上面extract对应,本例中extract是组名) 删除某废弃进程DP_TEST 添加抽取进程,每个队列文件大小为200m 启动抽取进程(运行日志在ggate/dirrpt中) 5. 生成def文件 DEFSFILE ./dirdef/ogg_test.def USERID ogg_test,PASSWORD ogg_test table OGG_TEST.*; 在shell中执行如下命令,生成ogg_test.def ./defgen paramfile ./dirprm/defgen.prm 6. 目标端OGG安装和配置 执行ggsci起来后执行如下命令,创建必须目录 编辑mgr配置 PORT 7839 DYNAMICPORTLIST 7840-7849 PURGEOLDEXTRACTS ./dirdat/*,MAXKEEPDAYS 7 启动mgr 7. 源端ogg pump配置 编辑pump配置 EXTRACT pump RMTHOST xx.xx.xx.xx,MGRPORT 7839,COMPRESS PASSTHRU NUMFILES 5000 RMTTRAIL ./dirdat/st DYNAMICRESOLUTION TABLE OGG_TEST.*; SEQUENCE OGG_TEST.*; 添加投递进程,从某一个队列开始投 备注:投递进程,每个队文件大小为200m 启动pump 8. Datahub插件安装和配置 export JAVA_HOME=/xxx/xxx/jrexx export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server 修改环境变量后,请重启adapter的mgr进程 修改conf路径下的javaue.properties文件,将{YOUR_HOME}替换为解压后的路径 gg.handlerlist=ggdatahub gg.handler.ggdatahub.type=com.aliyun.odps.ogg.handler.datahub.DatahubHandler gg.handler.ggdatahub.configureFileName={YOUR_HOME}/datahub-ogg-plugin/conf/configure.xml goldengate.userexit.nochkpt=false goldengate.userexit.timestamp=utc gg.classpath={YOUR_HOME}/datahub-ogg-plugin/lib/* gg.log.level=debug jvm.bootoptions=-Xmx512m -Dlog4j.configuration=file:{YOUR_HOME}/datahub-ogg-plugin/conf/log4j.properties -Djava.class.path=ggjava/ggjava.jar 修改conf路径下的log4j.properties文件,将{YOUR_HOME}替换为解压后的路径 修改conf路径下的configure.xml文件,修改方式见文件中的注释 <?xml version="1.0" encoding="UTF-8"?> <configue> <defaultOracleConfigure> <!-- oracle sid,必选--> <sid>100</sid> <!-- oracle schema,可以被mapping中的oracleSchema覆盖,两者必须有一个非空--> <schema>ogg_test</schema> </defaultOracleConfigure> <defalutDatahubConfigure> <!-- datahub endpoint,必填--> <endPoint>YOUR_DATAHUB_ENDPOINT</endPoint> <!-- datahub project,可以被mapping中的datahubProject,两者必须有一个非空--> <project>YOUR_DATAHUB_PROJECT</project> <!-- datahub accessId,可以被mapping中的datahubAccessId覆盖,两者必须有一个非空--> <accessId>YOUR_DATAHUB_ACCESS_ID</accessId> <!-- datahub accessKey,可以被mapping中的datahubAccessKey覆盖,两者必须有一个非空--> <accessKey>YOUR_DATAHUB_ACCESS_KEY</accessKey> <!-- 数据变更类型同步到datahub对应的字段,可以被columnMapping中的ctypeColumn覆盖 --> <ctypeColumn>optype</ctypeColumn> <!-- 数据变更时间同步到datahub对应的字段,可以被columnMapping中的ctimeColumn覆盖 --> <ctimeColumn>readtime</ctimeColumn> <!-- 数据变更序号同步到datahub对应的字段,按数据变更先后递增,不保证连续,可以被columnMapping中的cidColumn覆盖 --> <cidColumn>record_id</cidColumn> <!-- 额外增加的常量列,每条record该列值为指定值,格式为c1=xxx,c2=xxx,可以被columnMapping中的constColumnMap覆盖--> <constColumnMap></constColumnMap> </defalutDatahubConfigure> <!-- 默认最严格,不落文件 直接退出 无限重试--> <!-- 运行每批上次的最多纪录数,可选,默认1000--> <batchSize>1000</batchSize> <!-- 默认时间字段转换格式,默认yyyy-MM-dd HH:mm:ss--> <defaultDateFormat>yyyy-MM-dd HH:mm:ss</defaultDateFormat> <!-- 脏数据是否继续,默认false--> <dirtyDataContinue>true</dirtyDataContinue> <!-- 脏数据文件,默认datahub_ogg_plugin.dirty--> <dirtyDataFile>datahub_ogg_plugin.dirty</dirtyDataFile> <!-- 脏数据文件最大size,单位M,默认500--> <dirtyDataFileMaxSize>200</dirtyDataFileMaxSize> <!-- 重试次数,-1:无限重试 0:不重试 n:重试次数,默认-1--> <retryTimes>0</retryTimes> <!-- 重试间隔,单位毫秒,默认3000--> <retryInterval>4000</retryInterval> <!-- 点位文件,默认datahub_ogg_plugin.chk--> <checkPointFileName>datahub_ogg_plugin.chk</checkPointFileName> <mappings> <mapping> <!-- oracle schema,见上描述--> <oracleSchema></oracleSchema> <!-- oracle table,必选--> <oracleTable>t_person</oracleTable> <!-- datahub project,见上描述--> <datahubProject></datahubProject> <!-- datahub AccessId,见上描述--> <datahubAccessId></datahubAccessId> <!-- datahub AccessKey,见上描述--> <datahubAccessKey></datahubAccessKey> <!-- datahub topic,必选--> <datahubTopic>t_person</datahubTopic> <ctypeColumn></ctypeColumn> <ctimeColumn></ctimeColumn> <cidColumn></cidColumn> <constColumnMap></constColumnMap> <columnMapping> <!-- src:oracle字段名称,必须; dest:datahub field,必须; destOld:变更前数据落到datahub的field,可选; isShardColumn: 是否作为shard的hashkey,默认为false,可以被shardId覆盖 isDateFormat: timestamp字段是否采用DateFormat格式转换,默认true. 如果是false,源端数据必须是long dateFormat: timestamp字段的转换格式,不填就用默认值 --> <column src="id" dest="id" isShardColumn="true" isDateFormat="false" dateFormat="yyyy-MM-dd HH:mm:ss"/> <column src="name" dest="name" isShardColumn="true"/> <column src="age" dest="age"/> <column src="address" dest="address"/> <column src="comments" dest="comments"/> <column src="sex" dest="sex"/> <column src="temp" dest="temp" destOld="temp1"/> </columnMapping> <!--指定shard id,优先生效,可选--> <shardId>1</shardId> </mapping> </mappings> </configue> 在ggsci下启动datahub writer edit params dhwriter extract dhwriter getEnv (JAVA_HOME) getEnv (LD_LIBRARY_PATH) getEnv (PATH) CUSEREXIT ./libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES,PARAMS "{YOUR_HOME}/datahub-ogg-plugin/conf/javaue.properties" sourcedefs ./dirdef/ogg_test.def table OGG_TEST.*; 添加dhwriter 启动dhwriter 三、使用场景这里会用一个简单的示例来说明数据的使用方法,例如我们在Oracle数据库有一张商品订单表orders(oid int,pid int,num int),该表有三列,分别为订单ID,商品ID和商品数量。 <ctypeColumn>optype</ctypeColumn> <ctimeColumn>readtime</ctimeColumn> <columnMapping> <column src="oid" dest="oid_after" destOld="oid_before" isShardColumn="true"/> <column src="pid" dest="pid_after" destOld="pid_before"/> <column src="num" dest="num_after" destOld="num_before"/> </columnMapping> 其中optype和readtime字段是记录数据库的数据变更类型和时间,optype有"I","D","U"三种取值,分别对应为“增”,“删”,“改”三种数据变更操作。 +--------+------------+------------+------------+------------+------------+------------+------------+------------+ | record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after | +-------+------------+------------+------------+------------+------------+------------+------------+------------+ | 14810373343020000 | I | 2016-12-06 15:15:28.000141 | NULL | 1 | NULL | 2 | NULL | 1 | 修改这条数据,比如把num改为20,datahub则会收到的一条变更数据记录,如下: +-------+------------+------------+------------+------------+------------+------------+------------+------------+ | record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after | +--------+------------+------------+------------+------------+------------+------------+------------+------------+ | 14810373343080000 | U | 2016-12-06 15:15:58.000253 | 1 | 1 | 2 | 2 | 1 | 20 | 实时计算 离线处理 create table orders_log(record_id string,optype string,readtime string,oid_before bigint,oid_after bigint,pid_before bigint,pid_after bigint,num_before bigint,num_after bigint); INSERT OVERWRITE TABLE orders_result SELECT t.oid,t.pid,t.num FROM ( SELECT oid,pid,num,'0' x_record_id,1 AS x_optype FROM orders_base UNION ALL SELECT decode(optype,'D',oid_before,oid_after) AS oid,decode(optype,pid_before,pid_after) AS pid,num_after AS num,record_id x_record_id,1) AS x_optype FROM orders_log ) t JOIN ( SELECT oid,max(record_id) x_max_modified FROM ( SELECT oid,'0' record_id FROM orders_base UNION ALL SELECT decode(optype,record_id FROM orders_log ) g GROUP BY oid,pid ) s ON t.oid = s.oid AND t.pid = s.pid AND t.x_record_id = s.x_max_modified AND t.x_optype <> 0; 四、常见问题Q:目标端报错 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address. Q:找不到jvm相关的so包 例如:export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server 本文作者:冶善 原文链接 本文为云栖社区原创内容,未经允许不得转载。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |