Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: Replaced deprecated OAuth1.a with OAuth 2.0 #142

Merged
merged 3 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions login_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import urllib
import json
import logging
from typing import Optional

from osm_access import osm_auth
from osc_api_gateway import OSCApi
from osc_api_gateway import OSCUser
Expand Down Expand Up @@ -31,7 +33,7 @@ class LoginController:
def __init__(self, sub_domain: OSCAPISubDomain):
self.osc_api = OSCApi(sub_domain)
self.handle_retry_count = 0
self.user: OSCUser = None
self.user: Optional[OSCUser] = None
self.osm_token = ""
self.osm_token_secret = ""

Expand Down Expand Up @@ -77,7 +79,7 @@ def login(self) -> OSCUser:

self.__persist_login(osc_user=osc_user)
self.user = osc_user

LOGGER.info("Greetings, %s!", self.user.name)
return osc_user

@classmethod
Expand Down Expand Up @@ -108,9 +110,9 @@ def __handle_osm_auth_error(self, error: Exception):
@classmethod
def __prompt_user_for_login(cls, osm_url: str):
LOGGER.warning("")
LOGGER.warning('For login go to this URL in your browser:')
LOGGER.warning(osm_url)
LOGGER.warning((input("Login and grant access then press ENTER")))
return input(f"1. Login by opening this URL in your browser:\n\t{osm_url}\n"
f"2. Copy, paste the 'Authorization code' from the browser in console\n"
f"3. Press ENTER\n")

@classmethod
def __handle_error_on_authorization(cls, exception: Exception):
Expand Down
33 changes: 21 additions & 12 deletions osm_access.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
"""This script is used to get osm authentication token and secret token."""
from typing import Tuple, Optional

from requests_oauthlib import OAuth1Session
from requests_oauthlib import OAuth2Session
from oauthlib.oauth2 import OAuth2Token

OSM_AUTH_URL = 'https://www.openstreetmap.org/oauth2/authorize'
OSM_TOKEN_URL = 'https://www.openstreetmap.org/oauth2/token'

def __osm_auth_service() -> OAuth1Session:
# Credentials dedicated for GitHub community
CLIENT_ID = 'xCDlXoN-gVeXsXMHu8N5VArN4iWBDwuMgZAlf5PlC7c'
CLIENT_SECRET = "UICSTaxQkQsSl-osmcbqd5CXJIak5fvw9BF_F152BeE"


def __osm_auth_service() -> OAuth2Session:
"""Factory method that builds osm auth service"""
oauth = OAuth1Session(client_key='rBWV8Eaottv44tXfdLofdNvVemHOL62Lsutpb9tw',
client_secret='rpmeZIp49sEjjcz91X9dsY0vD1PpEduixuPy8T6S')
oauth = OAuth2Session(client_id=CLIENT_ID,
redirect_uri="urn:ietf:wg:oauth:2.0:oob",
scope=["openid", "read_prefs"])
return oauth


def osm_auth(request_user_action, error_handle) -> Tuple[Optional[str], Optional[str]]:
service = __osm_auth_service()
service.fetch_request_token('https://www.openstreetmap.org/oauth/request_token')
authorization_url = service.authorization_url('https://www.openstreetmap.org/oauth/authorize')
request_user_action(authorization_url)
authorization_url, _ = service.authorization_url(OSM_AUTH_URL)
response = request_user_action(authorization_url)
# pylint: disable=W0703
try:
access_token_url = 'https://www.openstreetmap.org/oauth/access_token'
access_token_response = service.fetch_access_token(access_token_url,
verifier=" ")
access_token_response: OAuth2Token = service.fetch_token(OSM_TOKEN_URL,
code=response,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET)

return access_token_response.get("access_token"), CLIENT_SECRET
except Exception as ex:
error_handle(ex)
return None, None

