Stratus3D

A blog on software engineering by Trevor Brown

Sliding Window Data Store in Erlang

In a recent Erlang project I had a need for in-memory storage of events. I didn’t need events to persist for more than an hour. All I needed was a collection of events that had occurred over the last hour that I could query. Collections like these are often referred to as sliding window data stores. Only events within a window of time are kept. As the events age they eventually pass out of the window of time and are discarded. Since old events are discarded when they fall outside of the window, queries against the collection only return events within the sliding window.

###First Attempt

My first attempt at creating a sliding window resulted in a very primitive solution. All it does is create an empty list, and as new events come in they are added to the beginning of the list. Any time an action is performed, whether adding events, querying events, or retrieving all the events, old events are removed by traversing the list with lists:takewhile. When events are queried the entire list is filtered to find the matching events. When we want all the events within the window we simply return the list. Events are stored internally as tuples of three values: the event ID, the time the event was added, and the event itself.

Here is the module source code:

``` erlang sliding_window_list.erl
-module(sliding_window_list).

-behaviour(sliding_window).

-include_lib("include/sliding_window.hrl").

-export([new/1, add/2, events/1, event_info/2, reset/1]).

% API
new(Size) ->
    % Since we are using a plain old list to store events all we need to keep
    % track of is the size of the window (in seconds), the last event ID (starts
    % at 1), and the list of events. We store all these in a tuple and pass it
    % around to each API call.
    {Size, 1, []}.

add({Size, Id, Window}, Event) ->
    % Remove old events as some might still exist in the list
    NewWindow = remove_old_events(Size, Window),
    % Add a new event to the list and return the updated window state and event
    % id.
    {{Size, Id+1, [{Id, sliding_window:timestamp(), Event}|NewWindow]}, Id}.

events({Size, Id, Window}) ->
    % Remove old events as some might still exist in the list
    NewWindow = remove_old_events(Size, Window),
    % Return all events in the window
    {{Size, Id, NewWindow}, NewWindow}.

event_info({Size, Id, Window}, EventId) ->
    % Remove old events as some might still exist in the list
    NewWindow = remove_old_events(Size, Window),
    % Find the event in the list of events
    case lists:filter(fun({CurrentEventId, _Time, _ExistingEvent}) ->
                              CurrentEventId =:= EventId
                      end, NewWindow) of
        [{EventId, EventTimeAdded, Event}] ->
            % If event exists return it along with the time remaining in the
            % window and the time elapsed. Also return the new window state.
            TimeElapsed = timer:now_diff(sliding_window:timestamp(), EventTimeAdded) / ?MEGA,
            TimeRemaining = timer:now_diff(EventTimeAdded, sliding_window:window_end_timestamp(Size)) / ?MEGA,
            {{Size, Id, NewWindow}, {EventId, EventTimeAdded, TimeElapsed, TimeRemaining, Event}};
        [] ->
            % Return none if the event is not found
            {{Size, Id, NewWindow}, none}
    end.

reset({Size, Id, _Window}) ->
    {{Size, Id, []}, ok}.

% Private Functions
remove_old_events(Size, WindowEvents) ->
    % Remove events that are outside the window of time and return the updated
    % list of events.
    WindowEndTime = sliding_window:window_end_timestamp(Size),
    lists:takewhile(fun({_, TimeAdded, _}) ->
                            TimeAdded > WindowEndTime
                    end, WindowEvents).
```



Using this module to store events is straightforward:


``` erlang
% Create a new window to hold events for 40 seconds
1> Window = sliding_window_list:new(40).
{40,1,[]}
% Add an event to the window
2> {Window2, Id1} = sliding_window_list:add(Window, foobar_event).
{{40,2,[{1,{1436,42881,562920},foobar_event}]},1}
% Add another event to the window
3> {Window3, Id2} = sliding_window_list:add(Window2, {some, other, event}).
{{40,3,[{1,{1436,42893,241849},foobar_event},
        {2,{1436,42991,830545},{some,other,event}}]},2}
% List all events
4> {Window4, Events} = sliding_window_list:events(Window3).
{{40,3,[{1,{1436,42893,241849},foobar_event},
        {2,{1436,42991,830545},{some,other,event}}]},
 [{1,{1436,42893,241849},foobar_event},
  {2,{1436,42991,830545},{some,other,event}}]}
% Events currently stored in the window
5> Events.
[{1,{1436,42893,241849},foobar_event},
 {2,{1436,42991,830545},{some,other,event}}]
% Get details on a specific event by ID
6> {Window5, Event} = sliding_window_list:event_info(Window4, Id1).
{{40,3,[{1,{1436,42893,241849},foobar_event},
        {2,{1436,42991,830545},{some,other,event}}]},
 {1,{1436,43127,793403},24.73498,15.262321,foobar_event}}
% Event details
7> Event.
{1,{1436,43127,793403},24.73498,15.262321,foobar_event}
% After waiting 40 seconds we can repeat the event_info and events calls.
% Now the events have expired and are no longer stored
8> {Window6, EventInfo} = sliding_window_list:event_info(Window5, Id1).
{{40,3,[]},none}
9> {Window7, Events2} = sliding_window_list:events(Window6).
{{40,3,[]},[]}
```

####List Implementation Benchmarks

Since I knew this wasn’t going to be an efficient solution I decided to write a couple of functions to benchmark the performance of several operations. There are three main operations that can be performed on a sliding window data store:

  1. Adding events
  2. Fetching information on events (Querying for specific events within the window)
  3. Retrieving all events currently in the window

I wrote three benchmark functions. The first adds 10,000 events to the sliding window. The second fetches information on 1,000 random individual events. The third fetches all events from the window 100 times. Events are generated on the fly and are tuples composed of a unique ID, an atom, and a binary with a length of 45, 90, 135, or 180 bytes.

``` erlang
% An example event
{load_testing_event, 123, <<"The quick brown fox jumped over the lazy dog.The quick brown fox jumped over the lazy dog.">>}
```


I ran all these functions three times on my MacBook Pro and took the average of the three runs. For my naive list implementation the results were:

* Adding 10,000 events: 8.726298 seconds
* Fetching 1,000 events: 2.835538 seconds
* Fetching all events 100 times: 0.279025 seconds

Not too bad for just being a list, but still not good enough for a system expected to be able to handle thousands of events. When using a list to store events a new list must be constructed each time old events are removed. This is due to the fact that old events are at the end of the list, and since Erlang's lists are stored as linked-lists and all data is immutable in Erlang, an entirely new list must be constructed when the last item in the list is removed. Since old events are at the end of the list and are removed each time an event is added or retrieved it has a significant impact on performance.
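The cost of that rebuild is easy to see in isolation. Here is a toy sketch of my own (not taken from the benchmark code), runnable in an Erlang shell:

``` erlang
% Newest events sit at the head of the list, oldest at the tail.
Window = [{3, 300, c}, {2, 200, b}, {1, 100, a}],
% Dropping the single expired entry at the tail still forces takewhile to
% walk and copy every surviving cell, so each cleanup is O(n) in live events.
[{3, 300, c}, {2, 200, b}] =
    lists:takewhile(fun({_, TimeAdded, _}) -> TimeAdded > 100 end, Window).
```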

After seeing the benchmark results I realized my benchmark functions were completely unrealistic. The data store would likely experience a mix of operations over time. Calls to add new events along with queries for existing events and calls to retrieve all events in the window would be randomly mixed together. I decided to write another benchmark function to interleave all three of these operations. The completed function adds 100 events, then fetches 10 random events, and then fetches all events once. In these benchmarks I ran the function 1000 times. I set the window to only store events for 10 seconds so events would expire during the test. At the end of the test run I counted the events still in the window. Since all events still in the window had been added in the last 10 seconds I simply divided the number of events by 10 to compute the number of events per second. While not perfect this served as my throughput benchmark. The throughput on the list implementation was 700.8 events per second.
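The benchmark code itself isn't shown in this post, but the mixed workload can be sketched roughly like this (the function, payload, and helper names here are my own guesses, not the original benchmark code):

``` erlang
%% One iteration of the mixed-workload benchmark: add 100 events, fetch
%% info on 10 random events, then fetch all events once. Mod is any of the
%% sliding_window implementations.
iteration(Mod, Window0) ->
    % Add 100 events, remembering their ids
    {Window1, Ids} =
        lists:foldl(fun(N, {W, Acc}) ->
                            {W2, Id} = Mod:add(W, {load_testing_event, N, <<"payload">>}),
                            {W2, [Id|Acc]}
                    end, {Window0, []}, lists:seq(1, 100)),
    % Fetch info on 10 random events from this batch
    Window2 =
        lists:foldl(fun(_, W) ->
                            Id = lists:nth(rand:uniform(length(Ids)), Ids),
                            {W3, _Info} = Mod:event_info(W, Id),
                            W3
                    end, Window1, lists:seq(1, 10)),
    % Fetch all events once
    {Window3, _Events} = Mod:events(Window2),
    Window3.
```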

I also created an Erlang behaviour named `sliding_window` that defines all the functions the sliding window modules must export. All the modules shown in this post implement the `sliding_window` behaviour:

``` erlang sliding_window.erl
-module(sliding_window).

-type event() :: list(tuple()).

-type id() :: any().

-type seconds() :: integer().

-type milliseconds() :: integer().

-type window_ref() :: any().

-callback new(Size :: seconds()) -> window_ref().

-callback add(Window :: window_ref(), Event :: event()) ->
    {window_ref(), Id :: id()}.

-callback events(Window :: window_ref()) ->
    {window_ref(), list({Id :: id(), Event :: event()})}.

-callback event_info(Window :: window_ref(), EventId :: id()) ->
    {window_ref(), {TimeElapsed :: milliseconds(), TimeRemaining :: milliseconds(), event()} | none}.

-callback reset(Window :: window_ref()) ->
    {window_ref(), ok}.
```
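With the callbacks defined, any implementation can be driven through the same API. A hypothetical helper (my own sketch, not part of the post's code) might look like:

``` erlang
%% Mod is the name of any module implementing the sliding_window behaviour,
%% e.g. sliding_window_list.
demo(Mod) ->
    Window0 = Mod:new(60),                          % 60 second window
    {Window1, Id} = Mod:add(Window0, some_event),   % store an event
    {Window2, _All} = Mod:events(Window1),          % everything still live
    {_Window3, Info} = Mod:event_info(Window2, Id), % details for one event
    Info.
```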

###Erlang’s Queue Module

I knew using an ETS table would probably be the best solution to this problem. But before I developed the ETS table implementation I came across a thread on the erlang-questions mailing list about sliding window data structures from back in 2005. It pointed me to Erlang’s queue module. The queue module implements a FIFO queue using two lists. Erlang’s queue implementation is based on the double-ended queues in Chris Okasaki’s Purely Functional Data Structures. Here is the implementation I came up with that uses the queue module:

``` erlang sliding_window_queue.erl
-module(sliding_window_queue).

-behaviour(sliding_window).

-include_lib("include/sliding_window.hrl").

-export([new/1, add/2, events/1, event_info/2, reset/1]).

% API
new(Size) ->
    % We can use a queue similar to a list. We create a queue and then we can
    % pass it along to each call.
    {Size, 1, queue:new()}.

add({Size, Id, Window}, Event) ->
    % Remove old events as some might still exist in the queue
    NewWindow = remove_old_events(Size, Window),
    % Add a new event to the queue and return the updated window state and event
    % id.
    {{Size, Id+1, queue:in({Id, sliding_window:timestamp(), Event}, NewWindow)}, Id}.

events({Size, Id, Window}) ->
    % Remove old events as some might still exist in the queue
    NewWindow = remove_old_events(Size, Window),
    % Return all events in the window
    {{Size, Id, NewWindow}, queue:to_list(NewWindow)}.

event_info({Size, Id, Window}, EventId) ->
    % Remove old events as some might still exist in the queue
    NewWindow = remove_old_events(Size, Window),
    % Find the event in the queue of events
    FilterResults = queue:filter(fun({CurrentEventId, _Time, _ExistingEvent}) ->
                                         CurrentEventId =:= EventId
                                 end, NewWindow),
    case queue:to_list(FilterResults) of
        [{EventId, EventTimeAdded, Event}] ->
            % If event exists return it along with the time remaining in the
            % window and the time elapsed. Also return the new window state.
            TimeElapsed = timer:now_diff(sliding_window:timestamp(), EventTimeAdded) / ?MEGA,
            TimeRemaining = timer:now_diff(EventTimeAdded, sliding_window:window_end_timestamp(Size)) / ?MEGA,
            {{Size, Id, NewWindow}, {EventId, EventTimeAdded, TimeElapsed, TimeRemaining, Event}};
        [] ->
            % Return none if the event is not found
            {{Size, Id, NewWindow}, none}
    end.

reset({Size, Id, _Window}) ->
    {{Size, Id, queue:new()}, ok}.

% Private Functions
remove_old_events(Size, WindowEvents) ->
    % Remove events that are outside the window of time and return the updated
    % queue of events.
    WindowEndTime = sliding_window:window_end_timestamp(Size),
    remove_old_events_from_queue(WindowEndTime, WindowEvents).

remove_old_events_from_queue(WindowEndTime, WindowEvents) ->
    % Get the oldest event in the queue
    try queue:get(WindowEvents) of
        {_, TimeAdded, _} ->
            case TimeAdded > WindowEndTime of
                true ->
                    % If the oldest event is inside the window of time we can
                    % just return the remaining WindowEvents queue. We know all
                    % events from here on are newer.
                    WindowEvents;
                _ ->
                    % Remove the event from the queue and invoke
                    % remove_old_events_from_queue again to check the next event
                    remove_old_events_from_queue(WindowEndTime, queue:drop(WindowEvents))
            end
    catch
        error:empty ->
            % We are at the end of the queue. Return the queue
            WindowEvents
    end.
```


Note that the functions and macros reused by the queue implementation have been pulled out into the `sliding_window` module and header file. They are the same as they were in the `sliding_window_list` module.

I was surprised at how fast the queue implementation performed:

* Adding 10,000 events: 0.036307 seconds
* Fetching 1,000 events: 1.21013 seconds
* Fetching all events 100 times: 0.119682 seconds
* Throughput benchmark: 2783.333333 events per second

Overall it was about three times faster than the list implementation. The improved performance is likely due to queue being represented as two Erlang lists. Lists are essentially stacks with two efficient operations - push and pop. Items can be pushed onto the top of the list to create a new list. The first item in a list can be popped off the top of the list (removed) and returned.

The queue is represented underneath as two lists: one for incoming (new) events and one for outgoing (old) events. The newest event is the first item in the incoming list and the oldest event is the first item in the outgoing list. What makes the queue so efficient is that everything is done using only the push and pop operations. When an event is added to the queue it is pushed onto the front of the incoming list. When an event is removed from the queue it is popped off the front of the outgoing list. When the outgoing list is empty, the items in the incoming list are popped off one by one and pushed onto the outgoing list. This results in the last and oldest item in the incoming list becoming the first item in the outgoing list. Each item that passes through the queue is only pushed twice and popped twice: it is pushed onto the incoming list, then popped off the incoming list and pushed onto the outgoing list, and finally popped off the outgoing list. This results in a very efficient FIFO queue.
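The FIFO behaviour described above can be seen with the queue module directly. A standalone shell example (not part of the sliding window code):

``` erlang
Q0 = queue:new(),
Q1 = queue:in(a, Q0),               % push onto the incoming (rear) list
Q2 = queue:in(b, Q1),
{{value, a}, Q3} = queue:out(Q2),   % the oldest element comes out first
{{value, b}, _Q4} = queue:out(Q3).
```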

###With An ETS Table

Next I tried implementing the sliding window using an ETS table. ETS stands for Erlang Term Storage. An ETS table is an in-memory key-value lookup table. It is very different from the list and queue implementations, which are basically just stacks. While the event storage is different, the code is similar to the other implementations:


``` erlang sliding_window_ets.erl
-module(sliding_window_ets).

-behaviour(sliding_window).

-include_lib("include/sliding_window.hrl").

-define(TABLE_NAME, ?MODULE).

-export([new/1, add/2, events/1, event_info/2, reset/1]).

% API
new(Size) ->
    % We need our ETS table to be an ordered set. Otherwise we will have to
    % inspect every event when checking for old events. With an ordered set we
    % can be sure the oldest event comes first.
    EtsOptions = [ordered_set, protected],
    Tid = ets:new(?TABLE_NAME, EtsOptions),
    {Size, 1, Tid}.

add({Size, Id, Window}, Event) ->
    % Remove old events as some might still exist in the table
    remove_old_events(Size, Window),
    % Add a new event to the table and return the event id.
    ets:insert(Window, {Id, sliding_window:timestamp(), Event}),
    {{Size, Id+1, Window}, Id}.

events({Size, Id, Window}) ->
    % Remove old events as some might still exist in the table
    remove_old_events(Size, Window),

    % Match all events in the table and map them to remove the surrounding lists
    % for each event. Return all events.
    Events = ets:match(Window, '$1'),
    Result = lists:map(fun([Event]) ->
                               Event
                       end, Events),
    {{Size, Id, Window}, Result}.

event_info({Size, Id, Window}, EventId) ->
    % Remove old events as some might still exist in the table
    remove_old_events(Size, Window),

    % Find the event in the ETS table
    case ets:lookup(Window, EventId) of
        [{EventId, EventTimeAdded, Event}] ->
            % If event exists return it along with the time remaining in the
            % window and the time elapsed. Also return the new window state.
            TimeElapsed = timer:now_diff(sliding_window:timestamp(), EventTimeAdded) / ?MEGA,
            TimeRemaining = timer:now_diff(EventTimeAdded, sliding_window:window_end_timestamp(Size)) / ?MEGA,
            {{Size, Id, Window}, {EventId, EventTimeAdded, TimeElapsed, TimeRemaining, Event}};
        [] ->
            % Return none if the event is not found
            {{Size, Id, Window}, none}
    end.

reset({Size, Id, Window}) ->
    % Delete all events but keep the table so the window can still be used
    true = ets:delete_all_objects(Window),
    {{Size, Id, Window}, ok}.

% Private Functions
remove_old_events(Size, Window) ->
    % Remove events that are outside the window of time
    WindowEndTime = sliding_window:window_end_timestamp(Size),

    % The first key is the oldest event, so we start with it
    Key = ets:first(Window),
    ok = remove_old_events_from_table(WindowEndTime, Window, Key).

remove_old_events_from_table(_WindowEndTime, _Window, '$end_of_table') ->
    ok;
remove_old_events_from_table(WindowEndTime, Window, Key) ->
    % Lookup the event in the ETS table and check the time it was added.
    case ets:lookup(Window, Key) of
        [{_, TimeAdded, _}] = [Event] ->
            case TimeAdded > WindowEndTime of
                true ->
                    % If the oldest event is inside the window of time we can
                    % return. We know all events from here on are newer.
                    ok;
                false ->
                    % Remove the event from the table and invoke
                    % remove_old_events_from_table again to check the next event
                    true = ets:delete_object(Window, Event),
                    NewKey = ets:next(Window, Key),
                    remove_old_events_from_table(WindowEndTime, Window, NewKey)
            end;
        [] ->
            % We are at the end of the set. Return
            ok
    end.
```

####ETS Table Benchmark Results

* Adding 10,000 events: 0.058912 seconds
* Fetching 1,000 events: 0.937192 seconds
* Fetching all events 100 times: 0.108906 seconds
* Throughput benchmark: 2196.666667 events per second

Surprisingly the ETS table is actually slower than using the queue module. It’s still significantly faster than using a plain old list however. Since we are using an ordered set table to store the events we know the oldest event is the first item in the table. So we start checking for old events at the beginning of the table and work our way forward. Once we get to an event that is still within the window of time we stop checking for old events, since we know that the remaining events are closer to the end of the table, and therefore, they are newer. This allows us to ignore most of the events that are inside of the window of time. The slight decrease in performance over the queue implementation is likely due to the key lookup required to traverse the items in the table (e.g. the ets:first/1 and ets:next/2 calls).
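The ordered_set property this traversal relies on can be checked in isolation (a standalone sketch, separate from the implementation above):

``` erlang
% In an ordered_set table keys are kept in term order, so with monotonically
% increasing event ids ets:first/1 always returns the oldest event's key.
T = ets:new(demo_table, [ordered_set]),
true = ets:insert(T, [{2, b}, {3, c}, {1, a}]),
1 = ets:first(T),
2 = ets:next(T, 1),
3 = ets:next(T, 2),
'$end_of_table' = ets:next(T, 3).
```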

###Mnesia

Since we are doing all this in Erlang this comparison wouldn’t be complete without testing Erlang’s own database, Mnesia. Mnesia is built on top of ETS and DETS tables, so I didn’t expect it to be any faster than the ETS implementation, but after seeing the ETS results I was curious how it would perform in a situation like this. There are also other things that might justify using a Mnesia database to store events. Mnesia can persist events on disk as well as store them in memory, and a Mnesia table can be replicated across multiple Erlang nodes, making fault tolerance possible. Both of these things would be very difficult to do with a plain ETS table or queue. For the Mnesia sliding window I opted for a simple in-memory table on one node. This makes the implementation similar to the others:

``` erlang sliding_window_mnesia.erl
-module(sliding_window_mnesia).

-behaviour(sliding_window).

-include_lib("stdlib/include/qlc.hrl").
-include_lib("include/sliding_window.hrl").

-define(TABLE_NAME, event).

-record(event, {id, added_time, event}).

-export([new/1, add/2, events/1, event_info/2, reset/1]).

% API
new(Size) ->
    % Create the schema
    mnesia:create_schema([node()]),
    % Start Mnesia
    mnesia:start(),
    Attrs = {attributes, record_info(fields, event)},
    % Only store table in ram on one node
    mnesia:create_table(?TABLE_NAME, [Attrs, {ram_copies, [node()]}]),
    {Size, 1, ?TABLE_NAME}.

add({Size, Id, Window}, Event) ->
    % Remove old events as some might still exist in the table
    remove_old_events(Size, Window),
    % Add a new event to the table and return the event id.
    {atomic, _} = mnesia:transaction(fun() ->
        EventRecord = #event{id=Id, added_time=sliding_window:timestamp(), event=Event},
        mnesia:write(EventRecord)
    end),
    {{Size, Id+1, Window}, Id}.

events({Size, Id, Window}) ->
    % Remove old events as some might still exist in the table
    remove_old_events(Size, Window),

    % Match all events in the table and map them to tuples. Return all events.
    Result = do(qlc:q([X || X <- mnesia:table(Window)])),
    TupleResults = lists:map(fun({event, EventId, TimeAdded, Event}) ->
                                     {EventId, TimeAdded, Event}
                             end, Result),
    {{Size, Id, Window}, TupleResults}.

event_info({Size, Id, Window}, EventId) ->
    % Remove old events as some might still exist in the table
    remove_old_events(Size, Window),

    % Find the event in the Mnesia table
    Result = do(qlc:q([{CurrentEventId, EventTimeAdded, Event} ||
                          #event{id=CurrentEventId, added_time=EventTimeAdded, event=Event} <- mnesia:table(Window),
                          CurrentEventId =:= EventId])),
    case Result of
        [{EventId, EventTimeAdded, Event}] ->
            % If event exists return it along with the time remaining in the
            % window and the time elapsed. Also return the new window state.
            TimeElapsed = timer:now_diff(sliding_window:timestamp(), EventTimeAdded) / ?MEGA,
            TimeRemaining = timer:now_diff(EventTimeAdded, sliding_window:window_end_timestamp(Size)) / ?MEGA,
            {{Size, Id, Window}, {EventId, EventTimeAdded, TimeElapsed, TimeRemaining, Event}};
        [] ->
            % Return none if the event is not found
            {{Size, Id, Window}, none}
    end.

reset({Size, Id, Window}) ->
    % Empty the database
    mnesia:clear_table(Window),

    % Stop mnesia
    mnesia:stop(),
    {{Size, Id, Window}, ok}.

% Private Functions
remove_old_events(Size, Window) ->
    % Remove events that are outside the window of time
    WindowEndTime = sliding_window:window_end_timestamp(Size),

    OldEventsIds = do(qlc:q([Id || #event{id=Id, added_time=TimeAdded} <- mnesia:table(Window),
                                   TimeAdded < WindowEndTime])),
    mnesia:transaction(fun() ->
                               lists:foreach(fun(Id) ->
                                                     mnesia:delete({event, Id})
                                             end, OldEventsIds)
                       end).

do(Q) ->
    F = fun() -> qlc:e(Q) end,
    {atomic, Val} = mnesia:transaction(F),
    Val.
```

####Mnesia Benchmark Results

* Adding 10,000 events: 10.498559 seconds
* Fetching 1,000 events: 4.26676 seconds
* Fetching all events 100 times: 0.442445 seconds
* Throughput benchmark: 563.933333 events per second

The Mnesia table performed far worse than the ETS table. The extreme difference in performance is due to the fact that in this implementation I must query the entire table on every call to find old events, which is very expensive. There is a way to use an ordered_set Mnesia table just like in the ETS implementation, but I wasn’t able to get it to work properly. Querying the whole table for old events is the primary cause of the bad performance.

###Benchmark Comparison

Here are the benchmark results for all four sliding window modules:

Values in seconds, lower is better:

| Implementation | Adding 10,000 Events | Fetching 1,000 Events | Fetch All Events 100 Times |
| --- | --- | --- | --- |
| List | 8.726298 | 2.835538 | 0.279025 |
| Queue | 0.036307 | 1.21013 | 0.119682 |
| ETS | 0.058912 | 0.937192 | 0.108906 |
| Mnesia | 10.498559 | 4.26676 | 0.442445 |


And the more realistic throughput benchmark:

| Implementation | Throughput Benchmark (events per second, higher is better) |
| --- | --- |
| List | 700.8 |
| Queue | 2783.333333 |
| ETS | 2196.666667 |
| Mnesia | 563.933333 |


###Conclusion

Erlang provides all the tools needed to quickly build sliding window data stores. The development of all four implementations was relatively straightforward and each was less than 100 lines of code. They all had very different performance characteristics, but overall the one that used Erlang’s queue module performed the best in my tests, with the ETS table implementation a close second. If you need the sliding window to be accessible to other processes on the Erlang node an ETS table may be a good option, as it allows for sharing between processes, unlike a list or queue, which lives inside a single process. The sliding window that used Mnesia was the slowest implementation of the four, but there are still advantages to using Mnesia to store events: you can replicate the Mnesia table across multiple Erlang nodes, and unlike the other three implementations you can use a disk-backed Mnesia table to persist events on disk. Overall the list implementation is probably the worst choice, though even it might be fine in situations where you don’t plan on handling more than a couple hundred events at a time.

I hope this information was helpful to you. If you notice anything here that is incorrect, or you see something I missed, please let me know.

The benchmark tests I wrote are far from perfect. I only wrote them to get a feel for the performance of the various sliding window implementations I came up with. In all the benchmarks I ran the window size was less than 2 minutes, and in the throughput benchmark the window size was only 10 seconds. In real scenarios events would likely be kept for at least an hour. If you need a sliding window data store it would be wise to write your own benchmarking functions that exercise these implementations in ways that match your specific use case.

###Resources