scala - Why does using a custom case class in the Spark shell lead to a serialization error?
For the life of me I can't understand why this is not serializable. I'm running the code below in spark-shell (paste mode), on Spark 1.3.1, Cassandra 2.1.6, Scala 2.10:
import org.apache.spark._
import com.datastax.spark.connector._

val driverPort = 7077
val driverHost = "localhost"
val conf = new SparkConf(true)
  .set("spark.driver.port", driverPort.toString)
  .set("spark.driver.host", driverHost)
  .set("spark.logConf", "true")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext("local[*]", "test", conf)

case class Test(id: String, creationdate: String) extends Serializable

sc.parallelize(Seq(Test("98429740-2933-11e5-8e68-f7cca436f8bf", "2015-07-13T07:48:47.924Z")))
  .saveToCassandra("testks", "test", SomeColumns("id", "creationdate"))
sc.cassandraTable[Test]("testks", "test").toArray
sc.stop()
I started spark-shell like this:
./spark-shell -Ddriver-class-path=/usr/local/spark/libs/* -Dsun.io.serialization.extendedDebugInfo=true
I didn't see any difference from including the -Dsun.io.serialization.extendedDebugInfo=true property.
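(For reference, the same settings can also be passed through spark-shell's own options; this is just an alternative form using the same paths as above, not what I originally ran:)

./spark-shell --driver-class-path "/usr/local/spark/libs/*" --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true"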
Full error (edited):
java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
	at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
	at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
	at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
	at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:149)
	at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:464)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:232)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
	at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:227)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:296)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:294)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:294)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:294)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:294)
	at org.apache.spark.scheduler.local.LocalActor.reviveOffers(LocalBackend.scala:81)
	at org.apache.spark.scheduler.local.LocalActor$$anonfun$receiveWithLogging$1.applyOrElse(LocalBackend.scala:63)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.scheduler.local.LocalActor.aroundReceive(LocalBackend.scala:45)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
	at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
	at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
	... 45 more
15/08/31 09:04:45 ERROR scheduler.TaskSchedulerImpl: Resource offer failed, task set TaskSet_0 not serializable
Something different in the worker logs:
org.apache.spark.SparkContext.runJob(SparkContext.scala:1505)
com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:38)
$line15.$read$$iwC$$iwC.<init>(<console>:30)
$line15.$read$$iwC.<init>(<console>:51)
$line15.$read.<init>(<console>:53)
$line15.$read$.<init>(<console>:57)
$line15.$read$.<clinit>(<console>)
$line15.$eval$.<init>(<console>:7)
$line15.$eval$.<clinit>(<console>)
$line15.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:824)
I'm pretty sure the reason for the error is that the Scala REPL wraps expressions in an object before the compilation and evaluation phases (see Is it reasonable to use Scala's REPL for comparative performance benchmarks?). While wrapping the expression, it grabs objects from the environment, among them many non-serializable vals that can't safely be sent out by the driver (a remote process).
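As an illustration (not the original code), one way to sidestep the REPL-defined class for this particular write is to save a plain tuple instead; Tuple2 is a standard library class, so serializing it doesn't drag the REPL's wrapper object along. Keyspace, table, and columns below are the ones from the question:

// Sketch: writing a tuple instead of a REPL-defined case class.
sc.parallelize(Seq(("98429740-2933-11e5-8e68-f7cca436f8bf", "2015-07-13T07:48:47.924Z")))
  .saveToCassandra("testks", "test", SomeColumns("id", "creationdate"))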
A solution is to define the case class outside the Spark shell and use --jars so that both the driver and the executors have it on their classpath.
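A minimal sketch of that approach (the package name, file name, and jar path are made up; the data is from the question). First, compile the class into a jar outside the shell:

// Test.scala -- compiled separately (e.g. scalac + jar) into test-model.jar
package com.example.model

case class Test(id: String, creationdate: String)

Then start the shell with the jar on both the driver and executor classpaths, and use the class as before:

./spark-shell --jars /usr/local/spark/libs/test-model.jar --driver-class-path /usr/local/spark/libs/test-model.jar

import com.example.model.Test
sc.parallelize(Seq(Test("98429740-2933-11e5-8e68-f7cca436f8bf", "2015-07-13T07:48:47.924Z")))
  .saveToCassandra("testks", "test", SomeColumns("id", "creationdate"))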