return access_token_response['oauth_token'], access_token_response['oauth_token_secret']
16 changes: 8 additions & 8 deletions parsers/custom_data_parsers/custom_mapillary.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def _gps_timestamp(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]:
description = str(gps_data[ExifTags.DESCRIPTION.value])
try:
description = json.loads(description)
except json.JSONDecodeError as e:
except json.JSONDecodeError as _:
return None
mapillary_timestamp = description.get("MAPCaptureTime", None) # 2021_07_24_14_24_04_000
if mapillary_timestamp is not None:
Expand All @@ -34,7 +34,7 @@ def _gps_latitude(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]:
description = str(gps_data[ExifTags.DESCRIPTION.value])
try:
description = json.loads(description)
except json.JSONDecodeError as e:
except json.JSONDecodeError as _:
return None
# mapillary latitude exists return it or None
latitude = description.get("MAPLatitude", None)
Expand All @@ -47,7 +47,7 @@ def _gps_longitude(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]:
description = str(gps_data[ExifTags.DESCRIPTION.value])
try:
description = json.loads(description)
except json.JSONDecodeError as e:
except json.JSONDecodeError as _:
return None
# mapillary longitude exists return it or None
latitude = description.get("MAPLongitude", None)
Expand All @@ -60,7 +60,7 @@ def _gps_compass(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]:
description = str(gps_data[ExifTags.DESCRIPTION.value])
try:
description = json.loads(description)
except json.JSONDecodeError as e:
except json.JSONDecodeError as _:
return None
# mapillary compass exists return it or None
compass_dict = description.get("MAPCompassHeading", {})
Expand All @@ -74,7 +74,7 @@ def _gps_altitude(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]:
description = str(gps_data[ExifTags.DESCRIPTION.value])
try:
description = json.loads(description)
except json.JSONDecodeError as e:
except json.JSONDecodeError as _:
return None
# mapillary altitude exists return it or None
altitude = description.get("MAPAltitude", None)
Expand All @@ -87,7 +87,7 @@ def _gps_speed(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]:
description = str(gps_data[ExifTags.DESCRIPTION.value])
try:
description = json.loads(description)
except json.JSONDecodeError as e:
except json.JSONDecodeError as _:
return None
# mapillary speed exists return it or None
speed = description.get("MAPGPSSpeed", None)
Expand All @@ -100,7 +100,7 @@ def _device_model(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]:
description = str(gps_data[ExifTags.DESCRIPTION.value])
try:
description = json.loads(description)
except json.JSONDecodeError as e:
except json.JSONDecodeError as _:
return None
# mapillary device model exists return it or None
speed = description.get("MAPDeviceModel", None)
Expand All @@ -113,7 +113,7 @@ def _device_make(cls, gps_data: Dict[str, IfdTag]) -> Optional[float]:
description = str(gps_data[ExifTags.DESCRIPTION.value])
try:
description = json.loads(description)
except json.JSONDecodeError as e:
except json.JSONDecodeError as _:
return None
# mapillary device make exists return it or None
make = description.get("MAPDeviceMake", None)
Expand Down
6 changes: 3 additions & 3 deletions parsers/osc_metadata/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,16 @@ def _configure_headers(self):
self.header_line = metadata_file.readline()
line = metadata_file.readline()
if "HEADER" not in line:
return None
return

# find the definition lines
line = metadata_file.readline()
while line and "BODY" not in line:
if "ALIAS:" not in line:
return None
return
alias_line_elements = line.split(":")
if ";" not in alias_line_elements[1]:
return None
return

