In this post, we’ll see how to use Azure Stream Analytics (ASA) to perform windowed aggregations by calendar day that update continuously, rather than at the end of the window. Specifically, we will maintain a view of daily aggregations in Azure Table Storage that are continuously updated as new data arrives.
According to the official ASA documentation, partial aggregates are not supported currently in Azure Stream Analytics. However, we can achieve the same semantics by using a sliding window, some clever grouping, and the Azure Table Storage output sink type, which supports upserts.
Our goals
- Aggregations must be performed by calendar day.
- Aggregations must be updated incrementally and continuously in near real-time (we cannot wait for the end of the calendar day to update aggregated views, i.e. Table Storage).
Solution
To solve this problem in ASA, we use the SlidingWindow windowing type with a 24-hour width, which emits an output every time a new event arrives on the input source. That alone is not enough, since it gives us a rolling 24-hour aggregation rather than one by calendar day. To achieve calendar-day semantics, we first group the events within the 24-hour window by calendar day, which yields at most two groups: today and yesterday. Then we keep only today’s group by filtering for groups whose calendar day matches that of the window’s end †, i.e. the newest event in the window (in the example query below, HAVING eventDate = windowEndDate).
Note that the filtering step is very important, since without it, we would overwrite the correct aggregation computed for the previous day with an ever-shrinking partial result, as yesterday’s data falls out of the 24-hour sliding window.
 10/04    10/05 @ 12AM     10/05 @ 3PM (newest event)
 /        /                /
|XXXXXXXX|OOOOOOOOOOOOOOOO|
^----- 24-hour window ----^
X = discarded
O = included
In the above diagram, a new event arrives (timestamp of 10/05 @ 3PM), which triggers the aggregation to update. The 24-hour sliding window includes events from the previous calendar day as well as events from the current one. We must discard the events from the previous day (denoted with X).
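The group-then-filter logic can be simulated outside ASA to check the reasoning. The following is a minimal Python sketch, not ASA code: the function name `calendar_day_count`, the year 2016, and the choice of a start-exclusive, end-inclusive window are assumptions made for illustration.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(hours=24)

def calendar_day_count(events, window_end):
    """Return (date, count) for the group that survives the date filter.

    `events` is a list of event timestamps; `window_end` plays the role
    of the window's end timestamp for the window being evaluated.
    """
    # a. Keep only events inside the 24-hour window (assumed start-exclusive).
    in_window = [t for t in events if window_end - WINDOW < t <= window_end]
    # b. Group by calendar day: at most two groups, yesterday and today.
    groups = Counter(t.date() for t in in_window)
    # c./d. Keep only the group whose date matches the window's end date.
    today = window_end.date()
    return today, groups.get(today, 0)

events = [
    datetime(2016, 10, 4, 14, 0),  # older than 24 hours: outside the window
    datetime(2016, 10, 4, 20, 0),  # inside the window, but yesterday: discarded
    datetime(2016, 10, 5, 9, 0),   # today: included
    datetime(2016, 10, 5, 15, 0),  # today, newest event: included
]
day, count = calendar_day_count(events, window_end=events[-1])
print(day, count)  # 2016-10-05 2
```

Only the two events from 10/05 are counted, even though the window also contains one event from 10/04.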
As new events arrive, ASA updates its internal state and then writes out the current aggregation results to Azure Table Storage.
We use Azure Table Storage’s support for upserts to update daily aggregations in place. The table’s partition key and row key uniquely identify the aggregation result, which is replaced each time ASA’s internal state of that aggregation gets updated. We do not read from Table Storage.
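To make the replace-in-place behaviour concrete, here is a small Python sketch that models the table as a dictionary keyed by (partition key, row key). It is a simulation only and does not call any Azure SDK; the `upsert` helper, the key values, and the `Count` property layout are hypothetical.

```python
table = {}  # (partition_key, row_key) -> entity

def upsert(table, partition_key, row_key, entity):
    """Insert the entity, or replace the existing one with the same keys."""
    table[(partition_key, row_key)] = entity

# Successive outputs for the same calendar day, as the count grows.
for count in (1, 2, 3):
    upsert(table, "2016-10-05", "2016-10-05", {"Count": count})

# The first output for the next calendar day creates a new row.
upsert(table, "2016-10-06", "2016-10-06", {"Count": 1})

print(len(table))                                      # 2
print(table[("2016-10-05", "2016-10-05")]["Count"])    # 3
```

Each day ends up with exactly one row holding the latest count, and nothing is ever read back from the table to compute it.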
Example Query
In this example, we will maintain event counts for each calendar day, updating them in real-time.
/**
 * Step 1: Augment input events with the date.
 *
 * To do this, we create a new column `eventDate` by zero-ing out
 * the time component of `eventTimestamp`, the application timestamp
 * for our particular dataset.
 */
WITH
EventsWithDate AS
(
    SELECT
        *,
        DATETIMEFROMPARTS(YEAR(eventTimestamp),
                          MONTH(eventTimestamp),
                          DAY(eventTimestamp), 0, 0, 0, 0) AS eventDate
    FROM
        Events TIMESTAMP BY eventTimestamp
)
/**
 * Step 2: Sliding window aggregation.
 *
 * Annotated inline:
 *
 * a. Group by 24-hour sliding window.
 * b. Group by `eventDate`, yielding at most 2 groups within the window:
 *    yesterday and today.
 * c. Find the date of the newest event in the window, `windowEndDate`.
 * d. Filter out yesterday's group (HAVING clause) by considering only the
 *    group which has the same date as the window's end (today).
 *
 * System.Timestamp() is the end of the 24-hour window.
 */
SELECT
    eventDate,
    COUNT(*) AS [Count],
    DATETIMEFROMPARTS(YEAR(System.Timestamp()),
                      MONTH(System.Timestamp()),
                      DAY(System.Timestamp()), 0, 0, 0, 0) AS windowEndDate -- c.
INTO
    TableStorageOutput
FROM
    EventsWithDate
GROUP BY
    eventDate,             -- b.
    SlidingWindow(day, 1)  -- a.
HAVING
    eventDate = windowEndDate -- d.
Writing to Table Storage
You must configure a partition key and row key on the ASA Table Storage output (e.g. TableStorageOutput from the above example). In our example, we would use eventDate as our row key. This way, we maintain a single row for each calendar day within Table Storage, which is replaced each time our aggregation advances.
For the Table Storage partition key, we might use eventDate as well for our example. If we were performing this aggregation for multiple groups (e.g. by car manufacturer), we might use a more interesting partition key to maintain a single row for each calendar day for each group.
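One possible key scheme for the multi-group case is sketched below in Python. It assumes the group (here a hypothetical `manufacturer` value) becomes the partition key and the calendar day stays the row key; the `table_keys` helper and the sample manufacturers are invented for illustration, and other schemes would work equally well.

```python
from datetime import date

def table_keys(manufacturer, event_date):
    """Hypothetical key scheme: one partition per group, one row per day."""
    return manufacturer, event_date.isoformat()

rows = {}  # (partition_key, row_key) -> latest count
outputs = [
    ("Acme", date(2016, 10, 5), 1),
    ("Globex", date(2016, 10, 5), 1),
    ("Acme", date(2016, 10, 5), 2),  # replaces Acme's earlier row for the day
    ("Acme", date(2016, 10, 6), 1),
]
for manufacturer, event_date, count in outputs:
    rows[table_keys(manufacturer, event_date)] = count

print(len(rows))                     # 3
print(rows[("Acme", "2016-10-05")])  # 2
```

With this scheme the query would also need to group by the same column, so that each group and day pair produces its own output row.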
Notes
† This method may produce incorrect results if you’re using the “Adjust” late arrival policy and an event is adjusted such that it moves to the next day. This is because the System.Timestamp used to calculate the window’s end date would be the adjusted timestamp, whereas the event would land in yesterday’s group (which is based on its original timestamp) and hence be discarded.
From Understanding time handling in Azure Stream Analytics:
As a part of the adjustment, the event’s System.Timestamp is set to the new value, but the event time field itself is not changed. This adjustment is the only situation where an event’s System.Timestamp can be different from the value in the event time field and may cause unexpected results to be generated.
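The failure mode described in this note can be illustrated with a short Python sketch. It is a simplification that looks only at the window triggered by a single event, and the function name `survives_filter` and the sample timestamps are hypothetical.

```python
from datetime import datetime

def survives_filter(event_time, system_timestamp):
    """Mirror the date filter: the event's date must equal the window end date."""
    event_date = event_time.date()              # from the unchanged event time field
    window_end_date = system_timestamp.date()   # from the possibly adjusted timestamp
    return event_date == window_end_date

event_time = datetime(2016, 10, 4, 23, 59, 50)

# On time: the system timestamp equals the event time, so the dates agree.
on_time = survives_filter(event_time, system_timestamp=event_time)

# Late and adjusted past midnight: the dates disagree, so the event is dropped.
adjusted = survives_filter(event_time, system_timestamp=datetime(2016, 10, 5, 0, 0, 5))

print(on_time, adjusted)  # True False
```

In the adjusted case the event is grouped under 10/04 but evaluated in a window whose end date is 10/05, so the filter discards it.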