본문 바로가기
IT

Kafka로 CDC구현하기(3/3) - Kafka Sink Connector 생성해서 Postgres 연결하기

by 쪼꼬맛우유임 2023. 6. 26.

Kafka Sink Connector 생성해서 Postgres 연결하기

 

 

안녕하세요.

 

이번 글의 목표는

1. postgres 컨테이너 생성하기

2. Sink Connector 추가하기

3. 데이터 연동 확인하기 입니다.

 

1. postgres 컨테이너 생성하기

 

1.1 docker-compose.yml에 postgres 추가하기

docker-compose.yml 파일이 있는 곳으로 이동하여 docker-compose.yml을 수정합니다.

 postgres:
  image: postgres:14
  container_name: postgres
  ports:
    - "5432:5432"
  environment:
   POSTGRES_ROOT_PASSWORD: admin
   POSTGRES_USER: postuser
   POSTGRES_PASSWORD: postps

이 부분을 추가해줍니다.

 

전체 docker-compose.yml은

version: '2'
services:
 mysql:
  image: mysql:8.0
  container_name: mysql
  ports:
    - 3306:3306
  environment:
   MYSQL_ROOT_PASSWORD: admin
   MYSQL_USER: mysqluser
   MYSQL_PASSWORD: mysqlpw
  command:
   - --character-set-server=utf8mb4
   - --collation-server=utf8mb4_unicode_ci

 zookeeper:
  container_name: zookeeper
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"

 kafka:
  container_name: kafka
  image: wurstmeister/kafka
  depends_on:
   - zookeeper
  ports:
    - "9092:9092"
  environment:
   KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
   KAFKA_ADVERTISED_PORT: 9092
   KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  volumes:
   - /var/run/docker.sock:/var/run/docker.sock

 mysql-sink:
  image: mysql:8.0
  container_name: mysql-sink
  ports:
   - 3307:3306
  environment:
   MYSQL_ROOT_PASSWORD: admin
   MYSQL_USER: mysqluser
   MYSQL_PASSWORD: mysqlpw
  command:
   - --character-set-server=utf8mb4
   - --collation-server=utf8mb4_unicode_ci

 postgres:
  image: postgres:14
  container_name: postgres
  ports:
    - "5432:5432"
  environment:
   POSTGRES_ROOT_PASSWORD: admin
   POSTGRES_USER: postuser
   POSTGRES_PASSWORD: postps

※docker-compose.yml은 띄어쓰기에 민감하니 복사 붙여넣기를 권장합니다.

 

1.2 docker-compose.yml 을 실행합니다.

 

실행 코드는

docker-compose -f docker-compose.yml up -d

docker-compose 실행

 

만약 docker에 컨테이너가 제대로 설치 되지 않았다면?

 

# 재실행
docker restart [container]
docker restart postgres

#정지
docker stop [container]
docker stop postgres

#제거 
docker rm [container]
docker rm postgres

#도움말보기
docker --help

위에 명령어를 이용해 컨테이너를 제거하고 다시 설치합니다. 

 

설치가 제대로 되었다면 컨테이너 상태를 확인해봅니다.

docker ps

 

 

docker ps

모든 컨테이너가 잘 실행되고 있다는 것을 확인할 수 있습니다!

 

1.3 postgres 설정하기

※postgres가 처음이라 헤맬 수 있습니다. 효율적인 명령어를 안다면 알려주세요!

 

postgres 내부로 이동합니다.

docker exec -it postgres /bin/bash

 

그 다음 postuser로 로그인 합니다.

 

psql -U postuser

 

로그인

 

sinkdb 데이터 베이스를 생성해줍니다.

CREATE DATABASE sinkdb;

db생성

postuser로 로그인해서 postuser=#로 바뀐 것을 확인할 수 있고,

db를 생성하여 CREATE DATABASE가 뜬 것을 확인할 수 있습니다.

 

 

#다음은 postuser에게 모든 권한을 부여합니다. 
GRANT ALL PRIVILEGES ON DATABASE sinkdb to postuser;

