[Apache Airflow] ShortCircuitOperator

less than 1 minute read


ShortCircuitOperator

  • Condition에 따라 다음 Task의 실행 여부를 결정하고자 할 때 사용
  • Operator 내의 Python 함수가 True인 경우 다음 Task 진행, 아닐 경우 Dag을 종료시킴
    • check_s3의 s3_data_load가 True인 경우 kid_wordcount Task가 실행
    • check_s3의 s3_data_load가 False인 경우 이 후의 Task는 모두 Skip 됨
...
from airflow.operators.python import ShortCircuitOperator
...

...
def s3_data_load():
    """check whether data was crawled"""
    s3_client = boto3.client(
        "s3", 
        aws_access_key_id = Variable.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key = Variable.get("AWS_SECRET_ACCESS_KEY")
    )
    today = datetime.strftime(datetime.now() + timedelta(hours=9), "%Y-%m-%d")
    Bucket, path = Variable.get("Bucket"), f'{Variable.get("kid_news_dir")}/{today}'

    res = s3_client.list_objects_v2(Bucket=Bucket, Prefix=path, MaxKeys=1)
    return 'Contents' in res
...

...
# task to check whether data exists
check_s3 = ShortCircuitOperator(
    task_id="check_s3",
    python_callable=s3_data_load,
    dag=dag)
...

...
kid_news_scrapy >> check_s3 >> kid_wordcount

ref