이전 포스팅을 통해 SparkSubmitOperator을 사용해보았다. 하지만 포스팅 말미에도 적어놓았지만 SparkSubmitOperator의 이슈때문에(yarn queue를 지정하기 어려운점) BashOperator를 상속하여 SparkBashOperator를 만들어 보았다.
BashOperator를 이용하여 spark_submit을 직접 호출하는 형태임으로, 상황에 따라 Spark Binary, Hadoop Binary, Hadoop Config 등 설정이 필요하다.
전체 코드
from airflow.operators.bash_operator import BashOperator
from airflow.utils.decorators import apply_defaults
import re
class SparkBashOperator(BashOperator):
@apply_defaults
def __init__(
self,
spark_opts=[],
driver_cores=1,
driver_memory="2g",
executor_cores="5",
num_executors="1",
executor_memory="5g",
max_attempts=1,
yarn_queue="root.default",
spark_class="",
jar="",
keytab="",
principal="",
job_args=[],
*args, **kwargs) -> None:
super(SparkBashOperator, self).__init__(bash_command="", *args, **kwargs)
self.driver_cores = driver_cores
self.driver_memory = driver_memory
self.executor_cores = executor_cores
self.nun_executors = num_executors
self.executor_memory = executor_memory
self.max_attempts = max_attempts
self.yarn_queue = yarn_queue
self.spark_class = spark_class
self.jar = jar
self.keytab = keytab
self.principal = principal
self.job_args = job_args
self.spark_opts = spark_opts
def execute(self, context):
command = """
spark-submit --master yarn \
--deploy-mode cluster \
--driver-cores {driver_cores} \
--driver-memory {driver_memory} \
--executor-cores {executor_cores} \
--num-executors {nun_executors} \
--executor-memory {executor_memory} \
--keytab {keytab} \
--principal {principal} \
--conf spark.yarn.maxAppAttempts={max_attempts} \
{spark_opts} \
--queue {yarn_queue} \
--class {spark_class} \
{jar} {job_args}
""".format(driver_cores=self.driver_cores,
driver_memory=self.driver_memory,
executor_cores=self.executor_cores,
nun_executors=self.nun_executors,
executor_memory=self.executor_memory,
max_attempts=self.max_attempts,
yarn_queue=self.yarn_queue,
spark_class=self.spark_class,
jar=self.jar,
keytab=self.keytab,
principal=self.principal,
job_args=' '.join(self.job_args),
spark_opts=' '.join(self.spark_opts))
self.bash_command = re.sub("\s\s+", " ", command)
super(SparkBashOperator, self).execute(context)
코드는 여기에서 확인가능하다.
사용법
default_args = {
'start_date': datetime(2015, 12, 1),
'retries': 0,
'catchup': False,
'retry_delay': timedelta(minutes=5),
}
spark_args = dict(driver_cores=1,
driver_memory="2g",
executor_cores="5",
num_executors="1",
executor_memory="5g",
max_attempts=1,
yarn_queue="default",
spark_opts=["--conf \"spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps\"",
"--conf spark.eventLog.enabled=false"
],
spark_class="className",
jar="jar path",
keytab=var_json['keytab'],
principal=var_json['principal'],
job_args=['--table=tableName',
'--prefix=prefix',
],
)
dag = DAG('spark', default_args=default_args, schedule_interval="@once")
cdl_to_cdp = SparkBashOperator(
task_id='spark_bash_operator',
**spark_args,
dag=dag)
'OpenSource > Airflow' 카테고리의 다른 글
[Airflow] Local 개발환경 설정(2)_Dag 개발 (0) | 2020.02.12 |
---|---|
[Airflow] Local 개발환경 설정(1)_설치 (0) | 2020.02.12 |
[Airflow] SparkSubmitOperator를 이용한 spark Job Submit (0) | 2020.02.12 |
[Airflow] macOS catalina에서 hostname does not match this instance's hostname 에러 처리 (0) | 2020.02.12 |
[Airflow] Scheduler SPOF(Single Point Of Failure) 제거하기 (0) | 2020.02.11 |