加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 在所有工作人员上拥有静态对象的正确方法是什么?

发布时间:2020-12-16 09:17:44 所属栏目:安全 来源:网络整理
导读:我一直在查看spark的文档,并提到这一点: Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this: Anonymous function syntax,which can be used for short pieces of
我一直在查看spark的文档,并提到这一点:

Spark’s API relies heavily on passing functions in the driver program
to run on the cluster. There are two recommended ways to do this:

Anonymous function syntax,which can be used for short pieces of code.
Static methods in a global singleton object. For example,you can
define object MyFunctions and then pass MyFunctions.func1,as follows:

object MyFunctions {   def func1(s: String): String = { ... } }

myRdd.map(MyFunctions.func1)

Note that while it is also possible to
pass a reference to a method in a class instance (as opposed to a
singleton object),this requires sending the object that contains that
class along with the method. For example,consider:

class MyClass {   
  def func1(s: String): String = { ... }   
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } 
}

Here,if we create a new MyClass and call doStuff on it,the map inside there
references the func1 method of that MyClass instance,so the whole
object needs to be sent to the cluster. It is similar to writing
rdd.map(x => this.func1(x)).

现在我怀疑,如果你在单例对象上有属性(应该等于静态的话)会发生什么.同样的例子有一个小的改变:

object MyClass {   
  val value = 1   
  def func1(s: String): String = { s + value }   
} 

myRdd.map(MyClass.func1)

所以这个函数仍然被静态引用,但是Spark通过尝试序列化所有被引用的变量有多远?它将序列化值还是会在远程工作人员中重新初始化?

另外,这一切都在上下文中,我在单例对象中有一些沉重的模型,我想找到正确的方法来将它们序列化到工作人员,同时保持将它们从单独的引用到任何地方,而不是传递给它们功能参数跨越相当深的函数调用堆栈.

关于什么/如何/什么时候Spark序列化事情的任何深入的信息将不胜感激.

解决方法

这更少是关于Spark的问题,更多的是Scala生成代码的问题.请记住,Scala对象几乎是一个充满静态方法的Java类.考虑一个简单的例子:

object foo {

  val value = 42

  def func(i: Int): Int = i + value

  def main(args: Array[String]): Unit = {
    println(Seq(1,2,3).map(func).sum)
  }

}

那将被翻译成3个Java类;其中一个将是关闭,它是map方法的一个参数.在该类上使用javap会产生如下所示:

public final class foo$$anonfun$main$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;
  public final int apply(int);
  public int apply$mcII$sp(int);
  public final java.lang.Object apply(java.lang.Object);
  public foo$$anonfun$main$1();
}

注意没有字段或任何东西.如果你看看反汇编的字节码,它所做的就是调用func()方法.当在Spark中运行时,这是将被序列化的实例;因为它没有字段,没有太多的序列化.

对于您的问题,如何初始化静态对象,您可以在开始关闭时调用一个幂等初始化函数.第一个将触发初始化,后续的调用将是no-ops.然而,清理是一件非常棘手的事情,因为我不熟悉一个API,它会像“在所有执行者上运行这个代码”一样.

在“setup()和cleanup()”部分中解释了一个可能有用的方法,如果需要清理,则会说明in this blog.

编辑:只是为了澄清,这里是实际进行调用的方法的反汇编.

public int apply$mcII$sp(int);
  Code:
   0:   getstatic       #29; //Field foo$.MODULE$:Lfoo$;
   3:   iload_1
   4:   invokevirtual   #32; //Method foo$.func:(I)I
   7:   ireturn

看看它如何引用持有单例的静态字段并调用func()方法.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读