Real-time calendar day aggregations with Azure Stream Analytics

In this post, we’ll see how to use Azure Stream Analytics (ASA) to perform windowed aggregations by calendar day that update continuously, rather than at the end of the window. Specifically, we will maintain a view of daily aggregations in Azure Table Storage that are continuously updated as new data arrives.

According to the official ASA documentation, partial aggregates are not supported currently in Azure Stream Analytics. However, we can achieve the same semantics by using a sliding window, some clever grouping, and the Azure Table Storage output sink type, which supports upserts.

Our goals

  1. Aggregations must be performed by calendar day.
  2. Aggregations must be updated incrementally and continuously in near real-time (we cannot wait for the end of the calendar day to update aggregated views, i.e. Table Storage).

Solution

To solve this problem in ASA, we use the SlidingWindow windowing function with a 24-hour width, which emits an output every time a new event arrives on the input source. That alone is not enough, since it gives us a rolling 24-hour aggregation rather than one by calendar day. To achieve calendar-day semantics, we first group the events within the 24-hour window by calendar day, which results in at most 2 groups: today and yesterday. Then, we select only today’s group by filtering for the group whose calendar day matches that of the window’s end, i.e. the newest event in the window (in the example query below, HAVING eventDate = windowEndDate).

Note that the filtering step is very important, since without it, we would overwrite the correct aggregation computed for the previous day with an ever-shrinking partial result, as yesterday’s data falls out of the 24-hour sliding window.

  10/04    10/05 @ 12AM     10/05 @ 3PM (newest event)
 /        /                /
|XXXXXXXX|OOOOOOOOOOOOOOOO|
^----- 24-hour window ----^

X = discarded
O = included

In the above diagram, a new event arrives (timestamp of 10/05 @ 3PM) which triggers the aggregation to update. The 24-hour sliding window includes events from the previous calendar day, as well as events from the current one. We must discard events from the previous day (denoted with X).
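
To make the filtering step concrete, here is a small plain-Python simulation of the logic described above. It is not ASA code, only a sketch: the function name `calendar_day_count`, the sample timestamps, and the year are made up for illustration, and it assumes the window covers the 24 hours up to and including the newest event.

```python
from collections import Counter
from datetime import datetime, timedelta


def calendar_day_count(timestamps, window_end):
    """Count the events that fall on window_end's calendar day."""
    window_start = window_end - timedelta(days=1)
    # a. Keep only events inside the 24-hour window ending at window_end
    #    (assumed here to be exclusive at the start, inclusive at the end).
    in_window = [t for t in timestamps if window_start < t <= window_end]
    # b. Group by calendar day: at most two groups, yesterday and today.
    groups = Counter(t.date() for t in in_window)
    # c./d. Keep only the group whose date matches the window's end date.
    today = window_end.date()
    return today, groups.get(today, 0), dict(groups)


events = [
    datetime(2023, 10, 4, 10, 0),   # older than 24 hours: outside the window
    datetime(2023, 10, 4, 16, 0),   # yesterday, inside the window (X)
    datetime(2023, 10, 4, 23, 30),  # yesterday, inside the window (X)
    datetime(2023, 10, 5, 9, 0),    # today (O)
    datetime(2023, 10, 5, 15, 0),   # today, newest event (O)
]

day, count, groups = calendar_day_count(events, events[-1])
print(day, count)  # prints: 2023-10-05 2
```

Without the final filter, the window would also emit a count of 2 for 10/04, which would replace the complete count already stored for that day.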

As new events arrive, ASA updates its internal state and then writes out the current aggregation results to Azure Table Storage.

We use Azure Table Storage’s support for upserts to update daily aggregations in place. The table’s partition key and row key uniquely identify the aggregation result, which is replaced each time ASA’s internal state of that aggregation gets updated. We do not read from Table Storage.

Example Query

In this example, we will maintain event counts for each calendar day, updating them in real-time.

/**
 * Step 1: Augment input events with the date.
 *
 * To do this, we create a new column `eventDate` by zero-ing out
 * the time component of `eventTimestamp`, the application timestamp
 * for our particular dataset.
 */
WITH   
EventsWithDate AS  
(
    SELECT
        *,
        DATETIMEFROMPARTS(YEAR(eventTimestamp), 
                          MONTH(eventTimestamp), 
                          DAY(eventTimestamp), 0, 0, 0, 0) AS eventDate
    FROM
        Events TIMESTAMP BY eventTimestamp
)

/**
 * Step 2: Sliding window aggregation.
 *
 * Annotated inline:
 *
 * a. Group by 24-hour sliding window.
 * b. Group by `eventDate`, yielding at most 2 groups within the window:
 *    yesterday and today.
 * c. Find the date of the newest event in the window, `windowEndDate`.
 * d. Filter-out yesterday group (HAVING clause) by considering only the
 *    group which has the same date as the window's end (today).
 *
 * System.Timestamp() is the end of the 24 hour window.
 */
SELECT
    eventDate,
    COUNT(*) AS [Count],
    DATETIMEFROMPARTS(YEAR(System.Timestamp()),
                      MONTH(System.Timestamp()),
                      DAY(System.Timestamp()), 0, 0, 0, 0) AS windowEndDate -- c.
INTO
    TableStorageOutput
FROM
    EventsWithDate
GROUP BY
    eventDate, -- b.
    SlidingWindow(day, 1) -- a.
HAVING
    eventDate = windowEndDate -- d.

Writing to Table Storage

You must configure a partition key and row key on the ASA Table Storage Output (e.g. TableStorageOutput from the above example). In our example, we would use eventDate as our row key. This way, we maintain a single row for each calendar day within Table Storage, which is replaced each time our aggregation advances.

For the Table Storage partition key, we might use eventDate as well in our example. If we were performing this aggregation for multiple groups (e.g. by car manufacturer), we might instead use a more specific partition key, such as the group itself, to maintain a single row for each calendar day for each group.
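
The key layout for the multi-group case can be sketched with a plain Python dictionary standing in for the table. This is only an illustration of the replace-in-place behavior, not the Azure SDK: the `upsert` helper, the manufacturer names, and the counts are all hypothetical. It assumes the group is used as the partition key and the calendar day as the row key.

```python
from datetime import date

# Stands in for the table: (PartitionKey, RowKey) -> entity.
table = {}


def upsert(table, manufacturer, event_date, count):
    """Insert or replace the single row for this group and calendar day."""
    partition_key = manufacturer      # one partition per group
    row_key = event_date.isoformat()  # one row per calendar day, e.g. "2023-10-05"
    table[(partition_key, row_key)] = {"Count": count}


# Successive outputs emitted by the job as events arrive (made-up values).
outputs = [
    ("Contoso", date(2023, 10, 5), 1),
    ("Fabrikam", date(2023, 10, 5), 1),
    ("Contoso", date(2023, 10, 5), 2),  # replaces the earlier Contoso row
    ("Contoso", date(2023, 10, 6), 1),  # new day, so a new row
]
for manufacturer, event_date, count in outputs:
    upsert(table, manufacturer, event_date, count)

for key in sorted(table):
    print(key, table[key])
```

Four outputs leave three rows, because the two outputs for the same group and day share one key. The ISO date format is used for the row key since Table Storage keys may not contain characters such as `/`.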

Notes

This method may produce incorrect results if you’re using the “Adjust” late arrival policy and an event’s timestamp is adjusted such that it moves to the next day. This is because System.Timestamp, which is used to calculate the window’s end date, would be the adjusted timestamp, whereas the event would land in yesterday’s group (based on its original timestamp) and hence be discarded.

From Understanding time handling in Azure Stream Analytics:

As a part of the adjustment, the event’s System.Timestamp is set to the new value, but the event time field itself is not changed. This adjustment is the only situation where an event’s System.Timestamp can be different from the value in the event time field and may cause unexpected results to be generated.
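
The failure mode can be reproduced with a short plain-Python sketch of the HAVING filter applied to a single event. The `counted_for` helper and the timestamps are hypothetical; the sketch only assumes, as in the query, that eventDate is derived from the event time field and windowEndDate from System.Timestamp.

```python
from datetime import datetime


def counted_for(event_time, system_timestamp):
    """Return the calendar day the event is counted under, or None if dropped."""
    event_date = event_time.date()              # eventDate: from the event time field
    window_end_date = system_timestamp.date()   # windowEndDate: from System.Timestamp
    # HAVING eventDate = windowEndDate
    return event_date if event_date == window_end_date else None


# An on-time event: System.Timestamp equals the event time.
on_time = datetime(2023, 10, 4, 23, 59, 50)
print(counted_for(on_time, on_time))

# The same event arriving late and adjusted past midnight: the event time
# field still says 10/04, but System.Timestamp now falls on 10/05.
adjusted_timestamp = datetime(2023, 10, 5, 0, 0, 5)
print(counted_for(on_time, adjusted_timestamp))
```

In the second case the event sits in the 10/04 group of a window that ends on 10/05, so the filter discards it and it is counted for neither day.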

Connect with me on Twitter:   @ctrpeach