Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/api import/update use stop word table #315

Merged
merged 7 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/deploy-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,13 @@ jobs:
SCW_ZONE: ${{ secrets.SCW_ZONE }}
with:
args: jobs definition update ${{ secrets.SCALEWAY_STOP_WORDS_ID }} image-uri=${{ secrets.CONTAINER_REGISTRY_ENDPOINT }}/stop_word:${{ env.PROJECT_VERSION }}

- name: update scaleway update-job definition
uses: jawher/[email protected]
env:
SCW_ACCESS_KEY: ${{ secrets.SCW_ACCESS_KEY }}
SCW_SECRET_KEY: ${{ secrets.SCW_SECRET_KEY }}
SCW_ORGANIZATION_ID: ${{ secrets.SCW_ORGANIZATION_ID }}
SCW_ZONE: ${{ secrets.SCW_ZONE }}
with:
args: jobs definition update ${{ secrets.SCALEWAY_UPDATE_JOB_ID }} image-uri=${{ secrets.CONTAINER_REGISTRY_ENDPOINT }}/mediatree_import:${{ env.PROJECT_VERSION }}
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ Use env variable `CHANNEL` like in docker compose (string: tf1) with `UPDATE` to

`UPDATE_PROGRAM_CHANNEL_EMPTY_ONLY` to true will only update program metadata with empty value : "".

### Batch update from an offset
With +1 millions rows, we can update from an offset to fix a custom logic by using `START_DATE_UPDATE` (YYYY-MM-DD), the default will use the end of the month otherwise you can specify`END_DATE` (optional) (YYYY-MM-DD) to batch update PG from a date range.
### Batch update from a date
With 1+ million rows, we can update from a date to fix a custom logic by using `START_DATE_UPDATE` (YYYY-MM-DD — defaults to the first day of the current month). By default the end of that month is used as the upper bound; otherwise you can specify `END_DATE` (optional, YYYY-MM-DD) to batch update PG over a date range.

~55 minutes to update 50K rows on a mVCPU 2240 - 4Gb RAM on Scaleway.

Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ services:
POSTGRES_PASSWORD: password
POSTGRES_HOST: postgres_db
POSTGRES_PORT: 5432
PORT: 5000
PORT: 5002
HEALTHCHECK_SERVER: "0.0.0.0"
# NUMBER_OF_PREVIOUS_DAYS: 30
# MIN_REPETITION: 15
Expand Down Expand Up @@ -190,7 +190,7 @@ services:
HEALTHCHECK_SERVER: "0.0.0.0"
# SENTRY_DSN: prod_only
# COMPARE_DURATION: "true"
#UPDATE: "true" # to batch update PG
# UPDATE: "true" # to batch update PG
#UPDATE_PROGRAM_ONLY: "true" # to batch update PG but only channel with program
#START_DATE_UPDATE: "2024-02-01" # to batch update PG from a date
#END_DATE: "2024-02-29" # optional - otherwise end of the month
Expand Down
30 changes: 26 additions & 4 deletions quotaclimat/data_processing/mediatree/api_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
from quotaclimat.data_processing.mediatree.update_pg_keywords import *
from quotaclimat.data_processing.mediatree.detect_keywords import *
from quotaclimat.data_processing.mediatree.channel_program import *
from quotaclimat.data_processing.mediatree.stop_word.main import get_all_stop_word
from postgres.insert_data import save_to_pg
from postgres.schemas.models import create_tables, connect_to_db, get_db_session
from postgres.schemas.models import keywords_table

from quotaclimat.data_processing.mediatree.keyword.keyword import THEME_KEYWORDS
from typing import List, Optional
from tenacity import *
import sentry_sdk
Expand Down Expand Up @@ -49,8 +49,10 @@ def refresh_token(token, date):

