[Apache Airflow] SSHOperator & SparkSubmitOperator

SSHOperator & SparkSubmitOperator

  • SSHOperator
  • SparkSubmitOperator

SSHOperator

  • Install the SSH provider so the SSHOperator can kick off jobs on the Spark cluster
  • For the Spark cluster setup, see the 🔗 link
  • Going forward, connecting over SSH with an id_rsa key rather than a password looks like the better approach (see the comment in the sketch at the end of this section)
docker container start airflow
docker container exec -it airflow bash

# install openssh-client
apt-get update
apt-get install openssh-client

# ssh connect to spark-master's root
ssh root@spark-master

# airflow virtual environment
su - airflow
cd airflow
source ./.venv/bin/activate
pip3 install apache-airflow-providers-ssh

# Connection in Airflow-Webserver
# Conn Id: ssh_spark
# Conn Type: SSH
# Host: spark-master
# Username: root
# Password: 1234
# Port:
# Extra:

# test
airflow tasks test kid_news_wordcount wordcount 2022-03-27
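
With the provider installed and the ssh_spark connection registered, the task itself is just an SSHOperator that runs spark-submit on spark-master. Below is a minimal sketch; the schedule and the application path on spark-master are assumptions, and the comment shows how the id_rsa-based connection mentioned above could look.

from datetime import datetime

from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator

with DAG(
    dag_id="kid_news_wordcount",
    start_date=datetime(2022, 3, 1),
    schedule_interval=None,
) as dag:
    # Runs spark-submit on spark-master over the ssh_spark connection.
    # For key-based auth, clear the password and set the connection's
    # Extra to {"key_file": "/home/airflow/.ssh/id_rsa"} instead.
    wordcount = SSHOperator(
        task_id="wordcount",
        ssh_conn_id="ssh_spark",
        command=(
            "/usr/bin/spark-3.1.2-bin-hadoop3.2/bin/spark-submit "
            "/root/spark/applications/word_count_dump.py"  # assumed path on spark-master
        ),
    )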

SparkSubmitOperator

  • In the end, the job did not run correctly because Airflow's Python environment differs from the Python environment on the Spark cluster (a possible mitigation is sketched at the end of this section)
docker container start airflow
docker container exec -it airflow bash

# install openjdk8 (in root)
apt-get update
apt-get install -y openjdk-8-jdk
# check
java -version

# add in user airflow's .bashrc
su - airflow
export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-arm64"
export PATH="$PATH:$JAVA_HOME/bin"
# don't forget
source .bashrc

# install packages for SparkSubmitOperator
pip3 install apache-airflow-providers-apache-spark

# Connection in Airflow-Webserver
# Conn Id: spark_standalone
# Conn Type: Spark
# Host: spark://spark-master
# Port: 7077
# Extra: {"queue": "root.default", "deploy_mode": "cluster", "spark_home":"/usr/bin/spark-3.1.2-bin-hadoop3.2", "spark_binary": "spark-submit", "namespace": "default"}

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
...

...
jars_dir = "/home/airflow/airflow/spark/jars"
jars = ",".join([
    f"{jars_dir}/bson-4.0.5.jar",
    f"{jars_dir}/mongo-spark-connector_2.12-3.0.1.jar",
    f"{jars_dir}/mongodb-driver-core-4.0.5.jar",
    f"{jars_dir}/mongodb-driver-sync-4.0.5.jar",
    f"{jars_dir}/mysql-connector-java-8.0.21.jar",
])
...

...
wordcount = SparkSubmitOperator(
    task_id="wordcount",
    conn_id="spark_standalone",
    application="/home/airflow/airflow/spark/applications/word_count_dump.py",
    total_executor_cores=6,
    executor_cores=2,
    executor_memory="3072m",
    num_executors=3,
    name="spark-wordcount",
    verbose=False,
    driver_memory="2g",
    jars=jars,  # comma-separated jar list built above
    dag=dag,
)
...

# test
airflow tasks test kid_news_wordcount wordcount 2022-03-12
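
As noted at the top of this section, the failure came down to Airflow's Python environment differing from the one on the Spark cluster. A common mitigation is to pin the interpreter Spark should use via its configuration, passed through the operator's conf argument. This is a sketch only, untested here; the interpreter path is an assumption and must actually exist on the Spark workers.

# Sketch: explicitly pin the Python interpreter Spark should run.
# /usr/bin/python3 is an assumed path; it must exist on the workers.
wordcount = SparkSubmitOperator(
    task_id="wordcount",
    conn_id="spark_standalone",
    application="/home/airflow/airflow/spark/applications/word_count_dump.py",
    conf={"spark.pyspark.python": "/usr/bin/python3"},
    dag=dag,
)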
