NIRVANA

[Airflow] 항공권 정보 ETL 파이프라인 만들어보기 본문

DataEngineering

[Airflow] 항공권 정보 ETL 파이프라인 만들어보기

녜잉 2025. 2. 2. 16:51

 

항공권 정보를 3일에 한번씩 크롤링하여 snowflake 테이블에 적재하는 파이프라인을 만들어보고자 한다. 

 

📌DAG Task 정의

  • 데이터 크롤링 및 CSV 저장 (E & T)
  • S3 적재 (L)
  • Snowflake 테이블에 적재 (L)

 

위와 같이 총 3가지의 task로 구성된 DAG를 작성하려 한다. 각 task는 순차적으로 진행되며 

데이터 크롤링 및 csv 저장 → s3 버킷 적재 → Snowflake 테이블 적재 순으로 진행된다. 

 

📌Snowflake 테이블 스키마 정의

프로젝트 기간이 여유롭지 못한 탓에 데이터 크롤링 시에 팀원들과 정했던 규칙이 있었다. Snowflake 테이블 스키마를 구성함에 있어서도 다음과 같이 정했던 규칙을 기반으로 구성했다.

  • 출발지는 무조건 인천(INC)
  • 편도만 취급
  • 항공권 가격, 도착지는 무조건 포함

따라서 편도/왕복 여부 컬럼이나 출발지 컬럼은 따로 포함하지 않았다. 

Columns  Type
id(PK) INT, Auto Increment
date(출발 날짜) VARCHAR
destination (도착지, 공항) VARCHAR
cityName(도착지 도시 이름) VARCHAR
airline(항공사) VARCHAR
price(항공권 가격) INT

 

 

Snowflake에 AirlinetInfo라는 데이터베이스를 만들고 그 아래에 TicketInfo라는 스키마를 만들었다. 그리고 그 아래에 AirlineTicket 테이블을 생성하였다. 

 

데이터가 있는 이유는 이미 DAG 한번 실행해서...입니다

 

 


 

📌항공권 크롤링하기 - 네이버 항공권

항공권 사이트로는 네이버 항공권을 선택했다. 크롤링에 앞서 (최대한 크롤링을 쉽게 하고 싶어서...) 네이버 항공권 사이트를 확인해보았다. 

 

네이버 항공권의 특징은 다음과 같다 

  • 페이지 네이션 없음(항공권 정보가 리스트로 정의, 스크롤하여 내리는 형식)
  • url에 파라미터 값으로
    • 출발지
    • 도착지
    • 출발 날짜(왕복인 경우 도착 날짜 포)
    • faretype = (외국:Y / 국내:YC)

url만 잘 선택하면 셀레니움으로 클릭하거나 할 필요가 없어 보여서 최대한 url을 잘 활용하여 크롤링을 진행해보았다. 

 

 

💡airline_ticket_crawling.py

출발지를 딕셔너리로 정의하여 공항과 도시 이름을 확인할 수 있도록 하였다. 

 

이후 foreign_country의 key로 순회하면서 url의 파라미터 값으로 도착지를 설정하여 해당 도시의 항공권을 크롤링할 수 있도록 하였고, date 역시 크롤링하는 날짜의 값을 넘길 수 있도록 설정하였다. 이때 for문을 다시 사용하여 3일치를 크롤링할 수 있도록 하였다. 

크롤링 함수를 호출할 때, 도시 딕셔너리와 url, 항공권 날짜를 함께 전달하여 필요한 정보를 csv에 저장할 수 있도록 했다. 

