Ability to override stage on enqueue #89

Open · wants to merge 3 commits into base: main
2 changes: 1 addition & 1 deletion core/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "harambe-core"
version = "0.50.2"
version = "0.51.0"
description = "Core types for harambe SDK 🐒🍌"
authors = [
{ name = "Adam Watkins", email = "[email protected]" }
2 changes: 1 addition & 1 deletion core/uv.lock

Some generated files are not rendered by default.

43 changes: 33 additions & 10 deletions sdk/harambe/core.py
@@ -138,6 +138,7 @@ async def enqueue(
*urls: URL | Awaitable[URL],
context: Optional[Context] = None,
options: Optional[Options] = None,
stage: Optional[Stage] = None,
) -> None:
"""
Enqueue url to be scraped. This will be passed to the on_enqueue callback.
@@ -147,9 +148,11 @@ async def enqueue(
:param urls: urls to enqueue
:param context: additional context to pass to the next run of the next stage/url
:param options: job level options to pass to the next stage/url
:param stage: the override stage to use for the next job. Will use the next stage in the sequence if not provided.
"""
context = context or {}
options = options or {}
stage = stage or get_next_stage(self._stage)
Member comment: Would recommend leaving get_next_stage outside of harambe (separation of concerns).

context["__url"] = self.page.url

for url in urls:
@@ -160,7 +163,7 @@
normalize_url(url, self.page.url) if hasattr(self.page, "url") else url
)
await self._notify_observers(
"on_queue_url", normalized_url, context, options
"on_queue_url", normalized_url, context, options, stage
)

async def paginate(
@@ -498,7 +501,7 @@ async def run_from_file(
:return: None: the scraper should save data to the database or file
"""
domain: str = getattr(scraper, "domain", "")
stage: str = getattr(scraper, "stage", "")
stage: Stage = getattr(scraper, "stage", "")
headers: dict[str, str] = getattr(scraper, "extra_headers", {})
observer: Optional[OutputObserver] = getattr(scraper, "observer", None)

@@ -507,19 +510,23 @@

tracker = FileDataTracker(domain, stage)

prev = "listing" if stage == "detail" else "category"
Member comment: I think we should remove this run from file / stage stuff here too.

file_path = tracker.get_storage_filepath(prev)
previous_stage = get_previous_stage(stage)
file_path = tracker.get_storage_filepath(previous_stage)

if not file_path.exists():
raise ValueError(
f"Could not find {file_path}."
f" No listing data found for this domain. Run the listing scraper first."
f" No {previous_stage} data found for this domain. Run the {previous_stage} scraper first."
)

listing_data = tracker.load_data(domain, prev)
previous_stage_data = tracker.load_data(domain, previous_stage)
async with playwright_harness(**harness_options) as page_factory:
page = await page_factory()
for listing in listing_data:
for enqueue_data in [
enqueue_data
for enqueue_data in previous_stage_data
if enqueue_data["stage"] == stage
]:
sdk = SDK(
page,
domain=domain,
Expand All @@ -533,11 +540,11 @@ async def run_from_file(

if headers:
await page.set_extra_http_headers(headers)
await page.goto(listing["url"])
await page.goto(enqueue_data["url"])
await scraper(
sdk,
listing["url"],
listing["context"],
enqueue_data["url"],
enqueue_data["context"],
)

return sdk
@@ -595,4 +602,20 @@ async def wrapper(sdk: "SDK", url: URL, context: Context) -> None:
return decorator


def get_next_stage(previous_stage: Stage | None) -> Stage:
if previous_stage == "parent_category":
return "category"
if previous_stage == "category":
return "listing"
return "detail"


def get_previous_stage(stage: Stage | None) -> Stage:
Member comment on lines +605 to +613: Ideally api stuff.

if stage == "detail" or stage is None:
return "listing"
if stage == "listing":
return "category"
return "parent_category"


PAGE_PDF_FILENAME = "reworkd_page_pdf.pdf"
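
Taken together, the core.py changes let a scraper pin the stage of the jobs it enqueues. A minimal usage sketch, assuming the SDK wiring above; the domain, URLs, and scraper body here are made up for illustration:

```python
from harambe import SDK


@SDK.scraper("example.com", "listing")
async def scrape_listing(sdk: SDK, url: str, context: dict) -> None:
    # No stage argument: enqueue falls back to get_next_stage(self._stage),
    # so a "listing" scraper enqueues "detail" jobs.
    await sdk.enqueue("/item/1", "/item/2")

    # Explicit override: this URL is enqueued as a "category" job
    # instead of following the listing -> detail sequence.
    await sdk.enqueue("/more-categories", stage="category")
```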
30 changes: 21 additions & 9 deletions sdk/harambe/observer.py
@@ -43,7 +43,9 @@ async def on_save_data(self, data: dict[str, Any]) -> None:
raise NotImplementedError()

@abstractmethod
async def on_queue_url(self, url: URL, context: Context, options: Options) -> None:
async def on_queue_url(
self, url: URL, context: Context, options: Options, stage: Stage
) -> None:
raise NotImplementedError()

@abstractmethod
@@ -73,8 +75,12 @@ class LoggingObserver(OutputObserver):
async def on_save_data(self, data: dict[str, Any]) -> None:
pprint(data, width=240)

async def on_queue_url(self, url: URL, context: Context, options: Options) -> None:
print(f"Enqueuing: {url} with context {context} and options {options}")
async def on_queue_url(
self, url: URL, context: Context, options: Options, stage: Stage
) -> None:
print(
f"Enqueuing: {url} with context {context} and options {options} in stage {stage}"
)

async def on_download(
self, download_url: str, filename: str, content: bytes
@@ -105,8 +111,12 @@ def __init__(self, domain: str, stage: Stage):
async def on_save_data(self, data: dict[str, Any]) -> None:
self._tracker.save_data(data)

async def on_queue_url(self, url: URL, context: Context, options: Options) -> None:
self._tracker.save_data({"url": url, "context": context, "options": options})
async def on_queue_url(
self, url: URL, context: Context, options: Options, stage: Stage
) -> None:
self._tracker.save_data(
{"url": url, "context": context, "options": options, "stage": stage}
)

async def on_download(
self, download_url: str, filename: str, content: bytes
@@ -134,16 +144,18 @@ async def on_check_and_solve_captchas(self, page: Page) -> None:
class InMemoryObserver(OutputObserver):
def __init__(self) -> None:
self._data: List[dict[str, Any]] = []
self._urls: List[Tuple[URL, Context, Options]] = []
self._urls: List[Tuple[URL, Context, Options, Stage]] = []
self._files: List[Tuple[str, bytes]] = []
self._cookies: List[Cookie] = []
self._local_storage: List[LocalStorage] = []

async def on_save_data(self, data: dict[str, Any]) -> None:
self._data.append(data)

async def on_queue_url(self, url: URL, context: Context, options: Options) -> None:
self._urls.append((url, context, options))
async def on_queue_url(
self, url: URL, context: Context, options: Options, stage: Stage
) -> None:
self._urls.append((url, context, options, stage))

async def on_download(
self, download_url: str, filename: str, content: bytes
@@ -171,7 +183,7 @@ def data(self) -> List[dict[str, Any]]:
return self._data

@property
def urls(self) -> List[Tuple[URL, Context, Options]]:
def urls(self) -> List[Tuple[URL, Context, Options, Stage]]:
return self._urls

@property
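
Downstream observers must now accept the extra stage argument on on_queue_url. A minimal sketch of a custom observer against the new hook; the class name and counting logic are illustrative, and the remaining OutputObserver hooks (on_save_data, on_download, etc.) are omitted here but would need real implementations in a concrete subclass:

```python
from harambe.observer import OutputObserver
from harambe.types import URL, Context, Options, Stage


class StageCountingObserver(OutputObserver):
    """Illustrative observer that tallies enqueued URLs per stage."""

    def __init__(self) -> None:
        self.counts: dict[Stage, int] = {}

    async def on_queue_url(
        self, url: URL, context: Context, options: Options, stage: Stage
    ) -> None:
        # Same four-argument hook implemented by LoggingObserver above.
        self.counts[stage] = self.counts.get(stage, 0) + 1
```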
8 changes: 4 additions & 4 deletions sdk/harambe/pagination.py
@@ -2,10 +2,9 @@
import json
from typing import Any, Optional, Iterable, List

from harambe.types import URL, Context, Options, Cookie, LocalStorage, Stage
from pydantic import BaseModel

from harambe.types import URL, Context, Options, Cookie, LocalStorage


class PageInfo(BaseModel):
page: int
@@ -45,16 +44,17 @@ def on_save_local_storage(self, local_storage: List[LocalStorage]) -> bool:
return self._add_data(local_storage)

def on_queue_url(
self, url: URL, _: Optional[Context], __: Optional[Options]
self, url: URL, _: Optional[Context], __: Optional[Options], stage: Stage
) -> bool:
"""
Save the (url, stage) pair and check whether it is duplicated
:param url: url to be saved
:param _: context, unused
:param __: options, unused
:param stage: stage of the enqueued url
:return: True if the (url, stage) pair has been seen before
"""
return self._add_data(url)
return self._add_data((url, stage))

# noinspection PyTypeChecker
def on_download(self, download_url: str, filename: str, content: bytes) -> bool:
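
With the dedup key widened to the (url, stage) tuple, only re-enqueueing the exact same pair trips the duplicate check. A hypothetical test sketch in the style of sdk/test/test_pagination.py further down, reusing its duplicate_handler fixture:

```python
async def test_same_url_same_stage_is_deduped(duplicate_handler):
    first = duplicate_handler.on_queue_url("https://example.com", {}, {}, "category")
    again = duplicate_handler.on_queue_url("https://example.com", {}, {}, "category")
    other = duplicate_handler.on_queue_url("https://example.com", {}, {}, "listing")

    assert not first   # new (url, stage) pair
    assert again       # identical pair: flagged as duplicate
    assert not other   # same url, different stage: not a duplicate
```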
2 changes: 1 addition & 1 deletion sdk/harambe/types.py
@@ -15,7 +15,7 @@
ScrapeResult = dict[str, Any]
Context = dict[str, Any]
Options = dict[str, Any]
Stage = Literal["category", "listing", "detail"]
Stage = Literal["parent_category", "category", "listing", "detail"]
AsyncScraperType = Callable[["SDK", URL, Context], Awaitable[None]] # type: ignore # noqa: F821
SetupType = Callable[["SDK"], Awaitable[None]] # type: ignore # noqa: F821
Callback = Callable[..., Awaitable[None]]
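
Because Stage is a Literal, the new "parent_category" value is validated statically rather than at runtime. A quick sketch of what a type checker would accept or reject (plain Python does not enforce Literal at runtime):

```python
from harambe.types import Stage

stage: Stage = "parent_category"  # ok: newly added member
stage = "detail"                  # ok
stage = "search"                  # rejected by mypy/pyright: not a Stage member
```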
4 changes: 2 additions & 2 deletions sdk/pyproject.toml
@@ -1,14 +1,14 @@
[project]
name = "harambe-sdk"
version = "0.50.2"
version = "0.51.0"
description = "Data extraction SDK for Playwright 🐒🍌"
authors = [
{ name = "Adam Watkins", email = "[email protected]" }
]
requires-python = ">=3.11,<4.0"
readme = "README.md"
dependencies = [
"harambe_core==0.50.2",
"harambe_core==0.51.0",
"pydantic==2.9.2",
"playwright==1.47.0",
"setuptools==73.0.0",
10 changes: 7 additions & 3 deletions sdk/test/test_e2e.py
@@ -3,11 +3,11 @@

import pytest
from aiohttp import web
from harambe.observer import InMemoryObserver
from harambe.types import BrowserType

from harambe import SDK
from harambe.contrib import playwright_harness, soup_harness
from harambe.observer import InMemoryObserver
from harambe.types import BrowserType


@pytest.fixture(scope="module")
@@ -84,7 +84,7 @@ async def test_enqueue_data(server, observer):
@SDK.scraper("test", "detail", observer=observer)
async def scraper(sdk: SDK, *args, **kwargs):
await sdk.enqueue("?page=1")
await sdk.enqueue("/terms", "https://reworkd.ai")
await sdk.enqueue("/terms", "https://reworkd.ai", stage="category")

await SDK.run(scraper=scraper, url=server, schema={}, headless=True)

@@ -99,6 +99,10 @@ async def scraper(sdk: SDK, *args, **kwargs):
assert observer.urls[1][1] == {"__url": url}
assert observer.urls[2][1] == {"__url": url}

assert observer.urls[0][3] == "detail"
assert observer.urls[1][3] == "category"
assert observer.urls[2][3] == "category"


async def test_enqueue_data_with_context(server, observer):
@SDK.scraper("test", "detail", observer=observer)
34 changes: 34 additions & 0 deletions sdk/test/test_get_next_stage.py
@@ -0,0 +1,34 @@
import pytest
from harambe.core import get_next_stage
from harambe.core import get_previous_stage
from harambe.types import Stage


@pytest.mark.parametrize(
"previous_stage,expected_stage",
Member comment: I don't think this should be introduced to harambe. The consumer of the observer should make this choice.
[
("parent_category", "category"),
("category", "listing"),
("listing", "detail"),
("detail", "detail"),
(None, "detail"),
],
)
def test_get_next_stage(previous_stage: Stage, expected_stage: Stage):
result = get_next_stage(previous_stage)
assert result == expected_stage


@pytest.mark.parametrize(
"stage,expected_stage",
[
("parent_category", "parent_category"),
("category", "parent_category"),
("listing", "category"),
("detail", "listing"),
(None, "listing"),
],
)
def test_get_previous_stage(stage: Stage, expected_stage: Stage):
result = get_previous_stage(stage)
assert result == expected_stage
34 changes: 24 additions & 10 deletions sdk/test/test_pagination.py
@@ -43,15 +43,15 @@ async def test_stop_pagination_observer_duplicate_data_error(duplicate_handler):


async def test_stop_pagination_observer_duplicate_url_error(duplicate_handler):
unduplicated = duplicate_handler.on_queue_url(
"https://example.com", {"foo": "bar"}, {}
first_pass_duplicated = duplicate_handler.on_queue_url(
"https://example.com", {"foo": "bar"}, {}, None
)
duplicate_handler.on_paginate("https://example.com/page2")
duplicated = duplicate_handler.on_queue_url(
"https://example.com", {"foo": "bar"}, {}
second_pass_duplicated = duplicate_handler.on_queue_url(
"https://example.com", {"foo": "bar"}, {}, None
)

assert not unduplicated and duplicated
assert not first_pass_duplicated and second_pass_duplicated
assert duplicate_handler.get_number_of_pages() == 2
assert duplicate_handler.get_current_page_info() == PageInfo(
page=2, total_rows=1, duplicated_rows=1
@@ -61,14 +61,28 @@ async def test_stop_pagination_observer_duplicate_url_error(duplicate_handler):
duplicate_handler.on_paginate("https://example.com/page3")


async def test_urls_across_stages_not_deduped(duplicate_handler):
first_duplicated = duplicate_handler.on_queue_url(
"https://example.com", {"foo": "bar"}, {}, "category"
)
assert not first_duplicated

second_duplicated = duplicate_handler.on_queue_url(
"https://example.com", {"foo": "bar"}, {}, "listing"
)
assert not second_duplicated


async def test_stop_pagination_observer_duplicate_download_error(duplicate_handler):
unduplicated = duplicate_handler.on_download(
first_pass_duplicated = duplicate_handler.on_download(
"https://example.com", "foo.txt", b"foo"
)
duplicate_handler.on_paginate("https://example.com/page2")
duplicated = duplicate_handler.on_download("https://example.com", "foo.txt", b"foo")
second_pass_duplicated = duplicate_handler.on_download(
"https://example.com", "foo.txt", b"foo"
)

assert not unduplicated and duplicated
assert not first_pass_duplicated and second_pass_duplicated
assert duplicate_handler.get_number_of_pages() == 2
assert duplicate_handler.get_current_page_info() == PageInfo(
page=2, total_rows=1, duplicated_rows=1
@@ -129,10 +143,10 @@ async def test_duplicate_data_without_pagination(duplicate_handler):
)

un_duplicated = duplicate_handler.on_queue_url(
"https://example.com", {"foo": "bar"}, {}
"https://example.com", {"foo": "bar"}, {}, None
)
duplicated = duplicate_handler.on_queue_url(
"https://example.com", {"foo": "bar"}, {}
"https://example.com", {"foo": "bar"}, {}, None
)
assert not un_duplicated and duplicated
assert duplicate_handler.get_current_page_info() == PageInfo(