在使用airflow的過程當中須要大量的dag腳本進行性能測試,若是一個個去編寫dag腳本未免太過麻煩,因而想到用python的jinja2模板引擎實現批量腳本生成。python
先經過pip命令安裝jinja2模塊:bash
$ pip install jinja2
而後建立模板文件(模板能夠是任何形式的文本格式,沒有特定擴展名,甚至能夠不要擴展名):性能
dag_template
測試
from datetime import timedelta, datetime import pytz from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG default_args = { 'owner': 'cord', # 'depends_on_past': False, 'depends_on_past': True, # 'start_date': airflow.utils.dates.days_ago(2), 'wait_for_downstream': True, 'execution_timeout': timedelta(minutes=3), 'email': ['123456@qq.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } tz = pytz.timezone('Asia/Shanghai') dt = datetime(2018, 7, 19, 18, 20, tzinfo=tz) utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None) dag = DAG( '{{ dag_name }}', default_args=default_args, description='my DAG', schedule_interval='*/1 * * * *', start_date=utc_dt ) root = DummyOperator(task_id='root', dag=dag) for i in range(50): i = str(i) task = BashOperator( task_id='task'+i, bash_command= 'echo `date`', dag=dag) task.set_downstream(root)
jinja2中有兩種分隔符: {% ... %}
和{{ ... }}
,其中{% ... %}
用於執行for循環或者賦值語句,{{ ... }}
負責將表達式的值填充到模板中。這裏使用{{ ... }}
用於填充dag文件的dag_id 。code
經過該模板便可批量生成dag腳本文件,生成代碼以下:模板引擎
Tool.py
ip
import os from jinja2 import Environment, FileSystemLoader #獲取模板 env = Environment(loader = FileSystemLoader(searchpath="")) template = env.get_template("dag_template") #刪除已有的生成文件 for f in os.listdir("./output"): path_file = os.path.join("./output", f) if os.path.isfile(path_file): os.remove(path_file) #生成新的文件 for i in range(1, 101): output = template.render({'dag_name' : "benchmark%d" % i}) with open("./output/bm%d.py" % i, 'w') as out: out.write(output)
經過執行Tool.py
便可批量生成dag腳本文件了。rem