Sunday, December 25, 2022

Debezium Connector

Introduction
1. The connector is given a name.
2. connector.class is always set to io.debezium.connector.mysql.MySqlConnector. This connector is for reading from the database. The explanation is as follows:
connector.class - class that implements capturing logic. In our case, it will read mysql binlogs.
3. The database connection information is defined. These fields are:
database.hostname
database.port
database.user
database.password

4. The Kafka connection information is defined. These are:
database.history.kafka.bootstrap.servers
database.history.kafka.topic

5. whitelist Fields
database.whitelist specifies which database is used.
table.whitelist specifies which tables are used. Newer Debezium versions renamed these properties to database.include.list and table.include.list; the first example below uses the newer names.

database.server.* Fields
The explanation is as follows:
database.server.* - to identify mysql server. Will be used as a prefix when creating topics.
1. database.server.id
The explanation is as follows:
This is the id given to the MySQL server during installation. Please refer to the section “Customize default MySQL configuration”.
2. database.server.name
The explanation is as follows:
This is any String that can be used to uniquely identify the MySQL instance. This is especially useful when we are using a MySQL cluster.
database.history.kafka.bootstrap.servers
The address of the Kafka server.

database.history.kafka.topic
The explanation is as follows:
This is the topic Debezium connector will use to store the schema changes.
Another explanation:
database.history.kafka.topic - connect will upload db schemas there, and will use it to inform from where connect should begin reading if restarted.
database.whitelist Field
The explanation is as follows:
If there are multiple databases in the instance, which one(s) need to be monitored and synched.
table.whitelist Field
The explanation is as follows:
Which tables in the whitelisted database(s) need to be monitored and synched.
Example
We do it like this. Here, the connector connects to the server named mysql (started with docker compose) using the debezium user and the passme password, and starts listening to the table ecommerce.product in the ecommerce database. Change events for that table are written to the Kafka topic ecommerce-cdc.ecommerce.product (the ecommerce-cdc prefix plus database and table name), and schema changes are written to the Kafka topic named ecommerce-schema-cdc.
name=MySqlConnector
connector.class=io.debezium.connector.mysql.MySqlConnector

# number of connector instance for horizontally scaling
tasks.max=1

# just a unique and random number here
database.server.id=83456

# MySQL connectivity information 
# Hostname of MySQL instance is “mysql” as defined in docker compose
database.hostname=mysql
database.user=debezium
database.password=passme
database.allowPublicKeyRetrieval=true

# The target database 
database.include.list=ecommerce

# The target table
table.include.list=ecommerce.product

# Kafka topic prefix for CDC. Topic naming convention: <prefix>.<database>.<table name>
# In this example, the topic for product table is ecommerce-cdc.ecommerce.product
topic.prefix=ecommerce-cdc

# Configuration to capture schema changes
schema.history.internal.kafka.bootstrap.servers=localhost:9092
schema.history.internal.kafka.topic=ecommerce-schema-cdc
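As a usage sketch, a properties file like this can be loaded by a Kafka Connect worker running in standalone mode. This is a minimal sketch assuming a stock Kafka installation; the file name mysql-cdc.properties is a hypothetical placeholder.
# run a standalone Connect worker with this connector config;
# mysql-cdc.properties is a hypothetical name for the file above
bin/connect-standalone.sh config/connect-standalone.properties mysql-cdc.properties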
database.history.* Fields
The explanation is as follows:
The database.history.kafka.bootstrap.servers and database.history.kafka.topic properties are used to store and manage the database schema changes in Kafka.
Note that Debezium 2.x renamed the database.history.* properties to schema.history.internal.*; the properties example above uses the newer names.
Example
We do it like this:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "mysqluser",
    "database.password": "mysqlpw",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
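To register this connector, the JSON is POSTed to the Kafka Connect REST API. A minimal sketch, assuming Connect listens on localhost:8083 and the JSON above is saved as inventory-connector.json (a hypothetical file name):
curl -X POST -H "Content-Type: application/json" \
  --data @inventory-connector.json \
  http://localhost:8083/connectors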
transforms* Fields
Example - route
We do it like this:
name=debezium-mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=mysql
database.port=3306
database.user=root
database.password=<mysql-root-password>
database.server.id=223344
database.server.name=dbserver1
database.history.kafka.topic=students-schema
database.whitelist=test
table.whitelist=test.students
message.key.columns=test.students:id
database.history.kafka.bootstrap.servers=kafkaconnect-cp-kafka:9092
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.data]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3
The explanation for transforms* is as follows:
This set of properties is used to configure a Route SMT (Single Message Transformation). By default when the Debezium connector receives an update, it tries to send them to a topic named <database.server.name>.<database.name>.<table.name>. So in our example, it would be “dbserver1.test.students”. In this example, we only want to store the information in the “students” topic therefore, we are stripping off the first two parts.
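As a quick check, we can consume from the rerouted topic. A sketch assuming a broker reachable at localhost:9092 (the address is an assumption):
# read the routed change events from the students topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic students --from-beginning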
Example - reroute
The explanation is as follows:
To route change event records for multiple physical tables to the same topic, configure the topic routing transformation in the Kafka Connect configuration for the Debezium connector. Configuration of the topic routing SMT requires you to specify regular expressions that determine:

- The tables for which to route records. These tables must all have the same schema.
- The destination topic name.
We do it like this. Here, records from all tables whose topic name contains customers_shard are routed to the $1customers_all_shards topic, where $1 is the prefix captured by the regex.
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
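An illustration with hypothetical topic names:
# with the regex above, $1 captures everything before customers_shard:
# dbserver1.inventory.customers_shard1 -> dbserver1.inventory.customers_all_shards
# dbserver1.inventory.customers_shard2 -> dbserver1.inventory.customers_all_shards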
We do it like this. Here, records from all tables whose name contains TABLE_ are routed to the TABLE_TOPIC topic. A field named unique_id is also added to each record's key in place of the primary key.
"name": "table_connectors_v0",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.user": "user",
        "database.server.id": "1111111",
        "database.hostname": "121.0.0.1",
        "database.password": "password",
        "database.history.kafka.bootstrap.servers": "121.0.0.1:9092",
        "database.history.kafka.topic": "dbhistory.table1",
        "database.history.skip.unparseable.ddl": "true",
        "database.whitelist": "dbname",
        "database.server.name": "dbname",
        "database.port": "4444",
        "include.schema.changes": "false",
        "transforms": "Reroute",
        "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Reroute.topic.regex": "(.*)TABLE_(.*)",
        "transforms.Reroute.topic.replacement": "TABLE_TOPIC",
        "transforms.Reroute.key.field.name":"unique_id",
        "table.whitelist": "dbname.TABLE_.*",
        "header.converter":"org.apache.kafka.connect.storage.SimpleHeaderConverter",
        "snapshot.mode":"schema_only"
  }
}
The explanation for transforms.Reroute.key.field.name is as follows:
.. you can configure the topic routing transformation to insert a different field into the key. To do this, specify the key.field.name option and set it to a field name that does not clash with existing primary key field names
....
Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, key.enforce.uniqueness must be true, which is the default.
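A sketch of the resulting change event key, with hypothetical values: id is the table's original primary key column, and unique_id is the field the SMT adds to carry the original table identifier.
{
  "id": 42,
  "unique_id": "dbname.TABLE_orders"
}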




Tuesday, December 20, 2022

Docker Compose ve MySQL Debezium

Example
This starts 4 containers:
confluentinc/cp-zookeeper:6.0.0
confluentinc/cp-kafka:6.0.0
confluentinc/cp-kafka-connect:latest
debezium/example-mysql:1.7
We do it like this:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: localhost
  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - broker
    ports:
      - "8083:8083"
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
        confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
        /etc/confluent/docker/run
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=connect -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=5555 -Dcom.sun.management.jmxremote.port=5555
  mysql:
    image: debezium/example-mysql:1.7
    hostname: mysql
    container_name: mysql
    depends_on:
      - broker
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    ports:
      - '3306:3306'
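Once the stack is up, the Connect worker's REST API can be used to verify that the Debezium plugin was installed. A sketch assuming the 8083 port mapping above:
# list installed connector plugins; io.debezium.connector.mysql.MySqlConnector should appear
curl -s http://localhost:8083/connector-plugins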
Example
We do it like this. Here debezium/connect is used, and redpanda is used in place of Kafka.
# redpanda-debezium.compose.yml
version: "3.3"
services:
  redpanda:
    image: vectorized/redpanda
    ports:
      - "9092:9092"
      - "29092:29092"
    command:
      - redpanda
      - start
      - --overprovisioned
      - --smp
      - "1"
      - --memory
      - "1G"
      - --reserve-memory
      - "0M"
      - --node-id
      - "0"
      - --kafka-addr
      - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr
      - PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092
      - --check=false
  connect:
    image: debezium/connect
    depends_on:
      - redpanda
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: "redpanda:9092"
      GROUP_ID: "1"
      CONFIG_STORAGE_TOPIC: "inventory.configs"
      OFFSET_STORAGE_TOPIC: "inventory.offset"
      STATUS_STORAGE_TOPIC: "inventory.status"
  mysql:
    image: debezium/example-mysql:1.6
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: debezium
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
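To bring this stack up, using the file name from the comment at the top:
docker compose -f redpanda-debezium.compose.yml up -d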
Example
We do it like this:
version: '3'

services:
  mysql:
    image: debezium/example-mysql:latest
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: dbz
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
      MYSQL_DATABASE: inventory
    networks:
      - inventory

  zookeeper:
    image: debezium/zookeeper:1.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - inventory

  kafka:
    image: debezium/kafka:1.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CONNECT_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - inventory

  connect:
    image: debezium/connect:1.0
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - inventory
    depends_on:
      - kafka

networks:
  inventory:



Wednesday, December 14, 2022

CREATE TABLE

PARTITION BY X
There are 5 kinds of partitioning methods.

If we want to remove partitioning later, we do it like this:
ALTER TABLE customers REMOVE PARTITIONING;

1. RANGE partitioning
I moved this to the RANGE partitioning post. A row is written to the partition whose range the column value falls into.

2. LIST partitioning
I moved this to the LIST partitioning post. A row is written to the partition whose value list the column value belongs to.

3. HASH partitioning
I moved this to the HASH partitioning post. The column value is passed through a hash function and the result is distributed across N partitions, as in the sketch below.
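A minimal sketch of HASH partitioning, using a hypothetical table:
CREATE TABLE orders (
  id INT NOT NULL,
  created_at DATE
)
PARTITION BY HASH(id) PARTITIONS 4;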

4. KEY partitioning
The explanation is as follows:
KEY partitioning is similar to partitioning by HASH, except that only one or more columns to be evaluated are supplied, and the SQL server provides its own hashing function. These columns can contain other than integer values, since the hashing function supplied by SQL guarantees an integer result regardless of the column data type. An extension to this type, LINEAR KEY, is also available.
Another explanation:
For this type of partitioning, the same as the LIST partitioning, the key has to be part of a primary/unique key be it single or compound.
ALTER TABLE
Example
We do it like this:
CREATE TABLE sales ( saleDate date, … ) 
  PARTITION BY KEY(saleDate) PARTITIONS 16;

-- for KEY/HASH partitioning, new partitions are added by count, not by value
ALTER TABLE sales ADD PARTITION PARTITIONS 4;
Example
We do it like this:
ALTER TABLE customers PARTITION BY KEY(registered_at) PARTITIONS 4;

5. COLUMN partitioning
The explanation is as follows:
COLUMN partitioning is a variant of RANGE and LIST partitioning. COLUMNS partitioning enables the use of multiple columns in partitioning keys. All of these columns are taken into account both for the purpose of placing rows in partitions and for the determination of which partitions are to be checked for matching rows in partition pruning.
Example
We do it like this:
CREATE TABLE rcx (
  a INT,
  b INT,
  c CHAR(3),
  d INT
)
PARTITION BY RANGE COLUMNS(a,d,c) (
  PARTITION p0 VALUES LESS THAN (5,10,'ggg'),
  PARTITION p1 VALUES LESS THAN (10,20,'mmm'),
  PARTITION p2 VALUES LESS THAN (15,30,'sss'),
  PARTITION p3 VALUES LESS THAN (MAXVALUE,MAXVALUE,MAXVALUE)
);
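Since COLUMNS partitioning also has a LIST variant, here is a minimal LIST COLUMNS sketch with a hypothetical table:
CREATE TABLE customers_by_city (
  name VARCHAR(50),
  city VARCHAR(30)
)
PARTITION BY LIST COLUMNS(city) (
  PARTITION p_east VALUES IN ('Boston', 'New York'),
  PARTITION p_west VALUES IN ('Seattle', 'Portland')
);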


Tuesday, December 6, 2022

Bulk Operations with INSERT INTO

Example
We do it like this:
INSERT INTO dept VALUES(1,'dept1'),(2,'dept2')



DESC - Describe Table

Example
We do it like this:
DESC emp


GRANT Permissions

Introduction
Privileges such as SELECT and INSERT can be used.

Example
We do it like this. The user named test, connecting from localhost, is granted privileges on the test database.
GRANT select, insert ON test.* TO 'test'@'localhost' IDENTIFIED BY '123'

# View account permissions
SHOW GRANTS FOR 'test'@'localhost'

# Revoke a privilege
REVOKE INSERT ON test.* FROM 'test'@'localhost'


CREATE EVENT - For Scheduled Tasks

Örnek Şöyle yaparız CREATE EVENT myevent     ON SCHEDULE AT CURRENT_TIMESTAMP + INTERVAL 1 HOUR     DO       UPDATE myschema.mytable SET myc...