scala – 为什么Spark reduceByKey的结果不一致
发布时间:2020-12-16 18:19:02 所属栏目:安全 来源:网络整理
导读:我试图通过使用 scala的spark来计算每行的迭代次数. 以下是我的意见: 1维克拉姆 2萨钦 3个shobit 4 alok 5 akul 5 akul 1维克拉姆 1维克拉姆 3个shobit 10 ashu 5 akul 1维克拉姆 2萨钦 7维克拉姆 现在我创建2个单独的RDD,如下所示. val f1 = sc.textFile("
我试图通过使用
scala的spark来计算每行的迭代次数.
以下是我的意见: 1维克拉姆 现在我创建2个单独的RDD,如下所示. val f1 = sc.textFile("hdfs:///path to above data file") val m1 = f1.map( s => (s.split(" ")(0),1) ) //creating a tuple (key,1) //now if i create a RDD as val rd1 = m1.reduceByKey((a,b) => a+b ) rd1.collect().foreach(println) //I get a proper output i.e (it gives correct output every time) //output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1) //but if i create a RDD as val rd2 = m1.reduceByKey((a,b) => a+1 ) rd2.collect().foreach(println) //I get a inconsistent result i.e some times i get this (WRONG) //output: (4,2) (3,2) (10,1) //and sometimes I get this as output (CORRECT) //output: (4,1) 我无法理解为什么会发生这种情况以及在哪里使用什么.我也尝试过创建RDD val m2 = f1.map(s => (s,1)) val rd3 = m2.reduceByKey((a,b) => a+1 ) // Then also same issue occurs with a+1 but every thing works fine with a+b 解决方法
reduceByKey假设传递的函数是可交换的和关联的(如
docs清楚地说明).
并且 – 您的第一个函数(a,b)=> a b是,但是(a,b)=> a 1不是. 为什么? 考虑以下场景 – 输入包含4条记录,分为两个分区: (aa,1) (aa,1) (aa,1) (cc,1) 此输入的reduceByKey(f)可能计算如下: val intermediate1 = f((aa,1),(aa,1)) val intermediate2 = f((aa,(cc,1)) val result = f(intermediate2,intermediate1) 现在,让我们跟着f =(a,b)=> a b val intermediate1 = f((aa,1)) // (aa,2) val intermediate2 = f((aa,1) val result = f(intermediate2,intermediate1) // (aa,3),1) 并且f =(a,b)=> 1: val intermediate1 = f((aa,(bb,1) // this is where it goes wrong: val result = f(intermediate2,2),1) 主要的是 – 中间计算的顺序不能得到保证,并且可能在执行之间发生变化,对于后一种非交换函数的情况,这意味着结果有时是错误的. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |