Unable to write as sequence file with Spark RDD API


I am using the following code to write an RDD as a sequence file:

    @Test
    def testSparkWordCount(): Unit = {
      val words = Array("hello", "hello", "world", "hello", "welcome", "world")
      val conf = new SparkConf().setMaster("local").setAppName("testSparkWordCount")
      val sc = new SparkContext(conf)

      val dir = "file:///" + System.currentTimeMillis()
      sc.parallelize(words).map(x => (x, 1)).saveAsHadoopFile(
        dir,
        classOf[Text],
        classOf[IntWritable],
        classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[Text, IntWritable]]
      )

      sc.stop()
    }

When I run it, it complains:

    Caused by: java.io.IOException: wrong key class: java.lang.String is not class org.apache.hadoop.io.Text
        at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1373)
        at org.apache.hadoop.mapred.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:76)
        at org.apache.spark.internal.io.SparkHadoopWriter.write(SparkHadoopWriter.scala:94)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1139)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1137)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1137)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1360)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1145)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

Should I have used sc.parallelize(words).map(x => (new Text(x), new IntWritable(1))) instead of sc.parallelize(words).map(x => (x, 1))? I don't think I have to wrap the values explicitly, since SparkContext provides implicits that wrap the primitive types into the corresponding Writables.
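For reference, the explicitly wrapped variant I am asking about would look roughly like this (Text and IntWritable are the Hadoop types from org.apache.hadoop.io, everything else is the same as in my test above):

    import org.apache.hadoop.io.{IntWritable, Text}

    // Explicit wrapping: build the Hadoop Writable objects per record before saving.
    sc.parallelize(words)
      .map(x => (new Text(x), new IntWritable(1)))
      .saveAsHadoopFile(
        dir,
        classOf[Text],
        classOf[IntWritable],
        classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[Text, IntWritable]]
      )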

So, what should I do to make this piece of code work?

Yes, SparkContext provides implicit conversions. However, the conversions are not applied automatically during saving; they must be used in the usual Scala way:

    import org.apache.spark.SparkContext._

    val mapperFunction: String => (Text, IntWritable) = x => (x, 1)

    ... parallelize(words).map(mapperFunction).saveAsHadoopFile ...
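As a side note (an addition, not part of the original answer): if the only goal is a plain sequence file, RDD.saveAsSequenceFile sidesteps the issue, because it resolves the WritableFactory implicits for the key and value types itself. A minimal sketch, reusing sc, words and dir from the question:

    // Alternative: saveAsSequenceFile converts String/Int to Text/IntWritable
    // via implicit WritableFactory instances, so no explicit wrapping is needed.
    // Written to a separate directory so it does not collide with the output above.
    sc.parallelize(words).map(x => (x, 1)).saveAsSequenceFile(dir + "-alt")

Either way, the result can be read back with sc.sequenceFile as a quick sanity check:

    // Read the sequence file back as (String, Int) pairs; the Writable
    // converters for String and Int are again picked up implicitly.
    sc.sequenceFile[String, Int](dir).collect().foreach(println)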
