DropDuplicates is not giving expected result
I am working on a use case of removing duplicate records from incoming structured data (CSV files arriving in a folder on HDFS). To try this use case out, I wrote some sample code using the file source, to see whether duplicates can be removed from the records in the CSVs that are copied to the HDFS folder.
The code is below:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
import spark.implicits._

// Schema of the incoming CSV files
val userSchema = new StructType()
  .add("prod_code", "string")
  .add("bal", "integer")
  .add("v_txn_id", "string")
  .add("timestamp", "timestamp")

// Stream the CSV files from the HDFS folder
val csvDF = spark.readStream.option("sep", ",")
  .schema(userSchema)
  .csv("/user/Temp")

// Attempt to drop duplicate transactions by v_txn_id, then expose the stream as a temp view
csvDF.dropDuplicates("v_txn_id")
csvDF.createOrReplaceTempView("table1")

val dbDf2 = spark.sql("select prod_code, bal, v_txn_id, current_timestamp timestamp from table1")

// Write the stream to an in-memory table named "aggregates" and query it
dbDf2.writeStream.queryName("aggregates").outputMode("update").format("memory").start()
spark.sql("select * from aggregates").show()
Now, when I copy a file containing duplicate records (by v_txn_id) into the folder, I still see that the result sink gets all the rows from the file:
P1,1000,TXNID1
P1,2000,TXNID2
P1,3000,TXNID2
P1,4000,TXNID3
P1,5000,TXNID3
P1,6000,TXNID4
All of these rows from the CSV file end up in the "aggregates" result. What I am expecting is:
P1,1000,TXNID1
P1,3000,TXNID2
P1,5000,TXNID3
P1,6000,TXNID4
This is the first time I am attempting structured streaming (with state), so pardon me if this is a trivial question. Any suggestions would help a lot.
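For reference, a minimal sketch of the usual streaming deduplication pattern: dropDuplicates returns a new DataFrame rather than modifying csvDF in place, so the returned DataFrame is what has to be written out. The query name and output mode below are illustrative assumptions; a watermark on an event-time column can additionally be used to bound the deduplication state.

// Sketch only: capture the deduplicated stream and write that out
val deduped = csvDF.dropDuplicates("v_txn_id")   // keeps the first row seen for each v_txn_id

deduped.writeStream
  .queryName("dedup_example")                    // hypothetical query name
  .outputMode("append")
  .format("memory")
  .start()

spark.sql("select * from dedup_example").show()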
scala apache-spark apache-spark-sql spark-structured-streaming
asked Mar 22 at 15:07 by Prashant A, edited Mar 22 at 15:28 by Carlos Vilchez
Just based on the description it sounds like you're looking for complete mode, not update mode.
– user10938362
Mar 22 at 15:41
1 Answer
As per your expected output, I believe you need to find the max of bal for each prod_code and v_txn_id. To achieve that output, on your final aggregates table you can use a window function (partition by) to compute the max of bal per prod_code and v_txn_id in a temporary column called temp_bal, and then, in the outer query, select distinct values of the prod_code, temp_bal and v_txn_id columns.
spark.sql("select distinct prod_code,temp_bal as bal,v_txn_id from(select *,max(bal) over(partition by prod_code,v_txn_id) as temp_bal from aggregates) order by prod_code,v_txn_id").show()
EDIT 1:
As per your requirement, please find below a script that picks the latest record (by date/time) for each v_txn_id.
spark.sql("select distinct a.prod_code,a.bal,a.v_txn_id from aggregates a join (select distinct v_txn_id,max(timestamp) over(partition by v_txn_id) as temp_timestamp from aggregates) b on a.v_txn_id=b.v_txn_id and a.timestamp=b.temp_timestamp order by a.v_txn_id").show()
Please let me know if you have any questions; otherwise, please mark this answer as accepted (tick icon).
answered Mar 23 at 5:48 by Sarath Avanavu, edited Mar 25 at 7:59
Thanks for the detailed response. But actually, the balance may not be the maximum; I only used incremental balances for this sample run. The balance can be lower for the same v_txn_id (an erroneous earlier transaction). So, overall, I am interested in the final value for each v_txn_id that arrives as part of the ingestion process. In the example above, P1,2000,TXNID2 and P1,4000,TXNID3 are actually erroneous transactions.
– Prashant A
Mar 23 at 14:52
Do you want to take the latest "v_txn_id" based on the "timestamp" column? @BDA
– Sarath Avanavu
Mar 24 at 14:22
Yes, that will also do, in case dropDuplicates doesn't remove the old v_txn_ids. My assumption was that dropDuplicates would only give me the latest v_txn_ids. Anyway, if we can achieve it through the timestamp column then it should work for me. Based on this small code piece, it appears my understanding of dropDuplicates was wrong (presumptuous).
– Prashant A
Mar 25 at 2:21
How can I apply a window (with/without watermark) and use processing time to get the latest v_txn_ids? The use case then would be: multiple transactions come in during a sliding window of, say, 5 minutes, and I want to get the latest transaction ids for that 5-minute window. Also, the next window's transactions will be totally different from the previous window's.
– Prashant A
Mar 25 at 2:30
I have added the answer. Please go through and let me know if you have any questions @BDA
– Sarath Avanavu
Mar 25 at 7:59
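For the 5-minute-window follow-up, here is a rough sketch of one way to keep only the latest row per v_txn_id within each window, using processing time as the event time; the watermark, window duration and sink are illustrative assumptions:

import org.apache.spark.sql.functions.{col, current_timestamp, max, struct, window}

// Stamp each row with processing time, then keep the latest row per v_txn_id in each 5-minute window
val latestPerWindow = csvDF
  .withColumn("proc_time", current_timestamp())
  .withWatermark("proc_time", "5 minutes")
  .groupBy(window(col("proc_time"), "5 minutes"), col("v_txn_id"))
  .agg(max(struct(col("proc_time"), col("bal"), col("prod_code"))).as("latest"))   // "argmax" by proc_time
  .select(col("v_txn_id"), col("latest.prod_code").as("prod_code"), col("latest.bal").as("bal"))

latestPerWindow.writeStream
  .outputMode("update")
  .format("console")
  .start()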