Kafka Sink Connector 생성해서 Postgres 연결하기
안녕하세요.
이번 글의 목표는
1. postgres 컨테이너 생성하기
2. Sink Connector 추가하기
3. 데이터 연동 확인하기 입니다.
1. postgres 컨테이너 생성하기
1.1 docker-compose.yml에 postgres 추가하기
docker-compose.yml 파일이 있는 곳으로 이동하여 docker-compose.yml을 수정합니다.
postgres:
image: postgres:14
container_name: postgres
ports:
- "5432:5432"
environment:
POSTGRES_ROOT_PASSWORD: admin
POSTGRES_USER: postuser
POSTGRES_PASSWORD: postps
이 부분을 추가해줍니다.
전체 docker-compose.yml은
version: '2'
services:
mysql:
image: mysql:8.0
container_name: mysql
ports:
- 3306:3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: mysqluser
MYSQL_PASSWORD: mysqlpw
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
zookeeper:
container_name: zookeeper
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
mysql-sink:
image: mysql:8.0
container_name: mysql-sink
ports:
- 3307:3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: mysqluser
MYSQL_PASSWORD: mysqlpw
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
postgres:
image: postgres:14
container_name: postgres
ports:
- "5432:5432"
environment:
POSTGRES_ROOT_PASSWORD: admin
POSTGRES_USER: postuser
POSTGRES_PASSWORD: postps
※docker-compose.yml은 띄어쓰기에 민감하니 복사 붙여넣기를 권장합니다.
1.2 docker-compose.yml 을 실행합니다.
실행 코드는
docker-compose -f docker-compose.yml up -d
만약 docker에 컨테이너가 제대로 설치 되지 않았다면?
# 재실행
docker restart [container]
docker restart postgres
#정지
docker stop [container]
docker stop postgres
#제거
docker rm [container]
docker rm postgres
#도움말보기
docker --help
위에 명령어를 이용해 컨테이너를 제거하고 다시 설치합니다.
설치가 제대로 되었다면 컨테이너 상태를 확인해봅니다.
docker ps
모든 컨테이너가 잘 실행되고 있다는 것을 확인할 수 있습니다!
1.3 postgres 설정하기
※postgres가 처음이라 헤맬 수 있습니다. 효율적인 명령어를 안다면 알려주세요!
postgres 내부로 이동합니다.
docker exec -it postgres /bin/bash
그 다음 postuser로 로그인 합니다.
psql -U postuser
sinkdb 데이터 베이스를 생성해줍니다.
CREATE DATABASE sinkdb;
postuser로 로그인해서 postuser=#로 바뀐 것을 확인할 수 있고,
db를 생성하여 CREATE DATABASE가 뜬 것을 확인할 수 있습니다.
#다음은 postuser에게 모든 권한을 부여합니다.
GRANT ALL PRIVILEGES ON DATABASE sinkdb to postuser;
#sinkdb로 접속
\c sinkdb
#스키마 생성
CREATE SCHEMA sinkdb AUTHORIZATION postuser;
#스키마 목록 조회
\dn
하나씩 입력하면 accounts의 스키마까지 생성하고 확인할 수 있습니다.
처음에 스키마를 accounts로 만들었지만 생각해보니 mysql처럼 스키마는 sinkdb로 하고 테이블을 accounts로 하는 것이 명확할 것 같아 다시 생성한 모습닙니다. 제가 삽질 했으니 여러분은 헷갈리지 마세요.
1.4 search_path등록하기
해당 세션 또는 터미널이 계속 조회를 할 수 있게 search_path를 등록합니다.
#search_path 등록
SET search_path to sinkdb,public;
#테이블 리스트 조회
\dt+
1.5 pg_hba.conf 파일 수정
pg_hba.conf파일을 수정합니다.
이 파일의 경로는 우선 \q로 postgres로 나와 줍니다.
root 상태에서 이동합니다.
#경로 이동
cd var/lib/postgresql/data
#파일 수정
vi pg_hba.conf
#vi가 없다면?
apt-get update
apt-get install vim
저는 method가 trust로 되어 있어서 md5로 수정해 주었습니다.
trust : 패스워드 없이 접근 가능
reject : 거부
md5 : md5암호화 전송
password : text로 전송
그리고
host all all 0.0.0.0/0 md5를 추가하여 kafka와 postgres가 다른 서버에 있어도 접속할 수 있게 해줍니다.
사실 같은 docker-compose라서 해줄 필요가 없지만 혹시 모르니까요.
:wq로 저장하고
이번엔 postgres.conf 를 아래와 같이 되어있는지 확인합니다. 저는 이미 저렇게 되어있어 :q로 나왔습니다.
이제 postgres를 재실행합니다.
exit로 나온뒤
docker restart postgres
2. Sink Connector 생성하기
2.1 커넥터 확인
카프카 내부에 커넥터가 잘 설치되어 있는지 확인합니다.
docker exec -it kafka /bon/bash
cd opt/kafka_2.13-2.8.1/connectors
저 많은 파일중 confluentinc-kafka-connect-jdbc-10.7.3과 debezium-connector-mysql이 필요합니다.
없다면 이전 포스팅을 통해 설치해주세요!
2023.06.19 - [IT] - Kafka로 CDC구현하기(1/3) - Kafka Source Connector 생성하기
2023.06.20 - [IT] - Kafka로 CDC구현하기(2/3) - Kafka Sink Connector 생성하기
Kafka로 CDC구현하기(2/3) - Kafka Sink Connector 생성해서 Mysql 연결하기
Kafka Sink Connector 생성하기 오늘은 Sink Connector를 생성해 보겠습니다. 저번시간에 Source Connector를 만들어서 Consumer를 확인해 봤죠? 2023.06.19 - [IT] - Kafka로 CDC구현하기(1/3) - Kafka Source Connector 생성하기
cmilk.tistory.com
2.2 분산모드 실행
connect-distributed.sh /opt/kafka/config/connect-distributed.properties
이미 실행중이라고 에러가 난다면 카프카 재실행 후 실행해주세요!
docker restart kafka
2.3 클러스터 정보 확인
curl http://localhost:8083
curl --location --request GET 'localhost:8083/connector-plugins' |jq
버전 상관없이 sink와 source가 하나씩 있어야합니다!
2.4 커넥터 생성
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-postgres-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://postgres:5432/sinkdb?currentSchema=accounts",
"connection.user": "postuser",
"connection.password": "postps",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"tombstones.on.delete": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"table.name.format":"${topic}",
"topics.regex": "dbserver1.testdb.(.*)",
"offset.flush.interval.ms": "1000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap, route, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite" ,
"transforms.unwrap.delete.handling.mode": "none",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "update_date"
}
}'
2.6 컨슈머 확인
kafka-console-consumer.sh --topic dbserver1.testdb.accounts --bootstrap-server localhost:9092 --from-beginning
3.postgres 확인
#postgres접속
docker exec -it postgres /bin/bash
#postuser 사용
psql -U postuser
#sinkdb 접속
\c sinkdb
#데이터 확인
select * from accounts.accounts;
3.1 삭제 확인
삭제도 성공적으로 되는 것을 확인할 수 있습니다!
'IT' 카테고리의 다른 글
React + Spring 카카오 로그인 구현하기 - OAuth란, OAuth원리, 프론트 토큰 저장 방식 (0) | 2023.06.27 |
---|---|
Kafka로 CDC구현하기(2/3) - Kafka Sink Connector 생성해서 Mysql 연결하기 (0) | 2023.06.20 |
Kafka로 CDC구현하기(1/3) - Kafka Source Connector 생성하기 (0) | 2023.06.19 |