본문 바로가기

OpenSource/Airflow

[Airflow] Custom Operator에서 Jinja template 값 치환 실패시 해결방법

앞서 BashOperator 확장을 통한 Spark Custom Operator 를 통해 Custom Operator를 만들어 보았고, dag 실행시 arguments를 전달하여 실행하는 방법을 통해 arguments를 dag에 전달하는 방법을 알아보았다.

필자는 SparkCustomOperator와 dag arguments 전달 방식을 통해 spark job을 수행하고자 하였는데, 어찌된 일인지 template의 값이 치환되지 않고 문자 그대로 수행되는 바람에 계속 실패를 하게 되어 이를 해결한 내용을 포스팅 하고자 한다.

1. 문제 발생

kudu_sync_task = SparkBashOperator(
        task_id="spark_task",
        driver_cores="{{ dag_run.conf['driver_cores'] }}",
        driver_memory="{{ dag_run.conf['driver_memory'] }}",
        executor_cores="{{ dag_run.conf['executor_cores'] }}",
        num_executors="{{ dag_run.conf['num_executors'] }}",
        executor_memory="{{ dag_run.conf['executor_memory'] }}",
        max_attempts=1,
        ...
    )

위의 수행결과를 보면

[2020-03-19 12:12:38,919] {bash_operator.py:115} INFO - Running command:  spark-submit --master yarn --deploy-mode cluster --driver-cores { dag_run.conf['driver_cores'] } --driver-memory { dag_run.conf['driver_memory'] } --executor-cores { dag_run.conf['executor_cores'] } --num-executors { dag_run.conf['num_executors'] } --executor-memory { dag_run.conf['executor_memory'] } ... 

위와 같이 template 코드가 값으로 치환되지 못하고 그대로 터미널에 문자로 출력되어 수행하려고 시도하여, Spark 잡이 실패하고 있었다.

2. 해결방법

Airflow Templating 문서에도 나와있듯, template_fields 를 이용하여, template 문법을 사용할 필드명을 지정해야 Jinja template을 사용할 수 있다.

따라서 필자는 아래와 같이 수정하였다.

class SparkBashOperator(BashOperator):
    ## template_fields를 이용하여 template을 사용할 필드를 지정
    template_fields = ['driver_cores', 'driver_memory', 'executor_cores', 'num_executors', 'executor_memory', 'job_args']


    @apply_defaults
    def __init__(
            self,
            spark_opts=[],
            driver_cores=1,
            driver_memory="2g",
            executor_cores="5",
            num_executors="1",
            executor_memory="5g",
            max_attempts=1,
            yarn_queue="root.default",
            spark_class="",
            jar="",
            keytab="",
            principal="",
            job_args=[],
            *args, **kwargs) -> None:
        super(SparkBashOperator, self).__init__(bash_command="", *args, **kwargs)

        self.driver_cores = driver_cores
        self.driver_memory = driver_memory
        self.executor_cores = executor_cores
        self.num_executors = num_executors
        self.executor_memory = executor_memory
        self.max_attempts = max_attempts
        self.yarn_queue = yarn_queue
        self.spark_class = spark_class
        self.jar = jar
        self.keytab = keytab
        self.principal = principal
        self.job_args = job_args
        self.spark_opts = spark_opts

        ##이하 생략 https://louisdev.tistory.com/21 참고