25 Aralık 2022 Pazar

Debezium Connector

Giriş
1. Connector'a bir isim verilir
2. connector.class her zaman io.debezium.connector.mysql.MySqlConnector olarak belirtilir. Bu connector veri tabanından okumak içindir. Açıklaması şöyle
connector.class - class that implements capturing logic. In our case, it will read mysql binlogs.
3. Veri tabanı bağlantısı bilgisi tanımlanır. Bu alanlar şöyle
database.hostname
database.port
database.user
database.password

4. Kafka bağlantı bilgisi tanımlanır. Bunlar şöyle
database.history.kafka.bootstrap.servers
database.history.kafka.topic

5. whitelist Alanları
database.whitelist ile hangi veri tabanının kullanılacağı
table.whitelist ile hangi tabloların kullanılacağı belirtilir

database.server.\*  Alanları
Açıklaması şöyle
database.server.\* - to identify mysql server. Will be used as a prefix when creating topics.
1. database.server.id
Açıklaması şöyle
This is the id given to the MySQL server during installation. Please refer to the section “Customize default MySQL configuration”.
2. database.server.name
Açıklaması şöyle
This is any String that can be used to uniquely identify the MySQL instance. This is especially useful when we are using a MySQL cluster.
database.history.kafka.bootstrap.servers
Kafka sunucusunun adresi

database.history.kafka.topic
Açıklaması şöyle
This is the topic Debezium connector will use to store the schema changes.
Açıklaması şöyle
database.history.kafka.topic - connect will upload db schemas there, and will use it to inform from where connect should begin reading if restarted.
database.whitelist Alanı
Açıklaması şöyle
If there are multiple databases in the instance, which one(s) need to be monitored and synched.
table.whitelist Alanı
Açıklaması şöyle
Which tables in the whitelist database list need to be monitored and synched.
Örnek
Şöyle yaparız. Burada docker compose ile çalıştırılan mysql isimli sunucuya debezium ismi ve passme şifresi ile bağlanıyor. ecommerce veri tabanındaki ecommerce.product isimli tabloyu dinlemeye başlıyor. Tabloyu ecommerce-cdc isimli kafka topic'e yazıyor. Schema değişikliklerini de ecommerce-schema-cdc isimli kafka topic'e yazıyor.
name=MySqlConnector
connector.class=io.debezium.connector.mysql.MySqlConnector

# number of connector instance for horizontally scaling
tasks.max=1

# just a unique and random number here
database.server.id=83456

# MySQL connectivity information 
# Hostname of MySQL instance is “mysql” as defined in docker compose
database.hostname=mysql
database.user=debezium
database.password=passme
database.allowPublicKeyRetrieval=true

# The target database 
database.include.list=ecommerce

# The target table
table.include.list=ecommerce.product

# Kafka topic prefix for CDC. Topic naming convention: <prefix>.<database>.<table name>
# In this example, the topic for product table is ecommerce-cdc.ecommerce.product
topic.prefix=ecommerce-cdc

# Configuration to capture schema changes
schema.history.internal.kafka.bootstrap.servers=localhost:9092
schema.history.internal.kafka.topic=ecommerce-schema-cdc
database.history.* Alanı
Açıklaması şöyle
The database.history.kafka.bootstrap.servers and database.history.kafka.topic properties are used to store and manage the database schema changes in Kafka.
Örnek
Şöyle yaparız
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "mysqluser",
    "database.password": "mysqlpw",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
transforms* Alanı
Örnek - route
Şöyle yaparız
name=debezium-mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=mysql
database.port=3306
database.user=root
database.password=<mysql-root-password>
database.server.id=223344
database.server.name=dbserver1
database.history.kafka.topic=students-schema
database.whitelist=test
table.whitelist=test.students
message.key.columns=test.students:id
database.history.kafka.bootstrap.servers=kafkaconnect-cp-kafka:9092
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.data]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3
transforms* için açıklaması şöyle
This set of properties are used to configure a Route SMT (Single Message Transformation). By default when the Debezium connector receives an update, it tries to send them to a topic named <database.server.name>.<database.name>.<table.name>. So in our example, it would be “dbserver1.test.students”. In this example, we only want to store the information in the “students” topic therefore, we are stipping off the first two parts.
Örnek - reroute
Açıklaması şöyle
To route change event records for multiple physical tables to the same topic, configure the topic routing transformation in the Kafka Connect configuration for the Debezium connector. Configuration of the topic routing SMT requires you to specify regular expressions that determine:

- The tables for which to route records. These tables must all have the same schema.
- The destination topic name.
Şöyle yaparız. Burada tablo isminde customers_shard olan tüm kayıtlar $1customers_all_shards topic'ine gönderiliyor
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
Şöyle yaparız. Burada tablo isminde TABLE_ olan tüm kayıtlar TABLE_TOPIC topic'ine gönderiliyor. Her satır için de primary key yerine unique_id isimli sütun ekleniyor
"name": "table_connectors_v0",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.user": "user",
        "database.server.id": "1111111",
        "database.hostname": "121.0.0.1",
        "database.password": "password",
        "database.history.kafka.bootstrap.servers": "121.0.0.1:9092",
        "database.history.kafka.topic": "dbhistory.table1",
        "database.history.skip.unparseable.ddl": "true",
        "database.whitelist": "dbname",
        "database.server.name": "dbname",
        "database.port": "4444",
        "include.schema.changes": "false",
        "transforms": "Reroute",
        "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Reroute.topic.regex": "(.*)TABLE_(.*)",
        "transforms.Reroute.topic.replacement": "TABLE_TOPIC",
        "transforms.Reroute.key.field.name":"unique_id",
        "table.whitelist": "dbname.TABLE_.*",
        "header.converter":"org.apache.kafka.connect.storage.SimpleHeaderConverter",
        "snapshot.mode":"schema_only"
  }
}
transforms.Reroute.key.field.name için açıklama şöyle
.. you can configure the topic routing transformation to insert a different field into the key. To do this, specify the key.field.name option and set it to a field name that does not clash with existing primary key field names
....
Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, key.enforce.uniqueness must be true, which is the default.




Hiç yorum yok:

Yorum Gönder

Soft Delete

Giriş Açıklaması  şöyle When using the soft delete mechanism on the database, you might run into a situation where a record with a unique co...