본문 바로가기

PBL 빅데이터/산학프로젝트

[산학프로젝트] Kafka의 실시간 데이터 Spark로 받아오기

Nifi가 돌아다니면서 파일을 HDFS의 넘기고 이 데이터를 HDFS에서 Kafka로 넘어가는 과정을 실시간으로 진행된다고 가정이 가능하다. 이를 이용해서 실시간 데이터로 치고 이를 Spark로 받아오려고한다. (참고 : https://silver-ye.tistory.com/11?category=1017483)

1) Pyspark 불러오기

import pyspark

사용하기 전, pyspark를 불러온다.

 

2) SparkConf 생성하기

# 조부모 생성

from pyspark import SparkConf

# setMaster : Spark가 설치된 컴퓨터 ip

# set AppName : 실행중인 프로그램의 이름을 설정

configure = SparkConf().setMaster('local').setAppName('spark kafka')

print('configure =', configure)

setMaster에는 Spark가 설치된 컴퓨터의 ip를 입력해야하는데 현재는 한 대의 vm으로 진행됨으로 local이 된다.

 

3) SparkContext 생성하기

# 부모 생성

from pyspark import SparkContext

# conf = configure : SparkConf와 SparkContext의 종속 관계

sc = SparkContext.getOrCreate(conf = configure)

print('sc =', sc)

 

4) StreamingContext 생성하기

# 자식 생성

from pyspark.streaming import StreamingContext

# sc뒤에 숫자는 몇초마다 kafka data를 가져올 것인지를 의미

ssc = StreamingContext(sc,5)

print('ssc =', ssc)

 

5) Kafka와 Spark 연결하기

# 연결하기

# brokers : 지정해준대로 기본으로 localhost:9092 사용

# topic : Kafka안에서 불러올 topic 이름

brokers = 'localhost:9092'

topic = 'test1'

# pyspark를 2버전대로 변경하기

# KafkaUtils는 현재 2버전대 이후로 지원을 하지 않음으로 사용을 위해 변경

!pip install --force-reinstall pyspark==2.4.6
from pyspark.streaming.kafka import KafkaUtils

< 연결 과정>

KafkaUtils(kvs)

Kafka 정보 알려줌 ssc

5초마다 데이터 가져옴 Kafka

가져온 데이터를 줌

KafkaUtils(kvs) 받은 데이터를 리스트에 저장

 

6) kvs 지정

# kvs 지정하기

kvs = KafkaUtils.createDirectStream(ssc, [topic], {'metadata.broker.list': brokers})

미리 지정해둔 topic과 brokers로 Kafka에 연결한다.

 

7) 출력 지정하기

# lambda로 읽은 데이터 1줄 저장

lines = kvs.map(lambda x : x[1])

# 스파크가 가져온 데이터는 pprint로 받음

lines.pprint()

실행시켜도 실행이 안되는 이유는 kvs가 먼저 실행되어야한다.

 

8) ssc 실행하기

import time

# ssc.start() : ssc 실행 시작

ssc.start()

# kvs는 데이터를 받아서 위의 지정한 pprint로 출력되게 함

time.sleep(100)

# 메모리가 부족하면 stop하는 과정까지 넣어줄 수 있음

ssc.stop

ssc가 일하기 시작하면서 받아온 라인들이 출력된다.