NIRVANA

[Kafka & Spark] 실시간으로 사람 detect하고 집계 테이블 만들기(2) 본문

DataEngineering

[Kafka & Spark] 실시간으로 사람 detect하고 집계 테이블 만들기(2)

녜잉 2024. 8. 25. 21:38

지난번에 Spark 연동을 못했던 게 너무 아쉬워서 지난 글에 이어서 Spark 연동 후 집계 테이블을 mongodb에 적재하기를 해보려고 합니다. 

 

근데 먼저 결론부터 말하면 실패해서 지금 열심히 수정하고 있습니다...

 

1. docker-compose.yml 파일 수정하기 

 

Spark와 MongoDB를 사용하기 위해서 docker-compose.yml 파일을 다음과 같이 수정했다. 

version: '3'
services:
  zookeeper:
    image: zookeeper
    restart: always
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment: 
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: bitnami/kafka  
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1  
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  

  mongo:
    image: mongo
    restart: always
    hostname: mongo
    ports:
      - "27017:27017"

  spark-master:
    image: bitnami/spark:latest
    restart: always
    hostname: spark-master
    ports:
      - "8080:8080"  # Spark Web UI
    environment:
      SPARK_MODE: master
      SPARK_MASTER_HOST: spark-master
    depends_on:
      - kafka

  spark-worker:
    image: bitnami/spark:latest
    restart: always
    hostname: spark-worker
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077
    depends_on:
      - spark-master

 

mongodb와 spakr는 같은 네트워크 안에 있어야 하므로 network도 새로 정의했고, spark는 worker랑 master를 나누었다. 

 

 

2. ReadDatawithSpark.py 파일 작성하기 

스파크 스트리밍을 사용해서 데이터를 읽어올 것이므로 스파크 세션을 정의해야 한다. 

이때, 카프카와의 연동을 위해서 스파크 버전에 맞는 'spark.jars.packages' 설정에 들어갈 의존성을 추가해야 한다. 

 

먼저 스파크 버전을 확인해준다.

spark-submit --version

 

다음과 같이 spark 버전과 스칼라 jdk 버전도 확인할 수 있다.  

 

https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10

 

 

1) Spark 세션 작성 

maven에서 spakr 3.5.2에 해당하는 의존성을 찾아 spakr 세션 정의 시에 config를 통해 추가해준다.  (*스칼라 버전 주의)

spark = (
    SparkSession
    .builder
    .appName("readVideoData")
    .config('spark.jars.packages','org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2')
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/counting_people.count_people") 
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/counting_people.count_people") 
    .master("local[*]")
    .getOrCreate()
)

 

다음과 같이 spark 세션을 정의했다. 

 

 

2) 데이터 스키마 정의 

다음으로는 카프카에서 읽어오는 데이터의 스키마를 정의해준다. 현재 우리는 카프카에서 타임 스탬프와 사람(바운딩 박스)의 id 데이터를 읽어오고 있으므로 다음과 같이 정의했다. 

schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("id", StringType(), True)
])

 

StructType과 StructField를 사용해서 스키마를 정의한다.

StructField()는 각각 "column name", "column Type", "nullable"를 파라미터로 받는다. 

 

 

3) 데이터 변환 

#CAST (value AS STRING): value 컬럼의 데이터를 string으로 변환
# as json: 변환한 데이터를 json이라는 새로운 컬럼에 저장 
value_df = kafka_df.selectExpr("CAST(value AS STRING) as json") \ 
    .select(from_json(col("json"), schema).alias("data")) \ #json 컬럼의 데이터를 JSON으로 파싱하고, 이를 data라는 컬럼으로 변환 
    .select("data.*") \ #data내의 모든 컬럼을 개별 컬럼으로 선택(= data.id, data.timestamp가 됨)
    .withColumn("timestamp", to_timestamp(col("timestamp"), 'yyyy/MM/dd HH:mm:ss')) #timestamp를 string에서 timestamp 형식으로 변경함

 

kafka에서 수신 받은 JSON 메시지는 바이너리로 수정 되므로 해당 데이터를 문자열로 변환하고, 다시 JSON으로 파싱하는 과정이 필요하다. 따라서 selectExpr()을 사용해서 데이터를 변환한다. 

 

SelectExpr

  • PySpark의 DataFrame에서 사용되는 메서드, SQL 표현식을 사용해 컬럼을 선택하거나 변환할 경우에 사용
  • 하나 이상의 SQL 표현식을 문자열로 받아 들여 해당 결과를 새로운 컬럼으로 생성하거나 기존 컬럼을 변환함
#컬럼 선택 -> DataFrame에서 name과 age 컬럼을 선택 
df = df.selectExpr("name", "age") 

#컬럼 변환 -> age 컬럼을 INT로 변환하고 age_int라는 새로운 컬럼을 생성 
df = df.selectExpr("CAST(age AS INT) AS age_int)

 

 

4) 시간 단위로 데이터 집계

