DataEngineering

[Apache Airflow] 웹 크롤링 DAG 작성하기

녜잉 2024. 9. 15. 19:55

 

(복습을 위해 살펴보는) Airflow 구조 

Scheduler

  • 스케줄된 Workflow(DAG)을 트리거하고 제출된 Task를 실행(조건을 만족할 경우, Task를 실행할 수 있게 함)
  • executor에게 Task를 제공해주는 역할을 수행 
  • 생성된 DAG를 지속적으로 모니터링하고, 종속성 및 타이밍 구성에 따라 실행할 작업을 예약한다

 

Executor

  • Task를 수행하는 역할 
  • 스케줄러와 통신하여 실행할 작업에 대한 정보를 받고, 다음 작업을 실행하는 데 필요한 프로세스나 컨테이너를 시작한다
  • 인프라나 요구사항에 따라 LocalExecutor, CeleryExecutor, KubernetesExecutor 등 다양한 유형의 실행자가 존재

 

Worker

  • Executor가 할당한 작업을 수행하는 구성 요소 (실제 Task 실행 주체) 
  • 선택한 Executor에 따라 별도의 프로세스나 컨테이너일 수 있으며, 작업에 정의된 실제 코드나 스크립트를 실행하고 해당 상태를 다시 executor에게 보고한다 

 

WebServer

  • DAG와 Task의 동작을 트리거하고 디버그 하기 위한 사용자 인터페이스
  • DAG 시각화, 모니터링 등 관리 가능 및 이슈 해결을 위한 대시보드 제공

 

Metadata Database

  • scheduler, executor, Web Serve에서 상태를 저장하기 위해 사용하는 데이터베이스
  • 작업 및 실행 기록에 대한 정보를 저장 
  • PostgerSQL, MySQL, SQLite 등을 지원 

 

(복습을 위한) Airflow DAG 작성해보기 

from airflow import DAG
from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta
from textwrap import dedent

