Kinesis Consumer recordProcessor initialize(…) is called multiple times


Problem: KCL calls the `initialize(...)` method multiple times on the same record processor when that processor starts a large number of child threads.



Environment:

- AWS SDK: 1.11.498
- KCL: 1.9.3
- Scala: 2.12.8
- JDK: OpenJDK 1.8
- Host: Amazon Linux 2 on ECS [ami-007571470797b8ffa]
- Number of shards: 8




Implementation:

I have implemented a record processor in Scala that implements KCL's `IRecordProcessor` interface.
- In `initialize(...)`, it starts a stats-reporting thread that reports stats to our collectors.
- It also distributes records across multiple internal worker threads through a shared work queue: `processRecords(...)` puts each record on the queue, and the worker threads, which are also started in `initialize(...)`, take records off it and process them.
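If the goal is only to stop a repeated `initialize(...)` call from crashing the processor, the start-up described above can be guarded so the threads are started at most once per instance. The sketch below is a minimal, JDK-only illustration under that assumption: `GuardedStartup`, `startOnce` and `stop` are hypothetical names, the stats reporter is a placeholder that just sleeps, and plain strings stand in for Kinesis records. It hides the symptom; it does not explain why KCL calls `initialize(...)` more than once.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the processor's start-up logic: one stats-reporting
// thread plus `workerCount` worker threads, started at most once per instance.
class GuardedStartup(workerCount: Int) {
  private val started = new AtomicBoolean(false)

  // Shared work queue; plain strings stand in for Kinesis records.
  val workQueue = new LinkedBlockingDeque[String]()

  // Placeholder stats reporter: sleeps until it is interrupted.
  val statsReporterThread = new Thread(new Runnable {
    override def run(): Unit =
      try { while (true) Thread.sleep(1000L) }
      catch { case _: InterruptedException => () }
  })

  // Each worker takes records off the shared queue until it is interrupted.
  val workerThreads: Vector[Thread] = Vector.fill(workerCount)(new Thread(new Runnable {
    override def run(): Unit =
      try { while (true) workQueue.take() }
      catch { case _: InterruptedException => () }
  }))

  // Only the first call starts the threads and returns true; later calls
  // return false instead of throwing IllegalThreadStateException.
  def startOnce(): Boolean =
    if (started.compareAndSet(false, true)) {
      statsReporterThread.start()
      workerThreads.foreach(_.start())
      true
    } else false

  // Interrupts every thread, then waits for each one to exit.
  def stop(): Unit = {
    val all = statsReporterThread +: workerThreads
    all.foreach(_.interrupt())
    all.foreach(_.join())
  }
}
```

`compareAndSet` makes the check-and-start atomic, so even two concurrent calls cannot both start the same `Thread` objects.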



Problem:
With 8 shards and 16 worker threads per processor, KCL calls `initialize(...)` more than once on the same record processor. The second call throws `IllegalThreadStateException`, because it tries to start `statsReporterThread`, which the previous `initialize(...)` call has already started.
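The exception itself is standard JDK behaviour and not specific to KCL: a `java.lang.Thread` can be started only once, so a second `start()` on the same `Thread` object throws `IllegalThreadStateException`, whether or not the thread is still running. A minimal reproduction, where `ThreadStartDemo` and `startTwice` are illustrative names:

```scala
object ThreadStartDemo {
  // Starts `t`, then tries to start it a second time.
  // Returns the exception thrown by the second start(), if any.
  def startTwice(t: Thread): Option[IllegalThreadStateException] = {
    t.start()
    try {
      t.start()
      None
    } catch {
      case e: IllegalThreadStateException => Some(e)
    }
  }
}
```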



Catch:
With the same 8 shards but only 1 worker thread per processor, `initialize(...)` is called only once per processor and everything works perfectly.
This is puzzling, because the worker threads are not exposed to KCL; they are an internal implementation detail of the record processor.
I suspected low ulimits, so I raised them, but that didn't help.
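Since the failure depends on the worker count and on the environment, it may help to confirm what is actually being re-initialized: the same object twice, or distinct objects. One hedged way to check is to log the instance identity, shard ID and calling thread at the top of `initialize(...)`. `InitCallTracing` and `traceInitialize` are hypothetical helpers, not part of KCL, and `System.identityHashCode` is usually, but not guaranteed to be, distinct per object.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical diagnostic mixin: counts initialize(...) calls on *this* instance
// and logs enough context to tell "same object called twice" apart from
// "different objects, one per shard".
trait InitCallTracing {
  private val initCalls = new AtomicInteger(0)

  // Call at the top of initialize(shardId); returns how many times it has
  // now been called on this particular instance.
  def traceInitialize(shardId: String): Int = {
    val n = initCalls.incrementAndGet()
    println(s"initialize #$n on instance@${Integer.toHexString(System.identityHashCode(this))} " +
      s"for shard $shardId, thread ${Thread.currentThread().getName}")
    n
  }
}
```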



Also, the same application works when I run it on my laptop; it only fails on AWS ECS.



Code:



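```scala
import java.util.{List => JList}
import java.util.concurrent.LinkedBlockingDeque

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.model.Record

class RecordProcessor() extends IRecordProcessor {

  // StatsReporter is our own Runnable (not shown) that reports to our collectors.
  val statsReporter = new StatsReporter()
  val statsReporterThread = new Thread(statsReporter)

  val workQueue = new LinkedBlockingDeque[Record]()
  val workerThreads = ListBuffer.empty[Thread]

  private var nextCheckpointTimeMs = 0L

  override def initialize(shardId: String): Unit = {
    // A second initialize(...) on this instance fails here with IllegalThreadStateException.
    statsReporterThread.start()

    (0 until 16).foreach { _ =>
      val wThread = new Thread(new Worker(workQueue))
      workerThreads += wThread
      wThread.start()
    }
  }

  override def processRecords(records: JList[Record], checkpointer: IRecordProcessorCheckpointer): Unit = {
    // Hand every record to the worker threads via the shared queue.
    records.asScala.foreach(record => workQueue.put(record))

    // Checkpoint at most every 15 seconds.
    val nowMs = System.currentTimeMillis()
    if (nowMs > nextCheckpointTimeMs) {
      checkpoint(checkpointer) // our own helper (not shown)
      nextCheckpointTimeMs = nowMs + 15000
    }
  }

  override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason): Unit = {
    // Not shown: checkpointing when reason == ShutdownReason.TERMINATE, which KCL expects at shard end.
    workerThreads.foreach { w =>
      w.interrupt()
      w.join()
    }

    statsReporterThread.interrupt()
    statsReporterThread.join()
  }
}

class Worker(workQueue: LinkedBlockingDeque[Record]) extends Runnable {
  override def run(): Unit = {
    try {
      while (!Thread.currentThread().isInterrupted) {
        val record = workQueue.take()
        process(record) // our own processing logic (not shown)
      }
    } catch {
      // take() throws when shutdown(...) interrupts this thread; exit quietly.
      case _: InterruptedException => ()
    }
  }
}
```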
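As an alternative to managing 16 raw `Thread` objects by hand, the worker pool could be built on a `java.util.concurrent` fixed thread pool, which is created once and stopped with `shutdown()`/`awaitTermination()` instead of an interrupt/join loop. This is a swapped-in technique, not what the code above does; `PooledRecordWorkers` is a hypothetical name, and `handle` stands in for `process(record)`.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

// Hypothetical alternative to hand-managed worker threads: a fixed-size pool
// that runs one task per record. `handle` stands in for process(record).
class PooledRecordWorkers[R](workerCount: Int, handle: R => Unit) {
  private val pool: ExecutorService = Executors.newFixedThreadPool(workerCount)

  // What processRecords(...) could call for each record: queue it for a worker.
  def submit(record: R): Unit =
    pool.execute(new Runnable { override def run(): Unit = handle(record) })

  // What shutdown(...) could call: stop accepting new records, let queued ones
  // finish, and return true if everything completed within the timeout.
  def shutdown(timeoutMs: Long): Boolean = {
    pool.shutdown()
    pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)
  }
}
```

`shutdown()` lets already-queued records finish before the pool stops; `ExecutorService.shutdownNow()` would instead interrupt the workers and return the queued tasks that never started. As with the original 16-worker design, records are not guaranteed to be processed in order.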





Any help or pointers would be highly appreciated!



Thanks!
      multithreading scala amazon-web-services amazon-kinesis amazon-kcl
      asked Mar 27 at 15:43









Tushar Sudake