def naver_airline_ticke_info_crawling():
    
    foreign_country = {'TYO': '도쿄', 'OKA': '오키나와', 'SPK':'삿포로',
                        'NYC':'뉴욕', 'LAX':'로스앤젤레스', 'SEA':'시애틀', 
                        'PAR':'파리', 'BOD':'보르도', 'NCE':'니스',
                        'BER': '베를린', 'MUC':'뮌헨', 'CGN':'쾰른',
                        'LON': '런던', 'MAN': '맨체스터', 'EDI':'에딘버러'
                        }
    
    #domestic = ['CJU']
    
    #국외
    for destination in foreign_country.keys():
        
        date = datetime.now()
        
        for i in range(3):
            date = date + timedelta(days=i)
            today = date.strftime("%Y%m%d")
            foreign_country_url = f"https://flight.naver.com/flights/international/ICN-{destination}-{today}?adult=1&fareType=Y"
            data_crawling(foreign_country_url, destination, date ,foreign_country )
            time.sleep(5)

 

 


📌S3 버킷에 CSV 파일 적재하기 

s3에 csv 파일을 적재할 때나, snowflake에서 s3에 있는 csv 파일을 읽어서 데이터를 업로드할 때나 자동으로 되게 하려면 패턴을 지정해두는 편이 좋을 것 같아서 csv 파일을 저장할 때 규칙을 

 

airline_crawling_data_{크롤링 날짜}.csv 로 설정하였다. 

 

💡upload_to_s3.py

def load_csv_to_bucket():
    today = datetime.now().strftime("%Y-%m-%d")
    s3_client = conn_to_s3()
    file_path = f'data/airline_crawling_data_{today}.csv'
    bucket_name = 'yr-s3-project'
    #s3_key = 'thrid_project_airticket'
    object_name = f"thrid_project_airticket/airline_crawling_data_{today}.csv"
    
    try:
        s3_client.upload_file(file_path, bucket_name, object_name)
    
    except Exception as e:
        print(f"error:{e}")

 


 

📌S3 버킷에서 Snowflake 테이블로 데이터 적재하기

s3 버킷과 snowflake 연동을 어떻게 하는지는 아래에서 자세하게 작성했으므로 생략했다!

https://nervertheless.tistory.com/227

 

[SnowFlake] S3버킷 연동 및 Json 데이터를 관계형 테이블에 로드

🪣S3버킷과 SnowFlake 연동하기 🪣S3 버킷 생성 + 데이터 업로드아래와 같이 간단하게 yr-s3-project라는 이름을 갖는 bucket을 생성하였다.  S3 > 생성한 버킷 > 권한 > 버킷 정책에서 편집을 눌러 버킷

nervertheless.tistory.com

 

지난 번에는 json 파일을 읽어왔지만 이번에는 csv 파일을 읽어온다는 점이 다르다. 따라서 File Format을 다음과 같이 만들었다. 

CREATE OR REPLACE FILE FORMAT csv_format
  TYPE = 'CSV'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1;

 

csv 파일은 쉼표로 데이터가 나누어져 있으므로 구분자가 , 임을 명시해주었다. 

 

📌Snowflake 테이블 관리 전략

어떻게 웨어하우스에 있는 테이블을 관리하는 것이 좋을까 고민해보다가 (incremental update vs Full Refresh) 지난 날짜의 항공권 정보는 필요 없다는 것에 초점을 맞추어 테이블 관리 전략을 작성해 보았다.

우리 팀의 경우, 슬랙 봇을 운영할 것이므로 사용자가 슬랙봇에게 항공권 정보를 묻는 이전 시점의 데이터를 가지고 있을 필요가 없다는 특징이 있었다. 

  • 어차피 지난 날의 항공권 정보는 가지고 있을 필요가 없음
  • 데이터를 s3 lake에 csv 파일 형태로 적재해두고(data lake) 데이터가 업데이트 되었을 때 snowflake(data warehouse)에 있는 데이터를 full refresh 형태로 덮어쓰기 진행
    • 즉, Data Lake에는 과거~현재 데이터가 모두 존재
    • Data Warehouse에는 가장 최신 크롤링 시점의 데이터만 존재

 

💡insert_snowflake_from_s3.py

s3의 데이터를 snowflake로 적재하는 모든 과정은 트랜잭션으로 진행했다. 

