NIRVANA

ETL 파이프라인 설계해 보기 (1) 본문

DataEngineering

ETL 파이프라인 설계해 보기 (1)

녜잉 2024. 10. 6. 20:17

 

이 글에서 이어집니다!

https://nervertheless.tistory.com/221

 

[Apache Airflow] 웹 크롤링 DAG 작성하기

(복습을 위해 살펴보는) Airflow 구조 Scheduler스케줄된 Workflow(DAG)을 트리거하고 제출된 Task를 실행(조건을 만족할 경우, Task를 실행할 수 있게 함)executor에게 Task를 제공해주는 역할을 수행 생성된

nervertheless.tistory.com

 

이 전에 웹 크롤링 DAG를 작성했던 적이 있었는데 그 때는 목표에 의하면 s3 버킷에 저장을 했어야 했지만 EC2 인스턴스 문제로 저장까지 못했었다.. 

 

그래서 이번에는 AWS 말고 그냥 로컬 환경에서 크롤링 데이터를 수집하고 HDFS에 저장 해보려고 한다. 

 

먼저 환경을 구축하기 위해 도커 파일과 docker-compose.yml 파일을 작성했다. 

 

🐋Dockerfile.airflow

FROM apache/airflow:2.10.2-python3.10

# 필요한 패키지 설치
RUN pip install --no-cache-dir pymongo requests beautifulsoup4

# 환경 변수 설정
ENV AIRFLOW_HOME=~/airflow
ENV AIRFLOW_UID=50000
ENV AIRFLOW_GID=50000

# root 사용자로 전환하여 디렉토리 생성 및 권한 변경
USER root

# dags 디렉토리 생성
RUN mkdir -p /usr/local/airflow/dags && chown -R 50000:50000 /usr/local/airflow

# airflow 사용자로 전환
USER airflow

# 엔트리포인트 설정 및 데이터베이스 초기화 후 웹서버 실행
# CMD bash -c "airflow db init || exit 1 && airflow webserver"

 

apache airflow 공식 이미지를 사용하고 크롤링에 필요한 기본 라이브러리를 설치해줬다. 

사실 에어플로 도커 파일을 잘못 설정해서 몇 번 엎었는데 지금 보니 USER root 부터는 없어도 될 것 같다 🤔

 

 

🐋 Docker.sparkmaster

FROM spark:3.5.2-scala2.12-java11-python3-ubuntu

# root 권한으로 명령 실행
USER root

# 필요한 패키지 설치
RUN apt-get update && \
    apt-get install -y python3-pip && \
    pip3 install pymongo && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# 환경 변수 설정
ENV SPARK_HOME=/opt/spark  
ENV PATH="$SPARK_HOME/bin:$PATH"

# Spark 설정 파일 복사 (필요한 경우)
# COPY spark-defaults.conf $SPARK_HOME/conf/

COPY ./jobs /opt/spark/jobs

CMD ["spark-class", "org.apache.spark.deploy.master.Master"]

 

스파크 마스터 역시 공식 이미지를 사용했다. 스파크는 실행할 스크립트가 없으면 컨테이너가 자동으로 종료 되기 때문에 각각 역할에 맞게 실행할 수 있도록 spark-class 스크립트를 제출해야 한다. CMD로org.apache.spark.deploy.master.Master 클래스를 실행하여 스파크 마스터 노드를 실행할 수 있게 했다. 

 

🖥️ spark-class 스크립트? 

내부 클래스 및 기능을 실행하기 위한 스크립트. 스파크의 다양한 구성 요소(ex, 마스터, 워커)를 시작할 때 사용.

Java 프로그램을 실행하는 데 필요한 환경 설정을 자동으로 해주고, 실행할 Java 클래스와 필요한 인자를 전달해주는 역할

매개변수로 다음과 같은 값들을 넘겨줄 수 있다. 

  • org.apache.spark.repl.Main
    • 주요 호출 함수: createSparkSession
  • org.apache.spark.deploy.SparkSubmit
    • org.apache.spark.launcher.Main에 넘겨주는 매개변수
  • org.apache.spark.launcher.Main

 

🐋 Docker.sparkworker

FROM spark:3.5.2-scala2.12-java11-python3-ubuntu

# root 권한으로 명령 실행
USER root

# 필요한 패키지 설치
RUN apt-get update && \
    apt-get install -y python3-pip && \
    pip3 install pymongo && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# 환경 변수 설정
ENV SPARK_HOME=/opt/spark  
ENV PATH="$SPARK_HOME/bin:$PATH"

