Anomaly Detection Using Druid and Imply Pivot

One of the key requirements in machine learning is monitoring the inputs and predictions of a pipeline in real time. Sudden, large changes in the inputs and/or outputs disrupt the downstream applications and activities that consume the predictions, so they need to be caught quickly.

In this post we look at a simple ML pipeline that posts its inputs and predictions to a Kafka topic. Data from this topic is continuously ingested into a Druid cluster. Every 10 seconds we extract the latest data from the cluster, run anomaly detection on it with PyCaret, push the results back to Druid (via Kafka), and then visualise them in Imply Pivot.

For the purposes of this post, we use a Python machine learning pipeline that continuously reads records from the Iris dataset and makes predictions on them.

The full Python code is at https://github.com/vnarayaj/druid-anomaly/blob/main/ml_mon_predict.py

It uses the following code to define a pipeline ending in a random forest classifier, run it continuously to generate predictions, and post them to Kafka (into a topic called ml1):


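import json
import random
from datetime import datetime

import numpy as np
from pykafka import KafkaClient
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import Normalizer, StandardScaler

# Producer setup (assumed: local broker, pykafka as in the anomaly step below).
client = KafkaClient(hosts="localhost:9092")
producer = client.topics["ml1"].get_sync_producer()

X, y = load_iris(return_X_y=True)
rf = RandomForestClassifier()


def pipeline():
    """A dummy model that has a bunch of components that we can test."""
    # Roughly one run in four uses 100 trees instead of 10.
    r = random.randrange(1, 5)
    nest = 100 if r == 4 else 10
    model = Pipeline(
        [
            ("scaler", StandardScaler()),
            ("normal", Normalizer()),
            (
                "union",
                FeatureUnion(
                    [
                        ("pca", PCA(n_components=1)),
                        ("svd", TruncatedSVD(n_components=2)),
                    ],
                    n_jobs=1,  # parallelized components won't generate spans
                ),
            ),
            ("class", RandomForestClassifier(n_estimators=nest)),
        ]
    )
    X_train, y_train = load_iris(return_X_y=True)
    model.fit(X_train, y_train)
    return model


def random_input():
    """A random record from the feature set."""
    rows = X.shape[0]
    random_row = np.random.choice(rows, size=1)
    return X[random_row, :]


rf.fit(X, y)
rf.predict(X)

model = pipeline()
# instrumentor3 = SklearnInstrumentor(instrument=OpenTelemetrySpanner())
# instrumentor3.instrument_estimator(model)

while True:
    r = random.randrange(0, 100)
    x_test = random_input() + 1
    # Uncomment to inject an anomaly into col2:
    # x_test[0, 1] = x_test[0, 1] + r
    x1 = x_test.tolist()
    z1 = model.predict(x_test).tolist()
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    # One JSON message per prediction: timestamp, four inputs, predicted class.
    m = {"ts": ts, "col1": x1[0][0], "col2": x1[0][1],
         "col3": x1[0][2], "col4": x1[0][3], "predict": z1[0]}
    n = json.dumps(m)
    producer.produce(n.encode("ascii"))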
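Factoring the message-building step out of the loop makes the message format easy to check without a Kafka broker. The sketch below uses a hypothetical make_record helper (not part of the original script); the field names and timestamp format match the snippet above.

```python
import json
from datetime import datetime


def make_record(features, prediction, now=None):
    """Build the JSON message the pipeline posts to the ml1 topic (hypothetical helper)."""
    if now is None:
        now = datetime.now()
    record = {"ts": now.strftime("%Y-%m-%d %H:%M:%S.%f")}
    # col1..col4 hold the four Iris features; float() also accepts numpy scalars.
    for i, value in enumerate(features, start=1):
        record[f"col{i}"] = float(value)
    record["predict"] = int(prediction)
    return json.dumps(record)


msg = make_record([6.1, 3.8, 2.4, 1.2], 0, datetime(2021, 9, 1, 12, 0, 0, 123456))
```

Passing a fixed `now` keeps the output deterministic, which is handy when testing the message format in isolation.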

Next, ingest the inputs and outputs of the ML pipeline into Druid by creating a streaming ingestion from Kafka. The sample supervisor spec is at https://github.com/vnarayaj/druid-anomaly/blob/main/ml_mon_predict.py. It ingests col1, col2, col3, col4 and predict into Druid.
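For readers who want to see the shape of such a spec, here is a minimal sketch of a Kafka supervisor spec built in Python and submitted to Druid's supervisor API (`POST /druid/indexer/v1/supervisor`). It is not the author's spec: the broker address, router URL (port 8888, as in the Druid quickstart), data source name and column types are assumptions, and the `"auto"` timestamp format is assumed to accept the space-separated timestamps the producer writes.

```python
import json
import urllib.request

# Assumed values: adjust for your cluster.
BOOTSTRAP_SERVERS = "localhost:9092"
ROUTER_URL = "http://localhost:8888"


def kafka_supervisor_spec(topic, datasource):
    """A minimal Druid Kafka supervisor spec for the ml1 messages (a sketch)."""
    dims = [{"type": "double", "name": f"col{i}"} for i in range(1, 5)]
    dims.append({"type": "long", "name": "predict"})
    return {
        "type": "kafka",
        "spec": {
            "ioConfig": {
                "type": "kafka",
                "consumerProperties": {"bootstrap.servers": BOOTSTRAP_SERVERS},
                "topic": topic,
                "inputFormat": {"type": "json"},
                "useEarliestOffset": True,
            },
            "tuningConfig": {"type": "kafka"},
            "dataSchema": {
                "dataSource": datasource,
                # Assumption: "auto" parses "%Y-%m-%d %H:%M:%S.%f" strings.
                "timestampSpec": {"column": "ts", "format": "auto"},
                "dimensionsSpec": {"dimensions": dims},
                "granularitySpec": {
                    "segmentGranularity": "hour",
                    "queryGranularity": "none",
                    "rollup": False,
                },
            },
        },
    }


def submit_supervisor(spec, router_url=ROUTER_URL):
    """POST the spec to Druid's supervisor endpoint and return the JSON reply."""
    req = urllib.request.Request(
        f"{router_url}/druid/indexer/v1/supervisor",
        data=json.dumps(spec).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


spec = kafka_supervisor_spec("ml1", "ml1")
```

Calling `submit_supervisor(spec)` against a running cluster should start the ingestion; the same spec can also be pasted into the Druid console instead.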

In this step we set up an Isolation Forest (iforest) anomaly detection model, train it on sample data from a Druid query, and then run it every 10 seconds to predict anomalies (the notebook is available at https://github.com/vnarayaj/druid-anomaly/blob/main/druid-anomaly.ipynb).
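The iforest model rests on a simple idea: anomalies are few and different, so random axis-aligned splits isolate them in fewer steps than normal points. The from-scratch sketch below illustrates that idea; it is not PyCaret's code (which, as far as I know, wraps the PyOD implementation), and the tree count, sample size and synthetic data are illustrative assumptions.

```python
import math
import random

EULER_GAMMA = 0.5772156649


def avg_path(n):
    """c(n): average path length of an unsuccessful BST search among n points."""
    if n <= 1:
        return 0.0
    if n == 2:
        return 1.0
    return 2.0 * (math.log(n - 1) + EULER_GAMMA) - 2.0 * (n - 1) / n


def build_tree(points, depth, max_depth, rng):
    """Isolation tree: split on a random feature at a random threshold."""
    if depth >= max_depth or len(points) <= 1:
        return {"size": len(points)}
    dim = rng.randrange(len(points[0]))
    lo = min(p[dim] for p in points)
    hi = max(p[dim] for p in points)
    if lo == hi:  # no split possible on this feature
        return {"size": len(points)}
    split = rng.uniform(lo, hi)
    return {
        "dim": dim,
        "split": split,
        "left": build_tree([p for p in points if p[dim] < split], depth + 1, max_depth, rng),
        "right": build_tree([p for p in points if p[dim] >= split], depth + 1, max_depth, rng),
    }


def path_length(tree, point, depth=0):
    """Depth at which `point` is isolated, plus c(size) for unsplit leaves."""
    if "size" in tree:
        return depth + avg_path(tree["size"])
    branch = "left" if point[tree["dim"]] < tree["split"] else "right"
    return path_length(tree[branch], point, depth + 1)


def fit_isolation_forest(data, n_trees=100, sample_size=64, seed=0):
    """Return a scorer: values close to 1 suggest anomalies, well below 0.5 look normal."""
    rng = random.Random(seed)
    size = min(sample_size, len(data))
    max_depth = math.ceil(math.log2(size))
    trees = [build_tree(rng.sample(data, size), 0, max_depth, rng) for _ in range(n_trees)]

    def score(point):
        mean_path = sum(path_length(t, point) for t in trees) / len(trees)
        return 2.0 ** (-mean_path / avg_path(size))

    return score


# Synthetic 2-D data: a Gaussian cluster, then score its centre and a far point.
gen = random.Random(1)
cluster = [(gen.gauss(0, 1), gen.gauss(0, 1)) for _ in range(200)]
score = fit_isolation_forest(cluster)
center_score = score((0.0, 0.0))
outlier_score = score((8.0, 8.0))
```

The far point is isolated after a few splits, so its average path is short and its score higher than the centre's; the same behaviour is what makes the anomaly score rise when an anomaly is injected into the pipeline input.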

First, create a connection to Druid using SQLAlchemy:

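from sqlalchemy import create_engine, text

# The druid+https dialect is provided by the pydruid package.
# ssl_verify_cert=False skips TLS certificate verification.
engine2 = create_engine('druid+https://<user>:<password>@<host name>:<port>/druid/v2/sql/',
                        connect_args={"ssl_verify_cert": False})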
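If the password contains characters such as `@` or `:`, the URL above breaks. A small sketch of a hypothetical helper that percent-escapes the user and password (SQLAlchemy decodes these fields when it parses the URL); host, port and credentials below are placeholders.

```python
from urllib.parse import quote_plus


def druid_sql_url(user, password, host, port):
    """Build a pydruid SQLAlchemy URL with the credentials percent-escaped."""
    return (
        f"druid+https://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/druid/v2/sql/"
    )


url = druid_sql_url("admin", "p@ss:word", "druid.example.com", 443)
```

The result can be passed straight to `create_engine(url, connect_args={"ssl_verify_cert": False})`.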

Execute a query on the connection. This query extracts the last 10 minutes of data, which is used to train the anomaly model:

import pandas as pd

conn2 = engine2.connect()
# Training data: ml1 rows from the last 10 minutes ('PT10M').
result2 = conn2.execute(text("select * from ml1 where __time > TIME_SHIFT(CURRENT_TIMESTAMP,'PT10M',-1)"))
df2 = pd.DataFrame(result2.mappings())
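TIME_SHIFT(CURRENT_TIMESTAMP, period, -1) shifts the current time back by one ISO-8601 period, so the filter keeps rows newer than "now minus period". Since the training step described here uses a 10-minute window and the scoring loop a 10-second one, a hypothetical helper can build both queries from one template. The table name is interpolated directly, so it should only ever come from trusted code.

```python
def recent_rows_sql(table, period):
    """Druid SQL selecting rows whose __time is within `period` of now."""
    return (
        f"select * from {table} "
        f"where __time > TIME_SHIFT(CURRENT_TIMESTAMP,'{period}',-1)"
    )


TRAIN_SQL = recent_rows_sql("ml1", "PT10M")  # 10 minutes of training data
SCORE_SQL = recent_rows_sql("ml1", "PT10S")  # 10 seconds per scoring pass
```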

Set up a PyCaret anomaly detection experiment:

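from pycaret.anomaly import (assign_model, create_model, load_model,
                             predict_model, save_model, setup)

# silent=True (PyCaret 2.x) skips the interactive data-type confirmation.
exp1 = setup(df2, normalize=True, silent=True, session_id=123)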

Create and train the iforest anomaly detection model, then assign its anomaly labels and scores to the training data:

iforest = create_model('iforest')
iforest_results = assign_model(iforest)
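assign_model adds an anomaly score and a 0/1 anomaly label to each row. As far as I know, the label comes from a contamination fraction (create_model's `fraction` parameter, 0.05 by default): training scores above the (1 - fraction) percentile are flagged. The sketch below reproduces that thresholding rule with the standard library; it is an illustration of the idea, not PyCaret's code, and the scores are made-up inputs.

```python
import math


def percentile(values, q):
    """Percentile with linear interpolation between neighbouring ranks."""
    ordered = sorted(values)
    pos = (len(ordered) - 1) * q / 100.0
    lower = math.floor(pos)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (pos - lower)


def label_anomalies(scores, fraction=0.05):
    """Flag scores above the (1 - fraction) percentile as 1, others as 0."""
    threshold = percentile(scores, 100.0 * (1.0 - fraction))
    return [1 if s > threshold else 0 for s in scores], threshold


labels, threshold = label_anomalies([float(i) for i in range(20)])
```

With 20 scores and the default fraction, only the single highest score lands above the threshold, matching the 5% contamination the model was told to expect.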

Save the model and load it back:

save_model(iforest, model_name='iforest')

loaded_iforest = load_model('iforest')

Run the model continuously: every 10 seconds, get the latest data from Druid, predict anomalies and post the scored rows to Kafka (into a topic called anomaly):

import time

from pykafka import KafkaClient

client = KafkaClient(hosts='localhost:9092')
topic = client.topics['anomaly']
producer = topic.get_sync_producer()

while True:
    result3 = conn2.execute(text("select * from ml1 where __time > TIME_SHIFT(CURRENT_TIMESTAMP,'PT10S',-1)"))
    df3 = pd.DataFrame(result3.mappings())
    # Skip empty windows: there is nothing to score.
    if not df3.empty:
        pred_new = predict_model(loaded_iforest, data=df3)
        print(pred_new.head())
        # Post each scored row (inputs plus anomaly columns) as JSON.
        for i in range(len(pred_new)):
            p = pred_new.iloc[i].to_json()
            print(p)
            producer.produce(p.encode('ascii'))
    time.sleep(10)
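In the loop above, each query looks back 10 seconds from the moment it runs, but the loop also spends time scoring and posting before it sleeps, so rows can fall into the gap between two windows and never be scored. One way to avoid that, sketched here as an alternative rather than the notebook's approach, is to remember the latest __time already scored (a watermark) and fetch only newer rows. `fetch_since` and the in-memory store are hypothetical stand-ins for a Druid query; rows that become queryable late with a __time older than the watermark would still be missed.

```python
def poll_new_rows(fetch_since, watermark):
    """Return rows with __time after `watermark`, plus the advanced watermark."""
    rows = [r for r in fetch_since(watermark) if r["__time"] > watermark]
    if rows:
        watermark = max(r["__time"] for r in rows)
    return rows, watermark


# In-memory stand-in for Druid. All timestamps share one ISO format, so
# string comparison follows time order.
store = [
    {"__time": "2021-09-01T12:00:01.000Z", "col1": 5.1},
    {"__time": "2021-09-01T12:00:05.000Z", "col1": 6.2},
]


def fake_fetch(since):
    return [r for r in store if r["__time"] > since]


first, mark = poll_new_rows(fake_fetch, "2021-09-01T12:00:00.000Z")
store.append({"__time": "2021-09-01T12:00:12.000Z", "col1": 7.0})
second, mark = poll_new_rows(fake_fetch, mark)
```

In the real loop, `fetch_since` would run a query filtered on `__time` greater than the watermark, and the scored rows would be posted to Kafka as before.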

Finally, ingest the data from the anomaly topic into Druid and create a data cube in Pivot (https://docs.imply.io/2021.08/managing-data-cubes/).
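Before building the cube, it can help to check the scored data with a plain Druid SQL query. The sketch below builds one that averages the anomaly score and counts flagged rows per minute. It assumes the data source is named anomaly and that the PyCaret output columns Anomaly_Score and Anomaly were ingested as numeric columns; adjust the names to match your ingestion spec.

```python
def anomaly_summary_sql(datasource="anomaly", bucket="PT1M"):
    """Druid SQL: average anomaly score and anomaly count per time bucket."""
    return (
        f"SELECT TIME_FLOOR(__time, '{bucket}') AS bucket_start, "
        "AVG(Anomaly_Score) AS avg_score, "
        "SUM(Anomaly) AS anomalies, "
        "COUNT(*) AS row_count "
        f'FROM "{datasource}" '
        "GROUP BY 1 ORDER BY 1"
    )


sql = anomaly_summary_sql()
```

With the engine created earlier, `pd.read_sql(text(anomaly_summary_sql()), engine2)` should return one row per minute, a rough preview of the trend the Pivot cube will chart.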

The screenshot below shows a scenario where an anomaly is introduced into the model input and is detected as the anomaly score starts to increase.

Currently working as a lead solution engineer at Imply, with about 20 years in the data space at Cloudera, Informatica and other companies.