Consider the following transactional data set
Note that transactions pass through created->paid->cancelled in order.
The analyst usually needs to answer questions like
- How many transactions completed payment?
- How many transactions which were initiated on a mobile device were cancelled?
- How many transactions paid by card completed payment?
- How many transactions that were created in the last month completed within the month?
Using LATEST,EARLIEST and ANY_VALUE
The key issue with this kind of data is that druid does not support row level updates. In a typical data warehouse the data would be updated using transaction_id as a primary key. This is however not possible in druid. But druid does have the LATEST, EARLIEST and ANY_VALUE SQL commands which allow both numbers and strings to be treated as aggregates (https://druid.apache.org/docs/latest/querying/sql.html#aggregation-functions)
Consider the questions “How many transactions completed payment?”
The following query will retrieve the latest status and any amount for each transaction.
select transaction_id, LATEST(status,10), ANY_VALUE(amount) from datasource group by 1
The status changes for each record but the amount does not change. So the above query gets the latest status and any amount for each transaction_id (the value 10 for the latest(status,10) is to specify the size of the string for the aggregation buffer).
The following query will answer the question “How many transactions completed payment?”
Select count(1) from (select transaction_id, LATEST(status,10) status, ANY_VALUE(amount) amount from datasource group by 1) where status=’paid’
While this approach works, the primary drawback is that the inner query aggregates by transaction_id which tends to be a high cardinality column. This limits the performance that can be achieved with the above query.
Using a kafka lookup
In this approach we will be using a kafka lookup in druid (https://druid.apache.org/docs/latest/development/extensions-core/kafka-extraction-namespace.html
). To use the lookup we will need two kafka topics. A primary topic that receives the entire data (
__time,transaction_id,status,reason,method,channel,amount) and lookup topic that receives transaction_id,__time (the kafka lookup needs the topic data to be published using the transaction_id as the message key. This ensures the ordering of the messages.)
In the kafka lookup topic the message_id will be the transaction_id and __time is the value of the message. In the druid lookup transaction_id is the message key and __time is the value.
Using the above lookup topic in druid to create a lookup called lkp1, the following query can be used to get the count of transactions that are complete
select count(1) from datasource where __time=time_parse(LOOKUP(transaction_id,’lkp1’)) and status=’paid’
Funnel analysis using theta sketch
While lack of support for row level updates makes traditional OLAP challenging, retaining events for each primary key makes possible other types of analytics. Funnel analytics usually involves answering questions like “How many customers went from page A to B to C and D?” or “How many customers saw product A and B”. How many entities went through a sequence A->B->C etc
There are two approaches to funnel analytics
Consider the following data
The following query computes the count of unique users who have passed through status A,B and C
select THETA_SKETCH_ESTIMATE(THETA_SKETCH_INTERSECT(st_a,st_b,st_c)) from
(select DS_THETA(user) filter (where state=’A’) st_a,DS_THETA(user) filter (where state=’B’) st_b
,DS_THETA(user) filter (where status=’C’) st_c from datasource)
The key thing to note is that this query computes an intersection of three sets. However it does not maintain the sequence of the statuses. For instance if a user went through A->C->B then this query would still count those users in the intersection.
For most use cases the sequence A->B->C is fixed but for use cases where the order is flexible but the only the specific sequence must be counted in a query then the following query can be used
select count(1) from (select “user”,MAX(TIMESTAMP_TO_MILLIS(“__time”)) filter (where status=’A’) t1,
MAX(TIMESTAMP_TO_MILLIS(“__time”)) filter (where status=’B’) t2,
MAX(TIMESTAMP_TO_MILLIS(“__time”)) filter (where status=’C’) t3 from datasource group by 1 having t1!=-9223372036854775808 and t2!=-9223372036854775808 and t3!=-9223372036854775808 and t1>t2 and t2>t3)