-
Notifications
You must be signed in to change notification settings - Fork 44
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #33 from rmusser01/main
Ollama fix(?)
- Loading branch information
Showing
11 changed files
with
869 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
# html_to_markdown/ast_utils.py | ||
|
||
from typing import Callable, Optional, List, Union | ||
from s_types import SemanticMarkdownAST | ||
|
||
def find_in_ast(ast: Union[SemanticMarkdownAST, List[SemanticMarkdownAST]], predicate: Callable[[SemanticMarkdownAST], bool]) -> Optional[SemanticMarkdownAST]:
    """Depth-first, pre-order search of a markdown AST for the first matching node.

    Args:
        ast: A single AST node or a list of nodes to search.
        predicate: Called on each node; returns True for the node sought.

    Returns:
        The first node for which *predicate* is True, or None if none matches.
    """
    if isinstance(ast, list):
        for node in ast:
            result = find_in_ast(node, predicate)
            # Compare against None explicitly: a matched node that happens to
            # evaluate falsy (e.g. defines __len__) must still be returned.
            if result is not None:
                return result
        return None

    if predicate(ast):
        return ast

    # Recursively search based on node type: nodes may carry children in
    # 'content' (single node or list), 'items', or 'rows'.
    if hasattr(ast, 'content'):
        content = ast.content
        if isinstance(content, (list, SemanticMarkdownAST)):
            result = find_in_ast(content, predicate)
            if result is not None:
                return result
    if hasattr(ast, 'items'):
        for item in ast.items:
            result = find_in_ast(item, predicate)
            if result is not None:
                return result
    if hasattr(ast, 'rows'):
        for row in ast.rows:
            result = find_in_ast(row, predicate)
            if result is not None:
                return result
    return None
|
||
def find_all_in_ast(ast: Union[SemanticMarkdownAST, List[SemanticMarkdownAST]], predicate: Callable[[SemanticMarkdownAST], bool]) -> List[SemanticMarkdownAST]:
    """Collect every AST node for which *predicate* is True, in pre-order.

    Args:
        ast: A single AST node or a list of nodes to search.
        predicate: Called on each node; True selects the node.

    Returns:
        A (possibly empty) list of all matching nodes.
    """
    matches: List[SemanticMarkdownAST] = []

    if isinstance(ast, list):
        for node in ast:
            matches.extend(find_all_in_ast(node, predicate))
        return matches

    if predicate(ast):
        matches.append(ast)

    # Descend into whichever child containers this node type exposes.
    if hasattr(ast, 'content'):
        child_content = ast.content
        if isinstance(child_content, (list, SemanticMarkdownAST)):
            matches.extend(find_all_in_ast(child_content, predicate))
    if hasattr(ast, 'items'):
        for entry in ast.items:
            matches.extend(find_all_in_ast(entry, predicate))
    if hasattr(ast, 'rows'):
        for table_row in ast.rows:
            matches.extend(find_all_in_ast(table_row, predicate))

    return matches
21 changes: 21 additions & 0 deletions
21
App_Function_Libraries/html_to_markdown/conversion_options.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
# html_to_markdown/conversion_options.py | ||
|
||
from typing import Callable, Optional, Union, Dict, Any, List | ||
from dataclasses import dataclass, field | ||
|
||
from s_types import SemanticMarkdownAST, CustomNode | ||
|
||
@dataclass
class ConversionOptions:
    """Options and hooks controlling the HTML-to-Markdown conversion pipeline.

    An instance is threaded through parsing, AST construction, and rendering.
    All defaults select the minimal "convert as-is" behavior.
    """

    # Source site's domain — presumably used to resolve/prefix URLs; confirm against url_utils.
    website_domain: Optional[str] = None
    # When True, heuristically isolate and convert only the page's main content.
    extract_main_content: bool = False
    # When True, rewrite URLs to reference-style, recorded in url_map.
    refify_urls: bool = False
    # URL -> reference mapping; mutable default handled via default_factory.
    url_map: Dict[str, str] = field(default_factory=dict)
    # Enables debug-level logging in the conversion modules.
    debug: bool = False
    override_dom_parser: Optional[Callable[[str], Any]] = None  # Placeholder for DOMParser override
    enable_table_column_tracking: bool = False
    # Hook: fully replace processing of a DOM element (element, options, depth) -> AST nodes or None.
    override_element_processing: Optional[Callable[[Any, 'ConversionOptions', int], Optional[List[SemanticMarkdownAST]]]] = None
    # Hook: handle elements the default processing does not recognize.
    process_unhandled_element: Optional[Callable[[Any, 'ConversionOptions', int], Optional[List[SemanticMarkdownAST]]]] = None
    # Hook: fully replace rendering of an AST node (node, options, depth) -> markdown or None.
    override_node_renderer: Optional[Callable[[SemanticMarkdownAST, 'ConversionOptions', int], Optional[str]]] = None
    # Hook: render CustomNode instances produced by the hooks above.
    render_custom_node: Optional[Callable[[CustomNode, 'ConversionOptions', int], Optional[str]]] = None
    include_meta_data: Union[str, bool] = False  # 'basic', 'extended', or False
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
# html_to_markdown/dom_utils.py | ||
|
||
from bs4 import BeautifulSoup, Tag | ||
from typing import Optional | ||
import logging | ||
|
||
from conversion_options import ConversionOptions | ||
|
||
logging.basicConfig(level=logging.INFO) | ||
logger = logging.getLogger(__name__) | ||
|
||
def find_main_content(soup: BeautifulSoup, options: ConversionOptions) -> Tag:
    """Locate the tag holding the page's main content.

    Prefers an explicit non-empty <main> element; otherwise falls back to
    heuristic detection over <body>, or the whole document when no body exists.
    """
    logger.debug("Entering find_main_content function")

    explicit_main = soup.find('main')
    if explicit_main:
        logger.debug("Existing <main> element found")
        return explicit_main

    logger.debug("No <main> element found. Detecting main content.")
    if not soup.body:
        logger.debug("No body element found, returning the entire document")
        return soup

    return detect_main_content(soup.body, options)
