Showing posts with label ETL Design. Show all posts

Friday, April 29, 2011

Parallelization of ETL Workflows

One of the most frequently overlooked ETL architecture design techniques is ensuring that all available hardware resources, in particular CPUs, are utilized. Consider the sample ETL workflow in the figure below. In this workflow, all ETL tasks are scheduled to execute in series, i.e. one after another. In general, a single ETL task utilizes only one CPU (with some exceptions). Executing one task after another therefore keeps only a single CPU busy, no matter how many CPUs are available.
To utilize all available CPUs on a server, ETL tasks should be scheduled to execute in parallel. Scheduling a highly parallelized ETL workflow requires some planning, as all ETL task dependencies need to be understood first. For example, if DimTable1 and DimTable2 are dimension tables for FactTable1, then DimTable1 and DimTable2 need to be loaded before FactTable1 can be loaded. In addition, if there are no dependencies between DimTable1 and DimTable2, then we can execute their ETL processes in parallel.
The output of this dependency analysis is an ETL Task Execution Order Map, as outlined in the figure below. The ETL Task Execution Order Map will then be the template for the ETL workflow layout. As each task will utilize an available CPU, this approach will be able to utilize multiple (if not all) CPUs and provide better performance.
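The dependency analysis described above can be sketched in a few lines of Python. This is only an illustration, not the output of any particular ETL tool: the `dependencies` dictionary and the `execution_order_map` function are hypothetical names, and the tasks are the three tables from the example. Tasks that land in the same stage have no dependencies on each other and can run in parallel.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical dependency map: each task lists the tasks that must finish first.
dependencies = {
    "DimTable1": set(),
    "DimTable2": set(),
    "FactTable1": {"DimTable1", "DimTable2"},
}


def execution_order_map(deps):
    """Group tasks into stages; tasks within one stage can run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises CycleError if the dependencies are circular
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # all tasks whose predecessors are done
        stages.append(ready)
        sorter.done(*ready)
    return stages


stages = execution_order_map(dependencies)
for number, tasks in enumerate(stages, start=1):
    print(f"Stage {number}: {', '.join(tasks)}")
```

For the example tables, the first stage contains both dimension loads and the second stage contains the fact load.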
One word of caution, though: The number of parallel scheduled tasks should not significantly exceed the number of CPUs on your server. If there are many more concurrent tasks than CPUs, then the overhead of the OS scheduler switching jobs in and out of CPU time may actually decrease performance.
To avoid this problem, many ETL tools (such as Informatica PowerCenter) provide a global setting that limits the number of concurrently processed tasks regardless of how many tasks are scheduled to run in parallel. Let us call this setting N. As a general rule, N should be set to the number of CPU cores of the server. In this case, one should schedule as many tasks in parallel as outlined in the ETL Task Execution Order Map. This option is very useful in environments where the development, test, and production hardware have different numbers of CPUs, because we do not have to design workflows around the underlying hardware constraints. Instead, we just set N in each environment to the respective number of CPUs.
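The effect of such a limit can be illustrated with a small Python sketch. It is not how any specific ETL tool implements the setting: a thread pool stands in for the tool's scheduler, `run_task` is a hypothetical placeholder for real ETL work (which would typically be a database call or an external process), and N is taken from `os.cpu_count()`. Three times as many tasks as cores are submitted, yet no more than N run at the same time.

```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# N: the concurrency limit, set per environment to the number of CPU cores.
N = os.cpu_count() or 1

lock = threading.Lock()
state = {"running": 0, "peak": 0}  # tracks how many tasks run concurrently


def run_task(name):
    """Hypothetical placeholder for one ETL task."""
    with lock:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
    time.sleep(0.05)  # stands in for the actual load work
    with lock:
        state["running"] -= 1
    return name


# Schedule far more tasks than cores; the pool never runs more than N at once.
tasks = [f"task_{i}" for i in range(N * 3)]
with ThreadPoolExecutor(max_workers=N) as pool:
    finished = list(pool.map(run_task, tasks))

print(f"{len(finished)} tasks finished, at most {state['peak']} ran concurrently")
```

The workflow definition (the task list) stays the same in every environment; only the value of N changes.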

Friday, November 12, 2010

Identifying Source System Data Changes for Incremental ETL Processes

Problem: When designing incremental ETL processes, ETL architects face the challenge of finding a way to detect data changes in the source system between ETL runs. Some of the options that might be available are (from the best case scenario to the worst):

  1. Database Log Readers: This approach utilizes an ETL tool that is capable of reading the source database log files to identify inserted and updated records. For example, Informatica PowerCenter supports this through its Change Data Capture (CDC) functionality. However, source system owners may not be willing to grant read access to the database logs, or an ETL tool that supports this functionality may not be available.
  2. Timestamp columns in the source database: If the source system maintains an insert and update timestamp column for each table of interest, then the ETL process can utilize these columns to identify source system changes since the last ETL execution timestamp. Chances are, however, that the source system does not provide that functionality.
  3. Triggers to populate log tables: This is by far the worst option since it adds a significant resource utilization burden to the source system. In this case, triggers are created for all tables of interest. The purpose of these triggers is to capture all inserts, updates, and deletes in log tables. The ETL process then reads the data changes from the log tables and removes all records that it has successfully processed. Again, source system owners will most likely be very hesitant to support this approach.
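To make option 2 concrete, here is a minimal sketch using Python's built-in sqlite3 module with an in-memory database. The `updated_at` column, the sample rows, and the `last_etl_run` value are assumptions made for illustration; the sketch also assumes the source system sets `updated_at` on both inserts and updates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table source_table_a ("
    "business_key integer primary key, col1 text, updated_at text)"
)
# Made-up sample rows; updated_at is assumed to be maintained by the source system.
conn.executemany(
    "insert into source_table_a values (?, ?, ?)",
    [
        (1, "unchanged", "2010-11-01 08:00:00"),
        (2, "updated", "2010-11-11 09:30:00"),
        (3, "inserted", "2010-11-12 07:15:00"),
    ],
)

# Timestamp of the last successful ETL execution (hypothetical value).
last_etl_run = "2010-11-10 00:00:00"

# ISO-formatted timestamps sort correctly as text, so a plain comparison works here.
changed = conn.execute(
    "select business_key, col1 from source_table_a "
    "where updated_at > ? order by business_key",
    (last_etl_run,),
).fetchall()
print(changed)
```

Only the rows touched after the last ETL run are extracted; everything else is skipped without being read into staging.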
What can be done if none of these options is available?



Solution: We propose the following checksum-based approach. In this post, we will utilize SQL Server’s CHECKSUM function; however, Oracle’s ORA_HASH can be used in a similar fashion.

This approach requires that the entire source table (only columns and rows of interest, of course) be loaded into a staging table. During the staging load, the ETL process will assign a checksum value to each record. For example, when loading data from SOURCE_TABLE_A into STAGING_TABLE_A, the SQL would look something like this:
insert into staging_table_a (col1, col2, col3, business_key, check_sum)
select col1, col2, col3, business_key, checksum(col1, col2, col3, business_key)
from source_table_a
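For readers who want to see what the per-record checksum does, the following Python sketch mimics the idea outside the database. It does not reproduce SQL Server's algorithm: `row_checksum` is a hypothetical helper that uses CRC-32 from the standard library as a stand-in, and the sample rows are made up.

```python
import zlib


def row_checksum(*values):
    """Return a 32-bit checksum over the column values of one record."""
    # A separator keeps ("ab", "c") and ("a", "bc") from producing the same
    # payload; None is mapped to its own marker instead of the text "None".
    payload = "\x1f".join("\x00" if v is None else str(v) for v in values)
    return zlib.crc32(payload.encode("utf-8"))


# Made-up source rows: (col1, col2, col3, business_key)
source_rows = [("a", "b", "c", 1001), ("d", "e", "f", 1002)]

# Staging rows carry the same columns plus the checksum, as in the SQL above.
staging_rows = [row + (row_checksum(*row),) for row in source_rows]
for row in staging_rows:
    print(row)
```

The same input always yields the same checksum, so an unchanged record keeps its value from one ETL run to the next, while a change to any column will almost always produce a different value.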

Let us further assume that business_key is the primary key of the source system record. In other words, business_key uniquely identifies a record in the source system.

Both business_key and check_sum must be stored in their corresponding dimension tables. In our example, the dimension table for source_table_a would include a surrogate dimension key (dim_key), business_key, and check_sum as shown below.
For performance reasons, we recommend creating a composite index on business_key and check_sum.

In order to identify new records that were inserted into the source system since the last ETL run, we have to find all business_keys in the staging table that have no corresponding business_key in the dimension table. The SQL code would look something like this:


select s.*
from staging_table_a s
where not exists (select * from dimension_table_a d
where d.business_key = s.business_key)


To identify updated records since the last ETL run, we have to find all records in the staging table that have a matching business_key in the dimension table with a different check_sum value. Here is the SQL code for this:

select s.*
from staging_table_a s
inner join dimension_table_a d on
d.business_key = s.business_key and
d.check_sum <> s.check_sum
In all cases, the joins against the dimension table will be based on index lookups because we have a composite index on business_key and check_sum. Therefore, identifying new or updated records is quite efficient. The drawback of this solution is the necessity to perform a full data load into the staging area, which may not be feasible for large source systems.
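The two queries can be tried end to end with a small Python sketch that uses the built-in sqlite3 module and an in-memory database. The table layouts follow the example in this post, but the index name, the sample rows, and the checksum values are made up for illustration (the checksums are arbitrary literals, not computed values).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
create table dimension_table_a (
    dim_key      integer primary key,   -- surrogate dimension key
    business_key integer,
    col1         text,
    check_sum    integer
);
-- composite index on business_key and check_sum (hypothetical index name)
create index ix_dim_a_key_checksum on dimension_table_a (business_key, check_sum);

create table staging_table_a (business_key integer, col1 text, check_sum integer);

-- state of the dimension after the previous ETL run
insert into dimension_table_a (business_key, col1, check_sum) values
    (1, 'unchanged', 111), (2, 'old value', 222);
-- full reload of the source into staging
insert into staging_table_a values
    (1, 'unchanged', 111), (2, 'new value', 999), (3, 'brand new', 333);
""")

# New records: business keys in staging with no match in the dimension table.
new_keys = [row[0] for row in conn.execute("""
    select s.business_key
    from staging_table_a s
    where not exists (select * from dimension_table_a d
                      where d.business_key = s.business_key)
    order by s.business_key
""")]

# Updated records: matching business key but a different checksum.
updated_keys = [row[0] for row in conn.execute("""
    select s.business_key
    from staging_table_a s
    inner join dimension_table_a d on
        d.business_key = s.business_key and
        d.check_sum <> s.check_sum
    order by s.business_key
""")]

print("new:", new_keys, "updated:", updated_keys)
```

Record 1 appears in neither result because both its business key and its checksum match the dimension table.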

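One of the major benefits of this approach is that it cannot fall out of sync with the source system due to aborted or failed ETL processes. No matter at what point the previous ETL process failed, the next run compares the complete staging data against the dimension table and re-syncs without any additional human intervention. The one caveat is that a checksum is not guaranteed to be unique: in rare cases a changed record can produce the same checksum value as before, and that change would go undetected.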

In summary, the check_sum approach may be a feasible alternative for environments that have no other means for identifying data changes in the source system.

Please contact us if you have any questions.