scala – 在Zeppelin和Spark中解析CSV中的日期时间信息
发布时间:2020-12-16 18:39:22 所属栏目:安全 来源:网络整理
导读:我正在尝试读取CSV文件并构建数据框. 像打击一样的CSV格式.我使用ISO8602日期/时间格式进行数据/时间字符串表示. 2015-6-29T12:0:0,b82debd63cffb1490f8c9c647ca97845,G1J8RX22EGKP,2015-6-29T12:0:5,2015-6-29T12:0:6,0QA97RAM1GIV,2015-6-29T12:0:10,2015-
我正在尝试读取CSV文件并构建数据框.
像打击一样的CSV格式.我使用ISO8602日期/时间格式进行数据/时间字符串表示. 2015-6-29T12:0:0,b82debd63cffb1490f8c9c647ca97845,G1J8RX22EGKP,2015-6-29T12:0:5,2015-6-29T12:0:6,0QA97RAM1GIV,2015-6-29T12:0:10,2015-6-29T12:0:11,2015-6-29T12:0:12,2015-6-29T12:5:42,1 2015-6-29T12:20:0,0d60c871bd9180275f1e4104d4b7ded0,5HNB7QZSUI2C,2015-6-29T12:20:5,2015-6-29T12:20:6,KSL2LB0R6367,2015-6-29T12:20:10,2015-6-29T12:20:11,2015-6-29T12:20:12,2015-6-29T12:25:13,1 ...... 为了加载这些数据,我在Zeppelin中编写了scala代码,如下所示 import org.apache.spark.sql.types.DateType import org.apache.spark.sql.functions._ import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat import sys.process._ val logCSV = sc.textFile ("log_table.csv") case class record( callingTime:DateTime,userID:String,CID:String,serverConnectionTime:DateTime,serverResponseTime:DateTime,connectedAgentID:String,beginCallingTime:DateTime,endCallingTime:DateTime,Succeed:Int) val formatter = DateTimeFormat.forPattern("yyyy-mm-dd'T'kk:mm:ss") val logTable = logCSV.map(s => s.split(",") ).map( s => record( formatter.parseDateTime( s(0) ),s(1),s(2),formatter.parseDateTime( s(3) ),formatter.parseDateTime( s(4) ),s(5),formatter.parseDateTime( s(6) ),formatter.parseDateTime( s(7) ),s(8).toInt ) ).toDF() 它像下面那样犯了错误.主要问题是DateTime不可序列化. logCSV: org.apache.spark.rdd.RDD[String] = log_table.csv MapPartitionsRDD[38] at textFile at <console>:169 defined class record formatter: org.joda.time.format.DateTimeFormatter = org.joda.time.format.DateTimeFormatter@46051d99 org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) at org.apache.spark.rdd.RDD.map(RDD.scala:286) 然后我想知道我如何处理Scala中的日期/时间信息.你可以帮帮我吗? 解决方法
虽然
DateTime不是可序列化的,但如果你使用
DateTimeFormatter的
parseMillis方法,你将得到一个很长的,它是免费的桥接到Long,这是Serializable.要从Long获取DateTime,请使用DateTime(longInstance.longValue())构造函数.
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
推荐文章
站长推荐
- twitter-bootstrap – Bootstrap:到达某个div的
- twitter-bootstrap – Bootstrap 3 input-append
- 读SRE Google运维解密有感(一)
- Angular2的最新版本抱怨:NgModule DynamicModul
- 在没有shell转义序列的情况下在Python中打印stdo
- bootstrap插件 datetimepicker开始月份设置bug
- bash – 重新加载.Xresources而不重新启动xterm
- SegmentFault D-Day 2016「天津站: 前端场」~ vu
- typescript – Angular2:注入一个非@Injectable
- Bash完成:在别名完成中荣誉资源库特定的Git别名
热点阅读