WSO2 SP - Kafka source with JSON attributes


I'm trying to read JSON data from Kafka using the following code:

@source(type = 'kafka', bootstrap.servers = 'localhost:9092', topic.list = 'TestTopic',
group.id = 'test', threading.option = 'single.thread', @map(type = 'json'))

define stream myDataStream (json object);

But it fails with the following error:

[2019-03-27_11-39-32_103] ERROR
org.wso2.extension.siddhi.map.json.sourcemapper.JsonSourceMapper -
Stream "myDataStream" does not have an attribute named "ABC",
but the received event {"event":{"ABC":"1"}} does. Hence dropping the message.
Check whether the json string is in a correct format for default mapping.

I've tried adding the attributes:

@source(type = 'kafka', bootstrap.servers = 'localhost:9092',
topic.list = 'TestTopic', group.id = 'test',
threading.option = 'single.thread',
@map(type = 'json', @attributes(ABC = '$.ABC')))

This produces a syntax error:

Error at 'json' defined at stream 'myDataStream', attribute 'json' is
not mapped

Any help would be greatly appreciated.










      apache-kafka wso2 kafka-consumer-api kafka-producer-api wso2sp






edited Mar 27 at 12:20 by Faiz Fareed

asked Mar 27 at 6:53 by Manu

























          1 Answer
The problem is in the stream definition. It should be:

define stream myDataStream (ABC string);

With the default JSON mapping, each attribute name must match a key of the JSON message, in this case ABC.
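Combining that definition with the source annotation from the question gives roughly the following app. This is a minimal sketch, not a tested configuration: it assumes the default JSON mapping and that every message on TestTopic has the shape {"event":{"ABC":"1"}}, as the error log suggests. The app name KafkaJsonDefault and the LogStream sink are hypothetical additions for illustration only.

```siddhi
@App:name('KafkaJsonDefault')

-- Default JSON mapping: attribute names must match the keys inside "event".
@source(type = 'kafka', bootstrap.servers = 'localhost:9092',
        topic.list = 'TestTopic', group.id = 'test',
        threading.option = 'single.thread',
        @map(type = 'json'))
define stream myDataStream (ABC string);

-- Hypothetical log sink, only there to make the consumed events visible.
@sink(type = 'log')
define stream LogStream (ABC string);

from myDataStream
select ABC
insert into LogStream;
```

Because the attribute is declared as a string, a message such as {"event":{"ABC":"1"}} should arrive with ABC set to "1".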

















answered Mar 27 at 12:40 by Niveathika















• Thanks very much. I will have hundreds of key/value pairs coming from Kafka. Do we have to map all of those keys to the input stream? Or is there a way to access the JSON as an object and parse the required keys out of it, with something like $.ABC or json.ABC?

  – Manu
  Mar 27 at 13:49











• Yes, it is possible. You can use custom mapping to extract only the required keys: @map(type = 'json', @attributes(key1 = '$.ABC.key')) define stream myDataStream (key1 string);

  – Niveathika
  Mar 27 at 14:31
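Written out in full, the custom mapping from this comment might look like the sketch below, which extracts only two keys from each message. It assumes the payload keeps the {"event":{...}} wrapper seen in the error log, so the JSONPath expressions start at $.event, and that keys named A and B exist in the event. The attribute names keyA and keyB and the stream name selectedKeysStream are hypothetical. With custom mapping, every attribute declared in the stream appears to need its own entry in @attributes, which would explain the "attribute 'json' is not mapped" error in the question.

```siddhi
-- Custom JSON mapping: only the listed JSONPath expressions are extracted;
-- all other keys in the message are ignored.
@source(type = 'kafka', bootstrap.servers = 'localhost:9092',
        topic.list = 'TestTopic', group.id = 'test',
        threading.option = 'single.thread',
        @map(type = 'json',
             @attributes(keyA = '$.event.A', keyB = '$.event.B')))
define stream selectedKeysStream (keyA string, keyB string);
```

Adding another key later means adding one attribute to the stream and one matching entry to @attributes.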












• Thanks very much, that helped to an extent. My JSON will be {"event":{"A":"1","B":"2","C":"3"}}, and I added the attribute as @attributes(json = '$.event'). But the string assigned to the variable is not in JSON format; it is converted to key=value format, for example A=1,B=2,C=3. Hence I'm not able to process it further with json:getString(json,"$.A").

  – Manu
  Mar 27 at 16:43











• Or is there a way to convert A=1,B=2,C=3 back to {"A":"1","B":"2","C":"3"} in WSO2 SP or Siddhi, please?

  – Manu
  Mar 27 at 17:34











• Correct me if I am wrong: rather than mapping attributes at the source, you essentially want to keep the JSON string as it is throughout the analytics flow, for manipulation downstream?

  – Niveathika
  Mar 27 at 18:37
















