java – 尝试使用自定义SerDe创建Hive表时出错
发布时间:2020-12-15 02:09:03 所属栏目:Java 来源:网络整理
导读:我有一个包含需要上传到Hive表的数据的文件.我写了一个自定义SerDe(它基本上是Hive已经提供的 Regex Serde的修改)来帮助我上传数据. 这是我写的SerDe package my.hive.customserde;public class FIASC2 extends AbstractSerDe { public static final Log LOG
我有一个包含需要上传到Hive表的数据的文件.我写了一个自定义SerDe(它基本上是Hive已经提供的
Regex Serde的修改)来帮助我上传数据.
这是我写的SerDe package my.hive.customserde; public class FIASC2 extends AbstractSerDe { public static final Log LOG = LogFactory.getLog(FIASC2.class.getName()); int colwidths[] = {1,10,6,12,8,14,16,2,19,1}; String outputformat = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s %10$s %11$s %12$s %13$s %14$s %15$s " + "%16$s %17$s %18$s"; int datetimecols[] = {5}; int datecols[] = {17}; String cols; int numColumns; int totalcolwidth = 0; List<String> columnNames; List<TypeInfo> columnTypes; ArrayList<String> row; StructObjectInspector rowOI; Object[] outputFields; Text outputRowText; @Override public void initialize(Configuration conf,Properties tbl) throws SerDeException { LOG.debug("Initializing SerDe"); // Get column names String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); LOG.debug("Columns : " + columnNameProperty + "Types : " + columnTypeProperty); if(columnNameProperty.length() == 0) { columnNames = new ArrayList<String>(); } else { columnNames = Arrays.asList(columnNameProperty.split(",")); } columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); assert columnNames.size() == columnTypes.size(); assert colwidths.length == columnNames.size(); numColumns = columnNames.size(); for(int i = 0; i < numColumns; i++) { totalcolwidth += i; } List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(columnNames.size()); for (int i = 0; i < numColumns; i++) { columnOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); } rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,columnOIs); row = new ArrayList<String>(numColumns); for(int i = 0; i < numColumns; i++) { row.add(null); } outputFields = new Object[numColumns]; outputRowText = new Text(); } @Override public Object deserialize(Writable blob) throws SerDeException { // TODO Auto-generated method stub Text rowText = (Text) blob; int index = 0; 
if(rowText.toString().length() < totalcolwidth) { return null; } if((rowText.toString().substring(0,1) == "H") || (rowText.toString().substring(0,1) == "T")) { return null; } for(int i = 0; i < numColumns; i++) { int len = colwidths[i]; String col = rowText.toString().substring(index,index + len); // Convert the datetime string into the correct format so that it can be uploaded to the hive table if(Arrays.asList(datetimecols).contains(i)) { DateTimeFormatConverter dtc = new DateTimeFormatConverter(); try { col = dtc.convertCurrToNew(col); } catch (ParseException e) { LOG.error("Unable to parse Date Time string : " + col); e.printStackTrace(); } } if(Arrays.asList(datecols).contains(i)) { DateFormatConverter dtc = new DateFormatConverter(); try { col = dtc.convertCurrToNew(col); } catch (ParseException e) { LOG.error("Unable to parse Date String : " + col); e.printStackTrace(); } } row.set(i,col); index += len; } return row; } @Override public ObjectInspector getObjectInspector() throws SerDeException { return rowOI; } @Override public SerDeStats getSerDeStats() { // TODO Auto-generated method stub return null; } @Override public Class<? extends Writable> getSerializedClass() { return Text.class; } @Override public Writable serialize(Object obj,ObjectInspector objInspector) throws SerDeException { if(outputformat == null) { throw new SerDeException("Cannot write into table because no output format was specified"); } StructObjectInspector outputRowOI = (StructObjectInspector) objInspector; List<? 
extends StructField> outputFieldRefs = outputRowOI.getAllStructFieldRefs(); if(outputFieldRefs.size() != numColumns) { throw new SerDeException("Output format does not have the same number fields as the number of columns"); } for(int i = 0; i < numColumns; i++) { Object field = outputRowOI.getStructFieldData(obj,outputFieldRefs.get(i)); ObjectInspector fieldOI = outputFieldRefs.get(i).getFieldObjectInspector(); StringObjectInspector fieldStringOI = (StringObjectInspector) fieldOI; outputFields[i] = fieldStringOI.getPrimitiveJavaObject(field); } String outputRowString = null; try { outputRowString = String.format(outputformat,outputFields); } catch (MissingFormatArgumentException e) { throw new SerDeException("The table contains " + numColumns + "columns but the output format requires more",e); } outputRowText.set(outputRowString); return outputRowText; } } 您可以放心,我已经导入了需要导入的每个类. 当我尝试创建表时,我收到一条错误消息“无法从serde获取字段:my.hive.customserde.FIASC2” 这是堆栈跟踪 2015-08-25 15:57:51,995 ERROR [HiveServer2-Background-Pool: Thread-57]: metadata.Table (Table.java:getCols(608)) - Unable to get field from serde: my.hive.customserde.FIASC2 java.lang.NullPointerException at org.apache.hadoop.hive.metastore.MetaStoreUtils.getFieldsFromDeserializer(MetaStoreUtils.java:1257) at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:605) at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:694) at org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4135) at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:306) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1653) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1412) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1195) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1054) 
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:154) at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:71) at org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:206) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2015-08-25 15:57:51,996 ERROR [HiveServer2-Background-Pool: Thread-57]: exec.DDLTask (DDLTask.java:failed(520)) - org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.NullPointerException at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:720) at org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4135) at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:306) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:88) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1653) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1412) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1195) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1059) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1054) at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:154) at org.apache.hive.service.cli.operation.SQLOperation.access$100(SQLOperation.java:71) at 
org.apache.hive.service.cli.operation.SQLOperation$1$1.run(SQLOperation.java:206) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) at org.apache.hive.service.cli.operation.SQLOperation$1.run(SQLOperation.java:218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.hadoop.hive.metastore.MetaStoreUtils.getFieldsFromDeserializer(MetaStoreUtils.java:1257) at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:695) ... 21 more 我知道表创建失败了.但有谁知道我为什么会收到此错误?我试过谷歌搜索,但没有得到很多帮助. 如果它有任何帮助,这里是我正在使用的创建表脚本. create table if not exists fiasc2( record_type varchar(1),fin_id varchar(16),corp_id varchar(8),merc_id varchar(16),term_id varchar(8),tran_time timestamp,cashcard_number varchar(16),ttc varchar(8),tcc varchar(8),tran_type varchar(2),tran_amount varchar(16),deposit_amount varchar(16),pan varchar(32),account_type varchar(2),response_code varchar(2),card_balance varchar(8),settlement_date date,tran_mode varchar(1)) row format serde 'my.hive.customserde.FIASC2' location '/user/hive/fiasc2_test'; 解决方法
听起来很熟悉.
你的 getSerDeStats() 返回了 null 的 SerDeStats——这是我看到的唯一可能为 null 的地方. 这让我想起:在 Hive 引入 SerDeStats 之后的那几个版本里,我的 JSON SerDe 也遇到过完全相同的问题. 尝试: // add this to the members private SerDeStats stats; // ... public void initialize(Configuration conf,Properties tbl) throws SerDeException { .. // add this in initialize() stats = new SerDeStats(); // and of course here @Override public SerDeStats getSerDeStats() { return stats; } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与站长联系删除相关内容! |