MySQL 에서 Kafka 로 Source Connector 구축하기
이미지출처 : confluent
1. 시작하며
Kafka connect는 카프카용 데이터 통합 프레임워크이다. 이 때 Kafka connect는 Kafka connector가 동작하도록 실행해주는 프로세스이다. Kafka connector에는 Source connector와 Sink connector가 있다. 간단히 말하면 Source Connector는 Producer의 역할, Sink connector는 Consumer 역할을 한다.
- Source Connector : 외부시스템 -> 커넥트 -> 카프카
- Sink Connector : 카프카 -> 커넥트 -> 외부 시스템
Debezium은 카프카 커넥트(Kafka Connect) 기반의 플러그인이며 데이터 변경 캡쳐를 위해 사용된다. Debezium Connector for MySQL 를 통해 Source connector를 생성하여 mysql 데이터 변경을 topic에 저장하는 과정을 docker 환경에서 실습해보자.
- 구축 목표 : MySQL -> kafka Connect(Source Connector, Debezium) -> Kafka
2. docker-compose.yml 작성
기존의 wurstmeister 이미지를 활용하여 mysql, zookeeper, kafka 컨테이너를 생성한다.
version: '3'
services:
mysql:
image: mysql:8.0
container_name: mysql
ports:
- 3306:3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: mysqluser
MYSQL_PASSWORD: mysqlpw
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
volumes:
- D:/mysql/data:/var/lib/mysql
zookeeper:
container_name: zookeeper
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
컨테이너를 실행한다.
docker-compose -f docker-compose.yml up -d
3. DB 설정
데이터베이스 및 테스트용 테이블 생성
mysql -u root -p
create database testdb;
use testdb;
CREATE TABLE accounts (
account_id VARCHAR(255),
role_id VARCHAR(255),
user_name VARCHAR(255),
user_description VARCHAR(255),
update_date DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (account_id)
);
mysql 사용자 추가 및 권한 확인
mysqluser 유저에게 모든 데이터베이스, 테이블 권한 부여하자
use mysql;
// mysqluser 가 추가 되어 있는지 확인
select host, user from user;
// mysqluser 없으면 생성
CREATE USER 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
// mysqluser 에게 권한 부여
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
FLUSH PRIVILEGES;
4. Debezium Connector for MySQL 플러그인 설치
Debezium Connector 설치
Debezium 사이트에서 MySQL 커넥터 플러그인을 다운로드한다. 최신 버전인 debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
을 다운로드 받는다.
로컬 컴퓨터에서 kafka 컨테이너로 debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
를 업로드한다. /opt/kafka_2.13-2.7.1/
경로에 미리 connectors
폴더를 만들어두었다.
docker cp debezium-connector-mysql-1.5.4.Final-plugin.tar.gz kafka:/opt/kafka_2.13-2.7.1/connectors/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
파일 압축을 푼다.
cd /opt/kafka_2.13-2.7.1/connectors
tar -zxvf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
압축을 풀면 아래와 같은 파일이 나타난다.
debezium-connector-mysql/CHANGELOG.md
debezium-connector-mysql/CONTRIBUTE.md
debezium-connector-mysql/COPYRIGHT.txt
debezium-connector-mysql/LICENSE-3rd-PARTIES.txt
debezium-connector-mysql/LICENSE.txt
debezium-connector-mysql/README.md
debezium-connector-mysql/README_ZH.md
debezium-connector-mysql/debezium-core-1.5.4.Final.jar
debezium-connector-mysql/debezium-api-1.5.4.Final.jar
debezium-connector-mysql/guava-30.0-jre.jar
debezium-connector-mysql/failureaccess-1.0.1.jar
debezium-connector-mysql/debezium-ddl-parser-1.5.4.Final.jar
debezium-connector-mysql/antlr4-runtime-4.7.2.jar
debezium-connector-mysql/mysql-binlog-connector-java-0.25.1.jar
debezium-connector-mysql/mysql-connector-java-8.0.21.jar
debezium-connector-mysql/debezium-connector-mysql-1.5.4.Final.jar
plugin 경로 수정
카프카 컨테이너에 접속하여 /opt/kafka/config/connect-distributed.properties
파일을 수정한다. 파일을 수정한 뒤에는 카프카 컨테이너를 재시작해야 플러그인 경로가 정상 반영된다.
// 원래 경로
#plugin.path=
// 수정 경로
plugin.path=/opt/kafka_2.13-2.7.1/connectors
처음에 MySQL 커넥터가 없기 때문에 따로 Debezium Connector for MySQL을 설치해주었다. 만약 처음에 plugin이 설치되어 있다면 따로 설치할 필요가 없다.
plugin 경로에 파일이 없거나 플러그인 경로가 반영되지 않으면 io.debezium.connector.mysql.MySqlConnector 가 플러그인 목록에 나타나지 않는다.
5. kafka connect 실행
Distributed Mode로 kafka connect 실행
분산모드(distributed) 카프카 커넥트를 실행한다. 분산모드는 2개 이상의 커넥트를 한 개의 클러스터를 묶어서 운영한다.
connect-distributed.sh /opt/kafka/config/connect-distributed.properties
정상적으로 실행되면 마지막에 아래와 같은 로그를 볼 수 있다.
INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
6. Source Connector 생성하기
Kafka Connect 클러스터 확인
worker, version, commit 및 Kafka 클러스터 ID에 대한 kafka Connect 클러스터 정보를 확인해보자.
## 커넥터 8083 port 가 열려있는지 확인
netstat -lnp
curl http://localhost:8083/
{"version":"2.7.1","commit":"61dbce85d0d41457","kafka_cluster_id":"bMLmHpkkQNCwbEDd7ZDUFw"}
MySQL 커넥터 플러그인 확인
connector를 생성하기 앞서 설치된 플러그인 목록을 조회한다.
curl --location --request GET 'localhost:8083/connector-plugins'
io.debezium.connector.mysql.MySqlConnector
가 있어야 한다.
[
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "1.5.4.Final"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "2.7.1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "2.7.1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
Rest API 로 connector 생성
rest api 를 호출하여 connector 를 생성하자.
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "source-test-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "mysqluser",
"database.password": "mysqlpw",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.allowPublicKeyRetrieval": "true",
"database.include.list": "testdb",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.testdb",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1"
}
}'
설정 설명connector.class
는 커넥터의 Java 클래스입니다.tasks.max
는 이 커넥터에 대해 생성되어야 할 태스크의 최대 수입니다.database.hostname
은 DB 엔드포인트입니다
database.server.name
는 MySQL 인스턴스를 고유하게 식별하는 데 사용할 수있는 문자열입니다.database.include.list
는 지정한 서버에서 호스팅하는 데이터베이스의 목록을 포함합니다.database.history.kafka.bootstrap.servers
는 부트스트랩 서버 주소database.history.kafka.topic
은 데이터베이스 스키마 변경을 추적하기 위해 Debezium에서 내부적으로 사용하는 Kafka 주제입니다.
connector 목록/상세 정보 확인
# 목록
curl --location --request GET 'http://localhost:8083/connectors'
# 상세정보
curl --location --request GET 'http://localhost:8083/connectors/{connector-name}/config ' \
--header 'Content-Type: application/json'
curl --location --request GET 'http://localhost:8083/connectors/source-test-connector/config ' \
--header 'Content-Type: application/json'
connector 삭제
삭제할 일이 생기면 삭제하자.
curl --location --request DELETE 'http://localhost:8083/connectors/source-test-connector'
GET /connectors – returns a list with all connectors in use
GET /connectors/{name} – returns details about a specific connector
POST /connectors – creates a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
GET /connectors/{name}/status – returns the current status of the connector – including if it is running, failed or paused – which worker it is assigned to, error information if it has failed, and the state of all its tasks
DELETE /connectors/{name} – deletes a connector, gracefully stopping all tasks and deleting its configuration
GET /connector-plugins – returns a list of connector plugins installed in the Kafka Connect cluster
기타 명령어는 공식 사이트에서 보자
https://docs.confluent.io/platform/current/connect/references/restapi.html
https://developer.confluent.io/learn-kafka/kafka-connect/rest-api/
Topic 목록 확인
kafka-topics.sh --list --bootstrap-server localhost:9092
아래와 같이 토픽 목록이 존재한다.
## 분산 모드로 카프카 커넥트 실행 후 생성되는 토픽
__consumer_offsets
connect-configs
connect-offsets
connect-status
## 커넥터 생성 후 생성되는 토픽
dbhistory.testdb
dbserver1 // DDL 정보
dbserver1.testdb.accounts // 테이블 정보
7. 레코드 확인
테스트 데이터 입력
INSERT INTO accounts VALUES ("123456", "111", "Susan Cooper", "God", "2021-08-16 10:11:12");
INSERT INTO accounts VALUES ("123457", "111", "Rick Ford", "mistakes", "2021-08-16 11:12:13");
INSERT INTO accounts VALUES ("123458", "999", "Bradley Fine", "face", "2021-08-16 12:13:14");
콘솔 컨슈머 확인
하나의 터미널에서는 데이터를 입력하고 다른 터미널에서는 topic 에 입력된 레코드 정보를 확인해보자.
kafka-console-consumer.sh --topic dbserver1.testdb.accounts --bootstrap-server localhost:9092 --from-beginning
serverName.databaseName.tableName
토픽에 아래와 같이 테이블의 변화를 수집하고 있다.
https://debezium.io/documentation/reference/1.7/connectors/mysql.html
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "account_id"
},
{
"type": "string",
"optional": true,
"field": "role_id"
},
{
"type": "string",
"optional": true,
"field": "user_name"
},
{
"type": "string",
"optional": true,
"field": "user_description"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"default": 0,
"field": "update_date"
}
],
"optional": false,
"name": "dbserver1.testdb.accounts.Value"
},
"payload": {
"account_id": "123456",
"role_id": "111",
"user_name": "Susan Cooper",
"user_description": "God",
"update_date": 1629108672000
}
}
8. 마치며
Debezium connector는 CDC(hange Data Capture)에 일반적으로 사용된다. Debezium connector 을 통해 mysql 의 데이터 변경을 카프카 topic 에 저장할 수 있다. 각 테이블의 데이터 변경 내용은 serverName.databaseName.tableName
topic 에 저장된다. 토픽에 저장된 레코드를 활용하여 MySQL, S3, elasticSearch 등 외부시스템에서 활용할 수 있다.
참고
- https://debezium.io/documentation/reference/connectors/mysql.html
- https://developer.confluent.io/learn-kafka/kafka-connect/rest-api/
- https://aws.amazon.com/ko/blogs/korea/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/
- https://ichi.pro/ko/debezium-mich-kafkaconnectleul-sayonghan-db-db-dong-gihwa-mysql-postgresql-235712713194852
- https://www.baeldung.com/kafka-connectors-guide
- https://mongsil-jeong.tistory.com/35
- https://sup2is.github.io/2020/06/08/kafka-connect-example.html
- https://www.confluent.io/blog/no-more-silos-how-to-integrate-your-databases-with-apache-kafka-and-cdc/