scala - Spark SQL DataFrame joins: what's going on?
I have 2 DataFrames; for simplicity, let's call them left and right. I'll show a sample structure.
DataFrame "left" (this DataFrame is quite large):
src | dst
----+----
b   | a
c   | b
a   | c
DataFrame "right" (this DataFrame is tiny):
loc | name
----+-------
a   | london
b   | paris
Both of these DataFrames are created using a HiveContext and an SQL statement.
If I run a join on the left DataFrame as follows, it works fine:
left.join(right, left("src") === right("loc"), "left_outer")
This returns a DataFrame with the join I expected.
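To experiment with these joins without Hive, here is a minimal sketch that rebuilds the sample tables above in memory and runs the single join that works. It assumes Spark 2.x or later (a local-mode SparkSession rather than the HiveContext used here); the object JoinRepro and its helper names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object for reproducing the question without Hive.
object JoinRepro {
  // Builds the question's sample "left" and "right" DataFrames in memory.
  def sampleData(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    val left = Seq(("b", "a"), ("c", "b"), ("a", "c")).toDF("src", "dst")
    val right = Seq(("a", "london"), ("b", "paris")).toDF("loc", "name")
    (left, right)
  }

  // The single join from the question: keeps every row of left and fills
  // loc/name where src matches loc, leaving nulls where there is no match.
  def joinOnSrc(left: DataFrame, right: DataFrame): DataFrame =
    left.join(right, left("src") === right("loc"), "left_outer")

  def main(args: Array[String]): Unit = {
    // Local-mode session; the question uses a HiveContext instead.
    val spark = SparkSession.builder()
      .appName("join-repro")
      .master("local[*]")
      .getOrCreate()
    val (left, right) = sampleData(spark)
    joinOnSrc(left, right).show()
    spark.stop()
  }
}
```

Row order in the show() output is not guaranteed, so compare results as sets rather than by position.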
What I'm trying to do is match on both columns, src and dst; in effect, I'm trying to return the following:
src | dst | src_loc | src_name | dst_loc | dst_name
----+-----+---------+----------+---------+---------
b   | a   | b       | paris    | a       | london
c   | b   | null    | null     | b       | paris
a   | c   | a       | london   | null    | null
If I try to do this on the DataFrames as follows, the entire Spark job falls over. It doesn't throw an error either; it just takes way too long, or something is going on that I don't understand:
val dfjoin1 = left.join(right, left("src") === right("loc"), "left_outer")
dfjoin1.join(right, dfjoin1("dst") === right("loc"), "left_outer")
Out of frustration I tried, rather than reusing the right DataFrame, creating a new one with a second, identical Hive query.
The following works for me, but it seems wrong (I shouldn't need to call Hive twice for the same data):
val right = hivecontext.sql(from .....)
val right2 = hivecontext.sql(from .....)
val dfjoin1 = left.join(right, left("src") === right("loc"), "left_outer")
dfjoin1.join(right2, dfjoin1("dst") === right2("loc"), "left_outer")
The next problem I have is that I want to filter on the columns that have been added. For argument's sake, let's say I want only the rows whose src location name is paris:
dfjoin1.filter($"name" === "paris")
This fails because of ambiguous column names. How can I solve this issue? Can I prefix the column names as part of the join?
Not sure, but I think the cause of the failure is column ambiguity: when you compare dfjoin1("dst") === right("loc"), you might be comparing dst to the loc column that was joined in by the previous join operation.
In other words, I believe both issues can be solved by more accurate column naming that makes sure there are no ambiguities. The easiest way to achieve this (and to get the output schema you want) is to rename the columns after each join:
// the $"..." syntax needs the context's implicits in scope, e.g. import sqlContext.implicits._
val result = left
  .join(right, $"src" === $"loc", "left_outer")
  .withColumnRenamed("loc", "src_loc")
  .withColumnRenamed("name", "src_name")
  .join(right, $"dst" === $"loc", "left_outer") // "loc" is not ambiguous here, because the first join's "loc" was renamed to "src_loc"
  .withColumnRenamed("loc", "dst_loc")
  .withColumnRenamed("name", "dst_name")

result.show()
// +---+---+-------+--------+-------+--------+
// |src|dst|src_loc|src_name|dst_loc|dst_name|
// +---+---+-------+--------+-------+--------+
// |  b|  a|      b|   paris|      a|  london|
// |  c|  b|   null|    null|      b|   paris|
// |  a|  c|      a|  london|   null|    null|
// +---+---+-------+--------+-------+--------+
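Once every column has a distinct name, the filter from the question is no longer ambiguous. The sketch below is a variant of the renaming idea rather than the code above: it gives each copy of right its own column names up front with toDF(...), joins, and then filters on src_name. It assumes Spark 2.x or later with a local SparkSession and the sample data built in memory; RenameFirstSketch and its method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RenameFirstSketch {
  // Sample data from the question, built in memory instead of via Hive.
  def sampleData(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    val left = Seq(("b", "a"), ("c", "b"), ("a", "c")).toDF("src", "dst")
    val right = Seq(("a", "london"), ("b", "paris")).toDF("loc", "name")
    (left, right)
  }

  // Joins left to two renamed copies of right, one per lookup column.
  def joinBoth(left: DataFrame, right: DataFrame): DataFrame = {
    // toDF(names*) returns right with its columns renamed positionally,
    // so every column in the final result has a unique name.
    val srcSide = right.toDF("src_loc", "src_name")
    val dstSide = right.toDF("dst_loc", "dst_name")
    left
      .join(srcSide, col("src") === col("src_loc"), "left_outer")
      .join(dstSide, col("dst") === col("dst_loc"), "left_outer")
  }

  // The question's filter: with unique names, "src_name" is unambiguous.
  def parisSources(joined: DataFrame): DataFrame =
    joined.filter(col("src_name") === "paris")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rename-first-sketch")
      .master("local[*]")
      .getOrCreate()
    val (left, right) = sampleData(spark)
    parisSources(joinBoth(left, right)).show()
    spark.stop()
  }
}
```

Renaming before the join means neither join condition mentions a bare loc, so there is nothing left for Spark to resolve against the wrong side.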
An alternative approach: you can use DataFrame.as(String) to name the right DataFrame before using it, with a different name each time. The result is different, but still usable:
left
  .join(right.as("src"), $"src" === $"src.loc", "left_outer")
  .join(right.as("dst"), $"dst" === $"dst.loc", "left_outer")
  .show()
// +---+---+----+------+----+------+
// |src|dst| loc|  name| loc|  name|
// +---+---+----+------+----+------+
// |  b|  a|   b| paris|   a|london|
// |  c|  b|null|  null|   b| paris|
// |  a|  c|   a|london|null|  null|
// +---+---+----+------+----+------+
The schema shows two columns named loc and two named name, but each can be referenced with the relevant prefix, e.g. src.name or dst.loc.
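Those prefixes also work in filters and selects. A minimal sketch, assuming Spark 2.x or later with a local SparkSession and the sample data built in memory (AliasSketch and its method names are hypothetical): it repeats the aliased joins, filters with src.name as the question wanted, and uses select to rename the prefixed columns into the six-column schema asked for.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object AliasSketch {
  // Sample data from the question, built in memory instead of via Hive.
  def sampleData(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    val left = Seq(("b", "a"), ("c", "b"), ("a", "c")).toDF("src", "dst")
    val right = Seq(("a", "london"), ("b", "paris")).toDF("loc", "name")
    (left, right)
  }

  // Joins right twice under two different aliases, as in the answer.
  def joinWithAliases(left: DataFrame, right: DataFrame): DataFrame =
    left
      .join(right.as("src"), col("src") === col("src.loc"), "left_outer")
      .join(right.as("dst"), col("dst") === col("dst.loc"), "left_outer")

  // The question's filter, written with the alias prefix instead of a bare "name".
  def parisSources(joined: DataFrame): DataFrame =
    joined.filter(col("src.name") === "paris")

  // Flattens the prefixed columns into the schema from the question.
  def toRequestedSchema(joined: DataFrame): DataFrame =
    joined.select(
      col("src"), col("dst"),
      col("src.loc").as("src_loc"), col("src.name").as("src_name"),
      col("dst.loc").as("dst_loc"), col("dst.name").as("dst_name"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("alias-sketch")
      .master("local[*]")
      .getOrCreate()
    val (left, right) = sampleData(spark)
    val joined = joinWithAliases(left, right)
    parisSources(joined).show()
    toRequestedSchema(joined).show()
    spark.stop()
  }
}
```

The trade-off versus renaming: aliases keep the join code short, but every later reference to loc or name must carry its prefix until a select like the one above flattens them.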