Unable to write as a sequence file with the Spark RDD API
I am using the following code to write an RDD as a sequence file:
    @Test
    def testSparkWordCount(): Unit = {
      val words = Array("hello", "hello", "world", "hello", "welcome", "world")
      val conf = new SparkConf().setMaster("local").setAppName("testSparkWordCount")
      val sc = new SparkContext(conf)
      val dir = "file:///" + System.currentTimeMillis()
      sc.parallelize(words).map(x => (x, 1)).saveAsHadoopFile(
        dir,
        classOf[Text],
        classOf[IntWritable],
        classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[Text, IntWritable]]
      )
      sc.stop()
    }
When I run it, it complains:
    Caused by: java.io.IOException: wrong key class: java.lang.String is not class org.apache.hadoop.io.Text
        at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1373)
        at org.apache.hadoop.mapred.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:76)
        at org.apache.spark.internal.io.SparkHadoopWriter.write(SparkHadoopWriter.scala:94)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1139)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1137)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1137)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1360)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1145)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
Should I have used sc.parallelize(words).map(x => (new Text(x), new IntWritable(1))) instead of sc.parallelize(words).map(x => (x, 1))? I don't think I should have to wrap explicitly, since SparkContext provides implicits that wrap the primitive types into their corresponding Writables.
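For reference, this is the explicitly wrapped variant I have in mind, as a sketch against the same words, sc, and dir as in the test above:

    // Explicitly wrap each record in Hadoop Writable types before saving
    sc.parallelize(words)
      .map(x => (new Text(x), new IntWritable(1)))
      .saveAsHadoopFile(
        dir,
        classOf[Text],
        classOf[IntWritable],
        classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[Text, IntWritable]]
      )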
So, what should I do to make this piece of code work?
Yes, SparkContext provides the implicit conversions. However, they are not applied automatically during saving; they have to be triggered in the usual Scala way:
    import org.apache.spark.SparkContext._

    val mapperFunction: String => (Text, IntWritable) = x => (x, 1)
    ... parallelize(words).map(mapperFunction).saveAsHadoopFile ...
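Putting it together, here is a minimal sketch of the fixed test. It assumes JUnit's @Test as in the question and that the implicit String => Text and Int => IntWritable conversions brought in by SparkContext._ are still shipped by your Spark version (they were deprecated in older releases, so check your version; if they are gone, explicit wrapping as suggested in the question works as well):

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.spark.SparkContext._
    import org.apache.spark.{SparkConf, SparkContext}
    import org.junit.Test

    @Test
    def testSparkWordCount(): Unit = {
      val words = Array("hello", "hello", "world", "hello", "welcome", "world")
      val conf = new SparkConf().setMaster("local").setAppName("testSparkWordCount")
      val sc = new SparkContext(conf)
      val dir = "file:///" + System.currentTimeMillis()

      // The explicit String => (Text, IntWritable) type makes the compiler apply the
      // implicit conversions while the pairs are built, i.e. before the save happens.
      val mapperFunction: String => (Text, IntWritable) = x => (x, 1)

      sc.parallelize(words)
        .map(mapperFunction)
        .saveAsHadoopFile(
          dir,
          classOf[Text],
          classOf[IntWritable],
          classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[Text, IntWritable]]
        )
      sc.stop()
    }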