definition = SensorItemDefinition.definition_from_row(line)
self._alias_definitions[definition.alias] = definition
Expand Down
9 changes: 6 additions & 3 deletions parsers/xmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,22 +145,25 @@ def serialize(self):
with self._storage.open(self.file_path, "rb") as image:
data = image.read()
start = data.find(b'\xff\xe1')
height = 0
width = 0
for item in self._sensors:
if isinstance(item, ExifParameters):
height = item.height
width = item.width
# print(str(hex_val) + xmp_header)
# pylint: disable=C0301
xmp_header = '''<x:xmpmeta xmlns:x="adobe:ns:meta/" x:xmptk="Adobe XMP Core 5.1.0-jc003">\n<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">\n<rdf:Description rdf:about="" xmlns:GPano="http://ns.google.com/photos/1.0/panorama/">\n<GPano:UsePanoramaViewer>True</GPano:UsePanoramaViewer>\n<GPano:ProjectionType>equirectangular</GPano:ProjectionType>\n<GPano:PoseHeadingDegrees>0.0</GPano:PoseHeadingDegrees>\n<GPano:PosePitchDegrees>0.0</GPano:PosePitchDegrees>\n<GPano:PoseRollDegrees>0.0</GPano:PoseRollDegrees>\n<GPano:InitialViewHeadingDegrees>0.0</GPano:InitialViewHeadingDegrees>\n<GPano:InitialViewPitchDegrees>0.0</GPano:InitialViewPitchDegrees>\n<GPano:InitialViewRollDegrees>0.0</GPano:InitialViewRollDegrees>\n<GPano:InitialHorizontalFOVDegrees>0.0</GPano:InitialHorizontalFOVDegrees>\n<GPano:InitialVerticalFOVDegrees>0.0</GPano:InitialVerticalFOVDegrees>\n<GPano:CroppedAreaLeftPixels>0</GPano:CroppedAreaLeftPixels>\n<GPano:CroppedAreaTopPixels>0</GPano:CroppedAreaTopPixels>\n<GPano:CroppedAreaImageWidthPixels>{imagewidth}</GPano:CroppedAreaImageWidthPixels>\n<GPano:CroppedAreaImageHeightPixels>{imageheight}</GPano:CroppedAreaImageHeightPixels>\n<GPano:FullPanoWidthPixels>{imagewidth}</GPano:FullPanoWidthPixels>\n<GPano:FullPanoHeightPixels>{imageheight}</GPano:FullPanoHeightPixels>\n</rdf:Description>\n</rdf:RDF>\n</x:xmpmeta>'''.format(imageheight=height, imagewidth=width)
xmp_header = '''<x:xmpmeta xmlns:x="adobe:ns:meta/" x:xmptk="Adobe XMP Core 5.1.0-jc003">\n<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">\n<rdf:Description rdf:about="" xmlns:GPano="http://ns.google.com/photos/1.0/panorama/">\n<GPano:UsePanoramaViewer>True</GPano:UsePanoramaViewer>\n<GPano:ProjectionType>equirectangular</GPano:ProjectionType>\n<GPano:PoseHeadingDegrees>0.0</GPano:PoseHeadingDegrees>\n<GPano:PosePitchDegrees>0.0</GPano:PosePitchDegrees>\n<GPano:PoseRollDegrees>0.0</GPano:PoseRollDegrees>\n<GPano:InitialViewHeadingDegrees>0.0</GPano:InitialViewHeadingDegrees>\n<GPano:InitialViewPitchDegrees>0.0</GPano:InitialViewPitchDegrees>\n<GPano:InitialViewRollDegrees>0.0</GPano:InitialViewRollDegrees>\n<GPano:InitialHorizontalFOVDegrees>0.0</GPano:InitialHorizontalFOVDegrees>\n<GPano:InitialVerticalFOVDegrees>0.0</GPano:InitialVerticalFOVDegrees>\n<GPano:CroppedAreaLeftPixels>0</GPano:CroppedAreaLeftPixels>\n<GPano:CroppedAreaTopPixels>0</GPano:CroppedAreaTopPixels>\n<GPano:CroppedAreaImageWidthPixels>{imagewidth}</GPano:CroppedAreaImageWidthPixels>\n<GPano:CroppedAreaImageHeightPixels>{imageheight}</GPano:CroppedAreaImageHeightPixels>\n<GPano:FullPanoWidthPixels>{imagewidth}</GPano:FullPanoWidthPixels>\n<GPano:FullPanoHeightPixels>{imageheight}</GPano:FullPanoHeightPixels>\n</rdf:Description>\n</rdf:RDF>\n</x:xmpmeta>'''.format(
imageheight=height, imagewidth=width)
# pylint: enable=C0301
string_len = len(xmp_header.encode('utf-8')) + 2
xmp_header = b'\xff\xe1' + struct.pack('>h', string_len) + xmp_header.encode('utf-8')
if start == -1:
with self._storage.open(self.file_path, "wb") as out_image:
out_image.write(data[:2] + xmp_header + data[2:])
elif len(self.xmp_str) > 0:
NotImplementedError("Adding information to existing XMP header is currently "
"not supported")
raise NotImplementedError("Adding information to existing XMP header is currently "
"not supported")
else:
with self._storage.open(self.file_path, "wb") as out_image:
out_image.write(data[:start] + xmp_header + data[start:])
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ geojson~=2.5.0
requests-oauthlib


imagesize~=1.3.0
imagesize~=1.3.0
oauthlib~=3.2.2
Loading