WEBVTT

00:04.240 --> 00:10.160
What is left of this stage is to build the main functions to populate the two schema layers for staging

00:10.160 --> 00:10.840
and core.

00:10.880 --> 00:16.960
This will bring all together the functions that we have defined for connections, inserts, updates,

00:16.960 --> 00:19.200
deletions, and transformations.

00:19.200 --> 00:23.960
To do this, we need to import the functions that we defined in the other scripts.

00:23.960 --> 00:27.960
So let's first start by creating a new script.

00:28.400 --> 00:29.840
Let's call it Datawarehouse.

00:30.680 --> 00:32.800
And here we will do the imports.

00:34.600 --> 00:39.600
We also need to import the logging and the airflow task decorator.

00:39.720 --> 00:46.040
And same as we have done already, we start by setting up the logger and defining the global variable

00:46.040 --> 00:47.120
for the table name.

00:48.440 --> 00:52.760
And now we can proceed to the main function relating to the staging table.

00:53.040 --> 00:57.320
So let's define a function called staging table.

01:00.720 --> 01:05.280
That will create the actual staging table in our data warehouse.

01:06.080 --> 01:09.720
And since we are working with airflow, we need to use the task decorator.

01:13.240 --> 01:18.880
In the function, we define the schema variable and also initialize the connection and cursor.

01:19.360 --> 01:25.920
So the schema will be staging and we define the quantum cursor as none.

01:27.720 --> 01:30.880
Next we open a try and accept clause.

01:31.200 --> 01:39.360
And in the try we start off by setting up the connection and cursor variables using the very first function

01:39.360 --> 01:44.440
that we introduced in this section, which we got from the utils script.

01:48.720 --> 01:55.560
We load the raw data, extract it from the YouTube API, and save this JSON file using the load data

01:55.560 --> 01:58.040
function from the data loading script.

02:04.520 --> 02:09.960
Here I'm realizing that the load data doesn't seem to have been imported correctly.

02:10.280 --> 02:16.560
Now let's go into the loading, and we can see here that it actually was defined as load underscore

02:16.560 --> 02:17.040
path.

02:17.320 --> 02:20.240
So maybe let's change this to load data.

02:20.920 --> 02:23.200
And now going back to the data warehouse script.

02:23.480 --> 02:23.920
Exactly.

02:23.920 --> 02:29.320
We should see that this will be showing as it has been properly imported.

02:29.560 --> 02:32.200
We then create the schema and table for staging.

02:32.240 --> 02:33.280
If they don't exist.

02:33.960 --> 02:45.880
So create schema schema, create table with schema argument and also define a variable table IDs to

02:45.920 --> 02:52.520
get all the video IDs for the staging table by using the get video IDs defined in utils.

02:56.200 --> 03:03.200
Having these variables defined, we can now loop through each row and insert or update depending on

03:03.200 --> 03:04.440
certain conditions.

03:04.680 --> 03:10.120
So let's start the for loop for Follow in YouTube data.

03:11.240 --> 03:14.680
If it's a first time insert, which means the table is empty.

03:14.840 --> 03:17.240
We write if the length.

03:19.720 --> 03:23.560
The table ids is equal to zero.

03:23.920 --> 03:30.280
We insert data using the insert rows defined in data modification script.

03:33.120 --> 03:37.600
Else if the table is already populated with data, we have one of two actions.

03:37.880 --> 03:44.720
We either update the data of videos that are already present in the staging table, and if the video

03:44.720 --> 03:51.280
ID in the loaded JSON is not in the staging table, that means we have a new video, so we need to insert

03:51.280 --> 03:52.720
it into the staging table.

03:53.000 --> 03:56.880
These two functions combined encapsulate an upsert logic.

04:07.160 --> 04:10.160
So this covers the insertion and update part.

04:10.200 --> 04:14.320
Now we just need to take care of possible removal of videos by the channel.

04:14.320 --> 04:23.320
So we'll define a set variable called IDs in JSON, which will give all the video IDs that are present

04:23.320 --> 04:24.680
in the JSON data.

04:24.680 --> 04:27.960
We define a set by using the curly brackets.

04:32.480 --> 04:36.040
Next we define an IDs to delete variable.

04:36.040 --> 04:41.840
That will be the difference between the video IDs we have in the staging table and the above defined

04:41.880 --> 04:43.320
IDs in JSON.

04:43.360 --> 04:52.240
We can now convert the table IDs to a Python set, so we can use the minus operator to get the difference

04:52.240 --> 04:53.480
between the two sets.

04:53.520 --> 04:54.480
If there is any.

04:54.480 --> 04:56.280
So this will look as follows.

05:00.640 --> 05:07.840
A note on this is that we convert a table ids to a set, so we can use this inbuilt functionality of

