Macros
Global usage notes
source_model syntax
dbt itself supports references to data via
the ref()
function for models, and the source()
function
for dbt sources .
AutomateDV provides the means for specifying sources for Data Vault structures with a source_model
argument.
This behaves differently for the stage macro, which supports either style, shown below:
ref style
stg_customer :
source_model : 'raw_customer'
source style
The mapping provided for the source style is in the form source_name: table_name
which mimics the syntax for
the source()
macro.
For all other structures (Hub, Link, Satellite, etc.) the source_model
argument must be a string to denote a single
staging source, or a list of strings to denote multiple staging sources, which must be names of models (minus
the .sql
).
Global variables
AutomateDV provides
user-overridable global variables
which allow you to configure different aspects of AutomateDV. These variables will be expanded in future versions of
AutomateDV.
Hashing configuration
vars :
hash : MD5
concat_string : '||'
null_placeholder_string : '^^'
hash_content_casing : 'UPPER'
hash
Configure the type of hashing.
This can be one of:
Read more
concat_string
Configure the string value to use for concatenating strings together when hashing. By default, this is two pipe
characters: '||
'
Read more
null_placeholder_string
Configure the string value to use for replacing NULL
values when hashing. By default, this is two caret
characters: '^^
'
hash_content_casing
This variable configures whether hashed columns are normalised with UPPER()
when calculating the hash value.
This can be one of:
Info
We've added this config to give you more options when hashing. If there is logical difference between uppercase
and lowercase values in your data, set this to DISABLED
otherwise, the standard approach is to use UPPER
Ghost Record configuration
vars :
enable_ghost_records : false
system_record_value : 'AUTOMATE_DV_SYSTEM'
How Ghost Records work
In the Data Vault standards, ghost records are intended to provide equi-join capabilities for PIT tables when queries on a satellite
at a point in time would otherwise return no records. Instead of having to handle NULLs and incur performance penalties for joins which
do not return records, the ghost record is a single record inserted into a Satellite upon its first creation which can be used instead.
In AutomateDV this is implemented as an optional CTE which only gets created in the above circumstances and when the enable_ghost_records
global variable is set to true
.
A Ghost Record does not inherently mean anything (it is for performance only), and so the value of each column is set to NULL
or a sensible meaningless value.
The below tables describe what a ghost record will look like:
enable_ghost_records
Enable the use of ghost records in your project. This can either be true or false, true
will enable the configuration and false
will disable it.
This will insert a ghost record to a satellite table whether it is a new table or pre-existing.
Before adding the ghost record, the satellite macro will check there is not already one loaded.
Note
If this is enabled on an existing project, the ghost-records will be inserted into the satellite on the first dbt run after enabling only
system_record_value
This will set the record source system for the ghost record. The default is AUTOMATE_DV_SYSTEM
and can be changed to any string.
Note
If this is changed on an existing project, the source system of already loaded ghost records will not be changed unless you --full-refresh
.
NULL Key configurations
vars :
null_key_required : '-1'
null_key_optional : '-2'
null_key_required
Configure the string value to use for replacing NULL
values found in keys where a value is required, e.g. prior to
hashing.
By default, this is '-1'.
null_key_optional
Configure the string value to use for replacing NULL
values found in optional keys. By default, this is '-2'.
Read more
Other global variables
vars :
escape_char_left : '"'
escape_char_right : '"'
max_datetime : '9999-12-31 23:59:59.999999'
max_datetime
Configure the value for the maximum datetime.
This value will be used for showing that a record's effectivity is 'open' or 'current' in certain circumstances.
The default is variations on 9999-12-31 23:59:59.999999
where there is more or less nanosecond precision (9's after the .) depending on platform.
escape_char_left/escape_char_right
Configure the characters to use to delimit SQL column names when escaping .
Column names are delimited when using the escaping feature of AutomateDV,
and by default both the delimiting characters are double quotes following the SQL :1999 standard.
Here are some examples for different platforms:
The table below indicates which macros and templates are officially available for each platform.
AutomateDV is primarily developed on Snowflake, and we release support for other platforms as and when possible.
Most of the time this will be at the same time as the Snowflake release unless it is snowflake-only functionality
with no equivalent in another platform.
Thanks for your patience and continued support!
Macro/Template
Snowflake
Google BigQuery
MS SQL Server
Databricks
Postgres
Redshift**
hash
stage
hub
link
sat
t_link
eff_sat
ma_sat
xts
pit
bridge
**
These platforms are either planned or actively being worked on by the community and/or internal AutomateDV team.
See the issues below for more information:
Table templates
(macros/tables)
These macros are the core of the package and can be called in your models to build the different types of tables needed
for your Data Vault 2.0 Data Warehouse.
hub
view source:
Generates SQL to build a Hub table using the provided parameters.
Usage
{{ automate_dv.hub ( src_pk = src_pk , src_nk = src_nk , src_ldts = src_ldts ,
src_extra_columns = src_extra_columns ,
src_source = src_source , source_model = source_model ) }}
Parameters
Parameter
Description
Type
Required?
src_pk
Source primary key column
List[String]/String
src_nk
Source natural key column
List[String]/String
src_extra_columns
Select arbitrary columns from the source
List[String]/String
src_ldts
Source load date timestamp column
String
src_source
Name of the column containing the source ID
List[String]/String
source_model
Staging model name
List[String]/String
Video Tutorial
See examples
Example Output
Snowflake Google BigQuery MS SQL Server Postgres Databricks
Single-Source (Base Load) Single-Source (Incremental Loads) Multi-Source (Base Load) Multi-Source (Incremental Loads)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
LEFT JOIN ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . hub_customer_incremental AS d
ON a . CUSTOMER_HK = d . CUSTOMER_HK
WHERE d . CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
row_rank_2 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_orders AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
stage_union AS (
SELECT * FROM row_rank_1
UNION ALL
SELECT * FROM row_rank_2
),
row_rank_union AS (
SELECT ru . * ,
ROW_NUMBER () OVER (
PARTITION BY ru . CUSTOMER_HK
ORDER BY ru . LOAD_DATETIME , ru . RECORD_SOURCE ASC
) AS row_rank_number
FROM stage_union AS ru
WHERE ru . CUSTOMER_HK IS NOT NULL
QUALIFY row_rank_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_union AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
row_rank_2 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_orders AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
stage_union AS (
SELECT * FROM row_rank_1
UNION ALL
SELECT * FROM row_rank_2
),
row_rank_union AS (
SELECT ru . * ,
ROW_NUMBER () OVER (
PARTITION BY ru . CUSTOMER_HK
ORDER BY ru . LOAD_DATETIME , ru . RECORD_SOURCE ASC
) AS row_rank_number
FROM stage_union AS ru
WHERE ru . CUSTOMER_HK IS NOT NULL
QUALIFY row_rank_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_union AS a
LEFT JOIN ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . hub_orders_multi_source_incremental AS d
ON a . CUSTOMER_HK = d . CUSTOMER_HK
WHERE d . CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
Single-Source (Base Load) Single-Source (Incremental Loads) Multi-Source (Base Load) Multi-Source (Incremental Loads)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
LEFT JOIN ` dbtvault - 341416 ` . ` dbtvault ` . ` hub_customer_incremental ` AS d
ON a . CUSTOMER_HK = d . CUSTOMER_HK
WHERE d . CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
row_rank_2 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_orders ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
stage_union AS (
SELECT * FROM row_rank_1
UNION ALL
SELECT * FROM row_rank_2
),
row_rank_union AS (
SELECT ru . * ,
ROW_NUMBER () OVER (
PARTITION BY ru . CUSTOMER_HK
ORDER BY ru . LOAD_DATETIME , ru . RECORD_SOURCE ASC
) AS row_rank_number
FROM stage_union AS ru
WHERE ru . CUSTOMER_HK IS NOT NULL
QUALIFY row_rank_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_union AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
row_rank_2 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_orders ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
stage_union AS (
SELECT * FROM row_rank_1
UNION ALL
SELECT * FROM row_rank_2
),
row_rank_union AS (
SELECT ru . * ,
ROW_NUMBER () OVER (
PARTITION BY ru . CUSTOMER_HK
ORDER BY ru . LOAD_DATETIME , ru . RECORD_SOURCE ASC
) AS row_rank_number
FROM stage_union AS ru
WHERE ru . CUSTOMER_HK IS NOT NULL
QUALIFY row_rank_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_union AS a
LEFT JOIN ` dbtvault - 341416 ` . ` dbtvault ` . ` hub_orders_multi_source_incremental ` AS d
ON a . CUSTOMER_HK = d . CUSTOMER_HK
WHERE d . CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
Single-Source (Base Load) Single-Source (Incremental Loads) Multi-Source (Base Load) Multi-Source (Incremental Loads)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 WITH row_rank_1 AS (
SELECT CUSTOMER_HK , CUSTOMER_ID , LOAD_DATETIME , RECORD_SOURCE
FROM (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
) h
WHERE h . row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 WITH row_rank_1 AS (
SELECT CUSTOMER_HK , CUSTOMER_ID , LOAD_DATETIME , RECORD_SOURCE
FROM (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
) h
WHERE h . row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
LEFT JOIN "AUTOMATE_DV_TEST" . "TEST" . "hub_customer_incremental" AS d
ON a . CUSTOMER_HK = d . CUSTOMER_HK
WHERE d . CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 WITH row_rank_1 AS (
SELECT CUSTOMER_HK , CUSTOMER_ID , LOAD_DATETIME , RECORD_SOURCE
FROM (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
) h
WHERE h . row_number = 1
),
row_rank_2 AS (
SELECT CUSTOMER_HK , CUSTOMER_ID , LOAD_DATETIME , RECORD_SOURCE
FROM (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_orders" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
) h
WHERE h . row_number = 1
),
stage_union AS (
SELECT * FROM row_rank_1
UNION ALL
SELECT * FROM row_rank_2
),
row_rank_union AS (
SELECT *
FROM (
SELECT ru . * ,
ROW_NUMBER () OVER (
PARTITION BY ru . CUSTOMER_HK
ORDER BY ru . LOAD_DATETIME , ru . RECORD_SOURCE ASC
) AS row_rank_number
FROM stage_union AS ru
WHERE ru . CUSTOMER_HK IS NOT NULL
) h
WHERE h . row_rank_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_union AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 WITH row_rank_1 AS (
SELECT CUSTOMER_HK , CUSTOMER_ID , LOAD_DATETIME , RECORD_SOURCE
FROM (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
) h
WHERE h . row_number = 1
),
row_rank_2 AS (
SELECT CUSTOMER_HK , CUSTOMER_ID , LOAD_DATETIME , RECORD_SOURCE
FROM (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_orders" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
) h
WHERE h . row_number = 1
),
stage_union AS (
SELECT * FROM row_rank_1
UNION ALL
SELECT * FROM row_rank_2
),
row_rank_union AS (
SELECT *
FROM (
SELECT ru . * ,
ROW_NUMBER () OVER (
PARTITION BY ru . CUSTOMER_HK
ORDER BY ru . LOAD_DATETIME , ru . RECORD_SOURCE ASC
) AS row_rank_number
FROM stage_union AS ru
WHERE ru . CUSTOMER_HK IS NOT NULL
) h
WHERE h . row_rank_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_union AS a
LEFT JOIN "AUTOMATE_DV_TEST" . "TEST" . "hub_orders_multi_source_incremental" AS d
ON a . CUSTOMER_HK = d . CUSTOMER_HK
WHERE d . CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
Single-Source (Base Load) Single-Source (Incremental Loads) Multi-Source (Base Load) Multi-Source (Incremental Loads)
WITH row_rank_1 AS (
SELECT DISTINCT ON ( rr . CUSTOMER_HK ) rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
ORDER BY rr . CUSTOMER_HK , rr . LOAD_DATETIME
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14 WITH row_rank_1 AS (
SELECT DISTINCT ON ( rr . CUSTOMER_HK ) rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
ORDER BY rr . CUSTOMER_HK , rr . LOAD_DATETIME
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
LEFT JOIN "dbtvault_db" . "development" . "hub_customer_incremental" AS d
ON a . CUSTOMER_HK = d . CUSTOMER_HK
WHERE d . CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 WITH row_rank_1 AS (
SELECT DISTINCT ON ( rr . CUSTOMER_HK ) rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
ORDER BY rr . CUSTOMER_HK , rr . LOAD_DATETIME
),
row_rank_2 AS (
SELECT DISTINCT ON ( rr . CUSTOMER_HK ) rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_orders" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
ORDER BY rr . CUSTOMER_HK , rr . LOAD_DATETIME
),
stage_union AS (
SELECT * FROM row_rank_1
UNION ALL
SELECT * FROM row_rank_2
),
row_rank_union AS ( SELECT DISTINCT ON ( ru . CUSTOMER_HK ) ru . *
FROM stage_union AS ru
WHERE ru . CUSTOMER_HK IS NOT NULL
ORDER BY ru . CUSTOMER_HK , ru . LOAD_DATETIME , ru . RECORD_SOURCE ASC
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_union AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 WITH row_rank_1 AS (
SELECT DISTINCT ON ( rr . CUSTOMER_HK ) rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
ORDER BY rr . CUSTOMER_HK , rr . LOAD_DATETIME
),
row_rank_2 AS (
SELECT DISTINCT ON ( rr . CUSTOMER_HK ) rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_orders" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
ORDER BY rr . CUSTOMER_HK , rr . LOAD_DATETIME
),
stage_union AS (
SELECT * FROM row_rank_1
UNION ALL
SELECT * FROM row_rank_2
),
row_rank_union AS ( SELECT DISTINCT ON ( ru . CUSTOMER_HK ) ru . *
FROM stage_union AS ru
WHERE ru . CUSTOMER_HK IS NOT NULL
ORDER BY ru . CUSTOMER_HK , ru . LOAD_DATETIME , ru . RECORD_SOURCE ASC
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_union AS a
LEFT JOIN "dbtvault_db" . "development" . "hub_orders_multi_source_incremental" AS d
ON a . CUSTOMER_HK = d . CUSTOMER_HK
WHERE d . CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
Single-Source (Base Load) Single-Source (Incremental Loads) Multi-Source (Base Load) Multi-Source (Incremental Loads)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
LEFT JOIN ` hive_metastore ` . ` dbtvault ` . ` hub_customer_incremental ` AS d
ON a . CUSTOMER_HK = d . CUSTOMER_HK
WHERE d . CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
row_rank_2 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_orders ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
stage_union AS (
SELECT * FROM row_rank_1
UNION ALL
SELECT * FROM row_rank_2
),
row_rank_union AS (
SELECT ru . * ,
ROW_NUMBER () OVER (
PARTITION BY ru . CUSTOMER_HK
ORDER BY ru . LOAD_DATETIME , ru . RECORD_SOURCE ASC
) AS row_rank_number
FROM stage_union AS ru
WHERE ru . CUSTOMER_HK IS NOT NULL
QUALIFY row_rank_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_union AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
row_rank_2 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_orders ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
QUALIFY row_number = 1
),
stage_union AS (
SELECT * FROM row_rank_1
UNION ALL
SELECT * FROM row_rank_2
),
row_rank_union AS (
SELECT ru . * ,
ROW_NUMBER () OVER (
PARTITION BY ru . CUSTOMER_HK
ORDER BY ru . LOAD_DATETIME , ru . RECORD_SOURCE ASC
) AS row_rank_number
FROM stage_union AS ru
WHERE ru . CUSTOMER_HK IS NOT NULL
QUALIFY row_rank_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_union AS a
LEFT JOIN ` hive_metastore ` . ` dbtvault ` . ` hub_orders_multi_source_incremental ` AS d
ON a . CUSTOMER_HK = d . CUSTOMER_HK
WHERE d . CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
link
view source:
Generates SQL to build a Link table using the provided parameters.
Usage
{{ automate_dv.link ( src_pk = src_pk , src_fk = src_fk , src_ldts = src_ldts ,
src_extra_columns = src_extra_columns ,
src_source = src_source , source_model = source_model ) }}
Parameters
Parameter
Description
Type
Required?
src_pk
Source primary key column
List[String]/String
src_fk
Source foreign key column(s)
List[String]
src_extra_columns
Select arbitrary columns from the source
List[String]/String
src_ldts
Source load date timestamp column
String
src_source
Name of the column containing the source ID
List[String]/String
source_model
Staging model name
List[String]/String
Video Tutorial
See examples
Example Output
Snowflake Google BigQuery MS SQL Server Postgres Databricks
Single-Source (Base Load) Single-Source (Incremental Loads) Multi-Source (Base Load) Multi-Source (Incremental Loads)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
Single-Source (Base Load) Single-Source (Incremental Loads) Multi-Source (Base Load) Multi-Source (Incremental Loads)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
Single-Source (Base Load) Single-Source (Incremental Loads) Multi-Source (Base Load) Multi-Source (Incremental Loads)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 WITH row_rank_1 AS (
SELECT *
FROM
(
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
) l
WHERE l . row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 WITH row_rank_1 AS (
SELECT *
FROM
(
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
) l
WHERE l . row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 WITH row_rank_1 AS (
SELECT *
FROM
(
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
) l
WHERE l . row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 WITH row_rank_1 AS (
SELECT *
FROM
(
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
) l
WHERE l . row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
Single-Source (Base Load) Single-Source (Incremental Loads) Multi-Source (Base Load) Multi-Source (Incremental Loads)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 WITH row_rank_1 AS (
SELECT * FROM (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "dbtvault_db" . "development" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
) as l
WHERE row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 WITH row_rank_1 AS (
SELECT * FROM (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "dbtvault_db" . "development" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
) as l
WHERE row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 WITH row_rank_1 AS (
SELECT * FROM (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "dbtvault_db" . "development" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
) as l
WHERE row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 WITH row_rank_1 AS (
SELECT * FROM (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM "dbtvault_db" . "development" . "stg_customer" AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
) as l
WHERE row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
Single-Source (Base Load) Single-Source (Incremental Loads) Multi-Source (Base Load) Multi-Source (Incremental Loads)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 WITH row_rank_1 AS (
SELECT rr . CUSTOMER_HK , rr . CUSTOMER_ID , rr . LOAD_DATETIME , rr . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY rr . CUSTOMER_HK
ORDER BY rr . LOAD_DATETIME
) AS row_number
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS rr
WHERE rr . CUSTOMER_HK IS NOT NULL
AND rr . CUSTOMER_ID IS NOT NULL
QUALIFY row_number = 1
),
records_to_insert AS (
SELECT a . CUSTOMER_HK , a . CUSTOMER_ID , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
t_link
Generates SQL to build a Transactional Link table using the provided parameters.
Usage
{{ automate_dv.t_link ( src_pk = src_pk , src_fk = src_fk , src_payload = src_payload ,
src_extra_columns = src_extra_columns ,
src_eff = src_eff , src_ldts = src_ldts ,
src_source = src_source , source_model = source_model ) }}
Parameters
Parameter
Description
Type
Required?
src_pk
Source primary key column
List[String]/String
src_fk
Source foreign key column(s)
List[String]
src_payload
Source payload column(s)
List[String]
src_extra_columns
Select arbitrary columns from the source
List[String]/String
src_eff
Source effective from column
String
src_ldts
Source load date timestamp column
String
src_source
Name of the column containing the source ID
String
source_model
Staging model name
String
See examples
Example Output
Snowflake Google BigQuery MS SQL Server Postgres Databricks
Base Load Incremental Loads
1
2
3
4
5
6
7
8
9
10
11
12 WITH stage AS (
SELECT TRANSACTION_HK , CUSTOMER_HK , ORDER_HK , o_orderdate , o_orderpriority , o_clerk , o_shippriority , o_comment , o_totalprice , o_orderstatus , EFFECTIVE_FROM , LOAD_DATETIME , RECORD_SOURCE
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_transactions
WHERE TRANSACTION_HK IS NOT NULL
AND CUSTOMER_HK IS NOT NULL
AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT DISTINCT stg . TRANSACTION_HK , stg . CUSTOMER_HK , stg . ORDER_HK , stg . o_orderdate , stg . o_orderpriority , stg . o_clerk , stg . o_shippriority , stg . o_comment , stg . o_totalprice , stg . o_orderstatus , stg . EFFECTIVE_FROM , stg . LOAD_DATETIME , stg . RECORD_SOURCE
FROM stage AS stg
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 WITH stage AS (
SELECT TRANSACTION_HK , CUSTOMER_HK , ORDER_HK , o_orderdate , o_orderpriority , o_clerk , o_shippriority , o_comment , o_totalprice , o_orderstatus , EFFECTIVE_FROM , LOAD_DATETIME , RECORD_SOURCE
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_transactions
WHERE TRANSACTION_HK IS NOT NULL
AND CUSTOMER_HK IS NOT NULL
AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT DISTINCT stg . TRANSACTION_HK , stg . CUSTOMER_HK , stg . ORDER_HK , stg . o_orderdate , stg . o_orderpriority , stg . o_clerk , stg . o_shippriority , stg . o_comment , stg . o_totalprice , stg . o_orderstatus , stg . EFFECTIVE_FROM , stg . LOAD_DATETIME , stg . RECORD_SOURCE
FROM stage AS stg
LEFT JOIN ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . t_link_transactions_incremental AS tgt
ON stg . TRANSACTION_HK = tgt . TRANSACTION_HK
WHERE tgt . TRANSACTION_HK IS NULL
)
SELECT * FROM records_to_insert
Base Load Incremental Loads
1
2
3
4
5
6
7
8
9
10
11
12 WITH stage AS (
SELECT TRANSACTION_HK , CUSTOMER_HK , ORDER_HK , o_orderdate , o_orderpriority , o_clerk , o_shippriority , o_comment , o_totalprice , o_orderstatus , EFFECTIVE_FROM , LOAD_DATETIME , RECORD_SOURCE
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_transactions `
WHERE TRANSACTION_HK IS NOT NULL
AND CUSTOMER_HK IS NOT NULL
AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT DISTINCT stg . TRANSACTION_HK , stg . CUSTOMER_HK , stg . ORDER_HK , stg . o_orderdate , stg . o_orderpriority , stg . o_clerk , stg . o_shippriority , stg . o_comment , stg . o_totalprice , stg . o_orderstatus , stg . EFFECTIVE_FROM , stg . LOAD_DATETIME , stg . RECORD_SOURCE
FROM stage AS stg
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 WITH stage AS (
SELECT TRANSACTION_HK , CUSTOMER_HK , ORDER_HK , o_orderdate , o_orderpriority , o_clerk , o_shippriority , o_comment , o_totalprice , o_orderstatus , EFFECTIVE_FROM , LOAD_DATETIME , RECORD_SOURCE
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_transactions `
WHERE TRANSACTION_HK IS NOT NULL
AND CUSTOMER_HK IS NOT NULL
AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT DISTINCT stg . TRANSACTION_HK , stg . CUSTOMER_HK , stg . ORDER_HK , stg . o_orderdate , stg . o_orderpriority , stg . o_clerk , stg . o_shippriority , stg . o_comment , stg . o_totalprice , stg . o_orderstatus , stg . EFFECTIVE_FROM , stg . LOAD_DATETIME , stg . RECORD_SOURCE
FROM stage AS stg
LEFT JOIN ` dbtvault - 341416 ` . ` dbtvault ` . ` t_link_transactions_incremental ` AS tgt
ON stg . TRANSACTION_HK = tgt . TRANSACTION_HK
WHERE tgt . TRANSACTION_HK IS NULL
)
SELECT * FROM records_to_insert
Base Load Incremental Loads
1
2
3
4
5
6
7
8
9
10
11
12 WITH stage AS (
SELECT TRANSACTION_HK , CUSTOMER_HK , ORDER_HK , o_orderdate , o_orderpriority , o_clerk , o_shippriority , o_comment , o_totalprice , o_orderstatus , EFFECTIVE_FROM , LOAD_DATETIME , RECORD_SOURCE
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_transactions"
WHERE TRANSACTION_HK IS NOT NULL
AND CUSTOMER_HK IS NOT NULL
AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT DISTINCT stg . TRANSACTION_HK , stg . CUSTOMER_HK , stg . ORDER_HK , stg . o_orderdate , stg . o_orderpriority , stg . o_clerk , stg . o_shippriority , stg . o_comment , stg . o_totalprice , stg . o_orderstatus , stg . EFFECTIVE_FROM , stg . LOAD_DATETIME , stg . RECORD_SOURCE
FROM stage AS stg
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 WITH stage AS (
SELECT TRANSACTION_HK , CUSTOMER_HK , ORDER_HK , o_orderdate , o_orderpriority , o_clerk , o_shippriority , o_comment , o_totalprice , o_orderstatus , EFFECTIVE_FROM , LOAD_DATETIME , RECORD_SOURCE
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_transactions"
WHERE TRANSACTION_HK IS NOT NULL
AND CUSTOMER_HK IS NOT NULL
AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT DISTINCT stg . TRANSACTION_HK , stg . CUSTOMER_HK , stg . ORDER_HK , stg . o_orderdate , stg . o_orderpriority , stg . o_clerk , stg . o_shippriority , stg . o_comment , stg . o_totalprice , stg . o_orderstatus , stg . EFFECTIVE_FROM , stg . LOAD_DATETIME , stg . RECORD_SOURCE
FROM stage AS stg
LEFT JOIN "AUTOMATE_DV_TEST" . "TEST" . "t_link_transactions_incremental" AS tgt
ON stg . TRANSACTION_HK = tgt . TRANSACTION_HK
WHERE tgt . TRANSACTION_HK IS NULL
)
SELECT * FROM records_to_insert
Base Load Incremental Loads
1
2
3
4
5
6
7
8
9
10
11
12 WITH stage AS (
SELECT TRANSACTION_HK , CUSTOMER_HK , ORDER_HK , o_orderdate , o_orderpriority , o_clerk , o_shippriority , o_comment , o_totalprice , o_orderstatus , EFFECTIVE_FROM , LOAD_DATETIME , RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_transactions"
WHERE TRANSACTION_HK IS NOT NULL
AND CUSTOMER_HK IS NOT NULL
AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT DISTINCT stg . TRANSACTION_HK , stg . CUSTOMER_HK , stg . ORDER_HK , stg . o_orderdate , stg . o_orderpriority , stg . o_clerk , stg . o_shippriority , stg . o_comment , stg . o_totalprice , stg . o_orderstatus , stg . EFFECTIVE_FROM , stg . LOAD_DATETIME , stg . RECORD_SOURCE
FROM stage AS stg
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 WITH stage AS (
SELECT TRANSACTION_HK , CUSTOMER_HK , ORDER_HK , o_orderdate , o_orderpriority , o_clerk , o_shippriority , o_comment , o_totalprice , o_orderstatus , EFFECTIVE_FROM , LOAD_DATETIME , RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_transactions"
WHERE TRANSACTION_HK IS NOT NULL
AND CUSTOMER_HK IS NOT NULL
AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT DISTINCT stg . TRANSACTION_HK , stg . CUSTOMER_HK , stg . ORDER_HK , stg . o_orderdate , stg . o_orderpriority , stg . o_clerk , stg . o_shippriority , stg . o_comment , stg . o_totalprice , stg . o_orderstatus , stg . EFFECTIVE_FROM , stg . LOAD_DATETIME , stg . RECORD_SOURCE
FROM stage AS stg
LEFT JOIN "dbtvault_db" . "development" . "t_link_transactions_incremental" AS tgt
ON stg . TRANSACTION_HK = tgt . TRANSACTION_HK
WHERE tgt . TRANSACTION_HK IS NULL
)
SELECT * FROM records_to_insert
Base Load Incremental Loads
1
2
3
4
5
6
7
8
9
10
11
12 WITH stage AS (
SELECT TRANSACTION_HK , CUSTOMER_HK , ORDER_HK , o_orderdate , o_orderpriority , o_clerk , o_shippriority , o_comment , o_totalprice , o_orderstatus , EFFECTIVE_FROM , LOAD_DATETIME , RECORD_SOURCE
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_transactions `
WHERE TRANSACTION_HK IS NOT NULL
AND CUSTOMER_HK IS NOT NULL
AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT DISTINCT stg . TRANSACTION_HK , stg . CUSTOMER_HK , stg . ORDER_HK , stg . o_orderdate , stg . o_orderpriority , stg . o_clerk , stg . o_shippriority , stg . o_comment , stg . o_totalprice , stg . o_orderstatus , stg . EFFECTIVE_FROM , stg . LOAD_DATETIME , stg . RECORD_SOURCE
FROM stage AS stg
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 WITH stage AS (
SELECT TRANSACTION_HK , CUSTOMER_HK , ORDER_HK , o_orderdate , o_orderpriority , o_clerk , o_shippriority , o_comment , o_totalprice , o_orderstatus , EFFECTIVE_FROM , LOAD_DATETIME , RECORD_SOURCE
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_transactions `
WHERE TRANSACTION_HK IS NOT NULL
AND CUSTOMER_HK IS NOT NULL
AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT DISTINCT stg . TRANSACTION_HK , stg . CUSTOMER_HK , stg . ORDER_HK , stg . o_orderdate , stg . o_orderpriority , stg . o_clerk , stg . o_shippriority , stg . o_comment , stg . o_totalprice , stg . o_orderstatus , stg . EFFECTIVE_FROM , stg . LOAD_DATETIME , stg . RECORD_SOURCE
FROM stage AS stg
LEFT JOIN ` hive_metastore ` . ` dbtvault ` . ` t_link_transactions_incremental ` AS tgt
ON stg . TRANSACTION_HK = tgt . TRANSACTION_HK
WHERE tgt . TRANSACTION_HK IS NULL
)
SELECT * FROM records_to_insert
sat
view source:
Generates SQL to build a Satellite table using the provided parameters.
Usage
{{ automate_dv.sat ( src_pk = src_pk , src_hashdiff = src_hashdiff , src_payload = src_payload ,
src_extra_columns = src_extra_columns ,
src_eff = src_eff , src_ldts = src_ldts ,
src_source = src_source , source_model = source_model ) }}
Parameters
Parameter
Description
Type
Required?
src_pk
Source primary key column
List[String]/String
src_hashdiff
Source hashdiff column
String
src_payload
Source payload column(s)
List[String]
src_extra_columns
Select arbitrary columns from the source
List[String]/String
src_eff
Source effective from column
String
src_ldts
Source load date timestamp column
String
src_source
Name of the column containing the source ID
String
source_model
Staging model name
String
Video Tutorial
Satellite Behaviour Flags
This section covers global variables (var ) and config options that affect the behaviour of satellites.
Parameter
Description
Type
Flag type
Required?
apply_source_filter
Adds additional logic to filter the source_model
data
Boolean
config
enable_ghost_records
Adds a single ghost record to the satellite
Boolean
var
apply_source_filter (config) enable_ghost_records (var)
This config option adds a WHERE clause (in incremental mode) using an additional CTE in the SQL code to filter the source_model
's data
This ensures that records in the source data are filtered so that only records with src_ldts
after the MAX ldts in the existing Satellite
are processed during the satellite load.
It is intended for this config option to be used if you cannot guarantee atomic/idempotent batches i.e. only data which has not been loaded yet in your stage data.
This global variable option enables additional logic to add a ghost record upon first creation OR once when running in incremental mode
if a ghost record has not already been added.
Read more about ghost records.
See examples
Example Output
Snowflake Google BigQuery MS SQL Server Postgres Databricks
Base Load Incremental Loads Base Load with Ghost Record Incremental Loads with Ghost Record
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
records_to_insert AS (
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
latest_records AS (
SELECT current_records . CUSTOMER_HK , current_records . HASHDIFF , current_records . CUSTOMER_NAME , current_records . CUSTOMER_ADDRESS , current_records . CUSTOMER_PHONE , current_records . ACCBAL , current_records . MKTSEGMENT , current_records . COMMENT , current_records . EFFECTIVE_FROM , current_records . LOAD_DATETIME , current_records . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY current_records . CUSTOMER_HK
ORDER BY current_records . LOAD_DATETIME DESC
) AS rank_num
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . satellite_incremental AS current_records
JOIN (
SELECT DISTINCT source_data . CUSTOMER_HK
FROM source_data
) AS source_records
ON source_records . CUSTOMER_HK = current_records . CUSTOMER_HK
QUALIFY rank_num = 1
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
records_to_insert AS (
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
LEFT JOIN LATEST_RECORDS lr
ON lr . CUSTOMER_HK = frin . CUSTOMER_HK
AND lr . HASHDIFF = frin . HASHDIFF
WHERE lr . HASHDIFF IS NULL
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
ghost AS (
SELECT
NULL AS CUSTOMER_NAME ,
NULL AS CUSTOMER_PHONE ,
NULL AS CUSTOMER_ADDRESS ,
NULL AS ACCBAL ,
NULL AS MKTSEGMENT ,
NULL AS COMMENT ,
TO_TIMESTAMP_NTZ ( '1900-01-01 00:00:00' ) AS LOAD_DATETIME ,
TO_TIMESTAMP_NTZ ( '1900-01-01 00:00:00' ) AS EFFECTIVE_FROM ,
CAST ( 'AUTOMATE_DV_SYSTEM' AS VARCHAR ) AS RECORD_SOURCE ,
CAST ( '00000000000000000000000000000000' AS BINARY ( 16 )) AS CUSTOMER_HK ,
CAST ( '00000000000000000000000000000000' AS BINARY ( 16 )) AS HASHDIFF
),
records_to_insert AS (
SELECT
g . CUSTOMER_HK , g . HASHDIFF , g . CUSTOMER_NAME , g . CUSTOMER_ADDRESS , g . CUSTOMER_PHONE , g . ACCBAL , g . MKTSEGMENT , g . COMMENT , g . EFFECTIVE_FROM , g . LOAD_DATETIME , g . RECORD_SOURCE
FROM ghost AS g
UNION
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
latest_records AS (
SELECT current_records . CUSTOMER_HK , current_records . HASHDIFF , current_records . CUSTOMER_NAME , current_records . CUSTOMER_ADDRESS , current_records . CUSTOMER_PHONE , current_records . ACCBAL , current_records . MKTSEGMENT , current_records . COMMENT , current_records . EFFECTIVE_FROM , current_records . LOAD_DATETIME , current_records . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY current_records . CUSTOMER_HK
ORDER BY current_records . LOAD_DATETIME DESC
) AS rank_num
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . satellite_ghost_incremental AS current_records
JOIN (
SELECT DISTINCT source_data . CUSTOMER_HK
FROM source_data
) AS source_records
ON source_records . CUSTOMER_HK = current_records . CUSTOMER_HK
QUALIFY rank_num = 1
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
ghost AS (
SELECT
NULL AS CUSTOMER_NAME ,
NULL AS CUSTOMER_PHONE ,
NULL AS CUSTOMER_ADDRESS ,
NULL AS ACCBAL ,
NULL AS MKTSEGMENT ,
NULL AS COMMENT ,
TO_TIMESTAMP_NTZ ( '1900-01-01 00:00:00' ) AS LOAD_DATETIME ,
TO_TIMESTAMP_NTZ ( '1900-01-01 00:00:00' ) AS EFFECTIVE_FROM ,
CAST ( 'AUTOMATE_DV_SYSTEM' AS VARCHAR ) AS RECORD_SOURCE ,
CAST ( '00000000000000000000000000000000' AS BINARY ( 16 )) AS CUSTOMER_HK ,
CAST ( '00000000000000000000000000000000' AS BINARY ( 16 )) AS HASHDIFF
),
records_to_insert AS (
SELECT
g . CUSTOMER_HK , g . HASHDIFF , g . CUSTOMER_NAME , g . CUSTOMER_ADDRESS , g . CUSTOMER_PHONE , g . ACCBAL , g . MKTSEGMENT , g . COMMENT , g . EFFECTIVE_FROM , g . LOAD_DATETIME , g . RECORD_SOURCE
FROM ghost AS g
WHERE NOT EXISTS ( SELECT 1 FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . satellite_ghost_incremental AS h WHERE h . HASHDIFF = g . HASHDIFF )
UNION
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
LEFT JOIN LATEST_RECORDS lr
ON lr . CUSTOMER_HK = frin . CUSTOMER_HK
AND lr . HASHDIFF = frin . HASHDIFF
WHERE lr . HASHDIFF IS NULL
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
Base Load Incremental Loads Base Load with Ghost Record Incremental Loads with Ghost Record
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
records_to_insert AS (
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
UNION DISTINCT
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
latest_records AS (
SELECT current_records . CUSTOMER_HK , current_records . HASHDIFF , current_records . CUSTOMER_NAME , current_records . CUSTOMER_ADDRESS , current_records . CUSTOMER_PHONE , current_records . ACCBAL , current_records . MKTSEGMENT , current_records . COMMENT , current_records . EFFECTIVE_FROM , current_records . LOAD_DATETIME , current_records . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY current_records . CUSTOMER_HK
ORDER BY current_records . LOAD_DATETIME DESC
) AS rank_num
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` satellite_incremental ` AS current_records
JOIN (
SELECT DISTINCT source_data . CUSTOMER_HK
FROM source_data
) AS source_records
ON source_records . CUSTOMER_HK = current_records . CUSTOMER_HK
QUALIFY rank_num = 1
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
records_to_insert AS (
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
LEFT JOIN LATEST_RECORDS lr
ON lr . CUSTOMER_HK = frin . CUSTOMER_HK
AND lr . HASHDIFF = frin . HASHDIFF
WHERE lr . HASHDIFF IS NULL
UNION DISTINCT
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
ghost AS (
SELECT
CAST ( NULL AS STRING ) AS CUSTOMER_NAME ,
CAST ( NULL AS STRING ) AS CUSTOMER_PHONE ,
CAST ( NULL AS STRING ) AS CUSTOMER_ADDRESS ,
CAST ( NULL AS FLOAT64 ) AS ACCBAL ,
CAST ( NULL AS STRING ) AS MKTSEGMENT ,
CAST ( NULL AS STRING ) AS COMMENT ,
PARSE_DATETIME ( '%F %H:%M:%E6S' , '1900-01-01 00:00:00' ) AS LOAD_DATETIME ,
PARSE_DATETIME ( '%F %H:%M:%E6S' , '1900-01-01 00:00:00' ) AS EFFECTIVE_FROM ,
CAST ( 'AUTOMATE_DV_SYSTEM' AS STRING ) AS RECORD_SOURCE ,
CAST ( '00000000000000000000000000000000' AS STRING ) AS CUSTOMER_HK ,
CAST ( '00000000000000000000000000000000' AS STRING ) AS HASHDIFF
),
records_to_insert AS (
SELECT
g . CUSTOMER_HK , g . HASHDIFF , g . CUSTOMER_NAME , g . CUSTOMER_ADDRESS , g . CUSTOMER_PHONE , g . ACCBAL , g . MKTSEGMENT , g . COMMENT , g . EFFECTIVE_FROM , g . LOAD_DATETIME , g . RECORD_SOURCE
FROM ghost AS g
UNION DISTINCT
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
UNION DISTINCT
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
latest_records AS (
SELECT current_records . CUSTOMER_HK , current_records . HASHDIFF , current_records . CUSTOMER_NAME , current_records . CUSTOMER_ADDRESS , current_records . CUSTOMER_PHONE , current_records . ACCBAL , current_records . MKTSEGMENT , current_records . COMMENT , current_records . EFFECTIVE_FROM , current_records . LOAD_DATETIME , current_records . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY current_records . CUSTOMER_HK
ORDER BY current_records . LOAD_DATETIME DESC
) AS rank_num
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` satellite_ghost_incremental ` AS current_records
JOIN (
SELECT DISTINCT source_data . CUSTOMER_HK
FROM source_data
) AS source_records
ON source_records . CUSTOMER_HK = current_records . CUSTOMER_HK
QUALIFY rank_num = 1
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
ghost AS (
SELECT
CAST ( NULL AS STRING ) AS CUSTOMER_NAME ,
CAST ( NULL AS STRING ) AS CUSTOMER_PHONE ,
CAST ( NULL AS STRING ) AS CUSTOMER_ADDRESS ,
CAST ( NULL AS FLOAT64 ) AS ACCBAL ,
CAST ( NULL AS STRING ) AS MKTSEGMENT ,
CAST ( NULL AS STRING ) AS COMMENT ,
PARSE_DATETIME ( '%F %H:%M:%E6S' , '1900-01-01 00:00:00' ) AS LOAD_DATETIME ,
PARSE_DATETIME ( '%F %H:%M:%E6S' , '1900-01-01 00:00:00' ) AS EFFECTIVE_FROM ,
CAST ( 'AUTOMATE_DV_SYSTEM' AS STRING ) AS RECORD_SOURCE ,
CAST ( '00000000000000000000000000000000' AS STRING ) AS CUSTOMER_HK ,
CAST ( '00000000000000000000000000000000' AS STRING ) AS HASHDIFF
),
records_to_insert AS (
SELECT
g . CUSTOMER_HK , g . HASHDIFF , g . CUSTOMER_NAME , g . CUSTOMER_ADDRESS , g . CUSTOMER_PHONE , g . ACCBAL , g . MKTSEGMENT , g . COMMENT , g . EFFECTIVE_FROM , g . LOAD_DATETIME , g . RECORD_SOURCE
FROM ghost AS g
WHERE NOT EXISTS ( SELECT 1 FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` satellite_ghost_incremental ` AS h WHERE h . HASHDIFF = g . HASHDIFF )
UNION DISTINCT
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
LEFT JOIN LATEST_RECORDS lr
ON lr . CUSTOMER_HK = frin . CUSTOMER_HK
AND lr . HASHDIFF = frin . HASHDIFF
WHERE lr . HASHDIFF IS NULL
UNION DISTINCT
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
Base Load Incremental Loads Base Load with Ghost Record Incremental Loads with Ghost Record
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
SELECT * FROM (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd ) rin
WHERE rin . asc_rank = 1
),
unique_source_records AS (
SELECT
b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC ) as prev_hashdiff
FROM source_data as sd
) b
WHERE b . HASHDIFF != b . prev_hashdiff
),
records_to_insert AS (
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
latest_records AS (
SELECT b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT current_records . CUSTOMER_HK , current_records . HASHDIFF , current_records . CUSTOMER_NAME , current_records . CUSTOMER_ADDRESS , current_records . CUSTOMER_PHONE , current_records . ACCBAL , current_records . MKTSEGMENT , current_records . COMMENT , current_records . EFFECTIVE_FROM , current_records . LOAD_DATETIME , current_records . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY current_records . CUSTOMER_HK
ORDER BY current_records . LOAD_DATETIME DESC
) AS rank
FROM "AUTOMATE_DV_TEST" . "TEST" . "satellite_incremental" AS current_records
JOIN (
SELECT DISTINCT source_data . CUSTOMER_HK
FROM source_data
) AS source_records
ON source_records . CUSTOMER_HK = current_records . CUSTOMER_HK
) AS b
WHERE b . rank = 1
),
first_record_in_set AS (
SELECT * FROM (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd ) rin
WHERE rin . asc_rank = 1
),
unique_source_records AS (
SELECT
b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC ) as prev_hashdiff
FROM source_data as sd
) b
WHERE b . HASHDIFF != b . prev_hashdiff
),
records_to_insert AS (
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
LEFT JOIN LATEST_RECORDS lr
ON lr . CUSTOMER_HK = frin . CUSTOMER_HK
AND lr . HASHDIFF = frin . HASHDIFF
WHERE lr . HASHDIFF IS NULL
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
SELECT * FROM (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd ) rin
WHERE rin . asc_rank = 1
),
unique_source_records AS (
SELECT
b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC ) as prev_hashdiff
FROM source_data as sd
) b
WHERE b . HASHDIFF != b . prev_hashdiff
),
ghost AS (
SELECT
CAST ( NULL AS varchar ) AS CUSTOMER_NAME ,
CAST ( NULL AS varchar ) AS CUSTOMER_PHONE ,
CAST ( NULL AS varchar ) AS CUSTOMER_ADDRESS ,
CAST ( NULL AS float ) AS ACCBAL ,
CAST ( NULL AS varchar ) AS MKTSEGMENT ,
CAST ( NULL AS varchar ) AS COMMENT ,
CONVERT ( DATE , '1900-01-01' ) AS LOAD_DATETIME ,
CONVERT ( DATE , '1900-01-01' ) AS EFFECTIVE_FROM ,
CAST ( 'AUTOMATE_DV_SYSTEM' AS varchar ) AS RECORD_SOURCE ,
CAST ( REPLICATE ( CAST ( CAST ( '0' AS tinyint ) AS BINARY ( 16 )), 16 ) AS BINARY ( 16 )) AS CUSTOMER_HK ,
CAST ( REPLICATE ( CAST ( CAST ( '0' AS tinyint ) AS BINARY ( 16 )), 16 ) AS BINARY ( 16 )) AS HASHDIFF
),
records_to_insert AS (
SELECT
g . CUSTOMER_HK , g . HASHDIFF , g . CUSTOMER_NAME , g . CUSTOMER_ADDRESS , g . CUSTOMER_PHONE , g . ACCBAL , g . MKTSEGMENT , g . COMMENT , g . EFFECTIVE_FROM , g . LOAD_DATETIME , g . RECORD_SOURCE
FROM ghost AS g
UNION
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM "AUTOMATE_DV_TEST" . "TEST" . "stg_customer" AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
latest_records AS (
SELECT b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT current_records . CUSTOMER_HK , current_records . HASHDIFF , current_records . CUSTOMER_NAME , current_records . CUSTOMER_ADDRESS , current_records . CUSTOMER_PHONE , current_records . ACCBAL , current_records . MKTSEGMENT , current_records . COMMENT , current_records . EFFECTIVE_FROM , current_records . LOAD_DATETIME , current_records . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY current_records . CUSTOMER_HK
ORDER BY current_records . LOAD_DATETIME DESC
) AS rank
FROM "AUTOMATE_DV_TEST" . "TEST" . "satellite_ghost_incremental" AS current_records
JOIN (
SELECT DISTINCT source_data . CUSTOMER_HK
FROM source_data
) AS source_records
ON source_records . CUSTOMER_HK = current_records . CUSTOMER_HK
) AS b
WHERE b . rank = 1
),
first_record_in_set AS (
SELECT * FROM (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd ) rin
WHERE rin . asc_rank = 1
),
unique_source_records AS (
SELECT
b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC ) as prev_hashdiff
FROM source_data as sd
) b
WHERE b . HASHDIFF != b . prev_hashdiff
),
ghost AS (
SELECT
CAST ( NULL AS varchar ) AS CUSTOMER_NAME ,
CAST ( NULL AS varchar ) AS CUSTOMER_PHONE ,
CAST ( NULL AS varchar ) AS CUSTOMER_ADDRESS ,
CAST ( NULL AS float ) AS ACCBAL ,
CAST ( NULL AS varchar ) AS MKTSEGMENT ,
CAST ( NULL AS varchar ) AS COMMENT ,
CONVERT ( DATE , '1900-01-01' ) AS LOAD_DATETIME ,
CONVERT ( DATE , '1900-01-01' ) AS EFFECTIVE_FROM ,
CAST ( 'AUTOMATE_DV_SYSTEM' AS varchar ) AS RECORD_SOURCE ,
CAST ( REPLICATE ( CAST ( CAST ( '0' AS tinyint ) AS BINARY ( 16 )), 16 ) AS BINARY ( 16 )) AS CUSTOMER_HK ,
CAST ( REPLICATE ( CAST ( CAST ( '0' AS tinyint ) AS BINARY ( 16 )), 16 ) AS BINARY ( 16 )) AS HASHDIFF
),
records_to_insert AS (
SELECT
g . CUSTOMER_HK , g . HASHDIFF , g . CUSTOMER_NAME , g . CUSTOMER_ADDRESS , g . CUSTOMER_PHONE , g . ACCBAL , g . MKTSEGMENT , g . COMMENT , g . EFFECTIVE_FROM , g . LOAD_DATETIME , g . RECORD_SOURCE
FROM ghost AS g
WHERE NOT EXISTS ( SELECT 1 FROM "AUTOMATE_DV_TEST" . "TEST" . "satellite_ghost_incremental" AS h WHERE h . HASHDIFF = g . HASHDIFF )
UNION
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
LEFT JOIN LATEST_RECORDS lr
ON lr . CUSTOMER_HK = frin . CUSTOMER_HK
AND lr . HASHDIFF = frin . HASHDIFF
WHERE lr . HASHDIFF IS NULL
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
Base Load Incremental Loads Base Load with Ghost Record Incremental Loads with Ghost Record
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_customer" AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
SELECT * FROM (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd ) rin
WHERE rin . asc_rank = 1
),
unique_source_records AS (
SELECT
b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC ) as prev_hashdiff
FROM source_data as sd
) b
WHERE b . HASHDIFF != b . prev_hashdiff
),
records_to_insert AS (
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_customer" AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
latest_records AS (
SELECT b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT current_records . CUSTOMER_HK , current_records . HASHDIFF , current_records . CUSTOMER_NAME , current_records . CUSTOMER_ADDRESS , current_records . CUSTOMER_PHONE , current_records . ACCBAL , current_records . MKTSEGMENT , current_records . COMMENT , current_records . EFFECTIVE_FROM , current_records . LOAD_DATETIME , current_records . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY current_records . CUSTOMER_HK
ORDER BY current_records . LOAD_DATETIME DESC
) AS rank
FROM "dbtvault_db" . "development" . "satellite_incremental" AS current_records
JOIN (
SELECT DISTINCT source_data . CUSTOMER_HK
FROM source_data
) AS source_records
ON source_records . CUSTOMER_HK = current_records . CUSTOMER_HK
) AS b
WHERE b . rank = 1
),
first_record_in_set AS (
SELECT * FROM (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd ) rin
WHERE rin . asc_rank = 1
),
unique_source_records AS (
SELECT
b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC ) as prev_hashdiff
FROM source_data as sd
) b
WHERE b . HASHDIFF != b . prev_hashdiff
),
records_to_insert AS (
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
LEFT JOIN LATEST_RECORDS lr
ON lr . CUSTOMER_HK = frin . CUSTOMER_HK
AND lr . HASHDIFF = frin . HASHDIFF
WHERE lr . HASHDIFF IS NULL
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_customer" AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
SELECT * FROM (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd ) rin
WHERE rin . asc_rank = 1
),
unique_source_records AS (
SELECT
b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC ) as prev_hashdiff
FROM source_data as sd
) b
WHERE b . HASHDIFF != b . prev_hashdiff
),
ghost AS (
SELECT
CAST ( NULL AS text ) AS customer_name ,
CAST ( NULL AS text ) AS customer_phone ,
CAST ( NULL AS text ) AS customer_address ,
CAST ( NULL AS double precision ) AS accbal ,
CAST ( NULL AS text ) AS mktsegment ,
CAST ( NULL AS text ) AS comment ,
TO_DATE ( '1900-01-01' , 'YYY-MM-DD' ) AS load_datetime ,
TO_DATE ( '1900-01-01' , 'YYY-MM-DD' ) AS effective_from ,
CAST ( 'AUTOMATE_DV_SYSTEM' AS text ) AS RECORD_SOURCE ,
CAST ( '00000000000000000000000000000000' AS BYTEA ) AS customer_hk ,
CAST ( '00000000000000000000000000000000' AS BYTEA ) AS hashdiff
),
records_to_insert AS (
SELECT
g . CUSTOMER_HK , g . HASHDIFF , g . CUSTOMER_NAME , g . CUSTOMER_ADDRESS , g . CUSTOMER_PHONE , g . ACCBAL , g . MKTSEGMENT , g . COMMENT , g . EFFECTIVE_FROM , g . LOAD_DATETIME , g . RECORD_SOURCE
FROM ghost AS g
UNION
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM "dbtvault_db" . "development" . "stg_customer" AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
latest_records AS (
SELECT b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT current_records . CUSTOMER_HK , current_records . HASHDIFF , current_records . CUSTOMER_NAME , current_records . CUSTOMER_ADDRESS , current_records . CUSTOMER_PHONE , current_records . ACCBAL , current_records . MKTSEGMENT , current_records . COMMENT , current_records . EFFECTIVE_FROM , current_records . LOAD_DATETIME , current_records . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY current_records . CUSTOMER_HK
ORDER BY current_records . LOAD_DATETIME DESC
) AS rank
FROM "dbtvault_db" . "development" . "satellite_ghost_incremental" AS current_records
JOIN (
SELECT DISTINCT source_data . CUSTOMER_HK
FROM source_data
) AS source_records
ON source_records . CUSTOMER_HK = current_records . CUSTOMER_HK
) AS b
WHERE b . rank = 1
),
first_record_in_set AS (
SELECT * FROM (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd ) rin
WHERE rin . asc_rank = 1
),
unique_source_records AS (
SELECT
b . CUSTOMER_HK , b . HASHDIFF , b . CUSTOMER_NAME , b . CUSTOMER_ADDRESS , b . CUSTOMER_PHONE , b . ACCBAL , b . MKTSEGMENT , b . COMMENT , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE
FROM (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC ) as prev_hashdiff
FROM source_data as sd
) b
WHERE b . HASHDIFF != b . prev_hashdiff
),
ghost AS (
SELECT
CAST ( NULL AS text ) AS customer_name ,
CAST ( NULL AS text ) AS customer_phone ,
CAST ( NULL AS text ) AS customer_address ,
CAST ( NULL AS double precision ) AS accbal ,
CAST ( NULL AS text ) AS mktsegment ,
CAST ( NULL AS text ) AS comment ,
TO_DATE ( '1900-01-01' , 'YYY-MM-DD' ) AS load_datetime ,
TO_DATE ( '1900-01-01' , 'YYY-MM-DD' ) AS effective_from ,
CAST ( 'AUTOMATE_DV_SYSTEM' AS text ) AS RECORD_SOURCE ,
CAST ( '00000000000000000000000000000000' AS BYTEA ) AS customer_hk ,
CAST ( '00000000000000000000000000000000' AS BYTEA ) AS hashdiff
),
records_to_insert AS (
SELECT
g . CUSTOMER_HK , g . HASHDIFF , g . CUSTOMER_NAME , g . CUSTOMER_ADDRESS , g . CUSTOMER_PHONE , g . ACCBAL , g . MKTSEGMENT , g . COMMENT , g . EFFECTIVE_FROM , g . LOAD_DATETIME , g . RECORD_SOURCE
FROM ghost AS g
WHERE NOT EXISTS ( SELECT 1 FROM "dbtvault_db" . "development" . "satellite_ghost_incremental" AS h WHERE h . HASHDIFF = g . HASHDIFF )
UNION
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
LEFT JOIN LATEST_RECORDS lr
ON lr . CUSTOMER_HK = frin . CUSTOMER_HK
AND lr . HASHDIFF = frin . HASHDIFF
WHERE lr . HASHDIFF IS NULL
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
Base Load Incremental Loads Base Load with Ghost Record Incremental Loads with Ghost Record
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
records_to_insert AS (
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
latest_records AS (
SELECT current_records . CUSTOMER_HK , current_records . HASHDIFF , current_records . CUSTOMER_NAME , current_records . CUSTOMER_ADDRESS , current_records . CUSTOMER_PHONE , current_records . ACCBAL , current_records . MKTSEGMENT , current_records . COMMENT , current_records . EFFECTIVE_FROM , current_records . LOAD_DATETIME , current_records . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY current_records . CUSTOMER_HK
ORDER BY current_records . LOAD_DATETIME DESC
) AS rank_num
FROM ` hive_metastore ` . ` dbtvault ` . ` satellite_incremental ` AS current_records
JOIN (
SELECT DISTINCT source_data . CUSTOMER_HK
FROM source_data
) AS source_records
ON source_records . CUSTOMER_HK = current_records . CUSTOMER_HK
QUALIFY rank_num = 1
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
records_to_insert AS (
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
LEFT JOIN LATEST_RECORDS lr
ON lr . CUSTOMER_HK = frin . CUSTOMER_HK
AND lr . HASHDIFF = frin . HASHDIFF
WHERE lr . HASHDIFF IS NULL
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
ghost AS (
SELECT
NULL AS CUSTOMER_NAME ,
NULL AS CUSTOMER_PHONE ,
NULL AS CUSTOMER_ADDRESS ,
NULL AS ACCBAL ,
NULL AS MKTSEGMENT ,
NULL AS COMMENT ,
TO_DATE ( '1900-01-01' ) AS LOAD_DATETIME ,
TO_DATE ( '1900-01-01' ) AS EFFECTIVE_FROM ,
CAST ( 'AUTOMATE_DV_SYSTEM' AS string ) AS RECORD_SOURCE ,
CAST ( '00000000000000000000000000000000' AS STRING ) AS CUSTOMER_HK ,
CAST ( '00000000000000000000000000000000' AS STRING ) AS HASHDIFF
),
records_to_insert AS (
SELECT
g . CUSTOMER_HK , g . HASHDIFF , g . CUSTOMER_NAME , g . CUSTOMER_ADDRESS , g . CUSTOMER_PHONE , g . ACCBAL , g . MKTSEGMENT , g . COMMENT , g . EFFECTIVE_FROM , g . LOAD_DATETIME , g . RECORD_SOURCE
FROM ghost AS g
UNION
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 WITH source_data AS (
SELECT a . CUSTOMER_HK , a . HASHDIFF , a . CUSTOMER_NAME , a . CUSTOMER_ADDRESS , a . CUSTOMER_PHONE , a . ACCBAL , a . MKTSEGMENT , a . COMMENT , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ` hive_metastore ` . ` dbtvault ` . ` stg_customer ` AS a
WHERE a . CUSTOMER_HK IS NOT NULL
),
latest_records AS (
SELECT current_records . CUSTOMER_HK , current_records . HASHDIFF , current_records . CUSTOMER_NAME , current_records . CUSTOMER_ADDRESS , current_records . CUSTOMER_PHONE , current_records . ACCBAL , current_records . MKTSEGMENT , current_records . COMMENT , current_records . EFFECTIVE_FROM , current_records . LOAD_DATETIME , current_records . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY current_records . CUSTOMER_HK
ORDER BY current_records . LOAD_DATETIME DESC
) AS rank_num
FROM ` hive_metastore ` . ` dbtvault ` . ` satellite_ghost_incremental ` AS current_records
JOIN (
SELECT DISTINCT source_data . CUSTOMER_HK
FROM source_data
) AS source_records
ON source_records . CUSTOMER_HK = current_records . CUSTOMER_HK
QUALIFY rank_num = 1
),
first_record_in_set AS (
SELECT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE ,
RANK () OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC
) as asc_rank
FROM source_data as sd
QUALIFY asc_rank = 1
),
unique_source_records AS (
SELECT DISTINCT
sd . CUSTOMER_HK , sd . HASHDIFF , sd . CUSTOMER_NAME , sd . CUSTOMER_ADDRESS , sd . CUSTOMER_PHONE , sd . ACCBAL , sd . MKTSEGMENT , sd . COMMENT , sd . EFFECTIVE_FROM , sd . LOAD_DATETIME , sd . RECORD_SOURCE
FROM source_data as sd
QUALIFY sd . HASHDIFF != LAG ( sd . HASHDIFF ) OVER (
PARTITION BY sd . CUSTOMER_HK
ORDER BY sd . LOAD_DATETIME ASC )
),
ghost AS (
SELECT
NULL AS CUSTOMER_NAME ,
NULL AS CUSTOMER_PHONE ,
NULL AS CUSTOMER_ADDRESS ,
NULL AS ACCBAL ,
NULL AS MKTSEGMENT ,
NULL AS COMMENT ,
TO_DATE ( '1900-01-01' ) AS LOAD_DATETIME ,
TO_DATE ( '1900-01-01' ) AS EFFECTIVE_FROM ,
CAST ( 'AUTOMATE_DV_SYSTEM' AS string ) AS RECORD_SOURCE ,
CAST ( '00000000000000000000000000000000' AS STRING ) AS CUSTOMER_HK ,
CAST ( '00000000000000000000000000000000' AS STRING ) AS HASHDIFF
),
records_to_insert AS (
SELECT
g . CUSTOMER_HK , g . HASHDIFF , g . CUSTOMER_NAME , g . CUSTOMER_ADDRESS , g . CUSTOMER_PHONE , g . ACCBAL , g . MKTSEGMENT , g . COMMENT , g . EFFECTIVE_FROM , g . LOAD_DATETIME , g . RECORD_SOURCE
FROM ghost AS g
WHERE NOT EXISTS ( SELECT 1 FROM ` hive_metastore ` . ` dbtvault ` . ` satellite_ghost_incremental ` AS h WHERE h . HASHDIFF = g . HASHDIFF )
UNION
SELECT frin . CUSTOMER_HK , frin . HASHDIFF , frin . CUSTOMER_NAME , frin . CUSTOMER_ADDRESS , frin . CUSTOMER_PHONE , frin . ACCBAL , frin . MKTSEGMENT , frin . COMMENT , frin . EFFECTIVE_FROM , frin . LOAD_DATETIME , frin . RECORD_SOURCE
FROM first_record_in_set AS frin
LEFT JOIN LATEST_RECORDS lr
ON lr . CUSTOMER_HK = frin . CUSTOMER_HK
AND lr . HASHDIFF = frin . HASHDIFF
WHERE lr . HASHDIFF IS NULL
UNION
SELECT usr . CUSTOMER_HK , usr . HASHDIFF , usr . CUSTOMER_NAME , usr . CUSTOMER_ADDRESS , usr . CUSTOMER_PHONE , usr . ACCBAL , usr . MKTSEGMENT , usr . COMMENT , usr . EFFECTIVE_FROM , usr . LOAD_DATETIME , usr . RECORD_SOURCE
FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
Hashdiff Aliasing
If you have multiple Satellites using a single stage as its data source, then you will need to
use hashdiff aliasing
Excluding columns from the payload
An exclude_columns
flag can be provided for payload columns which will invert the selection of columns provided in the list of columns.
This is extremely useful when a payload is composed of many columns, and you do not wish to individually provide all the columns.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 { % - set yaml_metadata -% }
source_model : v_stg_orders
src_pk : CUSTOMER_HK
src_hashdiff : CUSTOMER_HASHDIFF
src_payload :
exclude_columns : true
columns :
- NAME
- PHONE
src_eff : EFFECTIVE_FROM
src_ldts : LOAD_DATETIME
src_source : RECORD_SOURCE
{ % - endset -% }
{ % set metadata_dict = fromyaml(yaml_metadata) % }
{{ automate_dv.sat(src_pk=metadata_dict [ "src_pk" ],
src_hashdiff=metadata_dict [ "src_hashdiff" ],
src_payload=metadata_dict [ "src_payload" ],
src_eff=metadata_dict [ "src_eff" ],
src_ldts=metadata_dict [ "src_ldts" ],
src_source=metadata_dict [ "src_source" ],
source_model=metadata_dict [ "source_model" ] ) }}
Using the configuration in the above snippet, if we had the following columns: NAME, PHONE, ADDRESS_LINE_1, EMAIL_ADDRESS, DOB,...
The satellite payload would be created with the following columns: ADDRESS_LINE_1, EMAIL_ADDRESS, DOB,...
eff_sat
view source:
Generates SQL to build an Effectivity Satellite table using the provided parameters.
Usage
{{ automate_dv.eff_sat ( src_pk = src_pk , src_dfk = src_dfk , src_sfk = src_sfk ,
src_start_date = src_start_date , src_end_date = src_end_date ,
src_extra_columns = src_extra_columns ,
src_eff = src_eff , src_ldts = src_ldts , src_source = src_source ,
source_model = source_model ) }}
Parameters
Parameter
Description
Type
Required?
src_pk
Source primary key column
String
src_dfk
Source driving foreign key column
List[String]/String
src_sfk
Source secondary foreign key column
List[String]/String
src_start_date
Source start date column
String
src_end_date
Source end date column
String
src_extra_columns
Select arbitrary columns from the source
List[String]/String
src_eff
Source effective from column
String
src_ldts
Source load date timestamp column
String
src_source
Name of the column containing the source ID
String
source_model
Staging model name
String
See examples
Example Output
Snowflake Google BigQuery MS SQL Server Postgres Databricks
Base Load With auto end-dating (Incremental) Without auto end-dating (Incremental)
WITH source_data AS (
SELECT a . CUSTOMER_ORDER_HK , a . CUSTOMER_HK , a . ORDER_HK , a . START_DATE , a . END_DATE , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS a
WHERE a . CUSTOMER_HK IS NOT NULL
AND a . ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT i . CUSTOMER_ORDER_HK , i . CUSTOMER_HK , i . ORDER_HK , i . START_DATE , i . END_DATE , i . EFFECTIVE_FROM , i . LOAD_DATETIME , i . RECORD_SOURCE
FROM source_data AS i
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 WITH source_data AS (
SELECT a . CUSTOMER_ORDER_HK , a . CUSTOMER_HK , a . ORDER_HK , a . START_DATE , a . END_DATE , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS a
WHERE a . CUSTOMER_HK IS NOT NULL
AND a . ORDER_HK IS NOT NULL
),
latest_records AS (
SELECT * FROM (
SELECT b . CUSTOMER_ORDER_HK , b . CUSTOMER_HK , b . ORDER_HK , b . START_DATE , b . END_DATE , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY b . CUSTOMER_ORDER_HK
ORDER BY b . LOAD_DATETIME DESC
) AS row_num
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . eff_sat_customer_order_incremental AS b
) AS inner_rank
WHERE row_num = 1 ),
latest_open AS (
SELECT c . CUSTOMER_ORDER_HK , c . CUSTOMER_HK , c . ORDER_HK , c . START_DATE , c . END_DATE , c . EFFECTIVE_FROM , c . LOAD_DATETIME , c . RECORD_SOURCE
FROM latest_records AS c
WHERE TO_DATE ( c . END_DATE ) = TO_DATE ( TO_TIMESTAMP ( '9999-12-31 23:59:59.999999' ))
),
latest_closed AS (
SELECT d . CUSTOMER_ORDER_HK , d . CUSTOMER_HK , d . ORDER_HK , d . START_DATE , d . END_DATE , d . EFFECTIVE_FROM , d . LOAD_DATETIME , d . RECORD_SOURCE
FROM latest_records AS d
WHERE TO_DATE ( d . END_DATE ) != TO_DATE ( TO_TIMESTAMP ( '9999-12-31 23:59:59.999999' ))
),
new_open_records AS (
SELECT DISTINCT
f . CUSTOMER_ORDER_HK ,
f . CUSTOMER_HK , f . ORDER_HK ,
f . START_DATE AS START_DATE ,
f . END_DATE AS END_DATE ,
f . EFFECTIVE_FROM AS EFFECTIVE_FROM ,
f . LOAD_DATETIME ,
f . RECORD_SOURCE
FROM source_data AS f
LEFT JOIN latest_records AS lr
ON f . CUSTOMER_ORDER_HK = lr . CUSTOMER_ORDER_HK
WHERE lr . CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
SELECT DISTINCT
lc . CUSTOMER_ORDER_HK ,
lc . CUSTOMER_HK , lc . ORDER_HK ,
g . START_DATE AS START_DATE ,
g . END_DATE AS END_DATE ,
g . EFFECTIVE_FROM AS EFFECTIVE_FROM ,
g . LOAD_DATETIME ,
g . RECORD_SOURCE
FROM source_data AS g
INNER JOIN latest_closed AS lc
ON g . CUSTOMER_ORDER_HK = lc . CUSTOMER_ORDER_HK
WHERE TO_DATE ( g . END_DATE ) = TO_DATE ( TO_TIMESTAMP ( '9999-12-31 23:59:59.999999' ))
),
new_closed_records AS (
SELECT DISTINCT
lo . CUSTOMER_ORDER_HK ,
lo . CUSTOMER_HK , lo . ORDER_HK ,
h . START_DATE AS START_DATE ,
h . END_DATE AS END_DATE ,
h . EFFECTIVE_FROM AS EFFECTIVE_FROM ,
h . LOAD_DATETIME ,
lo . RECORD_SOURCE
FROM source_data AS h
LEFT JOIN latest_open AS lo
ON lo . CUSTOMER_ORDER_HK = h . CUSTOMER_ORDER_HK
LEFT JOIN latest_closed AS lc
ON lc . CUSTOMER_ORDER_HK = h . CUSTOMER_ORDER_HK
WHERE TO_DATE ( h . END_DATE ) != TO_DATE ( TO_TIMESTAMP ( '9999-12-31 23:59:59.999999' ))
AND lo . CUSTOMER_ORDER_HK IS NOT NULL
AND lc . CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
SELECT * FROM new_open_records
UNION
SELECT * FROM new_reopened_records
UNION
SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 WITH source_data AS (
SELECT a . CUSTOMER_ORDER_HK , a . CUSTOMER_HK , a . ORDER_HK , a . START_DATE , a . END_DATE , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . stg_customer AS a
WHERE a . CUSTOMER_HK IS NOT NULL
AND a . ORDER_HK IS NOT NULL
),
latest_records AS (
SELECT * FROM (
SELECT b . CUSTOMER_ORDER_HK , b . CUSTOMER_HK , b . ORDER_HK , b . START_DATE , b . END_DATE , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY b . CUSTOMER_ORDER_HK
ORDER BY b . LOAD_DATETIME DESC
) AS row_num
FROM ALEX_HIGGS . AUTOMATE_DV_DOCS_SAMPLES . eff_sat_customer_order_incremental_nae AS b
) AS inner_rank
WHERE row_num = 1 ),
latest_open AS (
SELECT c . CUSTOMER_ORDER_HK , c . CUSTOMER_HK , c . ORDER_HK , c . START_DATE , c . END_DATE , c . EFFECTIVE_FROM , c . LOAD_DATETIME , c . RECORD_SOURCE
FROM latest_records AS c
WHERE TO_DATE ( c . END_DATE ) = TO_DATE ( TO_TIMESTAMP ( '9999-12-31 23:59:59.999999' ))
),
latest_closed AS (
SELECT d . CUSTOMER_ORDER_HK , d . CUSTOMER_HK , d . ORDER_HK , d . START_DATE , d . END_DATE , d . EFFECTIVE_FROM , d . LOAD_DATETIME , d . RECORD_SOURCE
FROM latest_records AS d
WHERE TO_DATE ( d . END_DATE ) != TO_DATE ( TO_TIMESTAMP ( '9999-12-31 23:59:59.999999' ))
),
new_open_records AS (
SELECT DISTINCT
f . CUSTOMER_ORDER_HK ,
f . CUSTOMER_HK , f . ORDER_HK ,
f . START_DATE AS START_DATE ,
f . END_DATE AS END_DATE ,
f . EFFECTIVE_FROM AS EFFECTIVE_FROM ,
f . LOAD_DATETIME ,
f . RECORD_SOURCE
FROM source_data AS f
LEFT JOIN latest_records AS lr
ON f . CUSTOMER_ORDER_HK = lr . CUSTOMER_ORDER_HK
WHERE lr . CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
SELECT DISTINCT
lc . CUSTOMER_ORDER_HK ,
lc . CUSTOMER_HK , lc . ORDER_HK ,
g . START_DATE AS START_DATE ,
g . END_DATE AS END_DATE ,
g . EFFECTIVE_FROM AS EFFECTIVE_FROM ,
g . LOAD_DATETIME ,
g . RECORD_SOURCE
FROM source_data AS g
INNER JOIN latest_closed AS lc
ON g . CUSTOMER_ORDER_HK = lc . CUSTOMER_ORDER_HK
WHERE TO_DATE ( g . END_DATE ) = TO_DATE ( TO_TIMESTAMP ( '9999-12-31 23:59:59.999999' ))
),
new_closed_records AS (
SELECT DISTINCT
lo . CUSTOMER_ORDER_HK ,
lo . CUSTOMER_HK , lo . ORDER_HK ,
h . START_DATE AS START_DATE ,
h . END_DATE AS END_DATE ,
h . EFFECTIVE_FROM AS EFFECTIVE_FROM ,
h . LOAD_DATETIME ,
lo . RECORD_SOURCE
FROM source_data AS h
LEFT JOIN latest_open AS lo
ON lo . CUSTOMER_ORDER_HK = h . CUSTOMER_ORDER_HK
LEFT JOIN latest_closed AS lc
ON lc . CUSTOMER_ORDER_HK = h . CUSTOMER_ORDER_HK
WHERE TO_DATE ( h . END_DATE ) != TO_DATE ( TO_TIMESTAMP ( '9999-12-31 23:59:59.999999' ))
AND lo . CUSTOMER_ORDER_HK IS NOT NULL
AND lc . CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
SELECT * FROM new_open_records
UNION
SELECT * FROM new_reopened_records
UNION
SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
Base Load With auto end-dating (Incremental) Without auto end-dating (Incremental)
WITH source_data AS (
SELECT a . CUSTOMER_ORDER_HK , a . CUSTOMER_HK , a . ORDER_HK , a . START_DATE , a . END_DATE , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS a
WHERE a . CUSTOMER_HK IS NOT NULL
AND a . ORDER_HK IS NOT NULL
),
records_to_insert AS (
SELECT i . CUSTOMER_ORDER_HK , i . CUSTOMER_HK , i . ORDER_HK , i . START_DATE , i . END_DATE , i . EFFECTIVE_FROM , i . LOAD_DATETIME , i . RECORD_SOURCE
FROM source_data AS i
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 WITH source_data AS (
SELECT a . CUSTOMER_ORDER_HK , a . CUSTOMER_HK , a . ORDER_HK , a . START_DATE , a . END_DATE , a . EFFECTIVE_FROM , a . LOAD_DATETIME , a . RECORD_SOURCE
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` stg_customer ` AS a
WHERE a . CUSTOMER_HK IS NOT NULL
AND a . ORDER_HK IS NOT NULL
),
latest_records AS (
SELECT * FROM (
SELECT b . CUSTOMER_ORDER_HK , b . CUSTOMER_HK , b . ORDER_HK , b . START_DATE , b . END_DATE , b . EFFECTIVE_FROM , b . LOAD_DATETIME , b . RECORD_SOURCE ,
ROW_NUMBER () OVER (
PARTITION BY b . CUSTOMER_ORDER_HK
ORDER BY b . LOAD_DATETIME DESC
) AS row_num
FROM ` dbtvault - 341416 ` . ` dbtvault ` . ` eff_sat_customer_order_incremental ` AS b
) AS inner_rank
WHERE row_num = 1 ),
latest_open AS (
SELECT c . CUSTOMER_ORDER_HK , c . CUSTOMER_HK , c . ORDER_HK , c . START_DATE , c . END_DATE , c . EFFECTIVE_FROM , c . LOAD_DATETIME , c . RECORD_SOURCE
FROM latest_records AS c
WHERE DATE ( c . END_DATE ) = DATE ( PARSE_DATETIME ( '%F %H:%M:%E6S' , '9999-12-31 23:59:59.999999' ))
),
latest_closed AS (
SELECT d . CUSTOMER_ORDER_HK , d . CUSTOMER_HK , d . ORDER_HK , d . START_DATE , d . END_DATE , d . EFFECTIVE_FROM , d . LOAD_DATETIME , d . RECORD_SOURCE
FROM latest_records AS d
WHERE DATE ( d . END_DATE ) != DATE ( PARSE_DATETIME ( '%F %H:%M:%E6S' , '9999-12-31 23:59:59.999999' ))
),
new_open_records AS (
SELECT DISTINCT
f . CUSTOMER_ORDER_HK ,
f . CUSTOMER_HK , f . ORDER_HK ,
f . START_DATE AS START_DATE ,
f . END_DATE AS END_DATE ,
f . EFFECTIVE_FROM AS EFFECTIVE_FROM ,
f . LOAD_DATETIME ,
f . RECORD_SOURCE
FROM source_data AS f
LEFT JOIN latest_records AS lr
ON f . CUSTOMER_ORDER_HK = lr . CUSTOMER_ORDER_HK
WHERE lr . CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
SELECT DISTINCT
lc . CUSTOMER_ORDER_HK ,
lc . CUSTOMER_HK , lc . ORDER_HK ,
g . START_DATE AS START_DATE ,
g . END_DATE AS END_DATE ,
g . EFFECTIVE_FROM AS EFFECTIVE_FROM ,
g . LOAD_DATETIME ,
g . RECORD_SOURCE
FROM source_data AS g
INNER JOIN latest_closed AS lc
ON g . CUSTOMER_ORDER_HK = lc . CUSTOMER_ORDER_HK
WHERE DATE ( g . END_DATE ) = DATE ( PARSE_DATETIME ( '%F %H:%M:%E6S' , '9999-12-31 23:59:59.999999' ))
),
new_closed_records AS (
SELECT DISTINCT
lo . CUSTOMER_ORDER_HK ,
lo . CUSTOMER_HK , lo . ORDER_HK ,
h . START_DATE AS START_DATE ,
h . END_DATE AS END_DATE ,
h . EFFECTIVE_FROM AS EFFECTIVE_FROM ,
h . LOAD_DATETIME ,
lo . RECORD_SOURCE
FROM source_data AS h
LEFT JOIN latest_open AS lo
ON lo . CUSTOMER_ORDER_HK = h . CUSTOMER_ORDER_HK
LEFT JOIN latest_closed AS lc
ON lc . CUSTOMER_ORDER_HK = h . CUSTOMER_ORDER_HK
WHERE DATE ( h . END_DATE ) != DATE ( PARSE_DATETIME ( '%F %H:%M:%E6S' , '9999-12-31 23:59:59.999999' ))
AND lo . CUSTOMER_ORDER_HK IS NOT NULL
AND lc . CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
SELECT * FROM new_open_records
UNION DISTINCT
SELECT * FROM new_reopened_records
UNION DISTINCT
SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20