또한 Full Refresh로 관리하기 위해 이전에 생성한 테이블은 삭제하고, 다시 테이블을 생성하여 COPY INTO 명령어로 csv 파일을 읽어들어 테이블에 데이터를 벌크 형식으로 추가했다. 

### snowfalke로 데이터 업로드 (Full Refresh 구현)
def upload_data_in_table():
    
    today = datetime.now().strftime("%Y-%m-%d")
    
    conn = connect_to_snowflake()
    cur = conn.cursor()
    
    try:
        #트랜잭션 시작
        cur.execute("BEGIN");
        
        #만약 테이블이 있다면 테이블을 삭제
        cur.execute("DROP TABLE IF EXISTS airline_ticket;")
        
        #테이블 생성 
        sql = """
        CREATE TABLE airline_ticket(
        id INT AUTOINCREMENT,
        ticket_date DATE,
        destination VARCHAR(50),
        cityName VARCHAR(50),
        airline VARCHAR(100),
        price INT)
        """
        cur.execute(sql)
        
        #테이블에 데이터 넣기
        sql = f"""
        COPY INTO airline_ticket (ticket_date, destination, cityName ,airline,price)
                FROM @third_project_stage/airline_crawling_data_{today}.csv 
                FILE_FORMAT = (
                    TYPE = CSV,
                    FIELD_OPTIONALLY_ENCLOSED_BY='"',  
                    SKIP_HEADER=1  
                )
                ON_ERROR = CONTINUE;
        """
        
        cur.execute(sql)
        
        #트랜잭션 완료
        cur.execute("COMMIT");
        
        print("데이터 업로드 완료")
        
        conn.commit()
    
    except Exception as e:
        logging.info(f"error: {e}")
        
        #에러가 발생했을 경우 rollback
        cur.execute("ROLLBACK;")

 

 


📌Airflow DAG 작성하기 

위의 파이썬 스크립트는 모두 scripts 폴더 안에 넣어 dag 파일에서 import 하여 사용할 수 있도록 했다. 

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

from scripts.airline_ticket_crawling import *
from scripts.upload_to_s3 import * 
from scripts.insert_snowflake_from_s3 import *


dag = DAG(
    dag_id = 'dataCrawlingTest',
    start_date=datetime(2025, 1, 25),
    catchup= False, 
    tags = ['first_example'],
    schedule= '0 10 */3 * *'  #3일마다, 매일 10시에 
)

#data crawling
airline_ticket_crawling_task = PythonOperator(
    task_id = 'naver_data_crawling',
    python_callable= naver_airline_ticke_info_crawling,
    dag = dag
)

#data upload to s3
upload_to_bucket = PythonOperator(
    task_id = 'bucket_upload',
    python_callable= load_csv_to_bucket,
    dag = dag
)

#s3 copy into snowflake
copy_into_snowflake = PythonOperator(
    task_id = 'datawarehouse_upload',
    python_callable= upload_data_in_table,
    dag = dag
)

airline_ticket_crawling_task >> upload_to_bucket >> copy_into_snowflake

#upload_to_bucket >> copy_into_snowflake

 

태스크는 앞에서 설명한 스크립트 파일 안에 있는 함수를 호출해주기만 했기 때문에 DAG 선언 부분만 자세하게 확인하고 넘어가도록 하겠다. 

 

dag = DAG(
    dag_id = 'dataCrawlingTest',
    start_date=datetime(2025, 1, 25),
    catchup= False, 
    tags = ['first_example'],
    schedule= '0 10 */3 * *'  #3일마다, 아침 10시에 
)

 

dag_id는 dataCrawlingTest로 주었고, catchup은 False로 해두었다. 또한 3일에 한번씩, 아침 10시에 파이프라인이 동작하도록 스케줄링을 해주었다. 

 


 

📌DAG 실행을 위한 기타 설정