# Spark 설정 파일 복사 (필요한 경우)
# COPY spark-defaults.conf $SPARK_HOME/conf/

COPY ./jobs /opt/spark/jobs

CMD ["spark-class", "org.apache.spark.deploy.worker.Worker spark://localhost:7077"]

 

스파크 워커 역시 마스터 노드와 마찬가지로 spark-class를 사용하여 워커 노드로 실행할 수 있게 했다. 

 

 

🐋docker-compose.yml

 

도커 컴포즈 파일은 다음과 같다. 

version: '3.7'
services:
  mongo:
    image: mongo:latest
    container_name: mongo
    ports:
      - "27017:27017"
    volumes:
      - ./data/mongo:/data/db
    networks:
      - spark_network

  hdfs-namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    container_name: hdfs-namenode
    environment:
      - CLUSTER_NAME=hadoop
    ports:
      - "9870:9870"
    volumes:
      - hadoop_namenode:/hadoop/dfs/name
    networks:
      - spark_network

  hdfs-datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
    container_name: hdfs-datanode
    environment:
      - CLUSTER_NAME=hadoop
      - CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020
    volumes:
      - hadoop_datanode:/hadoop/dfs/data
    networks:
      - spark_network

  spark-master:
    build:
      context: .
      dockerfile: Dockerfile.sparkmaster
    container_name: spark-master
    ports:
      - "7077:7077" 
    environment:
      - SPARK_MODE=master
    networks:
      - spark_network

    command: ["spark-class", "org.apache.spark.deploy.master.Master"]


  spark:
    build:
      context: .
      dockerfile: Dockerfile.sparkworker
    container_name: spark
    environment:
      - SPARK_MASTER_HOST=spark-master
      - SPARK_MASTER_PORT=7077
    ports:
      - "8081:8081"
    volumes:
      -  내 로컬 디렉토리 경로:/opt/spark/jobs
    networks:
      - spark_network
    depends_on:
      - spark-master
      - hdfs-namenode
      - hdfs-datanode
      - mongo
    

  airflow:
    build:
      context: .
      dockerfile: Dockerfile.airflow
    container_name: airflow
    user: root
    depends_on:
      - mongo
      - hdfs-namenode
      - hdfs-datanode
      - spark-master
    environment:
      - LOAD_EXAMPLES=no
      - AIRFLOW_WWW_USER_USERNAME=airflow
      - AIRFLOW_WWW_USER_PASSWORD=airflow
    ports:
      - "8080:8080"
    networks:
      - spark_network
    volumes:
      - 내 로컬 디렉토리 경로:~/airflow/dags
    command: >
      bash -c "airflow db init && airflow webserver"

networks:
  spark_network:
    driver: bridge

volumes:
  hadoop_namenode:
  hadoop_datanode:

 

웹 크롤링 및 데이터 저장 스케줄을 관리하기 위해 airflow 컨테이너를 하나 생성하고 크롤링 한 데이터를 저장할 저장소로 HDFS를 선택했으므로 HDFS 데이터 노드와 네임 노드를 컨테이너를 각각 만들었다.  (원래 계획대로라면) 크롤링 후 채용 공고를 카테고리 별로 분류하여 각각의 MongoDB 컬렉션에 저장하려고 했기 때문에 Spark master와 worker 컨테이너도 생성해주었다. 마지막으로 데이터 마트의 역할을 할 MongoDB 컨테이너도 생성해줬다. 

 

 

파이프라인은 위와 같이 설계했다. 

 


 

데이터 추출 (E) 

 

지난 번과 거의 동일하게 진행했다. 

 

🔈web_crawling_dag.py (Airflow) 

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

from datetime import datetime, timedelta
import sys, os

def print_result(**kwargs):
    r = kwargs["task_instance"].xcom_pull(key='result_msg')
    print("message:", r)

with DAG(
    "crawling-recruiting",
    
    default_args = {
        'depends_on_past': False,
        'email' : ['이메일계정@naver.com'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=30),
    },
    
    description="job opening crawling",
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 10, 5),
    tags=['crawling_example'],
    
) as dag:
    
    start = BashOperator(
        task_id ='start',
        bash_command='echo "start"',
    )
    
    get_info_task = BashOperator(
        task_id = 'run_get_info',
        bash_command='python3 crawling_recruiting.py',
    )
    
    save_msg = PythonOperator(
        task_id = 'save_msg',
        python_callable=lambda **kwargs: kwargs["task_instance"].xcom_push(key='result_msg', value='Job completed!') 
    )
    
    msg = PythonOperator(
        task_id = 'msg',
        python_callable=print_result
    )
    
    complete = BashOperator(
        task_id = 'complete',
        bash_command='echo "complete!" ',
    )
    
    start >> get_info_task >> save_msg >> msg >> complete

 

 

