ここに書いてある通りなのだが、なかなか見つからなかったので。
Airflow v2.2以前で、@task
デコレータで生成したタスクを ForLoop で複数生成したい場合:
from airflow.decorators import task
...
@task
def say_hi(name):
print("Hi", name)
names = ['Bob', 'Charlie', 'Jane']
for name in names:
start >> say_hi(name)
これでコードとしては動くが、loop で生成されるtask_id
はsay_hi_1
, say_hi_2
, …と単に連番が振られるだけになってしまう。
そこで以下のようにすれば、好みのtask_id
を設定できる:
...
for name in names:
start >> task(say_hi)(task_id=f"say_hi_to_{name}")(name)
何のことはない、@task
は decorator なので、関数を引数とした関数として使えるということ。
少々読みずらくはなってしまうが、Airflow v2.2 以前で@task
を使うには、いろいろと不便が伴うので仕方ない。
Airflow v2.3 以降について
Airflow v2.3以降では、以下のようにoverride
メソッドでtask_id
を設定できるようだ:
@task
def say_hi(name):
print("Hi", name)
names = ['Bob', 'Charlie', 'Jane']
for name in names:
say_hi.override(task_id=f"say_hi_to_{name}")
start >> say_hi()
また、少し話はそれるが、v2.3 以降なら ForLoop の代わりにexpand
メソッドが使える:
@task
def say_hi(name):
print("Hi", name)
names = ['Bob', 'Charlie', 'Jane']
say_hi.expand(name=names)
が、どうやらtask_id
は最初の例のように、say_hi_1
, say_hi_2
, …と連番で振られてしまい、任意の値は設定できないようだ。
Astronomer のページ でもtask_id
はマップ不可と言及されている:
Some parameters can’t be mapped. For example, task_id, pool, and many BaseOperator arguments.
既存の DAG でexpand
を取り入れる場合、task_id
が変わってしまえば、過去タスクのログが見えずらくなってしまう。
expand
メソッドは便利だが、task_id
が任意の値で設定できない点は注意が必要そうだ。