05:07.840 --> 05:11.520
Python sets to get the difference between two sets.

05:11.720 --> 05:18.920
If I just delete is populated, then we use the delete rows function we defined in the data modification

05:18.920 --> 05:19.560
script.

05:19.560 --> 05:23.280
So if it is to delete exists.

05:27.680 --> 05:34.280
The final step in the try clause is to log the table update so we can use logger.info.

05:41.560 --> 05:49.360
If for some reason any part of the clause fails, we log this as an error and the except clause and

05:49.360 --> 05:49.960
raise it.

05:53.360 --> 06:03.200
As a final step, we use the finally block to ensure that the connection and cursor are closed so that

06:03.200 --> 06:09.520
even if the try clause fails and we go in the accept, whatever happens, the code will still go inside

06:09.520 --> 06:18.160
the finally clause and close both the connection and the character so we can write if they are not null

06:19.680 --> 06:21.320
those anchors.

06:23.000 --> 06:25.640
And that's it for the staging table function.

06:25.640 --> 06:28.600
Now we have to do something quite similar for the core layer.

06:28.760 --> 06:30.280
So let's get right into it.

06:30.280 --> 06:36.320
Just as we did for staging, we need to define the core table function, which in this case will have

06:36.320 --> 06:37.560
the name core table.

06:37.760 --> 06:39.640
We will set the schema to core.

06:39.640 --> 06:42.920
And we will also initialize the conn and cursor variable to null.

06:44.920 --> 06:51.680
We can copy the first few lines of code that we have in the staging table function, also into the core

06:51.680 --> 06:52.560
table function.

06:52.880 --> 07:00.800
From here we need to initialize a variable, which we will call current video IDs that will be used

07:00.800 --> 07:04.400
to collect all video IDs from the current execution.

07:04.640 --> 07:09.560
This will be used for deletion logic and we can set this to an empty set.

07:10.320 --> 07:16.600
Next, we execute a command to get all the data from the staging table and fetch all the rows.

07:17.320 --> 07:22.960
As an important note here, as you can see, we are selecting all the data from the staging table.

07:22.960 --> 07:28.400
It could very well be the case that you are dealing with tables that have hundreds of thousands of rows,

07:28.400 --> 07:30.480
if not millions or even billions.

07:30.480 --> 07:34.080
With these table sizes, this would not be a memory efficient approach.

07:34.080 --> 07:40.480
So we would need to process the data in batches of a specific batch size, which you could set dependent

07:40.480 --> 07:41.600
on your data size.

07:41.600 --> 07:47.080
However, in our case, we are dealing with only a few hundreds of rows, so we will not work in batches

07:47.120 --> 07:49.280
as it would be overengineering our problem.

07:49.280 --> 07:56.200
From here it is again very similar to the staging table, as we define a for loop to go through each

07:56.200 --> 08:01.680
row and transform or insert or update depending on certain conditions.

08:06.080 --> 08:11.880
And as we loop through each row of data, we keep track of each video ID by appending it to the current

08:11.880 --> 08:13.920
video IDs we defined earlier.

08:18.880 --> 08:25.960
For the first insert in the core table, we check the table is empty and if yes, we transform the row

08:25.960 --> 08:28.080
and insert it into the table.

08:37.040 --> 08:43.400
As we saw in the staging main function, if the table is not empty, we populate the else clause as

08:43.400 --> 08:43.920
follows.

08:44.640 --> 08:51.480
We again use the transform data function to transform data that is already present on staging, and

08:51.480 --> 08:55.400
then if the data is already present, we use the update function.

09:05.440 --> 09:09.680
If not, then we have new data coming in, so we use the insert function.

09:12.360 --> 09:14.080
Same as for the staging table.

09:14.080 --> 09:16.680
We also take into account deletions.

09:16.680 --> 09:23.840
So we define IDs to delete as the difference between what is already in the table, but not in the current

09:23.840 --> 09:24.600
execution.

09:32.520 --> 09:37.000
And finally, we delete the rows, if any are present using the delete rows function.

09:37.400 --> 09:43.360
We then log the table update and define the accept clause for any errors we might have.

09:44.160 --> 09:49.880
So let's copy and paste this from the staging, making sure we have the right tabs.

09:51.440 --> 09:54.280
This has to be aligned with the try as a last step.

09:54.320 --> 10:01.400
Same as we have done the staging function, we define the finally block so that regardless of what happens,

10:01.400 --> 10:04.920
we close the connection and the cursor if they are not, none.

10:06.520 --> 10:12.840
Having created these two main functions, what is left now is to simply define the airflow Dag related

10:12.840 --> 10:15.560
to populating and updating data warehouse.

10:16.240 --> 10:18.280
We will cover this in the next lecture.
