[Apache Airflow] Using API



Using API

  • Sensor Operator
  • Add Connection in Airflow Webserver
  • Test Airflow Task

Sensor Operator

  • path - AIRFLOW_HOME/dags/user_processing.py
# user_processing.py
...
from airflow.providers.http.sensors.http import HttpSensor

with DAG(
  ...
) as dag:
    # Sensor Operator
    is_api_available = HttpSensor(
        task_id="is_api_available",
        http_conn_id="user_api",
        # path appended to the Host of the user_api connection
        endpoint="api/"
    )
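
A sensor keeps checking a condition at a fixed interval until the condition holds or a timeout expires. The following is a minimal plain-Python sketch of that polling idea, not the provider's actual code: `wait_until_available` and the fake `probe` are hypothetical names used only for illustration, while `poke_interval` and `timeout` borrow the names of the corresponding sensor parameters.

```python
import time
from typing import Callable


def wait_until_available(
    probe: Callable[[], bool],
    poke_interval: float = 60.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call probe() until it returns True and return the number of attempts.

    Raises TimeoutError when the next attempt would fall after the deadline.
    """
    deadline = time.monotonic() + timeout
    attempts = 0
    while True:
        attempts += 1
        if probe():
            return attempts
        # Give up if waiting one more interval would exceed the timeout.
        if time.monotonic() + poke_interval > deadline:
            raise TimeoutError(f"not available after {attempts} attempts")
        sleep(poke_interval)


# Hypothetical probe that fails twice and then succeeds.
# In a real sensor this would be an HTTP request to the endpoint.
responses = iter([False, False, True])
attempts = wait_until_available(
    lambda: next(responses), poke_interval=0.0, timeout=5.0
)
print(attempts)
```

In the DAG above, the HttpSensor plays the role of this loop, so downstream tasks start only once the API responds.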


Add Connection in Airflow Webserver

# the HTTP provider is installed together with Airflow; if it is missing, install it with:
# pip3 install 'apache-airflow-providers-http'

# Airflow webserver - Admin - Connections - + (add)
# 1) Conn Id: the http_conn_id used in the Python DAG file
user_api
# 2) Conn Type: the matching connection type
HTTP
# 3) Description: free text
API for getting users
# 4) Host: base URL of the API (the endpoint, api/, is set in the DAG file)
https://randomuser.me/
# Confirm (save the connection)
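
The Host from the connection and the endpoint from the DAG file together form the URL that the sensor requests. As a rough illustration, the hypothetical helper `join_url` below (not part of Airflow) shows how the two values from this post combine. It assumes the two parts are joined with a single slash.

```python
def join_url(host: str, endpoint: str) -> str:
    """Join a connection host and a task endpoint with exactly one slash."""
    return host.rstrip("/") + "/" + endpoint.lstrip("/")


# Values taken from the connection (Host) and the DAG file (endpoint).
url = join_url("https://randomuser.me/", "api/")
print(url)
```

Keeping the base URL in the connection and only the path in the DAG means the host can be changed in the webserver without editing the DAG file.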

Test Airflow Task

  • lets you test a specific task
    • 1) without checking its dependencies
    • 2) without storing any metadata related to the task
# test a task (airflow tasks test [dag_id] [task_id] [execution_date])
airflow tasks test user_processing is_api_available 2022-01-07