Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev #14

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open

Dev #14

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
dataclasses==0.6
urllib3==1.25.3
requests==2.22.0
typing==3.7.4
dataclasses_json==0.2.14
selenium_wire==1.0.8
beautifulsoup4==4.8.0
dataclasses
urllib3
requests
typing
dataclasses_json
selenium_wire
beautifulsoup4
99 changes: 77 additions & 22 deletions yandex_images_download/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ def download_single_image(img_url: str,
}

try:
response = requests.get(img_url, timeout=10)
# response = requests.get(img_url, timeout=10)
response = requests.get(img_url, timeout=10, headers={
'User-Agent': 'CoolBot/0.0 (https://example.org/coolbot/; [email protected])'})

data = response.content
content_type = response.headers["Content-Type"]
Expand Down Expand Up @@ -200,7 +202,9 @@ def __init__(self,
itype=None,
commercial=None,
recent=None,
pool=None):
pool=None,
block_url_path='',
block_keyword_path=''):
self.driver = driver
self.output_directory = pathlib.Path(output_directory)
self.limit = limit
Expand All @@ -214,20 +218,36 @@ def __init__(self,
self.recent = recent

self.url_params = self.init_url_params()
self.requests_headers = {
'User-Agent':
("Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML,"
" like Gecko) Chrome/41.0.2228.0 Safari/537.36")
}
# self.requests_headers = {
# 'User-Agent':
# ("Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML,"
# " like Gecko) Chrome/41.0.2228.0 Safari/537.36")
# }
self.requests_headers = {'User-Agent': 'CoolBot/0.0 (https://example.org/coolbot/; [email protected])'}
self.cookies = {}
self.pool = pool

logging.info(f'Output directory is set to "{self.output_directory}/"')
logging.info(f"Limit of images is set to {self.limit}")

self.block_url_list = []
if os.path.isfile(block_url_path):
with open(block_url_path) as f:
self.block_url_list = [item.rstrip() for item in f.readlines()]

self.block_keyword_list = []
if os.path.isfile(block_keyword_path):
with open(block_keyword_path) as f:
self.block_keyword_list = [item.rstrip() for item in f.readlines()]


def get_response(self):
pathes = [request.path for request in self.driver.requests]
request = self.driver.requests[pathes.index(self.driver.current_url)]
try:
request = self.driver.requests[pathes.index(self.driver.current_url)]
except:
request = self.driver.requests[pathes.index(self.url_with_params)]
# request = self.driver.requests[pathes.index(self.driver.requests[pathes.index(self.url_with_params)].response.headers['Location'])]
return request.response

def init_url_params(self):
Expand All @@ -252,10 +272,36 @@ def init_url_params(self):

return params

def get_url_params(self, page, text):
params = {"p": page, "text": text}
params.update(self.url_params)
def is_url(self, keyword):
if ('http://' in keyword.lower()) or ('https://' in keyword.lower()):
return True
else:
return False

def get_url_params_by_keyword(self, keyword):
if self.is_url(keyword):
params = {"source": "collections",
"rpt": "imageview",
"cbir_page": "similar",
"url": keyword,
"nomisspell": 1
}
else:
params = {'text': keyword, "nomisspell": 1}
return params

def get_url_params_by_page(self, page, text):
if self.is_url(text):
params = {"source": "collections",
"rpt": "imageview",
"cbir_page": "similar",
"url": text,
"nomisspell": 1,
"p": page
}
else:
params = {"p": page, "text": text}
params.update(self.url_params)
return params

def download_images_by_page(self, keyword, page, imgs_count,
Expand All @@ -268,11 +314,11 @@ def download_images_by_page(self, keyword, page, imgs_count,
img_url_results=[])

self.check_captcha_and_get(YandexImagesDownloader.MAIN_URL,
params=self.get_url_params(page, keyword))
params=self.get_url_params_by_page(page, keyword))

response = self.get_response()

if not (response.reason == "OK"):
if not (response.reason.lower() == "ok"):
page_result.status = "fail"
page_result.message = (f"Page response is not ok."
f" page: {page},",
Expand All @@ -288,7 +334,7 @@ def download_images_by_page(self, keyword, page, imgs_count,
json.loads(item.attrs["data-bem"])["serp-item"]
for item in tag_sepr_item
]
img_hrefs = [key["img_href"] for key in serp_items]
img_hrefs = [key["img_href"] for key in serp_items if not (key["img_href"] in self.block_url_list)]

errors_count = 0
for img_url in img_hrefs:
Expand Down Expand Up @@ -328,14 +374,11 @@ def download_images_by_keyword(self, keyword,
errors_count=None,
page_results=[])

self.check_captcha_and_get(YandexImagesDownloader.MAIN_URL,
params={
'text': keyword,
"nomisspell": 1
})
self.check_captcha_and_get(YandexImagesDownloader.MAIN_URL, self.get_url_params_by_keyword(keyword))

response = self.get_response()

if not (response.reason == "OK"):
if not (response.reason.lower() == "ok"):
keyword_result = "fail"
keyword_result.message = (
"Failed to fetch a search page."
Expand Down Expand Up @@ -390,6 +433,13 @@ def download_images_by_keyword(self, keyword,

return keyword_result

def get_subdirectory(self, keyword):
subdirectory = keyword
if self.is_url(keyword):
import hashlib
subdirectory = hashlib.sha256(keyword.encode('utf-8')).hexdigest()
return subdirectory

def download_images(self, keywords: List[str]) -> DownloaderResult:
dowloader_result = DownloaderResult(status=None,
message=None,
Expand All @@ -398,10 +448,14 @@ def download_images(self, keywords: List[str]) -> DownloaderResult:
dowloader_result.status = "fail"

for keyword in keywords:
logging.info(f"Downloading images for {keyword}...")
if keyword in self.block_keyword_list:
logging.info(f"Skip images for {keyword}...")
continue
else:
logging.info(f"Downloading images for {keyword}...")

keyword_result = self.download_images_by_keyword(
keyword, sub_directory=keyword)
keyword, sub_directory=self.get_subdirectory(keyword))
dowloader_result.keyword_results.append(keyword_result)

logging.info(keyword_result.message)
Expand All @@ -419,6 +473,7 @@ def check_captcha_and_get(self, url, params=None):
If there is captcha, you have to type it in input() or quit."""

url_with_params = f"{url}?{urlencode(params)}"
self.url_with_params = url_with_params

del self.driver.requests
self.driver.get(url_with_params)
Expand Down
Loading