scala - How to join two RDDs by key to get RDD of (String, String)?
I have two pair RDDs of the form RDD[(String, mutable.HashSet[String])].
For example:
rdd1: 332101231222, "320758, 320762, 320760, 320759, 320757, 320761"
rdd2: 332101231222, "220758, 220762, 220760, 220759, 220757, 220761"
I want to combine rdd1 and rdd2 based on common keys, so the output should look like:
332101231222 320758, 320762, 320760, 320759, 320757, 320761 220758, 220762, 220760, 220759, 220757, 220761
Here is my code:
import scala.collection.mutable
import org.apache.spark.rdd.RDD

def cogrouptest(rdd1: RDD[(String, mutable.HashSet[String])],
                rdd2: RDD[(String, mutable.HashSet[String])]): Unit = {
  val prods_per_user_co_grouped = rdd1.cogroup(rdd2)
  prods_per_user_co_grouped.map {
    // This pattern expects one HashSet per side; it is the line that fails at runtime.
    case (key: String, (value1: mutable.HashSet[String], value2: mutable.HashSet[String])) => {
      val combinedhs = value1 ++ value2
      val sstr = combinedhs.mkString("\t")
      val keypadded = key + "\t"
      s"$keypadded$sstr"
    }
  }.saveAsTextFile("/scratch/rdds_joined/")
}
Here is the error I get when I run the program:
scala.MatchError: (32101231222,(CompactBuffer(Set(320758, 320762, 320760, 320759, 320757, 320761)),CompactBuffer(Set(220758, 220762, 220760, 220759, 220757, 220761)))) (of class scala.Tuple2)
Any help would be great!
As you might guess from the name, cogroup groups observations by key. This means that in your case you get:
(String, (Iterable[mutable.HashSet[String]], Iterable[mutable.HashSet[String]]))
not
(String, (mutable.HashSet[String], mutable.HashSet[String]))
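This is pretty clear when you take a look at the error you get: each side of the value is a CompactBuffer (an Iterable) wrapping the set, not the set itself. If you want to combine pairs, you should use the join method. If not, you should adjust the pattern match to this structure and use something like this: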
val combinedhs = value1.reduce(_ ++ _) ++ value2.reduce(_ ++ _)
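To make both options concrete, here is a minimal sketch. It assumes Spark is on the classpath and runs against a local master. The names JoinByKeySketch, mergeGrouped, formatLine, joinVariant and cogroupVariant are made up for illustration and are not part of Spark. It merges the grouped sets with a loop instead of reduce, because cogroup yields an empty Iterable for a key that is missing on one side, and reduce throws on an empty collection. Sorting the values is only there to make the output order stable.

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object JoinByKeySketch {
  // Merges every set from both sides into one set.
  // An empty side (key present in only one RDD) simply contributes nothing.
  def mergeGrouped(
      left: Iterable[mutable.HashSet[String]],
      right: Iterable[mutable.HashSet[String]]): mutable.HashSet[String] = {
    val merged = mutable.HashSet.empty[String]
    left.foreach(merged ++= _)
    right.foreach(merged ++= _)
    merged
  }

  // Builds one tab-separated output line: key first, then the sorted values.
  def formatLine(key: String, values: Iterable[String]): String =
    key + "\t" + values.toSeq.sorted.mkString("\t")

  // join keeps only keys present in both RDDs and yields (key, (set1, set2)).
  def joinVariant(
      rdd1: RDD[(String, mutable.HashSet[String])],
      rdd2: RDD[(String, mutable.HashSet[String])]): RDD[String] =
    rdd1.join(rdd2).map { case (key, (value1, value2)) =>
      formatLine(key, value1 ++ value2)
    }

  // cogroup keeps keys from either RDD and yields (key, (Iterable[set], Iterable[set])).
  def cogroupVariant(
      rdd1: RDD[(String, mutable.HashSet[String])],
      rdd2: RDD[(String, mutable.HashSet[String])]): RDD[String] =
    rdd1.cogroup(rdd2).map { case (key, (values1, values2)) =>
      formatLine(key, mergeGrouped(values1, values2))
    }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for this sketch.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("join-by-key-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd1 = sc.parallelize(Seq("332101231222" -> mutable.HashSet("320758", "320762")))
    val rdd2 = sc.parallelize(Seq("332101231222" -> mutable.HashSet("220758", "220762")))

    joinVariant(rdd1, rdd2).collect().foreach(println)
    cogroupVariant(rdd1, rdd2).collect().foreach(println)

    spark.stop()
  }
}
```

For this input both variants print the same line. They differ only for keys that appear in just one of the RDDs: join drops those keys, while cogroup keeps them with an empty side.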