NIRVANA

[Apache Spark] 스트림 데이터 처리 본문

DataEngineering

[Apache Spark] 스트림 데이터 처리

녜잉 2024. 8. 4. 14:23

스트림 처리?

  • 끊임없이 들어오는 데이터 흐름을 연속적으로 처리하는 것
  • 전통적인 분산 스트림 처리에서는 스트림 처리를 레코드 단위 처리 모델로 구현
    • 각 노드들은 지향성 그래프로 구성 되어 지속적으로 한번에 하나씩 레코드를 받게 된다.
    • 레코드를 받은 노드는 레코드를 처리하여 생성된 레코드를 그래프 상 다음 노드로 보내게 된다.
  • 장점 
    • 입력 레코드가 파이프라인에서 처리 되고 결과가 레코드로 생성되기 까지 몇 밀리초 만에 가능하다
  • 단점 
    • 특정 노드가 장애를 겪거나 다른 노드보다 느린 상황에서는 효울성이 떨어진다 

 
 

Spark Streaming

  • 실시간 데이터 분석을 위한 스파크 컴포넌트
  • 실시간 데이터 스트림을 받아 데이터를 디스트림(DStream, distretized stream, 이산 스트림)이라 불리는 추상화 개념의 작은 배치 단위로 나누고 디스트림을 스파크 엔진으로 분석 진행 
    • Spark RDD를 기반으로 Micro-batch 수행 

 

 DStream

  • 시간별로 도착한 데이터들의 연속적인 모임으로 각 스트림은 시간별 RDD들의 집합으로 구성된다 

 

  • 디스트림은 RDD와 마찬가지로 트랜스포메이션 연산과 출력 연산을 지원한다 
    • 트랜스포메이션(transformation): 새로운 디스트림을 만드는 연산
      • 상태 정보가 없는 트랜스포메이션 - select(), filter(), map()
      • 상태 정보 유지 트랜스포메이션 - count() .. etc
    • 출력 연산: 외부 시스템에 데이터를 작성 

 

🖥️예제

네트워크 소켓에서 텍스트 스트림을 수신하고 단어 빈도 계산해서 출력하기 

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

#스파크 기본 컨텍스트, 로컬에서 2개의 스레드 사용하여 실행 
sc = SparkContext("local[2]", "NetworkWordCount")
#실시간 스트리밍 처리를 위한 컨텍스트, 1초 간격으로 데이터 처리 
ssc = StreamingContext(sc, 1)
  
#socketTextStream: 지정된 호스트와 포트에서 텍스트 스트림 수신 
lines = ssc.socketTextStream("localhost", 9999)

#flatMap: 각 줄을 공백을 기준으로 단어로 분할 - 리스트를 개별 단어로 평탄화 
words = lines.flatMap(lambda line: line.split(" "))
  
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
  
ssc.start()           
ssc.awaitTermination()

 
 

Structured Streaming

  • SparkSQL 엔진에 구축된 확장 가능하고 내결함성이 있는 스트림 처리 엔진
  • 실시간으로 들어오는 데이터 스트림을 지속적으로 추가되는 '테이블'로 처리하는 것이 핵심 아이디어!
    • 스트리밍 계산을 정적 테이블과 같은 표준 배치 유사 쿼리로 표현
    • 스파크는 이러한 테이블을 무제한 입력 테이블로 간주하고 증분 쿼리로 실행하게 된다 
  • 위 그림과 같이 입력되는 데이터 스트림을 입력 테이블로 보기 때문에 스트림에 도착하는 모든 데이터 항목은 입력 테이블에 추가 되는 새로운 행과 같다 
  • 새로운 입력에 대한 쿼리를 통해 결과 테이블을 생성
    • 즉, 일정한 간격마다 새로운 행이 입력 테이블에 추가 되고, 새로운 행이 입력 테이블에 추가됨에 따라 결과 테이블도 업데이트 된다 
    • 업데이트된 결과 테이블은 파일 시스템이나 데이터베이스에 작성 된다 

 

출력 모드 지정 

  • 추가 모드(기본) : 마지막 트리거 이후 새로 추가된 행들을 결과 테이블 혹은 데이터 프레임에 작성하는 방식. 출력된 모든 행이 이후의 쿼리에 의해 변경되지 않는 것을 보장하므로 이전 출력 결과를 조작하지 않을 쿼리들에 의해 사용된다 
  • 전체(완료) 모드 : 모든 트리거 후에 전체 결과 테이블이 싱크로 출력 되며 집계 쿼리에 대한 지원이 가능하다 
  • 업데이트 모드 : 지난 트리거 이후 결과 테이블 혹은 데이터 프레임에서 변경된 행들이 출력 된다 

콘솔 출력 외에 구조적 스트리밍은 파일 및 카프카로의 출력도 지원하며 foreachBatch()와 foreach() API를 사용하여 임의의 위치에 작성하는 것도 가능하다 
 

🖥️예제

네트워크 소켓에서 텍스트 스트림을 수신하고 단어 빈도 계산해서 출력하기 

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

#스트리밍 소스에 대해서는 DataStreamReader를 만들어주는 spark.readStream 사용     
lines = (spark.readStream.format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

words = lines.select(split(col("value"), "\\s").alias("word"))
counts = words.groupBy("word").count() #스트리밍 데이터 프레임, 스트리밍 입력이 지속적으로 처리되면서 계산 수행

streamingQuery = (counts
                  .writeStream
                  .format("console")
                  .outputMode("complete")
                  .trigger(processingTime="1 second")
                  .start())

streamingQuery.awaitTermination()

 
 

Structured Streaming vs Spark Streaming 

실시간 처리 기능 

  • Structured Streaming은 배치 처리를 하지 않고 데이터 스트림에 데이터를 계속 추가하므로 지연이 거의 없는 데이터 처리가 가능하다. 
  • Spark Streaming은 input 데이터를 배치 작업으로 처리하며 DStream으로 생성하는 작업을 거치므로 배치 처리를 하지 않는 Structured Streaming에 비해 실시간 처리 기능이 낮다. 

RDD와 DataFrame/DataSet

  • Structured Streaming은 DataFrame과 Dataset을 사용하고 Spark Streaming은 RDD를 사용해 스트리밍을 처리한다
  • RDD와 DataFrame을 비교했을 때, 성능과 사용 용의성 측면에서 DataFrame에 더욱 우수하다. 

이벤트 시간 처리 

  • Spark Streaming은 윈도우 연산을 각 프로세스 시간을 기준으로 진행하지만, Structured Streaming은 이벤트 시간, 즉 실제로 데이터가 들어온 시간을 기준으로 데이터를 처리하므로 지연된 데이터를 정확하게 처리하는 것이 가능하다. 

 
 


 
윈도우에서...nc 사용하는게 너무 복잡해서(귀찮아서)
실습은 우분투 환경 구축하고 하겠습니다...............
 
 
참고:
https://wikidocs.net/28681
https://velog.io/@yje876/kafka-%EC%8B%A4%EC%8A%B5
https://velog.io/@hyunwoozz/spark-Structured-Spark-Streaming%EC%97%90-%EB%8C%80%ED%95%9C-%EA%B0%84%EB%8B%A8-%EB%A6%AC%EB%B7%B0
https://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Structured Streaming Programming Guide - Spark 3.5.1 Documentation

Structured Streaming Programming Guide Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on s

spark.apache.org