-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy path6_inference_pipeline_ui.py
executable file
·127 lines (102 loc) · 4.61 KB
/
6_inference_pipeline_ui.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import streamlit as st
import hopsworks
from mimesis import Generic
from mimesis.locales import Locale
import pandas as pd
import random
# Function to print a styled header
def print_header(text, font_size=22):
    """Render ``text`` on the page inside an HTML ``<span>`` with a pixel font size.

    Args:
        text: Content placed inside the span. It is interpolated as-is
            (no HTML escaping is applied here).
        font_size: Font size, in pixels, written into the inline style.
    """
    # unsafe_allow_html=True is passed so the span markup is rendered as HTML
    # rather than displayed as literal text.
    st.markdown(
        f'<span style=" font-size: {font_size}px;">{text}</span>',
        unsafe_allow_html=True,
    )
# Function to retrieve and start model deployments
@st.cache_resource()
def get_deployments():
    """Log in to Hopsworks, start both model deployments and return their handles.

    The function is wrapped in ``st.cache_resource`` so its result is cached
    by Streamlit instead of being rebuilt on every script rerun.

    Returns:
        A 3-tuple ``(interactions_fg, ranking_deployment,
        query_model_deployment)``: version 1 of the "interactions" feature
        group, the "rankingdeployment" deployment and the "querydeployment"
        deployment.
    """
    st.write("🚀 Retrieving and Starting Deployments...")

    # Connect to the Hopsworks project and look up the feature group that
    # interaction rows are written to.
    hopsworks_project = hopsworks.login()
    feature_store = hopsworks_project.get_feature_store()
    interactions_fg = feature_store.get_feature_group(name="interactions", version=1)

    # Resolve both deployments through the project's model serving handle.
    model_serving = hopsworks_project.get_model_serving()
    query_deployment = model_serving.get_deployment("querydeployment")
    ranking_deployment = model_serving.get_deployment("rankingdeployment")

    # Start the ranking deployment first, then the query deployment, each
    # with await_running=180 (presumably a wait budget in seconds — confirm
    # against the Hopsworks deployment API).
    for deployment in (ranking_deployment, query_deployment):
        deployment.start(await_running=180)

    st.write('✅ Deployments are ready!')

    return interactions_fg, ranking_deployment, query_deployment
def insert_interaction(user_id, video_id, interactions_fg):
    """Insert one synthetic interaction row for ``user_id`` and ``video_id``.

    The interaction id, interaction type and watch time are randomly
    generated; only the user and video ids come from the caller.

    Args:
        user_id: Identifier stored in the ``user_id`` column.
        video_id: Identifier stored in the ``video_id`` column.
        interactions_fg: Object exposing ``insert(dataframe)``; the one-row
            DataFrame is passed to it.
    """
    # Random identifier following the '####-##-####' mask.
    interaction_id = Generic(locale=Locale.EN).person.identifier(mask='####-##-####')

    # Relative sampling weights per interaction type ('skip' is by far the
    # most likely outcome, 'dislike' the least).
    type_weights = {
        'like': 1.5,
        'dislike': 0.2,
        'view': 3,
        'comment': 0.5,
        'share': 0.8,
        'skip': 10,
    }
    (interaction_type,) = random.choices(
        list(type_weights),
        weights=list(type_weights.values()),
        k=1,
    )

    # Inclusive range 1..50; the unit is not stated in this file.
    watch_time = random.randint(1, 50)

    row = {
        'interaction_id': interaction_id,
        'interaction_type': interaction_type,
        'user_id': user_id,
        'video_id': video_id,
        'watch_time': watch_time,
    }
    # Wrap every value in a single-element list so the frame has exactly one row.
    interaction_df = pd.DataFrame({column: [value] for column, value in row.items()})
    interactions_fg.insert(interaction_df)
# Define function to fetch recommendations
def fetch_recommendations(user_id, query_model_deployment):
    """Ask the query deployment for recommendations for ``user_id``.

    Args:
        user_id: Value sent as ``instances.user_id`` in the request payload.
        query_model_deployment: Object exposing ``predict(payload)``.

    Returns:
        The ``['predictions']['ranking']`` entry of the prediction response.
    """
    st.write('🔮 Getting recommendations...')
    response = query_model_deployment.predict({"instances": {"user_id": user_id}})
    return response['predictions']['ranking']
