Connector Transforms Incorrectly Maps Partition Key












I have a simple database table in MySQL that has (id varchar(255), val varchar(255), ..., ...).



I set up Kafka Connect to stream the table to a topic (CONNECT_TOPIC) with twenty partitions. I have another topic (STREAM_TOPIC), also with twenty partitions, that is populated by a Kafka producer.



The problem is that keys from the connector map to different partitions in CONNECT_TOPIC than the same keys do in STREAM_TOPIC. This means I can't join the two topics in a stream. I believe this is because the id is extracted incorrectly.



Here is an example of that output:



Stream Task ID 0_13 Partition number 13 Consumed CONNECT_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf
Stream Task ID 0_13 Partition number 13 Consumed JOINED CONNECT_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf
Stream Task ID 0_17 Partition number 17 Consumed STREAM_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf
Stream Task ID 0_17 Partition number 17 Consumed JOINED STREAM_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf

Stream Task ID 0_7 Partition number 7 Consumed STREAM_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a
Stream Task ID 0_7 Partition number 7 Consumed JOINED STREAM_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a
Stream Task ID 0_17 Partition number 17 Consumed CONNECT_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a
Stream Task ID 0_17 Partition number 17 Consumed JOINED CONNECT_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a

Stream Task ID 0_11 Partition number 11 Consumed CONNECT_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Consumed JOINED CONNECT_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Consumed STREAM_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Consumed JOINED STREAM_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Merged 90265a93-adac-4e93-856c-d1498eeeb22e


I have tried the following connector configs to transform the id:



    {
        "name": "CONNECTOR",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",

            "connection.url": "jdbc:mysql://localhost:3306/demo?user=rmoff&password=pw",
            "table.whitelist": "CONNECTOR",
            "mode": "timestamp",
            "timestamp.column.name": "update_ts",
            "validate.non.null": "false",

            "transforms": "createKey,extractId,castString",
            "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
            "transforms.createKey.fields": "id",
            "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
            "transforms.extractId.field": "id",
            "transforms.castString.type": "org.apache.kafka.connect.transforms.Cast$Key",
            "transforms.castString.spec": "string",

            "topic.prefix": "enrichment-"
        }
    }
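
For anyone unsure what this transform chain does to each record, here is a rough Python sketch that mimics it on plain dicts. It is not Kafka Connect code: the function names, the record layout, and the `val` value are made up for illustration, and only the three transform steps and the `id` field come from the config above.

```python
# Hypothetical stand-ins for the three SMTs in the config; not the Connect API.

def value_to_key(record, fields):
    """Like ValueToKey: build the key as a struct of selected value fields."""
    key = {name: record["value"][name] for name in fields}
    return {**record, "key": key}


def extract_field_key(record, field):
    """Like ExtractField$Key: replace the key struct with one of its fields."""
    return {**record, "key": record["key"][field]}


def cast_key(record, spec):
    """Like Cast$Key with a whole-value spec: cast the primitive key."""
    casts = {"string": str}  # only the cast used in the config is modelled
    return {**record, "key": casts[spec](record["key"])}


# A JDBC source record starts out with no key; the row is the value.
row = {"id": "68f52084-cfc9-4997-a28e-57cfd4f7bbbf", "val": "example"}
record = {"key": None, "value": row}

record = value_to_key(record, ["id"])      # key is now {"id": "68f5..."}
record = extract_field_key(record, "id")   # key is now the bare string
record = cast_key(record, "string")        # already a string, so unchanged

print(repr(record["key"]))
```

After the chain the key is a bare string and the value is untouched, so what ends up on the topic as key bytes depends only on `key.converter`.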


This extracts the ID perfectly, but maps to the wrong partition. I also tried extractId instead of extractString, but the same thing happened. I can't find clear documentation anywhere about how exactly to include these transforms.



Problem in a nutshell:



I need to extract the id field from the row, make it the record key, and make sure it behaves no differently than using a Kafka producer to do:



KafkaProducer.produce("string key", event)



If I populate both topics with a producer, the records end up on the correct partitions, but something about Kafka Connect maps them to different partitions, even though the key is the same.



































  • Kafka Connect uses the same DefaultPartitioner that a Producer would, so are you using AvroConverter or StringConverter? Because that would cause different partition calculations.

    – cricket_007
    Jan 18 at 22:30













  • I am not. I have schemas disabled and I am using the JSON converter. I added my full config above.

    – mikemikemike
    Jan 18 at 22:56













  • Thanks. It's not really clear what the actual message is from the output you've shown, but if the key is just a string, not a nested value, can you try StringConverter instead?

    – cricket_007
    Jan 18 at 23:38
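
To illustrate the point made in the comments, here is a sketch in Python of how the key bytes feed into partition selection. It rests on two assumptions: that the producer writes the key with a plain UTF-8 string serializer (the question does not say), and that JsonConverter with schemas disabled writes a string key as a JSON string literal, quotes included. `murmur2` is a hand port of the hash used by Kafka's Java client, and the helper names are hypothetical.

```python
import json

NUM_PARTITIONS = 20  # both topics in the question have twenty partitions


def murmur2(data: bytes) -> int:
    """32-bit murmur2, ported from Kafka's Java client, as an unsigned int."""
    mask = 0xFFFFFFFF
    m = 0x5BD1E995
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    # Mix the input four bytes at a time, little-endian.
    for i in range(length // 4):
        k = int.from_bytes(data[i * 4:i * 4 + 4], "little")
        k = (k * m) & mask
        k ^= k >> 24
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    # Fold in the remaining one to three bytes, if any.
    tail = length & ~3
    rem = length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for(key_bytes: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Default keyed partitioning: positive murmur2 hash modulo partition count."""
    return (murmur2(key_bytes) & 0x7FFFFFFF) % num_partitions


def string_serializer_bytes(key: str) -> bytes:
    """Assumed producer side: the key as raw UTF-8 text."""
    return key.encode("utf-8")


def json_converter_bytes(key: str) -> bytes:
    """Assumed Connect side: the key as a quoted JSON string."""
    return json.dumps(key).encode("utf-8")


key = "68f52084-cfc9-4997-a28e-57cfd4f7bbbf"
raw = string_serializer_bytes(key)
quoted = json_converter_bytes(key)

print(raw, partition_for(raw))
print(quoted, partition_for(quoted))
```

The two byte sequences differ by the surrounding quote characters, so the hashes will generally differ too, which would be consistent with the output in the question. If that is the cause, setting `key.converter` to `org.apache.kafka.connect.storage.StringConverter`, as suggested above, is the thing to try.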
















apache-kafka apache-kafka-connect














edited Jan 18 at 23:38









cricket_007











asked Jan 18 at 21:45









mikemikemike

























