From 7c101c210cb6d41a4091dbaedb0349a6f8d6087c Mon Sep 17 00:00:00 2001 From: "Josephine.Rutten" Date: Fri, 8 Nov 2024 10:32:46 +0100 Subject: [PATCH] small code improvements based on sonar cloud warnings --- src/cnaas_nms/db/device.py | 3 ++- src/cnaas_nms/db/git.py | 4 ++-- src/cnaas_nms/db/git_worktrees.py | 1 + src/cnaas_nms/devicehandler/init_device.py | 7 +++---- src/cnaas_nms/devicehandler/sync_devices.py | 5 ----- src/cnaas_nms/plugins/ni.py | 4 ++-- 6 files changed, 10 insertions(+), 14 deletions(-) diff --git a/src/cnaas_nms/db/device.py b/src/cnaas_nms/db/device.py index 5bc0557c..17310d63 100644 --- a/src/cnaas_nms/db/device.py +++ b/src/cnaas_nms/db/device.py @@ -9,6 +9,7 @@ from sqlalchemy import Boolean, DateTime, Enum, ForeignKey, Integer, String, Unicode, UniqueConstraint, event from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.sql import func from sqlalchemy_utils import IPAddressType import cnaas_nms.db.base @@ -93,7 +94,7 @@ class Device(cnaas_nms.db.base.Base): state: Mapped[DeviceState] = mapped_column(Enum(DeviceState), nullable=False) # type: ignore device_type: Mapped[DeviceType] = mapped_column(Enum(DeviceType), nullable=False) confhash: Mapped[Optional[str]] = mapped_column(String(64)) # SHA256 = 64 characters - last_seen: Mapped[Optional[DateTime]] = mapped_column(DateTime, default=datetime.datetime.utcnow) + last_seen: Mapped[Optional[DateTime]] = mapped_column(DateTime, default=func.now(), onupdate=func.now()) port: Mapped[Optional[int]] = mapped_column(Integer) stack_members = relationship( "Stackmember", foreign_keys="[Stackmember.device_id]", lazy="subquery", back_populates="device" diff --git a/src/cnaas_nms/db/git.py b/src/cnaas_nms/db/git.py index 1ae4bfb9..8401d274 100644 --- a/src/cnaas_nms/db/git.py +++ b/src/cnaas_nms/db/git.py @@ -103,7 +103,7 @@ def refresh_repo(repo_type: RepoType = RepoType.TEMPLATES, scheduled_by: str = " result = _refresh_repo_task_settings(job_id=job_id) else: raise ValueError("Invalid repository") - job.finish_time = datetime.datetime.utcnow() # type: ignore + job.finish_time = datetime.datetime.now() job.status = JobStatus.FINISHED job.result = {"message": result, "repository": repo_type.name} try: @@ -120,7 +120,7 @@ def refresh_repo(repo_type: RepoType = RepoType.TEMPLATES, scheduled_by: str = " return result except Exception as e: logger.exception("Exception while scheduling job for refresh repo") - job.finish_time = datetime.datetime.utcnow() # type: ignore + job.finish_time = datetime.datetime.now() job.status = JobStatus.EXCEPTION job.result = {"error": str(e), "repository": repo_type.name} try: diff --git a/src/cnaas_nms/db/git_worktrees.py b/src/cnaas_nms/db/git_worktrees.py index 33f3952e..46fc8620 100644 --- a/src/cnaas_nms/db/git_worktrees.py +++ b/src/cnaas_nms/db/git_worktrees.py @@ -3,6 +3,7 @@ from typing import Optional import git.exc + from cnaas_nms.app_settings import app_settings from cnaas_nms.tools.log import get_logger from git import Repo diff --git a/src/cnaas_nms/devicehandler/init_device.py b/src/cnaas_nms/devicehandler/init_device.py index 7c24a17b..74a7350b 100644 --- a/src/cnaas_nms/devicehandler/init_device.py +++ b/src/cnaas_nms/devicehandler/init_device.py @@ -1,4 +1,3 @@ -import datetime import os from ipaddress import IPv4Address, IPv4Interface, ip_interface from typing import Any, List, Optional, Union @@ -12,6 +11,7 @@ from nornir_jinja2.plugins.tasks import template_file from nornir_napalm.plugins.tasks import napalm_configure, napalm_get from nornir_utils.plugins.functions import print_result +from sqlalchemy.sql import func import cnaas_nms.db.helper import cnaas_nms.devicehandler.get @@ -880,7 +880,7 @@ def init_device_step2( set_facts(dev, facts) management_ip = dev.management_ip dev.dhcp_ip = None - dev.last_seen = datetime.datetime.utcnow() # type: ignore + dev.last_seen = func.now() # type: ignore # Plugin hook: new managed device # Send: hostname , device type , serial , platform , vendor , model , os version @@ -925,7 +925,6 @@ def set_hostname_task(task, new_hostname: str): template="hostname.j2", jinja_env=get_jinja_env(f"{local_repo_path}/{task.host.platform}"), path=f"{local_repo_path}/{task.host.platform}", - # **template_vars, ) task.host["config"] = r.result task.run( @@ -971,7 +970,7 @@ def discover_device(ztp_mac: str, dhcp_ip: str, iteration: int, job_id: Optional dev.model = facts["model"][:64] dev.os_version = facts["os_version"][:64] dev.state = DeviceState.DISCOVERED - dev.last_seen = datetime.datetime.utcnow() # type: ignore + dev.last_seen = func.now() # type: ignore new_hostname = dev.hostname logger.info( f"Device with ztp_mac {ztp_mac} successfully scanned" diff --git a/src/cnaas_nms/devicehandler/sync_devices.py b/src/cnaas_nms/devicehandler/sync_devices.py index a3344543..5b098824 100644 --- a/src/cnaas_nms/devicehandler/sync_devices.py +++ b/src/cnaas_nms/devicehandler/sync_devices.py @@ -1,4 +1,3 @@ -import datetime import os import time from hashlib import sha256 @@ -817,7 +816,6 @@ def confirm_devices( dev = session.query(Device).filter(Device.hostname == host).one() dev.synchronized = True remove_sync_events(host) - dev.last_seen = datetime.datetime.utcnow() # type: ignore logger.info("Releasing lock for devices from syncto job: {} (in commit-job {})".format(prev_job_id, job_id)) Joblock.release_lock(session, job_id=prev_job_id) @@ -958,18 +956,15 @@ def sync_devices( if dev.synchronized: dev.synchronized = False add_sync_event(hostname, "syncto_dryrun", scheduled_by, job_id) - dev.last_seen = datetime.datetime.utcnow() # type: ignore # if next job will commit, that job will mark synchronized on success elif get_confirm_mode(confirm_mode_override) != 2: dev = session.query(Device).filter(Device.hostname == hostname).one() dev.synchronized = True remove_sync_events(hostname) - dev.last_seen = datetime.datetime.utcnow() # type: ignore for hostname in unchanged_hosts: dev = session.query(Device).filter(Device.hostname == hostname).one() dev.synchronized = True remove_sync_events(hostname) - dev.last_seen = datetime.datetime.utcnow() # type: ignore if not dry_run and get_confirm_mode(confirm_mode_override) != 2: if failed_hosts and get_confirm_mode(confirm_mode_override) == 1: logger.error( diff --git a/src/cnaas_nms/plugins/ni.py b/src/cnaas_nms/plugins/ni.py index 71b4108c..3b05dbbf 100644 --- a/src/cnaas_nms/plugins/ni.py +++ b/src/cnaas_nms/plugins/ni.py @@ -36,7 +36,7 @@ def new_managed_device(self, hostname, device_type, serial_number, vendor, model data: dict[str, dict[str, Any]] = {"node": {"operational_state": "In service"}} - res = requests.get(str(self.urlbase), headers=headers, verify=False) + res = requests.get(self.urlbase, headers=headers, verify=False) # type: ignore if not res.status_code == 200: logger.warn("Failed to fetch devices from NI: {}: {} ({})".format(res.status_code, res.text, data)) @@ -55,7 +55,7 @@ def new_managed_device(self, hostname, device_type, serial_number, vendor, model data["node"]["ip_addresses"] = [management_ip] handle_id = device["handle_id"] - res = requests.put(str(self.urlbase) + str(handle_id) + "/", headers=headers, json=data, verify=False) + res = requests.put(self.urlbase + str(handle_id) + "/", headers=headers, json=data, verify=False) # type: ignore if res.status_code != 204: logger.warn("Could not change device {} with ID {}.".format(hostname, handle_id))