|
||
def wrap_main_content(main_content: Tag, soup: BeautifulSoup):
    """Ensure the detected content is enclosed in a <main> element.

    No-op when the node already is <main>; otherwise wraps it in place in a
    new <main id="detected-main-content"> element.
    """
    if main_content.name.lower() == 'main':
        logger.debug("Main content already wrapped")
        return

    logger.debug("Wrapping main content in <main> element")
    wrapper = soup.new_tag('main')
    main_content.wrap(wrapper)
    wrapper['id'] = 'detected-main-content'
    logger.debug("Main content wrapped successfully")
|
||
def detect_main_content(element: Tag, options: ConversionOptions) -> Tag:
    """Heuristically pick the most likely main-content element beneath *element*.

    Collects candidate tags scoring at least ``min_score`` (see
    ``collect_candidates``), then prefers the highest-scoring candidate that
    is not nested inside another candidate.

    Returns:
        The best candidate, or *element* itself when no candidate qualifies.
    """
    candidates = []
    min_score = 20
    logger.debug(f"Collecting candidates with minimum score: {min_score}")
    collect_candidates(element, candidates, min_score, options)

    logger.debug(f"Total candidates found: {len(candidates)}")

    if not candidates:
        logger.debug("No suitable candidates found, returning root element")
        return element

    # Compute each candidate's score exactly once. The original recomputed
    # calculate_score inside both the sort key and the loop below, costing
    # O(n^2) full score evaluations over the subtree.
    scores = {id(candidate): calculate_score(candidate, options) for candidate in candidates}

    # Sort candidates by score descending
    candidates.sort(key=lambda candidate: scores[id(candidate)], reverse=True)
    logger.debug("Candidates sorted by score")

    best_candidate = candidates[0]
    for candidate in candidates[1:]:
        # Bug fix: the original called `other.contains(candidate)`, a JS DOM
        # API — bs4 Tags have no `contains` method (attribute access resolves
        # to `other.find('contains')`, i.e. None, so the call raised
        # TypeError). It also compared each candidate against itself, which
        # would flag every candidate as nested. Use identity-based descendant
        # containment and skip self-comparison instead.
        is_independent = not any(
            other is not candidate
            and any(node is candidate for node in other.descendants)
            for other in candidates
        )
        if is_independent and scores[id(candidate)] > scores[id(best_candidate)]:
            best_candidate = candidate
            logger.debug(f"New best independent candidate found: {element_to_string(best_candidate)}")

    logger.debug(f"Final main content candidate: {element_to_string(best_candidate)}")
    return best_candidate
|
||
def element_to_string(element: Optional[Tag]) -> str:
    """Compact ``name#id.classes`` identifier for an element, for log output."""
    if not element:
        return 'No element'
    tag_id = element.get('id', 'no-id')
    joined_classes = '.'.join(element.get('class', []))
    return "{}#{}.{}".format(element.name, tag_id, joined_classes)
|
||
def collect_candidates(element: Tag, candidates: list, min_score: int, options: ConversionOptions):
    """Recursively append to *candidates* every tag scoring at least *min_score*.

    Mutates *candidates* in place; traversal is depth-first over direct child
    tags, so recursion covers the whole subtree.
    """
    own_score = calculate_score(element, options)
    if own_score >= min_score:
        candidates.append(element)
        logger.debug(f"Candidate found: {element_to_string(element)}, score: {own_score}")

    # Direct child tags only; deeper levels are handled by the recursion.
    for child_tag in element.find_all(recursive=False):
        collect_candidates(child_tag, candidates, min_score, options)
