Kafka Source Connector 생성하기
이번 글의 목표는
1. Kafka 설치
2. Source Connector 생성하기
3. Consumer 생성하기
입니다!
환경
OCI centos
1.Kafka 설치하기
1.1 Docker를 이용하여 Kafka 및 필요프로그램을 설치해 줍니다.
git clone https://github.com/wurstmeister/kafka-docker.git
1.2 설치가 완료되었다면 kafka-docker라는 폴더가 생성되어 있습니다.
1.3 폴더를 확인해 보면 위와 같이 여러 파일이 있을 겁니다.
설치할 때 필요한 파일은 docker-compose.yml입니다. 기타 이상한 파일들을 차차 설치할 예정이니 사진과 다르다고 당황하지 마세요~!
1.4 vi docker-compose 명령어를 이용하여 yml파일을 수정해 줍니다.
version: '2'
services:
mysql:
image: mysql:8.0
container_name: mysql
ports:
- 3306:3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: mysqluser
MYSQL_PASSWORD: mysqlpw
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
zookeeper:
container_name: zookeeper
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
※ docker-compose파일은 띄어쓰기가 하나라도 틀리면 에러가 나니 신중하게 입력하시거나 편하게 복사, 붙여 넣기 하세요! space를 이용하여 띄어쓰기해주세요 tab은 안됩니다.
※ volumes 부분은 파일 위치를 지정해 주는 건데 없을 경우 임의로 설정됩니다. 이번 작업에서는 파일위치에 접근할 필요가 없으니 에러가 난다면 지우셔도 됩니다.
1.5 입력 후 esc 키를 누르고 :wq를 눌러 vi를 빠져나옵니다.
1.6 docker-compose를 실행합니다.
docker-compose -f docker-compose.yml up -d
docker 컨테이너와 달리 docker-compose를 실행할 때는 docker-compose.yml이 있는 위치에서 해당 명령어를 입력하셔야 합니다!
1.7 코드가 뜨면서 설치가 끝났다면 다음 명령어 중 하나로 실행상태를 확인합니다.
docker-compose ps
docker ps
docker-compose -a
docker ps -a
docker-compose ps는 docker-compose.yml에 작성된 컨테이너의 실행상태를 확인할 수 있고 docker ps는 docker로 실행 중인 컨테이너를 확인할 수 있습니다.
-a는 현재 실행중인 아닌 컨테이너도 표시합니다.
※ 만약 exit으로 뜨면 docker restart <container name>으로 재실행하거나 docker-compose.yml을 잘 입력했는지 확인해 주세요.
2. Source Connector 생성하기
2.1 mysql 접속
위에 있는 yml파일대로 설치했다면 mysql이 설치됐습니다. mysql에 접속하여 테이블을 하나 만들어줍니다.
docker exec -it mysql /bin/bash
docker컨테이너에 접속하는 명령어입니다. 앞으로 계속 사용할 명령어입니다. 굳이 노력하지 않아도 외워질 겁니다.
mysql -u root -p
mysql에 root로 접속한다는 명령어입니다. 이후 비밀번호를 입력하면 로그인이 됩니다.
비밀번호는 yml파일에 설정했습니다. 저는 admin으로 설정되어 있습니다.
2.2 mysql testdb 생성
여기서부터는 sql명령어를 이용합니다.
create database testdb;
use testdb;
CREATE TABLE accounts (
account_id VARCHAR(255),
role_id VARCHAR(255),
user_name VARCHAR(255),
user_description VARCHAR(255),
update_date DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (account_id)
);
이렇게 db를 생성해 줍니다.
2.3 권한 설정
use mysql;
// mysqluser 가 추가 되어 있는지 확인
select host, user from user;
// mysqluser 없으면 생성
CREATE USER 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
// mysqluser 에게 권한 부여
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
FLUSH PRIVILEGES;
새로운 계정을 생성해 줍니다. 이 계정을 이용하여 커넥터를 생성하여 kafka를 사용합니다. 계정은 이미 yml을 이용하여 생성이 되어 있을 겁니다. 권한 역시 부여되어 있겠지만 저는 권한이 제대로 부여되지 않아 문제가 발생했기에 한 번 더 부여하는 것을 추천합니다.
권한을 부여한 후 exit를 이용하여 docker컨테이너 밖으로 나옵니다.
2.4 Debezium Connector설치
kafka의 친구인 debezium을 설치합니다.
docker exec -it kafka /bin/bash
이번에는 카프카의 컨테이너에 접속합니다.
다음 kafka_2.13-2.8.1로 이동합니다.
ls 명령어를 입력하면 connectors폴더가 없죠?
mkdir connectors
다음 명령어를 입력하여 connectors를 생성 후 폴더로 들어갑니다.
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.6.Final/debezium-connector-mysql-1.9.6.Final-plugin.tar.gz
다음 명령어를 이용해 debezium을 설치합니다.
설치가 끝나면 tar파일이 있을 텐데요.
tar -zxvf debezium-connector-mysql-1.9.6.Final-plugin.tar.gz
압축을 해제합니다.
2.5plugin 경로수정
cd /opt/kafka/config
경로를 위와 같이 이동합니다.
vi connect-distributed.properties
밑에 부분에 plugin.path를 위와 같이 수정합니다. i를 누르면 수정할 수 있고 esc를 누르고 :wq를 눌러 저장할 수 있습니다.
2.6 이제 카프카를 분산모드로 실행합니다.
분산모드는 두 개 이상의 커넥터를 한 개의 클러스터로 묶어서 운영하는 기능입니다.
connect-distributed.sh /opt/kafka/config/connect-distributed.properties
코드가 뜨면서 실행됩니다.
INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
위에 코드가 뜨면 성공입니다.
에러가 나도 다시 명령어를 입력하면 되는 경우도 있습니다.
저도 두 번 시도하고 성공했습니다.
2.7 kafka connect클러스터 확인
## 커넥터 8083 port 가 열려있는지 확인
netstat -lnp
curl http://localhost:8083/
분산모드를 실행한 후 다른 창에서 위에 코드를 입력하여 포트를 열어줍니다.
curl --location --request GET 'localhost:8083/connector-plugins'
보기 불편하시다면
curl --location --request GET 'localhost:8083/connector-plugins | jq'
를 입력하면
이렇게 볼 수 있습니다.
mysql 커넥터를 확인합니다.
3.1 커넥터생성
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "source-test-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "mysqluser",
"database.password": "mysqlpw",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.allowPublicKeyRetrieval": "true",
"database.include.list": "testdb",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.testdb",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite" ,
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1"
}
}'
다음은 위에 코드를 입력하여 커넥터를 생성합니다.
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite"
이 부분을 이용하면 후에 sink커넥터 생성 시 delete도 반영할 수 있습니다!
3.2 커넥터 확인
# 목록
curl --location --request GET 'http://localhost:8083/connectors'
# 상세정보
curl --location --request GET 'http://localhost:8083/connectors/source-test-connector/config'
#삭제
curl --location --request DELETE 'http://localhost:8083/connectors/source-test-connector'
#토픽 목록확인
kafka-topics.sh --list --bootstrap-server localhost:9092
3.3 테스트 데이터 입력
mysql에 접속하여 생성한 testdb에 데이터를 입력합니다.
INSERT INTO accounts VALUES ("123456", "111", "Susan Cooper", "God",
"2021-08-16 10:11:12");
INSERT INTO accounts VALUES ("123457", "111", "Rick Ford", "mistakes",
"2021-08-16 11:12:13");
INSERT INTO accounts VALUES ("123458", "999", "Bradley Fine", "face",
"2021-08-16 12:13:14");
3.4 컨슈머 확인
kafka-console-consumer.sh --topic dbserver1.testdb.accounts --bootstrap-server localhost:9092 --from-beginning
컨슈머가 데이터를 수집하고 있는 것을 확인할 수 있습니다!
이 글은 https://wecandev.tistory.com/109을 기반으로 작성되었습니다.
다음 글은 Sink Connector를 생성하여 CDC를 구현해 보겠습니다.
'IT' 카테고리의 다른 글
React + Spring 카카오 로그인 구현하기 - OAuth란, OAuth원리, 프론트 토큰 저장 방식 (0) | 2023.06.27 |
---|---|
Kafka로 CDC구현하기(3/3) - Kafka Sink Connector 생성해서 Postgres 연결하기 (0) | 2023.06.26 |
Kafka로 CDC구현하기(2/3) - Kafka Sink Connector 생성해서 Mysql 연결하기 (0) | 2023.06.20 |