[Kafka] Kafka Connect - Debezium Connector 예제

    MySQL 에서 Kafka 로 Source Connector 구축하기

    이미지출처 : confluent

    1. 시작하며

    Kafka connect는 카프카용 데이터 통합 프레임워크이다. 이 때 Kafka connect는 Kafka connector가 동작하도록 실행해주는 프로세스이다. Kafka connector에는 Source connector와 Sink connector가 있다. 간단히 말하면 Source Connector는 Producer의 역할, Sink connector는 Consumer 역할을 한다.

    • Source Connector : 외부시스템 -> 커넥트 -> 카프카
    • Sink Connector : 카프카 -> 커넥트 -> 외부 시스템

    Debezium은 카프카 커넥트(Kafka Connect) 기반의 플러그인이며 데이터 변경 캡쳐를 위해 사용된다. Debezium Connector for MySQL 를 통해 Source connector를 생성하여 mysql 데이터 변경을 topic에 저장하는 과정을 docker 환경에서 실습해보자.

    • 구축 목표 : MySQL -> kafka Connect(Source Connector, Debezium) -> Kafka

    2. docker-compose.yml 작성

    기존의 wurstmeister 이미지를 활용하여 mysql, zookeeper, kafka 컨테이너를 생성한다.

    version: '3'
    services:
      mysql:
        image: mysql:8.0
        container_name: mysql
        ports:
          - 3306:3306
        environment:
          MYSQL_ROOT_PASSWORD: admin
          MYSQL_USER: mysqluser
          MYSQL_PASSWORD: mysqlpw 
        command:
          - --character-set-server=utf8mb4
          - --collation-server=utf8mb4_unicode_ci
        volumes:
          - D:/mysql/data:/var/lib/mysql
    
      zookeeper:
        container_name: zookeeper
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
    
      kafka:
        container_name: kafka
        image: wurstmeister/kafka
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
          KAFKA_ADVERTISED_PORT: 9092
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock

    컨테이너를 실행한다.

    docker-compose -f docker-compose.yml up -d 

    3. DB 설정

    데이터베이스 및 테스트용 테이블 생성

    mysql -u root -p
    
    create database testdb;
    
    use testdb;
    
    CREATE TABLE accounts (
       account_id VARCHAR(255),
       role_id VARCHAR(255),
       user_name VARCHAR(255),
       user_description VARCHAR(255),
       update_date DATETIME DEFAULT CURRENT_TIMESTAMP,
       PRIMARY KEY (account_id)
    );

    mysql 사용자 추가 및 권한 확인

    mysqluser 유저에게 모든 데이터베이스, 테이블 권한 부여하자

    use mysql;
    
    // mysqluser 가 추가 되어 있는지 확인
    select host, user from user;
    
    // mysqluser 없으면 생성
    CREATE USER 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
    // mysqluser 에게 권한 부여
    GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
    
    FLUSH PRIVILEGES;

    4. Debezium Connector for MySQL 플러그인 설치

    Debezium Connector 설치

    Debezium 사이트에서 MySQL 커넥터 플러그인을 다운로드한다. 최신 버전인 debezium-connector-mysql-1.5.4.Final-plugin.tar.gz을 다운로드 받는다.

    로컬 컴퓨터에서 kafka 컨테이너로 debezium-connector-mysql-1.5.4.Final-plugin.tar.gz 를 업로드한다. /opt/kafka_2.13-2.7.1/ 경로에 미리 connectors 폴더를 만들어두었다.

    docker cp debezium-connector-mysql-1.5.4.Final-plugin.tar.gz kafka:/opt/kafka_2.13-2.7.1/connectors/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz

    파일 압축을 푼다.

    cd /opt/kafka_2.13-2.7.1/connectors
    tar -zxvf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz

    압축을 풀면 아래와 같은 파일이 나타난다.

    debezium-connector-mysql/CHANGELOG.md
    debezium-connector-mysql/CONTRIBUTE.md
    debezium-connector-mysql/COPYRIGHT.txt
    debezium-connector-mysql/LICENSE-3rd-PARTIES.txt
    debezium-connector-mysql/LICENSE.txt
    debezium-connector-mysql/README.md
    debezium-connector-mysql/README_ZH.md
    debezium-connector-mysql/debezium-core-1.5.4.Final.jar
    debezium-connector-mysql/debezium-api-1.5.4.Final.jar
    debezium-connector-mysql/guava-30.0-jre.jar
    debezium-connector-mysql/failureaccess-1.0.1.jar
    debezium-connector-mysql/debezium-ddl-parser-1.5.4.Final.jar
    debezium-connector-mysql/antlr4-runtime-4.7.2.jar
    debezium-connector-mysql/mysql-binlog-connector-java-0.25.1.jar
    debezium-connector-mysql/mysql-connector-java-8.0.21.jar
    debezium-connector-mysql/debezium-connector-mysql-1.5.4.Final.jar

    plugin 경로 수정

    카프카 컨테이너에 접속하여 /opt/kafka/config/connect-distributed.properties 파일을 수정한다. 파일을 수정한 뒤에는 카프카 컨테이너를 재시작해야 플러그인 경로가 정상 반영된다.

    // 원래 경로
    #plugin.path=
    
    // 수정 경로
    plugin.path=/opt/kafka_2.13-2.7.1/connectors

    처음에 MySQL 커넥터가 없기 때문에 따로 Debezium Connector for MySQL을 설치해주었다. 만약 처음에 plugin이 설치되어 있다면 따로 설치할 필요가 없다.

    plugin 경로에 파일이 없거나 플러그인 경로가 반영되지 않으면 io.debezium.connector.mysql.MySqlConnector 가 플러그인 목록에 나타나지 않는다.

    5. kafka connect 실행

    Distributed Mode로 kafka connect 실행

    분산모드(distributed) 카프카 커넥트를 실행한다. 분산모드는 2개 이상의 커넥트를 한 개의 클러스터를 묶어서 운영한다.

    connect-distributed.sh /opt/kafka/config/connect-distributed.properties

    정상적으로 실행되면 마지막에 아래와 같은 로그를 볼 수 있다.

    INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)

    6. Source Connector 생성하기

    Kafka Connect 클러스터 확인

    worker, version, commit 및 Kafka 클러스터 ID에 대한 kafka Connect 클러스터 정보를 확인해보자.

    ##  커넥터 8083 port 가 열려있는지 확인
    netstat -lnp
    
    curl http://localhost:8083/
    {"version":"2.7.1","commit":"61dbce85d0d41457","kafka_cluster_id":"bMLmHpkkQNCwbEDd7ZDUFw"}

    MySQL 커넥터 플러그인 확인

    connector를 생성하기 앞서 설치된 플러그인 목록을 조회한다.

    curl --location --request GET 'localhost:8083/connector-plugins'

    io.debezium.connector.mysql.MySqlConnector 가 있어야 한다.

    [
      {
        "class": "io.debezium.connector.mysql.MySqlConnector",
        "type": "source",
        "version": "1.5.4.Final"
      },
      {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "2.7.1"
      },
      {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "2.7.1"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "1"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "1"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "1"
      }
    ]

    Rest API 로 connector 생성

    rest api 를 호출하여 connector 를 생성하자.

    curl --location --request POST 'http://localhost:8083/connectors' \
    --header 'Content-Type: application/json' \
    --data-raw '{
      "name": "source-test-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "mysqluser",
        "database.password": "mysqlpw",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.allowPublicKeyRetrieval": "true",
        "database.include.list": "testdb",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.testdb",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "transforms": "unwrap,addTopicPrefix",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.addTopicPrefix.regex":"(.*)",
        "transforms.addTopicPrefix.replacement":"$1"
      }
    }'

    설정 설명
    connector.class는 커넥터의 Java 클래스입니다.
    tasks.max는 이 커넥터에 대해 생성되어야 할 태스크의 최대 수입니다.
    database.hostname은 DB 엔드포인트입니다

    database.server.name는 MySQL 인스턴스를 고유하게 식별하는 데 사용할 수있는 문자열입니다.
    database.include.list는 지정한 서버에서 호스팅하는 데이터베이스의 목록을 포함합니다.
    database.history.kafka.bootstrap.servers는 부트스트랩 서버 주소
    database.history.kafka.topic은 데이터베이스 스키마 변경을 추적하기 위해 Debezium에서 내부적으로 사용하는 Kafka 주제입니다.

    connector 목록/상세 정보 확인

    # 목록
    curl --location --request GET 'http://localhost:8083/connectors'
    # 상세정보
    curl --location --request GET 'http://localhost:8083/connectors/{connector-name}/config ' \
    --header 'Content-Type: application/json'
    
    curl --location --request GET 'http://localhost:8083/connectors/source-test-connector/config ' \
    --header 'Content-Type: application/json'

    connector 삭제

    삭제할 일이 생기면 삭제하자.

    curl --location --request DELETE 'http://localhost:8083/connectors/source-test-connector'
    GET /connectors – returns a list with all connectors in use
    GET /connectors/{name} – returns details about a specific connector
    POST /connectors – creates a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
    GET /connectors/{name}/status – returns the current status of the connector – including if it is running, failed or paused – which worker it is assigned to, error information if it has failed, and the state of all its tasks
    DELETE /connectors/{name} – deletes a connector, gracefully stopping all tasks and deleting its configuration
    GET /connector-plugins – returns a list of connector plugins installed in the Kafka Connect cluster

    기타 명령어는 공식 사이트에서 보자
    https://docs.confluent.io/platform/current/connect/references/restapi.html
    https://developer.confluent.io/learn-kafka/kafka-connect/rest-api/

    Topic 목록 확인

    kafka-topics.sh --list --bootstrap-server localhost:9092

    아래와 같이 토픽 목록이 존재한다.

    ## 분산 모드로 카프카 커넥트 실행 후 생성되는 토픽
    __consumer_offsets
    connect-configs
    connect-offsets
    connect-status
    
    ## 커넥터 생성 후 생성되는 토픽
    dbhistory.testdb
    dbserver1 // DDL 정보
    dbserver1.testdb.accounts // 테이블 정보

    7. 레코드 확인

    테스트 데이터 입력

    INSERT INTO accounts VALUES ("123456", "111", "Susan Cooper", "God", "2021-08-16 10:11:12");
    INSERT INTO accounts VALUES ("123457", "111", "Rick Ford", "mistakes", "2021-08-16 11:12:13");
    INSERT INTO accounts VALUES ("123458", "999", "Bradley Fine", "face", "2021-08-16 12:13:14");

    콘솔 컨슈머 확인

    하나의 터미널에서는 데이터를 입력하고 다른 터미널에서는 topic 에 입력된 레코드 정보를 확인해보자.

    kafka-console-consumer.sh --topic dbserver1.testdb.accounts --bootstrap-server localhost:9092 --from-beginning

    serverName.databaseName.tableName 토픽에 아래와 같이 테이블의 변화를 수집하고 있다.
    https://debezium.io/documentation/reference/1.7/connectors/mysql.html

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "account_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "role_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "user_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "user_description"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "io.debezium.time.Timestamp",
            "version": 1,
            "default": 0,
            "field": "update_date"
          }
        ],
        "optional": false,
        "name": "dbserver1.testdb.accounts.Value"
      },
      "payload": {
        "account_id": "123456",
        "role_id": "111",
        "user_name": "Susan Cooper",
        "user_description": "God",
        "update_date": 1629108672000
      }
    }

    8. 마치며

    Debezium connector는 CDC(hange Data Capture)에 일반적으로 사용된다. Debezium connector 을 통해 mysql 의 데이터 변경을 카프카 topic 에 저장할 수 있다. 각 테이블의 데이터 변경 내용은 serverName.databaseName.tableName topic 에 저장된다. 토픽에 저장된 레코드를 활용하여 MySQL, S3, elasticSearch 등 외부시스템에서 활용할 수 있다.

    다음글: [Kafka] Kafka Connect - JDBC Connector 예제

    참고

    반응형

    댓글

    Designed by JB FACTORY