Adding row index to pyspark dataframe (to add a new column/concatenate dataframes side-by-side)
While trying to concatenate two dataframes side by side, I came across the description of monotonically_increasing_id(), which says:
"monotonically_increasing_id() - Returns monotonically increasing 64-bit integers. The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number within each partition. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records. The function is non-deterministic because its result depends on partition IDs."
I'm trying to understand how we can presume that monotonically_increasing_id() produces the same values for both of these dataframes to join on, given that it is non-deterministic. If it produces different row numbers for the two dataframes, they won't join correctly. The 'result depends on partition IDs' part might be the answer, but I don't understand it. Can someone explain?
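For illustration, a minimal sketch (assuming an active SparkSession named spark; the id_a and id_b column names are just placeholders) of how the generated IDs depend on the partitioning, which is why two independently partitioned dataframes are not guaranteed to line up:

from pyspark.sql import functions as F

# Same six rows, but partitioned differently
df_a = spark.range(6).repartition(2)
df_b = spark.range(6).repartition(3)

# The upper bits of each generated ID encode the partition ID,
# so the two ID columns are not guaranteed to match row for row
df_a.withColumn("id_a", F.monotonically_increasing_id()).show()
df_b.withColumn("id_b", F.monotonically_increasing_id()).show()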
python apache-spark pyspark
asked Mar 26 at 22:42 by user2816215; edited Mar 28 at 13:50
2 Answers
This is the best way that I've found so far to add an index to a dataframe df:
new_columns = df.columns + ["row_idx"]

# Adding row index (zipWithIndex pairs each row with its position)
df = (
    df.rdd
      .zipWithIndex()
      .map(lambda pair: pair[0] + (pair[1],))
      .toDF()
)

# Renaming all the columns
df = df.toDF(*new_columns)
It does have the overhead of converting to an RDD and then back to a dataframe. However, monotonically_increasing_id() is non-deterministic, and row_number() requires a Window, which is not ideal unless used with PARTITION BY; otherwise it shuffles all the data to one partition, defeating the purpose of PySpark.
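For comparison, a minimal sketch of the row_number() route mentioned above (assuming the same df; note the Window has no PARTITION BY, so Spark will warn that it is moving all data to a single partition):

from pyspark.sql import Window
from pyspark.sql import functions as F

# Order by a monotonically increasing id just to get a stable ordering;
# without partitionBy, row_number() pulls all rows into one partition.
w = Window.orderBy(F.monotonically_increasing_id())
df_with_idx = df.withColumn("row_idx", F.row_number().over(w) - 1)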
So, to add a list as a new column in a dataframe, simply convert the list to a dataframe
new_df = spark.createDataFrame([(l,) for l in lst], ['new_col'])
and add a row index to it in the same way as above (a sketch of that step follows the join below). Then join:
joined_df = df.join(new_df, ['row_idx'], 'inner')
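For completeness, a sketch of that intermediate step (assuming lst has the same length and ordering as the rows of df, and reusing the zipWithIndex approach from the top of this answer):

idx_cols = new_df.columns + ["row_idx"]
new_df = (
    new_df.rdd
          .zipWithIndex()
          .map(lambda pair: pair[0] + (pair[1],))
          .toDF(idx_cols)          # RDD.toDF accepts a list of column names
)
joined_df = df.join(new_df, ['row_idx'], 'inner')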
answered Mar 27 at 14:53 by user2816215, edited Mar 27 at 14:59
This is from my experience. monotonically_increasing_id() has some gnarliness to it. For small use cases you will always get a generally increasing ID. However, if you have complex shuffles or data usage issues, it can and will not increase by the same value each time it is evaluated. By this I mean DF1 went from 1 -> ~100000000, but during a reshuffle DF2 was recalculated (because of Spark's lazy evaluation) and went from 1 -> ~48000000 and then from 48000001.23 -> 100000000.23. This meant I lost a ton of rows.
How I solved the problem was with unique row IDs. To do this I had a function, Row_Hash (below), that would build a unique row ID as the first column. No matter how many shuffles or data writes there were, I maintained the uniqueness of my join conditions.
EDIT: What I am going to do is turn all elements of the data frame's metadata into arrays. The reason is that you can specify exactly which elements of an array you want to query. This is different from a data frame: because of shuffles and repartitions, calling take(n) might give different results, whereas indexing into an array(n) will always output the same result.
With this in mind, let's return to the problem: we need to create a local row identifier where there is none. To do this we fully concatenate each row (this is for scenarios where there are no row keys) and call MD5 on the result (yes, there is a chance of collision, but it is exceedingly low). This yields a long hash string for each row, independent of the rest of the system, which the user can then use as a unique row-join key.
// Call in the input data frame
val inputDF = ...

// Returns an array of strings with the column names of the input dataframe
val columnArray = inputDF.columns

// In Scala a var allows us to dynamically update the value.
// This is the start of the command where we concatenate all fields and run MD5;
// we just need to add in the other fields.
var commandString = "md5(concat("

// This will be the set of expression strings we want Spark to run on our columns.
// The reason we also pass through the column names is that we want to return the base columns.
// Think of a select query.
var commandArray = columnArray

// This is an iterator moving from 1 to n, n being the number of columns.
var columnIterator = 1

// Run while there are still columns we have not acted upon.
while (columnIterator <= columnArray.length) {
  // Take the Nth column and build a statement to cast it as a string.
  commandString += "cast(" + columnArray(columnIterator - 1) + " as string)"

  // If we are not at the last element of the column array, add a comma so that
  // N elements can be concatenated (the space is just aesthetically pleasing).
  if (columnIterator != columnArray.length) commandString += ", "

  // Iterate
  columnIterator = columnIterator + 1
}

// Prepend the command we just built to the front of the command array, with a few
// extra characters to close the expression and name it something consistent.
// So for a DF of Name, Addr, Date the statement below yields
// Array("md5(concat(cast(Name as string), cast(Addr as string), cast(Date as string))) as Row_Hash", "Name", "Addr", "Date")
commandArray = Array(commandString + ")) as Row_Hash") ++ commandArray

// selectExpr runs independent SQL expression strings (a little bit of column A, a little bit of column B).
// Each string is its own element, so given the example DF above,
//   inputDF.selectExpr("Name", "length(Addr) as Addr_Length", "Addr", "Date")
// outputs a DF with four columns: Name, an integer with the length of Addr, Addr, and Date.
// In the previous lines we built those strings into the command array.
// commandArray:_* tells Spark to run every element of the array through the select statement.
val finalDF = inputDF.selectExpr(commandArray:_*)
answered Mar 27 at 1:47 by afeldman, edited Mar 27 at 16:16

Could you provide the solution in PySpark? I'm not very familiar with Scala. For example, I understand the loop itself, but what is selectExpr doing? – user2816215, Mar 27 at 13:42
Also, what is piiIterator? – user2816215, Mar 27 at 14:13
Sure! Also totally forgot a concat. Let me quickly add comments once I get to the office. – afeldman, Mar 27 at 15:22
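A rough PySpark equivalent of the concatenate-and-hash idea above, as a sketch (assuming a dataframe df whose columns can all be cast to string; the Row_Hash column name mirrors the Scala version, and the "|" separator is an arbitrary choice, whereas the Scala code above uses plain concat):

from pyspark.sql import functions as F

# Cast every column to string, concatenate with a separator, and MD5-hash the result.
# concat_ws treats nulls as empty strings, so a single null column does not null out the whole hash.
row_hash = F.md5(F.concat_ws("|", *[F.col(c).cast("string") for c in df.columns]))
hashed_df = df.withColumn("Row_Hash", row_hash)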