WEBVTT

00:04.200 --> 00:11.560
Going over to the main.py file, where we already have the DAG that produces the JSON file with the raw

00:11.560 --> 00:11.960
data.

00:12.000 --> 00:15.440
We can copy the DAG definition and update it for our needs.

00:15.480 --> 00:22.200
However, before we copy it, we need to import the main functions as modules for the data warehouse

00:22.200 --> 00:22.800
scripts.

00:25.560 --> 00:29.080
And now we can copy this DAG and edit it.

00:29.080 --> 00:37.040
For our purposes here, we will change the DAG ID to update_db.

00:37.440 --> 00:50.840
We can also change the description to something like "DAG to process JSON file and insert data into both

00:50.880 --> 00:54.000
staging and core schemas".

00:54.880 --> 00:58.880
And we should also change the schedule to a time after the first DAG runs.

00:59.080 --> 01:00.760
So let's say 3 p.m.
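
NOTE A minimal stdlib sketch, not Airflow's own scheduler logic: a daily 3 p.m. schedule corresponds to the cron expression "0 15 * * *", which is assumed here because the exact value is not shown. The hypothetical helper next_daily_run computes the next 15:00 run after a given time.
```python
from datetime import datetime, timedelta
# Cron equivalent of "every day at 15:00" (assumed value for the update_db schedule).
DAILY_3PM_CRON = "0 15 * * *"
def next_daily_run(now: datetime, hour: int = 15, minute: int = 0) -> datetime:
    """Return the next datetime strictly after `now` at hour:minute (hypothetical helper)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so move to the same time tomorrow.
        candidate += timedelta(days=1)
    return candidate
print(next_daily_run(datetime(2024, 1, 1, 9, 30)))  # 2024-01-01 15:00:00
print(next_daily_run(datetime(2024, 1, 1, 16, 0)))  # 2024-01-02 15:00:00
```
In the DAG definition, a cron string like this would be passed as the schedule argument (schedule_interval in Airflow releases before 2.4).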

01:01.720 --> 01:07.380
We then just define the tasks that update the staging and core tables, so this would look

01:07.380 --> 01:08.020
like this.

01:08.460 --> 01:20.380
The update staging task will call the staging table function, and the update core task will call the core table

01:20.380 --> 01:20.980
function.

01:21.980 --> 01:23.100
These we can remove.

01:25.460 --> 01:32.460
And finally we set the dependencies so that the update staging task runs before the update core task.
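
NOTE In the DAG file this ordering is what update_staging >> update_core expresses. The stdlib sketch below is not Airflow code: it mimics the idea with graphlib.TopologicalSorter (Python 3.9+), mapping each task ID to one callable and running a task only after its upstream tasks. The functions staging_table and core_table are hypothetical stand-ins for the data warehouse functions.
```python
from graphlib import TopologicalSorter
def staging_table():
    # Hypothetical stand-in for the function that updates the staging schema.
    return "staging updated"
def core_table():
    # Hypothetical stand-in for the function that updates the core schema.
    return "core updated"
# task_id -> callable, mirroring how each task wraps exactly one function.
TASKS = {"update_staging": staging_table, "update_core": core_table}
# task_id -> set of upstream task_ids; equivalent to update_staging >> update_core.
UPSTREAM = {"update_core": {"update_staging"}}
def run_in_order(tasks, upstream):
    """Run every task after its upstream tasks and return (task_id, result) pairs."""
    graph = {task_id: upstream.get(task_id, set()) for task_id in tasks}
    order = TopologicalSorter(graph).static_order()
    return [(task_id, tasks[task_id]()) for task_id in order]
print(run_in_order(TASKS, UPSTREAM))  # [('update_staging', 'staging updated'), ('update_core', 'core updated')]
```
With Airflow's default all_success trigger rule, update_core would not run if update_staging failed (it would be marked upstream_failed), which this sketch does not model.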

01:36.500 --> 01:42.900
So the DAG for the data warehouse part of this course is now written.

01:42.940 --> 01:51.220
What we need to do now is spin up the Docker containers and see whether this DAG runs, or whether there

01:51.220 --> 01:53.540
are some errors that we have to eliminate.

01:53.620 --> 01:58.140
So let's first run the docker compose up -d command.

02:10.900 --> 02:15.590
Let's just run docker ps to make sure that the Airflow containers are up and running.

02:15.590 --> 02:17.390
So the containers seem to be running.

02:17.550 --> 02:24.950
And now we can open the Airflow UI, which we can access at localhost on port 8080.
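
NOTE Besides docker ps, the Airflow 2.x webserver exposes a /health endpoint (assumed here to be http://localhost:8080/health) that reports metadatabase and scheduler status as JSON. The hypothetical helper is_airflow_healthy only interprets such a payload; actually fetching it, for example with urllib.request.urlopen, requires the containers to be running.
```python
import json
HEALTH_URL = "http://localhost:8080/health"  # assumed Airflow 2.x webserver address
def is_airflow_healthy(payload: str) -> bool:
    """Return True when both the metadatabase and the scheduler report "healthy"."""
    data = json.loads(payload)
    return all(data.get(part, {}).get("status") == "healthy" for part in ("metadatabase", "scheduler"))
# Illustrative payload shape; the timestamp value is made up.
sample = '{"metadatabase": {"status": "healthy"}, "scheduler": {"status": "healthy", "latest_scheduler_heartbeat": "2024-01-01T15:00:00+00:00"}}'
print(is_airflow_healthy(sample))  # True
```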

02:25.910 --> 02:31.150
Let's enter the credentials, which we can get from the .env file.
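
NOTE A minimal sketch of reading the web UI credentials from a .env file. It assumes the key names _AIRFLOW_WWW_USER_USERNAME and _AIRFLOW_WWW_USER_PASSWORD used by the official Airflow docker-compose setup; the .env in this project may use different keys. The hypothetical parse_env parser is deliberately simple: KEY=VALUE lines, # comments, optional quotes.
```python
def parse_env(text: str) -> dict:
    """Parse simple KEY=VALUE lines, skipping empty lines, # comments and lines without '='."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        # Strip optional surrounding quotes, e.g. KEY="value".
        values[key.strip()] = value.strip().strip("'\"")
    return values
example = """
# Airflow web UI login (assumed key names)
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD="airflow"
"""
creds = parse_env(example)
print(creds["_AIRFLOW_WWW_USER_USERNAME"], creds["_AIRFLOW_WWW_USER_PASSWORD"])  # airflow airflow
```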

02:33.070 --> 02:36.950
And there seems to be an error with the import.

02:37.990 --> 02:44.590
Now I realize that I might have misspelled the psycopg2 import.

02:44.830 --> 02:52.350
So let's go back to the data utils script and correct this.
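
NOTE A misspelled module name makes the DAG file fail to import, and Airflow then reports an import error instead of listing the DAG. The hypothetical helper missing_modules uses importlib.util.find_spec to flag top-level module names that cannot be found; "pyscopg2" is only an illustrative misspelling, not necessarily the one made in the video.
```python
import importlib.util
def missing_modules(names):
    """Return the top-level module names that cannot be found on the current path."""
    return [name for name in names if importlib.util.find_spec(name) is None]
# "json" is in the standard library; "pyscopg2" is a deliberate misspelling of psycopg2.
print(missing_modules(["json", "pyscopg2"]))  # ['pyscopg2']
```
Airflow 2.x also offers the airflow dags list-import-errors CLI command for listing DAG files that failed to import.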

02:54.270 --> 02:55.670
Let's refresh.

02:56.350 --> 03:01.630
And finally we have the update_db DAG back.

03:02.110 --> 03:07.590
As you can see here, and I'm not sure if you noticed, the produce_json DAG just ran.

03:07.830 --> 03:11.950
This is because we just spun up the Docker containers.

03:12.270 --> 03:17.170
so the DAG run that was due since the last run was triggered now.

03:17.290 --> 03:22.370
So let's go to the update_db DAG and unpause it.

03:23.770 --> 03:28.850
As you can see, we have an error, so let's go to the logs.

03:28.890 --> 03:30.730
Let's see what the error could be.

03:30.770 --> 03:33.650
I can see here that there is another spelling mistake.

03:33.890 --> 03:34.490
So let me

00:34.770 --> 00:35.530
fix that right now.

03:37.410 --> 03:39.890
And let's rerun the DAG.

03:47.450 --> 03:54.570
So from what I can see, we have an error when we try to insert the actual values.

03:54.570 --> 03:58.570
I am looking at the actual script that we have.

03:59.370 --> 04:05.410
And from what I can see, there is an extra bracket here, which is causing the error.
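
NOTE A minimal sketch of why a stray bracket breaks an INSERT statement. It uses the standard library's sqlite3 as a stand-in for PostgreSQL (psycopg2 uses %s placeholders instead of ?), and the videos table and its columns are hypothetical.
```python
import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE videos (video_id TEXT PRIMARY KEY, title TEXT, view_count INTEGER)")
row = ("abc123", "Intro to Airflow", 42)  # made-up sample row
# Broken: one extra closing bracket after the VALUES list.
broken_sql = "INSERT INTO videos (video_id, title, view_count) VALUES (?, ?, ?))"
# Fixed: brackets balanced, values passed as parameters instead of formatted into the string.
fixed_sql = "INSERT INTO videos (video_id, title, view_count) VALUES (?, ?, ?)"
error = None
try:
    conn.execute(broken_sql, row)
except sqlite3.OperationalError as exc:
    # SQLite reports the unbalanced bracket as a syntax error.
    error = exc
conn.execute(fixed_sql, row)
print(conn.execute("SELECT COUNT(*) FROM videos").fetchone()[0])  # 1
```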

04:05.930 --> 04:10.210
Let me just check that we don't have any more of these errors here.

04:10.210 --> 04:11.650
Seems to be okay as well.

04:12.250 --> 04:13.570
And I think we are good to go.

04:15.210 --> 04:19.770
Let's refresh and try to run this again.

04:21.790 --> 04:28.830
So as you can see, inserts are happening into the staging table and it seems to have completed successfully,

04:29.030 --> 04:33.310
but the core table appears to have an error.

04:33.550 --> 04:35.590
So let's see what this error could be.

04:38.030 --> 04:41.310
So what I'm thinking is that we have another spelling error.

04:42.070 --> 04:49.510
Let me go into the data warehouse UI script and try to figure out where the error is coming from.

04:52.950 --> 04:54.190
So it's not here.

04:55.550 --> 04:56.430
Let's see if it's here.

04:57.150 --> 04:58.110
So here it is.

04:58.110 --> 04:58.550
Here.

05:02.270 --> 05:10.510
So now that we have corrected the spelling error, let me go back here and try to run this.

05:11.950 --> 05:15.910
Let's do a refresh and run again.

05:16.870 --> 05:23.590
Finally, success in both the staging table and the core table.

05:23.630 --> 05:28.960
We already had data in the staging table from the first successful run.

00:29.400 --> 00:33.680
So for staging, the logic that is kicking in now is the update.

00:33.680 --> 00:42.720
For the core table, we should see exactly the insert logic being triggered, since there are no videos, no data at all, inside

00:42.760 --> 00:44.640
the core schema yet.
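
NOTE A minimal sketch of the insert-or-update behaviour described above, using sqlite3 as a stand-in for PostgreSQL and a hypothetical videos table; it is not the course's actual script. Rows whose video_id already exists are updated and new ones are inserted, so an empty table sees only inserts while a populated one sees updates.
```python
import sqlite3
def upsert_videos(conn, rows):
    """Update existing rows by video_id, insert missing ones; return (inserted, updated)."""
    inserted = updated = 0
    for video_id, title, view_count in rows:
        exists = conn.execute("SELECT 1 FROM videos WHERE video_id = ?", (video_id,)).fetchone()
        if exists:
            conn.execute("UPDATE videos SET title = ?, view_count = ? WHERE video_id = ?", (title, view_count, video_id))
            updated += 1
        else:
            conn.execute("INSERT INTO videos (video_id, title, view_count) VALUES (?, ?, ?)", (video_id, title, view_count))
            inserted += 1
    conn.commit()
    return inserted, updated
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE videos (video_id TEXT PRIMARY KEY, title TEXT, view_count INTEGER)")
rows = [("a1", "First video", 10), ("b2", "Second video", 20)]  # made-up sample rows
print(upsert_videos(conn, rows))  # (2, 0): empty table, so only inserts
print(upsert_videos(conn, [("a1", "First video", 15)]))  # (0, 1): existing row is updated
```
In PostgreSQL the same effect can be written as a single INSERT ... ON CONFLICT (video_id) DO UPDATE statement.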

05:44.640 --> 05:45.680
So that's it.

05:45.720 --> 05:52.360
A few errors along the way, but that is normal when you are developing and testing your DAGs.

05:52.600 --> 05:59.440
This was actually good practice for me as well, and a chance to show how you would tackle solving

00:59.440 --> 06:00.440
issues in a DAG.

06:00.480 --> 06:07.560
As you can see, the logs section here is very important, and you should always look at what the

06:07.560 --> 06:12.400
error message is telling you and where the error is actually located.
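
NOTE When a task's callable raises an exception, Airflow marks the task as failed and the traceback appears in that task's log. The hypothetical log_failures decorator below (plain Python logging, not an Airflow API) records which step failed before re-raising, so the log points more directly at the source of the error; the logger name and the failing core_table stand-in are illustrative.
```python
import functools
import logging
logger = logging.getLogger("update_db")  # hypothetical logger name
def log_failures(func):
    """Log the failing function's name together with the traceback, then re-raise."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and appends the current traceback.
            logger.exception("Step %s failed", func.__name__)
            raise
    return wrapper
@log_failures
def core_table():
    # Hypothetical stand-in that fails the way a misspelled column name might.
    raise ValueError("column 'vidoe_id' does not exist")
```
Re-raising matters: swallowing the exception would let the task show as successful even though nothing was written.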

06:12.440 --> 06:16.120
So that's all I have to show you when it comes to running the DAG using Airflow.

06:16.520 --> 06:22.920
However, this leaves us with a question: where and how can we see the data that we just inserted?

06:22.920 --> 06:26.880
This will be the topic of the next lecture.

06:27.200 --> 06:28.120
I will see you then.
