Merge remote-tracking branch 'origin/main'
YouXam committed Jul 17, 2024
2 parents 8233d77 + 16ef55c commit 35042e1
Showing 2 changed files with 82 additions and 26 deletions.
22 changes: 14 additions & 8 deletions WorkerNodePython/WorkerNode/ApiMessageProcessor.py
@@ -6,12 +6,14 @@


class ApiMessageProcessor:
def __init__(self,msg,apiURL,apiKey,model):
def __init__(self,msg,apiURL,apiKey,model,errorMode,errorData):
self.msg = msg
self.apiURL = apiURL
self.apiKey = apiKey
self.encoding = tiktoken.encoding_for_model(model)
self.destination = None
self.errorMode = errorMode
self.errorData = errorData
def process(self):
try:
jsonMsg = json.loads(self.msg.data().decode('utf-8'))
@@ -24,6 +26,9 @@ def process(self):
try:
if(stream == False):
self.destination = endPoint
if self.errorMode:
self.sendError(self.destination,requestID,self.errorData)
return None
response = self.sendHttpRequest(data)
print('received response: {}'.format(response))
self.sendHttpResponse(response,requestID,endPoint)
@@ -38,7 +43,12 @@
}
return result
else:
tokens = self.sendStreamRequest(data,requestID,endPoint)
ws = websocket.create_connection(endPoint.replace('http://','ws://'))
self.destination = ws
if self.errorMode:
self.sendError(self.destination,requestID,self.errorData)
return None
tokens = self.sendStreamRequest(data,requestID,ws)
usage = {
"prompt_tokens": tokens[0],
"completion_tokens": tokens[1],
@@ -51,8 +61,7 @@
return result

except requests.exceptions.HTTPError as e:
error_detail = e.response.json()
            # TODO: finish returning the error
error_detail = e.response.json()
self.sendError(self.destination,requestID,error_detail)
raise e
except Exception as e:
@@ -78,7 +87,7 @@ def sendHttpResponse(self,response,request_id,endpoint):
}
requests.post(endpoint,data=json.dumps(newResponse),headers=headers,verify=False)

def sendStreamRequest(self,data,request_id,endpoint):
def sendStreamRequest(self,data,request_id,ws):
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(self.apiKey)
@@ -89,9 +98,6 @@ def event_stream():
yield chunk
client = SSEClient(event_stream())

ws = websocket.create_connection(endpoint.replace('http://','ws://'))
self.destination = ws

response = requests.post(self.apiURL,data=json.dumps(data),headers=headers,verify=False)

if response.status_code >= 200:
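Both branches of process() above short-circuit through self.sendError(...) when errorMode is set, but the helper's body lies outside the hunks shown. A minimal sketch of what it could look like, assuming (as the call sites suggest) that destination is either the HTTP endpoint URL from the non-streaming path or the websocket opened in the streaming path; the payload shape is a guess:

    import json
    import requests
    import websocket

    class _Sketch:
        def sendError(self, destination, request_id, error):
            # Hypothetical reconstruction; the real sendError is not part of this diff.
            payload = {'requestID': request_id, 'error': error}  # assumed shape
            if isinstance(destination, websocket.WebSocket):
                # Streaming path: destination is the connection from create_connection().
                destination.send(json.dumps(payload))
                destination.close()
            elif destination is not None:
                # Non-streaming path: destination is the callback endpoint URL.
                requests.post(destination, data=json.dumps(payload),
                              headers={'Content-Type': 'application/json'}, verify=False)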
86 changes: 68 additions & 18 deletions WorkerNodePython/WorkerNode/WorkerNode.py
@@ -29,15 +29,23 @@
stopEvent = threading.Event()
startTime = 0
sockets = dict()
condition = threading.Condition()
activeThreads = 0
errorMode = False
errorData = dict()

def init():
global nodeType,pulsarURL,serviceTopicName,pulsarToken,topicName
global maxProcessNum,apiURL,apiKey,queue,map,model,debug,AIModelName,AIModelNamespace
nodeType = os.getenv('NODE_TYPE','local');
nodeType = os.getenv('NODE_TYPE','api');
pulsarURL = os.getenv('PULSAR_URL',"pulsar://localhost:6650");
maxProcessNum = int(os.getenv('MAX_PROCESS_NUM','128'));
apiURL = os.getenv('API_URL',"http://localhost:8080/v1/chat/completions");
apiKey = os.getenv('API_KEY',"sk-no-key-required");
if nodeType == 'local':
apiURL = 'http://localhost:8080/v1/chat/completions'
apiKey = 'sk-no-key-required'
else:
apiURL = os.getenv('API_URL',"https://api.openai-hk.com/v1/chat/completions");
apiKey = os.getenv('API_KEY',"");
model = os.getenv('MODEL_NAME','gpt-3.5-turbo')
serviceTopicName = os.getenv('RES_TOPIC_NAME','res-topic')
debug = bool(os.getenv('DEBUG','false'))
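One caveat in the unchanged line above: bool(os.getenv('DEBUG','false')) is True for any non-empty string, including 'false', so DEBUG is effectively always on. A standalone sketch of a stricter parse (illustration only, not part of the commit):

    import os

    def env_flag(name, default='false'):
        # bool('false') is True, so compare against explicit truthy spellings instead.
        return os.getenv(name, default).strip().lower() in ('1', 'true', 'yes', 'on')

    debug = env_flag('DEBUG')  # False unless DEBUG is set to a truthy value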
@@ -78,14 +86,27 @@ def createConsumer(url):
exit(1)

def run():
global queue,errorMode,errorData
if nodeType == 'local':
localInit()
if not model == 'LLaMA_CPP':
errorMode = True
errorData = {
"error": {
"message": "The model \'{}\' does not exist or you do not have access to it.".format(model),
"type": "invalid_request_error",
"param": None,
"code": "model_not_found"
}
}
            Event.createEvent(apiInstance,AIModelNamespace,AIModelName,'ConfigurationError','unsupported model ' + model)
else:
localInit()

global pulsarClient,consumer

pulsarClient,consumer = createConsumer(pulsarURL)

processors = list()
processors = []

for i in range(maxProcessNum):
processor = Processor('Thread-' + str(i),consumer)
@@ -94,9 +115,14 @@ def run(self):

try:
while True:
with condition:
while activeThreads >= maxProcessNum:
condition.wait()
msg = consumer.receive()
#consumer.acknowledge(msg)
print('received message: {}'.format(msg.data())) # debug
queue.put(msg,True)

except KeyboardInterrupt:
print('Stopping consumer...')
finally:
@@ -111,7 +137,7 @@ def __init__(self,name,consumer):
self.consumer = consumer
self.producer = pulsarClient.create_producer(serviceTopicName)
def run(self):
global startTime
global startTime,condition,activeThreads,queue,errorMode,errorData
while True:
msg = None
if stopEvent.is_set() and time.time() > startTime:
@@ -120,45 +146,69 @@ def run(self):
elif stopEvent.is_set():
time.sleep(5)
continue
msg = queue.get(True)
with condition:
activeThreads += 1

try:
msg = queue.get(True)
print('{} take message: {}'.format(self.name,msg.data())) #debug
if nodeType == 'api':
amp = ApiMessageProcessor(msg,apiURL,apiKey,model)
amp = ApiMessageProcessor(msg,apiURL,apiKey,model,errorMode,errorData)
else:
amp = ApiMessageProcessor(msg,apiURL,apiKey,'gpt-3.5-turbo')
amp = ApiMessageProcessor(msg,apiURL,apiKey,'gpt-3.5-turbo',errorMode,errorData)
result = amp.process()
self.sendResult(result)
if result != None:
self.sendResult(result)
self.consumer.acknowledge(msg)
except Empty:
continue
except json.JSONDecodeError as e:
Event.createEvent(apiInstance,AIModelName,AIModelNamespace,'GeneralError','Error decoding JSON')
if msg:
self.consumer.negative_acknowledge(msg)
self.consumer.acknowledge(msg)
except requests.exceptions.HTTPError as e:
            # handle HTTP errors
if e.response.status_code == 401:
error_detail = e.response.json()
Event.createEvent(apiInstance,AIModelName,AIModelNamespace,'AuthenticationError',json.dumps(error_detail))
errorMode = True
errorData = error_detail
Event.createEvent(apiInstance,AIModelName,AIModelNamespace,'AuthenticationError',error_detail['error']['message'])
elif e.response.status_code == 429:
error_detail = e.response.json()
errorMode = True
errorData = error_detail
if 'Rate limit reached for requests' in error_detail['error']['message']:
                    if not stopEvent.is_set():
stopEvent.set()
startTime = time.time() + 60
else:
Event.createEvent(apiInstance,AIModelName,AIModelNamespace,'APIQuotaExceededError',json.dumps(error_detail))
Event.createEvent(apiInstance,AIModelName,AIModelNamespace,'APIQuotaExceededError',error_detail['error']['message'])
elif e.response.status_code == 404:
error_detail = e.response.json()
errorMode = True
errorData = error_detail
if 'model' in error_detail['error']['message']:
Event.createEvent(apiInstance,AIModelName,AIModelNamespace,'ConfigurationError',error_detail['error']['message'])
else:
Event.createEvent(apiInstance,AIModelName,AIModelNamespace,'GeneralError',error_detail['error']['message'])
else:
error_detail = e.response.json()
Event.createEvent(apiInstance,AIModelName,AIModelNamespace,'GeneralError',json.dumps(error_detail))
errorMode = True
errorData = error_detail
print('error status code: {}'.format(e.response.status_code))
Event.createEvent(apiInstance,AIModelName,AIModelNamespace,'GeneralError',error_detail['error']['message'])
if msg:
self.consumer.negative_acknowledge(msg)
self.consumer.acknowledge(msg)

except Exception as e:
Event.createEvent(apiInstance,AIModelName,AIModelNamespace,'GeneralError',e.__str__())
print(e.__str__())
if msg:
self.consumer.negative_acknowledge(msg)
self.consumer.acknowledge(msg)

finally:
queue.task_done()
with condition:
activeThreads -= 1
condition.notify()

def sendResult(self,result):
result['model'] = model
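The thread-coordination half of this commit is a simple backpressure scheme: the receive loop in run() blocks on condition while activeThreads >= maxProcessNum, and every Processor increments the counter after taking a message, then decrements it and calls notify() in its finally block. The pattern in isolation, as a runnable sketch (worker count, sleep, and item count are illustrative only):

    import threading, queue, time

    MAX_WORKERS = 4
    work = queue.Queue()
    condition = threading.Condition()
    active = 0

    def worker():
        global active
        while True:
            item = work.get(True)
            with condition:
                active += 1
            try:
                time.sleep(0.01)  # stand-in for ApiMessageProcessor.process()
            finally:
                work.task_done()
                with condition:
                    active -= 1
                    condition.notify()  # wake the producer if it is waiting

    for _ in range(MAX_WORKERS):
        threading.Thread(target=worker, daemon=True).start()

    for i in range(100):  # mirrors the consumer.receive() loop above
        with condition:
            while active >= MAX_WORKERS:
                condition.wait()
        work.put(i, True)

    work.join()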