# reapply word detector logic to all saved keywords
# use when word detection is changed
@monitor(monitor_slug='update')
async def update_pg_data(exit_event):
start_date = os.environ.get("START_DATE_UPDATE", "2023-04-01")
beginning_of_the_month = get_first_of_month(get_now())
start_date = os.environ.get("START_DATE_UPDATE", beginning_of_the_month)
tmp_end_date = get_end_of_month(start_date)
end_date = os.environ.get("END_DATE", tmp_end_date)
batch_size = int(os.environ.get("BATCH_SIZE", 50000))
Expand Down Expand Up @@ -86,6 +88,22 @@ def get_channels():

return channels

def get_stop_words(session, validated_only=True):
    """Return the stop word contexts stored in the Stop_Word SQL table.

    Parameters:
        session: SQLAlchemy session used to query the stop word table.
        validated_only: if True (default), only validated stop words are returned.

    Returns:
        list[str]: the ``context`` attribute of each stop word row.

    Raises:
        Re-raises the original exception (with its traceback) when the query fails.
    """
    logging.info("Getting Stop words...")
    try:
        stop_words = get_all_stop_word(session, validated_only=validated_only)
        result = [stop.context for stop in stop_words]
        if result:
            logging.info(f"Got {len(result)} stop words")
        else:
            # An empty table is suspicious: keyword detection would run unfiltered.
            logging.error("No stop words from sql tables")

        return result
    except Exception as err:
        logging.error(f"Stop word error {err}")
        # bare `raise` keeps the original exception type, message and traceback
        # (the previous `raise Exception` threw them away)
        raise

async def get_and_save_api_data(exit_event):
with sentry_sdk.start_transaction(op="task", name="get_and_save_api_data"):
try:
Expand All @@ -99,6 +117,8 @@ async def get_and_save_api_data(exit_event):
(start_date_to_query, end_date) = get_start_end_date_env_variable_with_default(start_date, minus_days=number_of_previous_days)
df_programs = get_programs()
channels = get_channels()
session = get_db_session(conn)
stop_words = get_stop_words(session, validated_only=True)

day_range = get_date_range(start_date_to_query, end_date, number_of_previous_days)
logging.info(f"Number of days to query : {len(day_range)} - day_range : {day_range}")
Expand All @@ -116,7 +136,8 @@ async def get_and_save_api_data(exit_event):
channel_program = str(program.program_name)
channel_program_type = str(program.program_type)
logging.info(f"Querying API for {channel} - {channel_program} - {channel_program_type} - {start_epoch} - {end_epoch}")
df = extract_api_sub(token, channel, type_sub, start_epoch,end_epoch, channel_program,channel_program_type)
df = extract_api_sub(token, channel, type_sub, start_epoch,end_epoch, channel_program, \
channel_program_type, stop_words=stop_words)
if(df is not None):
logging.debug(f"Memory df {df.memory_usage()}")
save_to_pg(df, keywords_table, conn)
Expand Down Expand Up @@ -204,12 +225,13 @@ def extract_api_sub(
end_epoch = None
,channel_program: str = ""
,channel_program_type : str = ""
,stop_words: list[str] = []
) -> Optional[pd.DataFrame]:
try:
df = get_df_api(media_tree_token, type_sub, start_epoch, channel, end_epoch, channel_program, channel_program_type)

if(df is not None):
df = filter_and_tag_by_theme(df)
df = filter_and_tag_by_theme(df, stop_words=stop_words)
logging.info(f"Adding primary key to save to PG and have idempotent results")
df["id"] = df.apply(lambda x: add_primary_key(x), axis=1)
return df
Expand Down
17 changes: 10 additions & 7 deletions quotaclimat/data_processing/mediatree/detect_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from quotaclimat.data_processing.mediatree.config import *
from postgres.schemas.models import keywords_table
from quotaclimat.data_processing.mediatree.keyword.keyword import THEME_KEYWORDS
from quotaclimat.data_processing.mediatree.keyword.stop_words import STOP_WORDS
from typing import List, Optional
from quotaclimat.data_ingestion.scrap_sitemap import get_consistent_hash
import re
Expand Down Expand Up @@ -116,9 +115,13 @@ def replace_word_with_context(text: str) -> str:
result = re.sub(pattern, replacement, text)

return result
def remove_stopwords(plaintext: str) -> str:

