WEBVTT

00:04.200 --> 00:09.280
In this lecture, we will explore how to connect to the Postgres database that will store the data extracted

00:09.280 --> 00:11.640
from the API when we are using airflow.

00:11.680 --> 00:17.120
In order to set up a connection to a service, which in our case is the Postgres database, we use what

00:17.120 --> 00:18.400
are known as hooks.

00:18.920 --> 00:23.640
And since we are dealing with Postgres connection, we will use the Postgres hook.

00:23.800 --> 00:29.040
I will link the documentation that you see here for Postgres hook in the appendix of the section.

00:29.080 --> 00:35.200
Now before we start coding, I will explain two concepts that we will go over when we introduce Postgres

00:35.200 --> 00:38.480
hook, these being the connection and the cursor.

00:38.520 --> 00:44.800
The connection is used to establish the connection to the Postgres instance, while the cursor is an

00:44.800 --> 00:47.920
object that allows you to execute SQL commands.

00:48.160 --> 00:49.720
For the connection in question.

00:49.760 --> 00:54.720
Now that we have a basic understanding of what connections and cursors are, let's create the first

00:54.720 --> 00:58.640
Python script relating to how we will interact with the data warehouse.

00:58.680 --> 01:04.000
We will need to build a number of functions to get the end result, and many of the functions will be

01:04.040 --> 01:10.230
helper functions when we talk about helper functions, these are smaller functions where their purpose

01:10.270 --> 01:12.830
is to assist the main functions in a program.

01:12.870 --> 01:18.350
So let's first define a sub folder called data warehouse in our DAGs directory.

01:21.110 --> 01:27.110
And inside the sub folder let's first create the helper function script that will be used repeatedly

01:27.110 --> 01:28.510
in the section of the course.

01:28.510 --> 01:31.750
For these type of functions we usually put them in a utils script.

01:31.950 --> 01:36.230
So let's define a data underscore utils of the script.

01:40.030 --> 01:46.190
Here we will have most functions relating to database connections and operations like creating tables

01:46.190 --> 01:47.230
and schemas.

01:47.270 --> 01:48.550
Now we can start coding.

01:48.550 --> 01:54.430
And the first step is to import the Postgres hook which can be done with the following import statement.

01:55.230 --> 01:59.790
So let's go back to here and copy all of this.

02:01.830 --> 02:05.110
From here we import the Postgres hook.

02:08.470 --> 02:11.020
To interact with our Postgres database using Python.

02:11.060 --> 02:14.980
We will use a very common adapter called Psycopg2.

02:15.500 --> 02:22.700
And the first import we will need is a package from the Psycopg2 that is called the real data set.

02:28.660 --> 02:33.220
This will allow retrieval of reports using Python dictionaries and not the default tuples.

02:33.860 --> 02:38.580
This will be needed in our case as it gives more flexibility with how we structure our code.

02:38.820 --> 02:46.140
Now that we have our imports ready, let's define a function called get on set.

02:47.660 --> 02:51.100
And here we set up the Postgres hook by defining a variable hook.

02:52.780 --> 02:55.660
So hook equals to the Postgres hook.

02:56.820 --> 02:59.420
And here we define two arguments.

03:00.060 --> 03:03.300
The first one is the Postgres one id.

03:03.620 --> 03:09.500
And if you recall in Docker compose we had defined this connection already in the environment part where

03:09.500 --> 03:16.290
we said that the connection URI can be written as airflow underscore underscore one connection ID.

03:16.850 --> 03:19.290
Let's go back to here and I'll showcase this to you.

03:19.530 --> 03:21.930
I am referring to this environment variable.

03:22.730 --> 03:27.210
So the actual Postgres connection ID would be this part here.

03:27.370 --> 03:30.530
The Postgres DB YouTube thing.

03:32.210 --> 03:34.850
Let's go back to the utils and write this down.

03:39.210 --> 03:42.130
The second argument which we need to define is the database.

03:43.330 --> 03:47.490
And we had defined this as elt underscore db.

03:47.930 --> 03:51.210
Also just to remind you I defined this in dot env.

03:52.010 --> 03:59.770
So let's go to the DMV and we will find that the database name is in fact the underscore db.

04:00.170 --> 04:06.490
Now that we have the hook variable set up we can define the connection using the get underscore method.

04:07.570 --> 04:14.130
So we define a connection variable using the hook dot get underscore method.

04:14.290 --> 04:21.160
And we also define the cursor which we can define by specifying the cursor methods with the argument

04:21.200 --> 04:23.760
cursor factory set to cursor.

04:24.600 --> 04:29.480
This is it right now, so the cursor variable takes the connection.

04:31.000 --> 04:40.640
We use the cursor method and we set an argument for cursor factory equal to the input that we have here.

04:40.680 --> 04:44.920
We are using the cursor because it changes how the data is returned.

04:44.960 --> 04:51.280
When we execute a query using the cursor, and the cursor in our case will return the data from the

04:51.280 --> 04:55.280
SQL query as a Python dictionary instead of the default tuple.

04:55.320 --> 05:01.320
Once we have defined both the connection and cursor objects, we can then use the cursor object to execute

05:01.320 --> 05:06.040
some SQL, such as a simple select star from a particular table.

05:06.200 --> 05:07.920
So this would look something like this.

05:08.920 --> 05:09.600
Execute.

05:09.640 --> 05:11.680
And then we have the actual query.

05:11.840 --> 05:16.360
So select star from and the particular table.

05:16.720 --> 05:18.120
So this is how it would look.

05:18.120 --> 05:19.400
But we don't need this right now.

05:19.400 --> 05:21.120
So we remove this part of the code.

05:23.520 --> 05:29.470
Finally we should always close connection objects when done to release resources.

05:29.830 --> 05:31.190
And this would look like this.

05:31.190 --> 05:35.390
So characters close and connection dot close.

05:35.990 --> 05:40.390
So this is the main function that we will use to connect to the Postgres database.

05:40.910 --> 05:45.390
As a final step, we will make setting up and closing connections more modular.

05:45.710 --> 05:50.510
So I will split this function into two by defining another function.

05:50.510 --> 05:53.190
And we can call this function close.

05:55.430 --> 05:55.910
One.

05:58.230 --> 06:01.870
And they will take the conn and care arguments.

06:01.910 --> 06:05.630
Also in the first function I will return the canonical variables.

06:10.870 --> 06:13.950
So what we have effectively done here is the first function.

06:13.950 --> 06:16.550
We set up the connection and cursor object.

06:16.550 --> 06:19.750
And the second function will close the cursor and connection.

06:19.910 --> 06:22.750
So that's it for the connection and cursor functions.

06:22.990 --> 06:26.390
Next we will create the schemas and tables.
