DataEngineering

[Apache Airflow] Apache Airflow 기반의 데이터 파이프라인 CH2 정리

녜잉 2024. 7. 25. 22:02

2.1 다양한 소스에서 데이터 수집 

  • 로켓에 대한 뉴스를 한 곳에 수집하길 원하는 John
  • John은 모든 로켓 발사에 대한 정보를 자동으로 수집, 최신의 로켓 발사에 대한 정보를 자동으로 수집하여 최신의 로켓 발사에 대해 간파할 수 있도록 하는 프로그램을 작성하고자 함 

 

2.1.1 데이터 탐색

  • 데이터 수집을 위해 과거및 예정된 로켓 발사 데이터를 수집하는 온라인 저장소 Launch Library 1를 사용
    • Launch Library2는 누구나 사용할 수 있는 오픈 API

John의 로켓 이미지 다운로드 심성 모형

 

2.1.2 첫번째 Airflow DAG 작성

  • Airflow를 사용하면 하나 이상의 단계로 구성된 대규모 작업을 개별 태스크로 분할하고 DAG로 작성할 수 있음
    • 다중 태스크를 병렬로 실행하며 서로 다른 기술 사용 가능 

Airflow에서 John의 심성 모형 워크플는 아래와 같은 세 가지태스크로 정리될 수 있음

  • Download)lucnhes
  • get_pictures
  • notify

Airflow의 태스크에 매핑된 John의 심성 모형

 

 

로켓 발사 데이터 다운로드 및 처리를 위한 DAG 

import json 
import pathlib
import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG( #객체 인스턴스 생성(구체화) - 모든 워크플로의 시작점이 됨 
    dag_id = "download_rocket_lauches", #DAG 이름 정의 
    start_date=airflow.utils.dates.days_ago(14), #DAG처음 실행 시작 날짜 정의 
    schedule_interval=None, #DAG 실행 간격 
)

download_launches = BashOperator( #BashOPerator를 이용, curl로 URL 결과값을 다운로드 
    tast_id = "download_launches",#태스크 이름 정의 
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", 
    dag = dag, 
)

def _get_pictures():
    #경로가 존재하는지 확인 
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)
    
    #launches.json 파일에 있는 모든 그림 파일을 다운로드 
    with open("/tmp/launches.json") as f:
        
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                
                print(f"Downloaded {image_url} to {target_file}")
                
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be invalid URL.")
                
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures = PythonOperator( #DAG에서 PythonOpertor를 사용하여 파이썬 함수 호출 
    task_id = "get_pictures",
    python_callable=_get_pictures,
    #provide_context=True,
    dag=dag,
)
    
notify=BashOperator(
    task_id = "notify", 
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag = dag,
)

download_launches >> get_pictures >> notify #태스크 실행 순서 설정

 

 

1) DAG 객체 인스턴스 생성 

dag = DAG( #객체 인스턴스 생성(구체화) - 모든 워크플로의 시작점이 됨 
    dag_id = "download_rocket_lauches", #DAG 이름 정의 
    start_date=airflow.utils.dates.days_ago(14), #DAG처음 실행 시작 날짜 정의 
    schedule_interval=None, #DAG 실행 간격 
)
  • DAG 클래스는 두 개의 인자를 필요로 함
    • dag_id : Airflow UI에 표시되는 DAG 이름
    • start_date : 워크플로가 처음 실행되는 날짜/ 시간 
    • schedule_interval : DAG의 자동 실행 여부 결정 
  • 모든 오퍼레이터는 선언된 dag객체를 참조하여 인스턴스가 어떤 DAG에 속한 것인지 Airflow에 알림

 

2) 한 개의 Airflow 워크플로 스크립트 혹은 다수의 오퍼레이터를 포함하는 스크립트 구성

 

BashOperator로 객체 인스턴스를 생성

download_launches = BashOperator( #BashOPerator를 이용, curl로 URL 결과값을 다운로드 
    tast_id = "download_launches",#태스크 이름 정의 
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", 
    dag = dag, 
)
  • 각 오퍼레이터는 하나의 태스크를 수행하고, 여러 개의 오퍼레이터가 Airflow의 워크플로 또는 DAG를 구성 
  • 오퍼레이터는 서로 독립적으로 실행될 수도, 순서를 정의 실행 될 수도 있음 
    • 순서를 정의해 실행하는 것을 Airflow에서는 의존성이라고 함

 