🔈crawling_recruiting.py (Airflow) 

import time
import selenium
from selenium import webdriver
import pandas as pd
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

options = webdriver.ChromeOptions()
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
options.add_argument('log-level=3')
options.add_argument('--no-sandbox')
options.add_argument('--headless')
options.add_argument('--disable-dev-shm-usage')

from pyspark.sql import SparkSession
from pyspark import SparkConf

import warnings
warnings.filterwarnings('ignore')

service = ChromeService(executable_path='/opt/airflow/dags/chromedriver-linux64/chromedriver')
driver = webdriver.Chrome(service=service, options=options)  # options 추가
url = 'https://www.saramin.co.kr'
driver.get(url)

max_attempts = 5
attempts = 0 

search = "정보보호"

# 대기 설정
wait = WebDriverWait(driver, 20)  # 최대 20초 대기

# 데이터프레임 초기화
df = pd.DataFrame()  # df를 try 블록 밖에서 초기화

while attempts < max_attempts:

    try:
        # 요소가 로드될 때까지 대기하고 클릭
        wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button#btn_search.btn_search'))).click()

        # 검색 박스 대기 및 입력
        search_box = wait.until(EC.presence_of_element_located((By.TAG_NAME, 'input')))
        search_box.send_keys(search)

        # 검색 버튼 클릭 대기
        wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button#btn_search_recruit.btn_search'))).click()

        # 정보 수집을 위한 반복문
        while True:
            time.sleep(1.5)  # 페이지 로딩 대기
            info_list_all = driver.find_elements(By.XPATH, '//*[@id="recruit_info_list"]/div[1]/div')
            info_list_addr = driver.find_elements(By.XPATH, '//*[@id="recruit_info_list"]/div[1]/div/div/h2/a')

            info_list_all_inner = [i.get_attribute('innerText') for i in info_list_all]
            info_list_addr_href = [i.get_attribute('href') for i in info_list_addr]

            # 데이터프레임에 저장
            new_df = pd.DataFrame({
                'info': info_list_all_inner,
                'link': info_list_addr_href
            })

            # 기존 데이터프레임에 새로운 데이터 추가
            df = pd.concat([df, new_df], ignore_index=True)

            # 다음 버튼 클릭 로직
            next_buttons = driver.find_elements(By.XPATH, '//*[@id="recruit_info_list"]/div[2]/div/a')
            if not next_buttons:
                break  # 다음 버튼이 없으면 종료

            for button in next_buttons:
                button.click()
                time.sleep(3)  # 페이지 로딩 대기
                # 정보를 수집하는 추가 로직 필요
    
    except Exception as e:
        attempts += 1
        print(f"시도 {attempts} - 클릭할 수 없음: {e}")
        time.sleep(2)  # 다음 시도 전 대기 (2초 대기)

    finally:
        df.to_csv('data.csv', index=False)  # 데이터 저장
        driver.quit()  # 모든 창 닫기
        
 conf = SparkConf() \
    .set("spark.hadoop.dfs.client.max.block.size", "134217728") \
    .set("spark.hadoop.dfs.client.rpc.max-size", "512000000")
    
spark = SparkSession.builder \
    .appName("CrawlingDataSave") \
    .config(conf=conf)
    .master("local[*]") \
    .getOrCreate()

spark_df = spark.createDataFrame(df)

spark_df.write.csv('hdfs://hdfs-namenode:8020/user/JobSaveData/job_data.csv', header=True)

 

 

 

기존 크롤링 코드에서 스파크 세션을 추가하여 판다스 데이터 프레임을 Spark 데이터 프레임으로 변환한 뒤 HDFS에 저장하려고 했다.  그런데 조금 슬픈 사연이 생겨서.. 크롤링한 데이터를 HDFS로 보내는 걸 못하게 됨...

 

스파크 세션 생성에서 master 옵션을 처음 써봤기 때문에 확인해보고 가려고 한다.

 

스파크 세션은 스파크 클러스터와 통신하는 엔트리포인트이자, 한 프로그램에 하나만 생성되는 싱글톤 패턴을 가진다. 다양한 데이터 소스를 하나의 엔트리 포인트로 프로그래밍 가능하다는 장점이 있다. 

 

