[Apache Airflow] Airflow - MySQL Data Extract

1 minute read


Airflow - MySQL Data Extract

  • Required Package Installation
  • Airflow - MySQL(local on Docker) Extract
  • Airflow - MySQL(EC2) Extract

Required Package Installation

  • mysql은 homebrew를 통해 설치 및 .zshrc에 환결설정 등록
# venv 실행 후
pip3 install mysql-connector-python
pip3 install mysqlclient
pip3 install apache-airflow-providers-mysql

# mysql provider 설치 확인
airflow providers list

Airflow - MySQL(local on Docker) Extract

from datetime import datetime

from airflow.models import DAG
from airflow.sensors.sql import SqlSensor
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python import PythonOperator

import pandas as pd

def _extract_member_from_mysql():
    # Get hook
    mysql_server = MySqlHook(mysql_conn_id="mysql_conn")
    # Execute query
    df = mysql_server.get_pandas_df(sql="SELECT * FROM member;")

    # Generate unique filename
    base_file_path = "tmp/member"
    path = "{}_{}.csv".format(base_file_path, datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))
    # save file
    df.to_csv(path, index=False)

with DAG(
    "member_processing",
    schedule_interval="@daily",
    start_date=datetime(2022,1,11),
    catchup=False
) as dag:

    # check sql
    is_sql_available = SqlSensor(
        task_id="is_sql_available",
        conn_id="mysql_conn",
        sql="SELECT * FROM member;"
    )

    # extract data from mysql
    extract_member_mysql = PythonOperator(
        task_id="extract_member_mysql",
        python_callable=_extract_member_from_mysql
    )

is_sql_available >> extract_member_mysql
  • Add Connection in Airflow Webserver
# airflow webserver - Admin - Connections - + (추가)
# 1) Conn Id: python dag파일의 mysql_conn_id 이름
mysql_conn
# 2) Conn Type: 해당하는 connection
MySQL
# 3) description: 자유롭게 작성
Connection to local MySQL
# 4) Host:
0.0.0.0
# 5) Schema: DB 스키마
airflow
# 6) Login: DB id
root
# 7) Password: DB password 
1234
# 8) Port:
3306
# 확인

Airflow - MySQL(EC2) Extract

  • 위에서 mysql_conn_id만 바꾸기
...
def _extract_member_from_mysql():
    # Get hook
    mysql_server = MySqlHook(mysql_conn_id="ec2_mysql_conn")
...
with DAG(
    "ec2_member_processing",
...
) as dag:

    is_sql_available = SqlSensor(
        ...
        conn_id="ec2_mysql_conn",
        ...
    )
...

is_sql_available >> extract_member_mysql
  • Add Connection in Airflow Webserver
# airflow webserver - Admin - Connections - + (추가)
# 1) Conn Id: python dag파일의 mysql_conn_id 이름
ec2_mysql_conn
# 2) Conn Type: 해당하는 connection
MySQL
# 3) description: 자유롭게 작성
Connection to ec2 MySQL
# 4) Host:
ec2-15-164-164-229.ap-northeast-2.compute.amazonaws.com
# 5) Schema: DB 스키마
airflow
# 6) Login: DB id
jisu
# 7) Password: DB password 
********
# 8) Port:
3306
# 확인

ref