docker container start airflow
docker container exec -it airflow bash
# install OpenJDK 8 (as root)
apt-get update
apt-get install -y openjdk-8-jdk
# check
java -version
# add the following two lines to user airflow's ~/.bashrc
su - airflow
export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-arm64"   # arm64 image; on x86_64 use /usr/lib/jvm/java-8-openjdk-amd64
export PATH="$PATH:$JAVA_HOME/bin"
# don't forget to reload
source ~/.bashrc
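A quick sanity check that the JDK is now visible to the airflow user:

# verify
echo $JAVA_HOME
which java && java -version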
# install packages for SparkSubmitOperator
pip3 install apache-airflow-providers-apache-spark
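To confirm Airflow actually picked up the provider (assumes the airflow CLI is available to this user):

# verify the provider is registered
airflow providers list | grep -i spark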
# Connection in Airflow webserver (Admin -> Connections)
# Conn Id: spark_standalone
# Conn Type: Spark
# Host: spark://spark-master
# Port: 7077
# Extra: {"queue": "root.default", "deploy_mode": "client", "spark_home": "/usr/bin/spark-3.1.2-bin-hadoop3.2", "spark_binary": "spark-submit", "namespace": "default"}
#   deploy_mode is "client" here: Spark standalone does not support cluster deploy mode for Python applications
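The same connection can also be created from the CLI instead of the web UI, for example:

airflow connections add spark_standalone \
    --conn-type spark \
    --conn-host spark://spark-master \
    --conn-port 7077 \
    --conn-extra '{"queue": "root.default", "deploy_mode": "client", "spark_home": "/usr/bin/spark-3.1.2-bin-hadoop3.2", "spark_binary": "spark-submit", "namespace": "default"}'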
# airflow.contrib imports are deprecated in Airflow 2.x; import from the provider package
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
...
...
# SparkSubmitOperator expects jars as a single comma-separated string
spark_jars = ",".join([
    "/home/airflow/airflow/spark/jars/bson-4.0.5.jar",
    "/home/airflow/airflow/spark/jars/mongo-spark-connector_2.12-3.0.1.jar",
    "/home/airflow/airflow/spark/jars/mongodb-driver-core-4.0.5.jar",
    "/home/airflow/airflow/spark/jars/mongodb-driver-sync-4.0.5.jar",
    "/home/airflow/airflow/spark/jars/mysql-connector-java-8.0.21.jar",
])
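If the jars are not already in the container, they can be fetched into that directory from Maven Central; the URLs below are assumed from the standard repository layout for these artifact coordinates:

# back in the container shell
cd /home/airflow/airflow/spark/jars
wget https://repo1.maven.org/maven2/org/mongodb/bson/4.0.5/bson-4.0.5.jar
wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/3.0.1/mongo-spark-connector_2.12-3.0.1.jar
wget https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-core/4.0.5/mongodb-driver-core-4.0.5.jar
wget https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-sync/4.0.5/mongodb-driver-sync-4.0.5.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar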
...
...
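The elided lines presumably define the DAG object the operator attaches to; a minimal sketch, with the dag_id taken from the test command at the end and all other arguments assumed:

from datetime import datetime
from airflow import DAG

# hypothetical DAG definition; dag_id matches the `airflow tasks test` command below
dag = DAG(
    dag_id="kid_news_wordcount",
    start_date=datetime(2022, 3, 1),
    schedule_interval=None,
    catchup=False,
)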
wordcount = SparkSubmitOperator(
    task_id="wordcount",
    conn_id="spark_standalone",
    application="/home/airflow/airflow/spark/applications/word_count_dump.py",
    total_executor_cores=6,
    executor_cores=2,
    executor_memory="3072m",
    num_executors=3,
    name="spark-wordcount",
    verbose=False,
    driver_memory="2g",
    jars=spark_jars,
    dag=dag,
)
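For reference, the operator assembles roughly the following spark-submit command from the connection settings plus the kwargs above; note that --queue and --num-executors only have an effect on YARN/Kubernetes masters:

spark-submit \
    --master spark://spark-master:7077 \
    --deploy-mode client \
    --queue root.default \
    --name spark-wordcount \
    --total-executor-cores 6 \
    --executor-cores 2 \
    --executor-memory 3072m \
    --num-executors 3 \
    --driver-memory 2g \
    --jars /home/airflow/airflow/spark/jars/bson-4.0.5.jar,/home/airflow/airflow/spark/jars/mongo-spark-connector_2.12-3.0.1.jar,/home/airflow/airflow/spark/jars/mongodb-driver-core-4.0.5.jar,/home/airflow/airflow/spark/jars/mongodb-driver-sync-4.0.5.jar,/home/airflow/airflow/spark/jars/mysql-connector-java-8.0.21.jar \
    /home/airflow/airflow/spark/applications/word_count_dump.py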
...
# test the task in isolation (runs without the scheduler; no state is recorded in the metadata DB)
airflow tasks test kid_news_wordcount wordcount 2022-03-12