Kafka source connector is not pulling the record as expected when records are inserted in source topic from multiple sources
In one of my use cases I am trying to create a pipeline.
Whenever I send a message from my custom partitioner, I send the timestamp in milliseconds as a LONG, because the timestamp column is defined as long in the schema.
Code I had earlier in the custom partitioner:
Date date = new Date();
long timeMilli = date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);
Output before sending the record:
date = Tue Mar 26 22:02:04 EDT 2019 , time in millis = 1553652124063
Value inserted into the timestamp column of table2:
3/27/2019 2:02:04.063000 AM
Since it seems to be using the UK timezone (I believe), I put in a temporary fix for the time being: subtract 4 hours from the current timestamp so that it matches the US EST timestamp.
Date date = new Date();
Date adj_date = DateUtils.addHours(date, -4); // org.apache.commons.lang3.time.DateUtils
long timeMilli = adj_date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);
Output:
date = Tue Mar 26 22:04:43 EDT 2019 , time in millis = 1553637883826
Value inserted into the timestamp column of table2:
3/26/2019 10:04:43.826000 PM
Please let me know if I am missing anything; I am not sure why this happens when I send a message from the custom partitioner.
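For context, a long epoch-millis value itself carries no timezone; only its rendering does. Here is a minimal standalone sketch (using only java.time, with the millis value from the log above; the class name is just for illustration) showing the same instant rendered in US Eastern time and in UTC:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class EpochMillisDemo {
    public static void main(String[] args) {
        // The epoch-millis value from the log above; by itself it
        // identifies an instant and has no timezone attached.
        long timeMilli = 1553652124063L;
        Instant instant = Instant.ofEpochMilli(timeMilli);

        DateTimeFormatter fmt =
                DateTimeFormatter.ofPattern("M/d/yyyy h:mm:ss.SSS a", Locale.US);

        // The same instant rendered in two zones:
        System.out.println(fmt.format(instant.atZone(ZoneId.of("America/New_York"))));
        // -> 3/26/2019 10:02:04.063 PM
        System.out.println(fmt.format(instant.atZone(ZoneId.of("UTC"))));
        // -> 3/27/2019 2:02:04.063 AM
    }
}
```

The UTC rendering matches the "3/27/2019 2:02:04.063000 AM" value seen in table2, which suggests the database session is rendering the instant in UTC rather than in EST.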
apache-kafka kafka-producer-api apache-kafka-connect
asked Mar 23 at 17:33, edited Apr 2 at 17:14
James Mark
1 Answer
Under the hood, the JDBC Source Connector uses a query like the following:
SELECT * FROM someTable
WHERE
  someTimestampColumn < $endTimestampValue
  AND (
    (someTimestampColumn = $beginTimestampValue AND someIncrementalColumn > $lastIncrementedValue)
    OR someTimestampColumn > $beginTimestampValue)
ORDER BY someTimestampColumn, someIncrementalColumn ASC
Summarizing: the query retrieves rows whose timestamp column value is earlier than the current database timestamp and later than the last one checked.
The parameters above are:
- beginTimestampValue - the timestamp column value of the last imported record
- endTimestampValue - the current timestamp according to the database
- lastIncrementedValue - the incremental column value of the last imported record
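For reference, the timestamp and incrementing columns in that query come from the connector configuration. A minimal sketch of a timestamp+incrementing source config (connection URL and column names are placeholders, not taken from the question):

```properties
name=jdbc-source-example
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@//dbhost:1521/ORCL
mode=timestamp+incrementing
timestamp.column.name=someTimestampColumn
incrementing.column.name=someIncrementalColumn
# Tell the connector which zone the database timestamps are in, so the
# current-timestamp bound is computed consistently (supported in newer
# versions of the connector)
db.timezone=America/New_York
```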
I think in your case the producer puts records into the tables with a higher timestamp than the ones you later insert manually (using the query). When the JDBC connector checks for new records to import into Kafka, it skips them, because they don't fulfill the someTimestampColumn < $endTimestampValue condition.
You can also change the log level to DEBUG and see what is going on in the logs.
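On a standard Connect distribution, that would look roughly like this in the worker's log4j config (the path varies per distribution; the logger name assumes the Confluent JDBC connector package):

```properties
# e.g. etc/kafka/connect-log4j.properties
log4j.logger.io.confluent.connect.jdbc=DEBUG
```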
@wardziniak Thanks much! Yes, you are correct: the connector inserts the UK timestamp (instead of the US EST timezone) when I send a message from the custom partitioner. After my temporary fix for the timestamp issue, source connector2 picks up all the records regardless of source. I have another timestamp issue and have updated the original question accordingly; could you please take a look and let me know your view? Thanks in advance!
– James Mark
Mar 27 at 2:37
answered Mar 24 at 17:09, edited Mar 24 at 18:02
wardziniak