Nifi가 돌아다니면서 파일을 HDFS의 넘기고 이 데이터를 HDFS에서 Kafka로 넘어가는 과정을 실시간으로 진행된다고 가정이 가능하다. 이를 이용해서 실시간 데이터로 치고 이를 Spark로 받아오려고한다. (참고 : https://silver-ye.tistory.com/11?category=1017483)
1) Pyspark 불러오기
import pyspark
사용하기 전, pyspark를 불러온다.
2) SparkConf 생성하기
# 조부모 생성
from pyspark import SparkConf
# setMaster : Spark가 설치된 컴퓨터 ip
# set AppName : 실행중인 프로그램의 이름을 설정
configure = SparkConf().setMaster('local').setAppName('spark kafka')
print('configure =', configure)
setMaster에는 Spark가 설치된 컴퓨터의 ip를 입력해야하는데 현재는 한 대의 vm으로 진행됨으로 local이 된다.
3) SparkContext 생성하기
# 부모 생성
from pyspark import SparkContext
# conf = configure : SparkConf와 SparkContext의 종속 관계
sc = SparkContext.getOrCreate(conf = configure)
print('sc =', sc)
4) StreamingContext 생성하기
# 자식 생성
from pyspark.streaming import StreamingContext
# sc뒤에 숫자는 몇초마다 kafka data를 가져올 것인지를 의미
ssc = StreamingContext(sc,5)
print('ssc =', ssc)
5) Kafka와 Spark 연결하기
# 연결하기
# brokers : 지정해준대로 기본으로 localhost:9092 사용
# topic : Kafka안에서 불러올 topic 이름
brokers = 'localhost:9092'
topic = 'test1'
# pyspark를 2버전대로 변경하기
# KafkaUtils는 현재 2버전대 이후로 지원을 하지 않음으로 사용을 위해 변경
!pip install --force-reinstall pyspark==2.4.6
from pyspark.streaming.kafka import KafkaUtils
< 연결 과정>
KafkaUtils(kvs)
⬇
Kafka 정보 알려줌 ssc
⬇
5초마다 데이터 가져옴 Kafka
⬇
가져온 데이터를 줌
KafkaUtils(kvs) 받은 데이터를 리스트에 저장
6) kvs 지정
# kvs 지정하기
kvs = KafkaUtils.createDirectStream(ssc, [topic], {'metadata.broker.list': brokers})
미리 지정해둔 topic과 brokers로 Kafka에 연결한다.
7) 출력 지정하기
# lambda로 읽은 데이터 1줄 저장
lines = kvs.map(lambda x : x[1])
# 스파크가 가져온 데이터는 pprint로 받음
lines.pprint()
실행시켜도 실행이 안되는 이유는 kvs가 먼저 실행되어야한다.
8) ssc 실행하기
import time
# ssc.start() : ssc 실행 시작
ssc.start()
# kvs는 데이터를 받아서 위의 지정한 pprint로 출력되게 함
time.sleep(100)
# 메모리가 부족하면 stop하는 과정까지 넣어줄 수 있음
ssc.stop
ssc가 일하기 시작하면서 받아온 라인들이 출력된다.
'PBL 빅데이터 > 산학프로젝트' 카테고리의 다른 글
[산학프로젝트] Nifi Kafka Consumer 설정하기 (0) | 2021.08.30 |
---|---|
[산학프로젝트] Nifi Kafka에 1MB이상 파일 옮기기 (0) | 2021.08.30 |
[산학프로젝트] Nifi로 Kafka Producer 연결하기(HDFS → Kafka) (0) | 2021.08.30 |
[산학프로젝트] Kafka 실행 예제 (1) | 2021.08.30 |
[산학프로젝트] Quick Start VM안에 Kafka 설치하기 (0) | 2021.08.30 |