Anomaly detection using Druid and Imply Pivot
One of the key requirements in machine learning is to monitor the inputs and predictions of a pipeline in real time. This need arises because sudden, large disturbances in the inputs and/or outputs lead to disturbances in the downstream applications and activities that consume the predictions.
In this post we will look at a simple ML pipeline posting inputs and predictions to a Kafka topic. Data from this topic is continuously ingested into a Druid cluster. We will use PyCaret to extract data from this cluster every 10 seconds, run anomaly detection on it, push the results back to Druid, and then visualise the results in Imply Pivot.
Step 1: Machine learning pipeline
For the purpose of this post we use a machine learning pipeline that continuously reads records from the iris dataset in Python and makes predictions on them.
The full Python code is at https://github.com/vnarayaj/druid-anomaly/blob/main/ml_mon_predict.py
It uses the following code snippet to define a random forest classifier and run it continuously, generating predictions and posting them to Kafka (into a topic called ml1).
X, y = load_iris(return_X_y=True)
rf = RandomForestClassifier()

def pipeline():
    """A dummy model that has a bunch of components that we can test."""
    r = random.randrange(1, 5)
    if r == 4:
        nest = 100
    else:
        nest = 10
    model = Pipeline(
        [
            ("scaler", StandardScaler()),
            ("normal", Normalizer()),
            (
                "union",
                FeatureUnion(
                    [
                        ("pca", PCA(n_components=1)),
                        ("svd", TruncatedSVD(n_components=2)),
                    ],
                    n_jobs=1,  # parallelized components won't generate spans
                ),
            ),
            ("class", RandomForestClassifier(n_estimators=nest)),
        ]
    )
    X_train, y_train = load_iris(return_X_y=True)
    model.fit(X_train, y_train)
    return model

def random_input():
    """A random record from the feature set."""
    rows = X.shape[0]
    random_row = np.random.choice(rows, size=1)
    return X[random_row, :]

rf.fit(X, y)
rf.predict(X)
model = pipeline()
#instrumentor3 = SklearnInstrumentor(instrument=OpenTelemetrySpanner())
#instrumentor3.instrument_estimator(model)

j = 1
while j:
    r = random.randrange(0, 100)
    x_test = random_input() + 1
    #x_test[0,1] = x_test[0,1] + r
    x1 = x_test.tolist()
    z = model.predict(x_test)
    z1 = z.tolist()
    t = datetime.now()
    ts = t.strftime("%Y-%m-%d %H:%M:%S.%f")
    m = {"ts": ts, "col1": x1[0][0], "col2": x1[0][1], "col3": x1[0][2], "col4": x1[0][3], "predict": z1[0]}
    n = json.dumps(m)
    producer.produce(n.encode('ascii'))
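The snippet above relies on a Kafka producer and a handful of imports that are defined elsewhere in the full script. A minimal sketch of that setup, assuming pykafka (the same client used for the anomaly topic in step 3) and a local broker (the broker address is an assumption), might look like this:

import json
import random
from datetime import datetime

import numpy as np
from pykafka import KafkaClient
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import Normalizer, StandardScaler

# producer for the ml1 topic; the broker address is illustrative
client = KafkaClient(hosts='localhost:9092')
topic = client.topics['ml1']
producer = topic.get_sync_producer()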
Step 2: Ingest input and predictions into Druid
Ingest the inputs and predictions from the ML pipeline into Druid by creating a streaming ingestion from Kafka. The sample supervisor spec is at https://github.com/vnarayaj/druid-anomaly/blob/main/ml_mon_predict.py. This ingests col1, col2, col3, col4 and predict into Druid.
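As a rough sketch of what such a streaming ingestion looks like (the datasource name, broker address, and field choices here are illustrative assumptions, not the author's actual spec), a minimal Kafka supervisor spec can be submitted to the Druid Overlord's supervisor API from Python:

import requests

# minimal Kafka ingestion spec for the ml1 topic; values are placeholders
spec = {
    "type": "kafka",
    "spec": {
        "ioConfig": {
            "type": "kafka",
            "consumerProperties": {"bootstrap.servers": "localhost:9092"},
            "topic": "ml1",
            "inputFormat": {"type": "json"},
        },
        "dataSchema": {
            "dataSource": "ml1",
            "timestampSpec": {"column": "ts", "format": "auto"},
            "dimensionsSpec": {"dimensions": ["col1", "col2", "col3", "col4", "predict"]},
            "granularitySpec": {"queryGranularity": "none", "rollup": False},
        },
        "tuningConfig": {"type": "kafka"},
    },
}

# submit the spec to the Overlord's supervisor endpoint
resp = requests.post(
    "https://<host name>:<port>/druid/indexer/v1/supervisor",
    json=spec,
    auth=("<user>", "<password>"),
    verify=False,
)
print(resp.status_code, resp.text)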
Step 3: Anomaly detection in PyCaret
In this step we set up an iforest anomaly detection model, train it on sample data from a Druid query, and then run it continuously every 10 seconds to predict anomalies (the notebook is available at https://github.com/vnarayaj/druid-anomaly/blob/main/druid-anomaly.ipynb).
First, create a connection to Druid using sqlalchemy:
from sqlalchemy import create_engine, text
import pandas as pd

engine2 = create_engine('druid+https://<user>:<password>@<host name>:<port>/druid/v2/sql/',
                        connect_args={"ssl_verify_cert": False})
Execute a query using the connection to extract recent data and train an anomaly model. The TIME_SHIFT interval shown here is 10 seconds; widen it (for example to 'PT10M') if you want to train on the last 10 minutes of data.
conn2=engine2.connect()
result2 = conn2.execute(text("select * from ml1 where __time > TIME_SHIFT(CURRENT_TIMESTAMP,'PT10S',-1)"))
df2=pd.DataFrame(result2.mappings())
Set up a PyCaret anomaly detection experiment:
from pycaret.anomaly import *
exp1 = setup(df2, normalize=True, silent=True, session_id=123)
Create and train an iforest anomaly detection model:
iforest = create_model('iforest')
iforest_results = assign_model(iforest)
Save and load the model:
save_model(iforest, model_name='iforest')
loaded_iforest = load_model('iforest')
Run the model continuously: get data from Druid every 10 seconds, predict anomalies, and post the results into Kafka (a topic called anomaly).
from pykafka import KafkaClient
import time

client = KafkaClient(hosts='localhost:9092')
topic = client.topics['anomaly']
producer = topic.get_sync_producer()

while True:
    # pull the last 10 seconds of inputs and predictions from Druid
    result3 = conn2.execute(text("select * from ml1 where __time > TIME_SHIFT(CURRENT_TIMESTAMP,'PT10S',-1)"))
    df3 = pd.DataFrame(result3.mappings())
    # score the new records with the trained iforest model
    pred_new = predict_model(loaded_iforest, data=df3)
    print(pred_new.head())
    # publish each scored record to the 'anomaly' topic
    for i in range(len(pred_new)):
        p = pred_new.iloc[i].to_json()
        print(p)
        producer.produce(p.encode('ascii'))
    time.sleep(10)
Step 4: Visualize the output in Imply Pivot
Ingest the data from the anomaly Kafka topic into Druid and create a data cube in Pivot (https://docs.imply.io/2021.08/managing-data-cubes/).
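Before building the cube, it can help to confirm that the scores are landing in Druid. A quick check with the earlier sqlalchemy connection, assuming the new datasource is named anomaly and carries the Anomaly and Anomaly_Score columns that predict_model appends, might look like:

# look at the highest-scoring records from the last 10 minutes
# (datasource and column names assume the anomaly topic was ingested as-is)
result4 = conn2.execute(text(
    'select __time, "Anomaly", "Anomaly_Score" from anomaly '
    "where __time > TIME_SHIFT(CURRENT_TIMESTAMP,'PT10M',-1) "
    'order by "Anomaly_Score" desc limit 10'))
print(pd.DataFrame(result4.mappings()))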
The screenshot below shows a scenario where an anomaly is introduced into the model input and is detected as the anomaly score starts increasing.