Kafka Connect Debezium MySQL source/sink replication pipeline

What do we want to achieve?


At Altenar, we regularly need to meet regulatory requirements. The latest requirement was to set up replication of a MySQL DB to a DB located in a different region. However, we could not rely on the native replication features, because we wanted to omit certain tables and stream only the data that is required.

So we came up with the idea of producing data from the DB to Kafka and then sinking it into a destination DB. Using Kafka Connect, we can select which DBs and tables are replicated, so no extra data is leaked.


What is Debezium and why?


Debezium is a set of change data capture (CDC) connectors built for Kafka Connect, the pluggable and declarative data integration framework for Kafka. With Debezium, you can capture changes from various databases, such as MySQL, and stream them to Kafka topics. This change data capture process enables your applications to respond promptly to inserts, updates, and deletes made in the source database.

By running Debezium connectors within Kafka Connect, you can set up a replication pipeline that transfers data from a source database, like MySQL, to a target database while keeping the two consistent. Debezium's connectors ensure that all events are captured and reliably delivered to Kafka topics, even in the face of failures.


Architecture




Schematic representation of the replication pipeline



Prerequisites


To follow the lab, you need:

  1. Confluent Kafka 7.5.x with Kafka Connect.
  2. MySQL 5.6 with binary logs enabled.

You can skip the following installation steps if you already meet all the requirements.


Installing Confluent Kafka


Deploy Kafka using Confluent Kafka Ansible playbooks available on GitHub.

For the demo, you can deploy 3 VMs and install all roles on each of them, so every VM runs the Broker, Connect, and KRaft controller roles.

Alternatively, you can spin up the whole infrastructure in Docker containers using the Debezium examples on GitHub. However, they do not use ACLs or SASL_SSL, so you will have to adapt their docker-compose files yourself.


Installing MySQL


You can deploy MySQL using Docker containers or run it on k8s. Below are the manifests that you need to apply to the k8s cluster. In our case, it was GKE.


StatefulSet


---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mysql
  namespace: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: mysql:5.6
        name: mysql
        volumeMounts:
        - name: config-volume 
          mountPath: /etc/mysql/conf.d/
        - name: pvc-mysql
          mountPath: /var/lib/mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: mysql-secret
              key: password
        ports:
          - name: tcp
            protocol: TCP
            containerPort: 3306
      volumes:
      - name: config-volume
        configMap:
          name: mysql-config
      - name: pvc-mysql
        persistentVolumeClaim:
          claimName: pvc-mysql


Service 

---
apiVersion: v1
kind: Service
metadata:
  annotations:                                                                                                                              
    cloud.google.com/load-balancer-type: Internal                                                                                           
    networking.gke.io/internal-load-balancer-allow-global-access: "true"   
  name: mysql
  namespace: mysql
  labels:
    app: mysql
spec:
  selector:
    app: mysql
  ports:
    - name: tcp
      protocol: TCP
      port: 3306
  type: LoadBalancer


Secret

The password that will be used to connect to the MySQL instance as the root user.



---
apiVersion: v1
data:
  password: MTIzNDU2Nzg= # base64-encoded root password, which is 12345678
kind: Secret
metadata:
  name: mysql-secret
  namespace: mysql
type: kubernetes.io/basic-auth
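
The value under data must be base64-encoded. As a small sketch of how the string above can be produced (encode_secret is an illustrative helper name, not part of kubectl), printf is used instead of echo so that no trailing newline ends up inside the password:

```shell
# encode_secret PASSWORD: print the base64 form expected in a Secret's "data" field.
# printf '%s' writes the password without a trailing newline.
encode_secret() {
  printf '%s' "$1" | base64
}

encode_secret '12345678'   # prints MTIzNDU2Nzg=
```

To check an existing value, the reverse direction is printf '%s' 'MTIzNDU2Nzg=' | base64 --decode.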

ConfigMap

Debezium reads the binary logs, so make sure that binary logging is enabled and uses the ROW format, as it is the only supported format.



---
apiVersion: v1 
kind: ConfigMap
metadata:
  name: mysql-config
  namespace: mysql
data:
  mycustom.cnf: |
    [mysqld]
    log-bin=mysql-bin
    server-id=100
    binlog_format=ROW
    binlog_row_image=FULL
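
Before starting the pod, the option file can be sanity-checked for the settings above. This is only a rough sketch: check_binlog_cnf is an illustrative helper, it does a plain text match on exactly the spellings used in this ConfigMap, and it does not understand every form MySQL accepts (for example log_bin with an underscore, or lower-case values).

```shell
# check_binlog_cnf FILE: succeed only if FILE contains the binlog options used in this article.
check_binlog_cnf() {
  for setting in 'log-bin=' 'server-id=' 'binlog_format=ROW' 'binlog_row_image=FULL'; do
    # Match the option at the start of a line, ignoring leading whitespace.
    if ! grep -Eq "^[[:space:]]*${setting}" "$1"; then
      echo "missing: ${setting}" >&2
      return 1
    fi
  done
  echo "binlog settings OK"
}
```

For example, once the pod is running, the mounted file can be copied out with kubectl exec -n mysql mysql-0 -- cat /etc/mysql/conf.d/mycustom.cnf > /tmp/mycustom.cnf and checked with check_binlog_cnf /tmp/mycustom.cnf.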

PVC



apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  annotations:
    volume.beta.kubernetes.io/storage-provisioner: pd.csi.storage.gke.io
    volume.kubernetes.io/storage-provisioner: pd.csi.storage.gke.io
  finalizers:
  - kubernetes.io/pvc-protection
  namespace: mysql
  name: pvc-mysql
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
  storageClassName: standard
  volumeMode: Filesystem

Configure MySQL

1. Connect to the MySQL pod.

kubectl exec -it -n mysql mysql-0 -- bash


2. Connect to MySQL. It will prompt you for the password that you defined in the k8s secret.

mysql -u root -p


3. Create a source database and a table.



mysql> CREATE DATABASE mydatabase_source;
mysql> USE mydatabase_source;
mysql> CREATE TABLE users (
          id INT PRIMARY KEY AUTO_INCREMENT,
          name VARCHAR(30) NOT NULL,
          email VARCHAR(50) NOT NULL
        );


4. Create a sink database. We will use the same MySQL instance for sink and source for test purposes.

mysql> CREATE DATABASE mydatabase_sink;


5. Check that binary logs are enabled.



mysql> show variables like '%bin%';
| log_bin                                 | ON                             |
| log_bin_basename                        | /var/lib/mysql/mysql-bin       |
| log_bin_index                           | /var/lib/mysql/mysql-bin.index |
mysql> show binary logs;
+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.000001 |     65402 |
| mysql-bin.000002 |   1410954 |
| mysql-bin.000003 |   4115961 |
| mysql-bin.000004 |       722 |
| mysql-bin.000005 |       143 |
| mysql-bin.000006 |      7657 |
| mysql-bin.000007 |    125246 |
| mysql-bin.000008 |       143 |
| mysql-bin.000009 |       120 |
+------------------+-----------+
9 rows in set (0.00 sec)


Configure Kafka Connect

Perform the following operations on each node. Running Kafka Connect on several nodes is what makes the replication pipeline reliable and highly available.


1. Ensure that Kafka Connect uses the distributed config connect-distributed.properties.
grep distributed /usr/lib/systemd/system/confluent-kafka-connect.service

Description=Apache Kafka Connect - distributed
ExecStart=/usr/bin/connect-distributed /etc/kafka/connect-distributed.properties


2. Check that Connect uses JsonConverter. Ensure that the following parameters are set in connect-distributed.properties.



  key.converter.schemas.enable: true
  value.converter.schemas.enable: true
  internal.key.converter.schemas.enable: true
  internal.value.converter.schemas.enable: true
  internal.key.converter: org.apache.kafka.connect.json.JsonConverter
  internal.value.converter: org.apache.kafka.connect.json.JsonConverter
  key.converter: org.apache.kafka.connect.json.JsonConverter
  value.converter: org.apache.kafka.connect.json.JsonConverter

3. Install the Connect plugins, but first check the location of the plugin directory.


In our case it is /usr/share/java/connect_plugins

grep path connect-distributed.properties

plugin.path=/usr/share/java/connect_plugins

So the Debezium plugins need to be put into /usr/share/java/connect_plugins



cd /usr/share/java/connect_plugins
# installing a source plugin
curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.3.4.Final/debezium-connector-mysql-2.3.4.Final-plugin.tar.gz -o debezium-connector-mysql-2.3.4.Final-plugin.tar.gz
tar xvzf debezium-connector-mysql-2.3.4.Final-plugin.tar.gz
# installing a sink plugin
curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/2.2.1.Final/debezium-connector-jdbc-2.2.1.Final-plugin.tar.gz -o debezium-connector-jdbc-2.2.1.Final-plugin.tar.gz
tar xvzf debezium-connector-jdbc-2.2.1.Final-plugin.tar.gz

4. Restart the Kafka Connect daemon.

systemctl restart confluent-kafka-connect.service


5. Check which interfaces Kafka Connect listens on.

In our case, it listens on all interfaces.



ss -tulpen | grep 8083

tcp   LISTEN 0      50                         *:8083             *:*    uid:989 ino:6997007 sk:1a v6only:0 <->


6. Check that the plugins are installed, where localhost:8083 is your Kafka Connect listener.



curl -sS https://localhost:8083/connector-plugins -k | jq
[
  {
    "class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "2.2.1.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.3.4.Final"
  }
]

Both plugins are successfully installed.

6. Add the required Kafka ACLs, where localhost:9094 is your Kafka Broker listener.

exampleserver_source: this topic prefix represents a logical name for our MySQL instance
schema-changes: this topic prefix reflects schema changes in the DB

The list of required ACLs.



### WRITE ###

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Write --topic "schema-changes.exampleserver_source"

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Write --resource-pattern-type PREFIXED --topic "exampleserver_source"

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Write \
  --topic "__debezium-heartbeat.exampleserver_source"

### DescribeConfigs ###

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation DescribeConfigs --topic "connect-cluster-configs"

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --cluster --operation DescribeConfigs

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
   --topic "connect-cluster-status" --operation DescribeConfigs

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --resource-pattern-type PREFIXED \
  --topic "connect-cluster-offsets" --operation DescribeConfigs

### READ ###

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Read --group "exampleserver_source"

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Read --resource-pattern-type PREFIXED  --topic "exampleserver_source"

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Read --resource-pattern-type PREFIXED --topic "schema-changes"

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Read \
  --topic "__debezium-heartbeat.exampleserver_source"

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Read --group "exampleserver_source-schemahistory"


### CREATE ###

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Create --resource-pattern-type PREFIXED --topic "exampleserver_source"

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Create --resource-pattern-type PREFIXED --topic "schema-changes"

### DESCRIBE ###

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --operation Describe --resource-pattern-type PREFIXED --topic "schema-changes"

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --cluster --operation Describe

kafka-acls --bootstrap-server localhost:9094 \
  --command-config client.properties \
  --add \
  --allow-principal User:kafka-connect \
  --cluster --operation Describe \
  --topic "__debezium-heartbeat.exampleserver_source"
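
Because the commands above differ only in the operation, the pattern type, and the resource name, they can also be generated. The following is a minimal sketch for the topic ACLs only; acl_cmd, BOOTSTRAP, and PRINCIPAL are illustrative names, and the function only prints each command so it can be reviewed before anything is applied.

```shell
BOOTSTRAP="localhost:9094"        # broker listener used in the commands above
PRINCIPAL="User:kafka-connect"    # principal used in the commands above

# acl_cmd OPERATION PATTERN_TYPE TOPIC
# Prints (does not execute) one kafka-acls call for a topic resource.
acl_cmd() {
  printf 'kafka-acls --bootstrap-server %s --command-config client.properties --add --allow-principal %s --operation %s --resource-pattern-type %s --topic "%s"\n' \
    "$BOOTSTRAP" "$PRINCIPAL" "$1" "$2" "$3"
}

# The three prefixed ACLs on the change-event topics.
for op in Read Write Create; do
  acl_cmd "$op" PREFIXED exampleserver_source
done
```

After reviewing the printed commands, they can be executed by piping the output to sh.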

7. Prepare the JSON configuration for the sink and source plugins.

source-mysql.json

The SASL and SSL parameters can be checked in /etc/kafka/connect-distributed.properties. JSON does not allow comments, so the key settings are described here:

topic.prefix: prefix for the topics
database.hostname: DB host IP
database.include.list: the DB to replicate
database.whitelist: the DB to monitor (legacy name of database.include.list)
table.include.list: the table to replicate
topic.creation.enable: auto-create topics
schema.history.internal.kafka.bootstrap.servers: list of brokers



{
    "name": "mysql-source-connector",
    "config": {
        "heartbeat.interval.ms": "3000",
        "autoReconnect": "true",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "topic.prefix": "exampleserver_source",
        "database.hostname": "10.10.10.10",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "********",
        "database.server.id": "1000",
        "database.include.list": "mydatabase_source",
        "database.whitelist": "mydatabase_source",
        "table.include.list": "mydatabase_source.users",
        "topic.creation.enable": "true",
        "topic.creation.default.replication.factor": 3,
        "topic.creation.default.partitions": 3,
        "topic.creation.default.cleanup.policy": "compact",
        "topic.creation.default.compression.type": "lz4",
        "schema.history.internal.kafka.bootstrap.servers": "localhost:9094",
        "schema.history.internal.kafka.topic": "schema-changes.exampleserver_source",
        "schema.history.internal.consumer.security.protocol": "SASL_SSL",
        "schema.history.internal.consumer.sasl.mechanism": "PLAIN",
        "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"*******\" password=\"*******\";",
        "schema.history.internal.producer.security.protocol": "SASL_SSL",
        "schema.history.internal.producer.sasl.mechanism": "PLAIN",
        "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"*******\" password=\"*******\";",
        "schema.history.internal.producer.ssl.truststore.location": "/var/ssl/private/kafka_connect.truststore.jks",
        "schema.history.internal.producer.ssl.truststore.password": "**************",
        "schema.history.internal.consumer.ssl.truststore.location": "/var/ssl/private/kafka_connect.truststore.jks",
        "schema.history.internal.consumer.ssl.truststore.password": "**************"
    }
}

sink-mysql.json

JSON does not allow comments, so the key settings are described here:

connection.url: the sink DB
auto.create: auto-create tables
value.converter.schemas.enable: reflect schema changes automatically
topics.regex: regexp of the topics to replicate



{
    "name": "mysql-sink-connector",
    "config": {
        "heartbeat.interval.ms": "3000",
        "autoReconnect": "true",
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "tasks.max": "3",
        "connection.url": "jdbc:mysql://10.10.10.10:3306/mydatabase_sink",
        "connection.username": "root",
        "connection.password": "*********",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "primary.key.mode": "record_key",
        "schema.evolution": "basic",
        "database.time_zone": "UTC",
        "auto.evolve": "true",
        "quote.identifiers": "true",
        "auto.create": "true",
        "value.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "table.name.format": "${topic}",
        "topics.regex": "exampleserver_source.mydatabase_source.*",
        "pk.mode": "kafka"
    }
}

8. Upload configs

You should get a 201 (Created) HTTP status code. Upon uploading a config, Kafka Connect checks the MySQL connection and whether all required parameters are filled in.



curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" https://localhost:8083/connectors/ -k -d @source-mysql.json
HTTP/1.1 100 Continue
HTTP/1.1 201 Created
Date: Thu, 28 Sep 2023 14:41:43 GMT
Location: https://localhost:8083/connectors/mysql-source-connector
Content-Type: application/json
Content-Length: 2361
Server: Jetty(9.4.51.v20230217)



curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" https://localhost:8083/connectors/ -k -d @sink-mysql.json
HTTP/1.1 100 Continue
HTTP/1.1 201 Created
Date: Thu, 28 Sep 2023 14:41:43 GMT
Location: https://localhost:8083/connectors/mysql-sink-connector
Content-Type: application/json
Content-Length: 2361
Server: Jetty(9.4.51.v20230217)

If you want to replace the configurations, you can delete the connectors and then apply the new configurations.

curl -X DELETE https://localhost:8083/connectors/mysql-source-connector -k
curl -X DELETE https://localhost:8083/connectors/mysql-sink-connector -k
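
As an alternative to deleting and re-creating a connector, the Kafka Connect REST API also accepts PUT /connectors/<name>/config, which updates an existing connector in place. The sketch below assumes that jq is installed and that the files use the name/config layout shown above; CONNECT_URL and the function names are illustrative. Note that the PUT body must be the bare config object, without the name/config wrapper, which is why jq extracts .config first.

```shell
# Base URL of the Kafka Connect REST listener (assumption: the same listener as above).
CONNECT_URL="${CONNECT_URL:-https://localhost:8083}"

# connector_config_url NAME: print the REST endpoint that holds a connector's config.
connector_config_url() {
  printf '%s/connectors/%s/config\n' "$CONNECT_URL" "$1"
}

# update_connector NAME FILE: PUT the "config" object of FILE to the connector NAME.
update_connector() {
  jq '.config' "$2" |
    curl -sS -k -X PUT -H "Content-Type: application/json" \
      --data @- "$(connector_config_url "$1")"
}

# Example: update_connector mysql-source-connector source-mysql.json
```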

It is highly recommended to keep tailing the log to monitor the status of the Kafka Connect daemon and the connectors.

tail -f /var/log/kafka/connect.log

9. Check the status of the connectors.

Get a list of connectors first.


curl  https://localhost:8083/connectors/ -k   | jq

[
  "mysql-source-connector",
  "mysql-sink-connector"
]

Check the status of a connector and its tasks.





curl  https://localhost:8083/connectors/mysql-sink-connector/status -k   | jq

{
  "name": "mysql-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "localhost:8083"
    }
  ],
  "type": "sink"
}


Our connector and its task are running.

However, a task might fail for a number of reasons. In that case, the output looks like this.



curl  https://localhost:8083/connectors/mysql-sink-connector/status -k   | jq

{
  "name": "mysql-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "trace": "io.debezium.DebeziumException: Unexpected error whil.........",
      "worker_id": "localhost:8083"
    }
  ],
  "type": "sink"
}

Check the error trace and then restart the task with id 0.

curl -X POST  https://localhost:8083/connectors/mysql-sink-connector/tasks/0/restart -k
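
Since Kafka 3.0, the Connect REST API can also restart a connector together with only its failed tasks in a single call, using the includeTasks and onlyFailed query parameters, so the task id does not have to be looked up first. A minimal sketch, assuming the same listener as above; CONNECT_URL and the function names are illustrative:

```shell
# Base URL of the Kafka Connect REST listener (assumption: the same listener as above).
CONNECT_URL="${CONNECT_URL:-https://localhost:8083}"

# restart_failed_url NAME: endpoint that restarts connector NAME and only its FAILED tasks.
restart_failed_url() {
  printf '%s/connectors/%s/restart?includeTasks=true&onlyFailed=true\n' "$CONNECT_URL" "$1"
}

# restart_failed NAME: send the restart request.
restart_failed() {
  curl -sS -k -X POST "$(restart_failed_url "$1")"
}

# Example: restart_failed mysql-sink-connector
```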


Perform tests

The setup is ready, so it is time to test.

1. Connect to the DB and insert a test value into the table.

use mydatabase_source;
INSERT INTO users (name, email) VALUES ('John1', '[email protected]');

2. Right after that, a topic named exampleserver_source.mydatabase_source.users should be automatically created and populated with the data.

A screenshot of Kafka-UI, a very useful web application for browsing Kafka brokers.
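
The topic can also be inspected from the command line. The topic name follows the pattern <topic.prefix>.<database>.<table>, which the sketch below spells out. The helper names are illustrative, and peek_topic assumes that client.properties contains the same SASL_SSL client settings used for kafka-acls and that its principal is allowed to read the topic and its consumer group.

```shell
# cdc_topic PREFIX DATABASE TABLE: name of the topic that receives the table's change events.
cdc_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# peek_topic TOPIC: print the first message of TOPIC and exit.
peek_topic() {
  kafka-console-consumer --bootstrap-server localhost:9094 \
    --consumer.config client.properties \
    --topic "$1" --from-beginning --max-messages 1
}

cdc_topic exampleserver_source mydatabase_source users
# prints exampleserver_source.mydatabase_source.users
```

For example: peek_topic "$(cdc_topic exampleserver_source mydatabase_source users)".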


3. Then check whether the sink DB mydatabase_sink has received a new table with the data.



use mydatabase_sink;
mysql> show tables;
+----------------------------------------------+
| Tables_in_mydatabase_sink                    |
+----------------------------------------------+
| exampleserver_source_mydatabase_source_users |

mysql> select * from exampleserver_source_mydatabase_source_users limit 1;
+----+-------+-------------------+
| id | name  | email             |
+----+-------+-------------------+
|  1 | John1 | [email protected] |
+----+-------+-------------------+

Replication is working.


4. However, you might notice that the name of the table is different. This is because the plugin creates the table based on the topic name.

We can transform the data by adding the following settings to the sink connector config and re-applying it. Here we rewrite the topic name and keep only the first capture group (.*), that is, the part after the last dot.



"transforms": "unwrap, rename",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.rename.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.rename.regex": ".*\\.(.*)",
"transforms.rename.replacement": "$1",

So instead of writing to the table exampleserver_source_mydatabase_source_users in the sink database, the connector will write records to users.
However, the setting "auto.create":"true", which should create a table in the sink DB, won’t do it for the transformed value. So the table users will not be created automatically, and you should create the table or upload a schema first.
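
To see what the rename pattern does to a topic name, the same regular expression can be tried outside Kafka Connect. The sketch below uses sed as a stand-in for the Java regex engine that RegexRouter uses, so it is an approximation; route_topic is an illustrative helper. In the JSON config the backslash is doubled (.*\\.(.*)), while the regex itself is .*\.(.*).

```shell
# route_topic TOPIC: apply the rename rule (regex ".*\.(.*)", replacement "$1") to TOPIC.
# The greedy ".*" consumes everything up to the last dot, so only the final segment is kept.
# A name without a dot does not match and is printed unchanged.
route_topic() {
  printf '%s\n' "$1" | sed -E 's/^.*\.(.*)$/\1/'
}

route_topic 'exampleserver_source.mydatabase_source.users'   # prints users
```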


Monitoring

This article does not cover monitoring in detail, but we strongly suggest setting up Prometheus and Alertmanager.

Kafka Connect exposes metrics, so they can be consumed by Prometheus directly.

Below is the most important alert, which checks whether any Kafka Connect task is in the FAILED state. That is enough to catch failures of both the sink and the source connectors.



---
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  labels:
    app: kube-prometheus-stack-operator
    release: prometheus
  name: kafka-connect-worker-failed
  namespace: monitoring
spec:
  groups:
  - name: kafka
    rules:
    - alert: KafkaConnectWorkerFailed
      expr: sum by (connector) (kafka_connect_connect_worker_metrics_connector_failed_task_count) > 0
      for: 10s
      annotations:
          summary: 'Kafka connect plugin {{ $labels.connector }} has failed'
      labels:
        severity: critical

Conclusion

The configuration that we built allows for seamless data integration and synchronization between the source and target databases. With the Debezium source and sink connectors running in Kafka Connect, the data is transferred from the source MySQL database to Kafka and then to the sink MySQL database, so replication happens in a controlled and efficient manner. This replication process ensures that all events are captured and reliably delivered, even in the face of failures.


You can also read this article on Medium in Alexander Murylev
 
