Practical Event Sourcing with SQL (Postgres)

Event sourcing is an excellent software design pattern. Instead of persisting the current state of an asset and mutating it in place, an asset is represented by the events through its life cycle. When the current state of an asset is needed, it is recomputed from the events.

Despite being a great way to solve many problems, it is somewhat scarcely applied. There are undoubtedly many reasons for this, but personally I suspect part of the problem is a lack of good examples of how to use it. I've seen plenty of software engineers understand the theory and utility of Event Sourcing but struggle with its implementation. It doesn't need to be this way. Event Sourcing can be implemented in common relational databases such as Postgres and MySQL. The database schemas of Event Sourcing implementations don't vary all that much, and as a result, by mastering a couple of building blocks, Event Sourcing becomes approachable.

A quick primer on event sourcing theory

Let's start with a quick example of event sourcing. Consider a data schema representing a book tracking system at a library:

Book Status Date Member
Humiliated and Insulted Checked Out 2021-02-12 Alice
Crime and Punishment Checked Out 2021-01-11 Bob
The Idiot Checked In 2002-12-02 Mallory

In a classical design we mutate assets in place, so if Eve were to check out The Idiot the stored data in the database would now look like:

Book Status Date Member
Humiliated and Insulted Checked Out 2021-02-12 Alice
Crime and Punishment Checked Out 2021-01-11 Bob
The Idiot Checked Out 2021-02-22 Eve

By storing data in such a way, it is possible to deduce answers to the following questions:

  • What is the state of a particular book, is it checked out?
  • Who checked out the book last?
  • When was the last time the book was checked out?

To convert this library system example to an Event Sourcing model, the schema barely needs to change: simply drop the unique constraint on the Book column. The difference lies in how the table is used, specifically how data is added and read. We'll deal with how reads are different later; for now, storage. Storage is different in that records are never modified; a new record is appended instead.
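As a sketch (table and column names here are assumptions, not the article's actual schema), the schema change and the write-path difference might look like:

```sql
-- Classical model: one row per book, mutated in place.
CREATE TABLE books (
    book   text PRIMARY KEY,  -- the unique constraint on Book
    status text,
    date   date,
    member text
);
-- Eve checking out The Idiot mutates the existing row:
UPDATE books
SET status = 'Checked Out', date = '2021-02-22', member = 'Eve'
WHERE book = 'The Idiot';

-- Event-sourced model: same columns, no unique constraint on book.
CREATE TABLE book_events (
    book   text,
    status text,
    date   date,
    member text
);
-- Eve's checkout simply appends a new event:
INSERT INTO book_events (book, status, date, member)
VALUES ('The Idiot', 'Checked Out', '2021-02-22', 'Eve');
```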

After Eve's transaction the data will look as follows; note that The Idiot is now duplicated:

Book Status Date Member
The Idiot Checked Out 2021-02-22 Eve
Humiliated and Insulted Checked Out 2021-02-12 Alice
Crime and Punishment Checked Out 2021-01-11 Bob
The Idiot Checked In 2002-12-02 Mallory

By using event sourcing we can not only answer all the same questions as above but also now have enough data to determine:

  • What is the most/least commonly checked out book
  • How frequently is a book checked out
  • Who has checked out a book in the past
  • Who has checked out the most books in any time range
  • What is the average checkout time for a book
  • We can even replay the history of a whole book or the library and use that to test our library system.

There are many other facts we can extract about books and library members. And we can do all of that without any ancillary tables.
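For instance, one of the questions above, who has checked out the most books in a time range, becomes a simple aggregate over the event table (using the hypothetical book_events schema sketched earlier):

```sql
SELECT member, count(*) AS checkouts
FROM book_events
WHERE status = 'Checked Out'
  AND date BETWEEN '2021-01-01' AND '2021-12-31'
GROUP BY member
ORDER BY checkouts DESC
LIMIT 1;
```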

Hopefully by now you can see some scenarios where event sourcing may be useful, and we'll go over some other things to consider when weighing event sourcing against more classical data storage methodologies. But for now, let's return to the open question of how to effectively query data stored in such a fashion.

Practical Event Sourcing with SQL

To explore some queries, we'll start with a data set representing the travels of ships. This data is organized as follows:

Ship Action Port Time
Edh depart Los Angeles 2020-07-02 15:54:24.467018
Yough depart Singapore 2020-10-17 08:52:57.891636
Ash arrive Port Klang 2020-09-28 11:13:48.191754
Thorn depart Dubai 2020-05-12 16:23:40.381128
... ... ... ...

Follow along through db-fiddle or download ship_ledger.sql, a SQL script that creates a temporary table. This data can be loaded by executing \i ship_ledger.sql from the psql or pgcli clients.

Read current state of single asset

Say we want to find which port the ship Ash was last at. All we need to do is ORDER the results and pick the first record with ship name 'Ash':

SELECT * FROM ship_ledger
WHERE ship = 'Ash'
ORDER BY time DESC
LIMIT 1;

Read current state of all assets

What if we want to extend the previous question to get the current state of all ships? We want an outcome similar to:

Ship Action Port Time
Ash arrive Rotterdam 2021-01-15 03:35:29.845197
Edh arrive Los Angeles 2021-01-09 09:37:30.387559
Ethel arrive Laem Chabang 2021-01-25 05:40:35.469808
Thorn arrive Antwerp 2021-01-05 10:50:07.723586
Wyn arrive Los Angeles 2021-01-16 11:56:50.433422
Yough arrive Hamburg 2021-01-03 10:57:43.320602

There are several ways of doing this. Let's explore DISTINCT ON as well as window functions, as both of these approaches are good building blocks for other queries.

Current State of all assets using DISTINCT ON

Utilizing DISTINCT ON we can instruct the database to retrieve only one record for each ship after having ordered the data by time.

SELECT DISTINCT ON (ship) *
FROM ship_ledger
ORDER BY ship, time DESC;

Unlike the single-ship example, here we need to order not just by time but also by ship. This nuance is a side effect of the database's internal implementation, which picks the first result after having arranged the data by ship. It requires you to explicitly order the data on the DISTINCT key, and if you forget to do so it'll remind you with the error message SELECT DISTINCT ON expressions must match initial ORDER BY expressions. See the documentation on DISTINCT for additional information.

Current state of all assets using Window Functions

The same result as the previous query can be accomplished using window functions. This method is a bit more manual and requires us to first arrange the data by ship and time (for similar reasons as in the previous query):

SELECT *,
       RANK() OVER(PARTITION BY ship ORDER BY ship, TIME DESC) AS rank
FROM ship_ledger

This will return all events, arranged by ship, with each event associated with a specific ship sequentially numbered (ranked).

ship action port time rank
Ash arrive Rotterdam 2021-01-15 03:35:29.845197 1
Ash depart Shanghai 2020-12-27 07:12:25.163836 2
... ... ... ... 3
Edh arrive Los Angeles 2021-01-09 09:37:30.387559 1
Edh depart Dubai 2020-12-12 07:29:13.325785 2
... ... ... ... 3
Ethel arrive Laem Chabang 2021-01-25 05:40:35.469808 1
Ethel depart Los Angeles 2020-12-28 08:22:25.237478 2
... ... ... ... 3

To narrow down the results to the last event per ship, all we need to do is filter the results to those where rank is 1:

SELECT *
FROM
  (SELECT *,
          RANK() OVER(PARTITION BY ship ORDER BY ship, TIME DESC) AS rank
   FROM ship_ledger) AS ranked_ledger
WHERE rank=1

and finally we get the same result as before:

ship action port time rank
Ash arrive Rotterdam 2021-01-15 03:35:29.845197 1
Edh arrive Los Angeles 2021-01-09 09:37:30.387559 1
Ethel arrive Laem Chabang 2021-01-25 05:40:35.469808 1
Thorn arrive Antwerp 2021-01-05 10:50:07.723586 1
Wyn arrive Los Angeles 2021-01-16 11:56:50.433422 1
Yough arrive Hamburg 2021-01-03 10:57:43.320602 1

Last N states of an asset

It may already have become apparent that filtering by rank gives us the option of getting the last few ports, something that was not possible with DISTINCT ON. Let's say we want the last 3 port arrivals for each ship:

SELECT *
FROM
  (SELECT *,
          RANK() OVER(PARTITION BY ship ORDER BY ship, TIME DESC) AS rank
   FROM ship_ledger
   WHERE action = 'arrive') AS ranked_ledger
WHERE rank<=3

The result of this is:

ship action port time rank
Ash arrive Rotterdam 2021-01-15 03:35:29.845197 1
Ash arrive Shanghai 2020-12-20 22:51:46.163836 2
Ash arrive Busan 2020-12-18 12:44:35.557756 3
Edh arrive Los Angeles 2021-01-09 09:37:30.387559 1
Edh arrive Dubai 2020-12-10 10:41:57.325785 2
Edh arrive Kaohsiung 2020-11-22 19:14:36.678225 3
Ethel arrive Laem Chabang 2021-01-25 05:40:35.469808 1
Ethel arrive Los Angeles 2020-12-21 12:25:15.237478 2
Ethel arrive Port Klang 2020-11-18 21:27:19.832519 3
Thorn arrive Antwerp 2021-01-05 10:50:07.723586 1
Thorn arrive Port Klang 2020-12-12 10:17:27.015774 2
Thorn arrive Kaohsiung 2020-12-01 22:04:29.384756 3
Wyn arrive Los Angeles 2021-01-16 11:56:50.433422 1
Wyn arrive Antwerp 2020-12-25 14:47:07.326144 2
Wyn arrive Rotterdam 2020-12-19 20:20:47.150076 3
Yough arrive Hamburg 2021-01-03 10:57:43.320602 1
Yough arrive Shanghai 2020-12-13 02:15:03.588928 2
Yough arrive Antwerp 2020-11-20 10:00:10.311773 3

Aggregating multiple events of an asset into single record

The previous example gave us the last 3 ports for each ship, but the data for any single ship was spread across 3 rows, which can be somewhat cumbersome. It would be a lot easier to deal with if all the information for each ship was available on the same row. This can be realized using the lead() window function.

Let's first do it with the last two ports:

SELECT *
FROM
  (SELECT *,
          RANK() OVER(PARTITION BY ship ORDER BY ship, TIME DESC) AS rank,
          lead(port, 1) OVER(PARTITION BY ship ORDER BY ship, TIME DESC) AS previous_port
   FROM ship_ledger
   WHERE action = 'arrive') AS ranked_ledger
WHERE rank=1;

Result:

ship action port time rank previous_port
Ash arrive Rotterdam 2021-01-15 03:35:29.845197 1 Shanghai
Edh arrive Los Angeles 2021-01-09 09:37:30.387559 1 Dubai
Ethel arrive Laem Chabang 2021-01-25 05:40:35.469808 1 Los Angeles
Thorn arrive Antwerp 2021-01-05 10:50:07.723586 1 Port Klang
Wyn arrive Los Angeles 2021-01-16 11:56:50.433422 1 Antwerp
Yough arrive Hamburg 2021-01-03 10:57:43.320602 1 Shanghai

The same pattern can be extended to 3 ports by adding lead(port, 2) with the same PARTITION clause. However, the repetition is off-putting; to deal with that we can define the partition window once and refer to it from each window function via an alias.

SELECT *
FROM
  (SELECT *,
          RANK() OVER(ship_ledger_group) AS rank,
          lead(port, 1) OVER(ship_ledger_group) AS previous_port,
          lead(port, 2) OVER(ship_ledger_group) AS two_ports_ago
   FROM ship_ledger
   WHERE action = 'arrive'
   WINDOW ship_ledger_group AS (PARTITION BY ship ORDER BY ship, TIME DESC)
  ) AS ranked_ledger
WHERE rank=1;

Result:

ship action port time rank previous_port two_ports_ago
Ash arrive Rotterdam 2021-01-15 03:35:29.845197 1 Shanghai Busan
Edh arrive Los Angeles 2021-01-09 09:37:30.387559 1 Dubai Kaohsiung
Ethel arrive Laem Chabang 2021-01-25 05:40:35.469808 1 Los Angeles Port Klang
Thorn arrive Antwerp 2021-01-05 10:50:07.723586 1 Port Klang Kaohsiung
Wyn arrive Los Angeles 2021-01-16 11:56:50.433422 1 Antwerp Rotterdam
Yough arrive Hamburg 2021-01-03 10:57:43.320602 1 Shanghai Antwerp

We don't have the timestamp displayed for the previous ports, but it can be added using the same approach: based on lead(port, 1) OVER(ship_ledger_group) AS previous_port, a new select expression can be added right after it as lead(time, 1) OVER(ship_ledger_group) AS previous_port_time.
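Putting that together, the query with the previous port's timestamp added might look like:

```sql
SELECT *
FROM
  (SELECT *,
          RANK() OVER(ship_ledger_group) AS rank,
          lead(port, 1) OVER(ship_ledger_group) AS previous_port,
          lead(time, 1) OVER(ship_ledger_group) AS previous_port_time
   FROM ship_ledger
   WHERE action = 'arrive'
   WINDOW ship_ledger_group AS (PARTITION BY ship ORDER BY ship, time DESC)
  ) AS ranked_ledger
WHERE rank=1;
```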

Extract new data from past events of an asset with Window Functions

How about finding the average travel time of the most popular ports?

SELECT *
FROM (SELECT port,
             last_port_event,
             avg(time-last_event_time),
             count(*)
      FROM
        (SELECT *,
               lag(port, 1) OVER(ship_ledger_group) AS last_port_event,
               lag(time, 1) OVER(ship_ledger_group) AS last_event_time
         FROM ship_ledger
         WINDOW ship_ledger_group AS (PARTITION BY ship ORDER BY ship, time ASC)
        ) ss
      GROUP BY (port, last_port_event)
     ) as sss
ORDER BY count DESC
LIMIT 3;

Result:

port last_port_event avg count
Busan Busan 3 days, 8:44:19.400000 15
Rotterdam Rotterdam 3 days, 14:06:46.538462 13
Singapore Singapore 4 days, 1:40:51.846154 13
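Note that in the rows above port and last_port_event are equal: those rows pair a depart event with the preceding arrive at the same port, so the averages measure time spent docked rather than in transit. To look at actual transit legs instead, one could filter those pairs out (a sketch; there are other ways to slice this):

```sql
SELECT *
FROM (SELECT port,
             last_port_event,
             avg(time-last_event_time),
             count(*)
      FROM
        (SELECT *,
               lag(port, 1) OVER(ship_ledger_group) AS last_port_event,
               lag(time, 1) OVER(ship_ledger_group) AS last_event_time
         FROM ship_ledger
         WINDOW ship_ledger_group AS (PARTITION BY ship ORDER BY ship, time ASC)
        ) ss
      -- keep only legs between two different ports
      WHERE port <> last_port_event
      GROUP BY (port, last_port_event)
     ) AS transit_legs
ORDER BY count DESC
LIMIT 3;
```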

Extract new data from past events of an asset with Pivot Table (crosstab)

The following section of the tutorial requires the tablefunc extension to be enabled. To enable it, as a privileged user execute:

CREATE EXTENSION IF NOT EXISTS tablefunc;

If crosstab is called without the tablefunc extension enabled, Postgres will error out with:

function crosstab(unknown, unknown) does not exist
LINE 3: FROM crosstab
             ^
             HINT:  No function matches the given name and argument types.
                    You might need to add explicit type casts.

A common pattern with Event Sourcing is storing a state with a corresponding event. Say we have a user session tracking system where we record the action that occurred.

session user event time
B Alice login 2020-07-02 12:00:12
A Bob logout 2020-07-02 12:01:11
B Eve publish 2020-07-02 12:02:22
B Alice publish 2020-07-02 12:11:00
C Mallory login 2020-07-02 12:12:20
A Bob publish 2020-07-02 12:12:21
B Alice publish 2020-07-02 12:20:12
B Alice logout 2020-07-02 12:22:20

Follow along by downloading session_tracking.sql, a SQL script that creates a temporary table. This data can be loaded by executing \i session_tracking.sql from the psql or pgcli clients.

With this data we may be interested in finding the duration of a session: the time between the login and logout events for all sessions.

SELECT *,
       logout-login as duration
FROM crosstab 
  ($$ 
      SELECT DISTINCT ON (session, event) "user", event, time 
      FROM user_sessions WHERE event in ('login', 'logout') ORDER BY session, event, time DESC
   $$,
   $$
      VALUES('login'), ('logout')
   $$) AS ct ("user" text, login timestamp, logout timestamp);

What happens in this query is what's often called a pivot. The inner query prepares the result by extracting the last event associated with a login or logout.

user event time
Bob logout 2020-07-02 12:01:11
Alice login 2020-07-02 12:00:12
Alice logout 2020-07-02 12:22:20
Mallory login 2020-07-02 12:12:20

The crosstab then performs the pivot, turning events into columns and populating them with corresponding time values yielding the following data:

user login logout duration
Bob 2020-07-02 12:01:11
Alice 2020-07-02 12:00:12 2020-07-02 12:22:20 0:22:08
Mallory 2020-07-02 12:12:20

The query above is extra careful to:

  • retrieve only login and logout events (WHERE event IN ('login', 'logout'))
  • explicitly declare to crosstab what values to expect as events (VALUES('login'), ('logout'))
  • specify the types of the resulting data (AS ct ("user" text, login timestamp, logout timestamp)).

This safety is not incidental; while it is possible to create a more generic version that returns all event types, there are a few gotchas associated with it that are beyond the scope of this exercise.

Let's try applying crosstab to some other examples. Say we want to know when each ship was last at the 3 most popular ports. First, what are the most popular ports?

SELECT port,
       count(*)
FROM ship_ledger
GROUP BY port
ORDER BY count(*) DESC
LIMIT 3

Singapore, Rotterdam, Busan. Great! crosstab can now be applied to find when each ship was last at each of them:

SELECT *
FROM crosstab
  ($$ SELECT DISTINCT ON (ship, port) ship, port, TIME
      FROM ship_ledger
      WHERE action='arrive'
        AND port IN ('Singapore', 'Rotterdam', 'Busan')
      ORDER BY ship, port, TIME DESC
   $$,
   $$
      VALUES('Singapore'), ('Rotterdam'), ('Busan') 
   $$) AS ct(ship text, singapore TIMESTAMP, rotterdam TIMESTAMP, busan TIMESTAMP);

Summary and Applicability

Hopefully these examples give you a bit of insight into how you can utilize Event Sourcing in your projects. Just like any pattern it is not a panacea; there are use cases for which event sourcing is great, and there is also the opposite.

Great use cases:

  • User session tracking. Just like in the example above, if you want to record what users are doing in your application for future analysis, event sourcing is a great fit.
  • Transactional systems such as order systems or banking. Instead of storing an account balance, all transactions against an account can be stored, ensuring that the balance is always accurate even under concurrent execution.
  • By utilizing event sourcing it is possible to build distributed worker systems without having to rely on a queue. How much simpler can your system be by using the relational database you have anyway, and not having to add RabbitMQ / SQS / ZeroMQ to your architecture?
  • Any time point-in-time injection of data may be necessary. What if you get yesterday's transactions today, after today's transactions have already been executed? Without event sourcing, if transactions are not commutative, you may need to build complex logic to roll back state and replay, while with event sourcing you just create a new back-dated event and the state will be recomputed on use.
  • Similar to point-in-time injection, you may want to know what the state of something was in the past. Analyzing the events up to that point and ignoring the rest gives you that ability.
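To make the transactional bullet concrete, here is a minimal sketch (table and column names are assumptions): instead of updating a stored balance, each transaction is an appended event and the balance is derived on read:

```sql
CREATE TABLE account_events (
    account text,
    amount  numeric,                  -- positive for deposits, negative for withdrawals
    time    timestamp DEFAULT now()
);

-- A deposit and a withdrawal are both just appends:
INSERT INTO account_events (account, amount) VALUES ('alice', 100.00);
INSERT INTO account_events (account, amount) VALUES ('alice', -25.00);

-- The current balance is recomputed from the events:
SELECT account, sum(amount) AS balance
FROM account_events
GROUP BY account;
```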

When is event sourcing counterproductive? Well, it is particularly useless if you are dealing with data that does not have a time dimension. In terms of performance, event sourcing is not necessarily much more expensive; the details depend on the nature of the problem, but it's unlikely to be an order of magnitude difference. Perhaps the biggest downside of event sourcing is slightly more complex queries, but that difference may be immaterial, especially as the benefits can be liberating.

It's also worth noting that while the examples above are written with Postgres in mind, similar functionality exists in other database engines.

2021-03-14T16:14:01-04:00