Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Geopackage compare tool #263

Open
leendertvanwolfswinkel opened this issue Sep 26, 2024 · 1 comment
Open

Geopackage compare tool #263

leendertvanwolfswinkel opened this issue Sep 26, 2024 · 1 comment

Comments

@leendertvanwolfswinkel
Copy link
Collaborator

  • Migrate both to the latest schema version
  • Compare outputs as styled layers added to QGIS
@leendertvanwolfswinkel
Copy link
Collaborator Author

I already experimented a bit with this and wrote this wrapper around geodiff that outputs the changes as vector layers:

import csv
import pathlib
import tempfile
from typing import List, Dict

import pygeodiff
import base64
import json

from osgeo import ogr, osr
from shapely import wkb

ogr.UseExceptions()

WKT_OGR_GEOMETRY_TYPES = {
    "GEOMETRY": ogr.wkbUnknown,
    "NONE": ogr.wkbNone,
    "POINT": ogr.wkbPoint,
    "LINESTRING": ogr.wkbLineString,
    "POLYGON": ogr.wkbPolygon,
    "MULTIPOINT": ogr.wkbMultiPoint,
    "MULTILINESTRING": ogr.wkbMultiLineString,
    "MULTIPOLYGON": ogr.wkbMultiPolygon,
}

OGR_FIELD_TYPES = {
    "text": ogr.OFTString,
    "integer": ogr.OFTInteger,
    "double": ogr.OFTReal,
}


class Schema:
    def __init__(self, geodiff:pygeodiff.GeoDiff, geopackage_path: str | pathlib.Path):
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_schema_json_path = temp_file.name
        geodiff.schema(driver="sqlite", driver_info="", src=str(geopackage_path), json=temp_schema_json_path)
        with open(temp_schema_json_path, 'r') as schema_json:
            self._dict = json.load(schema_json)

    def _schema_for_table(self, table: str) -> Dict:
        for table_schema in self._dict["geodiff_schema"]:
            if table_schema["table"] == table:
                return table_schema

    def columns(self, table):
        table_schema = self._schema_for_table(table)
        return table_schema["columns"]

    def column_names(self, table: str, exclude_geometry: bool = False) -> List[str]:
        table_schema = self._schema_for_table(table)
        if exclude_geometry:
            return [
                column_schema["name"] for column_schema in table_schema["columns"]
                if column_schema.get("geometry") is None
            ]
        else:
            return [column_schema["name"] for column_schema in table_schema["columns"]]

    def geometry_type(self, table: str) -> Dict | None:
        """
        Returns None if table has no geometry
        """
        table_schema = self._schema_for_table(table)
        geometry_schema = None  # in case table has no columns and no geometry (is this possible?)
        for column_schema in table_schema.get("columns"):
            geometry_schema = column_schema.get("geometry")
            if geometry_schema:
                break
        if geometry_schema is None:
            return None
        else:
            return geometry_schema.get("type")

    def crs(self, table: str) -> Dict:
        """
        Returns None if table has no geometry_type
        """
        table_schema = self._schema_for_table(table)
        return table_schema.get('crs')

    def create_layer(
            self,
            table: str,
            datasource: ogr.DataSource,
            layer_name: str,
            fields: List[ogr.FieldDefn] = None
    ) -> ogr.Layer:
        wkt_geometry_type = self.geometry_type(table) or "NONE"
        ogr_geometry_type = WKT_OGR_GEOMETRY_TYPES[wkt_geometry_type]
        if wkt_geometry_type != "NONE":
            crs_schema = self.crs(table)
            srs = osr.SpatialReference()
            srs.ImportFromWkt(crs_schema.get("wkt"))
        else:
            srs = None
        layer = datasource.CreateLayer(layer_name, srs, ogr_geometry_type)  # Geometry type set to unknown
        if layer is None:
            raise RuntimeError(f"Failed to create layer '{table}' in the GeoPackage.")
        if fields:
            for field_defn in fields:
                layer.CreateField(field_defn)
        else:
            for column_schema in self.columns(table):
                if column_schema["type"] != "geometry":
                    ogr_field_type = OGR_FIELD_TYPES[column_schema["type"]]
                    layer.CreateField(ogr.FieldDefn(column_schema["name"], ogr_field_type))
        return layer


def get_geom(
        datasource_path: str | pathlib.Path,
        layer_name: str,
        feature_id: int,
        format: str,
) -> str:
    """
    Get the WKT (Well-Known Text) geometry from a specified feature in a layer of the given OGR datasource.

    Args:
        datasource (ogr.DataSource): The OGR data source (e.g., GeoPackage or Shapefile).
        layer_name (str): The name of the layer from which to get the feature.
        feature_id (int): The ID of the feature whose geometry should be retrieved.

    Returns:
        str: The WKT representation of the geometry.
    """
    datasource = ogr.Open(datasource_path)

    # Get the layer by name from the datasource
    layer = datasource.GetLayerByName(layer_name)
    if layer is None:
        raise ValueError(f"Layer '{layer_name}' not found in the datasource.")

    # Get the feature from the layer by its feature ID
    feature = layer.GetFeature(feature_id)
    if feature is None:
        raise ValueError(f"Feature with ID '{feature_id}' not found in layer '{layer_name}'.")

    # Get the geometry reference from the feature
    geometry = feature.GetGeometryRef()
    if geometry is None:
        raise ValueError(f"Feature with ID '{feature_id}' does not have a valid geometry.")

    if format == "wkt":
        return geometry.ExportToWkt()

    elif format == "wkb":
        return geometry.ExportToWkb()

    else:
        raise ValueError("format must be one of 'wkt', 'wkb'")

