
Commit

add crawler dag
Blunt10K committed Dec 17, 2023
1 parent 3c34810 commit 7d0549d
Showing 10 changed files with 120 additions and 4,498 deletions.
11 changes: 11 additions & 0 deletions db_utils.py
@@ -0,0 +1,11 @@
from sqlalchemy import create_engine
from airflow.models import Variable

def make_engine():
    host = Variable.get('HOSTNAME')
    db = Variable.get('NBA_DB')
    port = Variable.get('PORT')
    user = Variable.get('USER')
    pswd = Variable.get('PSWD')

    return create_engine(f"postgresql+psycopg2://{user}:{pswd}@{host}:{port}/{db}")
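For reference, a minimal sketch of how this helper might be exercised on its own (the query below is only a placeholder, and the Airflow Variables HOSTNAME, NBA_DB, PORT, USER and PSWD must already be defined in the metastore):

# Sketch only: exercises db_utils.make_engine with a placeholder query.
import pandas as pd
from sqlalchemy import text as sql_text
from db_utils import make_engine

engine = make_engine()
df = pd.read_sql(sql_text('SELECT 1 AS ok'), engine.connect())   # placeholder query
print(df)
engine.dispose()   # release pooled connections when finished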
3,214 changes: 0 additions & 3,214 deletions scraper/.ipynb_checkpoints/Untitled-checkpoint.ipynb

This file was deleted.

33 changes: 0 additions & 33 deletions scraper/.ipynb_checkpoints/crawler-checkpoint.py

This file was deleted.

1,218 changes: 0 additions & 1,218 deletions scraper/.ipynb_checkpoints/transform-checkpoint.ipynb

This file was deleted.

14 changes: 2 additions & 12 deletions scraper/crawler.py
@@ -4,19 +4,9 @@
 import json
 from os.path import join
 import pandas as pd
-from sqlalchemy import create_engine
 from airflow.models import Variable
 import re
-
-
-def make_engine():
-    host = Variable.get('HOSTNAME')
-    db = Variable.get('NBA_DB')
-    port = Variable.get('PORT')
-    user = Variable.get('USER')
-    pswd = Variable.get('PSWD')
-
-    return create_engine(f"postgresql+psycopg2://{user}:{pswd}@{host}:{port}/{db}")
+from db_utils import make_engine
 
 
 def game_dates():
@@ -63,7 +53,7 @@ def parse_game(self, response):
         items = response.css('script[type="application/json"]::text')
 
         for i in items:
-            extract_path = join(Variable.get('EXTDISK'),'spark_apps','games')
+            extract_path = join(Variable.get('EXTDISK'),'spark_apps','NBA','games')
             to_write = json.loads(i.get())['props']['pageProps']
             fname = join(extract_path, to_write['playByPlay']['gameId'] + '.json')
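As a small illustration of the changed target directory (the EXTDISK value and game id below are made up), each game's JSON now lands under an extra NBA folder:

# Illustration only: a stand-in EXTDISK value and a made-up game id.
from os.path import join

extdisk = '/mnt/extdisk'                       # stand-in for Variable.get('EXTDISK')
extract_path = join(extdisk, 'spark_apps', 'NBA', 'games')
fname = join(extract_path, '0022300123' + '.json')
print(fname)                                   # /mnt/extdisk/spark_apps/NBA/games/0022300123.json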
29 changes: 29 additions & 0 deletions scraper/dag.py
@@ -0,0 +1,29 @@
# %%
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
import pendulum
import sys
from os.path import expanduser,join as osjoin

# %%
with DAG('add_dates_to_calendar',default_args={'retries': 4,'owner':'blunt10k'},description='Data injector to calendar table',
         schedule_interval='0 12 * * *',catchup=False,tags=['nba_stats'],
         start_date=pendulum.datetime(2023, 12, 10, tz="UTC")) as dag:

    dag.doc_md = __doc__

    code_directory = expanduser(osjoin('~/airflow','dags','NBA','scraper'))
    sys.path.insert(0,code_directory)

    from get_scrape_data import write_to_db

    @task(task_id="write_scrape_logs_to_db")
    def scrape_logs_to_db():
        write_to_db()

    scrape_games = BashOperator(task_id="scrape_games",bash_command="bash scrape_games.sh ",cwd='.')
    failed_scrapes = BashOperator(task_id="failed_scrapes",bash_command="bash failed_scrapes.sh ",cwd='.')
    load_logs = scrape_logs_to_db()

    scrape_games >> failed_scrapes >> load_logs
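A quick way to confirm the new DAG file parses is a DagBag check; this is only a sketch and assumes an Airflow environment whose configured dags folder contains dags/NBA/scraper/dag.py:

# Sketch: verify the DAG imports cleanly in a configured Airflow environment.
from airflow.models import DagBag

bag = DagBag()                                   # parses the configured dags folder
print(bag.import_errors)                         # expect an empty dict
dag = bag.get_dag('add_dates_to_calendar')
print([t.task_id for t in dag.tasks])            # the three tasks defined above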
2 changes: 2 additions & 0 deletions scraper/failed_games.sh
@@ -0,0 +1,2 @@
source ~/python_envs/data_engineering/bin/activate
scrapy runspider failed_scrapes.py -s LOG_FILE=$EXTDISK/NBA/failed_games.log
53 changes: 53 additions & 0 deletions scraper/failed_scrapes.py
@@ -0,0 +1,53 @@

# %%
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
import pandas as pd
from sqlalchemy import text as sql_text
from airflow.models import Variable
from os.path import join
import json
from db_utils import make_engine


def list_failed():
    success = '''(select game_title, game_id FROM scrape_logs
                  WHERE response = 200) successful_scrape'''
    failed = '''(select game_title as failed_title, game_id as failed_game
                 FROM scrape_logs WHERE response <> 200) failed_scrape'''
    condition = '''ON failed_scrape.failed_game = successful_scrape.game_id
                   WHERE game_id IS NULL'''
    query = f'''select failed_title, failed_game from {success}
                RIGHT OUTER JOIN {failed} {condition}'''

    conn = make_engine()
    df = pd.read_sql(sql_text(query),conn.connect())

    for i in df.itertuples():
        yield f'https://www.nba.com/game/{i.failed_title}-{i.failed_game}'

# %%
class GamesSpider(CrawlSpider):

    name = 'pbp-games-failed'
    allowed_domains = ['nba.com']
    start_urls = set(list_failed())
    REDIRECT_ENABLED = False
    allow_pat = r'game/\w+-vs-\w+-\d+'
    rules = [Rule(LinkExtractor(allow=[allow_pat]),
                  callback='parse_start_url')]


    def parse_start_url(self, response):
        items = response.css('script[type="application/json"]::text')

        for i in items:
            extract_path = join(Variable.get('EXTDISK'),'spark_apps','NBA',
                                'failed_scrapes')

            to_write = json.loads(i.get())['props']['pageProps']
            fname = join(extract_path,
                         f"{to_write['playByPlay']['gameId']}.json")

            with open(fname, 'w') as fp:
                json.dump(to_write, fp)
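The query in list_failed is an anti-join: it keeps only game ids that have a non-200 scrape and no 200 row at all. A rough pandas equivalent of the same idea, run on fabricated sample rows purely for illustration:

# Sketch of the same anti-join in pandas, using fabricated sample rows.
import pandas as pd

logs = pd.DataFrame({
    'game_title': ['lal-vs-bos', 'mia-vs-nyk', 'lal-vs-bos'],
    'game_id':    ['0022300001', '0022300002', '0022300001'],
    'response':   [403, 403, 200],   # the first game eventually succeeded
})

ok = logs.loc[logs.response == 200, ['game_id']]
failed = logs[logs.response != 200]

# keep only failures that never produced a successful scrape
still_failed = failed.merge(ok, on='game_id', how='left', indicator=True)
still_failed = still_failed[still_failed['_merge'] == 'left_only']
print(still_failed[['game_title', 'game_id']])   # only mia-vs-nyk remains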
42 changes: 21 additions & 21 deletions scraper/get_scrape_data.py
@@ -1,42 +1,42 @@
 # %%
 from datetime import datetime as dt, timedelta as td
-from os.path import join
 import pandas as pd
-from sqlalchemy import create_engine
-from airflow.models import Variable
 import re
+from db_utils import make_engine
+from os.path import join
+from airflow.models import Variable
+from os import remove
 
-def make_engine():
-    host = Variable.get('HOSTNAME')
-    db = Variable.get('NBA_DB')
-    port = Variable.get('PORT')
-    user = Variable.get('USER')
-    pswd = Variable.get('PSWD')
-
-    return create_engine(f"postgresql+psycopg2://{user}:{pswd}@{host}:{port}/{db}")
+crawl_logs = join(Variable.get('EXTDISK'),'spark_apps','NBA',
+                  'crawled_dates.log')
+
 # %%
 def extract_fields():
 
-    with open('scrapy.log','r') as fp:
+    with open(crawl_logs,'r') as fp:
         logs = fp.read().split('\n')
 
     log_flag = r'[\s\S]+DEBUG: Crawled \((\d+)\)'
     game_pat = r'[\S\s]+https://www.nba.com/game/(\w+-vs-\w+)-(\d+)'
     referer_pat = r'[\s\S]+\([\s\S]+https://www.nba.com/games\?date=(\d+-\d+-\d+)'
     search_pat = log_flag + game_pat + referer_pat
 
-    data = [dict(zip(['response','game_title','game_id','game_date'],
-                re.search(search_pat,i).groups()))
+    columns = ['response','game_title','game_id','game_date']
+
+    data = [dict(zip(columns, re.search(search_pat,i).groups()))
             for i in logs if re.search(search_pat,i)]
 
     df = pd.DataFrame(data)
     return df
 
 # %%
-df = extract_fields()
-conn = make_engine()
+def clear_logs():
+    remove(crawl_logs)
+
+def write_to_db():
+
+    df = extract_fields()
+    conn = make_engine()
 
-df.to_sql('scrape_logs', conn.connect(), if_exists='append', index=False)
+    df.to_sql('scrape_logs', conn, if_exists='append', index=False)
+    conn.dispose()
 
-conn.dispose()
+    clear_logs()
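To make the log-parsing patterns concrete, here is a sketch run against a single fabricated Scrapy-style log line (the timestamp, teams, game id and date are invented):

# Demo of the extraction pattern on one fabricated Scrapy-style log line.
import re

log_flag = r'[\s\S]+DEBUG: Crawled \((\d+)\)'
game_pat = r'[\S\s]+https://www.nba.com/game/(\w+-vs-\w+)-(\d+)'
referer_pat = r'[\s\S]+\([\s\S]+https://www.nba.com/games\?date=(\d+-\d+-\d+)'
search_pat = log_flag + game_pat + referer_pat

line = ('2023-12-17 12:00:00 [scrapy.core.engine] DEBUG: Crawled (200) '
        '<GET https://www.nba.com/game/lal-vs-bos-0022300123> '
        '(referer: https://www.nba.com/games?date=2023-12-16)')

m = re.search(search_pat, line)
print(dict(zip(['response','game_title','game_id','game_date'], m.groups())))
# {'response': '200', 'game_title': 'lal-vs-bos',
#  'game_id': '0022300123', 'game_date': '2023-12-16'}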
2 changes: 2 additions & 0 deletions scraper/scrape_games.sh
@@ -0,0 +1,2 @@
source ~/python_envs/data_engineering/bin/activate
scrapy runspider crawler.py -s LOG_FILE=$EXTDISK/NBA/crawled_dates.log
