How to receive streamed response from KSQL in Spring Kafka?
How can I receive a chunked response from a Kafka KSQL server in a Java Spring Boot app?
When I make a REST call to the /query endpoint, I get just one row and then the connection closes. How can I keep the connection open and receive multiple rows?
The documentation says: "The response is streamed back until the LIMIT specified in the statement is reached, or the client closes the connection."
How can I achieve this in Java? Even for a KTable I get only one row back.
https://docs.confluent.io/current/ksql/docs/developer-guide/api.html#run-a-query-and-stream-back-the-output
java apache-kafka ksql
edited Jan 20 at 0:20 by cricket_007
asked Jan 19 at 13:51 by Sergei Ledvanov
1 Answer
I was able to work around it as follows:
- get the response as a String
- parse the JSON objects line by line (KafkaQueryResponse is an object that represents one row)
    // Fetch the whole response body as a single String
    ResponseEntity<String> result = template.exchange("/query",
            HttpMethod.POST,
            new HttpEntity<>(params, headers),
            String.class);

    List<KafkaQueryResponse> array = new ArrayList<>();
    JsonFactory jsonFactory = new JsonFactory();
    try (BufferedReader br = new BufferedReader(new StringReader(result.getBody()))) {
        // readValues iterates over the sequence of root-level JSON objects in the body
        Iterator<KafkaQueryResponse> value = objectMapper.readValues(jsonFactory.createParser(br), KafkaQueryResponse.class);
        value.forEachRemaining(e -> {
            if (e.getRow() != null) { // keep only entries that actually carry a row
                array.add(e);
            }
        });
    }
    // array now holds the list of parsed JSON objects
KafkaQueryResponse:

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class KafkaQueryResponse {
        private KafkaQueryRow row;
        private String finalMessage;
        private String errorMessage;

        @Data
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class KafkaQueryRow {
            private List<Object> columns;
        }
    }
Note that this workaround does not read the streamed response in chunks. It waits for the whole response to arrive at the client, closes the connection, and only then parses all the JSON objects.
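For comparison, here is a rough sketch of what reading the rows incrementally might look like, using only the JDK 11+ `java.net.http.HttpClient` rather than `RestTemplate`. This is not taken from the KSQL docs. The class and method names (`KsqlStreamReader`, `forEachLine`, `streamQuery`) are made up for illustration. The content type and the `{"ksql": ..., "streamsProperties": {}}` payload shape are assumptions about the REST API. It also assumes the server writes each row as a JSON object on its own line, possibly with blank lines in between.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical helper, not part of Spring or KSQL.
final class KsqlStreamReader {

    /**
     * Reads the body line by line and passes every non-blank line to the
     * callback as soon as it has been read, without buffering the whole body.
     * Returns the number of lines delivered.
     */
    static int forEachLine(InputStream body, Consumer<String> onLine) throws IOException {
        int count = 0;
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(body, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isBlank()) {
                    continue; // assumption: blank lines carry no row
                }
                onLine.accept(line);
                count++;
            }
        }
        return count;
    }

    /**
     * Posts the statement to {baseUrl}/query and streams the response.
     * Blocks until the server ends the response (e.g. LIMIT reached).
     */
    static int streamQuery(String baseUrl, String ksql, Consumer<String> onLine)
            throws IOException, InterruptedException {
        // Minimal JSON escaping of the statement; payload shape is an assumption.
        String escaped = ksql.replace("\\", "\\\\").replace("\"", "\\\"");
        String payload = "{\"ksql\": \"" + escaped + "\", \"streamsProperties\": {}}";

        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/query"))
                .version(HttpClient.Version.HTTP_1_1)
                // assumed content type for the KSQL REST API
                .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

        // ofInputStream() hands over the body as a stream instead of a complete String
        HttpResponse<InputStream> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofInputStream());
        return forEachLine(response.body(), onLine);
    }
}
```

Each line passed to the callback could then be mapped with `objectMapper.readValue(line, KafkaQueryResponse.class)`. With `RestTemplate`, the same idea should be possible through `template.execute(...)` with a `ResponseExtractor` that reads `response.getBody()` as an `InputStream`, but I have not verified that against a KSQL server.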
answered Jan 20 at 12:22 by Sergei Ledvanov