해당 코드에서는 사진을 저장하는 디렉터리 위치에 대한 정보가 없는 상태에서 사진을 다운로드 하면 워크플로가 정상적으로 작동하지 않으므로, 태스크를 순서대로 실행하기 위해 태스크 간 의존을 설정해줌 

download_launches >> get_pictures >> notify #태스크 실행 순서 설정

 

 

2.2.1 태스크와 오퍼레이터 차이점 

1) Airflow Operator

  • 단일 작업 수행 
  • BashOperator(배시 스크립트 실행에 사용) / PythonOperator(파이썬 함수 실행에 사용)와 같이 일반적인 작업을 실행에 사용
  • EmailOperator(이메일 발송에 사용) / SimpleHTTPOperator(HTTP 엔드포인트 호출)과 같이 좀 더 특수한 목적을 위해 사용 

이때, DAG는 오퍼레이터 집합에 대한 실행을 오케스트레이션하는 역할을 진행

오케스트레이션에서는 오퍼레이터의 시작과 정지, 오퍼레이터가 완료되면 연속된 다음 태스크의 시작, 오퍼레이터 간의 의존성 보장이 포함됨 

 

해당 책과 Airflow 공식 문서 전반에 걸쳐 오퍼레이터와 태스크는 같은 의미로 혼용되어짐 

 

사용자 관점에서 모두 같은 의미! 즉, 오퍼레이터 = 태스크

오퍼레이터는 단일 작업 수행 기능을 제공하며, Airflow에서는 다양한 서브 클래스를 제공

  

그렇지만 실제로 두 용어에는 차이점이 존재함 

 

2) Airflow Task

  • 작업의 올바른 실행을 보장하기 위한 오퍼레이터의 래퍼 혹은 매니저
  • 오퍼레이터의 상태를관리하고 사용자에게 상태 변경을 표시 
  • 사용자는 오퍼레이터를 활용하여 수행할 작업에 집중하며, Airflow는 태스크를 통해 작업을 올바르게 실행 

 

 

2.2.2 임의 파이썬 코드 실행

PythonOperator를 사용하여 실행할 파이썬 코드를 정의, 로켓 사진을 다운로드 받기 위한 코드를 구현할 수 있음

def _get_pictures():
    #경로가 존재하는지 확인, 디렉터리가 없다면 생성 
    pathlib.path("/tmp/images").mkdir(parents=True, exit_ok=True)
    
    #launches.json 파일에 있는 모든 그림 파일을 다운로드 
    with open("/tmp/launches.json") as f: #로켓 발사 json 파일 열기 
        
        launches = json.load(f) #데이터를 섞기 위해 Json파일로 읽기 
        image_urls = [launch["image"] for launch in launches["results"]] #모든 발사에 대한 image의 URL 값 읽기 
        
        for image_url in image_urls: #모든 이미지를 읽기 위해 URL 루프 
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1] #마지막 파일 이름만 가져오기 
                target_file = f"/tmp/images/{image_filename}" #타겟 파일 저장 경로 구성 
                with open(target_file, "wb") as f: #타겟 파일 핸들 열기 
                    f.write(response.content) #파일 경로에 이미지 쓰기 
                
                print(f"Downloaded {image_url} to {target_file}") #결과 출력 
                
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be invalid URL.")
                
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures = PythonOperator( #DAG에서 PythonOpertor를 사용하여 파이썬 함수 호출 
    task_id = "get_pictures",
    python_callable=_get_pictures,
    dag=dag,
)
  • Airflow의 PythonOperator는 파이썬 코드 실행을 담당함
  • BashOperator와 마찬가지로 모든 오퍼레이터는 task_id를 필요로 함
    • task_id는 태스크 실행 시에 참조되며, AirflowUI에도 표시됨 
  • PythonOperator 사용 시 아래의 인자를 포함 함
    • task_id : 태스크(오퍼레이터) 자기 자신, 즉 이름 
    • python_callable : 호출이 가능한 일반 함수(자신이 호출하고자 하는 함수) 
    • dag: 오퍼레이터가 속한 dag 
  • 오퍼레이터를 실행하면 오퍼레이터가 함수를 호출하고, 호출된 함수가 실행됨 

 

2.3 Airflow에서 DAG 실행하기 

Airflow는 아래의 세 가지 컴포넌트로 구성됨

  • 스케줄러
  • 웹 서버
  • 데이터베이스 

 

DAG import에서 오류가 나서.......................

오류 해결하고 해결 방법과 함께 다시 와야지 아오