Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
xfgryujk committed Apr 24, 2021
2 parents b689c9d + f64a087 commit 659fddc
Show file tree
Hide file tree
Showing 37 changed files with 2,831 additions and 1,217 deletions.
2 changes: 1 addition & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ README.md

# runtime data
data/*
!data/config.ini
!data/config.example.ini
log/*
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,6 @@ venv.bak/


.idea/
data/database.db
*.log*
data/*
!data/config.example.ini
log/*
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# 运行时
FROM python:3.6.8-slim-stretch
FROM python:3.7.10-slim-stretch
RUN mv /etc/apt/sources.list /etc/apt/sources.list.bak \
&& echo "deb http://mirrors.tuna.tsinghua.edu.cn/debian/ stretch main contrib non-free">>/etc/apt/sources.list \
&& echo "deb http://mirrors.tuna.tsinghua.edu.cn/debian/ stretch-updates main contrib non-free">>/etc/apt/sources.list \
Expand Down
78 changes: 52 additions & 26 deletions api/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Command(enum.IntEnum):
UPDATE_TRANSLATION = 7


_http_session = aiohttp.ClientSession()
_http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))

room_manager: Optional['RoomManager'] = None

Expand All @@ -43,6 +43,8 @@ def init():


class Room(blivedm.BLiveClient):
HEARTBEAT_INTERVAL = 10

# 重新定义parse_XXX是为了减少对字段名的依赖,防止B站改字段名
def __parse_danmaku(self, command):
info = command['info']
Expand Down Expand Up @@ -97,7 +99,7 @@ def __parse_super_chat(self, command):
}

def __init__(self, room_id):
super().__init__(room_id, session=_http_session, heartbeat_interval=10)
super().__init__(room_id, session=_http_session, heartbeat_interval=self.HEARTBEAT_INTERVAL)
self.clients: List['ChatHandler'] = []
self.auto_translate_count = 0

Expand Down Expand Up @@ -365,34 +367,68 @@ def _del_room(self, room_id):

# noinspection PyAbstractClass
class ChatHandler(tornado.websocket.WebSocketHandler):
HEARTBEAT_INTERVAL = 10
RECEIVE_TIMEOUT = HEARTBEAT_INTERVAL + 5

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._close_on_timeout_future = None
self._heartbeat_timer_handle = None
self._receive_timeout_timer_handle = None

self.room_id = None
self.auto_translate = False

def open(self):
logger.info('Websocket connected %s', self.request.remote_ip)
self._close_on_timeout_future = asyncio.ensure_future(self._close_on_timeout())
self._heartbeat_timer_handle = asyncio.get_event_loop().call_later(
self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
)
self._refresh_receive_timeout_timer()

async def _close_on_timeout(self):
try:
# 超过一定时间还没加入房间则断开
await asyncio.sleep(10)
logger.warning('Client %s joining room timed out', self.request.remote_ip)
self.close()
except (asyncio.CancelledError, tornado.websocket.WebSocketClosedError):
pass
def _on_send_heartbeat(self):
self.send_message(Command.HEARTBEAT, {})
self._heartbeat_timer_handle = asyncio.get_event_loop().call_later(
self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
)

def _refresh_receive_timeout_timer(self):
if self._receive_timeout_timer_handle is not None:
self._receive_timeout_timer_handle.cancel()
self._receive_timeout_timer_handle = asyncio.get_event_loop().call_later(
self.RECEIVE_TIMEOUT, self._on_receive_timeout
)

def _on_receive_timeout(self):
logger.warning('Client %s timed out', self.request.remote_ip)
self._receive_timeout_timer_handle = None
self.close()

def on_close(self):
logger.info('Websocket disconnected %s room: %s', self.request.remote_ip, str(self.room_id))
if self.has_joined_room:
room_manager.del_client(self.room_id, self)
if self._heartbeat_timer_handle is not None:
self._heartbeat_timer_handle.cancel()
self._heartbeat_timer_handle = None
if self._receive_timeout_timer_handle is not None:
self._receive_timeout_timer_handle.cancel()
self._receive_timeout_timer_handle = None

def on_message(self, message):
try:
# 超时没有加入房间也断开
if self.has_joined_room:
self._refresh_receive_timeout_timer()

body = json.loads(message)
cmd = body['cmd']
if cmd == Command.HEARTBEAT:
return
pass
elif cmd == Command.JOIN_ROOM:
if self.has_joined_room:
return
self._refresh_receive_timeout_timer()

self.room_id = int(body['data']['roomId'])
logger.info('Client %s is joining room %d', self.request.remote_ip, self.room_id)
try:
Expand All @@ -402,21 +438,11 @@ def on_message(self, message):
pass

asyncio.ensure_future(room_manager.add_client(self.room_id, self))
self._close_on_timeout_future.cancel()
self._close_on_timeout_future = None
else:
logger.warning('Unknown cmd, client: %s, cmd: %d, body: %s', self.request.remote_ip, cmd, body)
except Exception:
logger.exception('on_message error, client: %s, message: %s', self.request.remote_ip, message)

def on_close(self):
logger.info('Websocket disconnected %s room: %s', self.request.remote_ip, str(self.room_id))
if self.has_joined_room:
room_manager.del_client(self.room_id, self)
if self._close_on_timeout_future is not None:
self._close_on_timeout_future.cancel()
self._close_on_timeout_future = None

# 跨域测试用
def check_origin(self, origin):
if self.application.settings['debug']:
Expand All @@ -432,7 +458,7 @@ def send_message(self, cmd, data):
try:
self.write_message(body)
except tornado.websocket.WebSocketClosedError:
self.on_close()
self.close()

async def on_join_room(self):
if self.application.settings['debug']:
Expand Down Expand Up @@ -550,7 +576,7 @@ async def _get_room_info(room_id):
res.status, res.reason)
return room_id, 0
data = await res.json()
except aiohttp.ClientConnectionError:
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
logger.exception('room %d _get_room_info failed', room_id)
return room_id, 0

Expand All @@ -574,7 +600,7 @@ async def _get_server_host_list(cls, _room_id):
# res.status, res.reason)
# return cls._host_server_list_cache
# data = await res.json()
# except aiohttp.ClientConnectionError:
# except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
# logger.exception('room %d _get_server_host_list failed', room_id)
# return cls._host_server_list_cache
#
Expand Down
2 changes: 1 addition & 1 deletion blivedm
Submodule blivedm updated 4 files
+6 −1 README.md
+29 −37 blivedm.py
+1 −1 requirements.txt
+2 −1 sample.py
102 changes: 84 additions & 18 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

logger = logging.getLogger(__name__)

CONFIG_PATH = os.path.join('data', 'config.ini')
CONFIG_PATH_LIST = [
os.path.join('data', 'config.ini'),
os.path.join('data', 'config.example.ini')
]

_config: Optional['AppConfig'] = None

Expand All @@ -21,8 +24,16 @@ def init():


def reload():
config_path = ''
for path in CONFIG_PATH_LIST:
if os.path.exists(path):
config_path = path
break
if config_path == '':
return False

config = AppConfig()
if not config.load(CONFIG_PATH):
if not config.load(config_path):
return False
global _config
_config = config
Expand All @@ -36,31 +47,86 @@ def get_config():
class AppConfig:
def __init__(self):
self.database_url = 'sqlite:///data/database.db'
self.enable_translate = True
self.allow_translate_rooms = {}
self.tornado_xheaders = False
self.loader_url = ''

self.fetch_avatar_interval = 3.5
self.fetch_avatar_max_queue_size = 2
self.avatar_cache_size = 50000

self.enable_translate = True
self.allow_translate_rooms = set()
self.translation_cache_size = 50000
self.translator_configs = []

def load(self, path):
try:
config = configparser.ConfigParser()
config.read(path, 'utf-8')

app_section = config['app']
self.database_url = app_section['database_url']
self.enable_translate = app_section.getboolean('enable_translate')
self._load_app_config(config)
self._load_translator_configs(config)
except Exception:
logger.exception('Failed to load config:')
return False
return True

allow_translate_rooms = app_section['allow_translate_rooms']
if allow_translate_rooms == '':
self.allow_translate_rooms = {}
def _load_app_config(self, config):
app_section = config['app']
self.database_url = app_section['database_url']
self.tornado_xheaders = app_section.getboolean('tornado_xheaders')
self.loader_url = app_section['loader_url']

self.fetch_avatar_interval = app_section.getfloat('fetch_avatar_interval')
self.fetch_avatar_max_queue_size = app_section.getint('fetch_avatar_max_queue_size')
self.avatar_cache_size = app_section.getint('avatar_cache_size')

self.enable_translate = app_section.getboolean('enable_translate')
self.allow_translate_rooms = _str_to_list(app_section['allow_translate_rooms'], int, set)
self.translation_cache_size = app_section.getint('translation_cache_size')

def _load_translator_configs(self, config):
app_section = config['app']
section_names = _str_to_list(app_section['translator_configs'])
translator_configs = []
for section_name in section_names:
section = config[section_name]
type_ = section['type']

translator_config = {
'type': type_,
'query_interval': section.getfloat('query_interval'),
'max_queue_size': section.getint('max_queue_size')
}
if type_ == 'TencentTranslateFree':
translator_config['source_language'] = section['source_language']
translator_config['target_language'] = section['target_language']
elif type_ == 'BilibiliTranslateFree':
pass
elif type_ == 'TencentTranslate':
translator_config['source_language'] = section['source_language']
translator_config['target_language'] = section['target_language']
translator_config['secret_id'] = section['secret_id']
translator_config['secret_key'] = section['secret_key']
translator_config['region'] = section['region']
elif type_ == 'BaiduTranslate':
translator_config['source_language'] = section['source_language']
translator_config['target_language'] = section['target_language']
translator_config['app_id'] = section['app_id']
translator_config['secret'] = section['secret']
else:
allow_translate_rooms = allow_translate_rooms.split(',')
self.allow_translate_rooms = set(map(lambda id_: int(id_.strip()), allow_translate_rooms))
raise ValueError(f'Invalid translator type: {type_}')

self.tornado_xheaders = app_section.getboolean('tornado_xheaders')
self.loader_url = app_section['loader_url']
translator_configs.append(translator_config)
self.translator_configs = translator_configs

except (KeyError, ValueError):
logger.exception('Failed to load config:')
return False
return True

def _str_to_list(value, item_type: Type=str, container_type: Type=list):
value = value.strip()
if value == '':
return container_type()
items = value.split(',')
items = map(lambda item: item.strip(), items)
if item_type is not str:
items = map(lambda item: item_type(item), items)
return container_type(items)
Loading

0 comments on commit 659fddc

Please sign in to comment.