Introduction
1. The connector is given a name
2. For MySQL, connector.class is always set to io.debezium.connector.mysql.MySqlConnector. This is the connector that reads from the database. The explanation is as follows
connector.class - the class that implements the capture logic. In our case, it reads the MySQL binlog.
3. The database connection details are defined. These fields are:
database.hostname
database.port
database.user
database.password
4. The Kafka connection details are defined (Kafka is where the schema history is stored). These fields are:
database.history.kafka.bootstrap.servers
database.history.kafka.topic
5. whitelist fields
database.whitelist specifies which database(s) to capture
table.whitelist specifies which tables to capture
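The steps above amount to a checklist. Below is a minimal Python sketch, assuming the connector configuration is held in a plain dict; `REQUIRED`, `missing_fields` and the sample values are hypothetical, and this is not Debezium's own validation. The whitelist fields of step 5 are left out because omitting them simply means no filtering (every database and table is captured).

```python
# Hypothetical checklist built from steps 1-4 above (not Debezium's own validation).
REQUIRED = (
    "connector.class",
    "database.hostname",
    "database.port",
    "database.user",
    "database.password",
    "database.history.kafka.bootstrap.servers",
    "database.history.kafka.topic",
)

MYSQL_CONNECTOR = "io.debezium.connector.mysql.MySqlConnector"


def missing_fields(name: str, config: dict) -> list:
    """Return the checklist entries that are absent or empty in `config`."""
    problems = []
    if not name:
        problems.append("name")  # step 1: every connector needs a name
    for key in REQUIRED:
        if not config.get(key):
            problems.append(key)
    # Step 2: for MySQL the class is always the Debezium MySQL connector.
    if config.get("connector.class") and config["connector.class"] != MYSQL_CONNECTOR:
        problems.append("connector.class (expected the MySQL connector)")
    # Step 5 is optional: without whitelists every database and table is captured.
    return problems


config = {
    "connector.class": MYSQL_CONNECTOR,
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "passme",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes",
}
print(missing_fields("MySqlConnector", config))  # prints []
```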
database.server.* Fields
The explanation is as follows
database.server.* - identify the MySQL server. database.server.name is used as the prefix when creating topics.
1. database.server.id
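The explanation is as follows
This is a numeric ID for the connector itself. Debezium joins the MySQL cluster as a replication client under this ID in order to read the binlog, so it must differ from the server_id of every MySQL server and replica in the cluster.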
2. database.server.name
The explanation is as follows
This is any string that uniquely identifies the MySQL instance, which is especially useful when we are using a MySQL cluster. It is also used as the prefix of the topic names.
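Because database.server.name is the topic prefix, the default topic for a captured table follows the pattern `<database.server.name>.<database>.<table>` (the same pattern the transforms section of this note describes). A tiny Python sketch; `default_topic` is a hypothetical helper and the values come from the route example in this note.

```python
def default_topic(server_name: str, database: str, table: str) -> str:
    """Default topic for a captured table: <database.server.name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


# dbserver1 / test / students are the values used in the route example.
print(default_topic("dbserver1", "test", "students"))  # dbserver1.test.students
```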
database.history.kafka.bootstrap.servers
The address of the Kafka broker(s), in host:port form
database.history.kafka.topic
The explanation is as follows
This is the topic the Debezium connector uses to store the schema changes.
Another explanation:
database.history.kafka.topic - Connect writes the database schema history there and, after a restart, uses it to recover the table schemas for the point where it resumes reading.
database.whitelist Field
The explanation is as follows
If there are multiple databases in the instance, this specifies which one(s) need to be monitored and synchronized.
table.whitelist Field
The explanation is as follows
Which tables in the whitelisted databases need to be monitored and synchronized.
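Each whitelist value is a comma-separated list of regular expressions, and table expressions are compared with the fully-qualified `<database>.<table>` name. Below is a minimal Python sketch, assuming anchored (whole-name) matching as the Debezium documentation describes; `is_captured` and the table name students_old are hypothetical, and the naive comma split would break a regex that itself contains a comma.

```python
import re


def parse_list(value: str) -> list:
    """Split a comma-separated whitelist into regular expressions (naive split)."""
    return [item.strip() for item in value.split(",") if item.strip()]


def matches_any(patterns: list, name: str) -> bool:
    """Anchored matching: a pattern must match the whole name, not a substring."""
    return any(re.fullmatch(pattern, name) for pattern in patterns)


def is_captured(database: str, table: str,
                database_whitelist: str = "", table_whitelist: str = "") -> bool:
    """Sketch of whitelist filtering; an empty whitelist means 'capture everything'."""
    databases = parse_list(database_whitelist)
    if databases and not matches_any(databases, database):
        return False
    tables = parse_list(table_whitelist)
    # Table expressions are compared with the fully-qualified <database>.<table> name.
    if tables and not matches_any(tables, f"{database}.{table}"):
        return False
    return True


print(is_captured("test", "students", "test", "test.students"))      # True
print(is_captured("test", "students_old", "test", "test.students"))  # False, no partial match
print(is_captured("dbname", "TABLE_1", "dbname", "dbname.TABLE_.*")) # True
```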
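Example
We do it like this. Here the connector connects to the MySQL server named mysql, started with docker compose, using the user debezium and the password passme. It starts listening to the ecommerce.product table in the ecommerce database and writes the table's changes to a Kafka topic with the ecommerce-cdc prefix (ecommerce-cdc.ecommerce.product). Schema changes are written to the Kafka topic named ecommerce-schema-cdc. This example uses the newer property names database.include.list, table.include.list, topic.prefix and schema.history.internal.*, which replace database.whitelist, table.whitelist, database.server.name and database.history.* respectively.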
name=MySqlConnector
connector.class=io.debezium.connector.mysql.MySqlConnector
# maximum number of tasks (the MySQL connector always runs a single task)
tasks.max=1
# any number that is unique among the server IDs in the MySQL cluster
database.server.id=83456
# MySQL connectivity information
# Hostname of MySQL instance is "mysql" as defined in docker compose
database.hostname=mysql
database.user=debezium
database.password=passme
database.allowPublicKeyRetrieval=true
# The target database
database.include.list=ecommerce
# The target table
table.include.list=ecommerce.product
# Kafka topic prefix for CDC. Topic naming convention: <prefix>.<database>.<table name>
# In this example, the topic for the product table is ecommerce-cdc.ecommerce.product
topic.prefix=ecommerce-cdc
# Configuration to capture schema changes
schema.history.internal.kafka.bootstrap.servers=localhost:9092
schema.history.internal.kafka.topic=ecommerce-schema-cdc
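The topic name in the comment above can be derived from the file itself. Below is a minimal Python sketch with hypothetical helpers `parse_properties` and `cdc_topics`; the parser only handles `key=value` lines and `#`/`!` comments (no escapes, `:` separators or line continuations), and the topic derivation only makes sense when table.include.list holds literal names rather than regular expressions.

```python
def parse_properties(text: str) -> dict:
    """Minimal .properties reader: key=value lines plus '#' and '!' comments.

    Escapes, ':' separators and line continuations are NOT handled (sketch only).
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")  # split on the first '=' only
        if sep:
            props[key.strip()] = value.strip()
    return props


def cdc_topics(props: dict) -> list:
    """<topic.prefix>.<database>.<table> for each literal table.include.list entry."""
    prefix = props["topic.prefix"]
    entries = props.get("table.include.list", "").split(",")
    return [f"{prefix}.{entry.strip()}" for entry in entries if entry.strip()]


sample = """
# The target table
table.include.list=ecommerce.product
topic.prefix=ecommerce-cdc
"""
print(cdc_topics(parse_properties(sample)))  # ['ecommerce-cdc.ecommerce.product']
```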
database.history.* Fields
The explanation is as follows
The database.history.kafka.bootstrap.servers and database.history.kafka.topic properties are used to store and manage the database schema changes in Kafka.
Example
We do it like this
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "mysqluser",
    "database.password": "mysqlpw",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
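The JSON above has the shape that Kafka Connect's REST API accepts on `POST /connectors`. Below is a minimal Python sketch using only the standard library; the URL `http://localhost:8083/connectors` assumes a local Connect worker on its default REST port, and `build_payload` and `register_connector` are hypothetical names. On success Connect should answer with 201 Created.

```python
import json
import urllib.request

# Assumption: a Kafka Connect worker listening on its default REST port (8083).
CONNECT_URL = "http://localhost:8083/connectors"


def build_payload(name: str, config: dict) -> bytes:
    """Serialize a {"name": ..., "config": {...}} body like the JSON above."""
    # The example passes every value as a string ("3306", "184054"); do the same.
    body = {"name": name, "config": {key: str(value) for key, value in config.items()}}
    return json.dumps(body).encode("utf-8")


def register_connector(name: str, config: dict, url: str = CONNECT_URL) -> int:
    """POST the definition to Kafka Connect and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=build_payload(name, config),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses,
    # e.g. when a connector with the same name already exists.
    with urllib.request.urlopen(request) as response:
        return response.status
```

`register_connector` needs a running Connect worker; `build_payload` alone can be used to inspect the body that would be sent.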
transforms.* Fields
Example - route
We do it like this
name=debezium-mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=mysql
database.port=3306
database.user=root
database.password=<mysql-root-password>
database.server.id=223344
database.server.name=dbserver1
database.history.kafka.topic=students-schema
database.whitelist=test
table.whitelist=test.students
message.key.columns=test.students:id
database.history.kafka.bootstrap.servers=kafkaconnect-cp-kafka:9092
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
# keep only the third dot-separated part: <server>.<database>.<table> -> <table>
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3
Explanation of the transforms.* properties:
This set of properties is used to configure a Route SMT (Single Message Transformation). By default, when the Debezium connector receives an update, it sends it to a topic named <database.server.name>.<database.name>.<table.name>. So in our example, it would be "dbserver1.test.students". Here we only want to store the information in the "students" topic, so we strip off the first two parts.
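Kafka's RegexRouter only rewrites a topic when the regex matches the whole topic name, and it fills the replacement using Java's `$n` group syntax. Below is a minimal Python sketch of that behaviour; `regex_route` is a hypothetical helper and Python's `re` stands in for `java.util.regex`, which behaves the same for these simple patterns. It also shows why a character class such as `[^.data]` is risky: it excludes the letters d, a and t as well as the dot, so `([^.data]+)\.([^.]+)\.([^.]+)` cannot match a topic starting with dbserver1, and the record would stay on its original topic.

```python
import re


def regex_route(topic: str, regex: str, replacement: str) -> str:
    """Mimic RegexRouter: rewrite the topic only if the regex matches all of it."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # no full match: the record keeps its original topic
    # Java-style $1, $2 ... become Python's \g<1>, \g<2> ... before expanding.
    return match.expand(re.sub(r"\$(\d+)", r"\\g<\1>", replacement))


ROUTE_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"  # <server>.<database>.<table>
print(regex_route("dbserver1.test.students", ROUTE_REGEX, "$3"))  # students
```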
Example - reroute
The explanation is as follows
To route change event records for multiple physical tables to the same topic, configure the topic routing transformation in the Kafka Connect configuration for the Debezium connector. Configuration of the topic routing SMT requires you to specify regular expressions that determine:
- The tables for which to route records. These tables must all have the same schema.
- The destination topic name.
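We do it like this. Here, records from all tables whose names contain customers_shard are sent to a single topic: the part of the topic name before customers_shard (captured as $1) is kept and followed by customers_all_shards.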
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
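The topic rewrite of ByLogicalTableRouter can be sketched the same way. This minimal Python sketch assumes, like RegexRouter, that topic.regex must match the whole topic name and that the replacement uses Java's `$n` syntax; `logical_table_topic` and the physical topic names are hypothetical.

```python
import re


def logical_table_topic(topic: str, topic_regex: str, topic_replacement: str) -> str:
    """Sketch of ByLogicalTableRouter's topic rewrite (assumes a whole-name match)."""
    match = re.fullmatch(topic_regex, topic)
    if match is None:
        return topic  # tables that do not match keep their own topic
    # Convert Java-style $n references to Python's \g<n> syntax.
    return match.expand(re.sub(r"\$(\d+)", r"\\g<\1>", topic_replacement))


# Hypothetical physical topics for two shards of the same logical table.
for physical in ("myserver.mydb.customers_shard1", "myserver.mydb.customers_shard2"):
    print(logical_table_topic(physical, "(.*)customers_shard(.*)", "$1customers_all_shards"))
# both print myserver.mydb.customers_all_shards
```

With `(.*)TABLE_(.*)` and the fixed replacement `TABLE_TOPIC`, every matching topic collapses into TABLE_TOPIC.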
We do it like this. Here, records from all tables whose names contain TABLE_ are sent to the TABLE_TOPIC topic. In addition, a field named unique_id, which identifies the originating table, is added to each event key alongside the primary key fields.
{
  "name": "table_connectors_v0",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "user",
    "database.server.id": "1111111",
    "database.hostname": "121.0.0.1",
    "database.password": "password",
    "database.history.kafka.bootstrap.servers": "121.0.0.1:9092",
    "database.history.kafka.topic": "dbhistory.table1",
    "database.history.skip.unparseable.ddl": "true",
    "database.whitelist": "dbname",
    "database.server.name": "dbname",
    "database.port": "4444",
    "include.schema.changes": "false",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)TABLE_(.*)",
    "transforms.Reroute.topic.replacement": "TABLE_TOPIC",
    "transforms.Reroute.key.field.name": "unique_id",
    "table.whitelist": "dbname.TABLE_.*",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "snapshot.mode": "schema_only"
  }
}
Explanation of transforms.Reroute.key.field.name:
.. you can configure the topic routing transformation to insert a different field into the key. To do this, specify the key.field.name option and set it to a field name that does not clash with existing primary key field names....Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, key.enforce.uniqueness must be true, which is the default.
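Why the extra key field matters: two shards can both contain a row with the same primary key (say id 42), and once both go to one topic their keys would be identical. Below is a minimal Python sketch of the key rewrite, using plain dicts for keys; `reroute_key` is hypothetical, and using the original topic name as the identifier value is an assumption about what "identifies the original table name" looks like in practice. The default field name `__dbz__physicalTableIdentifier` is Debezium's default for key.field.name.

```python
DEFAULT_KEY_FIELD = "__dbz__physicalTableIdentifier"  # Debezium's default key.field.name


def reroute_key(key: dict, original_topic: str, key_field_name: str = DEFAULT_KEY_FIELD,
                enforce_uniqueness: bool = True) -> dict:
    """Sketch: extend a rerouted event key with a field naming the source table."""
    if not enforce_uniqueness:
        return dict(key)  # key.enforce.uniqueness=false: the key is left as it was
    if key_field_name in key:
        # The extra field must not clash with an existing primary key field.
        raise ValueError(f"{key_field_name!r} clashes with a primary key field")
    # Assumption: the identifier value is the original (pre-routing) topic name.
    return {**key, key_field_name: original_topic}


# Two hypothetical shards both contain a row with id 42; the new field keeps the keys distinct.
print(reroute_key({"id": 42}, "dbname.dbname.TABLE_1", "unique_id"))
print(reroute_key({"id": 42}, "dbname.dbname.TABLE_2", "unique_id"))
```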