Kinesis Consumer recordProcessor initialize(…) is called multiple times
Problem: the initialize(...) method is called multiple times on a record processor when that record processor starts a large number of child threads.
Environment:
AWS SDK: 1.11.498
KCL: 1.9.3
Scala: 2.12.8
JDK: OpenJDK 1.8
Host: Amazon Linux 2 on ECS [ami-007571470797b8ffa]
Number of shards: 8
Implementation:
I have implemented a RecordProcessor (extending IRecordProcessor) in Scala.
- On initialize(...), this record processor starts a stats reporting thread that reports stats to our collectors.
- It also starts a set of worker threads on initialize(...). When processRecords(...) is called, it distributes the records among these worker threads via a work queue.
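For reference, the fan-out pattern described above (several threads draining one blocking queue) can be sketched in isolation. This is a minimal, self-contained illustration, not the actual processor: the object name WorkQueueSketch, the String items, and the counter standing in for real per-record work are all hypothetical.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch: `workerCount` threads drain a shared LinkedBlockingDeque,
// standing in for the record processor's internal worker pool.
object WorkQueueSketch {

  // Pushes `items` through the pool and returns how many were processed.
  def processAll(items: Seq[String], workerCount: Int): Int = {
    val queue = new LinkedBlockingDeque[String]()
    val processed = new AtomicInteger(0)
    val done = new CountDownLatch(items.size)

    val workers = (0 until workerCount).map { i =>
      val t = new Thread(new Runnable {
        override def run(): Unit =
          try {
            while (!Thread.currentThread().isInterrupted) {
              queue.take()                // blocks; throws InterruptedException if interrupted while waiting
              processed.incrementAndGet() // placeholder for real per-record work
              done.countDown()
            }
          } catch {
            case _: InterruptedException => () // interrupted while blocked in take(): exit quietly
          }
      }, s"worker-$i")
      t.setDaemon(true)
      t.start()
      t
    }

    items.foreach(queue.put)
    done.await()                   // wait until every item has been handled
    workers.foreach(_.interrupt()) // then stop the pool
    workers.foreach(_.join())
    processed.get()
  }
}
```

Catching InterruptedException matters here because take() reports an interrupt by throwing, not by returning, so the isInterrupted check alone only covers the case where the interrupt arrives between two take() calls.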
Problem:
With 8 shards and 16 worker threads per record processor, KCL calls the initialize(...) method multiple times on the same record processor. This throws an IllegalThreadStateException, because the repeated call tries to start statsReporterThread, which the previous initialize(...) call has already started.
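The exception itself is standard java.lang.Thread behavior: a Thread object can be started only once, even after it has finished running. A minimal, self-contained check (the object name DoubleStartDemo is illustrative):

```scala
// Minimal sketch: calling start() twice on the same Thread object
// throws IllegalThreadStateException on the second call.
object DoubleStartDemo {

  // Returns true if the second start() call threw IllegalThreadStateException.
  def secondStartThrows(): Boolean = {
    val t = new Thread(new Runnable { override def run(): Unit = () })
    t.start()
    try {
      t.start() // same Thread object, second start
      false
    } catch {
      case _: IllegalThreadStateException => true
    } finally {
      t.join()
    }
  }
}
```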
Catch:
With the same 8 shards but only 1 worker thread, initialize(...) is NOT called more than once and everything works perfectly.
This is puzzling, because the worker threads are not exposed to KCL; they are an internal implementation detail of the record processor.
I suspected low ulimits, so I raised them, but that didn't help.
Also, the same application works when I run it on my laptop; it only fails on AWS ECS.
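One way to compare the two environments is to log the JVM's live and peak thread counts from initialize(...). The sketch below uses the standard java.lang.management API; the ThreadDiagnostics name is hypothetical. These numbers cover JVM threads only and say nothing directly about OS or container limits; if thread creation did hit such a limit, HotSpot on Linux typically reports java.lang.OutOfMemoryError: unable to create new native thread.

```scala
import java.lang.management.ManagementFactory

// Hypothetical diagnostic helper: live and peak JVM thread counts,
// intended to be logged (e.g. from initialize(...)) on both the laptop and ECS.
object ThreadDiagnostics {

  // (live thread count, peak thread count since JVM start or last reset)
  def snapshot(): (Int, Int) = {
    val mx = ManagementFactory.getThreadMXBean
    (mx.getThreadCount, mx.getPeakThreadCount)
  }

  def describe(): String = {
    val (live, peak) = snapshot()
    s"live JVM threads=$live, peak=$peak"
  }
}
```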
Code:
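import java.util.concurrent.LinkedBlockingDeque
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.model.Record

class RecordProcessor() extends IRecordProcessor {
  val statsReporter = new StatsReporter()
  val statsReporterThread = new Thread(statsReporter)
  val workQueue: LinkedBlockingDeque[Record] = ...
  val workerThreads: ListBuffer[Thread] = ...

  // Starts the stats reporter and 16 worker threads.
  override def initialize(shardId: String): Unit = {
    statsReporterThread.start()
    (0 until 16).foreach { _ =>
      val wThread = new Thread(new Worker(workQueue))
      workerThreads += wThread
      wThread.start()
    }
  }

  // Hands records to the workers and checkpoints at most every 15 seconds.
  override def processRecords(records: java.util.List[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
    records.asScala.foreach(record => workQueue.put(record))
    if (currentTimeMs > nextTimeInMs) {
      checkpoint(checkpointer)
      nextTimeInMs = currentTimeMs + 15000
    }
  }

  // Stops the workers first, then the stats reporter.
  override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason): Unit = {
    workerThreads.foreach { w =>
      w.interrupt()
      w.join()
    }
    statsReporterThread.interrupt()
    statsReporterThread.join()
  }
}

class Worker(workQueue: LinkedBlockingDeque[Record]) extends Runnable {
  override def run(): Unit =
    while (!Thread.currentThread().isInterrupted) {
      val record = workQueue.take()
      process(record)
    }
}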
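Independently of why KCL repeats the call, initialize(...) can be made idempotent so that a second call is a no-op instead of calling start() on an already-started Thread. A minimal sketch, assuming an AtomicBoolean guard is acceptable; GuardedStarter and makeThreads are hypothetical names. Note that this only hides the repeated call; it does not explain it.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch: threads are created and started only on the first
// initialize() call; later calls return false and do nothing.
class GuardedStarter(makeThreads: () => Seq[Thread]) {
  private val started = new AtomicBoolean(false)
  @volatile private var threads: Seq[Thread] = Seq.empty

  // Returns true only on the call that actually started the threads.
  def initialize(): Boolean =
    if (started.compareAndSet(false, true)) {
      threads = makeThreads()
      threads.foreach(_.start())
      true
    } else false

  // Interrupts every started thread, then waits for each to finish.
  def shutdown(): Unit = {
    threads.foreach(_.interrupt())
    threads.foreach(_.join())
  }
}
```

Creating the Thread objects inside the guarded branch (rather than as fields, as in the code above) also means a fresh set of threads is built only when they are really going to be started.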
Any help / pointers would be highly appreciated!
Thanks!
multithreading scala amazon-web-services amazon-kinesis amazon-kcl
asked Mar 27 at 15:43
Tushar Sudake