def remove_stopwords(plaintext: str, stopwords: list[str]) -> str:
logging.debug(f"Removing stopwords {plaintext}")
stopwords = STOP_WORDS

if len(stopwords) == 0:
logging.warning("Stop words list empty")

for word in stopwords:
plaintext = plaintext.replace(word, '')

Expand All @@ -129,11 +132,11 @@ def remove_stopwords(plaintext: str) -> str:
return plaintext

@sentry_sdk.trace
def get_themes_keywords_duration(plaintext: str, subtitle_duration: List[str], start: datetime):
def get_themes_keywords_duration(plaintext: str, subtitle_duration: List[str], start: datetime, stop_words: List[str] = []):
keywords_with_timestamp = []
number_of_elements_in_array = 28
default_window_in_seconds = DEFAULT_WINDOW_DURATION
plaitext_without_stopwords = remove_stopwords(plaintext)
plaitext_without_stopwords = remove_stopwords(plaintext=plaintext, stopwords=stop_words)
logging.debug(f"display datetime start {start}")

for theme, keywords_dict in THEME_KEYWORDS.items():
Expand Down Expand Up @@ -284,7 +287,7 @@ def log_min_max_date(df):
logging.info(f"Date min : {min_date}, max : {max_date}")


def filter_and_tag_by_theme(df: pd.DataFrame) -> pd.DataFrame :
def filter_and_tag_by_theme(df: pd.DataFrame, stop_words: list[str] = []) -> pd.DataFrame :
with sentry_sdk.start_transaction(op="task", name="filter_and_tag_by_theme"):
count_before_filtering = len(df)
logging.info(f"{count_before_filtering} subtitles to filter by keywords and tag with themes")
Expand Down Expand Up @@ -324,7 +327,7 @@ def filter_and_tag_by_theme(df: pd.DataFrame) -> pd.DataFrame :
]
] = df[['plaintext','srt', 'start']]\
.swifter.apply(\
lambda row: get_themes_keywords_duration(*row),\
lambda row: get_themes_keywords_duration(*row, stop_words=stop_words),\
axis=1,
result_type='expand'
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# TODO : use SQL table "Stop_Word"
# Use only for test - otherwise SQL table "Stop_Word" is used
STOP_WORDS = [
"bonus écologique"
,"haute isolation thermique fabriqué en france"
Expand Down
11 changes: 1 addition & 10 deletions quotaclimat/data_processing/mediatree/s3/api_to_s3.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
### Library imports
import requests
import json

import logging
import asyncio
from time import sleep
import sys
import os
import gzip
from quotaclimat.utils.healthcheck_config import run_health_check_server
from quotaclimat.utils.logger import getLogger
from quotaclimat.data_processing.mediatree.utils import *
Expand All @@ -16,11 +11,8 @@
from quotaclimat.data_processing.mediatree.detect_keywords import *
from quotaclimat.data_processing.mediatree.channel_program import *
from quotaclimat.data_processing.mediatree.api_import import *
from postgres.insert_data import save_to_pg
from postgres.schemas.models import create_tables, connect_to_db, get_db_session
from postgres.schemas.models import keywords_table

import shutil
from quotaclimat.data_processing.mediatree.keyword.keyword import THEME_KEYWORDS
from typing import List, Optional
from tenacity import *
import sentry_sdk
Expand All @@ -30,7 +22,6 @@
import ray
import s3fs
import boto3
from io import BytesIO

from quotaclimat.utils.sentry import sentry_init
logging.getLogger('modin.logger.default').setLevel(logging.ERROR)
Expand Down
5 changes: 0 additions & 5 deletions quotaclimat/data_processing/mediatree/stop_word/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import logging
import asyncio
from time import sleep
import sys
import os
import gc
from quotaclimat.utils.healthcheck_config import run_health_check_server
from quotaclimat.utils.logger import getLogger
from quotaclimat.data_processing.mediatree.utils import *
Expand All @@ -13,7 +11,6 @@
from sqlalchemy.orm import Session
from sqlalchemy import func, select
from quotaclimat.data_ingestion.scrap_sitemap import get_consistent_hash
from quotaclimat.data_processing.mediatree.keyword.stop_words import STOP_WORDS
from tenacity import *
from sentry_sdk.crons import monitor
import modin.pandas as pd
Expand Down Expand Up @@ -284,8 +281,6 @@ async def manage_stop_word(exit_event = None, conn = None, duration: int = 7, fr
sys.exit(1)


# TODO : increase context length (due to repeated ad words)
# TODO : test with several ad words said in the same plaintext
async def main():
with monitor(monitor_slug='stopword'): #https://docs.sentry.io/platforms/python/crons/
try:
Expand Down
4 changes: 3 additions & 1 deletion quotaclimat/data_processing/mediatree/update_pg_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from sqlalchemy.orm import Session
from postgres.schemas.models import Keywords
from quotaclimat.data_processing.mediatree.detect_keywords import *
from quotaclimat.data_processing.mediatree.api_import import get_stop_words
from quotaclimat.data_processing.mediatree.channel_program import get_programs, get_a_program_with_start_timestamp, get_channel_title_for_name
from sqlalchemy import func, select, and_, or_

Expand All @@ -23,6 +24,7 @@ def update_keywords(session: Session, batch_size: int = 50000, start_date : str

logging.info(f"Updating {total_updates} saved keywords from {start_date} date to {end_date} for channel {channel} - batch size {batch_size} - totals rows")
df_programs = get_programs()
stop_words = get_stop_words(session, validated_only=True)

for i in range(0, total_updates, batch_size):
current_batch_saved_keywords = get_keywords_columns(session, i, batch_size, start_date, end_date, channel, empty_program_only)
Expand Down Expand Up @@ -61,7 +63,7 @@ def update_keywords(session: Session, batch_size: int = 50000, start_date : str
,number_of_biodiversite_concepts_generaux_no_hrfp \
,number_of_biodiversite_causes_no_hrfp \
,number_of_biodiversite_consequences_no_hrfp \
,number_of_biodiversite_solutions_no_hrfp = get_themes_keywords_duration(plaintext, srt, start)
,number_of_biodiversite_solutions_no_hrfp = get_themes_keywords_duration(plaintext, srt, start, stop_words=stop_words)
except Exception as err:
logging.error(f"continuing loop but met error : {err}")
continue
Expand Down
7 changes: 7 additions & 0 deletions quotaclimat/data_processing/mediatree/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ def get_end_of_month(start_date: str) -> str:
date = pd.to_datetime(date, format='%Y%m%d')
return date.strftime('%Y-%m-%d')

def get_first_of_month(start_date: datetime) -> str:
    """Return the first day of *start_date*'s month, formatted as YYYY-MM-DD.

    Parameters:
        start_date: any datetime within the month of interest.

    Returns:
        str: e.g. "2024-12-01" for any date in December 2024.
    """
    first_day = start_date.replace(day=1)
    # ISO format YYYY-MM-DD (the previous comment wrongly said DD-MM-YYYY)
    return first_day.strftime("%Y-%m-%d")

def get_start_end_date_env_variable_with_default(start_date:int, minus_days:int=1):
if start_date != 0:
start_date_minus_days = int(int(start_date) - (minus_days * 24 * 60 * 60))
Expand Down
5 changes: 1 addition & 4 deletions test/sitemap/test_detect_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -1853,7 +1853,4 @@ def test_count_different_window_number_40():
'theme': 'attenuation_climatique_solutions_indirectes'
}
]
assert count_different_window_number(tag_wanted_duration_second_window_number(keywords_with_timestamp, start, 40),start) == 3

def test_remove_stopwords():
assert remove_stopwords("l' huile de coude était aussi une énergie renouvelable stéphane est à fond sur le tri sélectif") == "l' stéphane "
assert count_different_window_number(tag_wanted_duration_second_window_number(keywords_with_timestamp, start, 40),start) == 3
19 changes: 17 additions & 2 deletions test/sitemap/test_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,26 @@
from quotaclimat.data_processing.mediatree.detect_keywords import *

def test_get_remove_stopwords_recycler():
stop_words_list = [
"recycler"
]
ad = "nous les recycler pour en faire de nouvelles en fabriquant nous-mêmes du plastique recyclé pour cela nous avons créé trois usines exclusivement dédié au recyclage dès cette année cristallines est capable de recycler autant de bouteilles"

assert remove_stopwords(ad) == " de nouvelles en fabriquant pour cela nous avons créé dès cette année autant de bouteilles"
assert remove_stopwords(ad, stop_words_list) == "nous les pour en faire de nouvelles en fabriquant nous-mêmes du plastique recyclé pour cela nous avons créé trois usines exclusivement dédié au recyclage dès cette année cristallines est capable de autant de bouteilles"

def test_get_remove_stopwords_no_modification():
stop_words_list = [
"recycler"
]
ad = "no keywords"

assert remove_stopwords(ad) == ad
assert remove_stopwords(ad, stop_words_list) == ad

def test_remove_stopwords_huile():
stop_words_list = [
"recycler",
"huile de coude était aussi une énergie renouvelable",
"est à fond sur le tri sélectif"
]
assert remove_stopwords("l' huile de coude était aussi une énergie renouvelable stéphane est à fond sur le tri sélectif",stop_words_list) \
== "l' stéphane "
25 changes: 23 additions & 2 deletions test/sitemap/test_main_import_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from postgres.insert_data import save_to_pg
from quotaclimat.data_processing.mediatree.detect_keywords import *
from quotaclimat.data_processing.mediatree.api_import import *
from quotaclimat.data_processing.mediatree.keyword.stop_words import STOP_WORDS
from quotaclimat.data_processing.mediatree.stop_word.main import save_append_stop_word
from test_utils import get_localhost, debug_df, compare_unordered_lists_of_dicts

import time as t
Expand All @@ -31,11 +33,23 @@ def insert_mediatree_json(conn, json_file_path='test/sitemap/mediatree.json'):

return len(df)


def insert_stop_word(conn):
    """Seed the stop word table from the test STOP_WORDS fixture.

    Each fixture string is saved as both the row id and its context.
    """
    logging.info("test saving stop words")
    to_save = [{'id': word, 'context': word} for word in STOP_WORDS]
    save_append_stop_word(conn, to_save)

def test_main_api_import():
conn = connect_to_db()
drop_tables()
create_tables()

insert_stop_word(conn)
len_df = insert_mediatree_json(conn)

session = get_db_session(conn)
Expand Down Expand Up @@ -90,4 +104,11 @@ def test_third_row_api_import():
])


assert specific_keyword.number_of_keywords == 1
assert specific_keyword.number_of_keywords == 1


def test_get_api_stop():
    """Integration test: get_stop_words returns plain strings from the DB.

    Requires a reachable Postgres instance with stop words already seeded
    (see test_main_api_import, which calls insert_stop_word first).
    """
    conn = connect_to_db()
    session = get_db_session(conn)
    stopwords = get_stop_words(session)
    # isinstance instead of `type(x) == str` (PEP 8 / idiomatic type check)
    assert isinstance(stopwords[0], str)
4 changes: 4 additions & 0 deletions test/sitemap/test_mediatree_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def test_get_end_of_month():
assert get_end_of_month("2024-02-01") == "2024-02-29"
assert get_end_of_month("2024-02-15") == "2024-02-29"

def test_get_first_of_month():
    """A mid-month date must map to the first day of its month (YYYY-MM-DD)."""
    mid_december = datetime(2024, 12, 12, 0, 0, 0)
    assert get_first_of_month(mid_december) == "2024-12-01"

def test_get_start_end_date_env_variable_with_default():
start_date = 0

Expand Down
1 change: 1 addition & 0 deletions test/stop_word/test_stop_word.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ def test_stop_word_main():


def test_stop_word_is_already_known_stop_word():

context1_avait= " avait promis de lancer un plan de replantation euh hélas pas pu tout s' est pa"
context2_avais= " avais promis de lancer un plan de replantation euh hélas pas pu tout s' est pa"
assert is_already_known_stop_word(context1_avait, context2_avais) == True
Expand Down
Loading