How to receive streamed response from KSQL in Spring Kafka?












How can I receive a chunked response from a Kafka KSQL server in a Java Spring Boot app?

When I make a REST call to the /query endpoint, I get just one row and the connection closes. How can I keep the connection open and receive multiple rows?

The documentation says:

"The response is streamed back until the LIMIT specified in the
statement is reached, or the client closes the connection."

How can I achieve this in Java? Even for a KTable I get only one row in return.

https://docs.confluent.io/current/ksql/docs/developer-guide/api.html#run-a-query-and-stream-back-the-output










Tags: java apache-kafka ksql






edited Jan 20 at 0:20 by cricket_007










asked Jan 19 at 13:51 by Sergei Ledvanov
























1 Answer

I was able to work around it as follows:

• get the response as a String
• parse the JSON objects line by line (KafkaQueryResponse is an object that represents one row)

    // Fetch the whole response body as a single String
    ResponseEntity<String> result = template.exchange("/query",
            HttpMethod.POST,
            new HttpEntity<>(params, headers),
            String.class);

    List<KafkaQueryResponse> array = new ArrayList<>();
    JsonFactory jsonFactory = new JsonFactory();
    try (BufferedReader br = new BufferedReader(new StringReader(result.getBody()))) {
        // readValues iterates over the sequence of JSON objects in the body
        Iterator<KafkaQueryResponse> value =
                objectMapper.readValues(jsonFactory.createParser(br), KafkaQueryResponse.class);
        value.forEachRemaining(e -> {
            // Keep only the entries that carry a row
            if (e.getRow() != null) {
                array.add(e);
            }
        });
    }
    // array now holds the list of parsed JSON objects



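KafkaQueryResponse:

    import java.util.List;

    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import lombok.Data;

    // One entry of the /query response; unknown JSON fields are ignored
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class KafkaQueryResponse {
        private KafkaQueryRow row;
        private String finalMessage;
        private String errorMessage;

        @Data
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class KafkaQueryRow {
            private List<Object> columns;
        }
    }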


Note that this solution does not read the streamed response in chunks. It waits for the whole response to arrive at the client, closes the connection, and only then parses all of the JSON objects.
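To handle rows as they arrive instead of after the connection closes, one option is to read the response body as a stream and process each line as soon as it is read. The following is a minimal sketch of that idea using only the JDK rather than RestTemplate. The class KsqlStreamReader and its methods are hypothetical names introduced for illustration. It assumes the server writes one JSON object per line, possibly separated by blank lines, and that the request uses the application/vnd.ksql.v1+json content type described in the KSQL REST API documentation.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical helper, not part of KSQL or Spring.
class KsqlStreamReader {

    /**
     * Reads the body line by line and passes every non-blank line to the
     * handler immediately, without waiting for the stream to end.
     * Returns the number of lines handed to the handler.
     */
    public static int readRows(InputStream body, Consumer<String> rowHandler) throws IOException {
        int count = 0;
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(body, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Assumption: blank lines carry no row data and can be skipped.
                if (line.trim().isEmpty()) {
                    continue;
                }
                rowHandler.accept(line);
                count++;
            }
        }
        return count;
    }

    /**
     * POSTs the given JSON payload to serverUrl + "/query" and streams the
     * response through readRows. Blocks until the server ends the response.
     */
    public static int streamQuery(String serverUrl, String jsonPayload,
                                  Consumer<String> rowHandler) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(serverUrl + "/query").toURL().openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        // Assumption: content type as given in the KSQL REST API documentation.
        conn.setRequestProperty("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(jsonPayload.getBytes(StandardCharsets.UTF_8));
        }
        try {
            return readRows(conn.getInputStream(), rowHandler);
        } finally {
            conn.disconnect();
        }
    }
}
```

Each line passed to the handler could then be converted with objectMapper.readValue(line, KafkaQueryResponse.class), so rows are available one at a time. Because the call blocks until the server ends the response, it would normally run on its own thread, and a query without a LIMIT would have to be stopped by closing the connection from the client side.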






answered Jan 20 at 12:22 by Sergei Ledvanov





























