Adding row index to pyspark dataframe (to add a new column/concatenate dataframes side-by-side)


I was trying to concatenate two dataframes side by side and came across this approach. The description of monotonically_increasing_id() says:



"monotonically_increasing_id() - Returns monotonically increasing 64-bit integers. The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number within each partition. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records. The function is non-deterministic because its result depends on partition IDs."



I'm trying to understand how we can presume that monotonically_increasing_id() produces the same IDs for both of these dataframes to join on, given that it is non-deterministic. If it produces different row numbers for the two dataframes, they won't join correctly. The 'result depends on partition IDs' part might be the answer, but I don't understand it. Can someone explain?
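For reference, here is a minimal sketch of the ID layout the quoted docs describe (the partition ID and record number below are made-up values, not taken from a real dataframe):

# sketch of the 64-bit ID layout from the docstring quoted above
partition_id = 2       # hypothetical partition the row lives in
record_number = 5      # hypothetical position of the row within that partition

# the partition ID sits in the upper bits, the record number in the lower 33 bits
generated_id = (partition_id << 33) + record_number
print(generated_id)    # 17179869189

# the same logical row can therefore get a different ID on another run
# if it lands in a different partition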










      python apache-spark pyspark






asked Mar 26 at 22:42, edited Mar 28 at 13:50 · user2816215

2 Answers






This is the best way that I've found so far to add an index to a dataframe df:

new_columns = df.columns + ["row_idx"]

# Add a row index: zipWithIndex pairs each row with its position
df = (df.rdd
        .zipWithIndex()
        .map(lambda row_index: tuple(row_index[0]) + (row_index[1],))
        .toDF())

# Rename all the columns (the last one becomes row_idx)
df = df.toDF(*new_columns)

It does have the overhead of converting to an RDD and back to a dataframe. However, monotonically_increasing_id() is non-deterministic, and row_number() requires a Window, which is not ideal unless it is used with PARTITION BY; otherwise it shuffles all the data into a single partition, defeating the purpose of pyspark.
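For comparison, a minimal sketch of the row_number() route mentioned above (assuming the same df; with no PARTITION BY, Spark warns that all of the data is moved into a single partition before the rows are numbered):

from pyspark.sql import Window
from pyspark.sql import functions as F

# number the rows 0..n-1; there is no partitionBy, so Spark collects everything
# into one partition just to assign the numbers
w = Window.orderBy(F.monotonically_increasing_id())
df_with_idx = df.withColumn("row_idx", F.row_number().over(w) - 1)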



So, to add a list as a new column to a dataframe, simply convert the list to a dataframe:

new_df = spark.createDataFrame([(l,) for l in lst], ['new_col'])

add a row_idx to it in the same way as above, and then join:

joined_df = df.join(new_df, ['row_idx'], 'inner')
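Putting the steps together, a rough end-to-end sketch (lst is a hypothetical Python list, spark is the active SparkSession, and df is assumed to already carry the row_idx column added earlier):

lst = ["a", "b", "c"]  # hypothetical list to attach as a new column

# turn the list into a single-column dataframe
new_df = spark.createDataFrame([(l,) for l in lst], ['new_col'])

# give new_df the same kind of row index as df
new_columns = new_df.columns + ["row_idx"]
new_df = (new_df.rdd
                .zipWithIndex()
                .map(lambda row_index: tuple(row_index[0]) + (row_index[1],))
                .toDF())
new_df = new_df.toDF(*new_columns)

# side-by-side concatenation becomes an inner join on the shared index
joined_df = df.join(new_df, ['row_idx'], 'inner')

Because zipWithIndex numbers each dataframe 0..n-1 independently, this lines the two frames up by position, so it only makes sense when they have the same number of rows.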





answered Mar 27 at 14:53, edited Mar 27 at 14:59 · user2816215














This is from my experience. monotonically_increasing_id() has some gnarliness to it. For small use cases you will always get a steadily increasing ID. However, if you have complex shuffles or data-usage issues, it can and will not increase by the same amount on every run. By this I mean DF1 went from 1 to ~100000000, but during a reshuffle DF2 was recalculated (Spark's lazy evaluation) and went from 1 to ~48000000, then from 48000001.23 to 100000000.23. This meant I lost a ton of rows.

How I solved the problem was through unique Row_IDs. To do this I had a function called Row_Hash, shown below, which would go through and build a unique row ID at the front of the columns. No matter how many shuffles or data writes there were, I maintained the uniqueness of my join conditions.

EDIT: What I am going to do is turn all elements of the dataframe's metadata into arrays. The reason for this is that you can specify which elements of an array you want to query. This is different from a dataframe: because of shuffles and repartitions, calling take(n) might give different results, whereas indexing into an array with array(n) will always output the same result.

With this in mind, let's return to the problem: we need to create a local row identifier where there is none. To do this we fully concatenate the rows (this is for scenarios where there are no row keys) and call an MD5 on top of the result (yes, there is a chance of collision, but it is exceedingly low). This yields a large hash string for each row that is independent of the rest of the system, allowing the user to use it as a unique row-join key.



// Call in the input dataframe
val inputDF = ...

// Returns an array of strings with the column names of the input dataframe
val columnArray = inputDF.columns

// In Scala a var lets us update the value as we go.
// This is the start of the expression that concatenates all fields and runs an MD5;
// we just need to append the individual fields.
var commandString = "md5(concat("
// This will be the set of expression strings we want Spark to run on our columns.
// We pass the column names through because we also want to return the base columns;
// think of a select query.
var commandArray = columnArray

// Iterator that moves from 1 to n, n being the number of columns
var columnIterator = 1

// Run while there are still columns we have not acted upon
while (columnIterator <= columnArray.length) {

  // Take the Nth column and append a statement that casts it to a string
  commandString = commandString + "cast(" + columnArray(columnIterator - 1) + " as string)"

  // If this is not the last element of the column array, add a comma so that
  // N elements can be concatenated (the space is purely aesthetic)
  if (columnIterator != columnArray.length) commandString = commandString + ", "
  columnIterator = columnIterator + 1
}

// Prepend the expression we just built to the front of the command array, with a few
// extra characters to close it and give it a consistent name.
// For a DF of Name, Addr, Date the statement below produces
// Array("md5(concat(cast(Name as string), cast(Addr as string), cast(Date as string))) as Row_Hash", "Name", "Addr", "Date")
commandArray = Array(commandString + ")) as Row_Hash") ++ commandArray

// selectExpr runs independent SQL expression strings (a little of column A, a little of column B).
// Each string is its own element, so for the example DF
//   inputDF.selectExpr("Name", "length(Addr) as Addr_Length", "Addr", "Date")
// would output a DF with four columns: Name, the length of Addr as an integer, Addr, and Date.
// The lines above built those strings into the command array;
// commandArray:_* passes every element of the array to selectExpr.
val finalDF = inputDF.selectExpr(commandArray:_*)
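Since the comments below ask for a pyspark version, here is a rough equivalent of the same idea (a sketch only; inputDF stands for any existing dataframe). It uses concat_ws with a separator rather than a bare concat, so a null in one column does not null out the whole hash:

from pyspark.sql import functions as F

# build an md5 over every column cast to string, keeping the original columns
cols = inputDF.columns
row_hash = F.md5(F.concat_ws("|", *[F.col(c).cast("string") for c in cols]))

hashed_df = inputDF.select(row_hash.alias("Row_Hash"), *cols)

As in the Scala version, the hash depends only on the row's contents, so it stays the same across shuffles and recomputation (as long as the row values themselves do not change).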





answered Mar 27 at 1:47, edited Mar 27 at 16:16 · afeldman

• Could you provide the solution in pyspark? I'm not very familiar with scala. For example, I understand the loop itself, but what is selectExpr doing? – user2816215, Mar 27 at 13:42

• Also, what is piiIterator? – user2816215, Mar 27 at 14:13

• Sure! Also totally forgot a concat. Let me quickly add comments once I get to the office. – afeldman, Mar 27 at 15:22












