WEBVTT

00:04.240 --> 00:10.760
Going back to the video_stats script that we created in the data extraction section, we can modify this

00:10.760 --> 00:13.480
code, as you see here, to create an Airflow DAG.

00:13.520 --> 00:19.200
What we first need to do is to put this script under the dags folder, so that it will be mounted to

00:19.240 --> 00:21.840
the Docker containers where Airflow is running.

00:22.040 --> 00:26.360
Before doing so, however, create another subfolder and call it API.

00:29.880 --> 00:34.480
Once you do that, you can move the video_stats script into it, under the dags folder.

00:34.520 --> 00:42.960
One note I want to make is that this dags folder, together with the config, include, logs, plugins and

00:42.960 --> 00:43.600
tests folders,

00:43.880 --> 00:49.360
was created in the previous lecture when we ran the docker compose up command.

00:49.520 --> 00:54.280
Now what we need is a way to turn our Python code into Airflow tasks.

00:54.840 --> 00:57.720
This is done by using Python decorators.

00:57.960 --> 01:03.930
We will not go into too much detail about Python decorators, but just know that decorators are a way

01:03.930 --> 01:06.410
to wrap a function within another function.
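
NOTE A minimal plain-Python sketch of the wrapping idea described here; it is not from the lecture and does not use Airflow. The names log_calls and add are hypothetical, chosen only for illustration.
```python
import functools
def log_calls(func):
    """Hypothetical decorator: wraps func and records every call made to it."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.calls.append((args, kwargs))  # extra behaviour added by the wrapper
        return func(*args, **kwargs)          # the base function still runs unchanged
    wrapper.calls = []
    return wrapper
@log_calls
def add(a, b):
    return a + b
result = add(2, 3)
```
The at-symbol line is shorthand for add = log_calls(add); a task decorator follows the same pattern, wrapping the function so that it can be used as a task.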

01:06.730 --> 01:12.410
And this wrapping effectively extends the functionality of the base function. In order to apply the Airflow

01:12.410 --> 01:13.370
task decorator,

01:13.730 --> 01:22.130
you would use the at symbol (@) followed by the decorator name, so it would look something like this: @task.

01:22.250 --> 01:26.330
Also, to use this task decorator, we need to import it from Airflow.

01:26.330 --> 01:34.050
So let's do it right now: from airflow.decorators import task.

01:34.090 --> 01:41.330
Now we need to add this decorator to each of the functions that we have in the script.

01:46.850 --> 01:52.050
Another thing we need to change in the code is how we handle sensitive variables like the

01:52.050 --> 01:55.810
API key and non-sensitive variables like the channel handle.

01:55.850 --> 02:02.050
As we know, we already defined these variables in the .env file and referenced them in the Docker

02:02.050 --> 02:07.130
Compose file using the AIRFLOW_VAR_ prefix followed by the variable name.

02:07.170 --> 02:14.690
This can be seen in the Managing Variables document in the Airflow docs, and we can take the example

02:14.730 --> 02:16.250
under this section.

02:16.450 --> 02:21.490
As you can see, it is using the AIRFLOW_VAR_{VARIABLE_NAME} notation.
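
NOTE A rough sketch, assuming the naming convention described here: the variable key is upper-cased and prefixed with AIRFLOW_VAR_ to form the environment variable name. The helper get_variable_from_env and the placeholder value are hypothetical; this only imitates the lookup and is not Airflow's own code.
```python
import os
VAR_ENV_PREFIX = "AIRFLOW_VAR_"
def get_variable_from_env(key, default=None):
    """Illustrative stand-in for looking up a variable defined as an environment variable."""
    return os.environ.get(VAR_ENV_PREFIX + key.upper(), default)
os.environ["AIRFLOW_VAR_CHANNEL_HANDLE"] = "example_handle"  # placeholder value, not a real handle
handle = get_variable_from_env("channel_handle")
missing = get_variable_from_env("not_defined", default="fallback")
```
The point of the sketch is that code asks for the variable by its key, while the environment holds the prefixed, upper-cased name.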

02:21.490 --> 02:28.450
So, coming back to VS Code: to ensure that these Airflow variables are defined properly, you can use

02:28.450 --> 02:29.810
the docker exec command.

02:29.810 --> 02:31.210
So let's do that right now.

02:31.210 --> 02:35.050
And let's say that we will use the Airflow webserver as an example.

02:35.290 --> 02:39.130
The first thing that we need to do is to check if the Docker containers are running.

02:39.810 --> 02:45.010
Remember, if docker ps returns nothing, it means you stopped your Docker services.

02:45.010 --> 02:50.010
So you would need to run the docker compose up -d command.

02:51.330 --> 02:52.410
I will not do that.

02:52.450 --> 02:56.530
Instead, I will write docker exec -it,

02:56.810 --> 03:02.410
then the name of the Airflow webserver container, and then bash.

03:02.610 --> 03:04.890
From here, we can type a grep command

03:08.180 --> 03:12.740
to check whether we have environment variables starting with AIRFLOW_VAR.

03:14.260 --> 03:20.020
As you can see, the two variables for the API key and channel handle have been returned.

03:20.060 --> 03:25.620
Now that we have confirmed the environment variables are present, we can simply retrieve them using the Variable.get

03:25.620 --> 03:26.860
syntax.

03:27.060 --> 03:32.860
So these two statements over here will need to be replaced with the Variable.get syntax.

03:33.340 --> 03:38.540
And they can be rewritten as Variable.get and then the API key variable name.

03:38.580 --> 03:40.140
Same thing for the channel handle.

03:40.380 --> 03:47.740
We can also take out the .env handling and remove these three lines of code.

03:47.900 --> 03:51.980
And also, very importantly, we need to import the Variable class.

03:52.820 --> 04:02.300
So let's do that right now: from airflow, I believe it's under models, so from airflow.models import Variable.

04:04.500 --> 04:10.220
One thing to note is that these variables won't show up when using the airflow variables list command.

04:10.780 --> 04:12.540
Let's showcase that right now.

04:13.340 --> 04:15.060
airflow variables list.

04:15.780 --> 04:18.460
As you can see, it returned no data found.

04:18.660 --> 04:24.660
This is because the variables are defined as environment variables and the same would apply for the

04:24.660 --> 04:25.020
UI.

04:25.500 --> 04:29.620
So if we go to the Airflow UI, under the Variables section,

04:29.660 --> 04:31.100
we will not find these variables.

04:31.100 --> 04:33.180
And perhaps I can demonstrate this right now.

04:33.700 --> 04:36.220
So the Airflow webserver is up and running.

04:37.220 --> 04:38.020
Let's go here.

04:43.300 --> 04:45.580
So this is the Airflow UI.

04:46.420 --> 04:52.500
If you recall we had defined the username and password which is airflow.

04:52.780 --> 04:55.700
And same thing 1234.

04:56.740 --> 05:00.180
So let's check the Variables section under Admin.

05:01.300 --> 05:04.220
And as you can see there are no variables listed here.

05:04.860 --> 05:06.420
Let's go back to VS Code.

05:07.460 --> 05:11.430
Now in terms of changes to the functions and inputs in the script, we are done.

05:11.470 --> 05:13.870
However, we still need to create the actual DAG.

05:13.910 --> 05:17.350
Apart from that, we also need to define the tasks and set dependencies.

05:17.470 --> 05:24.070
For this, it is a general best practice to create a main.py script in the dags folder that will

05:24.070 --> 05:25.670
contain our DAG code.

05:25.710 --> 05:27.110
So let me do that right now.

05:27.590 --> 05:30.710
Define main.py.

05:33.470 --> 05:33.750
Okay.

05:33.750 --> 05:39.990
It's important that this main.py is not inside the API subfolder, but directly under the general

05:40.030 --> 05:40.910
dags folder.

05:41.350 --> 05:45.590
We will create other subfolders as we go along in this course.

05:45.830 --> 05:49.590
This main.py we can populate by first doing the needed imports.

05:50.390 --> 05:53.750
So let's start: from airflow import DAG.

05:54.230 --> 05:56.390
This imports the main DAG class.

05:56.870 --> 05:58.710
We also import pendulum.

05:59.910 --> 06:03.510
The pendulum module is for dealing with time zones.

06:03.750 --> 06:11.030
And since we also need to deal with dates and durations, we need to import the datetime

06:11.030 --> 06:12.750
and timedelta classes.

06:15.990 --> 06:20.310
We also need to import the functions from the video_stats.py script.

06:29.710 --> 06:33.790
For now we will only have one DAG, but as we progress we will create more.

06:34.350 --> 06:36.910
These DAGs will have some common arguments.

06:36.910 --> 06:43.750
So, in order to factor this in, Airflow has a default_args argument that we can specify when building the DAG.

06:43.950 --> 06:45.270
And for default_args,

06:45.310 --> 06:48.710
I already have one that we can use, which is this one over here.

06:48.910 --> 06:54.990
Going over it briefly here we have an example of default arguments where we can specify things like

06:54.990 --> 07:02.790
the DAG owner, whether emails are sent on failure, how many retries will occur

07:02.790 --> 07:04.750
if a task fails, and more.

07:04.950 --> 07:11.030
In the start_date and end_date arguments, which define the period during which the DAG will run, you see

07:11.030 --> 07:18.480
this local time zone variable, where you can specify your local time zone, which for me is that of my home

07:18.480 --> 07:19.840
country, Malta.

07:20.600 --> 07:21.840
You can see it here.

07:22.760 --> 07:25.920
You can adjust this depending on where you are watching this course from.

07:26.400 --> 07:31.520
You can just search for time zone database and you will find many websites that have this list.

07:31.560 --> 07:36.040
The page you see here is one of them, and I will include it in the appendix of this section.

07:36.440 --> 07:39.680
Simply use the value under the Time Zone column.

07:39.680 --> 07:43.800
For Malta, it is Europe/Malta, as you see here.

07:44.040 --> 07:50.120
Going back to the script, something which I would like to point out, which confused me a bit when

07:50.120 --> 07:54.160
I first started using Airflow, is the start date.

07:54.480 --> 08:00.840
The start date in Airflow is the time from which Airflow will begin scheduling the DAG, but the first run will

08:00.840 --> 08:04.720
be scheduled at the end of the interval following the start date.

08:05.040 --> 08:10.920
So, for example, if we have a DAG that runs daily, and we set the start date to the 1st of January

08:10.920 --> 08:13.320
of 2025 at midnight,

08:14.040 --> 08:15.080
the first DAG run

08:15.080 --> 08:21.160
will be on the 2nd of January at midnight, which is the end of the first interval following

08:21.200 --> 08:22.160
the start date.
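
NOTE A simplified sketch of the arithmetic in this example, using only the standard library and ignoring time zones. The helper run_times is hypothetical and only illustrates that each run is triggered at the end of its interval; it is not how the scheduler is implemented.
```python
from datetime import datetime, timedelta
start_date = datetime(2025, 1, 1, 0, 0)   # start date from the spoken example
interval = timedelta(days=1)              # daily schedule
first_run_at = start_date + interval      # end of the first interval after the start date
def run_times(start, step, count):
    """Return the times at which the first `count` runs would be triggered."""
    return [start + step * (i + 1) for i in range(count)]
first_three = run_times(start_date, interval, 3)
```
Under these assumptions, first_run_at is midnight on the 2nd of January 2025, matching the example.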

08:22.680 --> 08:27.400
This is one of those things which might be counterintuitive at first, and I just wanted to mention

08:27.400 --> 08:32.080
it, as it is good to know when defining the start dates and intervals.

08:32.280 --> 08:36.680
Now that we have these details out of the way, we can focus on the actual DAG creation.

08:36.920 --> 08:43.120
Let's go to the DAGs page in the Airflow docs, and here you will see that there are three ways to actually declare

08:43.160 --> 08:43.680
a DAG.

08:43.880 --> 08:46.720
The first one is using a with statement.

08:47.320 --> 08:48.320
It's the one you see here.

08:48.760 --> 08:52.840
The next one is the standard constructor, which is this one here.

08:53.480 --> 08:57.440
And the last one is using the @dag decorator, which is this one here.

08:58.600 --> 09:03.880
As a matter of personal preference, I prefer using the with context manager.

09:04.160 --> 09:07.840
Going back to VS Code, we can start writing up our DAG.

09:11.440 --> 09:14.720
The first important argument that we need to define is the dag_id.

09:15.370 --> 09:17.970
I will name it produce_json.

09:17.970 --> 09:19.970
And why I chose this name is simple.

09:20.010 --> 09:28.370
Ultimately, this DAG needs to output a JSON file under the data directory. For default_args,

09:29.210 --> 09:32.530
we can use the default_args we have here.

09:33.890 --> 09:40.090
We can also include a description which I recommend that you do, especially as the number of DAGs will

09:40.090 --> 09:40.930
continue to grow.

09:41.570 --> 09:48.370
I will say: DAG to produce JSON file with the raw data.

09:49.250 --> 09:50.810
We also specify the schedule.

09:51.170 --> 09:54.410
Let's say in this case that it will run every day at 2 p.m.

09:54.730 --> 10:00.850
So in cron format this would look like 0 14,

10:01.770 --> 10:03.930
star, star and star: 0 14 * * *.

10:04.370 --> 10:10.090
If you want to confirm this, you can go to crontab.guru and enter the values that we just used.

10:10.130 --> 10:14.250
As you can see it is a daily run at 2:00 pm.
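
NOTE A small standard-library sketch that labels the five fields of the cron expression used here. The helper describe_cron and the field names are hypothetical; it performs no validation of the values and is not a scheduler.
```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")
def describe_cron(expression):
    """Map each of the five space-separated cron fields to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError("expected five space-separated fields")
    return dict(zip(CRON_FIELDS, parts))
schedule = describe_cron("0 14 * * *")
```
Here minute is 0 and hour is 14, while the three stars mean every day of the month, every month and every day of the week, which is a daily run at 2 p.m.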

10:14.450 --> 10:18.780
Finally, we can specify that catchup equals False.

10:19.460 --> 10:24.580
What this does is tell Airflow not to catch up on missed DAG runs from the past.

10:25.180 --> 10:27.540
So let's conclude this with: as dag.

10:29.180 --> 10:34.100
And here we can define the tasks by calling the functions in the video_stats script.

10:34.140 --> 10:36.340
So let's define tasks.

10:37.220 --> 10:39.620
We start off with the playlist ID.

10:40.060 --> 10:42.500
playlist_id equals get_playlist_id,

10:43.100 --> 10:52.620
the actual function. video_ids we get from the get_video_ids function, referencing the playlist_id.

10:54.940 --> 11:03.100
extracted_data we get from the extract_video_data function, with the video_ids argument.

11:03.940 --> 11:13.900
And finally, we save the JSON using the save_to_json function, with the extracted_data argument.

11:15.620 --> 11:19.220
The last thing that we can do at this stage is to define dependencies.

11:23.500 --> 11:29.740
When we talk about dependencies, this refers to the order in which the tasks will run, from left to right.

11:30.380 --> 11:37.620
This is important since the video IDs task, for example, needs the output from the playlist ID task to run.

11:37.900 --> 11:44.180
Before that, let me simply rename this with an _task suffix, so it doesn't conflict

11:44.180 --> 11:46.860
with the actual function name.

11:47.260 --> 11:50.180
And we can write the task dependencies.

11:50.380 --> 11:54.780
So: playlist ID, then video IDs,

11:56.460 --> 12:01.020
then extracted data, and finally the save JSON task.

12:01.420 --> 12:05.660
So here we are saying that this save JSON task depends on extracted data.

12:06.100 --> 12:07.740
This depends on video IDs.

12:08.260 --> 12:12.260
And the first thing it's going to run is the playlist ID function.
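
NOTE A toy model, not Airflow code, of how chaining with the right-shift operator can record the order described here. The class ToyTask and the task names are hypothetical stand-ins that follow the narration.
```python
class ToyTask:
    """Toy stand-in for a task; it only records what must run before it."""
    def __init__(self, name):
        self.name = name
        self.upstream = []
    def __rshift__(self, other):
        other.upstream.append(self.name)  # `self >> other`: self runs before other
        return other                      # returning `other` lets the chain continue
playlist_id = ToyTask("playlist_id")
video_ids = ToyTask("video_ids")
extracted_data = ToyTask("extracted_data")
save_json = ToyTask("save_json")
playlist_id >> video_ids >> extracted_data >> save_json
```
Reading the last line from left to right gives the run order, and each task ends up knowing only the task directly before it.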

12:12.420 --> 12:13.180
And that's it.

12:13.220 --> 12:16.700
Congratulations, you have built your first Airflow DAG.

12:16.900 --> 12:20.150
Now, if the local time were 2 p.m.,

12:20.310 --> 12:23.190
this DAG would run automatically, if it is active.

12:23.230 --> 12:27.990
However, we can also manually trigger the DAG, and an easy way to do this is to go to the Airflow

12:28.030 --> 12:28.390
UI.

12:28.710 --> 12:30.590
So let's go back to the Airflow UI.

12:30.910 --> 12:32.670
We run a refresh.

12:34.830 --> 12:37.670
So it seems that we have an error with our DAG.

12:38.030 --> 12:39.670
And I believe I know why.

12:40.630 --> 12:45.790
So this should actually be the environment variable name and not the value.

12:46.230 --> 12:50.190
So let's go back to VS Code and change this to the channel handle variable name.

12:53.070 --> 12:54.110
Let's refresh again.

12:54.150 --> 13:02.030
Now, as you can see, the refresh fixed the issue we had, and we have the produce_json DAG we just

13:02.030 --> 13:02.510
built.

13:02.710 --> 13:07.870
If we want to actually run this DAG, we would simply need to press the Trigger DAG button

13:07.870 --> 13:08.230
here.

13:08.710 --> 13:15.030
From here, let's go inside the DAG itself and see what functionalities Airflow provides.

13:15.310 --> 13:23.270
So now that we have clicked on the DAG itself, we will go over the main sections that I would say are relevant.

13:23.590 --> 13:29.710
Here you have the details section, where you have a brief overview of the parameters of your DAG.

13:31.110 --> 13:39.790
Here you get to see the DAG lineage, and here you also get to see the actual code that makes up your

13:39.830 --> 13:40.270
DAG.

13:40.910 --> 13:46.910
Going back to the actual DAG, you will see that currently the DAG itself is paused.

13:47.110 --> 13:52.670
This is the intended behavior, as that is how we defined it in the Docker Compose file.

13:53.070 --> 13:57.910
So this is the default behavior, but you can change it inside the Docker Compose file.

13:58.270 --> 14:05.790
So what we can do right now to actually test out our DAG is to unpause our DAG. Going inside,

14:06.190 --> 14:09.350
you will start seeing the execution of the DAG itself.

14:10.190 --> 14:19.510
This dark green means that the tasks completed successfully, while the light green implies the

14:19.510 --> 14:21.310
task is currently running.

14:23.600 --> 14:27.840
If we had any failures, we would get a red box.

14:28.320 --> 14:32.640
However, from what I can see here, all the tasks are a success.

14:32.640 --> 14:40.120
If we press on one of the tasks themselves and go inside the logs, we can see what was actually

14:40.160 --> 14:41.880
done at the code level.

14:41.920 --> 14:46.960
Maybe we can get more details if we press on this one here and see what was actually done.

14:47.880 --> 14:51.280
So here you can see some prints that we have.

14:51.520 --> 14:53.080
These are for the video IDs.

14:53.440 --> 14:59.760
Ultimately, this DAG needs to produce a JSON file under the data directory.

14:59.960 --> 15:08.040
So we will know that this DAG successfully completed by going back to VS Code and seeing if the JSON

15:08.040 --> 15:10.200
file was generated for today's date.

15:11.720 --> 15:13.640
Let's look inside the data folder.

15:13.640 --> 15:16.240
And as you can see we have a new JSON file.

15:16.800 --> 15:19.280
Now we have briefly gone over Airflow.

15:19.640 --> 15:25.180
A final note that I want to emphasize for this lecture is that if we set the DAG to run at 2

15:25.180 --> 15:30.720
p.m., and the Docker containers are running at that time, then the DAG will be executed.

15:31.440 --> 15:36.080
If, however, you have your laptop switched off, then the Docker environment won't be activated,

15:36.280 --> 15:41.800
Airflow won't be running, and as a result the DAGs will not be run at the scheduled time.

15:41.960 --> 15:46.840
Now this contrasts greatly with the setup that you would have in a production scenario.

15:47.040 --> 15:52.560
In a production scenario, you would have a dedicated server where Airflow is running 24/7, be

15:52.600 --> 15:57.480
it self-managed or on the cloud, most likely running on some Kubernetes environment.

15:57.640 --> 16:03.480
But in our case, having a dedicated environment for Airflow, which incurs costs, is out of scope for

16:03.480 --> 16:04.200
this course.

16:04.440 --> 16:08.440
Here we are building the toolset to then apply the knowledge in the workplace.

16:08.600 --> 16:10.920
So that's all that I had to show you for this section.

16:11.080 --> 16:16.200
Do feel free to go to the Airflow UI and explore a bit more of what we have just covered.

16:16.520 --> 16:21.960
I'll see you in the next lecture to discuss the next DAG, which we will build for populating and updating

16:22.000 --> 16:22.880
the data warehouse.
