How to output multiple records from Transformer?












Given: a DSL topology that uses KStream::transform. During the execution of Transformer::transform, multiple messages are generated from a single input record (KeyValue<String, Message>).



I could probably return a KeyValue<String, List<Message>> object from Transformer::transform and apply flatMapValues as the next processor in the topology to flatten the list. However, I'm wondering whether it is possible to use ProcessorContext::forward for the same goal, i.e.:



    public KeyValue<String, Message> transform(String key, Message message) {
        Iterable<Message> messages = generateMultipleFromOne(message);
        // Forward each generated message downstream under the original key.
        messages.forEach(m -> context.forward(key, m));

        return null;
    }

      java apache-kafka apache-kafka-streams






asked Jan 18 at 20:22 by Dima Svider
edited Jan 22 at 13:21 by Jacek Laskowski

          1 Answer
From the javadoc of Transformer.transform(K key, V value):

    If more than one output record should be forwarded downstream, ProcessorContext.forward(Object, Object) and ProcessorContext.forward(Object, Object, To) can be used.

    Note that returning a new KeyValue is merely for convenience. The same can be achieved by using ProcessorContext.forward(Object, Object) and returning null.
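
The contract quoted above can be illustrated with a minimal sketch. It assumes simplified stand-in types so that it runs without Kafka Streams on the classpath: the Context interface, the comma-splitting logic, and the downstream driver are hypothetical and are not part of the Kafka Streams API. They only mimic the idea that every forwarded record reaches the next processor and that a null return value adds nothing.

```java
import java.util.ArrayList;
import java.util.List;

class ForwardSketch {

    // Hypothetical stand-in for the forward(key, value) method of ProcessorContext.
    interface Context {
        void forward(String key, String value);
    }

    // Same shape as the transform method in the question: forward every
    // generated message, then return null so that nothing extra is emitted.
    static String[] transform(String key, String message, Context context) {
        for (String part : message.split(",")) {
            context.forward(key, part);
        }
        return null;
    }

    // Toy driver (an assumption, not Kafka code): collects what a downstream
    // processor would receive for one input record.
    static List<String> downstream(String key, String message) {
        List<String> received = new ArrayList<>();
        Context context = (k, v) -> received.add(k + "=" + v);
        String[] returned = transform(key, message, context);
        if (returned != null) {
            // A non-null return value would be forwarded as one more record.
            context.forward(returned[0], returned[1]);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(downstream("k", "a,b,c")); // prints [k=a, k=b, k=c]
    }
}
```

In a real topology the context would be the ProcessorContext handed to Transformer::init, and the downstream list would be the next processor in the DSL chain.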







answered Jan 20 at 18:02 by Dima Svider
edited Jan 22 at 13:28 by Jacek Laskowski