java - Why are Map output records = 0, even when I am emitting output in the mapper -
I have tried a lot and I don't understand the reason why the Map output records counter is 0. I want the mapper to read the lines more than once: I am working on big data and need the data on each line more than once. I am first trying it on a small file (graph.txt) containing --
1,2,4,6
2,10,3,7
3,6,5,8
4,7,7,9
5,13,9,9
But the mapper deals with the file line by line, and there is no other way around that, so I first store the values from the file while map() is called the first (n-1) times, and do the processing in the last map() call. Each row of the file is stored in the row array, and in the last map() call I emit the output via the output.collect() function. I am using the setup() method to calculate the number of rows in the file, as setup() is called once for each mapper. Here, as the input file is small, only 1 mapper is called.
I have been stuck at this for a while; I am new to this, so please give me a solution. Thanks in advance. Here's the code.
Driver code -
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class PrimDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(PrimDriver.class);
        conf.setJobName("PrimDriver");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(PrimMapper.class);
        //conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(PrimReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
Mapper code -
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class PrimMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
    //private final static IntWritable one = new IntWritable(1);
    //private Text word = new Text();
    private int no_line = 0;
    private int i = 0;

    public void setup(Context context) throws IOException {
        Path pt = new Path("hdfs:/myinput/graph.txt"); // location of the file in HDFS
        FileSystem fs = FileSystem.get(new Configuration());
        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
        String line;
        line = br.readLine();
        while (line != null) {
            no_line = no_line + 1;
            line = br.readLine();
        }
    }

    private String[][] row = new String[no_line][4];

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
            Reporter reporter) throws IOException {
        if (i < no_line - 1) {
            String[] s = value.toString().split(",");
            for (int j = 0; j < s.length; j++) {
                row[i][j] = s[j];
            }
            i = i + 1;
        } else {
            String[] s = value.toString().split(",");
            for (int j = 0; j < s.length; j++) {
                //row[i][j] = Integer.parseInt(s[j]);
            }
            for (int i = 0; i < no_line - 1; i++) {
                String a = row[i][0];
                String b = row[i][1] + "," + row[i][2] + "," + row[i][3];
                output.collect(new Text(a), new Text(b));
            }
        }
    }
}
Reducer code -
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class PrimReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
            Reporter reporter) throws IOException {
        int a = 0, b = 0, c = 0;
        output.collect(new Text("kishan "), new Text("sharma"));
        while (values.hasNext()) {
            String val[] = (values.next().toString()).split(",");
            a = Integer.parseInt(val[0]);
            b = Integer.parseInt(val[1]);
            c = Integer.parseInt(val[2]);
        }
        output.collect(key, new Text(a + "," + b + "," + c));
    }
}
In the console I got these logs -
[training@localhost workspace]$ hadoop jar hierarchical.jar PrimDriver myinput/graph.txt cluster5
17/04/07 10:21:18 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
17/04/07 10:21:18 WARN snappy.LoadSnappy: Snappy native library is available
17/04/07 10:21:18 INFO snappy.LoadSnappy: Snappy native library loaded
17/04/07 10:21:18 INFO mapred.FileInputFormat: Total input paths to process : 1
17/04/07 10:21:18 INFO mapred.JobClient: Running job: job_201704070816_0007
17/04/07 10:21:19 INFO mapred.JobClient:  map 0% reduce 0%
17/04/07 10:22:21 INFO mapred.JobClient:  map 100% reduce 0%
17/04/07 10:22:29 INFO mapred.JobClient:  map 100% reduce 66%
17/04/07 10:22:53 INFO mapred.JobClient:  map 100% reduce 100%
17/04/07 10:23:22 INFO mapred.JobClient: Job complete: job_201704070816_0007
17/04/07 10:23:22 INFO mapred.JobClient: Counters: 33
17/04/07 10:23:22 INFO mapred.JobClient:   File System Counters
17/04/07 10:23:22 INFO mapred.JobClient:     FILE: Number of bytes read=6
17/04/07 10:23:22 INFO mapred.JobClient:     FILE: Number of bytes written=361924
17/04/07 10:23:22 INFO mapred.JobClient:     FILE: Number of read operations=0
17/04/07 10:23:22 INFO mapred.JobClient:     FILE: Number of large read operations=0
17/04/07 10:23:22 INFO mapred.JobClient:     FILE: Number of write operations=0
17/04/07 10:23:22 INFO mapred.JobClient:     HDFS: Number of bytes read=146
17/04/07 10:23:22 INFO mapred.JobClient:     HDFS: Number of bytes written=0
17/04/07 10:23:22 INFO mapred.JobClient:     HDFS: Number of read operations=3
17/04/07 10:23:22 INFO mapred.JobClient:     HDFS: Number of large read operations=0
17/04/07 10:23:22 INFO mapred.JobClient:     HDFS: Number of write operations=2
17/04/07 10:23:22 INFO mapred.JobClient:   Job Counters
17/04/07 10:23:22 INFO mapred.JobClient:     Launched map tasks=1
17/04/07 10:23:22 INFO mapred.JobClient:     Launched reduce tasks=1
17/04/07 10:23:22 INFO mapred.JobClient:     Data-local map tasks=1
17/04/07 10:23:22 INFO mapred.JobClient:     Total time spent by all maps in occupied slots (ms)=90240
17/04/07 10:23:22 INFO mapred.JobClient:     Total time spent by all reduces in occupied slots (ms)=31777
17/04/07 10:23:22 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
17/04/07 10:23:22 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
17/04/07 10:23:22 INFO mapred.JobClient:   Map-Reduce Framework
17/04/07 10:23:22 INFO mapred.JobClient:     Map input records=5
17/04/07 10:23:22 INFO mapred.JobClient:     Map output records=0
17/04/07 10:23:22 INFO mapred.JobClient:     Map output bytes=0
17/04/07 10:23:22 INFO mapred.JobClient:     Input split bytes=104
17/04/07 10:23:22 INFO mapred.JobClient:     Combine input records=0
17/04/07 10:23:22 INFO mapred.JobClient:     Combine output records=0
17/04/07 10:23:22 INFO mapred.JobClient:     Reduce input groups=0
17/04/07 10:23:22 INFO mapred.JobClient:     Reduce shuffle bytes=6
17/04/07 10:23:22 INFO mapred.JobClient:     Reduce input records=0
17/04/07 10:23:22 INFO mapred.JobClient:     Reduce output records=0
17/04/07 10:23:22 INFO mapred.JobClient:     Spilled Records=0
17/04/07 10:23:22 INFO mapred.JobClient:     CPU time spent (ms)=1240
17/04/07 10:23:22 INFO mapred.JobClient:     Physical memory (bytes) snapshot=196472832
17/04/07 10:23:22 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=775897088
17/04/07 10:23:22 INFO mapred.JobClient:     Total committed heap usage (bytes)=177016832
17/04/07 10:23:22 INFO mapred.JobClient:   org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
17/04/07 10:23:22 INFO mapred.JobClient:     BYTES_READ=42
When i is smaller than no_line - 1, you don't collect anything. This condition holds in your case, and that's why you are not seeing any map output records.
When you start processing the first record, no_line has already been initialized at its final value (the actual number of lines in the input file "hdfs:/myinput/graph.txt"). At that point, i is 0. Then, when the if condition is met, i becomes 1 in this specific mapper (not at all the mappers).* After that, i has the value 1 (in this mapper) and it must still be less than no_line - 1. It seems that the file graph.txt has more than 5 lines (I guess).
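(A side note on the row array, as a possible extra pitfall: in Java, instance field initializers run at construction time, before any setup-style method has had a chance to update no_line, so row is sized with whatever value no_line holds at that moment. A minimal plain-Java sketch, with hypothetical names, just to demonstrate the initialization order:)

public class InitOrderDemo {
    private int no_line = 0;                          // assigned at construction time
    private String[][] row = new String[no_line][4];  // also runs at construction, while no_line is still 0

    public void setup() {
        no_line = 5;  // runs later; does NOT resize the already-created row array
    }

    public static void main(String[] args) {
        InitOrderDemo d = new InitOrderDemo();
        d.setup();
        System.out.println(d.no_line);    // prints 5
        System.out.println(d.row.length); // prints 0 -- the array was sized before setup() ran
    }
}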
To summarize: setup() gets executed once, before all the map() calls, at every mapper.
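(For what it's worth, since the code mixes the two Hadoop APIs: with the old org.apache.hadoop.mapred API used here (MapReduceBase / implements Mapper), per-mapper one-time initialization is normally done by overriding configure(JobConf), while setup(Context) belongs to the new org.apache.hadoop.mapreduce.Mapper API. A hedged sketch of what the line counting could look like with configure(), under a hypothetical class name:)

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LineCountingMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    private int no_line = 0;
    private String[][] row; // allocated in configure(), once the line count is known

    @Override
    public void configure(JobConf job) {
        // Old-API hook: the framework calls this once per mapper, before any map() call.
        try {
            Path pt = new Path("hdfs:/myinput/graph.txt");
            FileSystem fs = FileSystem.get(job);
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
            String line = br.readLine();
            while (line != null) {
                no_line = no_line + 1;
                line = br.readLine();
            }
            br.close();
            row = new String[no_line][4]; // sized with the real number of lines
        } catch (IOException e) {
            throw new RuntimeException(e); // configure() is not declared to throw IOException
        }
    }

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // the buffering/emit logic from the question would go here
    }
}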
I have no idea what you want to do, and that part seems difficult to understand. Try to make it clearer and update the question with more details if you need more help. In the else statement, using the variable i again seems confusing, as it is not clear whether you want to use the local i or the "shadowed" field i. Doesn't your IDE give you a warning for that?
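(To illustrate the shadowing: the for loop in the else branch declares a new local variable i that hides the field i for the duration of the loop. A tiny self-contained example:)

public class ShadowDemo {
    private int i = 7; // field

    void run() {
        for (int i = 0; i < 3; i++) { // this local i shadows the field i
            System.out.println("local i = " + i);
        }
        System.out.println("field i = " + this.i); // still 7, untouched by the loop
    }

    public static void main(String[] args) {
        new ShadowDemo().run();
    }
}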
*This is bad practice, as you cannot know what values i will take in each mapper; it depends on the data partitioning.