ApacheAirflow(十二):PythonOperator
- 互联网
- 2025-08-12 08:39:01

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客
🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。
🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频
PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。
关于PythonOperator常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation
python_callable(python callable):调用的python函数 op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。 op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。PythonOperator调度案例
import random from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator # python中 * 关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。 # python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。 def print__hello1(*a,**b): print(a) print(b) print("hello airflow1") # 返回的值只会打印到日志中 return{"sss1":"xxx1"} def print__hello2(random_base): print(random_base) print("hello airflow2") # 返回的值只会打印到日志中 return{"sss2":"xxx2"} default_args = { 'owner':'maliu', 'start_date':datetime(2021, 10, 1), 'retries': 1, # 失败重试次数 'retry_delay': timedelta(minutes=5) # 失败重试间隔 } dag = DAG( dag_id = 'execute_pythoncode', default_args=default_args, schedule_interval=timedelta(minutes=1) ) first=PythonOperator( task_id='first', #填写 print__hello1 方法时,不要加上“()” python_callable=print__hello1, # op_args 对应 print_hello1 方法中的a参数 op_args=[1,2,3,"hello","world"], # op_kwargs 对应 print__hello1 方法中的b参数 op_kwargs={"id":"1","name":"zs","age":18}, dag = dag ) second=PythonOperator( task_id='second', #填写 print__hello2 方法时,不要加上“()” python_callable=print__hello2, # random_base 参数对应 print_hello2 方法中参数“random_base” op_kwargs={"random_base":random.randint(0,9)}, dag=dag ) first >> secondApacheAirflow(十二):PythonOperator由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“ApacheAirflow(十二):PythonOperator”
上一篇
黄鹤楼