
Adding row index to pyspark dataframe (to add a new column/concatenate dataframes side-by-side)


I was trying to concatenate two dataframes side by side, and I saw this. The description of monotonically_increasing_id() says:



"monotonically_increasing_id() - Returns monotonically increasing 64-bit integers. The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number within each partition. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records. The function is non-deterministic because its result depends on partition IDs."



I'm trying to understand how we can presume that monotonically_increasing_id() produces the same values for both of these dataframes to join on, since it is non-deterministic. If it produces different row numbers for the two dataframes, then they wouldn't join correctly. The "result depends on partition IDs" part might be the answer, but I don't understand it. Can someone explain?
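For concreteness, the pattern I'm asking about looks roughly like the following sketch (df1 and df2 are placeholders, not my real data):

from pyspark.sql import functions as F

# Tag each dataframe with an id column and join on it.
# Whether the two id columns line up depends on how each dataframe is partitioned.
df1 = df1.withColumn("id", F.monotonically_increasing_id())
df2 = df2.withColumn("id", F.monotonically_increasing_id())
combined = df1.join(df2, on="id", how="inner")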










python apache-spark pyspark

asked Mar 26 at 22:42 by user2816215 (edited Mar 28 at 13:50)

























2 Answers
























Answer by user2816215 (answered Mar 27 at 14:53, edited Mar 27 at 14:59):
          This is the best way that I've found so far to add an index to a dataframe df:



new_columns = df.columns + ["row_idx"]

# Add a row index: zipWithIndex pairs each row with its position,
# and the map appends that position as an extra field.
df = (df
      .rdd
      .zipWithIndex()
      .map(lambda row_index: row_index[0] + (row_index[1],))
      .toDF())

# Rename all the columns
df = df.toDF(*new_columns)


It does have the overhead of converting to an RDD and back to a dataframe. However, monotonically_increasing_id() is non-deterministic, and row_number() requires a Window, which may not be ideal unless it is used with a PARTITION BY clause; otherwise it shuffles all the data to a single partition, defeating the purpose of PySpark (a sketch of that Window approach follows).
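For comparison, here is a minimal sketch of the row_number() route mentioned above, assuming that ordering the Window by a constant literal is acceptable on your Spark version; this is exactly the no-PARTITION-BY case that funnels everything through one partition:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Sketch only: a Window with no partitionBy pushes every row through a single
# partition, so this is simple but does not scale to large dataframes.
w = Window.orderBy(F.lit(1))
df_indexed = df.withColumn("row_idx", F.row_number().over(w) - 1)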



So, to add a list as a new column in a dataframe, simply convert the list to a dataframe of its own:



          new_df = spark.createDataFrame([(l,) for l in lst], ['new_col'])


and add a row_idx to it in the same way as above. Then join:



          joined_df = df.join(new_df, ['row_idx'], 'inner')
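Putting the pieces together, here is a sketch of the whole flow under my own naming (the helper add_row_idx and the sample list are illustrative, not part of the original answer):

def add_row_idx(frame):
    # Append a zero-based "row_idx" column using the same zipWithIndex trick as above.
    cols = frame.columns + ["row_idx"]
    return (frame.rdd
                 .zipWithIndex()
                 .map(lambda row_index: row_index[0] + (row_index[1],))
                 .toDF(cols))

lst = [10, 20, 30]                       # illustrative data
new_df = spark.createDataFrame([(l,) for l in lst], ['new_col'])
joined_df = add_row_idx(df).join(add_row_idx(new_df), ['row_idx'], 'inner')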





Answer by afeldman (answered Mar 27 at 1:47, edited Mar 27 at 16:16):
This is from my experience: monotonically_increasing_id() has some gnarliness to it. For small use cases you will always get a steadily increasing ID. However, with complex shuffles or heavy data usage it will not increase by the same amount each tick. By this I mean that DF1 went from 1 to ~100000000, but during a reshuffle DF2 was recomputed (because of Spark's lazy evaluation) and went from 1 to ~48000000 and then from 48000001.23 to 100000000.23. This meant I lost a ton of rows.



How I solved the problem was with unique row IDs. To do this I wrote a routine, shown below as Row_Hash, that builds a unique row ID at the front of the columns. No matter how many shuffles or data writes there were, I maintained the uniqueness of my join conditions.



EDIT: What I am going to do is turn all elements of the data frame's metadata into arrays. The reason is that you can specify exactly which elements of an array you want to query. This is different from a data frame: because of shuffles and repartitions, calling take(n) might give different results, whereas indexing into an array with array(n) will always output the same result.



With this in mind, let's return to the problem: we need to create a local row identifier where there is none. To do this we fully concatenate the rows (this is for scenarios where there are no row keys) and call MD5 on the result (yes, there is a chance of collision, but it is exceedingly low). This yields a long string for each row that is independent of the rest of the system, which the user can then use as a unique row-join key.



// Call in the input data frame
val inputDF = ...

// Returns an array of strings holding the column names of the input dataframe
val columnArray = inputDF.columns

// In Scala a var lets us update the value as we go.
// This is the start of the command where we concatenate all fields and run an MD5;
// we just need to add in the other fields.
var commandString = "md5(concat("
// This will be the set of string expressions we want Spark to run on our columns.
// We pass the column names through because we want to return the base columns as well.
// Think of a select query.
var commandArray = columnArray

// Iterator moving 1 -> n, n being the number of columns
var columnIterator = 1

// Run while there are still columns we have not acted upon
while (columnIterator <= columnArray.length) {
  // Take the Nth column and build a statement that casts it to a string
  commandString = commandString + "cast(" + columnArray(columnIterator - 1) + " as string)"

  // If we are not at the last element of the column array, add a comma so that
  // N elements can be concatenated (the space is just aesthetically pleasing)
  if (columnIterator != columnArray.length) commandString = commandString + ", "
  // Iterate
  columnIterator = columnIterator + 1
}

// Prepend the command we just built to the front of the command array, with a few
// extra characters to close the expression and name it something consistent.
// For a DF of Name, Addr, Date this yields
// Array("md5(concat(cast(Name as string), cast(Addr as string), cast(Date as string))) as Row_Hash", "Name", "Addr", "Date")
val finalCommandArray = Array(commandString + ")) as Row_Hash") ++ commandArray

// selectExpr runs independent SQL expression strings (a little of column A, a little of column B).
// Each string is its own element, so for the example DF above,
// inputDF.selectExpr("Name", "length(Addr) as Addr_Length", "Addr", "Date")
// outputs a DF with four columns: Name, an integer length of Addr, Addr, and Date.
// The previous lines built those strings into the command array.
// The :_* splat tells Spark to run all elements of the array through the select.
val finalDF = inputDF.selectExpr(finalCommandArray: _*)
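A comment below asks for a PySpark version. Here is a minimal sketch of the same row-hash idea, assuming every column can be cast to a string; it uses concat_ws with a separator instead of a bare concat, a small deviation from the Scala above that also keeps a single NULL column from nulling out the whole hash:

from pyspark.sql import functions as F

# Concatenate every column (cast to string) and hash the result with MD5,
# mirroring the selectExpr expression built above.
def with_row_hash(frame, hash_col="Row_Hash"):
    concatenated = F.concat_ws("||", *[F.col(c).cast("string") for c in frame.columns])
    return frame.withColumn(hash_col, F.md5(concatenated))

final_df = with_row_hash(input_df)   # input_df is a placeholder name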





            • Could you provide the solution in pyspark? I'm not very familiar with scala. For example, I understand the loop itself, but what is selectExpr doing?

              – user2816215
              Mar 27 at 13:42












            • Also, what is piiIterator?

              – user2816215
              Mar 27 at 14:13












            • Sure! Also totally forgot a concat. Let me quickly add comments once I get to the office.

              – afeldman
              Mar 27 at 15:22












