scala – Spark UDF线程安全
发布时间:2020-12-16 18:28:48 所属栏目:安全 来源:网络整理
导读:我正在使用Spark来获取包含一列日期的数据框,并创建3个新列,其中包含列中日期和今天之间的天,周和月的时间. 我关心的是使用SimpleDateFormat,它不是线程安全的.通常没有Spark这可以,因为它是一个局部变量,但是使用Spark的懒惰评估,在多个UDF上共享一个Simple
我正在使用Spark来获取包含一列日期的数据框,并创建3个新列,其中包含列中日期和今天之间的天,周和月的时间.
我关心的是使用SimpleDateFormat,它不是线程安全的.通常没有Spark这可以,因为它是一个局部变量,但是使用Spark的懒惰评估,在多个UDF上共享一个SimpleDateFormat实例可能会导致问题? def calcTimeDifference(...){ val sdf = new SimpleDateFormat(dateFormat) val dayDifference = udf{(x: String) => math.abs(Days.daysBetween(new DateTime(sdf.parse(x)),presentDate).getDays)} output = output.withColumn("days",dayDifference(myCol)) val weekDifference = udf{(x: String) => math.abs(Weeks.weeksBetween(new DateTime(sdf.parse(x)),presentDate).getWeeks)} output = output.withColumn("weeks",weekDifference(myCol)) val monthDifference = udf{(x: String) => math.abs(Months.monthsBetween(new DateTime(sdf.parse(x)),presentDate).getMonths)} output = output.withColumn("months",monthDifference(myCol)) } 解决方法
我不认为它是安全的,正如我们所知,SimpleDateFormat不是线程安全的.
因此,如果您需要,我更喜欢这种方法在Spark中使用SimpleDateFormat: import java.text.SimpleDateFormat import java.util.SimpleTimeZone /** * Thread Safe SimpleDateFormat for Spark. */ object ThreadSafeFormat extends ThreadLocal[SimpleDateFormat] { override def initialValue(): SimpleDateFormat = { val dateFormat = new SimpleDateFormat("yyyy-MM-dd:H") // if you need get UTC time,you can set UTC timezone val utcTimeZone = new SimpleTimeZone(SimpleTimeZone.UTC_TIME,"UTC") dateFormat.setTimeZone(utcTimeZone) dateFormat } } 然后使用ThreadSafeFormat.get()来获取线程安全的SimpleDateFormat来做任何事情. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |