본문 바로가기
IT

Kafka로 CDC구현하기(1/3) - Kafka Source Connector 생성하기

by 쪼꼬맛우유임 2023. 6. 19.

Kafka Source Connector 생성하기

이번 글의 목표는 

1. Kafka 설치

2. Source Connector 생성하기

3. Consumer 생성하기

입니다!

 

환경

OCI centos

 

1.Kafka 설치하기

 

1.1 Docker를 이용하여 Kafka 및 필요프로그램을 설치해 줍니다.

git clone https://github.com/wurstmeister/kafka-docker.git

 

설치완료1

1.2 설치가 완료되었다면 kafka-docker라는 폴더가 생성되어 있습니다.

 

파일목록1

1.3 폴더를 확인해 보면 위와 같이 여러 파일이 있을 겁니다.

설치할 때 필요한 파일은 docker-compose.yml입니다. 기타 이상한 파일들을 차차 설치할 예정이니 사진과 다르다고 당황하지 마세요~!

 

1.4 vi docker-compose 명령어를 이용하여 yml파일을 수정해 줍니다.

version: '2'
services:
 mysql:
  image: mysql:8.0
  container_name: mysql
  ports:
    - 3306:3306
  environment:
   MYSQL_ROOT_PASSWORD: admin
   MYSQL_USER: mysqluser
   MYSQL_PASSWORD: mysqlpw
  command:
   - --character-set-server=utf8mb4
   - --collation-server=utf8mb4_unicode_ci

 zookeeper:
  container_name: zookeeper
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"

 kafka:
  container_name: kafka
  image: wurstmeister/kafka
  depends_on:
   - zookeeper
  ports:
    - "9092:9092"
  environment:
   KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
   KAFKA_ADVERTISED_PORT: 9092
   KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  volumes:
   - /var/run/docker.sock:/var/run/docker.sock

※ docker-compose파일은 띄어쓰기가 하나라도 틀리면 에러가 나니 신중하게 입력하시거나 편하게 복사, 붙여 넣기 하세요! space를 이용하여 띄어쓰기해주세요 tab은 안됩니다.

※ volumes 부분은 파일 위치를 지정해 주는 건데 없을 경우 임의로 설정됩니다. 이번 작업에서는 파일위치에 접근할 필요가 없으니 에러가 난다면 지우셔도 됩니다.

 

1.5 입력 후 esc 키를 누르고 :wq를 눌러 vi를 빠져나옵니다.

1.6 docker-compose를 실행합니다.

docker-compose -f docker-compose.yml up -d

 docker 컨테이너와 달리 docker-compose를 실행할 때는 docker-compose.yml이 있는 위치에서 해당 명령어를 입력하셔야 합니다!

 

1.7 코드가 뜨면서 설치가 끝났다면 다음 명령어 중 하나로 실행상태를 확인합니다.

docker-compose ps
docker ps
docker-compose -a
docker ps -a

 docker-compose ps는 docker-compose.yml에 작성된 컨테이너의 실행상태를 확인할 수 있고 docker ps는 docker로 실행 중인 컨테이너를 확인할 수 있습니다.

  -a는 현재 실행중인 아닌 컨테이너도 표시합니다.

※ 만약 exit으로 뜨면 docker restart <container name>으로 재실행하거나 docker-compose.yml을 잘 입력했는지 확인해 주세요.

 

2. Source Connector 생성하기

 

2.1 mysql 접속

 

위에 있는 yml파일대로 설치했다면 mysql이 설치됐습니다.  mysql에 접속하여 테이블을 하나 만들어줍니다.

 

mysql

docker exec -it mysql /bin/bash

docker컨테이너에 접속하는 명령어입니다. 앞으로 계속 사용할 명령어입니다. 굳이 노력하지 않아도 외워질 겁니다.

 

mysql -u root -p

mysql에 root로 접속한다는 명령어입니다. 이후 비밀번호를 입력하면 로그인이 됩니다.

mysql 로그인

비밀번호는 yml파일에 설정했습니다. 저는 admin으로 설정되어 있습니다.

 

2.2 mysql testdb 생성

 여기서부터는 sql명령어를 이용합니다.

create database testdb;
use testdb;
CREATE TABLE accounts (
account_id VARCHAR(255),
role_id VARCHAR(255),
user_name VARCHAR(255),
user_description VARCHAR(255),
update_date DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (account_id)
);

이렇게 db를 생성해 줍니다.

 

2.3 권한 설정

use mysql;

// mysqluser 가 추가 되어 있는지 확인
select host, user from user;

// mysqluser 없으면 생성
CREATE USER 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';

// mysqluser 에게 권한 부여
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';

FLUSH PRIVILEGES;

새로운 계정을 생성해 줍니다. 이 계정을 이용하여 커넥터를 생성하여 kafka를 사용합니다. 계정은 이미 yml을 이용하여 생성이 되어 있을 겁니다. 권한 역시 부여되어 있겠지만 저는 권한이 제대로 부여되지 않아 문제가 발생했기에  한 번 더 부여하는 것을 추천합니다.

 

권한을 부여한 후 exit를 이용하여 docker컨테이너 밖으로 나옵니다.

exit1

 

2.4 Debezium Connector설치

 kafka의 친구인 debezium을 설치합니다.

docker exec -it kafka /bin/bash

이번에는 카프카의 컨테이너에 접속합니다. 

 

이동1

다음 kafka_2.13-2.8.1로 이동합니다.

ls 명령어를 입력하면 connectors폴더가 없죠?

mkdir connectors

다음 명령어를 입력하여 connectors를 생성 후 폴더로 들어갑니다.

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.6.Final/debezium-connector-mysql-1.9.6.Final-plugin.tar.gz

다음 명령어를 이용해 debezium을 설치합니다.

 

설치가 끝나면 tar파일이 있을 텐데요.

tar -zxvf debezium-connector-mysql-1.9.6.Final-plugin.tar.gz

압축을 해제합니다.

jdbc는 스포...

2.5plugin 경로수정

cd  /opt/kafka/config

경로를 위와 같이 이동합니다.

vi connect-distributed.properties

경로 수정

밑에 부분에 plugin.path를 위와 같이 수정합니다. i를 누르면 수정할 수 있고 esc를 누르고 :wq를 눌러 저장할 수 있습니다.

 

2.6 이제 카프카를 분산모드로 실행합니다.

분산모드는 두 개 이상의 커넥터를 한 개의 클러스터로 묶어서 운영하는 기능입니다.

connect-distributed.sh /opt/kafka/config/connect-distributed.properties

코드가 뜨면서 실행됩니다. 

INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)

위에 코드가 뜨면 성공입니다.

에러가 나도 다시 명령어를 입력하면 되는 경우도 있습니다.

저도 두 번 시도하고 성공했습니다.

 

2.7 kafka connect클러스터 확인

## 커넥터 8083 port 가 열려있는지 확인
netstat -lnp
curl http://localhost:8083/

분산모드를 실행한 후 다른 창에서 위에 코드를 입력하여 포트를 열어줍니다.

curl --location --request GET 'localhost:8083/connector-plugins'

이렇게!

보기 불편하시다면

curl --location --request GET 'localhost:8083/connector-plugins | jq'

를 입력하면

이렇게 볼 수 있습니다.

mysql 커넥터를 확인합니다.

 

3.1 커넥터생성

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "source-test-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "mysqluser",
"database.password": "mysqlpw",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.allowPublicKeyRetrieval": "true",
"database.include.list": "testdb",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.testdb",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite" ,
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1"
}
}'

다음은 위에 코드를 입력하여 커넥터를 생성합니다. 

"transforms.unwrap.drop.tombstones": "false",

"transforms.unwrap.delete.handling.mode":"rewrite" 

이 부분을 이용하면 후에 sink커넥터 생성 시 delete도 반영할 수 있습니다!

 

3.2 커넥터 확인

# 목록
curl --location --request GET 'http://localhost:8083/connectors'

# 상세정보
 curl --location --request GET 'http://localhost:8083/connectors/source-test-connector/config'

#삭제
curl --location --request DELETE 'http://localhost:8083/connectors/source-test-connector'

#토픽 목록확인
kafka-topics.sh --list --bootstrap-server localhost:9092

3.3 테스트 데이터 입력

mysql에 접속하여 생성한 testdb에 데이터를 입력합니다.

INSERT INTO accounts VALUES ("123456", "111", "Susan Cooper", "God", 
"2021-08-16 10:11:12");
INSERT INTO accounts VALUES ("123457", "111", "Rick Ford", "mistakes", 
"2021-08-16 11:12:13");
INSERT INTO accounts VALUES ("123458", "999", "Bradley Fine", "face", 
"2021-08-16 12:13:14");

3.4 컨슈머 확인

kafka-console-consumer.sh --topic dbserver1.testdb.accounts --bootstrap-server localhost:9092 --from-beginning

컨슈머가 데이터를 수집하고 있는 것을 확인할 수 있습니다!

 

이 글은 https://wecandev.tistory.com/109을 기반으로 작성되었습니다.

 

다음 글은 Sink Connector를 생성하여 CDC를 구현해 보겠습니다.