# Function to insert interaction and fetch new recommendations
def handle_interaction(user_id, video_id, interactions_fg, query_model_deployment):
    """Record an interaction, then return freshly fetched recommendations.

    Args:
        user_id: User who interacted with the video.
        video_id: Video that was interacted with.
        interactions_fg: Feature group handle the interaction row is inserted into.
        query_model_deployment: Deployment queried for the new recommendations.

    Returns:
        Whatever ``fetch_recommendations`` returns for ``user_id``.
    """
    # The insert happens before the fetch, so the request is issued after the
    # new interaction has been submitted.
    insert_interaction(
        user_id=user_id,
        video_id=video_id,
        interactions_fg=interactions_fg,
    )
    refreshed = fetch_recommendations(
        user_id=user_id,
        query_model_deployment=query_model_deployment,
    )
    return refreshed
# Main Streamlit application logic
def main():
    """Run the Streamlit video-recommender page.

    On each script run this:

    1. Obtains the feature group and deployment handles once per session and
       stores them in ``st.session_state``.
    2. Lets the viewer pick one of a fixed set of user ids.
    3. Fetches (and shuffles) recommendations when none are cached, when a
       ``'refresh'`` key is present in the session state, or when the cached
       list was fetched for a different user than the one now selected.
    4. Shows the first three recommendations as buttons; clicking one records
       an interaction, refetches recommendations and reruns the script.
    5. Offers a "Stop Streamlit" button that stops both deployments and halts
       the script run.
    """
    st.title('🎬 Video Recommender')

    state = st.session_state

    # Initialize or re-use existing deployments.
    if 'deployments_initialized' not in state:
        (
            state.interactions_fg,
            state.ranking_deployment,
            state.query_model_deployment,
        ) = get_deployments()
        state['deployments_initialized'] = True

    # User selection box.
    user_id_option = st.selectbox(
        'For which user?',
        ('CJ730Y', 'AL707G', 'WS920J', 'YP322C', 'HE156H',),
        key='user_select'
    )

    # Refresh when nothing is cached, when a refresh was requested, or when
    # the cached list belongs to a different user. Without the last check,
    # switching users would keep showing the previous user's recommendations.
    needs_refresh = (
        'recommendations' not in state
        or 'refresh' in state
        or state.get('recommendations_user') != user_id_option
    )
    if needs_refresh:
        recommendations = fetch_recommendations(
            user_id_option,
            state.query_model_deployment,
        )
        random.shuffle(recommendations)
        state.recommendations = recommendations
        state['recommendations_user'] = user_id_option
        state.pop('refresh', None)

    print_header('📝 Top 3 Recommendations:')

    for recommendation in state.recommendations[:3]:
        # Index 1 is used as the video id; presumably each entry is a
        # (score, video_id) pair — confirm against the ranking deployment.
        video_id = recommendation[1]
        if st.button(video_id, key=video_id):
            new_recommendations = handle_interaction(
                user_id_option,
                video_id,
                state.interactions_fg,
                state.query_model_deployment,
            )
            random.shuffle(new_recommendations)
            state.recommendations = new_recommendations
            state['recommendations_user'] = user_id_option
            # Prefer st.rerun; fall back to the older experimental name on
            # Streamlit versions that do not provide it yet.
            rerun = getattr(st, 'rerun', None) or st.experimental_rerun
            rerun()

    if st.button("Stop Streamlit"):
        st.write('⚙️ Stopping Deployments...')
        state.ranking_deployment.stop()
        state.query_model_deployment.stop()
        st.success('✅ App finished successfully!')
        # NOTE(review): the handles kept in session state (and in the
        # get_deployments cache) still refer to the now-stopped deployments;
        # confirm whether they should be restarted on the next interaction.
        st.stop()
# Entry point: build the page only when this module runs as ``__main__``.
if __name__ == "__main__":
    main()