From cfab53bf4a150fa9ede44cb67008b49aab482304 Mon Sep 17 00:00:00 2001 From: "bogdan.sala" Date: Fri, 28 Jun 2024 19:12:58 +0300 Subject: [PATCH 1/3] FIX: Replaced deprecated OAuth1.a with OAuth 2.0 authentication from OSM, created dedicated credentials for GitHub Community --- login_controller.py | 12 +++++++----- osm_access.py | 33 +++++++++++++++++++++------------ requirements.txt | 3 ++- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/login_controller.py b/login_controller.py index c64cfdb..5e85e23 100644 --- a/login_controller.py +++ b/login_controller.py @@ -4,6 +4,8 @@ import urllib import json import logging +from typing import Optional + from osm_access import osm_auth from osc_api_gateway import OSCApi from osc_api_gateway import OSCUser @@ -31,7 +33,7 @@ class LoginController: def __init__(self, sub_domain: OSCAPISubDomain): self.osc_api = OSCApi(sub_domain) self.handle_retry_count = 0 - self.user: OSCUser = None + self.user: Optional[OSCUser] = None self.osm_token = "" self.osm_token_secret = "" @@ -77,7 +79,7 @@ def login(self) -> OSCUser: self.__persist_login(osc_user=osc_user) self.user = osc_user - + LOGGER.info(f"Greetings, {self.user.name}!") return osc_user @classmethod @@ -108,9 +110,9 @@ def __handle_osm_auth_error(self, error: Exception): @classmethod def __prompt_user_for_login(cls, osm_url: str): LOGGER.warning("") - LOGGER.warning('For login go to this URL in your browser:') - LOGGER.warning(osm_url) - LOGGER.warning((input("Login and grant access then press ENTER"))) + return input(f"1. Login by opening this URL in your browser:\n\t{osm_url}\n" + f"2. Copy, paste the 'Authorization code' from the browser in console\n" + f"3. Press ENTER\n") @classmethod def __handle_error_on_authorization(cls, exception: Exception): diff --git a/osm_access.py b/osm_access.py index 169e2d0..12b8c49 100644 --- a/osm_access.py +++ b/osm_access.py @@ -1,28 +1,37 @@ """This script is used to get osm authentication token and secret token.""" from typing import Tuple, Optional -from requests_oauthlib import OAuth1Session +from requests_oauthlib import OAuth2Session +from oauthlib.oauth2 import OAuth2Token +OSM_AUTH_URL = 'https://www.openstreetmap.org/oauth2/authorize' +OSM_TOKEN_URL = 'https://www.openstreetmap.org/oauth2/token' -def __osm_auth_service() -> OAuth1Session: +# Credentials dedicated for GitHub community +CLIENT_ID = 'xCDlXoN-gVeXsXMHu8N5VArN4iWBDwuMgZAlf5PlC7c' +CLIENT_SECRET = "UICSTaxQkQsSl-osmcbqd5CXJIak5fvw9BF_F152BeE" + + +def __osm_auth_service() -> OAuth2Session: """Factory method that builds osm auth service""" - oauth = OAuth1Session(client_key='rBWV8Eaottv44tXfdLofdNvVemHOL62Lsutpb9tw', - client_secret='rpmeZIp49sEjjcz91X9dsY0vD1PpEduixuPy8T6S') + oauth = OAuth2Session(client_id=CLIENT_ID, + redirect_uri="urn:ietf:wg:oauth:2.0:oob", + scope=["openid", "read_prefs"]) return oauth def osm_auth(request_user_action, error_handle) -> Tuple[Optional[str], Optional[str]]: service = __osm_auth_service() - service.fetch_request_token('https://www.openstreetmap.org/oauth/request_token') - authorization_url = service.authorization_url('https://www.openstreetmap.org/oauth/authorize') - request_user_action(authorization_url) + authorization_url, state = service.authorization_url(OSM_AUTH_URL) + response = request_user_action(authorization_url) # pylint: disable=W0703 try: - access_token_url = 'https://www.openstreetmap.org/oauth/access_token' - access_token_response = service.fetch_access_token(access_token_url, - verifier=" ") + access_token_response: OAuth2Token = service.fetch_token(OSM_TOKEN_URL, + code=response, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET) + + return access_token_response.get("access_token"), CLIENT_SECRET except Exception as ex: error_handle(ex) return None, None - - return access_token_response['oauth_token'], access_token_response['oauth_token_secret'] diff --git a/requirements.txt b/requirements.txt index 45435f1..b8598e2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ geojson~=2.5.0 requests-oauthlib -imagesize~=1.3.0 \ No newline at end of file +imagesize~=1.3.0 +oauthlib~=3.2.2 \ No newline at end of file From 24e4430949f86ac1cb5a377281dd6a23d831e1c7 Mon Sep 17 00:00:00 2001 From: "bogdan.sala" Date: Fri, 28 Jun 2024 20:02:18 +0300 Subject: [PATCH 2/3] CHORE: lint errors fixed --- login_controller.py | 2 +- osm_access.py | 2 +- parsers/custom_data_parsers/custom_mapillary.py | 16 ++++++++-------- parsers/osc_metadata/parser.py | 6 +++--- parsers/xmp.py | 7 ++++--- 5 files changed, 17 insertions(+), 16 deletions(-) diff --git a/login_controller.py b/login_controller.py index 5e85e23..dee40aa 100644 --- a/login_controller.py +++ b/login_controller.py @@ -79,7 +79,7 @@ def login(self) -> OSCUser: self.__persist_login(osc_user=osc_user) self.user = osc_user - LOGGER.info(f"Greetings, {self.user.name}!") + LOGGER.info("Greetings, %s!", self.user.name) return osc_user @classmethod diff --git a/osm_access.py b/osm_access.py index 12b8c49..d895a79 100644 --- a/osm_access.py +++ b/osm_access.py @@ -22,7 +22,7 @@ def __osm_auth_service() -> OAuth2Session: def osm_auth(request_user_action, error_handle) -> Tuple[Optional[str], Optional[str]]: service = __osm_auth_service() - authorization_url, state = service.authorization_url(OSM_AUTH_URL) + authorization_url, _ = service.authorization_url(OSM_AUTH_URL) response = request_user_action(authorization_url) # pylint: disable=W0703 try: diff --git a/parsers/custom_data_parsers/custom_mapillary.py b/parsers/custom_data_parsers/custom_mapillary.py index 0fe522b..174170e 100644 --- a/parsers/custom_data_parsers/custom_mapillary.py +++ b/parsers/custom_data_parsers/custom_mapillary.py @@ -20,7 +20,7 @@ def _gps_timestamp(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]: description = str(gps_data[ExifTags.DESCRIPTION.value]) try: description = json.loads(description) - except json.JSONDecodeError as e: + except json.JSONDecodeError as _: return None mapillary_timestamp = description.get("MAPCaptureTime", None) # 2021_07_24_14_24_04_000 if mapillary_timestamp is not None: @@ -34,7 +34,7 @@ def _gps_latitude(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]: description = str(gps_data[ExifTags.DESCRIPTION.value]) try: description = json.loads(description) - except json.JSONDecodeError as e: + except json.JSONDecodeError as _: return None # mapillary latitude exists return it or None latitude = description.get("MAPLatitude", None) @@ -47,7 +47,7 @@ def _gps_longitude(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]: description = str(gps_data[ExifTags.DESCRIPTION.value]) try: description = json.loads(description) - except json.JSONDecodeError as e: + except json.JSONDecodeError as _: return None # mapillary longitude exists return it or None latitude = description.get("MAPLongitude", None) @@ -60,7 +60,7 @@ def _gps_compass(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]: description = str(gps_data[ExifTags.DESCRIPTION.value]) try: description = json.loads(description) - except json.JSONDecodeError as e: + except json.JSONDecodeError as _: return None # mapillary compass exists return it or None compass_dict = description.get("MAPCompassHeading", {}) @@ -74,7 +74,7 @@ def _gps_altitude(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]: description = str(gps_data[ExifTags.DESCRIPTION.value]) try: description = json.loads(description) - except json.JSONDecodeError as e: + except json.JSONDecodeError as _: return None # mapillary altitude exists return it or None altitude = description.get("MAPAltitude", None) @@ -87,7 +87,7 @@ def _gps_speed(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]: description = str(gps_data[ExifTags.DESCRIPTION.value]) try: description = json.loads(description) - except json.JSONDecodeError as e: + except json.JSONDecodeError as _: return None # mapillary speed exists return it or None speed = description.get("MAPGPSSpeed", None) @@ -100,7 +100,7 @@ def _device_model(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]: description = str(gps_data[ExifTags.DESCRIPTION.value]) try: description = json.loads(description) - except json.JSONDecodeError as e: + except json.JSONDecodeError as _: return None # mapillary device model exists return it or None speed = description.get("MAPDeviceModel", None) @@ -113,7 +113,7 @@ def _device_make(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]: description = str(gps_data[ExifTags.DESCRIPTION.value]) try: description = json.loads(description) - except json.JSONDecodeError as e: + except json.JSONDecodeError as _: return None # mapillary device make exists return it or None make = description.get("MAPDeviceMake", None) diff --git a/parsers/osc_metadata/parser.py b/parsers/osc_metadata/parser.py index 7e3fd5b..1460fe8 100644 --- a/parsers/osc_metadata/parser.py +++ b/parsers/osc_metadata/parser.py @@ -123,16 +123,16 @@ def _configure_headers(self): self.header_line = metadata_file.readline() line = metadata_file.readline() if "HEADER" not in line: - return None + return # find the definition lines line = metadata_file.readline() while line and "BODY" not in line: if "ALIAS:" not in line: - return None + return alias_line_elements = line.split(":") if ";" not in alias_line_elements[1]: - return None + return definition = SensorItemDefinition.definition_from_row(line) self._alias_definitions[definition.alias] = definition diff --git a/parsers/xmp.py b/parsers/xmp.py index 84f30af..28faf8b 100644 --- a/parsers/xmp.py +++ b/parsers/xmp.py @@ -151,7 +151,8 @@ def serialize(self): width = item.width # print(str(hex_val) + xmp_header) # pylint: disable=C0301 - xmp_header = '''\n\n\nTrue\nequirectangular\n0.0\n0.0\n0.0\n0.0\n0.0\n0.0\n0.0\n0.0\n0\n0\n{imagewidth}\n{imageheight}\n{imagewidth}\n{imageheight}\n\n\n'''.format(imageheight=height, imagewidth=width) + xmp_header = '''\n\n\nTrue\nequirectangular\n0.0\n0.0\n0.0\n0.0\n0.0\n0.0\n0.0\n0.0\n0\n0\n{imagewidth}\n{imageheight}\n{imagewidth}\n{imageheight}\n\n\n'''.format( + imageheight=height, imagewidth=width) # pylint: enable=C0301 string_len = len(xmp_header.encode('utf-8')) + 2 xmp_header = b'\xff\xe1' + struct.pack('>h', string_len) + xmp_header.encode('utf-8') @@ -159,8 +160,8 @@ def serialize(self): with self._storage.open(self.file_path, "wb") as out_image: out_image.write(data[:2] + xmp_header + data[2:]) elif len(self.xmp_str) > 0: - NotImplementedError("Adding information to existing XMP header is currently " - "not supported") + raise NotImplementedError("Adding information to existing XMP header is currently " + "not supported") else: with self._storage.open(self.file_path, "wb") as out_image: out_image.write(data[:start] + xmp_header + data[start:]) From 63eb2b5f2f6206c5535a29b947d794c394b3eab2 Mon Sep 17 00:00:00 2001 From: "bogdan.sala" Date: Fri, 28 Jun 2024 20:05:14 +0300 Subject: [PATCH 3/3] CHORE: lint errors fixed --- parsers/xmp.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/parsers/xmp.py b/parsers/xmp.py index 28faf8b..8f5fd4b 100644 --- a/parsers/xmp.py +++ b/parsers/xmp.py @@ -145,6 +145,8 @@ def serialize(self): with self._storage.open(self.file_path, "rb") as image: data = image.read() start = data.find(b'\xff\xe1') + height = 0 + width = 0 for item in self._sensors: if isinstance(item, ExifParameters): height = item.height