scala - How to join two RDDs by key to get RDD of (String, String)?


I have two pair RDDs of the form RDD[(String, mutable.HashSet[String])].

For example:

rdd1: 332101231222, "320758, 320762, 320760, 320759, 320757, 320761"
rdd2: 332101231222, "220758, 220762, 220760, 220759, 220757, 220761"

I want to combine rdd1 and rdd2 based on their common keys. The output should look like this:

332101231222 320758, 320762, 320760, 320759, 320757, 320761 220758, 220762, 220760, 220759, 220757, 220761

Here is the code:

import org.apache.spark.rdd.RDD
import scala.collection.mutable

def cogrouptest(rdd1: RDD[(String, mutable.HashSet[String])],
                rdd2: RDD[(String, mutable.HashSet[String])]): Unit = {
  val prods_per_user_co_grouped = rdd1.cogroup(rdd2)
  prods_per_user_co_grouped.map {
    // This pattern expects a single HashSet on each side of the pair.
    case (key: String, (value1: mutable.HashSet[String], value2: mutable.HashSet[String])) =>
      val combinedhs = value1 ++ value2
      val sstr = combinedhs.mkString("\t")
      val keypadded = key + "\t"
      s"$keypadded$sstr"
  }.saveAsTextFile("/scratch/rdds_joined/")
}

Here is the error I get when I run the program:

scala.MatchError: (32101231222,(CompactBuffer(Set(320758, 320762, 320760, 320759, 320757, 320761)),CompactBuffer(Set(220758, 220762, 220760, 220759, 220757, 220761)))) (of class scala.Tuple2)

Any help would be great!

As you might guess from the name, cogroup groups the observations by key. That means that in your case each element you get has the type:

(String, (Iterable[mutable.HashSet[String]], Iterable[mutable.HashSet[String]]))

not

(String, (mutable.HashSet[String], mutable.HashSet[String]))

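This is pretty clear when you take a look at the error you get: each side of the pair is a CompactBuffer wrapping a Set, not a Set itself. If you want to combine pairs, you should use the join method instead. If not, you should adjust the pattern match to the actual structure and use this: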

val combinedhs = value1.reduce(_ ++ _) ++ value2.reduce(_ ++ _) 
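To make the two options concrete, here is a minimal sketch of both, assuming Spark is on the classpath and a local session is acceptable for trying it out. The names JoinByKeyExample, Ids, mergeGroups, formatLine, joinByKey and cogroupByKey are made up for illustration, and the sample data is a shortened version of the example in the question. One deliberate difference from the one-liner above: cogroup also emits keys that exist in only one RDD, with an empty Iterable on the other side, and reduce throws on an empty collection, so the sketch folds the sets into an accumulator instead.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.collection.mutable

object JoinByKeyExample {
  // Hypothetical alias, only to keep the signatures short.
  type Ids = mutable.HashSet[String]

  // Union every set from both sides of a cogroup result.
  // Unlike reduce, this also works when one side is empty.
  def mergeGroups(left: Iterable[Ids], right: Iterable[Ids]): Ids = {
    val merged = mutable.HashSet.empty[String]
    left.foreach(merged ++= _)
    right.foreach(merged ++= _)
    merged
  }

  // One tab-separated line: the key, then the values (sorted for stable output).
  def formatLine(key: String, values: collection.Set[String]): String =
    (key +: values.toSeq.sorted).mkString("\t")

  // Option 1: join keeps only keys present in both RDDs,
  // and each value is a plain pair of sets.
  def joinByKey(rdd1: RDD[(String, Ids)], rdd2: RDD[(String, Ids)]): RDD[String] =
    rdd1.join(rdd2).map { case (key, (set1, set2)) => formatLine(key, set1 ++ set2) }

  // Option 2: cogroup keeps keys from either RDD,
  // and each side of the value is an Iterable of sets.
  def cogroupByKey(rdd1: RDD[(String, Ids)], rdd2: RDD[(String, Ids)]): RDD[String] =
    rdd1.cogroup(rdd2).map { case (key, (sets1, sets2)) =>
      formatLine(key, mergeGroups(sets1, sets2))
    }

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only (an assumption, not from the question).
    val spark = SparkSession.builder().master("local[*]").appName("join-by-key").getOrCreate()
    val sc = spark.sparkContext

    val rdd1 = sc.parallelize(Seq("332101231222" -> mutable.HashSet("320758", "320762")))
    val rdd2 = sc.parallelize(Seq("332101231222" -> mutable.HashSet("220758", "220762")))

    joinByKey(rdd1, rdd2).collect().foreach(println)
    cogroupByKey(rdd1, rdd2).collect().foreach(println)

    spark.stop()
  }
}
```

If every key is guaranteed to appear in both RDDs, the two functions should produce the same lines and join is the simpler choice. To write the result to disk as in the question, call saveAsTextFile on the returned RDD instead of collect.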
