Skip to content
Go back

taskデコレータを使ってtask_idを変更する(Airflow v2.2以前)

Published:  at  12:00 AM

ここに書いてある通りなのだが、なかなか見つからなかったので。

Airflow v2.2以前で、@taskデコレータで生成したタスクを ForLoop で複数生成したい場合:

from airflow.decorators import task
...

@task
def say_hi(name):
    print("Hi", name)

names = ['Bob', 'Charlie', 'Jane']

for name in names:
    start >> say_hi(name)

これでコードとしては動くが、loop で生成されるtask_idsay_hi_1, say_hi_2, …と単に連番が振られるだけになってしまう。

そこで以下のようにすれば、好みのtask_id を設定できる:

...

for name in names:
    start >> task(say_hi)(task_id=f"say_hi_to_{name}")(name)

何のことはない、@taskは decorator なので、関数を引数とした関数として使えるということ。 少々読みずらくはなってしまうが、Airflow v2.2 以前で@taskを使うには、いろいろと不便が伴うので仕方ない。

Airflow v2.3 以降について

Airflow v2.3以降では、以下のようにoverrideメソッドでtask_idを設定できるようだ:

@task
def say_hi(name):
    print("Hi", name)

names = ['Bob', 'Charlie', 'Jane']

for name in names:
    say_hi.override(task_id=f"say_hi_to_{name}")

    start >> say_hi()

また、少し話はそれるが、v2.3 以降なら ForLoop の代わりにexpandメソッドが使える:

@task
def say_hi(name):
    print("Hi", name)

names = ['Bob', 'Charlie', 'Jane']
say_hi.expand(name=names)

が、どうやらtask_idは最初の例のように、say_hi_1, say_hi_2, …と連番で振られてしまい、任意の値は設定できないようだ。

Astronomer のページ でもtask_idはマップ不可と言及されている:

Some parameters can’t be mapped. For example, task_id, pool, and many BaseOperator arguments.

既存の DAG でexpandを取り入れる場合、task_idが変わってしまえば、過去タスクのログが見えずらくなってしまう。 expandメソッドは便利だが、task_idが任意の値で設定できない点は注意が必要そうだ。



Previous Post
Airflowを学ぶにはAirflow Summitの動画を見るのが良い