Skip to content

Commit

Permalink
Fix markdown parsing
Browse files Browse the repository at this point in the history
Closes #107
Closes #108

Co-authored-by: Ryuk <[email protected]>
  • Loading branch information
KurimuzonAkuma and anonymousx97 committed Feb 8, 2025
1 parent 165e57d commit 106babb
Showing 1 changed file with 95 additions and 22 deletions.
117 changes: 95 additions & 22 deletions pyrogram/parser/markdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import pyrogram
from pyrogram.enums import MessageEntityType

from . import utils
from .html import HTML

Expand All @@ -33,6 +34,8 @@
CODE_DELIM = "`"
PRE_DELIM = "```"
BLOCKQUOTE_DELIM = ">"
BLOCKQUOTE_EXPANDABLE_DELIM = "**>"
BLOCKQUOTE_EXPANDABLE_END_DELIM = "||"

MARKDOWN_RE = re.compile(r"({d})|(!?)\[(.+?)\]\((.+?)\)".format(
d="|".join(
Expand All @@ -53,43 +56,113 @@
OPENING_TAG = "<{}>"
CLOSING_TAG = "</{}>"
URL_MARKUP = '<a href="{}">{}</a>'
EMOJI_MARKUP = '<emoji id={}>{}</emoji>'
EMOJI_MARKUP = "<emoji id={}>{}</emoji>"
FIXED_WIDTH_DELIMS = [CODE_DELIM, PRE_DELIM]


class Markdown:
def __init__(self, client: Optional["pyrogram.Client"]):
    """Build the companion ``HTML`` parser for *client* and keep it on ``self.html``.

    *client* may be ``None``, as its ``Optional`` annotation allows; it is
    passed to ``HTML`` unchanged.
    """
    html_parser = HTML(client)
    self.html = html_parser

def _parse_blockquotes(self, text: str):
text = html.unescape(text)
lines = text.split('\n')
result = []
in_blockquote = False
current_blockquote = []
@staticmethod
def escape_and_create_quotes(text: str, strict: bool):
text_lines: list[str | None] = text.splitlines()

# Indexes of Already escaped lines
html_escaped_list: list[int] = []

# Temporary Queue to hold lines to be quoted
to_quote_list: list[tuple[int, str]] = []

def create_blockquote(expandable: bool = False) -> None:
    """
    Collapse every queued line into a single HTML blockquote.

    The lines held in ``to_quote_list`` are joined with newlines, wrapped in
    ``<blockquote>`` (``<blockquote expandable>`` when *expandable* is true)
    and written to ``text_lines`` at the index of the first queued line.
    The slots of the remaining queued lines are set to ``None`` so the
    indexes of all other lines stay valid, and the queue is then emptied.

    Does nothing when the queue is empty.
    """
    if not to_quote_list:
        return

    first_line_index = to_quote_list[0][0]
    joined_lines = "\n".join(quoted for _, quoted in to_quote_list)
    opening_tag = "<blockquote expandable>" if expandable else "<blockquote>"
    text_lines[first_line_index] = f"{opening_tag}{joined_lines}</blockquote>"

    # Leave None placeholders rather than deleting entries, so the indexes
    # recorded for other lines keep pointing at the right slots.
    for merged_index, _ in to_quote_list[1:]:
        text_lines[merged_index] = None

    to_quote_list.clear()

# Handle Expandable Quote
inside_blockquote = False
for index, line in enumerate(text_lines):
if line.startswith(BLOCKQUOTE_EXPANDABLE_DELIM) and not inside_blockquote:
delim_stripped_line = line[len(BLOCKQUOTE_EXPANDABLE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_EXPANDABLE_DELIM} ") else 0) :]
parsed_line = (
html.escape(delim_stripped_line) if strict else delim_stripped_line
)

to_quote_list.append((index, parsed_line))
html_escaped_list.append(index)

inside_blockquote = True
continue

elif line.endswith(BLOCKQUOTE_EXPANDABLE_END_DELIM) and inside_blockquote:
if line.startswith(BLOCKQUOTE_DELIM):
line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]

delim_stripped_line = line[:-len(BLOCKQUOTE_EXPANDABLE_END_DELIM)]

parsed_line = (
html.escape(delim_stripped_line) if strict else delim_stripped_line
)

to_quote_list.append((index, parsed_line))
html_escaped_list.append(index)

inside_blockquote = False

create_blockquote(expandable=True)

if inside_blockquote:
parsed_line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]
parsed_line = html.escape(parsed_line) if strict else parsed_line
to_quote_list.append((index, parsed_line))
html_escaped_list.append(index)

# Handle Single line/Continued Quote
for index, line in enumerate(text_lines):
if line is None:
continue

for line in lines:
if line.startswith(BLOCKQUOTE_DELIM):
in_blockquote = True
current_blockquote.append(line[1:].strip())
else:
if in_blockquote:
in_blockquote = False
result.append(OPENING_TAG.format("blockquote") + '\n'.join(current_blockquote) + CLOSING_TAG.format("blockquote"))
current_blockquote = []
result.append(line)
delim_stripped_line = line[len(BLOCKQUOTE_DELIM) + (1 if line.startswith(f"{BLOCKQUOTE_DELIM} ") else 0) :]
parsed_line = (
html.escape(delim_stripped_line) if strict else delim_stripped_line
)

if in_blockquote:
result.append(OPENING_TAG.format("blockquote") + '\n'.join(current_blockquote) + CLOSING_TAG.format("blockquote"))
to_quote_list.append((index, parsed_line))
html_escaped_list.append(index)

return '\n'.join(result)
elif len(to_quote_list) > 0:
create_blockquote()
else:
create_blockquote()

async def parse(self, text: str, strict: bool = False):
if strict:
text = html.escape(text)
for idx, line in enumerate(text_lines):
if idx not in html_escaped_list:
text_lines[idx] = html.escape(line)

text = self._parse_blockquotes(text)
return "\n".join(
[valid_line for valid_line in text_lines if valid_line is not None]
)

async def parse(self, text: str, strict: bool = False):
text = self.escape_and_create_quotes(text, strict=strict)
delims = set()
is_fixed_width = False

Expand Down

0 comments on commit 106babb

Please sign in to comment.