The Top 5 Streaming ETL Patterns

John Allwright
Decodable

Many computing concepts from the 1970s have fallen out of currency, but not ETL (Extract-Transform-Load) or its recent anagram shuffle, ELT, which transforms data at the destination rather than in flight. ETL and ELT are traditionally scheduled batch operations, but as always-on, always-current data services become the norm, real-time ETL operating on streams of data is the goal of many organizations, if not yet the reality.

In real-world usage, the ‘T’ in ETL represents a wide range of patterns assembled from primitive operations. In this post we’ll explore these operations and see examples of how they’re implemented as SQL statements.

Transformations using SQL statements?

Yes! SQL combines the power and conciseness of a declarative language with skills that are ubiquitous among anyone who’s worked with code or data. Unlike almost any programming language you might use as an alternative, SQL owes its ubiquity to almost 50 years of longevity: pretty much everyone in the computing industry has used it at some point. The power and pervasive nature of SQL mean that it’s in use everywhere, even at companies building the latest developer technologies and services. SQL becomes even more powerful when enhanced by functions, which we’ll cover in a future blog post.

Pipeline patterns

Most ETL pipelines fit one or a combination of patterns. Decodable’s Connection - Stream - Pipeline abstraction means you can choose to build everything into a single pipeline, or decompose sophisticated transformations into a network of reusable pipelines connected by streams, spanning teams, regions and use cases as needed.
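
As a minimal sketch of what decomposing into connected pipelines could look like, the two statements below are separate pipelines joined by an intermediate stream. The `application_errors` stream is a hypothetical name used only for illustration, and the `hostname` and `response_code` fields are assumed to exist on `http_events`, as in the examples later in this post.

```sql
-- Pipeline 1: keep only application traffic and write it to a stream
insert into application_events
select * from http_events
where hostname = 'app.decodable.co'

-- Pipeline 2 (a separate pipeline): reuse the stream written by pipeline 1.
-- application_errors is a hypothetical output stream.
insert into application_errors
select * from application_events
where response_code between 500 and 599
```

Because the second pipeline reads `application_events` rather than the raw `http_events`, other teams could attach their own pipelines to the same intermediate stream without repeating the first filter.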

1: Filter

Filters remove unwanted records from a stream, dropping records that don’t match the ‘rules’ in the SQL where clause. A filter is often used to suppress sensitive records for compliance, or to reduce processing load or storage needs on a target system.

-- Keep only records pertaining to the application
insert into application_events
select * from http_events
where hostname = 'app.decodable.co'

-- Keep only records that modify the inventory
insert into inventory_updates
select * from http_events
where hostname = 'api.mycompany.com' and
  path like '/v1/inventory%' and
  method in ( 'POST', 'PUT', 'DELETE', 'PATCH' )

2: Route

The Route pattern creates multiple output streams from one or more input streams, directing records to their correct destination based on a set of rules. This pattern consists of multiple filters that all see every input record, but each filter transmits only the records that match the rule for its particular destination.

-- Route security-related HTTP events
insert into security_events
select * from http_events
where path like '/login%' or
  path like '/billing/cc%'

-- Route app-related HTTP events
insert into application_events
select * from http_events
where hostname = 'app.decodable.co'

-- Route requests to Customer Success if it looks like the user needs help
insert into cs_alerts
select * from http_events
where response_code between 500 and 599 or -- any server failure
  ( path = '/signup' and response_code != 200 ) -- failed to sign up for any reason

3: Transform

Transforming pipelines create an output record by modifying the input record. Typically this results in a 1:1 transformation, but in some cases the output is derived from more than one input record, giving a many:1 relationship. Here we’ll call out three specialized transformations:

Transform: Extract

Parse the incoming record, extract data from the input record and use it as the basis for enriching the derived output record.

-- Parse timestamp and action
insert into user_events
select
  to_timestamp(fields['ts'], 'yyyy-MM-dd''T''HH:mm:ss') as ts,
  fields['user_id']    as user_id,
  fields['path']       as path,
  case fields['method']
    when 'GET'         then 'read'
    when 'POST', 'PUT' then 'modify'
    when 'DELETE'      then 'delete'
  end as action
from (
  select
    grok(
      body,
      '\[%{TIMESTAMP_ISO8601:ts} %{DATA:method} "%{PATH:path}" uid:%{DATA:user_id}'
    ) as fields
  from http_events
)

Transform: Normalize

Incoming data records often need to be normalized against a schema for a target system to process them. Missing fields may need default values populated, optional fields may need to be stripped out, and data types enforced.

-- Cleanse incoming data for downstream processes
insert into sensor_readings
select
  cast(ifnull(sensor_id, '0') as bigint) as sensor_id,
  lower(trim(name))                      as name,
  cast(`value` as bigint)                as reading
from raw_sensor_readings

Transform: Anonymize

Anonymizing pipelines eliminate or mask sensitive fields for compliance, regulatory, or privacy reasons when the target system doesn’t need the information to complete processing.

-- Anonymize SSNs and zip codes
insert into user_events_masked
select
  user_id,
  username,
  overlay(ssn placing '*' from 1 for 12) as ssn,
  substring(zip_code from 1 for 2)       as zip_code_1,
  action
from user_events

4: Aggregate

Aggregation pipelines typically use SQL windowing functions to group incoming records into buckets, usually based on time, on which to perform the aggregation. Count, Min, Max, Avg, and Sum are the typical operators, but there are many more.

-- Count the number of events by path and status every 10 seconds.
insert into site_activity
select
  window_start,
  window_end,
  path,
  status,
  count(1) as `count`
from table(
  tumble(
    table http_events,
    descriptor(_time),
    interval '10' seconds
  )
)
group by window_start, window_end, path, status
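
The other operators follow the same shape. As a rough sketch, the pipeline below computes per-sensor minimum, maximum, and average readings each minute. It assumes that the `sensor_readings` stream from the Normalize example also carries a `_time` field usable as the window's time attribute, and `sensor_stats` is a hypothetical output stream.

```sql
-- Per-sensor min, max and average reading for every 1-minute window.
-- Assumes sensor_readings has a _time field; sensor_stats is hypothetical.
insert into sensor_stats
select
  window_start,
  window_end,
  sensor_id,
  min(reading)                 as min_reading,
  max(reading)                 as max_reading,
  -- cast to double so the average keeps its fractional part
  avg(cast(reading as double)) as avg_reading
from table(
  tumble(
    table sensor_readings,
    descriptor(_time),
    interval '1' minute
  )
)
group by window_start, window_end, sensor_id
```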

5: Trigger

Our final pattern is the trigger. Unlike almost all of the other patterns, the trigger’s output record probably has little overlap with the input record’s schema: it indicates that a set of conditions has been detected over one or more input records, and outputs an alert as a result. The output schema could represent the detected condition(s), the action to be taken, or both.

-- Build hourly usage data for a Stripe integration on the output stream
insert into stripe_product_usage
select
  window_start as _time,
  customer_id,
  'abcd1234' as price_id,
  sum(bytes_sent) / 1024 / 1024 as mb_sent
from table(
  tumble(
    table document_downloads,
    descriptor(_time),
    interval '1' hour
  )
)
group by window_start, customer_id
-- repeat the expression: a select alias may not be visible in the having clause
having sum(bytes_sent) / 1024 / 1024 > 1024
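
For a more alert-style trigger, a minimal sketch might look like the following. It assumes a hypothetical `failed_logins` stream (for example, one written by an upstream filter pipeline) with `user_id` and `_time` fields, and a hypothetical `login_alerts` output stream. The threshold of 5 is arbitrary.

```sql
-- Emit one alert per user for each minute with 5 or more failed logins.
-- failed_logins, login_alerts and the threshold are illustrative assumptions.
insert into login_alerts
select
  window_start              as _time,
  user_id,
  'excessive_failed_logins' as alert_type,
  count(1)                  as failed_attempts
from table(
  tumble(
    table failed_logins,
    descriptor(_time),
    interval '1' minute
  )
)
group by window_start, window_end, user_id
having count(1) >= 5
```

Note how the output record describes the detected condition (`alert_type`, `failed_attempts`) rather than mirroring the fields of the input events.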

Why batch when you can stream?

We’ve only scratched the surface of what’s possible when building streaming ETL solutions with SQL in Decodable. Justifying batch transformations in your next data project just got that much harder!

You can get started with Decodable for free - our developer account includes enough for you to build a useful pipeline and - unlike a trial - it never expires.
