# Duplicate Data from a Sqoop Import Job
## Problem description

A Sqoop import job copying data from SQL Server to HDFS produced duplicate rows.

## Impact

Downstream hotel reporting jobs computed incorrect numbers and had to be rerun.

## Temporary fix

We reran the Sqoop job; the reloaded data contained no duplicates.

## Root cause analysis

The job was configured roughly as follows:

```bash
sqoop import -D mapreduce.job.name={JOB_NAME} \
  --connect '{db_info=232}' \
  --delete-target-dir \
  --query "SELECT id, star_out, hotel_type, hotel_economic, hotel_apartment,
             IsMultiSupply, InventoryUseType, IsSendVouchFax, auditingType,
             replace(replace(replace(replace(SubcityID, char(10), ''), char(13), ''), char(1), ''), char(0), '') as SubcityID,
             isshadow, AgreementEntityID
           FROM Hotel_Product.dbo.hotel(nolock) where \$CONDITIONS" \
  --where '1=1' \
  --split-by id \
  --null-string '\\N' \
  --null-non-string '\\N' \
  --fields-terminated-by '\001' \
  -m 8 \
  --target-dir /data/BaseData/elong/dshprdt_hotel
```

With `--split-by id` and `-m 8`, Sqoop partitions the table by the `id` column into 8 splits and runs one map task per split.
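For context on how those splits are formed: Sqoop's `DataDrivenDBInputFormat` asks the database for the MIN and MAX of the split column and cuts that range into `-m` contiguous sub-ranges, one per map. Below is a minimal sketch of the idea, not Sqoop's actual splitter; the MIN/MAX bounds are illustrative (MAX is back-derived so that the first split matches the `id >= 1 AND id < 464562` line in the log further down):

```java
import java.util.ArrayList;
import java.util.List;

// Toy version of how DataDrivenDBInputFormat cuts the split column's range
// into -m pieces. Bounds are illustrative, not taken from the real table.
public class SplitSketch {
    public static void main(String[] args) {
        long min = 1;                 // SELECT MIN(id) ... (illustrative)
        long max = 3_716_489L;        // SELECT MAX(id) ... (illustrative)
        int numSplits = 8;            // from -m 8
        long step = (max - min) / numSplits;

        List<String> predicates = new ArrayList<>();
        long lo = min;
        for (int i = 0; i < numSplits; i++) {
            long hi = (i == numSplits - 1) ? max + 1 : lo + step;
            // Each fragment is substituted for $CONDITIONS in the --query
            predicates.add("id >= " + lo + " AND id < " + hi);
            lo = hi;
        }
        predicates.forEach(System.out::println);
    }
}
```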
One of these map tasks, running on node hadoop-070-126.bigdata.ly, failed with a database error:

```
2019-05-23 00:36:27,823 ERROR [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Top level exception:
com.microsoft.sqlserver.jdbc.SQLServerException: Connection reset
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1352)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1339)
    at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1694)
    at com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:3734)
    at com.microsoft.sqlserver.jdbc.TDSReader.nextPacket(IOBuffer.java:3687)
    at com.microsoft.sqlserver.jdbc.TDSReader.ensurePayload(IOBuffer.java:3663)
    at com.microsoft.sqlserver.jdbc.TDSReader.readBytes(IOBuffer.java:3979)
    at com.microsoft.sqlserver.jdbc.TDSReader.readWrappedBytes(IOBuffer.java:4001)
    at com.microsoft.sqlserver.jdbc.TDSReader.readInt(IOBuffer.java:3942)
    at com.microsoft.sqlserver.jdbc.TDSReader.readUnsignedInt(IOBuffer.java:3959)
    at com.microsoft.sqlserver.jdbc.PLPInputStream.readBytesInternal(PLPInputStream.java:313)
    at com.microsoft.sqlserver.jdbc.PLPInputStream.getBytes(PLPInputStream.java:129)
    at com.microsoft.sqlserver.jdbc.DDC.convertStreamToObject(DDC.java:438)
    at com.microsoft.sqlserver.jdbc.ServerDTVImpl.getValue(dtv.java:2441)
    at com.microsoft.sqlserver.jdbc.DTV.getValue(dtv.java:176)
    at com.microsoft.sqlserver.jdbc.Column.getValue(Column.java:113)
    at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1981)
    at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1966)
    at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getString(SQLServerResultSet.java:2291)
    at org.apache.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:71)
    at com.cloudera.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:61)
    at QueryResult.readFields(QueryResult.java:1670)
    at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:244)
    at org.apache.sqoop.mapreduce.db.SQLServerDBRecordReader.nextKeyValue(SQLServerDBRecordReader.java:148)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:553)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1701)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
```

This connection failure made the map task re-establish its database connection, resume the transfer, and ultimately complete successfully. The recovery shows up in the log as follows:

```
2019-05-23 00:36:27,862 WARN [main] org.apache.sqoop.mapreduce.db.SQLServerDBRecordReader: Trying to recover from DB read failure:
java.io.IOException: SQLException in nextKeyValue
    at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:277)
    at org.apache.sqoop.mapreduce.db.SQLServerDBRecordReader.nextKeyValue(SQLServerDBRecordReader.java:148)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:553)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1701)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Connection reset
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1352)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1339)
    at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1694)
    at com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:3734)
    at com.microsoft.sqlserver.jdbc.TDSReader.nextPacket(IOBuffer.java:3687)
    at com.microsoft.sqlserver.jdbc.TDSReader.ensurePayload(IOBuffer.java:3663)
    at com.microsoft.sqlserver.jdbc.TDSReader.readBytes(IOBuffer.java:3979)
    at com.microsoft.sqlserver.jdbc.TDSReader.readWrappedBytes(IOBuffer.java:4001)
    at com.microsoft.sqlserver.jdbc.TDSReader.readInt(IOBuffer.java:3942)
    at com.microsoft.sqlserver.jdbc.TDSReader.readUnsignedInt(IOBuffer.java:3959)
    at com.microsoft.sqlserver.jdbc.PLPInputStream.readBytesInternal(PLPInputStream.java:313)
    at com.microsoft.sqlserver.jdbc.PLPInputStream.getBytes(PLPInputStream.java:129)
    at com.microsoft.sqlserver.jdbc.DDC.convertStreamToObject(DDC.java:438)
    at com.microsoft.sqlserver.jdbc.ServerDTVImpl.getValue(dtv.java:2441)
    at com.microsoft.sqlserver.jdbc.DTV.getValue(dtv.java:176)
    at com.microsoft.sqlserver.jdbc.Column.getValue(Column.java:113)
    at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1981)
    at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getValue(SQLServerResultSet.java:1966)
    at com.microsoft.sqlserver.jdbc.SQLServerResultSet.getString(SQLServerResultSet.java:2291)
    at org.apache.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:71)
    at com.cloudera.sqoop.lib.JdbcWritableBridge.readString(JdbcWritableBridge.java:61)
    at QueryResult.readFields(QueryResult.java:1670)
    at org.apache.sqoop.mapreduce.db.DBRecordReader.nextKeyValue(DBRecordReader.java:244)
    ... 13 more
```
```
2019-05-23 00:36:28,130 INFO [main] org.apache.sqoop.mapreduce.db.SQLServerConnectionFailureHandler: Session context is: NULL
2019-05-23 00:36:28,131 INFO [main] org.apache.sqoop.mapreduce.db.BasicRetrySQLFailureHandler: A new connection has been established
2019-05-23 00:36:28,186 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Working on split: id >= 1 AND id < 464562
2019-05-23 00:36:28,198 INFO [main] org.apache.sqoop.mapreduce.db.DBRecordReader: Executing query: SELECT id,CorpGroupID,HotelBrandID,AgreementEntityID FROM Hotel_Product.dbo.hotel(nolock) where ( id > 458735 ) AND ( id < 464562 )
2019-05-23 00:36:28,804 WARN [ResponseProcessor for block BP-894016253-10.12.180.10-1463057953660:blk_15531850664_15126642971] org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took 51708ms (threshold=30000ms); ack: seqno: 21070 status: SUCCESS status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 20470492, targets: [172.1.0.126:50010,172.1.0.72:50010,172.1.0.78:50010]
2019-05-23 00:36:42,628 INFO [Thread-13] org.apache.sqoop.mapreduce.AutoProgressMapper: Auto-progress thread is finished. keepGoing=false
2019-05-23 00:36:42,693 INFO [main] org.apache.hadoop.mapred.Task: Task:attempt_1555615894016_1958403_m_000000_0 is done. And is in the process of committing
2019-05-23 00:36:42,742 INFO [main] org.apache.hadoop.mapred.Task: Task attempt_1555615894016_1958403_m_000000_0 is allowed to commit now
2019-05-23 00:36:42,751 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_1555615894016_1958403_m_000000_0' to viewfs://dcfs/data/BaseData/elong/dshprdt_hotel/_temporary/1/task_1555615894016_1958403_m_000000
2019-05-23 00:36:42,816 INFO [main] org.apache.hadoop.mapred.Task: Task 'attempt_1555615894016_1958403_m_000000_0' done.
```

Note the id range queried after the new connection was established: the split is still `id >= 1 AND id < 464562`, but the re-executed query reads `( id > 458735 ) AND ( id < 464562 )`. The retry logic lives in `SQLServerDBRecordReader`:

```java
// SQLServerDBRecordReader.java
public boolean nextKeyValue() throws IOException {
  boolean valueReceived = false;
  int retryCount = RETRY_MAX;
  boolean doRetry = true;
  do {
    try {
      // Try to get the next key/value pairs
      valueReceived = super.nextKeyValue();
      doRetry = false;
    } catch (IOException ioEx) {
      LOG.warn("Trying to recover from DB read failure: ", ioEx);
      Throwable cause = ioEx.getCause();

      // Use the configured connection handler to recover from the
      // connection failure and use the newly constructed connection.
      // If the failure cannot be recovered, an exception is thrown.
      if (failureHandler.canHandleFailure(cause)) {
        // Recover from the connection failure
        Connection conn = failureHandler.recover();

        // Configure the new connection before using it
        configureConnection(conn);
        setConnection(conn);

        --retryCount;
        doRetry = (retryCount >= 0);
      } else {
        // Cannot recover using the configured handler, re-throw
        throw new IOException("Connection handler cannot recover failure: ", ioEx);
      }
    }
  } while (doRetry);

  // Rethrow the exception if all retry attempts are consumed
  if (retryCount < 0) {
    throw new IOException("Failed to read from database after "
        + RETRY_MAX + " retries.");
  }
  return valueReceived;
}
```

As this code shows, when `nextKeyValue()` fails with a connection-related exception, the reader recovers the connection and retries up to `RETRY_MAX` (3) times. The query itself is issued by `DBRecordReader`:

```java
// DBRecordReader.java
@Override
public boolean nextKeyValue() throws IOException {
  try {
    if (key == null) {
      key = new LongWritable();
    }
    if (value == null) {
      value = createValue();
    }
    if (null == this.results) {
      // First time into this method, run the query.
      LOG.info("Working on split: " + split);
      this.results = executeQuery(getSelectQuery());
    }
    if (!results.next()) {
      return false;
    }
    ...
}
```
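For reference, the `failureHandler` used by the retry loop is pluggable; the recovery log above shows `SQLServerConnectionFailureHandler` and `BasicRetrySQLFailureHandler` doing the work. Below is a hypothetical, heavily simplified handler that illustrates the contract the quoted code relies on; only the two methods the loop calls are sketched, everything else is omitted:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical stand-in for the failure handler used by the retry loop.
// Real Sqoop wires in SQLServerConnectionFailureHandler and friends.
public class NaiveFailureHandler {
    private final String jdbcUrl;   // assumed connect string

    public NaiveFailureHandler(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    // Decide whether the cause is a recoverable connection failure,
    // e.g. the "Connection reset" SQLServerException seen above.
    public boolean canHandleFailure(Throwable cause) {
        return cause instanceof SQLException;
    }

    // Build a brand-new connection; the record reader then re-runs its
    // query on it via getSelectQuery().
    public Connection recover() throws SQLException {
        return DriverManager.getConnection(jdbcUrl);
    }
}
```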
Now look at how `getSelectQuery()` rebuilds the query (code below): when a last record key is available, it appends a dynamically constructed lower bound, `lowerClause`, which in this run came out as `( id > 458735 )`:

```java
// SQLServerDBRecordReader.java
protected String getSelectQuery() {
  // The last seen record key is only expected to be unavailable if no
  // reads ever happened.
  String selectQuery;
  if (lastRecordKey == null) {
    selectQuery = super.getSelectQuery();
  } else {
    // If the last record key is available, construct the select query
    // to start from it.
    DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
        (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
    StringBuilder lowerClause = new StringBuilder();
    lowerClause.append(getDBConf().getInputOrderBy());
    lowerClause.append(" > ");
    lowerClause.append(lastRecordKey.toString());

    // Get the select query with the lowerClause, and the split's upper clause
    selectQuery = getSelectQuery(lowerClause.toString(),
        dataSplit.getUpperClause());
  }
  return selectQuery;
}
```

The lower bound is `lastRecordKey`: the value of the split column (`--split-by id`) in the last record this map successfully processed before the failure. It is captured in `getCurrentValue()`:

```java
// SQLServerDBRecordReader.java
public T getCurrentValue() {
  T val = super.getCurrentValue();
  // Look up the key of the last read record to use for recovering.
  // As documented, the map may not be null, though it may be empty.
  Object lastRecordSplitCol = val.getFieldMap().get(splitColumn);
  lastRecordKey = (lastRecordSplitCol == null)
      ? null : lastRecordSplitCol.toString();
  return val;
}
```

```java
// QueryResult.java (Sqoop-generated record class)
public Map<String, Object> getFieldMap() {
  Map<String, Object> __sqoop$field_map = new TreeMap<String, Object>();
  __sqoop$field_map.put("id", this.id);
  ...
}
```
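Plugging the values from the recovery log into this logic shows how the re-executed query's WHERE clause comes about. A toy reconstruction, not Sqoop code; the values are taken from the log above:

```java
// Rebuilds the recovered WHERE clause from the values visible in the log:
// the last id read before the reset (458735) and the split's upper bound.
public class RecoveryQuerySketch {
    public static void main(String[] args) {
        String splitColumn = "id";           // from --split-by id
        String lastRecordKey = "458735";     // last successfully read id
        String upperClause = "id < 464562";  // upper clause of the split

        String lowerClause = splitColumn + " > " + lastRecordKey;
        String where = "( " + lowerClause + " ) AND ( " + upperClause + " )";
        System.out.println(where);  // ( id > 458735 ) AND ( id < 464562 )
    }
}
```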
The catch is that the `id` column we split by is not the table's primary key (the primary key is `hotel_id`), and the rows are not guaranteed to come back ordered by it. So when Sqoop reads the database through its cursor, the returned rows can look like this:

| id | hotel_name |
|----|------------|
| 3  | Hotel B    |
| 4  | Hotel C    |
| 5  | Hotel D    |
| 6  | Hotel E    |
| 7  | Hotel F    |
| 8  | Hotel G    |
| 9  | Hotel I    |
| 10 | Hotel J    |
| 11 | Hotel K    |
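To make the failure mode concrete, the sketch below replays the scenario with hypothetical rows in the spirit of the table above. Because the cursor does not return rows in id order, restarting at `id > lastRecordKey` re-reads rows that were already written before the reset, and would silently skip any still-unread rows with a smaller id:

```java
import java.util.ArrayList;
import java.util.List;

// Replays the failure with hypothetical data: the cursor returns rows in
// arbitrary (non-id) order, the connection resets mid-read, and the reader
// restarts at "id > lastRecordKey".
public class DuplicateReplay {
    record Row(long id, String name) {}

    public static void main(String[] args) {
        // Order the database happens to return rows in -- NOT sorted by id.
        List<Row> cursorOrder = List.of(
            new Row(5, "Hotel D"), new Row(9, "Hotel I"), new Row(3, "Hotel B"),
            new Row(7, "Hotel F"), new Row(4, "Hotel C"), new Row(8, "Hotel G"));

        List<Row> written = new ArrayList<>();
        long lastRecordKey = -1;

        // The map reads three rows, then hits "Connection reset".
        for (int i = 0; i < 3; i++) {
            Row r = cursorOrder.get(i);
            written.add(r);
            lastRecordKey = r.id();          // what getCurrentValue() tracks
        }

        // Recovery re-runs the query with the lower bound id > 3.
        for (Row r : cursorOrder) {
            if (r.id() > lastRecordKey) {
                written.add(r);
            }
        }

        // Hotel D (id 5) and Hotel I (id 9) are written twice; an unread row
        // with id <= 3 would have been lost instead.
        written.forEach(r -> System.out.println(r.id() + "\t" + r.name()));
    }
}
```

This matches what we observed: the map "recovers" and commits successfully, but its output overlaps what it had already written before the connection reset. With a split column that is unique and returned in order (for example the real primary key `hotel_id`), `lastRecordKey` would be a true high-water mark and the restart would be exact.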