Skip to content

Commit

Permalink
Fix issue with updating all commodity codes in the database
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcinLinkl committed Jul 31, 2024
1 parent 114edd2 commit 25a9922
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 46 deletions.
61 changes: 34 additions & 27 deletions fetch_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,41 +73,47 @@ def fetch_all_reports():


def fetch_new_all(reports_list):
# Convert single tuple to list if necessary
if isinstance(reports_list, tuple):
reports_list = [reports_list]

for report in reports_list:
fetch_new_report(report)


def fetch_new_report(report_table_name_and_week):
report_table_name = report_table_name_and_week[0]
report_last_week = report_table_name_and_week[1]
report_root_name = report_table_name.split("_")[1]

# Create a comma-separated list of selected columns for the API query
selected_columns = ",".join(main_api_cols + report_api_cols[report_root_name])
selected_columns = ",".join(main_api_cols + report_api_cols.get(report_root_name, []))

data_records = get_socrata_api_data(
REPORTS_TABLE[report_table_name], selected_columns, report_last_week
)

if data_records is not None and not data_records.empty:
print("New data found...")
print(f"Number of records fetched: {data_records.shape[0]}")

with sqlite3.connect("data.db") as db_connection:
# Create the 'cftc_codes' table if it doesn't exist
cursor = db_connection.cursor()
cursor.execute(
f"""
select cftc_contract_market_code from cftc_codes where {report_table_name}=1
"""
f"SELECT cftc_contract_market_code FROM cftc_codes WHERE {report_table_name}=1"
)
codes_CFTC = cursor.fetchall()[0]

# filtering data_records based on the condition
codes_CFTC = cursor.fetchall()

if codes_CFTC:
codes_CFTC = [code[0] for code in codes_CFTC]
else:
print("No CFTC codes found for filtering.")
return

# Filtering data_records based on the condition
data_records = data_records[
data_records["cftc_contract_market_code"].isin(codes_CFTC)
]
# Print the number of records prepared to save in db
print(f"Number of records to be saved: {data_records.shape[0]}")

# Convert column names to lowercase, remove the "_all" suffix, and replace "__" with "_"
Expand All @@ -116,36 +122,38 @@ def fetch_new_report(report_table_name_and_week):
for col in data_records.columns
]

# Delete 'market_and_exchange_names' and 'commodity' columns (only 'cftc_codes' will be used)
data_records.drop(
[
"market_and_exchange_names",
"commodity",
"commodity_subgroup_name",
"contract_units",
],
# Remove columns if they exist
columns_to_remove = [
"market_and_exchange_names",
"commodity",
"commodity_subgroup_name",
"contract_units",
]
data_records = data_records.drop(
[col for col in columns_to_remove if col in data_records.columns],
axis=1,
inplace=True,
errors='ignore'
)

# change to numeric data values for saving properly types
# Convert columns to numeric data types
df_1 = data_records.iloc[:, :3]
df_2 = data_records.iloc[:, 3:].apply(pd.to_numeric, errors="coerce")
data_records = pd.concat([df_1, df_2], axis=1)

# calc net positions
for root in root_cols[report_root_name]:
# Calculate net positions
for root in root_cols.get(report_root_name, []):
print("Calculating net data for: " + root)
long_col = f"{root}_long"
short_col = f"{root}_short"
data_records[f"{root}_net"] = data_records[long_col].sub(
data_records[short_col]
)

if long_col in data_records.columns and short_col in data_records.columns:
data_records[f"{root}_net"] = data_records[long_col].sub(
data_records[short_col]
)

# Replace NaN values with None
data_records = data_records.where(pd.notna(data_records), None)

# Save data to the 'data.db' database else rise exception
# Save data to the database
with sqlite3.connect("data.db") as db_connection:
try:
data_records.to_sql(
Expand All @@ -157,7 +165,6 @@ def fetch_new_report(report_table_name_and_week):
print(
f"Data successfully saved to the table '{report_table_name}' in the database."
)

except Exception as e:
print(
f"Failed to save data to the table '{report_table_name}' in the database. Error: {str(e)}"
Expand Down
81 changes: 62 additions & 19 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,85 @@ def load_yahoo_tk_data():
return bidict(yahoo_tk_data)


import sqlite3
import datetime
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

def check_for_new_records():
# Get the current date
current_date = datetime.date.today()
check_latest_weeks_table = []
print("Current week:", current_date.strftime("%Y Week %W"))
logging.debug("Current date: %s", current_date.strftime("%Y-%m-%d"))
logging.debug("Current week: %s", current_date.strftime("%Y Week %W"))

try:
# Connect to the SQLite database
with sqlite3.connect("data.db") as conn:
cursor = conn.cursor()
query = "SELECT name FROM sqlite_schema WHERE type='table' AND name LIKE 'report_%'"

# Query to get all table names that start with 'report_'
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'report_%'"
cursor.execute(query)
tables = cursor.fetchall()
logging.debug("Tables found: %s", tables)

# Iterate over each table
for table in tables:
table_name = table[0]
logging.debug("Processing table: %s", table_name)

# Query to get the maximum report week and date from the table
query_max_week = f"SELECT MAX(yyyy_report_week_ww) as week_report, MAX(report_date_as_yyyy_mm_dd) as report_date FROM {table_name}"
cursor.execute(query_max_week)
latest_week, latest_report_date = cursor.fetchone()
if latest_report_date:
latest_report_date = datetime.datetime.strptime(
latest_report_date, "%Y-%m-%dT%H:%M:%S.%f"
).date()
days_diff = (current_date - latest_report_date).days
if days_diff >= 10:
print(
f"{table_name.replace('_', ' ').title()}, latest Week: {latest_week}. Checked for new one."
)
check_latest_weeks_table.append((table_name, latest_week))
else:
print(
f"{table_name.replace('_', ' ').title()}, latest Week: {latest_week}."
)
result = cursor.fetchone()
logging.debug("Query result for table %s: %s", table_name, result)

if result:
latest_week, latest_report_date = result
if latest_report_date:
try:
# Convert the report date from string to date object
latest_report_date = datetime.datetime.strptime(
latest_report_date, "%Y-%m-%dT%H:%M:%S.%f"
).date()
logging.debug("Parsed date for table %s: %s", table_name, latest_report_date)
except ValueError:
# Handle the case where the date format is incorrect
logging.error("Date format error for table %s: %s", table_name, latest_report_date)
continue

# Calculate the difference in days between the current date and the latest report date
days_diff = (current_date - latest_report_date).days
logging.debug("Days difference for table %s: %d", table_name, days_diff)

if days_diff >= 10:
# If the difference is 10 days or more, add the table to the list for further checking
logging.info(
"Table %s, latest Week: %s. Data is older than 10 days. Check for new one.",
table_name.replace('_', ' ').title(), latest_week
)
check_latest_weeks_table.append((table_name, latest_week))
else:
# Print the table name and the latest week if the data is not yet outdated
logging.info(
"Table %s, latest Week: %s. Data is up to date.",
table_name.replace('_', ' ').title(), latest_week
)
except sqlite3.Error as e:
print("Error when checking for new reports:", e)
# Log any SQLite errors encountered
logging.error("Error when checking for new reports: %s", e)
return None

# If there are tables with outdated data, call fetch_new_all with the list of tables
if check_latest_weeks_table:
logging.info("Tables with outdated data: %s", check_latest_weeks_table)
fetch_new_all(check_latest_weeks_table)
else:
print("Data up to date.")
logging.info("Data up to date.")



def get_reports_opts():
Expand Down

0 comments on commit 25a9922

Please sign in to comment.