17.02.2020       Issue 322 (17.02.2020 - 23.02.2020)       Articles

Airflow by example

How to get started with Airflow



Apache Airflow is a very interesting, popular and free tool to create, manage and monitor workflows, for example if you want to do ETL (Extract / Transform / Load) on data.

This sort of enterprise software often may seem complicated or overly unrelated to our everyday experience as developers but ... is it, really? How about if I just want to watch some TV shows? And experiment with some enterprise-level software at the same time?

Let's do that by learning how to use Airflow to watch TV.

Caveat: This post was originally a twitter thread, that's why all the examples are images and you can't copy/paste them. But hey, at least they are short. Also, typos, because I really just did this while tweeting it, no preparation beforehand.

Just in case: I did not download any "Star Trek: Picard" episodes, and I have a Prime Video subscription, so I don't need to download them via torrent. OTOH, if Sir Patrick ever reads this (which he won't): good job, sir!

This is a script that gives you the information about the latest already aired episode of a TV series.
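The original screenshot is not reproduced here, so what follows is only a minimal sketch of the idea. It assumes the episode list has already been fetched from some TV database; the `EPISODES` records and the function name `latest_aired_episode` are illustrative, not taken from the original script.

```python
from datetime import date

# Hypothetical episode records; the real script would fetch these
# from a TV database instead of hard-coding them.
EPISODES = [
    {"season": 1, "episode": 3, "aired": date(2020, 2, 6)},
    {"season": 1, "episode": 4, "aired": date(2020, 2, 13)},
    {"season": 1, "episode": 5, "aired": date(2020, 2, 20)},
]


def latest_aired_episode(episodes, today):
    """Return the most recently aired episode on or before `today`, or None."""
    aired = [ep for ep in episodes if ep["aired"] <= today]
    if not aired:
        return None
    return max(aired, key=lambda ep: ep["aired"])
```

Passing `today` as an argument instead of calling `date.today()` inside keeps the function easy to test, and it fits the later point that Airflow hands each run a date.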

And this is a script that gives you the link to a torrent to download that episode.

This, on the other hand, is a script to download that torrent.

Of course, one thing about this script is not like the other scripts.

While the others take a couple of seconds to run, this one may take hours or days. But let's ignore that for now. This is a script that moves your downloaded file into a nice directory hierarchy.

However, I think this is nicer because it does the same thing but with names "guaranteed" to be right, more uniform, and transcodes it to, say, something a chromecast would like.
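As a rough sketch of the "nice directory hierarchy" part only (no transcoding), something like this could build uniform names and move the file. The layout `Series/Season NN/Series - SNNENN.ext` and both function names are assumptions, not what the original script did.

```python
import shutil
from pathlib import Path


def library_path(root, series, season, episode, suffix=".mkv"):
    """Build e.g. <root>/Picard/Season 01/Picard - S01E04.mkv."""
    name = f"{series} - S{season:02d}E{episode:02d}{suffix}"
    return Path(root) / series / f"Season {season:02d}" / name


def move_into_library(downloaded, root, series, season, episode):
    """Move a finished download into the hierarchy, creating folders as needed."""
    source = Path(downloaded)
    target = library_path(root, series, season, episode, source.suffix)
    target.parent.mkdir(parents=True, exist_ok=True)
    # shutil.move also works when source and target are on different disks.
    shutil.move(str(source), str(target))
    return target
```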

I could add extra tiny scripts that get subtitles for the languages you like and put them in the right location, and so on, but you get the idea.

Basically: it's easy to automate "I want to watch the latest episode of Picard"

However, it's totally impractical, because:

1) You have to go and tell it to get it
2) It takes hours
3) It may fail, and then you have to do it again
4) It will take hours again

But what if there was a way to define this set of tasks so that they run automatically, you know when they are working, when they start and when they finish, and you have to do nothing except waiting for a message in telegram that tells you "go watch Picard"? And that's where Apache Airflow enters the picture.

If you have a script that is a number of steps which depend on one another and are executed in order then it's possible to convert them into very simple airflow DAGs.

Now I will take a little while to learn exactly HOW to do that and will continue this thread in a bit.

Because, really, ETL (Extract / Transform / Load) is not as complicated as it may appear to be in most cases.

BTW, if you want to have airflow with Python 3.8 (because it's nicer)

Now, this may look complicated, but really, I am defining a DAG (Directed Acyclic Graph) or "thingies connected with arrows that have no loops in them"
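Since the screenshot is gone, here is a hedged sketch of what such a definition can look like. It assumes the Airflow 1.10 series (current when this was written); the DAG name `tv_shows`, the start date and the retry settings are made up for illustration. Airflow is imported inside the function only so the settings can be read and checked without Airflow installed.

```python
from datetime import datetime, timedelta

# Settings shared by every task in the DAG. The values are illustrative.
DEFAULT_ARGS = {
    "owner": "airflow",
    "retries": 2,                         # retry a failed task twice...
    "retry_delay": timedelta(minutes=5),  # ...waiting five minutes in between
}


def build_dag():
    """Create the (still empty) DAG."""
    from airflow import DAG

    return DAG(
        dag_id="tv_shows",            # hypothetical name
        default_args=DEFAULT_ARGS,
        start_date=datetime(2020, 2, 1),
        schedule_interval="@daily",   # run once a day
    )


# In a real DAG file the object has to live at module level so that
# Airflow can discover it:
#     dag = build_dag()
```

The `retries` setting is the declarative answer to "it may fail, and then you have to do it again".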

What are the "thingies" inside an airflow DAG? Operators. They come in many flavors, but I am using this one now, which sets up a venv, runs a function, then removes everything. Nice and clean. (airflow.apache.org/docs/stable/_a…) So, let's convert this into an Airflow operator.

This is a script that gives you the information about the latest already aired episode of a TV series. pic.twitter.com/1IpxaskWTU

— Roberto Alsina (@ralsina) February 16, 2020

It's not hard! Operators are (in the case of Python operators) simply functions.

Details: do all the imports inside the function.

Have a list of requirements ready if you require things.
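A sketch of those two details, assuming Airflow 1.10's `PythonVirtualenvOperator`. The function `check_series`, its placeholder return value and the `REQUIREMENTS` list are hypothetical; the real function would query a TV database.

```python
# Packages to install in the throwaway virtualenv. "requests" is only an
# example of what the real lookup might need.
REQUIREMENTS = ["requests"]


def check_series(series_name):
    """Runs inside a fresh virtualenv, so it cannot rely on imports made
    at the top of the DAG file: everything is imported in here."""
    import datetime

    # Placeholder result; the real lookup against a TV database is omitted.
    return {
        "series_name": series_name,
        "season": 1,
        "episode": 4,
        "checked_on": datetime.date.today().isoformat(),
    }


def add_check_task(dag):
    """Wrap the function in an operator attached to `dag`."""
    from airflow.operators.python_operator import PythonVirtualenvOperator

    return PythonVirtualenvOperator(
        task_id="Check_Picard",
        python_callable=check_series,
        op_args=["Picard"],
        requirements=REQUIREMENTS,
        dag=dag,
    )
```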

Now I need to put this operator inside the DAG I created earlier. Again, it's a matter of declaring things.

So, now that we have a (very stupid, one node, no arrows) DAG ... what can we do with it?

Well, we can make sure airflow sees it (you need to tell airflow where your dags live)

We can check that our DAG has some task in it!

A task is just an instance of an Operator, we added one, so there should be one.

And of course, we can TEST the thing, and have our task do its trick. Note that I have to pass a date. We could even use that in a 0.0.2 version to check episodes we missed!

Hey, that worked! (BTW, of course it did not work THE FIRST TIME, come on).

Backfill means "start at this date and run whatever would have run if we had actually started at that date"
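To make that concrete, here is a simplified model of which dates a daily DAG would be run for during a backfill. It is only an illustration of the idea and not how Airflow computes its schedule; the function name `daily_run_dates` is made up.

```python
from datetime import date, timedelta


def daily_run_dates(start, end):
    """Dates a daily DAG would have run for, from start to end inclusive."""
    days = (end - start).days
    return [start + timedelta(days=n) for n in range(days + 1)]
```

For example, `daily_run_dates(date(2020, 2, 14), date(2020, 2, 16))` yields three dates, one run per day.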

Now, if you run "airflow scheduler" and "airflow webserver" you can see things like this

And yes, that means this task will run daily and report everything in the nice web UI and all that.

But of course a single task is a lame DAG, so let's make it a bit more interesting. Now, let's create a second operator, which will run AFTER the one we had done is finished, and use its output.

It's based on this:

And this is a script that gives you the link to a torrent to download that episode. pic.twitter.com/3p3zLbvWbq

— Roberto Alsina (@ralsina) February 16, 2020

Following the same mechanical changes as before (imports in the function, etc) it will look like this:

This uses two pieces of data from the previous task.
So, we need to do 2 things:

1) Connect the two operators
2) Pass data from one to the other

Connecting two tasks in a DAG is simple. Declare them both and tell airflow they are connected and in what direction.

And of course that's now reflected in the airflow UI. Here you can see that Check_Picard has been successful (dark green border) and Search_Torrent has no status because it never ran (white border)

It's probably worth mentioning that patience is important at this point in the project, since "it runs quickly with immediate feedback" is not one of the benefits we are getting here.

This will be slower than just running the scripts by hand. And now we have the search_torrent task failing.


Well, luckily we are using airflow, so we have logs!

The problem is, Search_Torrent is not getting the right arguments. It "wants" a dict with at least series_name, season and episode in it.

And ... that's not how these things work in airflow :-)

Slight detour, I just ran into this: (issues.apache.org/jira/browse/AI…)

So, I need to rewrite my nice virtualenved operators into uglier not-virtualenved ones.

Shame, airflow, shame. BTW, this is a minor but important part of developing software. Sometimes, you are doing things right and it will not work because there is a bug somewhere else.

Suck it up! So, back to the code, let's recap. I now have two operators. One operator looks for the latest episode of a TV series (example: Picard), and returns all the relevant data in a serializable thing, like a dict of dicts of strings and ints.

The second one will search for a torrent of a given episode of a series. It uses the series name, the season number, and the episode number.

How does it know the data it needs to search? Since it was returned by the previous task in the DAG, it gets it from "context". Specifically it's there as an XCom with key "return_value".

And once it has all the interesting torrent information, then it just adds it to the data it got and returns THAT
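A minimal sketch of such a function, assuming the Airflow 1.10 context where the task instance is available as `context["ti"]` and the upstream task is called `Check_Picard`. The query format and the `torrent` placeholder are assumptions; the actual torrent search is left out.

```python
def search_torrent(**context):
    """Python callable for the second task; it needs the Airflow context."""
    # Fetch what the upstream task returned. With no explicit key,
    # xcom_pull looks up the "return_value" key.
    episode = context["ti"].xcom_pull(task_ids="Check_Picard")

    query = "{series_name} S{season:02d}E{episode:02d}".format(**episode)

    # Placeholder: the real task would look the query up on a torrent index.
    torrent = {"query": query, "magnet": None}

    # Return the original data plus the torrent details for later tasks.
    result = dict(episode)
    result["torrent"] = torrent
    return result
```

Because the function only touches `context["ti"].xcom_pull`, it can be tried outside Airflow by passing a small stand-in object that has an `xcom_pull` method.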

How do we use these operators in our DAG?

To search for Picard, I use my tvdb operator with an argument "Picard"

To get the Picard torrent, I use my rarbg operator, and use provide_context=True, so it can access the output of the other operator.

And then I hook them up
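Put together, the wiring might look roughly like this. It assumes Airflow 1.10's `PythonOperator` (later releases moved it to `airflow.operators.python` and dropped `provide_context`). The two callables `check_tvdb` and `search_rarbg` are hypothetical stand-ins with placeholder bodies.

```python
def check_tvdb(series_name):
    """Stand-in for the episode lookup; returns placeholder data."""
    return {"series_name": series_name, "season": 1, "episode": 4}


def search_rarbg(**context):
    """Stand-in for the torrent search; reads the upstream result."""
    episode = context["ti"].xcom_pull(task_ids="Check_Picard")
    return dict(episode, torrent=None)


def build_tasks(dag):
    """Declare both tasks on `dag` and connect them."""
    from airflow.operators.python_operator import PythonOperator

    check_picard = PythonOperator(
        task_id="Check_Picard",
        python_callable=check_tvdb,
        op_args=["Picard"],
        dag=dag,
    )
    search = PythonOperator(
        task_id="Search_Torrent",
        python_callable=search_rarbg,
        provide_context=True,  # pass the Airflow context as keyword arguments
        dag=dag,
    )
    # The arrow: Search_Torrent runs only after Check_Picard has succeeded.
    check_picard >> search
    return check_picard, search
```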

Does it work? Yes it does!

So, let's make it more interesting. A DAG of 2 nodes with one arrow is not the least interesting DAG ever but it's close!

So, what happens if I also want to watch ... "The Rookie"? I can just setup a new tvdb operator with a different argument, and connect them both to the operator that searches for torrents.
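One way to sketch that fan-in, again assuming Airflow 1.10's `PythonOperator`. The `SERIES` list and the helpers `check_task_id` and `wire_series` are hypothetical. Task ids may not contain spaces, hence the underscore replacement. How the shared search task decides which upstream result to read is not shown in the thread, so it is left out here.

```python
SERIES = ["Picard", "The Rookie"]


def check_task_id(series_name):
    """Build a valid task id such as Check_The_Rookie."""
    return "Check_" + series_name.replace(" ", "_")


def wire_series(dag, check_callable, search_task):
    """Create one check task per series and point each at `search_task`."""
    from airflow.operators.python_operator import PythonOperator

    tasks = []
    for name in SERIES:
        task = PythonOperator(
            task_id=check_task_id(name),
            python_callable=check_callable,
            op_args=[name],
            dag=dag,
        )
        task >> search_task  # one arrow per series into the shared task
        tasks.append(task)
    return tasks
```

Adding a third show then means adding one string to `SERIES`.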

In the airflow graph it looks like this:

And in another view, the tree view, it looks like this:

So, this DAG will look every day for new episodes of Picard or The Rookie, then, if there is one, will trigger a torrent search for it. Adding further operators and tasks to actually download the torrent, transcode it, put it in the right place, get subtitles, and so on ... is left as an exercise for the reader (I gave some hints at the beginning of the thread)

If you enjoyed this thread, consider hiring me ;-)

Senior Python Dev, eng mgmt experience, remote preferred, based near Buenos Aires, Argentina. (ralsina.me/weblog/posts/l…)
