DropDuplicates is not giving expected result



I am working on a use case that removes duplicate records from incoming structured data (CSV files landing in a folder on HDFS). To try this out, I wrote some sample code using the file source to see whether duplicates can be removed from the records in the CSVs copied to that HDFS folder.



Here is the code:



import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
import spark.implicits._

// Schema of the incoming CSV records
val userSchema = new StructType()
  .add("prod_code", "string")
  .add("bal", "integer")
  .add("v_txn_id", "string")
  .add("timestamp", "timestamp")

// Stream CSV files dropped into the HDFS folder
val csvDF = spark.readStream.option("sep", ",")
  .schema(userSchema)
  .csv("/user/Temp")
csvDF.dropDuplicates("v_txn_id")
csvDF.createOrReplaceTempView("table1")

val dbDf2 = spark.sql("select prod_code, bal, v_txn_id, current_timestamp timestamp from table1")
dbDf2.writeStream.queryName("aggregates").outputMode("update").format("memory").start()

spark.sql("select * from aggregates").show()


Now, when I copy a file with duplicate records (by v_txn_id) into the folder, I still see that the result sink gets all the rows from the file:



P1,1000,TXNID1
P1,2000,TXNID2
P1,3000,TXNID2
P1,4000,TXNID3
P1,5000,TXNID3
P1,6000,TXNID4


All the rows from the CSV file end up in the result table "aggregates". What I am expecting is:



P1,1000,TXNID1
P1,3000,TXNID2
P1,5000,TXNID3
P1,6000,TXNID4


This is my first attempt at Structured Streaming (with state), so pardon the trivial question. Any suggestions would help a lot.















Tags: scala, apache-spark, apache-spark-sql, spark-structured-streaming






asked Mar 22 at 15:07 by Prashant A; edited Mar 22 at 15:28 by Carlos Vilchez












  • Just based on the description it sounds like you're looking for complete mode, not update mode.

    – user10938362
    Mar 22 at 15:41
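For reference, the output mode is chosen on the streaming writer, and complete mode is only supported for queries that contain a streaming aggregation. The sketch below illustrates the commenter's suggestion on an aggregated query; it reuses csvDF and the column names from the question and is illustrative only, not the asker's actual pipeline.

import org.apache.spark.sql.functions.max

// Complete mode needs an aggregation; here, the max balance per transaction id (illustrative only).
val aggDF = csvDF.groupBy("v_txn_id").agg(max("bal").as("bal"))

aggDF.writeStream
  .queryName("aggregates_complete")
  .outputMode("complete")   // re-emits the full result table on every trigger
  .format("memory")
  .start()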


















1 Answer
As per your expected output, I believe you need to find the max of bal for each prod_code and v_txn_id. To achieve that, on your final aggregate table you can use a window function (partition by) to compute the max of bal per prod_code and v_txn_id in a temporary column called temp_bal, and then, in the outer query, select distinct values of prod_code, temp_bal and v_txn_id.

spark.sql("""
  select distinct prod_code, temp_bal as bal, v_txn_id
  from (select *, max(bal) over (partition by prod_code, v_txn_id) as temp_bal from aggregates)
  order by prod_code, v_txn_id
""").show()

(Screenshot of the query output in the original post.)

EDIT 1:

As per your requirement, please find below a query that works off the latest date/time for each v_txn_id.

spark.sql("""
  select distinct a.prod_code, a.bal, a.v_txn_id
  from aggregates a
  join (select distinct v_txn_id, max(timestamp) over (partition by v_txn_id) as temp_timestamp from aggregates) b
    on a.v_txn_id = b.v_txn_id and a.timestamp = b.temp_timestamp
  order by a.v_txn_id
""").show()

(Screenshot of the query output in the original post.)

Please let me know if you have any questions; otherwise, please mark this answer as accepted (tick icon).







answered Mar 23 at 5:48 by Sarath Avanavu; edited Mar 25 at 7:59
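For comparison, the same idea can also be written with the DataFrame API. This is an illustrative sketch, not part of the original answer; it reads the aggregates table registered by the memory sink above and keeps, per v_txn_id, the row with the latest timestamp using a window function:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank the rows of each v_txn_id by timestamp (newest first) and keep only the top row.
val w = Window.partitionBy("v_txn_id").orderBy(col("timestamp").desc)

spark.table("aggregates")
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")
  .orderBy("v_txn_id")
  .show()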












  • Thanks for the detailed response. But actually, the balance may not be the maximum; I only used an increasing balance for this sample run. The balance can be lower for the same v_txn_id (an erroneous earlier transaction). In short, I am interested in the final value for each v_txn_id that arrives as part of the ingestion process. So, in the example above, P1,2000,TXNID2 and P1,4000,TXNID3 are actually erroneous transactions.

    – Prashant A
    Mar 23 at 14:52












  • Do you want to take the latest "v_txn_id" based on the "timestamp" column? @BDA

    – Sarath Avanavu
    Mar 24 at 14:22











  • Yes, that will also do, in case dropDuplicates doesn't remove the old v_txn_ids. My assumption was that dropDuplicates would only give me the latest v_txn_ids. Anyway, if we can achieve it through the timestamp column, that should work for me. Based on this small code piece it appears that my understanding of dropDuplicates was wrong (presumptuous).

    – Prashant A
    Mar 25 at 2:21












  • How can I apply a window (with/without watermark) and use processing time to get the latest v_txn_ids? The use case then would be: multiple transactions come in during a sliding window of, say, 5 minutes, and I want the latest transaction ids for that 5-minute window. Also, the next window's transactions will be totally different from the previous window's.

    – Prashant A
    Mar 25 at 2:30











  • I have added the answer. Please go through and let me know if you have any questions @BDA

    – Sarath Avanavu
    Mar 25 at 7:59
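Regarding the windowing/watermark question in the comments above: the documented way to deduplicate a stream with bounded state is to declare a watermark on the event-time column and include that column in the dropDuplicates key. Also note that dropDuplicates is a transformation that returns a new DataFrame; it does not modify csvDF in place, so the returned frame is the one that has to be registered and written. The following is a minimal sketch reusing the names from the question, not a drop-in fix for the exact use case:

// dropDuplicates returns a new, deduplicated DataFrame; csvDF itself is unchanged.
val dedupedDF = csvDF.dropDuplicates("v_txn_id")   // keeps the first record seen per v_txn_id; state grows without bound

// With a watermark the engine can expire old state, but the event-time column must then
// be part of the dedup key (see the Structured Streaming programming guide):
//   csvDF.withWatermark("timestamp", "5 minutes").dropDuplicates("v_txn_id", "timestamp")

dedupedDF.createOrReplaceTempView("table1")        // register the deduplicated stream, not csvDF

dedupedDF.writeStream
  .queryName("dedup")
  .outputMode("append")                            // streaming dedup without aggregation supports append mode
  .format("memory")
  .start()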
















