scala - "Immortal" Spark Streaming Job? -


All right, I've asked a similar question related to how Spark handles exceptions internally, but the example I had back then wasn't really clear or complete. An answer there pointed me in some direction, but I can't really explain some things.

I've set up a dummy Spark Streaming app, and in the transform stage I have a Russian-roulette expression, which might or might not throw an exception. If an exception is thrown, I stop the Spark Streaming context. That's it, no other logic, no RDD transformation.

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.collection.mutable
    import scala.util.Random

    object ImmortalStreamingJob extends App {
      val conf = new SparkConf().setAppName("fun-spark").setMaster("local[*]")
      val ssc  = new StreamingContext(conf, Seconds(1))

      // 100 small RDDs of 10 elements each, fed to the stream through a queue
      val elems = (1 to 1000).grouped(10)
        .map(seq => ssc.sparkContext.parallelize(seq))
        .toSeq
      val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))

      val transformed = stream.transform { rdd =>
        try {
          // Russian roulette: roughly one batch in six throws
          if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
          else println("lucky bastard")
          rdd
        } catch {
          case e: Throwable =>
            println("stopping streaming context", e)
            ssc.stop(stopSparkContext = true, stopGracefully = false)
            throw e
        }
      }

      transformed.foreachRDD { rdd =>
        println(rdd.collect().mkString(","))
      }

      ssc.start()
      ssc.awaitTermination()
    }

Running this in IntelliJ will throw the exception at some point. Now the fun part:

  • If the exception is thrown in the first transformation (when the first RDD is processed), the Spark context is stopped and the app dies, which is what I want.
  • If the exception is thrown after at least one RDD has been processed, the app hangs after printing the error message and never stops, which is not what I want.

Why does the app hang instead of dying in the second case?


I'm running Spark 2.1.0 on Scala 2.11.8. Getting rid of the try-catch solves the problem (Spark stops by itself). Also, moving the try-catch inside foreachRDD solves the problem.

However, I'm looking for an answer that can help me understand what's going on in this particular example.

You will only see exceptions manifest themselves in actions (like foreachRDD in this case) rather than in transformations (like transform in this case), because actions execute transformations lazily. This means your transformations won't even occur until the action runs. The reason why this is necessary demands changing your mental model of how distributed processing works.
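To make the laziness point concrete, here is a minimal sketch that uses a plain Scala Iterator as a stand-in for an RDD or DStream. This is only an analogy, not Spark itself: `LazyFailureDemo`, `run`, and the log messages are hypothetical names chosen for illustration. It shows that declaring a lazy transformation runs nothing, and that the failure only surfaces once something consumes the result.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

object LazyFailureDemo {
  // Returns the order in which events happened.
  def run(): Seq[String] = {
    val log = ArrayBuffer.empty[String]

    // The "transformation": Iterator.map is lazy, so the function is not called yet.
    val transformed: Iterator[Int] = Iterator(1, 2, 3).map { x =>
      log += s"transforming $x"
      if (x == 2) throw new RuntimeException("boom")
      x * 10
    }
    log += "transformation declared"

    // The "action": consuming the iterator finally runs the function above,
    // so this is where the exception shows up.
    val result = Try(transformed.toList)
    log += s"action failed: ${result.isFailure}"

    log.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The log starts with "transformation declared", and the "transforming" entries only appear afterwards, once `toList` pulls elements through the mapped iterator.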

Consider a conventional, single-threaded program. Code proceeds line by line, and if an exception is thrown and not handled, subsequent lines of code just don't execute. In a distributed system, where the same Spark transformations are running in parallel on multiple machines (and at different paces), what should happen when an exception is thrown? It's not so simple, since the exception on one machine is independent of the code proceeding on other machines, which is how you want it. To want all the independent tasks spread throughout a cluster to just shut down on an exception is single-machine thinking that doesn't translate to a distributed paradigm. How is the driver supposed to deal with that?

According to Matei Zaharia, now of Databricks and one of the creators of Spark back at Berkeley, "Exceptions should be sent back to the driver program and logged there (with a SparkException thrown if a task fails more than 4 times)." (Incidentally, this default number of retries can be changed with spark.task.maxFailures.) So if Log4J is properly configured on the executors, the exception will be logged there; then it will be serialized and sent back to the driver, which will try again 3 more times by default.
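For reference, a configuration fragment showing where that setting would go. The value 8 is an arbitrary illustration, and the master URL is assumed to be supplied elsewhere (for example by spark-submit); this is a sketch rather than a recommended setup, and it needs Spark on the classpath.

```scala
import org.apache.spark.SparkConf

// Assumption: the master URL comes from spark-submit, so it is not set here.
val conf = new SparkConf()
  .setAppName("fun-spark")
  // Allowed failures per task before the job is given up (8 is an arbitrary example)
  .set("spark.task.maxFailures", "8")
```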

In your particular situation, I would guess you have a couple of things going on. First, you are running on a single machine, which will give a misleading picture of how exception handling works in a distributed model. Second, you are stopping the context prematurely. Stopping the context is an extremely destructive operation, which includes stopping all your listeners and the DAGScheduler. Frankly, I don't know how you can expect Spark to wrap everything up so neatly when you've basically turned out the lights.

Finally, I would mention that a more elegant exception handling model might be executing your transformations inside a Try. You will end up with potentially more cumbersome code, in that your transformations will return RDD[Try[T]] or DStream[Try[T]], which means you will have to handle the success and failure cases for each element. But you will be able to propagate success and error information downstream with all the benefits a monad provides, including mapping RDD[Try[A]] => RDD[Try[B]] and even using for comprehensions (by virtue of flatMap).
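As a rough sketch of that Try-based style, the example below uses a plain Seq in place of an RDD or DStream so that it runs without a cluster; the same `map` calls would apply to an RDD[Try[A]]. The names `TryPipelineDemo`, `parse`, and `reciprocal` are hypothetical helpers invented for illustration, not part of Spark.

```scala
import scala.util.{Failure, Success, Try}

object TryPipelineDemo {
  // Hypothetical per-element step that can fail: failures become values.
  def parse(s: String): Try[Int] = Try(s.trim.toInt)

  // Hypothetical second step, included to show flatMap-style chaining.
  def reciprocal(n: Int): Try[Double] =
    if (n == 0) Failure(new ArithmeticException("division by zero"))
    else Success(1.0 / n)

  def run(input: Seq[String]): Seq[Try[Double]] = {
    // Seq[String] => Seq[Try[Int]]
    val parsed: Seq[Try[Int]] = input.map(parse)

    // Seq[Try[Int]] => Seq[Try[Double]], using a for comprehension per element.
    // A Failure from either step short-circuits and is carried downstream.
    parsed.map { t =>
      for {
        n <- t
        r <- reciprocal(n)
      } yield r * 100
    }
  }

  def main(args: Array[String]): Unit =
    run(Seq("4", "oops", "0")).foreach {
      case Success(v) => println(s"ok: $v")
      case Failure(e) => println(s"failed: ${e.getClass.getSimpleName}")
    }
}
```

The design trade-off is the one described above: every downstream step has to deal with both Success and Failure, but no single bad element aborts the whole batch.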

