WEBVTT

00:04.200 --> 00:08.880
We have covered most of what we need to know to perform the data quality tests, and now it's just a

00:08.880 --> 00:11.080
matter of applying what we know inside Airflow.

00:11.120 --> 00:17.240
For this, we will use the Airflow BashOperator, which allows us to run Bash commands through the

00:17.240 --> 00:19.480
bash_command argument of the operator.

00:19.520 --> 00:23.880
So let's first create a data quality subfolder under the DAGs folder.

00:25.880 --> 00:27.360
And then here we define

00:27.600 --> 00:35.320
the Soda script, and in this script we will write all the base data quality logic for the Airflow DAG.

00:36.000 --> 00:41.120
So we start off by first importing the logging module along with the BashOperator.

00:49.880 --> 00:52.640
We also set the logger variable.

01:00.080 --> 01:06.720
And also the global variables for the paths where we have the YAML files for the checks and configuration,

01:07.120 --> 01:10.690
and the data source name which we defined in the configuration.

01:15.010 --> 01:16.930
So I have the function ready.

01:16.930 --> 01:20.410
So I will simply copy and paste and explain what we are doing.

01:20.810 --> 01:24.410
So we define a function data quality.

01:24.410 --> 01:26.490
And the argument that it takes is schema.

01:27.050 --> 01:31.730
We define this as a parameter because it can be either the staging or the core schema.

01:32.410 --> 01:34.090
In the try clause,

01:34.290 --> 01:39.130
we are defining the BashOperator by specifying the task ID.

01:39.370 --> 01:45.330
Here this is a function of the schema. We also specify the bash command, which is simply the

01:45.370 --> 01:48.250
Soda scan command which we ran previously.

01:49.010 --> 01:53.170
And finally we finish by returning the task.

01:53.210 --> 01:58.130
As we have seen before, if any exception occurs, we log it as an error and re-raise it.
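
NOTE
A minimal sketch of the module described in this part of the lecture, under stated assumptions.
The path, the data source name, the task ID prefix, and the build_scan_command helper are hypothetical.
Passing the schema with the -v option is an assumption about how the checks file is parameterized.
The lecture imports BashOperator at the top of the module; here it is imported inside the function
only so that the sketch can be loaded without Airflow installed.
```python
import logging
#
logger = logging.getLogger(__name__)
#
# Hypothetical locations and names; replace them with the values from your own project.
SODA_PATH = "/opt/airflow/include/soda"
DATASOURCE = "pg_datasource"
#
def build_scan_command(schema):
    """Return the soda scan shell command for one schema (hypothetical helper)."""
    return (
        f"soda scan -d {DATASOURCE} "
        f"-c {SODA_PATH}/configuration.yml "
        f"-v SCHEMA={schema} "
        f"{SODA_PATH}/checks.yml"
    )
#
def data_quality(schema):
    """Create a BashOperator task that runs the scan for the given schema."""
    # Airflow 2 import path; the lecture places this import at the top of the file.
    from airflow.operators.bash import BashOperator
    try:
        task = BashOperator(
            task_id=f"soda_test_{schema}",  # the task ID is a function of the schema
            bash_command=build_scan_command(schema),
        )
        return task
    except Exception:
        # Log the failure, then re-raise so the error is not swallowed.
        logger.error("Error creating data quality task for schema: %s", schema)
        raise
```
Calling data_quality("staging") inside a DAG context would then yield the staging task.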

01:58.330 --> 02:01.810
Now that we have this function set up, let's go inside main.py.

02:02.530 --> 02:10.130
And what we can do is simply copy the DAG that we have from the data warehouse section of this course,

02:10.330 --> 02:14.890
and change it to use the logic for the data quality tests.

02:15.650 --> 02:18.390
What we need to do is change a few settings.

02:19.030 --> 02:21.590
So this will become something like data quality.

02:22.030 --> 02:23.790
The description will also change.

02:24.350 --> 02:26.230
So you can say something like that.

02:29.390 --> 02:37.190
So check the data quality on both layers in the database.

02:38.190 --> 02:38.950
For the schedule,

02:39.190 --> 02:42.910
we need to set a time that runs after the previous DAG.

02:43.070 --> 02:44.830
So let's run it at 4:00 pm.

02:45.310 --> 02:48.110
And catchup we will keep as False.

02:48.910 --> 02:53.230
Finally, we set the tasks for both staging and core.

02:54.590 --> 02:57.910
Here we also need to import the actual function.

02:58.350 --> 02:59.790
So let's go to the very top.

03:02.550 --> 03:04.510
And do the import.

03:06.870 --> 03:12.750
And also we need to set the schema names for the staging and core schemas.

03:13.350 --> 03:16.190
We can also do this after the default arguments.

03:16.910 --> 03:18.470
So let's set them here.

03:18.710 --> 03:22.150
And now we have the variables for the staging and the core schemas.

03:22.990 --> 03:25.030
The final thing is to set the dependencies.

03:25.950 --> 03:29.640
So the staging task will run before the core task.

03:34.280 --> 03:35.240
So let's save.

03:35.640 --> 03:40.520
And at this point we have also created the DAG for the data quality checks.

03:41.240 --> 03:44.320
Let's go to the Airflow UI and run this DAG.

03:47.120 --> 03:50.920
Instead of triggering the DAG manually, we can simply unpause it and let it run.

03:56.040 --> 03:59.440
So we're seeing two dark greens here.

03:59.440 --> 04:01.840
So it should signify that all was a success.

04:02.560 --> 04:03.640
Let's check the logs.

04:04.640 --> 04:09.440
As you can see, for the staging schema we got "All is good."

04:09.480 --> 04:11.240
No failures, no warnings, no errors.

04:13.200 --> 04:15.440
And for core, I'm expecting the same.

04:15.560 --> 04:16.600
And in fact it is.

04:18.240 --> 04:21.120
So at this point we have now covered the data quality tests.

04:21.760 --> 04:26.520
Feel free to add more tests in the YAML and explore more of Soda's functionality.

04:27.080 --> 04:29.920
In the next lecture we will go over the functional tests.

04:30.200 --> 04:31.160
I will see you then.