with DAG(
    "tutorial",
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_retry": False,
        "email_on_failure": False,
        "retries": 1, 
        "retry_delay": timedelta(minutes=5),
        "queue": "bash_queue",
        "pool": "backfill",
        "priority_weight": 10,
        "end_date": datetime(2025, 1, 1),
        "wait_for_downstream": False,
        "sla": timedelta(hours=2),
        "execution_timeout": timedelta(seconds=300),
        #"on_failure_callback": some_function or list of functions
        #"on_success_callback": some_function or list of functions
        #"on_retry_callback: some_function or list of functions
        #"sla_miss_callback": some_function or list of functions
        "trigger_rule": "all_success"
    },
    
    description="A simple tutorial DAG",
    schedule_interval="0 0 * * *",
    start_date=datetime(2024, 9, 14),
    catchup=False,
    tags=["example"],
) as dag:
    
    t1 = BashOperator(
        task_id = "print_date",
        bash_command="date",
    )
    
    t2 = BashOperator(
        task_id = "sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    
    t3 = BashOperator(
        task_id = "templated",
        depends_on_past=False,
        bash_command=templated_command,
    )
    
    t1 >> [t2, t3]

 

with DAG(...) as dag: 구문

  • DAG을 컨텍스트로 생성하고 Task를 정의할 수 있게 함

default_args 딕셔너리 

  • DAG 내의 각 Task에 공통으로 적용될 기본 설정 값 정의
default_args={
        "depends_on_past": 이전 Task 실행의 성공 여부에 따라 현재 Task 실행 여부 결정 
        "email": Task 실패 시 이메일을 받을 사람들의 목록 정의 
        "email_on_retry": Task가 재시도될 때 이메일을 보낼지 여부 설정 
        "email_on_failure": Task가 실패했을 때 이메일 보낼지 여부 설정 
        "retries": Task 실패 시 재시도할 횟수 지정 
        "retry_delay": Task가 실패한 후 재시도하기 전, 기다리는 시간 지정 
        "queue": Task가 실행될 작업 큐 지정 
        "pool": 리소스 제한을 위해 사용되는 풀 지정 
        "priority_weight": Task 우선순위 지정, 값이 클 수록 높은 우선순위이며 리소스가 제한 된 경우 우선적으로 실행 
        "end_date": Task가 더 이상 실행되지 않는 종료 날짜 지정 
        "wait_for_downstream": 현재 Task 성공 시, 하위 Task가 모두 완료될 때까지 기다릴지 여부 설정 
        "sla": 서비스 수준 계약 시간 
        "execution_timeout": Task가 완료되어야 하는 시간, 지정된 시간 내에 완료되지 않으면 실패로 간주 
        "on_failure_callback": Task 실패 시 호출되는 콜백 함수 
        "on_success_callback":Task가 성공적으로 완료되었을 때 호출되는 콜백 함수 
        "on_retry_callback:  Task가 재시도될 때 호출되는 콜백 함수 
        "sla_miss_callback": SLA 미준수 시 호출되는 콜백 함수 
        "trigger_rule": Task가 실행될 조건 정의 
    }

 

Task정의 

    t1 = BashOperator(
        task_id = "print_date",
        bash_command="date",
    )
    
    t2 = BashOperator(
        task_id = "sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    
    t3 = BashOperator(
        task_id = "templated",
        depends_on_past=False,
        bash_command=templated_command,
    )
    
    t1 >> [t2, t3] #t1 성공 시 t2와 t3 병렬 실행

 

 


 

 

AWS EC2에 Airflow설치하기 

 

EC2 내에서 가상환경을 설정하고 진행하기 위해 필요한 몇가지 설정을 진행해줬다

sudo apt-get update
sudo apt-get install python3-virtualenv

virtualenv airflow
cd airflow
source bin/activate #가상환경 활성화

pip install apache-airflow
export AIRFLOW_HOME=~/airflow
echo $AIRFLOW_HOME

 

#db 초기화
airflow db init

#사용자 계정 생성
airflow users create \
	--username admin \
    --firstname {firstname}  \
    --lastname {lastname} \
    --role Admin \
    --email {email주소}

 

Airflow Web UI를 8080 포트에 띄우기 위해 인바운드 규칙에서 8080 포트를 허용하도록 인바운드 규칙을 수정한다.

 

 

airflow webserver --port 8080

 

퍼블릭 IPv4 주소:8080으로 들어가면 Airflow 로그인 페이지가 나온다. 거기서 이전에 유저를 생성하며 만들었던 대로 입력하면

 

이렇게 웹 서버가 잘 동작하는 것을 확인할 수 있다! 

mkdir dags

#sudo apt-get update
#sudo apt install nano

nano airflow.cfg

 

airflow.cfg 파일을 살펴보면 dags_folder 변수에 DAG가 저장되는 디렉토리를 지정한다는 걸 알수 있다. 기본 값은 /home/ubuntu/airflow/dags 이다. dags 파일을 mkdir로 생성한다. 

 

+) 기본 예제 DAG를 삭제(?)하기 위해 airflow.cfg 파일 load_examples 변수를 False로 설정한다. 

 

 

DAG 파일 작성하기 

  • start → get_info_task → msg → end 순으로 실행 
  • start, end로 각각 시작과 끝에 메시지 출력
  • get_info_task로 클롤링 파이썬 스크립트 실행
  • msg로 실행 결과 출력 
  • 1시간에 1번씩 DAG 실행 
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

from datetime import datetime, timedelta
import sys, os

def print_result(**kwargs):
    r = kwargs["ti"].xcom_pull(key='result_msg')
    print("message:", r)

with DAG(
    "crawling-recruiting",
    
    default_args = {
        'depends_on_past': False,
        'email' : ['likeeu23@naver.com'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=30),
    },
    
    description="job opening crawling",
    schedule=timedelta(hours=1),
    start_date=datetime(2024, 9, 15),
    catchup=False,
    tags=['crawling_example'],
    
) as dag:
    
    start = BashOperator(
        task_id ='start',
        bash_command='echo "start"',
    )
    
    get_info_task = BashOperator(
        task_id = 'run_get_info',
        bash_command='python3 crawling_recruiting.py',
    )
    
    msg = PythonOperator(
        task_id = 'msg',
        python_callable=print_result, 
        provide_context=True,
    )
    
    complete = BashOperator(
        task_id = 'complete',
        bash_command='echo "complete!" ',
    )
    
    start >> get_info_task >> msg >> complete

 

dags list로 확인하면 dag가 잘 등록 된 것을 확인할 수 있다 

 

크롤링 파일 작성하기

크롤링 알못이어서 코드 참고했습니다..! (https://github.com/sesac-google-ai-1st/saramin-repo-2/blob/main/crawling.py)

# -*- encoding: euc-kr-*-
import time
import selenium
from selenium import webdriver
import pandas as pd
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService

options = webdriver.ChromeOptions()
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
options.add_argument('log-level=3')

import warnings
warnings.filterwarnings('ignore')

driver = webdriver.Chrome()
url = 'https://www.saramin.co.kr'
driver.get(url)
time.sleep(1.5)

search ="정보보호"

driver.find_element(By.CSS_SELECTOR, 'button#btn_search.btn_search').click()
time.sleep(1.5)

search_bax = driver.find_element(By.TAG_NAME, 'input')
search_bax.send_keys(search)
time.sleep(1.5)

driver.find_element(By.CSS_SELECTOR, 'button#btn_search_recruit.btn_search').click()
time.sleep(1.5)

info_list_all = driver.find_elements(By.XPATH, '//*[@id="recruit_info_list"]/div[1]/div')
info_list_addr = driver.find_elements(By.XPATH, '//*[@id="recruit_info_list"]/div[1]/div/div/h2/a')
    
next_buttom = len(driver.find_elements(By.XPATH, '//*[@id="recruit_info_list"]/div[2]/div/a'))

df = pd.DataFrame()


for i in range(10):
    info_list_all
    info_list_addr
    
    info_list_all_inner = [i.get_attribute('innerText') for i in info_list_all]
    time.sleep(3)
    
    info_list_addr_href = [i.get_attribute('href') for i in info_list_addr]
    time.sleep(3)
    
    df = pd.DataFrame({
        'info': info_list_all_inner,
        'link': info_list_addr_href
    })
    
    time.sleep(5)
    
    try:
        for j in range(next_buttom):
            refix_buttom = driver.find_elements(By.XPATH, '//*[@id="recruit_info_list"]/div[2]/div/a')[j]
            time.sleep(3)
            refix_buttom.click()
            time.sleep(3)
            
            time.sleep(10)
            
            df = pd.concat([df, pd.DataFrame({
                'info': info_list_all_inner,
                'link': info_list_addr_href
            })])
            
            time.sleep(5)
            
    except:
        break
    
    finally:
        df.to_csv('data.csv', index=False)
        driver.close()

 

이렇게 잘 크롤링 된 것을 확인할 수 있었다! 

 

 

ubuntu에서 Selenium 실행할 수 있도록 환경 세팅하기 

아직 ubuntu에서는 크롬 드라이버를 설치해줘야 한다길래,,,드라이버 설치도 함께 진행해줬다. 

apt-get update
wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
sudo apt install ./google-chrome-stable_current_amd64.deb

google-chrome -version
#sduo apt install unzip

wget https://storage.googleapis.com/chrome-for-testing-public/128.0.6613.137/linux64/chromedriver-linux64.zip
unzip chromedriver-linux64.zip

 

https://dalgalim.tistory.com/entry/%ED%81%AC%EB%A1%AC%EB%93%9C%EB%9D%BC%EC%9D%B4%EB%B2%84-%EC%84%A4%EC%B9%98

 

크롬드라이버 설치

크롬드라이버 다운로드 https://googlechromelabs.github.io/chrome-for-testing/https://andamiro1.tistory.com/31 - 크롬 드라이버 Chromedriver 128버전 리눅스 64비트(Chromedriver version 128 Linux 64 bit)- https://storage.googleapis.com

dalgalim.tistory.com

 

options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')

driver=webdriver.Chrome(excutable_path='./chromedriver-linux64/chromedriver', chrmoe_options=options)

 

크롤링 코드에 ubuntu환경에서 사용하기 위해 필요한 내용을 추가해주었다. 

 

 

그러고 냅다 트리거 시키기면!! 정의해놓은대로 DAG가 실행된다.

근데 크롤링 하는 task 상태가 up_for_retry여서 로그를 확인해봤다. 

airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code 2.

 

BashOperator 부분에서 명령어 실패했다는 건데 뭔가 느낌적으로 무좍건 크롤링 코드 실행하는 부분에서 오류 났을 것 같음!

 

그래서 수정하려고 했는데..

 

 

왜 무한 로딩인데요 선생님..

 

 

오류 난 부분 수정하고.. 크롤링한 파일 S3에 적재해오는 건 다음에 해오겠습니다....


 

 

참고:

https://velog.io/@starbirdnara/Ubuntu-%ED%81%AC%EB%A1%A4%EB%A7%81%EC%9D%84-%EC%9C%84%ED%95%9C-selenium-%EC%82%AC%EC%9A%A9

https://dalgalim.tistory.com/entry/%ED%81%AC%EB%A1%AC%EB%93%9C%EB%9D%BC%EC%9D%B4%EB%B2%84-%EC%84%A4%EC%B9%98

https://github.com/sesac-google-ai-1st/saramin-repo-2

https://velog.io/@me529/%EB%8D%B0%EC%9D%B4%ED%84%B0%EC%97%94%EC%A7%80%EB%8B%88%EC%96%B4%EB%A7%81-Airflow-%EC%A0%81%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0#--get_url_task--selenium-%ED%81%AC%EB%A1%A4%EB%A7%81

https://velog.io/@dbgpwl34/EC2ubuntu%EC%97%90-airflow-%EC%84%A4%EC%B9%98