aggregated_df = value_df.groupBy(window(col("timestamp"), "1 hour"), col("id")) \
    .count() \
    .groupBy("window") \
    .agg(count("id").alias("unique_count")) \
    .select(col("window.start").alias("hour"), col("unique_count"))

 

groupBy()를 통해 windwo함수값와 id컬럼을 기준으로 데이터를 그룹화한다. 이때, window()함수 timestamp컬럼을 사용하여 1시간 단위의 시간 창을 생성한 것이 된다. 

count()함수를 사용하여 각 시간과 ID의 개수를 계산하고 그걸 다시 window 컬럼을 통해서 (각 시간대 별 데이터 집계를 수행하기 위해) 그룹화를 진행한다. 

agg() 함수의 count()함수를 사용해서  id 컬럼의 고유 값 개수를 세고, 해당 값을 unique_count 컬럼에 저장한다. 

select()함수를 사용해 window.start와 count 컬럼을 고르고 각각의 필드 이름을 hour와 unique_count로 변경한다. 

 

agg()함수

  • PySpark의 DataFrame에서 집계 연산을 수행하기 위한 메서드, 여러 집계 함수를 한 번에 적용할 수 있도록 도움
  • groupBy()와 함께 사용, 그룹화된 데이터에 대해 다양한 집계 연산을 수행
  • 사전 형식이나 컬럼 표현식 형식으로 사용 가능
# 사전 형식 사용
# category 컬럼을 기준으로 데이터를 그룹화하고 price 컬럼의 평균('avg')과 qunatity 컬럼의 합계('sum')을 계산 
df.groupBy("category).agg(
	{"price" : "avg","quantity": "sum}
)

#컬럼 표현식

from pyspark.sql.functions import avg, sum

df.groupBy("category").agg(
	avg("price").alias("avg_price"),
    sum("qunatity").alias("totl_quantity")
 )

 

 

 

스파크에서 카프카 데이터를 읽어오는 파이썬 파일을 작성하였으니 이제 실행을 해야 한다

docker cp 파이썬파일이름.py {도커 컨테이너 id}:{파일이 복사될 경로}

 

master 노드에 복사를 해주었다! (아무래도 master 노드가 워커 노드에게 작업을 할당해주니까...master 노드에 있어야 할 것 같았음..)

 

 spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2 --master spark://spark-master:7077 readDatawithSpark.py

 

으로 실행을 하면 된다!!

 

처음에는 config에 추가했는데 자꾸 에러가 발생해서...여러번 삽질하다가 그냥 submit할 때 packages 옵션으로 제출하는게 괜찮다는 글을 보고 바꾸니까 됐다..

 

pyspark.errors.exceptions.captured.AnalysisException: Detected pattern of possible 'correctness' issue due to global watermark.

 

근데 또 이런 에러가 발생했다. 대충 찾아보니까 지연된 데이터를 처리하게 되어 데이터의 정확성을 장담할 수 없다는 의미에서 경고해주는 거라고 한다...! 해당 방법을 해결하기 위해서는 워터 마크를 추가해줘야 한다.

 

워터마크란?

  • 특정 시간 이후에 처리에서 제외할 이벤트나 이벤트 집합에 대한 시간 기준
  • 워터마크가 없으면 중간 결과를 영원히 저장하게 되므로 부하 방지를 위해서 워터마크를 사용, 오래된 데이터를 제거 
value_df = kafka_df.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("timestamp", to_timestamp(col("timestamp"), 'yyyy/MM/dd HH:mm:ss'))
    

value_df = value_df.withWatermark("timestamp", "2 hour")

 

이렇게 수정해줬다. wirhWatermark() 함수를 사용해서 워터마크를 적용할 컬럼과 최대 지 허용 시간을 설정해줬다. 우리는 timestamp에 시간 정보를 가지고 있으므로 timestamp 컬럼에 최대 지연 허용 시간 2시간으로 정해줬다. 

spark = (
    SparkSession
    .builder
    .appName("readVideoData")
    .config('spark.jars.packages','org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2')
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/counting_people.count_people") 
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/counting_people.count_people") 
    .config('spark.sql.streaming.statefulOperator.checkCorrectness.enabled', 'false')
    .getOrCreate()
)

 

근데도 계속 에러가 떠서 그냥 세션 빌드할 때 무시하라고 config로 추가했다...ㅋ

 

이제 진짜진짜 되겠지??라는 마인드로 했는데

 

pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 40fd2053-b21a-4741-9de9-02e0117bd8e5, runId = 7f0f20f5-bcad-4daa-8e0b-fd215d063ec9] terminated with exception: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics

 

카프카랑 연결 못하겠다고 하네....

지금 일단 짐작 가는 게 너무 많아가지고 내일 다시 천천히 생각해봐야겠다.. 🥲