Apache kafka
- Apache Software Foundation의 Scalar 언어로 된 오픈 소스 메시지 브로커 프로젝트
- - Open Source Message Broker Project
- 링크드인(Linked-in)에서 개발, 2011년 오픈 소스화
- - 2014 년 11월 링크드인에서 Kafka를 개발하던 엔지니어들이 Kafka개발에 집중하기 위해 Confluent라는 회사 창립
- 실시간 데이터 피드를 관리하기 위해 통일된 높은 처리량, 높은 처리량, 낮은 지연 시간을 지닌 플랫폼 제공
- Apple, Netflix, Yelp, Kakao, New York Times, Cisco, Paypal, Hyperledger Fabric, Uber, Salesfoce.com 등이 사용
End-to-End 연결 방식의 아키텍처의 단점
- 데이터 연동의 복잡성 증가(HW, 운영체제, 장애 등)
- 서로 다른 데이터 Pipeline 연결 구조
- 확장이 어려운 구조
Apache kafka 장점
- 모든 시스템으로 실시간으로 전송하여 처리할 수 있는 시스템
- 데이터가 많아지더라도 확장이 용이한 시스템
- Producer(메시지를 보내는 쪽)/Consumer(메시지를 받는 쪽) 분리
- 메시지를 여러 Consumer에게 허용
- 높은 처리량을 위한 메시지 최적화
- Scale-out 가능
- Eco-system
kafka Broker
- 실행된 Kafka 애플리케이션 서버
- 3대 이상의 Broker Cluster 구성
- Zookeeper 연동- Controller 정보 저장
- - 역할: 메타데이터(Broker ID, Controller ID 등) 저장
- n개 Broker 중 1대는 Controller 기능 수행
- 각 Broker에게 담당 파티션 할당 수행
- Broker 정상 동작 모니터링 관리
- - Controller 역할
3대 이상의 Broker Cluster를 구성하는 것을 권장합니다.
위와 같은 상황에서 카프카 자체가 서버로서 구동을 할 때 Broker #0이 문제가 생겼을 때 Broker #1이 대신 메시지를 공유해 줌으로써 안전하게 메시지중개인을 사용할 수 있습니다. 이런 서버에 문제 생겼을 때 서버 리더, 장애 체크, 장애 복구 코드네이터라는 시스템을 연동해서 사용하는데 카프카에서는 이런 코디네이터가 Apache zookepper를 사용하는 것이 일반적입니다.
Kafka 설치
https://kafka.apache.org/downloads
다운로드한 폴더에 가서 kafka_2.13-3.4.0.tgz 다운 및 압축 해제 (tar xvf kafka_2.13-3.4.0.tgz 명령어로 압축해제가 된다.)
config 폴더에는 zookeeper를 실행시킬 수 있는 zookeeper.properties 파일이 있고 Apache kafka를 실행시킬 수 있는 server.properties 가 있다.
bin 폴더에는 주키퍼를 실행, 종료할 수 있는 zookeeper-server-start.sh, zookeeper-server-stop.sh 파일이 있고 카프카를 실행, 종료시킬 수 있는 kafka-server-start.sh, kafka-server-stop.sh 가 있다.
카프카를 다운로드하면 윈도와 맥 같은 파일을 다운로드하는데 윈도는 bin/windows 폴더에 위와 같은. bat 파일들이 있다.
이번 글에서는 카프카는 pub/sub 기능 구현과, kafka에 메시지를 보냄에 있어서 자바 라이브러리를 통해 데이터를 받을 수 있는(kafka client)와 데이터베이스의 값들이 insert, update, delete 됐을 때(변경사항이( 생겼을 때)) 데이터베이스로부터 카프카가 변경된 데이터에 메시지를 가지고 그 값을 다른 쪽에 있는 데이터베이스, 서비스에 전달하는 기능(kafka connect)에 대해서 알아보겠습니다.
Kafka Client
Kafka와 데이터를 주고받기 위해 사용하는 Java Library
- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
Producer, Consumer, Admin, Stream 등 Kafka 관련 API 제공 다양한 3rd party library 존재: C/C++, Node.js, Python,. NET 등 - https://cwiki.apache.org/confluence/display/KAFKA/Clients
Kafka 서버 기동
Zookeeper 및 Kafka 서버 구동
여기서 $KAFKA_HOME 은 압축해제한 kafka 폴더를 뜻합니다.
- $KAFKA_HOME/bin/zookeeper-server-start.sh
- $KAFKA_HOME/config/zookeeper.properties
- $KAFKA_HOME/bin/kafka-server-start.sh
- $KAFKA_HOME/config/server.properties
- ./bin/zookeeper-server-start.sh ./config/zookeeper.properties 주키퍼 서버 실행
- ./bin/kafka-server-start.sh ./config/server.properties 카프카 서버 실행
Topic 생성
- $KAFKA_HOME/bin/kafka-topics.sh --create --topic my_first_kafka --bootstrap-server localhost:9092 --partitions 1
- ./bin/kafka-topics.sh --create --topic my_first_kafka --bootstrap-server localhost:9092 --partitions 1
./bin/kafka-topics.sh --create --topic my_first_kafka : my_first_kafka 라는 이름의 토픽을 생성합니다.
--bootstrap-server localhost:9092 : 는 현재 단일 서버이기 때문에 주키퍼 서버가 아닌 카프카 서버에 토픽을 생성하겠다는 의미입니다.
--partitions 1 : 이 명령어의 의미는 멀티 클러스터링을 구성했을때 카프카의 전달되는 메시지를 몇군대에 저장하겠다는 옵션입니다.
Topic 삭제
- ./bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic my_first_kafka
Topic 목록 확인
- $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Topic 정보 확인
- $KAFKA_HOME/bin/kafka-topics.sh --describe --topic my_first_kafka --bootstrap-server localhost:9092
- ./bin/kafka-topics.sh --describe --topic my_first_kafka --bootstrap-server localhost:9092
Kafka Producer/Consumer 테스트
메세지 생산
- $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_first_kafka
- ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_first_kafka
위의 명령어를 친 뒤 메세지를 보낼수 있다.
메세지 소비
- $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_first_kafka --from-beginning
- ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_first_kafka --from-beginning
위의 명령어에서 --from-beginning 옵션은 말 그대로 시작한시점에서, 내가 참여하기전의 데이터까지 불러올 수 있다.
Kafka Connect
- Kafka Connect를 통해 Data를 Import/Export 가능
- 코드 없이 Configuration으로 데이터를 이동
- Standalone mode, Distribution mode 지원
- RESTful API 통해 지원
- Stream 또는 Batch 형태로 데이터 전송 가능
- 커스텀 Connector를 통한 다양한 Plugin 제공(File, S3, Hive, Mysql, etc...)
Kafka Connect 설치
- curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
- curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
이게 최신 버전
- tar xvf confluent-community-6.1.0.tar.gz -> 압축 푸는 명령어
- cd $KAFKA_CONNECT_HOME -> 작업할 디렉토리로 옮긴 뒤 디렉토리 이동
Kafka Connect 설정(기본으로 설정)
- ./config/connect-distributed.properties
Kafka Connect 실행
- ./bin/connect-distributed ./etc/kafka/connect-distributed.properties
JDBC Connector 설정
- https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
- Download and extract the ZIP file -> confluentinc-kafka-connect-jdbc-10.0.0.1zip 다운로드
- confluentinc-kafka-connect-jdbc-10.0.1.zip
etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가
- plugin.path=[confluentinc-kafka-connect-jdbc-10.0.1 폴더]
JdbcSourceConnector에서 Mysql 사용하기 위해 Mysql 드라이버 복사
- ./share/java/kafka/ 폴더에 mysql-connector-java-8.0.27.jar 파일 복사
https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.27
kafka JDBC Connector는 REST API 지원한다.
Postman 으로 JDBC Connector를 등록한다.
POST 127.0.0.1:8083/connectors
{
"name": "my-source-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "12345678",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "users",
"topic.prefix": "my_topic_",
"tasks.max": "1"
}
}
{
"name": "my-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "12345678",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "my_topic_users"
}
}
위와 같이 request를 날리면 밑의 요청으로 my_topic_users가 생깁니다.
my-source-connect 는 DB에 insert,update,delete 즉 데이터의 변경이 일어났을때 토픽으로 데이터베이스의 스키마 , 데이터 값들을 보내줍니다.
my-sink-connect는 위에서 topic에 보내진 스키마값을 DB에 직접날리는것이 아니라 카프카 토픽에 쏴주면 DB에 인서트가 됩니다.
connetors 조회
GET 127.0.0.1:8083/connectors
connectors 상세 조회
GET 127.0.0.1:8083/connectors/{connetors-name}
connectors 삭제
DELETE http://localhost:8083/connectors/my-sink-connect
댓글