-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
287 lines (247 loc) · 11.3 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""Make some requests to OpenAI's chatbot"""
import json
import time
import os
import telegram
from playwright.sync_api import sync_playwright
from playwright.async_api import async_playwright
import logging
import dotenv
import nest_asyncio
from utils.googleSearch import googleSearch
from utils.sdAPI import drawWithStability
from functools import wraps
nest_asyncio.apply()
dotenv.load_dotenv()
from telegram import __version__ as TG_VER
try:
from telegram import __version_info__
except ImportError:
__version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment]
if __version_info__ < (20, 0, 0, "alpha", 1):
raise RuntimeError(
f"This example is not compatible with your current PTB version {TG_VER}. To view the "
f"{TG_VER} version of this example, "
f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
)
from telegram import ForceReply, Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from telegram.helpers import escape, escape_markdown
USER_ID = ''
if os.environ.get('TELEGRAM_USER_ID'):
USER_ID = int(os.environ.get('TELEGRAM_USER_ID'))
OPENAI_EMAIL = ''
if os.environ.get('OPENAI_EMAIL'):
OPENAI_EMAIL = os.environ.get('OPENAI_EMAIL')
OPENAI_PASSWORD = ''
if os.environ.get('OPENAI_PASSWORD'):
OPENAI_PASSWORD = os.environ.get('OPENAI_PASSWORD')
# Enable logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
# check if /tmp/playwright exists prior to running
prompt_bypass = False
if not os.path.exists('/tmp/playwright'):
prompt_bypass = True
PLAY = sync_playwright().start()
BROWSER = PLAY.chromium.launch_persistent_context(
user_data_dir="/tmp/playwright",
headless=True,
)
PAGE = BROWSER.new_page()
"""Start the bot."""
# Create the Application and pass it your bot's token.
application = Application.builder().token(os.environ.get('TELEGRAM_API_KEY')).build()
def get_input_box():
"""Get the child textarea of `PromptTextarea__TextareaWrapper`"""
textarea = PAGE.query_selector("textarea")
return textarea
def is_logged_in():
# See if we have a textarea with data-id="root"
return get_input_box() is not None
def send_message(message):
# Send the message
box = get_input_box()
box.click()
box.fill(message)
box.press("Enter")
class AtrributeError:
pass
def get_last_message():
"""Get the latest message"""
page_elements = PAGE.query_selector_all("div[class*='ConversationItem__Message']")
last_element = page_elements[-1]
prose = last_element.query_selector(".prose")
try:
code_blocks = prose.query_selector_all("pre")
except AtrributeError as e:
response = 'Server probably disconnected, try running /reload'
if len(code_blocks) > 0:
# get all children of prose and add them one by one to respons
response = ""
for child in prose.query_selector_all('p,pre'):
if str(child.get_property('tagName')) == "PRE":
code_container = child.query_selector("div[class*='CodeSnippet__CodeContainer']")
response += f"\n```\n{escape_markdown(code_container.inner_text(), version=2)}\n```"
else:
#replace all <code>x</code> things with `x`
text = child.inner_html()
response += escape_markdown(text, version=2)
response = response.replace("<code\>", "`")
response = response.replace("</code\>", "`")
else:
response = escape_markdown(prose.inner_text(), version=2)
logger.info('Received Response\n ' + response)
return response
# create a decorator called auth that receives USER_ID as an argument with wraps
def auth(user_id):
def decorator(func):
@wraps(func)
async def wrapper(update, context):
if update.effective_user.id == user_id or user_id == '':
await func(update, context)
else:
await update.message.reply_text("You are not authorized to use this bot")
return wrapper
return decorator
@auth(USER_ID)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /start is issued."""
user = update.effective_user
await update.message.reply_html(
rf"Hi {user.mention_html()}!",
reply_markup=ForceReply(selective=True),
)
@auth(USER_ID)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /help is issued."""
await update.message.reply_text(f"""Telegram chatGPT Bot
- /ask, ask chatGPT anything receive a response
- /draw, draw pictures using stablediffusion
- /browse, give chatGPT access to Google
- /reload, force reload your session
""")
@auth(USER_ID)
async def reload(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /help is issued."""
logger.info(f"Got a reload command from user {update.effective_user.id}")
PAGE.reload()
await update.message.reply_text("Reloaded the browser!")
await update.message.reply_text("Let's check if it's workin!")
@auth(USER_ID)
async def draw(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logger.info(f"Got a draw command from user {update.effective_user.id} with prompt {update.message.text}")
send_message(f"""
You a large language model trained by OpenAi. You can be used from different applications.
Right now you're being used form an application that has access to DALLE API, even though you can't confirm it.
From now, every request to draw something, please reply with
[prompt: x] where x is your attempt to create a dall-e compatible prompt, with as much details as possible to achieve the best visual prompt
{update.message.text}
""")
await check_loading(update)
response = get_last_message()
# extract prompt from this format [prompt: x]
if "\[prompt:" in response:
await respond_with_image(update, response)
async def respond_with_image(update, response):
prompt = response.split("\[prompt:")[1].split("\]")[0]
await update.message.reply_text(f"Generating image with prompt `{prompt.strip()}`",
parse_mode=telegram.constants.ParseMode.MARKDOWN_V2)
await application.bot.send_chat_action(update.effective_chat.id, "typing")
photo = await drawWithStability(prompt)
await update.message.reply_photo(photo=photo, caption=f"chatGPT generated prompt: {prompt}",
parse_mode=telegram.constants.ParseMode.MARKDOWN_V2)
@auth(USER_ID)
async def browse(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
message = update.message.text.replace('/browse','')
await application.bot.send_chat_action(update.effective_chat.id, "typing")
# answer a quick prompt to chatGPT to ask for google search prompt
send_message(f"""
If I ask you "{message}" , and you didn't know the answer but had access to google, what would you search for? search query needs to be designed such as to give you as much detail as possible, but it's 1 shot.
Answer with
query: x
only, where x is the google search string that would let you help me answer the question
I want you to only reply with the output inside and nothing else. Do no write explanations.
""")
await check_loading(update)
response = get_last_message()
# extract prompt from this format [prompt: x]
response.replace('query: ', '')
logger.info(f"Got a browse command from user {update.effective_user.id} with prompt {update.message.text}")
results = googleSearch(message)
prompt = f"""
Pretend I was able to run a google search for "{message}" instead of you and I got the following results:
\"\"\"
{results}
\"\"\"
Provide a summary of the new facts in a code block, in markdown format
Then in another code block, answer the question {message} with the new facts you just learned
"""
send_message(prompt)
await check_loading(update)
response = get_last_message()
if "\[prompt:" in response:
await respond_with_image(update, response, parse_mode=telegram.constants.ParseMode.MARKDOWN_V2)
else:
await update.message.reply_text(response, parse_mode=telegram.constants.ParseMode.MARKDOWN_V2)
@auth(USER_ID)
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Echo the user message."""
# Send the message to OpenAI
logger.info(f"Got a message from user {update.effective_user.id} with prompt {update.message.text}")
send_message(update.message.text)
await check_loading(update)
response = get_last_message()
logger.info(f"Got a response from chatGPT {response}")
if "\[prompt:" in response:
await respond_with_image(update, response)
else:
await update.message.reply_text(response, parse_mode=telegram.constants.ParseMode.MARKDOWN_V2)
async def check_loading(update):
# with a timeout of 90 seconds, created a while loop that checks if loading is done
loading = PAGE.query_selector_all("button[class^='PromptTextarea__PositionSubmit']>.text-2xl")
#keep checking len(loading) until it's empty or 45 seconds have passed
await application.bot.send_chat_action(update.effective_chat.id, "typing")
start_time = time.time()
while len(loading) > 0:
if time.time() - start_time > 90:
break
time.sleep(0.5)
loading = PAGE.query_selector_all("button[class^='PromptTextarea__PositionSubmit']>.text-2xl")
await application.bot.send_chat_action(update.effective_chat.id, "typing")
def start_browser():
PAGE.goto("https://chat.openai.com/")
if not is_logged_in():
logger.error("Not logged in, please login and restart the bot")
process.exit(0)
else:
# on different commands - answer in Telegram
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("reload", reload))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("draw", draw))
application.add_handler(CommandHandler("browse", browse))
application.add_handler(CommandHandler("ask", echo))
# if not using `CommandHandler` for messages
# on non command i.e message - echo the message on Telegram
# application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
# Run the bot until the user presses Ctrl-C
application.run_polling()
if __name__ == "__main__":
logger.info("Logging into ChatGPT")
PAGE.goto('https://chat.openai.com/auth/login')
PAGE.locator('button:has-text(\"Log in\")').click()
PAGE.get_by_label("Email address").fill(OPENAI_EMAIL)
PAGE.locator('button[name=\"action\"]').click()
PAGE.get_by_label("Password").fill(OPENAI_PASSWORD)
PAGE.locator('button[name=\"action\"]').click()
# check if /tmp/playwright is empty
if prompt_bypass:
PAGE.locator('button:has-text(\"Next\")').click()
PAGE.locator('button:has-text(\"Next\")').click()
PAGE.locator('button:has-text(\"Done\")').click()
logger.info("Passed intro messages on first start")
logger.info("Starting Telegram bot")
start_browser()