Spark에서 하는 일을 주기적으로 수행하기 위해 무엇이 있는지 조사했다.

여러 가지 도구들이 있었는데 그 중에서 AirflowLuigi가 좋아보였다.

둘 중에 어떤 걸로 정할 지 고민하다 Airflow로 정했다.

두 가지 모두 좋아보였지만 AirflowApache에서 관리하고 대시보드가 유려해보였다.

Airflow를 설치하는 방법은 Quick Start에 잘 나와 있어 그대로 따라하면 된다.

$ pip install 'apache-airflow[ssh]'

ssh는 원격 서버에 접속하여 spark-submit을 수행하기 위해 설치했다.

$ airflow initdb

~/airflow/airflow.cfg, airflow.db, logs, unittests.cfg 가 만들어진다.

Airflow를 구동할 때 airflow webserver와 같이 커맨드로만 가능해서

다음과 같이 start-airflow.sh, stop-airflow.sh을 만들어서 실행했다.

#!/bin/sh
nohup airflow webserver > /dev/null 2>&1 &
nohup airflow scheduler > /dev/null 2>&1 &
#!/bin/sh
pkill -f airflow

신규로 DAG를 만들기 위해서 ~/airflowdags 디렉토리를 만들고 파일을 추가한다.

$ cd ~/airflow
$ mkdir dags && cd dags
$ touch my_first_dag.py

my_fist_dag.py에 다음과 같이 작성하여 spark-submit을 주기적으로 수행하도록 했다.

# -- coding: utf-8 --
from datetime import timedelta
import airflow
from airflow.models import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
    dag_id='my_dag',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['spark']
)
templated_bash_command = """
    spark-submit \
    --class my_class \
    --master spark://spark-master:7077 \
    --executor-cores 2 \
    --executor-memory 2g \
    my_first.jar
"""
hook = SSHHook(
    ssh_conn_id='ssh_default',
    remote_host='spark-master',
    username='username',
    key_file='~/.ssh/id_rsa'
)
run_ssh = SSHOperator(
    task_id='spark_submit_task',
    ssh_hook=hook,
    command=templated_bash_command,
    dag=dag
)
run_ssh

schedule_interval='0 0 * * *'은 하루에 한 번 밤에 수행한다고 문서에 적혀있는데

UTC 시간대로 동작하여 한국 시간으로는 오전 9시에 수행한다.

참고 문헌

  1. http://bytepawn.com/luigi-airflow-pinball.html
  2. https://airflow.apache.org/docs/stable/installation.html
  3. https://airflow.apache.org/docs/stable/scheduler.html#dag-runs