#sinkdb로 접속
\c sinkdb

#스키마 생성
CREATE SCHEMA sinkdb AUTHORIZATION postuser;

#스키마 목록 조회
\dn

하나씩 입력하면 accounts의 스키마까지 생성하고 확인할 수 있습니다.

 

cheer up
cheerup 2

처음에 스키마를 accounts로 만들었지만 생각해보니 mysql처럼 스키마는 sinkdb로 하고 테이블을 accounts로 하는 것이 명확할 것 같아 다시 생성한 모습닙니다. 제가 삽질 했으니 여러분은 헷갈리지 마세요.

 

1.4 search_path등록하기

해당 세션 또는 터미널이 계속 조회를 할 수 있게 search_path를 등록합니다.

#search_path 등록
SET search_path to sinkdb,public;

#테이블 리스트 조회
\dt+

search_path

1.5 pg_hba.conf 파일 수정

pg_hba.conf파일을 수정합니다.

이 파일의 경로는 우선 \q로 postgres로 나와 줍니다. 

path

root 상태에서 이동합니다.

#경로 이동
cd var/lib/postgresql/data

#파일 수정
vi pg_hba.conf

#vi가 없다면?
apt-get update
apt-get install vim

like this

저는 method가 trust로 되어 있어서 md5로 수정해 주었습니다.

trust : 패스워드 없이 접근 가능

reject : 거부

md5 : md5암호화 전송

password : text로 전송

 

그리고 

host all all 0.0.0.0/0 md5를 추가하여 kafka와 postgres가 다른 서버에 있어도 접속할 수 있게 해줍니다.

사실 같은 docker-compose라서 해줄 필요가 없지만 혹시 모르니까요.

 

:wq로 저장하고

이번엔 postgres.conf 를 아래와 같이 되어있는지 확인합니다. 저는 이미 저렇게 되어있어 :q로 나왔습니다.

 

like this

이제 postgres를 재실행합니다.

exit로 나온뒤

docker restart postgres

 

2. Sink Connector 생성하기

 

2.1 커넥터 확인

카프카 내부에 커넥터가 잘 설치되어 있는지 확인합니다. 

docker exec -it kafka /bon/bash
cd opt/kafka_2.13-2.8.1/connectors

what is that?

저 많은 파일중 confluentinc-kafka-connect-jdbc-10.7.3과 debezium-connector-mysql이 필요합니다.

없다면 이전 포스팅을 통해 설치해주세요!

2.2 분산모드 실행

connect-distributed.sh /opt/kafka/config/connect-distributed.properties

 

이미 실행중이라고 에러가 난다면 카프카 재실행 후 실행해주세요!

docker restart kafka

2.3 클러스터 정보 확인

curl http://localhost:8083

curl --location --request GET 'localhost:8083/connector-plugins' |jq

check

버전 상관없이 sink와 source가 하나씩 있어야합니다!

 

2.4 커넥터 생성

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
  "name": "sink-postgres-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/sinkdb?currentSchema=accounts",
    "connection.user": "postuser",
    "connection.password": "postps",
    "auto.create": "true",
    "auto.evolve": "true",
    "delete.enabled": "true",
    "tombstones.on.delete": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "table.name.format":"${topic}",
    "topics.regex": "dbserver1.testdb.(.*)",
    "offset.flush.interval.ms": "1000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap, route, TimestampConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode":"rewrite" ,
    "transforms.unwrap.delete.handling.mode": "none",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.field": "update_date"
  }
}'

2.6 컨슈머 확인

kafka-console-consumer.sh --topic dbserver1.testdb.accounts --bootstrap-server localhost:9092 --from-beginning

3.postgres 확인

#postgres접속
docker exec -it postgres /bin/bash

#postuser 사용
psql -U postuser

#sinkdb 접속
\c sinkdb

#데이터 확인
select * from accounts.accounts;

3.1 삭제 확인

삭제도 성공적으로 되는 것을 확인할 수 있습니다!

mysql
컨슈머
postgres