여기서 master()는 스파크 애플리케이션이 실행될 마스터를 설정하는 값으로 스파크 어플리케이션이 어디에서, 어떻게 실행될지를 결정한다. 

 

local[*]에서 local은 Spark 애플리케이션을 현재 머신의 로컬 모드에서 실행하라는 뜻이다. 즉, 클러스터 모드가 아닌 단일 머신에서 멀티 스레드로 실행되게 하라는 뜻이 된다. [*]로컬 모드에서 사용할 수 있는 모든 코어를 사용하라는 의미이다. 해당 부분에 특정 숫자를 넣으면 해당 숫자 만큼의 코어를 사용하게 된다. 

 

 

 

더 많은 옵션은 공식 문서에 잘 나와 있다

 

이슈가 있어서..! 크롤링한 데이터를 HDFS로 보내는 데에는 실패했기 때문에 같은 형식의 데이터를 HDFS로 보냈을 때, 잘 전송 되어지는 지를 확인해봤다. 

 

test.py

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf() \
        .set("spark.hadoop.dfs.client.max.block.size", "134217728") \
        .set("spark.hadoop.dfs.client.rpc.max-size", "512000000")

# Spark 세션 생성
spark = SparkSession.builder \
    .appName("testSendData") \
    .config(conf=conf) \
    .getOrCreate()

# 예시 DataFrame 생성
data = [("정보보호", "링크1"), ("빅데이터", "링크2")]
columns = ["info", "link"]


df = spark.createDataFrame(data, columns)

#df = df.repartition(100)

# HDFS에 저장, 경로가 존재하지 않으면 자동으로 생성
df.write.csv("hdfs://hdfs-namenode:8020/user/testSaveData/test.csv", header=True)

# Spark 세션 종료
spark.stop()

 

해당 파이썬 파일을 실행하면 데이터 전송 바(?) 같은 그래프가 나오고 데이터 전송이 완료 된다 

 

하둡 네임 노드 컨테이너 안으로 들어가서 다음 명령어를 입력하면 파일이 저장된 경로를 알려준다. 

 

hdfs dfs -ls hdfs://hdfs-namenode:8020/user/testSaveData/

 

hdfs dfs -cat hdfs://hdfs-namenode:8020/user/testSaveData/test.csv/*

 

데이터를 출력하면 잘 전송된 것을 확인할 수 있다! 

 

사실 처음에는 그냥 test.csv까지만 입력했었는데 그 경로도 폴더였다. 처음에는 왜 폴더인거지? 하고 생각해봤는데 HDFS가 분산 저장 시스템이어서 그랬던 거다.. 

 

💡정리 

1. 스파크 세션으로 데이터 프레임을 HDFS에 전송

2. 각 파일이 여러 HDFS 블록 단위로 나누어져서 데이터 노드에 분산 저장,
   네임 노드에서는 메타 데이터(파일 및 디렉토리 구조, 각 블록 위치 등)만 저장 

3. hdfs dfs -cat과 같은 명령어를 사용하여 (네임노드로) 파일 내용 요청 

4. 요청을 받은 파일의 메타 데이터를 확인, 해당 파일이 저장된 데이터 노드 정보 제공

5. 네임 노드에게 제공 받은 데이터 노드의 정보를 HDFS 클라이언트에서 처리
→ 지정된 데이터 노드에서 실제 데이터 블록을 읽고, 블록을 재조합하여 원래 파일로 보여줌 

 

 


 

 

확실히 미니 플젝(이걸 미니 플젝이라고 해도 되는건지 모르겠지만요)을 하니까 배우는 느낌

하둡도 처음 써보고(...) 스파크도 물론 맛보기지만 처음 써보고(...) 근데 짱 재밌어!

사실 도커 파일 작성하는 것도 너무 헷갈리고, 너무 어렵고, 뭐라는지 하나도 모르겠고 그랬는데

이번에 이거 하면서 이제 조금 알 것 같음 하하 

더 정진하겟슴다 다음엔 HDFS에서 데이터 가져와서 스파크로 집계하고 몽고 디비 적재까지를 목표로..

최대한 빠른 시일 내에 오겠습니다 🙇‍♀️🙇‍♀️

 

 

참고

https://surgach.tistory.com/86

https://velog.io/@kjw9684/spark-2-3.-Spark-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%A8-%EA%B5%AC%EC%A1%B0

https://spark.apache.org/docs/latest/submitting-applications.html#master-urls

https://goateedev.tistory.com/97