class GeoDiffer:
    """Wrapper around pygeodiff.GeoDiff. Adds some utilities like exporting changesets to GIS formats"""

    def __init__(
            self,
            geodiff: pygeodiff.GeoDiff,
            base: str | pathlib.Path,
            modified: str | pathlib.Path
    ):
        """

        :param geodiff: geodiff for which to parse the changes
        :param base: Path to base geopackage
        :param modified: Path to modified geopackage
        """
        self.geodiff = geodiff
        self.updates = dict()
        self.deletes = dict()
        self.inserts = dict()
        self.base = base
        self.base_schema = Schema(geodiff=self.geodiff, geopackage_path=base)
        self.modified = modified
        self.modified_schema = Schema(geodiff=self.geodiff, geopackage_path=modified)
        self._changeset_path = None
        self.geodiff_dict = None

    @property
    def changeset_path(self):
        """
        Path to the changeset (.diff) file for the GeoDiff.
        Returns None if create_changeset() has not been run yet
        """
        return self._changeset_path

    def create_changeset(self):
        """Sets self._changeset_path"""
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            self._changeset_path = temp_file.name
        self.geodiff.create_changeset(
            base=str(self.base),
            modified=str(self.modified),
            changeset=self._changeset_path
        )

    def list_changes(self):
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_json_path = temp_file.name
        self.geodiff.list_changes(changeset=str(self._changeset_path), json=temp_json_path)
        with open(temp_json_path, 'r') as json_file:
            self.geodiff_dict = json.load(json_file)

    def export_geopackage_geometry(self, geom: str, export_format: str):
        """
        Export a geopackage geometry to WKT or WKB

        :param geom: string representation of a base64-encoded wkb geometry with geopackage headers
        :param export_format: 'wkt' or 'wkb'
        """
        binary_geometry = base64.b64decode(geom)
        wkb_geom = self.geodiff.create_wkb_from_gpkg_header(binary_geometry)
        if export_format == 'wkb':
            return wkb_geom
        elif export_format == 'wkt':
            shapely_geom = wkb.loads(wkb_geom)
            return shapely_geom.wkt
        else:
            raise ValueError("export_format must be one of 'wkt', 'wkb'")

    def parse(self):
        for change_set in self.geodiff_dict["geodiff"]:
            if change_set["type"] == "update":
                self.parse_updated(change_set)
            elif change_set["type"] == "delete":
                self.parse_deleted(change_set)
            elif change_set["type"] == "insert":
                self.parse_inserted(change_set)

    def parse_updated(self, change_set: Dict):
        """
        Create a {table_name: [changes]} dict.
        Each change in [changes] is a dict with:
            - 'primary_key': <primary key>
            - 'geometry': <geometry (before update)>
            - 'changes': a newline-separated list of "<column>: <old value> -> <new value>" strings
        """
        table = change_set["table"]
        if self.updates.get(table) is None:
            self.updates[table] = []
        primary_key = change_set["changes"][0]["old"]
        geom = get_geom(
                datasource_path=self.base,
                layer_name=table,
                feature_id=primary_key,
                format="wkb"
            ) if self.base_schema.geometry_type(table) else None
        changes = []
        columns = self.base_schema.columns(table)
        for change in change_set["changes"][1:]:
            column_schema = columns[change['column']]
            changes.append(f"{column_schema['name']}: {change['old']} -> {change['new']}")
        changes_str = "\n".join(changes)
        self.updates[table].append(
            {
                "primary_key": primary_key,
                "geometry": geom,
                "changes": changes_str
            }
        )

    def parse_deleted(self, change_set: Dict):
        """
        Adds a {"geometry": <geometry>, "values": [<values>]} dict to the self.deletes[table] list
        """
        table = change_set["table"]
        if self.deletes.get(table) is None:
            self.deletes[table] = []
        values = []
        geometry = None
        columns = self.base_schema.columns(table)
        for change in change_set["changes"]:
            column_schema = columns[change['column']]
            if column_schema["type"] == "geometry":
                geometry = self.export_geopackage_geometry(change["old"], "wkb")
            else:
                values.append(change["old"])
        self.deletes[table].append({"values": values, "geometry": geometry})

    def parse_inserted(self, change_set: Dict):
        """
        Adds a {"geometry": <geometry>, "values": [<values>]} dict to the self.inserts[table] list
        """
        table = change_set["table"]
        if self.inserts.get(table) is None:
            self.inserts[table] = []
        values = []
        geometry = None
        columns = self.modified_schema.columns(table)
        for change in change_set["changes"]:
            column_schema = columns[change['column']]
            if column_schema["type"] == "geometry":
                geometry = self.export_geopackage_geometry(change["new"], "wkb")
            else:
                values.append(change["new"])
        self.inserts[table].append({"values": values, "geometry": geometry})

    def to_ogr(self, path: pathlib.Path, driver_name: str):
        """
        Write differences to a gis vector file or directory of files

        :param path: interpreted as a directory if driver does not support multiple layers
        :param driver_name: a driver short name (see https://gdal.org/en/latest/drivers/vector/index.html)
        """
        driver = ogr.GetDriverByName(driver_name)

        if driver is None:
            raise ValueError(f"Driver '{driver_name}' not found.")
        else:
            multi_layer_support = driver.GetMetadataItem("DCAP_MULTIPLE_VECTOR_LAYERS") == 'YES'
        if multi_layer_support:
            if path.exists():
                datasource = driver.Open(str(path), 1)  # Open in update mode
            else:
                datasource = driver.CreateDataSource(str(path))
            if datasource is None:
                raise RuntimeError(f"Could not create GeoPackage at {path}")
        else:
            path.mkdir(parents=True, exist_ok=True)

        # updated
        for table, rows in self.updates.items():
            print(f"{table} has {len(rows)} updated rows")
            layer_name = table + "_updated"
            if not multi_layer_support:
                file_extension = driver.GetMetadataItem("DMD_EXTENSIONS").split(" ")[0]
                datasource = driver.CreateDataSource((str(path / layer_name) + f".{file_extension}"))
            fields = [
                ogr.FieldDefn("primary_key", ogr.OFTInteger),
                ogr.FieldDefn("changes", ogr.OFTString)
            ]
            layer = self.base_schema.create_layer(
                table=table,
                datasource=datasource,
                layer_name=layer_name,
                fields=fields
            )
            layer_defn = layer.GetLayerDefn()
            for row in rows:
                feature = ogr.Feature(layer_defn)
                geom_wkb = row.get("geometry")
                if geom_wkb:
                    geom = ogr.CreateGeometryFromWkb(geom_wkb)
                    feature.SetGeometry(geom)

                feature.SetField("primary_key", row.get("primary_key"))
                feature.SetField("changes", row.get("changes"))
                layer.CreateFeature(feature)

                # Destroy the feature to free resources
                feature = None

        # deleted
        for table, rows in self.deletes.items():
            print(f"{table} has {len(rows)} deleted rows")
            layer_name = table + "_deleted"
            if not multi_layer_support:
                file_extension = driver.GetMetadataItem("DMD_EXTENSIONS").split(" ")[0]
                datasource = driver.CreateDataSource((str(path / layer_name) + f".{file_extension}"))
            layer = self.base_schema.create_layer(
                table=table,
                datasource=datasource,
                layer_name=layer_name
            )
            layer_defn = layer.GetLayerDefn()
            column_names = self.base_schema.column_names(table, exclude_geometry=True)
            for row in rows:
                feature = ogr.Feature(layer_defn)
                geom_wkb = row.get("geometry")
                if geom_wkb:
                    geom = ogr.CreateGeometryFromWkb(geom_wkb)
                    feature.SetGeometry(geom)

                for i, value in enumerate(row["values"]):
                    feature.SetField(column_names[i], value)

                layer.CreateFeature(feature)

                # Destroy the feature to free resources
                feature = None

        # inserted
        for table, rows in self.inserts.items():
            print(f"{table} has {len(rows)} inserted rows")
            layer_name = table + "_inserted"
            if not multi_layer_support:
                file_extension = driver.GetMetadataItem("DMD_EXTENSIONS").split(" ")[0]
                datasource = driver.CreateDataSource((str(path / layer_name) + f".{file_extension}"))
            layer = self.base_schema.create_layer(
                table=table,
                datasource=datasource,
                layer_name=layer_name
            )
            layer_defn = layer.GetLayerDefn()
            column_names = self.modified_schema.column_names(table, exclude_geometry=True)
            for row in rows:
                feature = ogr.Feature(layer_defn)
                geom_wkb = row.get("geometry")
                if geom_wkb:
                    geom = ogr.CreateGeometryFromWkb(geom_wkb)
                    feature.SetGeometry(geom)

                for i, value in enumerate(row["values"]):
                    feature.SetField(column_names[i], value)

                layer.CreateFeature(feature)

                # Destroy the feature to free resources
                feature = None


if __name__ == "__main__":
    DATA_DIR = pathlib.Path("diff_data")

    # Define scenario's
    original = DATA_DIR / "pipes_ori.gpkg"
    edit = DATA_DIR / "pipes_edit.gpkg"

    my_geodiffer = GeoDiffer(pygeodiff.GeoDiff(), base=original, modified=edit)

    my_geodiffer.create_changeset()
    my_geodiffer.list_changes()
    my_geodiffer.parse()
    # my_geodiffer.to_ogr(DATA_DIR / "test.gpkg", driver_name="GPKG")
    my_geodiffer.to_ogr(DATA_DIR / "geojson_output", driver_name="GeoJSON")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant