Kafka를 설치하고 실행까지 완료했다면, 실행된 것을 확인하기 위한 예제를 진행할 수 있다.
※ Kafka 구조
Kafka는 대규모 임시 저장소 역할을 수행해준다.
Producer → Topic → Consumer
- Topic : Kafka의 메세지들을 저정하는 창고 역할, 이미 생성된 데이터를 생성하면 에러가 나니 주의
- Producer : 생성된 창고(topic)에 데이터를 넣어주는 일꾼 역할
- Consumer : 창고(topic)에서 데이터를 꺼내는 일꾼 역할
일꾼들을 고용하기 전에 창고가 생성되어 있어야 고용이 가능하듯이, Topic이라는 창고를 첫번째로 생성해준다.
1) Topic 생성하기
# 압축이 풀린 폴더안의 bin 폴더에서 진행
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# 쉽게 풀어보기
./kafka-topics.sh --create --bootstrap-창고를 만들 컴퓨터 도메인:창고를 만들 port --창고의 메세지를 복제할 갯수 --메세지를 나눠서 저장할 창고 생성 갯수 --topic 메세지를 저장할 창고 이름
Topic을 생성하는 명령어를 살펴보면
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
kafka-topic.sh : Kafka의 창고(topic) 파일 실행
localhost:9092 : Kafak의 창고를 만들 컴퓨터 도메인:창고를 만들 port 를 지정, Kafka의 기본 port는 9092로 config → vi server.properties에서 확인이 가능.
—replication-factor 1 : 창고(topic)의 복사할 메세지의 복제 개수 1개를 생성한다는 의미
—partitions 1 : 다른 창고와 메세지를 나눠서 저장할 수 있는 창고를 1개 생성한다는 의미(메모리가 부족한 경우에 1개씩 생성)
test : 메세지를 저장할 창고의 이름

명령어를 입력한 후,
Created topic 창고 이름
메세지가 출력되는지 확인해야한다.
2) Topic 확인하기
# 압축이 풀린 폴더안의 bin 폴더에서 진행
./kafka-topics.sh --list --bootstrap-server localhost:9092
./kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topic.sh : Kafka의 창고(topic) 파일 실행
localhost:9092 : Kafka의 창고를 만든 컴퓨터 도메인:창고를 만든 port를 지정

창고 이름을 생성할 때, test라고 지정해주었고 확인해보니 test라고 창고 이름이 잘 지정되어있는 것이 확인 가능하다.
3) Producer 생성 후, 메세지 넣기
창고가 생성되었으니, 창고에 데이터를 넣어주는 일꾼인 Producer를 생성해주고 메세지를 넣어준다.
# 압축이 풀린 폴더안의 bin 폴더에서 진행
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
kafka-console-producer.sh : Kafka Producer 파일 실행
localhost:9092 : Kafka의 창고를 만든 컴퓨터 도메인:일꾼들 port 지정
test : 메세지가 저장된 창고의 이름

화살표(>) 모양으로 shell에 들어가졌다면, producer가 생성되었고 메세지를 넣어줄 수 있게 된 것이다. 화살표 모양이 생겼으니 메세지를 넣어주는데 현재는 'test test'라는 메세지를 넣어주었다.
메세지를 다 입력했다면, ctrl + z를 이용해서 shell에서 빠져나올 수 있다.
4) Consumer 생성 후, 메세지 확인하기
Producer가 Topic에 넣어놓은 메세지를 메세지를 가져오는 일꾼인 Consumer를 생성해주고 메세지를 가져와서 확인한다.
# 압축이 풀린 폴더안의 bin 폴더에서 진행
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
kafka-console-consumer.sh : Kafka Consumer 파일 실행
localhost:9092 : Kafka의 창고를 만든 컴퓨터 도메인:일꾼들 port 지정
test : 메세지가 저장된 창고의 이름
from-beginning : 메세지를 처음부터 읽도록 지정

test test라고 producer를 통해서 넣어두었던 메세지가 consumer를 이용해서 확인이 가능하다.
'PBL 빅데이터 > 산학프로젝트' 카테고리의 다른 글
| [산학프로젝트] Kafka의 실시간 데이터 Spark로 받아오기 (2) | 2021.08.30 |
|---|---|
| [산학프로젝트] Nifi로 Kafka Producer 연결하기(HDFS → Kafka) (2) | 2021.08.30 |
| [산학프로젝트] Quick Start VM안에 Kafka 설치하기 (0) | 2021.08.30 |
| [산학프로젝트] MobaXterm 사용하기 (1) | 2021.08.30 |
| [산학프로젝트] Nifi 한글 인코딩 변경하기 (0) | 2021.08.30 |