Scala - Why does using a custom case class in the Spark shell lead to a serialization error?


For the life of me, I can't understand why this is not serializable. I'm running the code below in spark-shell (paste mode), on Spark 1.3.1, Cassandra 2.1.6, and Scala 2.10.

    import org.apache.spark._
    import com.datastax.spark.connector._

    val driverPort = 7077
    val driverHost = "localhost"
    val conf = new SparkConf(true)
      .set("spark.driver.port", driverPort.toString)
      .set("spark.driver.host", driverHost)
      .set("spark.logConf", "true")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.cassandra.connection.host", "localhost")
    val sc = new SparkContext("local[*]", "test", conf)

    case class Test(id: String, creationdate: String) extends Serializable

    sc.parallelize(Seq(Test("98429740-2933-11e5-8e68-f7cca436f8bf", "2015-07-13T07:48:47.924Z")))
      .saveToCassandra("testks", "test", SomeColumns("id", "creationdate"))

    sc.cassandraTable[Test]("testks", "test").toArray
    sc.stop()

I started spark-shell like this:

    ./spark-shell -Ddriver-class-path=/usr/local/spark/libs/* -Dsun.io.serialization.extendedDebugInfo=true

I didn't see any difference from including the -Dsun.io.serialization.extendedDebugInfo=true property.

Full error (edited):

    java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
        at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
        at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:149)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:464)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:232)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:227)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:296)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:294)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:294)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:294)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:294)
        at org.apache.spark.scheduler.local.LocalActor.reviveOffers(LocalBackend.scala:81)
        at org.apache.spark.scheduler.local.LocalActor$$anonfun$receiveWithLogging$1.applyOrElse(LocalBackend.scala:63)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.local.LocalActor.aroundReceive(LocalBackend.scala:45)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
        at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
        at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
        ... 45 more
    15/08/31 09:04:45 ERROR scheduler.TaskSchedulerImpl: Resource offer failed, task set TaskSet_0 was not serializable

There is something different in the worker logs:

    org.apache.spark.SparkContext.runJob(SparkContext.scala:1505)
    com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:38)
    $line15.$read$$iwC$$iwC.<init>(<console>:30)
    $line15.$read$$iwC.<init>(<console>:51)
    $line15.$read.<init>(<console>:53)
    $line15.$read$.<init>(<console>:57)
    $line15.$read$.<clinit>(<console>)
    $line15.$eval$.<init>(<console>:7)
    $line15.$eval$.<clinit>(<console>)
    $line15.$eval.$print(<console>)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:606)
    org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:824)

I'm pretty sure the reason for the error is that the Scala REPL wraps all expressions in an object before the compilation and eval phases (see "Is it reasonable to use Scala's REPL for comparative performance benchmarks?"). While wrapping the expression, it grabs all the objects from the environment, many of which may be non-serializable vals that can't safely be sent out to the driver (a remote process).
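To make the wrapping argument concrete, here is a minimal sketch in plain Scala, without Spark. ReplWrapper is a hypothetical stand-in for the wrapper that the shell generates around pasted code; it is not the real generated class, and every name in the sketch is made up for illustration. A case class defined inside the wrapper holds a hidden reference to the enclosing instance, so Java serialization tries to write the wrapper too and fails if the wrapper is not serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the wrapper generated around REPL input.
// It is deliberately NOT Serializable.
class ReplWrapper {
  // A non-serializable val living in the wrapper's environment.
  val env = new Object

  // Defined inside the wrapper: instances carry a reference to the
  // enclosing ReplWrapper. The method below uses `env` so that the
  // outer reference is certainly retained.
  case class Inner(id: String) {
    def describe: String = id + "@" + env.hashCode
  }
}

// Defined at top level: no enclosing instance to drag along.
case class TopLevel(id: String)

object OuterRefDemo {
  // Returns true if Java serialization of `o` succeeds.
  def trySerialize(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val w = new ReplWrapper
    println("inner case class serializable:     " + trySerialize(w.Inner("a")))
    println("top-level case class serializable: " + trySerialize(TopLevel("a")))
  }
}
```

Under these assumptions, the inner case class should fail to serialize while the top-level one succeeds, even though both are case classes and therefore Serializable themselves. The failure in the question surfaces differently (as an ArrayIndexOutOfBoundsException inside Spark's SerializationDebugger), so treat this only as an illustration of the capture problem.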

A solution is to define the case class outside of the Spark shell and use --jars to include it on both the driver and executor classpaths.
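As a rough sketch of that approach, the case class could live in its own source file and be compiled ahead of time. The file name, the package name testmodels, and the jar path mentioned afterwards are all hypothetical; only the class itself comes from the question.

```scala
// File: Test.scala (hypothetical file name)
// A named package is used so the class can be imported from the shell.
package testmodels

// Compiled outside the shell, so no REPL wrapper encloses it.
// Case classes are already Serializable, so `extends Serializable` is optional.
case class Test(id: String, creationdate: String)
```

One possible workflow, assuming the class is compiled with the same Scala version as the Spark build (2.10 here) and packaged into a jar: start the shell with `./spark-shell --jars /path/to/test-models.jar`, then run `import testmodels.Test` instead of declaring the case class in paste mode.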

