[Apache Airflow] Airflow - MySQL Data Extract
1 minute read
- Required Package Installation
- Airflow - MySQL(local on Docker) Extract
- Airflow - MySQL(EC2) Extract
Required Package Installation
- MySQL은 Homebrew로 설치하고, .zshrc에 환경 설정을 등록
# After activating the venv:
# Python MySQL client libraries (NOTE(review): MySqlHook presumably uses
# mysqlclient by default — confirm against the installed provider version)
pip3 install mysql-connector-python
pip3 install mysqlclient
# Airflow MySQL provider package
pip3 install apache-airflow-providers-mysql
# Verify that the MySQL provider is now listed
airflow providers list
from datetime import datetime
from airflow.models import DAG
from airflow.sensors.sql import SqlSensor
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python import PythonOperator
import pandas as pd
def _extract_member_from_mysql(conn_id="mysql_conn", base_file_path="tmp/member"):
    """Export the ``member`` table to a timestamped CSV file.

    Args:
        conn_id: Airflow connection id handed to ``MySqlHook``. Defaults to
            ``"mysql_conn"``, the connection registered for the local DB.
        base_file_path: Output path prefix; ``_<YYYY-mm-dd-HH-MM-SS>.csv`` is
            appended to it. A relative prefix resolves against the current
            working directory of the process executing the task.

    Returns:
        The path of the CSV file that was written (PythonOperator pushes it
        to XCom so downstream tasks can pick the file up).
    """
    from pathlib import Path

    # Get a hook for the configured MySQL connection and read the whole table.
    mysql_server = MySqlHook(mysql_conn_id=conn_id)
    df = mysql_server.get_pandas_df(sql="SELECT * FROM member;")

    # Timestamp the filename so runs started in different seconds write
    # separate files instead of overwriting one another.
    path = "{}_{}.csv".format(base_file_path, datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))

    # DataFrame.to_csv does not create missing directories; without this the
    # first run on a fresh worker fails with an OSError.
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(path, index=False)
    return path
with DAG(
    "member_processing",
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 11),
    catchup=False,
) as dag:
    # Wait until the `member` table is reachable and holds at least one row.
    # SqlSensor fetches every row its query returns and then looks only at
    # the first cell, so probe with a constant single-row query instead of
    # SELECT * (which pulled the whole table on every poke and kept waiting
    # whenever the first column of the first row was 0/NULL).
    is_sql_available = SqlSensor(
        task_id="is_sql_available",
        conn_id="mysql_conn",
        sql="SELECT 1 FROM member LIMIT 1;",
    )
    # Dump the table to a timestamped CSV file.
    extract_member_mysql = PythonOperator(
        task_id="extract_member_mysql",
        python_callable=_extract_member_from_mysql,
    )
    is_sql_available >> extract_member_mysql
- Add Connection in Airflow Webserver
# airflow webserver - Admin - Connections - + (추가)
# 1) Conn Id: python dag파일의 mysql_conn_id 이름
mysql_conn
# 2) Conn Type: 해당하는 connection
MySQL
# 3) description: 자유롭게 작성
Connection to local MySQL
# 4) Host:
0.0.0.0
# 5) Schema: DB 스키마
airflow
# 6) Login: DB id
root
# 7) Password: DB password
1234
# 8) Port:
3306
# 확인
# Excerpt of the EC2 version of the DAG file. `...` stands for elided lines
# (presumably identical to the local-MySQL DAG above); only the lines that
# differ are shown. NOTE(review): this is not valid Python as written — the
# bare `...` after `conn_id=` is a positional argument following a keyword
# argument — so fill in the elided lines before using it.
...
def _extract_member_from_mysql():
# Get hook for the EC2 MySQL connection (Conn Id "ec2_mysql_conn")
mysql_server = MySqlHook(mysql_conn_id="ec2_mysql_conn")
...
with DAG(
# DAG id differs from the local DAG's "member_processing"
"ec2_member_processing",
...
) as dag:
# Sensor checks the same EC2 connection before extraction runs
is_sql_available = SqlSensor(
...
conn_id="ec2_mysql_conn",
...
)
...
is_sql_available >> extract_member_mysql
- Add Connection in Airflow Webserver
# airflow webserver - Admin - Connections - + (추가)
# 1) Conn Id: python dag파일의 mysql_conn_id 이름
ec2_mysql_conn
# 2) Conn Type: 해당하는 connection
MySQL
# 3) description: 자유롭게 작성
Connection to ec2 MySQL
# 4) Host:
ec2-15-164-164-229.ap-northeast-2.compute.amazonaws.com
# 5) Schema: DB 스키마
airflow
# 6) Login: DB id
jisu
# 7) Password: DB password
********
# 8) Port:
3306
# 확인
ref