chore(utils): add normalizing start

j-mendez committed Jan 26, 2025
1 parent 0474dc0 commit 26f995a

Showing 5 changed files with 348 additions and 54 deletions.
8 changes: 8 additions & 0 deletions spider/src/configuration.rs
@@ -185,6 +185,8 @@ pub struct Configuration {
pub only_html: bool,
/// The concurrency limits to apply.
pub concurrency_limit: Option<usize>,
/// Normalize the HTML, de-duplicating the content.
pub normalize: bool,
}

#[derive(Default, Debug, Clone, PartialEq, Eq)]
@@ -986,6 +988,12 @@ impl Configuration {
self
}

/// Normalize the content, de-duplicating trailing-slash pages and other pages that can be duplicated.
pub fn with_normalize(&mut self, normalize: bool) -> &mut Self {
self.normalize = normalize;
self
}

#[cfg(not(feature = "chrome"))]
/// Overrides default host system timezone with the specified one. This does nothing without the `chrome` flag enabled.
pub fn with_timezone_id(&mut self, _timezone_id: Option<String>) -> &mut Self {
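
A minimal usage sketch for the new option (illustrative, not part of this diff; it assumes the crate's existing `Configuration::new()` constructor):

use spider::configuration::Configuration;

fn main() {
    // Enable normalization so crawled pages carry a content signature
    // that can be used to skip duplicated pages (e.g. trailing-slash variants).
    let mut config = Configuration::new();
    config.with_normalize(true);
    assert!(config.normalize);
}
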
85 changes: 70 additions & 15 deletions spider/src/features/disk.rs
@@ -103,19 +103,35 @@ impl DatabaseHandler {
let pool =
SqlitePool::connect_lazy(&db_url).expect("Failed to connect to the database");

if let Err(e) = sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS resources (
id INTEGER PRIMARY KEY,
url TEXT NOT NULL COLLATE NOCASE
);
CREATE INDEX IF NOT EXISTS idx_url ON resources (url COLLATE NOCASE);
"#,
let create_resources_table = sqlx::query(
r#"CREATE TABLE IF NOT EXISTS resources (
id INTEGER PRIMARY KEY,
url TEXT NOT NULL COLLATE NOCASE
);
CREATE INDEX IF NOT EXISTS idx_url ON resources (url COLLATE NOCASE);"#,
)
.execute(&pool)
.await
{
log::warn!("SQLite error: {:?}", e)
.execute(&pool);

let create_signatures_table = sqlx::query(
r#"CREATE TABLE IF NOT EXISTS signatures (
id INTEGER PRIMARY KEY,
url TEXT NOT NULL COLLATE NOCASE
);
CREATE INDEX IF NOT EXISTS idx_url ON signatures (url COLLATE NOCASE);"#,
)
.execute(&pool);

// Run the queries concurrently
let (resources_result, signatures_result) =
tokio::join!(create_resources_table, create_signatures_table);

// Handle possible errors
if let Err(e) = resources_result {
log::warn!("SQLite error creating resources table: {:?}", e);
}

if let Err(e) = signatures_result {
log::warn!("SQLite error creating signatures table: {:?}", e);
}

pool
@@ -142,11 +158,47 @@ impl DatabaseHandler {
}
}

/// Check if a signature exists (ignore case)
pub async fn signature_exists(&self, pool: &SqlitePool, url_to_check: &str) -> bool {
match sqlx::query("SELECT 1 FROM signatures WHERE url = ? LIMIT 1")
.bind(url_to_check)
.fetch_optional(pool)
.await
{
Ok(result) => result.is_some(),
Err(e) => {
if let Some(db_err) = e.as_database_error() {
emit_log(db_err.message());
} else {
emit_log(&format!("A non-database error occurred: {:?}", e));
}
false
}
}
}

/// Insert a new URL if it doesn't exist
pub async fn insert_url(&self, pool: &SqlitePool, new_url: &CaseInsensitiveString) {
pub async fn insert_url(&self, pool: &SqlitePool, new_url: &str) {
if !self.url_exists(pool, new_url).await {
if let Err(e) = sqlx::query("INSERT INTO resources (url) VALUES (?)")
.bind(new_url.to_string())
.bind(new_url)
.execute(pool)
.await
{
if let Some(db_err) = e.as_database_error() {
emit_log(db_err.message());
} else {
emit_log(&format!("A non-database error occurred: {:?}", e));
}
}
}
}

/// Insert a new signature if it doesn't exist
pub async fn insert_signature(&self, pool: &SqlitePool, new_url: &str) {
if !self.signature_exists(pool, new_url).await {
if let Err(e) = sqlx::query("INSERT INTO signatures (url) VALUES (?)")
.bind(new_url)
.execute(pool)
.await
{
@@ -238,7 +290,10 @@ impl DatabaseHandler {

/// Clear the resources and signatures tables.
pub async fn clear_table(pool: &SqlitePool) -> Result<(), sqlx::Error> {
sqlx::query("DELETE FROM resources").execute(pool).await?;
let _ = tokio::join!(
sqlx::query("DELETE FROM resources").execute(pool),
sqlx::query("DELETE FROM signatures").execute(pool)
);
Ok(())
}
}
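
A sketch of how the new signature helpers might be consumed (a hypothetical crate-internal helper alongside `DatabaseHandler` in `spider::features::disk`; the stringified `u64` key and the call site are assumptions, not part of this diff):

use sqlx::SqlitePool;

/// Record a page signature and report whether it was already seen,
/// so callers can skip re-processing duplicated content.
async fn is_duplicate(handler: &DatabaseHandler, pool: &SqlitePool, signature: u64) -> bool {
    // The signatures table keys on its `url` column, so the hash is
    // stringified here; this mirrors the check/insert pair above.
    let key = signature.to_string();
    if handler.signature_exists(pool, &key).await {
        true
    } else {
        handler.insert_signature(pool, &key).await;
        false
    }
}
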
41 changes: 28 additions & 13 deletions spider/src/page.rs
@@ -3,7 +3,9 @@ use crate::compact_str::CompactString
#[cfg(all(feature = "chrome", not(feature = "decentralized")))]
use crate::configuration::{AutomationScripts, ExecutionScripts};
use crate::utils::abs::convert_abs_path;
use crate::utils::{get_domain_from_url, networking_capable, PageResponse, RequestError};
use crate::utils::{
get_domain_from_url, hash_html, networking_capable, PageResponse, RequestError,
};
use crate::CaseInsensitiveString;
use crate::Client;
use crate::RelativeSelectors;
@@ -268,6 +270,8 @@ pub struct Page {
pub bytes_transferred: Option<f64>,
/// The page was blocked from crawling usual from using website::on_should_crawl_callback.
pub blocked_crawl: bool,
/// The signature of the page to de-duplicate content.
pub signature: Option<u64>,
}

/// Represent a page visited.
@@ -709,6 +713,7 @@ pub fn build(url: &str, res: PageResponse) -> Page {
waf_check: res.waf_check,
bytes_transferred: res.bytes_transferred,
blocked_crawl: false,
signature: res.signature,
}
}

@@ -751,11 +756,13 @@ pub struct PageLinkBuildSettings {
pub tld: bool,
/// Subdomain handling resources.
pub subdomains: bool,
/// Compute a de-duplication signature for the page content.
pub normalize: bool,
}

impl PageLinkBuildSettings {
/// New build link settings.
pub fn new(ssg_build: bool, full_resources: bool) -> Self {
pub(crate) fn new(ssg_build: bool, full_resources: bool) -> Self {
Self {
ssg_build,
full_resources,
@@ -764,12 +771,19 @@ impl PageLinkBuildSettings {
}

/// New build full link settings.
pub fn new_full(ssg_build: bool, full_resources: bool, subdomains: bool, tld: bool) -> Self {
pub(crate) fn new_full(
ssg_build: bool,
full_resources: bool,
subdomains: bool,
tld: bool,
normalize: bool,
) -> Self {
Self {
ssg_build,
full_resources,
subdomains,
tld,
normalize,
}
}
}
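
// Hypothetical call-site sketch (not part of this diff): the crawler would
// forward the configuration's `normalize` flag when building link settings so
// that page processing knows to compute a signature. `new_full` is pub(crate),
// so this assumes a crate-internal test.
#[test]
fn normalize_flag_flows_into_link_settings() {
    let settings = PageLinkBuildSettings::new_full(
        false, // ssg_build
        true,  // full_resources
        false, // subdomains
        false, // tld
        true,  // normalize: request a de-duplication signature per page
    );
    assert!(settings.normalize);
}
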
@@ -1004,10 +1018,13 @@ impl Page {
let _ = rewriter.end();
}

response
.0
.content
.replace(Box::new(collected_bytes.freeze()));
let content = Box::new(collected_bytes.freeze());

if r_settings.normalize {
response.0.signature.replace(hash_html(&content).await);
}

response.0.content.replace(content);

if r_settings.ssg_build {
if let Some(ssg_map) = ssg_map {
@@ -1692,14 +1709,12 @@ impl Page {
let html_bytes = html.as_bytes();
let chunks = html_bytes.chunks(*STREAMING_CHUNK_SIZE);

let mut stream = tokio_stream::iter(chunks).map(Ok::<&[u8], A>);
let mut stream = tokio_stream::iter(chunks);

while let Some(chunk) = stream.next().await {
if let Ok(chunk) = chunk {
if rewriter.write(chunk).is_err() {
wrote_error = true;
break;
}
if rewriter.write(chunk).is_err() {
wrote_error = true;
break;
}
}

82 changes: 79 additions & 3 deletions spider/src/utils/mod.rs
@@ -13,7 +13,7 @@ pub mod trie;
/// CPU and Memory detection to balance limitations.
pub mod detect_system;

use crate::RelativeSelectors;
use crate::{page::STREAMING_CHUNK_SIZE, RelativeSelectors};
use abs::parse_absolute_url;
use auto_encoder::is_binary_file;
use bytes::{BufMut, BytesMut};
@@ -228,6 +228,8 @@ pub struct PageResponse {
pub waf_check: bool,
/// The total bytes transferred for the page. Mainly used for chrome events. Inspect the content for bytes when using http instead.
pub bytes_transferred: Option<f64>,
/// The signature of the page to use for handling de-duplication.
pub signature: Option<u64>,
}

/// wait for event with timeout
@@ -2511,8 +2513,8 @@ pub async fn openai_request(
) -> crate::features::openai_common::OpenAIReturn {
match &gpt_configs.cache {
Some(cache) => {
use std::hash::{DefaultHasher, Hash, Hasher};
let mut s = DefaultHasher::new();
use std::hash::{Hash, Hasher};
let mut s = ahash::AHasher::default();

url.hash(&mut s);
prompt.hash(&mut s);
@@ -2991,6 +2993,80 @@ pub fn prepare_url(u: &str) -> String {
}
}

/// Normalize the HTML markup, stripping volatile and potentially malicious content, before hashing.
pub(crate) async fn normalize_html(html: &[u8]) -> Vec<u8> {
use lol_html::{element, send::Settings};

let mut output = Vec::new();

let mut rewriter = HtmlRewriter::new(
Settings {
element_content_handlers: vec![
element!("a[href]", |el| {
el.remove_attribute("href");
Ok(())
}),
element!("script, style, iframe, base, noscript", |el| {
el.remove();
Ok(())
}),
element!("*", |el| {
let mut remove_attr = vec![];

for attr in el.attributes() {
let name = attr.name();
let remove =
!(name.starts_with("data-") || name == "id" || name == "class");
if remove {
remove_attr.push(name);
}
}

for name in remove_attr {
el.remove_attribute(&name);
}

Ok(())
}),
],
..Settings::new_send()
},
|c: &[u8]| output.extend_from_slice(c),
);

let chunks = html.chunks(*STREAMING_CHUNK_SIZE);
let mut stream = tokio_stream::iter(chunks);
let mut wrote_error = false;

while let Some(chunk) = stream.next().await {
if rewriter.write(chunk).is_err() {
wrote_error = true;
break;
}
}

if !wrote_error {
let _ = rewriter.end();
}

output
}

/// Hash html markup.
pub(crate) async fn hash_html(html: &[u8]) -> u64 {
let normalized_html = normalize_html(html).await;

if !normalized_html.is_empty() {
use std::hash::{Hash, Hasher};
let mut s = ahash::AHasher::default();
normalized_html.hash(&mut s);
let key = s.finish();
key
} else {
Default::default()
}
}
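
// Illustrative sketch of the intended behavior (an assumption, not asserted by
// this diff): two pages whose markup differs only in volatile attributes such
// as `href` values should normalize to the same bytes and therefore share a
// signature. Assumes a crate-internal test and the tokio test macro.
#[tokio::test]
async fn pages_differing_only_in_links_share_a_signature() {
    let a = hash_html(b"<p><a href=\"/post/\">Read more</a></p>").await;
    let b = hash_html(b"<p><a href=\"/post?utm=home\">Read more</a></p>").await;
    assert_eq!(a, b);
}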

#[cfg(feature = "tracing")]
/// Spawns a new asynchronous task.
pub(crate) fn spawn_task<F>(task_name: &str, future: F) -> tokio::task::JoinHandle<F::Output>