[Kafka] Kafka Connect - JDBC Connector 예제

    Kafka 에서 Mysql 로 Sink Connector 구축하기

    이미지출처 : confluent

    1. 시작하며

    Kafka connect로 Kafka Topic 저장된 레코드를 Mysql로 데이터 마이그레이션 하고자한다. 이 과정은 Confluent 사의 JDBC Sink Connector 통해 구축한다.

    • Source Connector: MySQL -> kafka Connect(Source Connector, Debezium) -> Kafka
    • Sink Connector : Kafka -> kafka Connect(JDBC Sink Connector) -> Mysql

    2. docker-compose.yml 작성

    Source Connector 를 만들었던 예제에서 sink용 Mysql 하나 더 추가하였다.

    version: '3'
    services:
      mysql:
        image: mysql:8.0
        container_name: mysql
        ports:
          - 3306:3306
        environment:
          MYSQL_ROOT_PASSWORD: admin
          MYSQL_USER: mysqluser
          MYSQL_PASSWORD: mysqlpw 
        command:
          - --character-set-server=utf8mb4
          - --collation-server=utf8mb4_unicode_ci
        volumes:
          - D:/mysql/data:/var/lib/mysql
    
      zookeeper:
        container_name: zookeeper
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
    
      kafka:
        container_name: kafka
        image: wurstmeister/kafka
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
          KAFKA_ADVERTISED_PORT: 9092
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    
      mysql-sink:
        image: mysql:8.0
        container_name: mysql-sink
        ports:
          - 3307:3306
        environment:
          MYSQL_ROOT_PASSWORD: admin
          MYSQL_USER: mysqluser
          MYSQL_PASSWORD: mysqlpw 
        command:
          - --character-set-server=utf8mb4
          - --collation-server=utf8mb4_unicode_ci
        volumes:
          - D:/mysql-sink/data:/var/lib/mysql

    3. DB 설정

    Kafka Connector를 통한 데이터베이스, 테이블 자동 생성 설정은 사용하지 않을 것이므로 데이터베이스와 테이블을 직접 생성한다.

    데이터베이스 및 테스트용 테이블 생성

    mysql -u root -p
    
    create database sinkdb;
    
    use sinkdb;
    
    CREATE TABLE accounts (
       account_id VARCHAR(255),
       role_id VARCHAR(255),
       user_name VARCHAR(255),
       user_description VARCHAR(255),
       update_date DATETIME DEFAULT CURRENT_TIMESTAMP,
       PRIMARY KEY (account_id)
    );

    mysql 사용자 추가 및 권한 확인

    mysqluser 유저에게 모든 데이터베이스, 테이블 권한 부여하자

    use mysql;
    
    // mysqluser 가 추가 되어 있는지 확인
    select host, user from user;
    
    // mysqluser 없으면 생성
    CREATE USER 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
    // mysqluser 에게 권한 부여
    GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
    
    FLUSH PRIVILEGES;

    4. Kafka JDBC Connector (Source and Sink) 설치

    JDBC Connector 설치

    Confluent 사이트에서 confluentinc-kafka-connect-jdbc-10.2.5.zip 을 다운로드 받는다.

    해당 파일을 kafka 컨테이너로 업로드한다.

    docker cp confluentinc-kafka-connect-jdbc-10.2.5.zip kafka:/opt/kafka_2.13-2.7.1/connectors/

    파일 압축을 푼다.

    cd /opt/kafka_2.13-2.7.1/connectors
    unzip confluentinc-kafka-connect-jdbc-10.2.5.zip

    plugin 경로 확인

    source connector를 설치할 때 이미 /opt/kafka/config/connect-distributed.properties 파일의 plugin 경로를 수정해두었다.

    plugin.path=/opt/kafka_2.13-2.7.1/connectors

    5. Kafka connect 실행

    Distributed Mode로 kafka connect 실행

    connect-distributed.sh /opt/kafka/config/connect-distributed.properties

    재시작 후 플러그인이 설치된 것을 확인한다

    6. Sink Connector 생성하기

    worker, version, commit 및 Kafka 클러스터 ID에 대한 kafka Connect 클러스터 정보를 확인해보자.

    curl http://localhost:8083/
    {"version":"2.7.1","commit":"61dbce85d0d41457","kafka_cluster_id":"bMLmHpkkQNCwbEDd7ZDUFw"}

    MySQL 커넥터 플러그인 확인

    connector를 생성하기 앞서 설치된 플러그인 목록을 조회한다.

    curl --location --request GET 'localhost:8083/connector-plugins'

    io.confluent.connect.jdbc.JdbcSinkConnector, io.confluent.connect.jdbc.JdbcSourceConnector 가 있어야 한다.

    [
      {
        "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "type": "sink",
        "version": "10.2.5"
      },
      {
        "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "type": "source",
        "version": "10.2.5"
      }
    ]

    Rest API 로 connector 생성

    rest api 를 호출하여 connector 를 생성한다.
    No suitable driver found 에러가 발생하면 다음 링크를 확인한다.

    curl --location --request POST 'http://localhost:8083/connectors' \
    --header 'Content-Type: application/json' \
    --data-raw '{
      "name": "sink-test-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://mysql-sink:3306/sinkdb?user=mysqluser&password=mysqlpw",
        "auto.create": "false",
        "auto.evolve": "false",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "table.name.format":"${topic}",
        "tombstones.on.delete": "true",
        "connection.user": "mysqluser",
        "connection.password": "mysqlpw",
        "topics.regex": "dbserver1.testdb.(.*)",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "transforms": "unwrap, route, TimestampConverter",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.TimestampConverter.target.type": "Timestamp",
        "transforms.TimestampConverter.field": "update_date"
      }
    }'

    상세 configuration 확인

    https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html

    • auto.create: 자동 테이블 생성 (절대 하면 안됨)
    • auto.evolve : 자동 컬럼 생성
    • delete.enabled: null record를 삭제로 처리, pk.moderecord_key 여야 한다.
    • insert.mode : upsert, insert, update
    • topics.regex : topics 대신 정규식으로 topic 받아올 수 있음

    토픽 이름 RegrexRouter
    https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html
    https://debezium.io/blog/2017/09/25/streaming-to-another-database/

    Before : logical-name.database-name.table-name
    After : table-name

    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"

    TimestampConverter
    아래와 같은 이슈가 발생하여 TimestampConverter 를 추가 했다.
    https://docs.confluent.io/platform/current/connect/transforms/timestampconverter.html

      java.sql.BatchUpdateException: Data truncation: Incorrect datetime value: '1629112333000' for column 'update_date' at row 1
    com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: Incorrect datetime value: '1629112333000' for column 'update_date' at row 1

    Before: 1629108672000
    After : 2021-08-16 10:11:12

      "transforms": "TimestampConverter",
      "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
      "transforms.TimestampConverter.target.type": "Timestamp",
      "transforms.TimestampConverter.field": "update_date"

    기타 Kafka Connect REST API

    # 목록
    curl --location --request GET 'http://localhost:8083/connectors'
    
    # 상세정보
    curl --location --request GET 'http://localhost:8083/connectors/sink-test-connector/config ' \
    --header 'Content-Type: application/json'
    
    # 삭제 
    curl --location --request DELETE 'http://localhost:8083/connectors/sink-test-connector'
    GET /connectors – returns a list with all connectors in use
    GET /connectors/{name} – returns details about a specific connector
    POST /connectors – creates a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
    GET /connectors/{name}/status – returns the current status of the connector – including if it is running, failed or paused – which worker it is assigned to, error information if it has failed, and the state of all its tasks
    DELETE /connectors/{name} – deletes a connector, gracefully stopping all tasks and deleting its configuration
    GET /connector-plugins – returns a list of connector plugins installed in the Kafka Connect cluster

    기타 명령어는 공식 사이트에서 보자
    https://docs.confluent.io/platform/current/connect/references/restapi.html
    https://developer.confluent.io/learn-kafka/kafka-connect/rest-api/

    7. Mysql Sink 확인

    테스트 데이터 입력

    INSERT INTO accounts VALUES ("123456", "111", "Susan Cooper", "God", "2021-08-16 10:11:12");
    INSERT INTO accounts VALUES ("123457", "111", "Rick Ford", "mistakes", "2021-08-16 11:12:13");
    INSERT INTO accounts VALUES ("123458", "999", "Bradley Fine", "face", "2021-08-16 12:13:14");

    콘솔 컨슈머 확인

    하나의 터미널에서는 데이터를 입력하고 다른 터미널에서는 topic 에 입력된 레코드 정보를 확인해보자. 이전 포스트
    에서 작성한 source connector 가 제대로 동작 한다면 topic 에 레코드가 저장된다.

    kafka-console-consumer.sh --topic dbserver1.testdb.accounts --bootstrap-server localhost:9092 --from-beginning

    Sink 용 Target DB 확인

    source DB 인 mysql 에 데이커를 insert, update 하면 데이터의 변경이 sink db인 mysql-db 에 반영된다.

    8. 마치며

    JDBC Connector 로 Kafka에서 Mysql 로 데이터를 마이그레이션하는 Kafka connect sink connector를 구축하였다. Sink connector 는 kafka topic 에 저장된 레코드를 다양한 외부 시스템(Mysql, ES 등)으로 전달해준다.

    Source connector, Sink connector 를 구축하여 데이터 CUD 를 진행하였다. 현재 Source DB에서 데이터 입력, 수정했을 때 kafka connect 가 제대로 동작하여 sink DB 에 변경사항이 반영되는것을 확인하였다. 그러나 Delete event 의 경우 동작하지 않았다. 좀 더 공부해서 connector 설정을 수정해봐야겠다.

    이전글 : MySQL 에서 Kafka 로 Source Connector 구축하기

    참고

    반응형

    댓글

    Designed by JB FACTORY