Kafka 에서 Mysql 로 Sink Connector 구축하기
이미지출처 : confluent
1. 시작하며
Kafka connect로 Kafka Topic 저장된 레코드를 Mysql로 데이터 마이그레이션 하고자한다. 이 과정은 Confluent 사의 JDBC Sink Connector 통해 구축한다.
- Source Connector: MySQL -> kafka Connect(Source Connector, Debezium) -> Kafka
- Sink Connector : Kafka -> kafka Connect(JDBC Sink Connector) -> Mysql
2. docker-compose.yml 작성
Source Connector 를 만들었던 예제에서 sink용 Mysql 하나 더 추가하였다.
version: '3'
services:
mysql:
image: mysql:8.0
container_name: mysql
ports:
- 3306:3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: mysqluser
MYSQL_PASSWORD: mysqlpw
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
volumes:
- D:/mysql/data:/var/lib/mysql
zookeeper:
container_name: zookeeper
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
mysql-sink:
image: mysql:8.0
container_name: mysql-sink
ports:
- 3307:3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: mysqluser
MYSQL_PASSWORD: mysqlpw
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
volumes:
- D:/mysql-sink/data:/var/lib/mysql
3. DB 설정
Kafka Connector를 통한 데이터베이스, 테이블 자동 생성 설정은 사용하지 않을 것이므로 데이터베이스와 테이블을 직접 생성한다.
데이터베이스 및 테스트용 테이블 생성
mysql -u root -p
create database sinkdb;
use sinkdb;
CREATE TABLE accounts (
account_id VARCHAR(255),
role_id VARCHAR(255),
user_name VARCHAR(255),
user_description VARCHAR(255),
update_date DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (account_id)
);
mysql 사용자 추가 및 권한 확인
mysqluser 유저에게 모든 데이터베이스, 테이블 권한 부여하자
use mysql;
// mysqluser 가 추가 되어 있는지 확인
select host, user from user;
// mysqluser 없으면 생성
CREATE USER 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
// mysqluser 에게 권한 부여
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
FLUSH PRIVILEGES;
4. Kafka JDBC Connector (Source and Sink) 설치
JDBC Connector 설치
Confluent 사이트에서 confluentinc-kafka-connect-jdbc-10.2.5.zip
을 다운로드 받는다.
해당 파일을 kafka 컨테이너로 업로드한다.
docker cp confluentinc-kafka-connect-jdbc-10.2.5.zip kafka:/opt/kafka_2.13-2.7.1/connectors/
파일 압축을 푼다.
cd /opt/kafka_2.13-2.7.1/connectors
unzip confluentinc-kafka-connect-jdbc-10.2.5.zip
plugin 경로 확인
source connector를 설치할 때 이미 /opt/kafka/config/connect-distributed.properties
파일의 plugin 경로를 수정해두었다.
plugin.path=/opt/kafka_2.13-2.7.1/connectors
5. Kafka connect 실행
Distributed Mode로 kafka connect 실행
connect-distributed.sh /opt/kafka/config/connect-distributed.properties
재시작 후 플러그인이 설치된 것을 확인한다
6. Sink Connector 생성하기
worker, version, commit 및 Kafka 클러스터 ID에 대한 kafka Connect 클러스터 정보를 확인해보자.
curl http://localhost:8083/
{"version":"2.7.1","commit":"61dbce85d0d41457","kafka_cluster_id":"bMLmHpkkQNCwbEDd7ZDUFw"}
MySQL 커넥터 플러그인 확인
connector를 생성하기 앞서 설치된 플러그인 목록을 조회한다.
curl --location --request GET 'localhost:8083/connector-plugins'
io.confluent.connect.jdbc.JdbcSinkConnector
, io.confluent.connect.jdbc.JdbcSourceConnector
가 있어야 한다.
[
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "10.2.5"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "10.2.5"
}
]
Rest API 로 connector 생성
rest api 를 호출하여 connector 를 생성한다.
No suitable driver found 에러가 발생하면 다음 링크를 확인한다.
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "sink-test-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://mysql-sink:3306/sinkdb?user=mysqluser&password=mysqlpw",
"auto.create": "false",
"auto.evolve": "false",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"table.name.format":"${topic}",
"tombstones.on.delete": "true",
"connection.user": "mysqluser",
"connection.password": "mysqlpw",
"topics.regex": "dbserver1.testdb.(.*)",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap, route, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "update_date"
}
}'
상세 configuration 확인
https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html
auto.create
: 자동 테이블 생성 (절대 하면 안됨)auto.evolve
: 자동 컬럼 생성delete.enabled
: null record를 삭제로 처리,pk.mode
가record_key
여야 한다.insert.mode
: upsert, insert, updatetopics.regex
:topics
대신 정규식으로 topic 받아올 수 있음
토픽 이름 RegrexRouter
https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html
https://debezium.io/blog/2017/09/25/streaming-to-another-database/
Before : logical-name
.database-name
.table-name
After : table-name
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
TimestampConverter
아래와 같은 이슈가 발생하여 TimestampConverter 를 추가 했다.
https://docs.confluent.io/platform/current/connect/transforms/timestampconverter.html
java.sql.BatchUpdateException: Data truncation: Incorrect datetime value: '1629112333000' for column 'update_date' at row 1
com.mysql.cj.jdbc.exceptions.MysqlDataTruncation: Data truncation: Incorrect datetime value: '1629112333000' for column 'update_date' at row 1
Before: 1629108672000
After : 2021-08-16 10:11:12
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "update_date"
기타 Kafka Connect REST API
# 목록
curl --location --request GET 'http://localhost:8083/connectors'
# 상세정보
curl --location --request GET 'http://localhost:8083/connectors/sink-test-connector/config ' \
--header 'Content-Type: application/json'
# 삭제
curl --location --request DELETE 'http://localhost:8083/connectors/sink-test-connector'
GET /connectors – returns a list with all connectors in use
GET /connectors/{name} – returns details about a specific connector
POST /connectors – creates a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
GET /connectors/{name}/status – returns the current status of the connector – including if it is running, failed or paused – which worker it is assigned to, error information if it has failed, and the state of all its tasks
DELETE /connectors/{name} – deletes a connector, gracefully stopping all tasks and deleting its configuration
GET /connector-plugins – returns a list of connector plugins installed in the Kafka Connect cluster
기타 명령어는 공식 사이트에서 보자
https://docs.confluent.io/platform/current/connect/references/restapi.html
https://developer.confluent.io/learn-kafka/kafka-connect/rest-api/
7. Mysql Sink 확인
테스트 데이터 입력
INSERT INTO accounts VALUES ("123456", "111", "Susan Cooper", "God", "2021-08-16 10:11:12");
INSERT INTO accounts VALUES ("123457", "111", "Rick Ford", "mistakes", "2021-08-16 11:12:13");
INSERT INTO accounts VALUES ("123458", "999", "Bradley Fine", "face", "2021-08-16 12:13:14");
콘솔 컨슈머 확인
하나의 터미널에서는 데이터를 입력하고 다른 터미널에서는 topic 에 입력된 레코드 정보를 확인해보자. 이전 포스트
에서 작성한 source connector 가 제대로 동작 한다면 topic 에 레코드가 저장된다.
kafka-console-consumer.sh --topic dbserver1.testdb.accounts --bootstrap-server localhost:9092 --from-beginning
Sink 용 Target DB 확인
source DB 인 mysql 에 데이커를 insert, update 하면 데이터의 변경이 sink db인 mysql-db 에 반영된다.
8. 마치며
JDBC Connector 로 Kafka에서 Mysql 로 데이터를 마이그레이션하는 Kafka connect sink connector를 구축하였다. Sink connector 는 kafka topic 에 저장된 레코드를 다양한 외부 시스템(Mysql, ES 등)으로 전달해준다.
Source connector, Sink connector 를 구축하여 데이터 CUD 를 진행하였다. 현재 Source DB에서 데이터 입력, 수정했을 때 kafka connect 가 제대로 동작하여 sink DB 에 변경사항이 반영되는것을 확인하였다. 그러나 Delete event 의 경우 동작하지 않았다. 좀 더 공부해서 connector 설정을 수정해봐야겠다.
이전글 : MySQL 에서 Kafka 로 Source Connector 구축하기
참고
- https://www.baeldung.com/kafka-connectors-guide
- https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/index.html
- https://developer.confluent.io/learn-kafka/kafka-connect/intro/
- https://blog.naver.com/qjawnswkd/222334228900
- https://sup2is.github.io/2020/06/08/kafka-connect-example.html
- https://dzone.com/articles/data-ingestion-from-rdbms-by-leveraging-confluents