...
import boto3
from datetime import datetime, timedelta

from airflow.models import Variable
from airflow.operators.python import ShortCircuitOperator
...
...
def s3_data_load():
    """Check whether today's crawled data exists in S3."""
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=Variable.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=Variable.get("AWS_SECRET_ACCESS_KEY"),
    )
    # 'today' in UTC+9, so the prefix matches the crawl date
    today = (datetime.now() + timedelta(hours=9)).strftime("%Y-%m-%d")
    bucket = Variable.get("Bucket")
    prefix = f'{Variable.get("kid_news_dir")}/{today}'
    # list_objects_v2 only returns a 'Contents' key when at least one object matches the prefix
    res = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    return "Contents" in res
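# Alternative sketch (not part of the original code): if apache-airflow-providers-amazon
# is installed, the same existence check can go through S3Hook and an Airflow connection
# instead of raw keys kept in Variables. The 'aws_default' connection id is an assumption,
# as is reusing the 'Bucket' and 'kid_news_dir' Variables for the location.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def s3_data_load_via_hook():
    """Same existence check via S3Hook (sketch; assumes an 'aws_default' connection)."""
    hook = S3Hook(aws_conn_id="aws_default")
    today = (datetime.now() + timedelta(hours=9)).strftime("%Y-%m-%d")
    prefix = f'{Variable.get("kid_news_dir")}/{today}'
    # list_keys returns the matching keys (at most one here); empty list means no data yet
    keys = hook.list_keys(bucket_name=Variable.get("Bucket"), prefix=prefix, max_items=1)
    return bool(keys)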
...
...
# task to check whether today's data exists in S3
check_s3 = ShortCircuitOperator(
    task_id="check_s3",
    python_callable=s3_data_load,
    dag=dag,
)
...
...
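# ShortCircuitOperator acts on the callable's return value: when s3_data_load returns
# False, check_s3 short-circuits and kid_wordcount (and anything further downstream) is
# marked skipped rather than failed; when it returns True, the run continues normally.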
kid_news_scrapy >> check_s3 >> kid_wordcount