Kafka

Kafka 설치, pub/sub 및 Kafka connect

수수한개발자 2023. 2. 20.
728x90

Apache kafka

  • Apache Software Foundation의 Scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트
    • - Open Source Message Broker Project
  • 링크드인(Linked-in)에서 개발, 2011년 오픈 소스화
    • - 2014 년 11월 링크드인에서 Kafka를 개발하던 엔지니어들이 Kafka개발에 집중하기 위해 Confluent라는 회사 창립
  • 실시간 데이터 피드를 관리하기 위해 통일된 높은 처리량, 높은 처리량, 낮은 지연 시간을 지닌 플랫폼 제공
  • Apple, Netflix, Yelp, Kakao, New York Times, Cisco, Paypal, Hyperledger Fabric, Uber, Salesfoce.com 등이 사용

 

End-to-End 연결 방식의 아키텍처의 단점

  • 데이터 연동의 복잡성 증가(HW, 운영체제, 장애 등)
  • 서로 다른 데이터 Pipeline 연결 구조
  • 확장이 어려운 구조

Apache kafka 장점

  • 모든 시스템으로 실시간으로 전송하여 처리할 수 있는 시스템
  • 데이터가 많아지더라도 확장이 용이한 시스템
  • Producer(메시지를 보내는 쪽)/Consumer(메시지를 받는 쪽) 분리
  • 메시지를 여러 Consumer에게 허용
  • 높은 처리량을 위한 메시지 최적화
  • Scale-out 가능
  • Eco-system

 

kafka Broker

  • 실행된 Kafka 애플리케이션 서버
  • 3대 이상의 Broker Cluster 구성
  • Zookeeper 연동- Controller 정보 저장
  • - 역할: 메타데이터(Broker ID, Controller ID 등) 저장
  • n개 Broker 중 1대는 Controller 기능 수행
    • 각 Broker에게 담당 파티션 할당 수행
    • Broker 정상 동작 모니터링 관리
  • - Controller 역할

3대 이상의 Broker Cluster를 구성하는 것을 권장합니다. 

 

위와 같은 상황에서 카프카 자체가 서버로서 구동을 할 때 Broker #0이 문제가 생겼을 때 Broker #1이 대신 메시지를 공유해 줌으로써 안전하게 메시지중개인을 사용할 수 있습니다. 이런 서버에 문제 생겼을 때 서버 리더, 장애 체크, 장애 복구 코드네이터라는 시스템을 연동해서 사용하는데 카프카에서는 이런 코디네이터가 Apache zookepper를 사용하는 것이 일반적입니다. 

 

 

Kafka 설치

 

https://kafka.apache.org/downloads

 

다운로드한 폴더에 가서 kafka_2.13-3.4.0.tgz 다운 및 압축 해제 (tar xvf kafka_2.13-3.4.0.tgz 명령어로 압축해제가 된다.)
config 폴더에는 zookeeper를 실행시킬 수 있는 zookeeper.properties 파일이 있고 Apache kafka를 실행시킬 수 있는 server.properties 가 있다.
bin 폴더에는 주키퍼를 실행, 종료할 수 있는 zookeeper-server-start.sh, zookeeper-server-stop.sh 파일이 있고 카프카를 실행, 종료시킬 수 있는 kafka-server-start.sh, kafka-server-stop.sh 가 있다.
카프카를 다운로드하면 윈도와 맥 같은 파일을 다운로드하는데 윈도는 bin/windows 폴더에 위와 같은. bat  파일들이 있다.

 

이번 글에서는 카프카는 pub/sub 기능 구현과, kafka에 메시지를 보냄에 있어서 자바 라이브러리를 통해 데이터를 받을 수 있는(kafka client)와 데이터베이스의 값들이 insert, update, delete 됐을 때(변경사항이( 생겼을 때)) 데이터베이스로부터 카프카가 변경된 데이터에 메시지를 가지고 그 값을 다른 쪽에 있는 데이터베이스, 서비스에 전달하는 기능(kafka connect)에 대해서 알아보겠습니다.

 

Kafka Client

Kafka와 데이터를 주고받기 위해 사용하는 Java Library

Kafka 서버 기동

Zookeeper 및 Kafka 서버 구동

여기서 $KAFKA_HOME 은 압축해제한 kafka 폴더를 뜻합니다.

  • $KAFKA_HOME/bin/zookeeper-server-start.sh
  • $KAFKA_HOME/config/zookeeper.properties
  • $KAFKA_HOME/bin/kafka-server-start.sh
  • $KAFKA_HOME/config/server.properties
  • ./bin/zookeeper-server-start.sh ./config/zookeeper.properties 주키퍼 서버 실행
  • ./bin/kafka-server-start.sh ./config/server.properties 카프카 서버 실행

Topic 생성

  • $KAFKA_HOME/bin/kafka-topics.sh --create --topic my_first_kafka --bootstrap-server localhost:9092 --partitions 1
  • ./bin/kafka-topics.sh --create --topic my_first_kafka --bootstrap-server localhost:9092 --partitions 1

./bin/kafka-topics.sh --create --topic my_first_kafka : my_first_kafka  라는 이름의 토픽을 생성합니다.

--bootstrap-server  localhost:9092 : 는 현재 단일 서버이기 때문에 주키퍼 서버가 아닌 카프카 서버에 토픽을 생성하겠다는 의미입니다. 

--partitions 1 : 이 명령어의 의미는 멀티 클러스터링을 구성했을때 카프카의 전달되는 메시지를 몇군대에 저장하겠다는 옵션입니다.

Topic 삭제 

  • ./bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic my_first_kafka

Topic 목록 확인

  • $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  • ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

Topic 정보 확인

  • $KAFKA_HOME/bin/kafka-topics.sh --describe --topic my_first_kafka --bootstrap-server localhost:9092
  • ./bin/kafka-topics.sh --describe --topic my_first_kafka --bootstrap-server localhost:9092

 

Kafka Producer/Consumer 테스트

메세지 생산

  • $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_first_kafka
  • ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_first_kafka

위의 명령어를 친 뒤 메세지를 보낼수 있다.

 

메세지 소비

  • $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_first_kafka --from-beginning
  • ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_first_kafka --from-beginning

위의 명령어에서 --from-beginning 옵션은 말 그대로 시작한시점에서, 내가 참여하기전의 데이터까지 불러올 수 있다.

 

 

Kafka Connect

  • Kafka Connect를 통해 Data를 Import/Export 가능
  • 코드 없이 Configuration으로 데이터를 이동
  • Standalone mode, Distribution mode 지원
    • RESTful API 통해 지원
    • Stream 또는 Batch 형태로 데이터 전송 가능
    • 커스텀 Connector를 통한 다양한 Plugin 제공(File, S3, Hive, Mysql, etc...)

Kafka Connect 설치

- curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz

- curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz

이게 최신 버전

- tar xvf confluent-community-6.1.0.tar.gz       -> 압축 푸는 명령어

- cd $KAFKA_CONNECT_HOME        -> 작업할 디렉토리로 옮긴 뒤 디렉토리 이동

 

Kafka Connect 설정(기본으로 설정)

- ./config/connect-distributed.properties

 

Kafka Connect 실행

- ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

 

 

JDBC Connector 설정

- https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html

  • Download and extract the ZIP file -> confluentinc-kafka-connect-jdbc-10.0.0.1zip 다운로드
  • confluentinc-kafka-connect-jdbc-10.0.1.zip

etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가

  • plugin.path=[confluentinc-kafka-connect-jdbc-10.0.1 폴더]

JdbcSourceConnector에서 Mysql 사용하기 위해 Mysql 드라이버 복사

  • ./share/java/kafka/ 폴더에 mysql-connector-java-8.0.27.jar 파일 복사

https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.27

 

 

kafka JDBC Connector는 REST API 지원한다.

Postman 으로 JDBC Connector를 등록한다.

 

POST 127.0.0.1:8083/connectors



{
    "name": "my-source-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://localhost:3306/mydb",
        "connection.user": "root",
        "connection.password": "12345678",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "table.whitelist": "users",
        "topic.prefix": "my_topic_",
        "tasks.max": "1"
    }
}



{
    "name": "my-sink-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://localhost:3306/mydb",
        "connection.user": "root",
        "connection.password": "12345678",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "false",
        "tasks.max": "1",
        "topics": "my_topic_users"
    }
}

위와 같이 request를 날리면 밑의 요청으로 my_topic_users가 생깁니다.

my-source-connect 는 DB에 insert,update,delete 즉 데이터의 변경이 일어났을때 토픽으로 데이터베이스의 스키마 , 데이터 값들을 보내줍니다.

my-sink-connect는 위에서 topic에 보내진 스키마값을 DB에 직접날리는것이 아니라 카프카 토픽에 쏴주면 DB에 인서트가 됩니다. 

 

connetors 조회

GET 127.0.0.1:8083/connectors

 

connectors 상세 조회

GET 127.0.0.1:8083/connectors/{connetors-name}

 

connectors 삭제

DELETE http://localhost:8083/connectors/my-sink-connect

 

 

 

 

 

728x90

댓글