|
||
def calculate_score(element: Tag, options: ConversionOptions) -> int:
    """Heuristically score how likely *element* is the page's main content.

    Higher is more likely. Signals: content-ish class/id names, semantic
    tags, paragraph count, text length, low link density, and data-*/role
    attributes. Per-signal contributions are accumulated in *score_log*
    and emitted at debug level when ``options.debug`` is set.
    """
    score = 0
    score_log = []

    # High impact attributes
    high_impact_attributes = ['article', 'content', 'main-container', 'main', 'main-content']
    for attr in high_impact_attributes:
        # NOTE(review): element['class'] is a list in bs4, so this is an exact
        # class-token match, while element['id'] is a string, so the id check
        # below is a substring match — confirm the asymmetry is intended.
        if 'class' in element.attrs and attr in element['class']:
            score += 10
            score_log.append(f"High impact attribute found: {attr}, score increased by 10")
        if 'id' in element.attrs and attr in element['id']:
            score += 10
            score_log.append(f"High impact ID found: {attr}, score increased by 10")

    # High impact tags
    high_impact_tags = ['article', 'main', 'section']
    if element.name.lower() in high_impact_tags:
        score += 5
        score_log.append(f"High impact tag found: {element.name}, score increased by 5")

    # Paragraph count (contribution capped at 5)
    paragraph_count = len(element.find_all('p'))
    paragraph_score = min(paragraph_count, 5)
    if paragraph_score > 0:
        score += paragraph_score
        score_log.append(f"Paragraph count: {paragraph_count}, score increased by {paragraph_score}")

    # Text content length: +1 per 200 chars beyond the first 200, capped at 5
    text_content_length = len(element.get_text(strip=True))
    if text_content_length > 200:
        text_score = min(text_content_length // 200, 5)
        score += text_score
        score_log.append(f"Text content length: {text_content_length}, score increased by {text_score}")

    # Link density: reward mostly-non-link text (ratio of anchor text to all text)
    link_density = calculate_link_density(element)
    if link_density < 0.3:
        score += 5
        score_log.append(f"Link density: {link_density:.2f}, score increased by 5")

    # Data attributes
    if element.has_attr('data-main') or element.has_attr('data-content'):
        score += 10
        score_log.append("Data attribute for main content found, score increased by 10")

    # Role attribute (e.g. role="main")
    if element.get('role') and 'main' in element.get('role'):
        score += 10
        score_log.append("Role attribute indicating main content found, score increased by 10")

    if options.debug and score_log:
        logger.debug(f"Scoring for {element_to_string(element)}:")
        for log in score_log:
            logger.debug(f"  {log}")
        logger.debug(f"  Final score: {score}")

    return score
|
||
def calculate_link_density(element: Tag) -> float:
    """Return the ratio of anchor-text length to total text length in *element*."""
    anchor_text_total = 0
    for anchor in element.find_all('a'):
        anchor_text_total += len(anchor.get_text(strip=True))
    total_text_length = len(element.get_text(strip=True)) or 1  # Avoid division by zero
    return anchor_text_total / total_text_length
46 changes: 46 additions & 0 deletions
46
App_Function_Libraries/html_to_markdown/html_to_markdown.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# html_to_markdown/html_to_markdown.py | ||
|
||
from bs4 import BeautifulSoup | ||
from typing import Optional | ||
|
||
from conversion_options import ConversionOptions | ||
from dom_utils import find_main_content, wrap_main_content | ||
from html_to_markdown_ast import html_to_markdown_ast | ||
from markdown_ast_to_string import markdown_ast_to_string | ||
from url_utils import refify_urls | ||
|
||
import logging | ||
|
||
logging.basicConfig(level=logging.INFO) | ||
logger = logging.getLogger(__name__) | ||
|
||
def convert_html_to_markdown(html: str, options: Optional[ConversionOptions] = None) -> str:
    """Convert an HTML document string to Markdown.

    Pipeline: parse HTML -> (optionally) isolate the main content ->
    build the semantic AST -> (optionally) refify URLs -> render Markdown.

    Args:
        html: Raw HTML source.
        options: Conversion settings; defaults to a fresh ConversionOptions.

    Returns:
        The rendered Markdown string.
    """
    if options is None:
        options = ConversionOptions()

    if options.debug:
        logger.setLevel(logging.DEBUG)

    soup = BeautifulSoup(html, 'html.parser')

    if not options.extract_main_content:
        # No extraction: keep the full document when metadata was requested
        # and a <head> exists, otherwise convert just the body (or everything
        # when there is no body).
        if options.include_meta_data and soup.head:
            main_content = soup
        else:
            main_content = soup.body if soup.body else soup
    else:
        main_content = find_main_content(soup, options)
        if options.include_meta_data and soup.head and not main_content.find('head'):
            # Reattach head for metadata extraction
            rebuilt = f"<html>{soup.head}{main_content}</html>"
            soup = BeautifulSoup(rebuilt, 'html.parser')
            main_content = soup.html

    markdown_ast = html_to_markdown_ast(main_content, options)

    if options.refify_urls:
        options.url_map = refify_urls(markdown_ast, options.url_map)

    return markdown_ast_to_string(markdown_ast, options)
Oops, something went wrong.