airflow DAG 실행에 앞서 CLI 환경에서 크롤링을 진행해야 하므로(...) 필요한 패키지를 다운 받아야 했다. 따라서 airflow Docker 파일을 생성하여 빌드 시 커스텀 된 docker 파일을 사용할 수 있도록 했다. 

 

🐋Dockerfile

FROM apache/airflow:2.9.1

USER root

RUN apt-get update && apt-get install -y \
    curl \
    unzip \
    wget \
    libnss3 \
    libgconf-2-4 \
    libx11-xcb1 \
    libappindicator3-1 \
    fonts-liberation \
    xdg-utils \
    && rm -rf /var/lib/apt/lists/*

# Google Chrome 설치
RUN apt-get update && apt-get install -y wget && \
    wget -q -O google-chrome.deb http://dl.google.com/linux/chrome/deb/pool/main/g/google-chrome-stable/google-chrome-stable_132.0.6834.159-1_amd64.deb \
    && apt install -y ./google-chrome.deb \
    && rm google-chrome.deb

# ChromeDriver 버전 확인 후 설치
RUN apt-get update && \
    apt-get install -y wget unzip && \
    wget -q -O chromedriver.zip https://storage.googleapis.com/chrome-for-testing-public/132.0.6834.159/linux64/chromedriver-linux64.zip && \
    unzip chromedriver.zip 

# 기본 유저로 복귀
USER airflow

 

CLI 환경에서 크롤링을 하기 위해서는 구글 크롬과 설치된 구글 크롬에 맞는 드라이버를 설치해야 했다. 

현재 가장 최신 버전인 구글 크롬은 2025.01.29 업데이트 기준인 133.0.6943.33 인데 가장 최신 드라이버는 132.0.6834.159 였다.. ^_^ 그래서 다운 그레이드 해서 드라이버 기준으로 크롬을 설치할 수 있도록 했다. 

(여기가 제일 힘들었음...진심..)

+) airflow에서 apt 로 install 할때는 루트 유저로 해야 하고 pip 으로 install 할때는 airflow 유저로 해야 하는 이유가 뭘까 진짜.

 

 

🐬docker-compose.yaml

docker-compose.yaml 파일도 조금 수정해주었다. 

 

크롤링 된 데이터가 data 폴더 아래에 저장 될 수 있도록 opt/airflow에 data 폴더를 생성하고, 로컬 컴퓨터와 마운트 해주어서 확인할 수 있도록 해주었다. 

 AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
volumes:
	- ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data
    
    
 ///
 
airflow-init:
	mkdir -p /sources/logs /sources/dags /sources/plugins/sources/data
    chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins, data}

 

 


 

📌DAG 실행

airflow 스케줄러 컨테이너 안으로 들어가서 airflowd dag를 실행해주었다. 

docker exec -it {컨테이너 id} /bin/bash
airflow dags test dataCrawlingTest 2025-02-02

 

 

 

DAG를 잘 실행했다는 메시지가 콘솔창에 찍히고, airflow 웹 UI에서도 DAG가 성공적으로 실행 되었음을 확인할 수 있다. 

 

 

snowflake에도 데이터가 잘 적재 되었음을 확인할 수 있다! 

 

 


📌코드에서 개선 및 수정 해보고 싶은 점

1. 현재는 csv 파일이 있는지 확인하고, 있으면 이어서 작성하고 아니면 생성을 하게 되는데 크롤링을 하다가 중간에 실패할 경우 DAG를 다시 실행하면 중복 데이터가 작성된다는 점이 수정되어야 한다. 

 

2. full refresh가 아니라 incremental update인 경우를 가정에서 backfill까지 가능하도록 코드를 수정해보고 싶다. 

 

 

1은 빨리 하고 2는 프로젝트 끝나고 해봐야지...good..👍

 

❤️전체 코드 확인:

https://github.com/YEERRIn/airflow-dag

 

GitHub - YEERRIn/airflow-dag: airflow dag

airflow dag . Contribute to YEERRIn/airflow-dag development by creating an account on GitHub.

github.com