Skip to content

Commit

Permalink
mvp logging demo
Browse files Browse the repository at this point in the history
  • Loading branch information
Niels Hoffmann committed Nov 27, 2024
1 parent f6f660f commit 682805e
Show file tree
Hide file tree
Showing 10 changed files with 1,032 additions and 0 deletions.
378 changes: 378 additions & 0 deletions mvp_pygeoapi_logging_demo/client/client.ipynb

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions mvp_pygeoapi_logging_demo/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# MVP Pygeoapi voorbeeld met Opentelemetry logging

In deze folder staat een MVP demo met een pygeoapi server die via het opentelemetry protocol logt naar een OTLP Collector.

## server

Met het commando `docker compose up` worden 2 containers gestart.

Beide containers loggen naar de commandline (dus de containers niet 'detached' opstarten want dan zie je niks gebeuren)

Als de containers gestart zijn is pygeoapi beschikbaar op http://localhost:5000

De opentelemetry collector logt alleen naar de commandline.

### config

- otel-collector-config.yaml
minimale configuratie van een opentelemetry collector om traces te ontvangen van pygeoapi en naar de console te loggen

- pygeoapi.config.yml
de benodigde configuratie van de pygeoapi server

- docker-compose.yml
de orchestratie van de docker containers.
- de pygeoapi container is gebaseerd op het base image van pygeoapi, maar aangevuld met python libraries uit de requirements.txt
- de pygeoapi container krijgt de /data en /plugin folders mee waar de data en de functies staan

## client

Via een Jupyter notebook kunnen de beschikbare OGC API Processes functies aangeroepen worden.

Hiervoor is een python omgeving nodig met jupyter notebook, requests, geopandas en eventueel folium.
12 changes: 12 additions & 0 deletions mvp_pygeoapi_logging_demo/server/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Dockerfile for an OGC API Processes server (pygeoapi) with OpenTelemetry
# instrumentation; extra Python libraries come from requirements.txt.

FROM geopython/pygeoapi:latest

WORKDIR /pygeoapi

LABEL author='Niels Hoffmann'
LABEL name='ogc api and opentelemetry'

# COPY is preferred over ADD for plain local files (ADD adds URL/tar magic)
COPY requirements.txt /pygeoapi/
RUN python3 -m pip install --no-cache-dir -r requirements.txt
83 changes: 83 additions & 0 deletions mvp_pygeoapi_logging_demo/server/data/knmi_meetstations.geojson

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions mvp_pygeoapi_logging_demo/server/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@


# Orchestration of the demo: a pygeoapi server and an OpenTelemetry
# collector on a shared bridge network (service names resolve as hostnames,
# so the plugin reaches the collector at http://collector:4318).
services:
  pygeoapi:
    build: .
    # image: pygeo-opentel:latest

    container_name: pygeoapi_otel

    ports:
      - 5000:80  # pygeoapi UI/API reachable on http://localhost:5000

    volumes:
      # pygeoapi reads its configuration from /pygeoapi/local.config.yml
      - ./pygeoapi.config.yml:/pygeoapi/local.config.yml
      - ./data:/data
      # mount the demo process plugins into pygeoapi's process package
      - ./plugins/process/squared.py:/pygeoapi/pygeoapi/process/squared.py
      - ./plugins/process/localoutlier.py:/pygeoapi/pygeoapi/process/localoutlier.py

  collector:
    image: otel/opentelemetry-collector:latest
    command: ["--config=/etc/otel-collector-config.yaml"]
    ports:
      - 4318:4318  # OTLP over HTTP

    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml

networks:
  default:
    name: pygeoapi_otel
    driver: bridge
31 changes: 31 additions & 0 deletions mvp_pygeoapi_logging_demo/server/otel-collector-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Minimal OpenTelemetry Collector configuration: receive OTLP over HTTP
# and write all telemetry (traces, metrics, logs) to the collector's
# console via the debug exporter.
receivers:
  otlp:
    protocols:
      # grpc:
      #   endpoint: 0.0.0.0:4317
      http:
        endpoint: "0.0.0.0:4318"
        cors:
          allowed_origins:
            - "http://*"
            - "https://*"
exporters:
  # NOTE: Prior to v0.86.0 use `logging` instead of `debug`.
  debug:
    verbosity: detailed
processors:
  batch:
service:
  # every signal type uses the same pipeline: OTLP in, batched, console out
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [debug]
      processors: [batch]
    metrics:
      receivers: [otlp]
      exporters: [debug]
      processors: [batch]
    logs:
      receivers: [otlp]
      exporters: [debug]
      processors: [batch]
175 changes: 175 additions & 0 deletions mvp_pygeoapi_logging_demo/server/plugins/process/localoutlier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
from os import makedirs, path
from logging import getLogger


from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError

#additional imports for opentelemetry
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter
)

import geopandas as gpd
from numpy import vstack
from sklearn.neighbors import LocalOutlierFactor

LOGGER = getLogger(__name__)

# OGC API - Processes description of this process, served by pygeoapi at
# /processes/localoutlier.
PROCESS_METADATA = {
    'version': '0.1',
    'id': 'localoutlier',
    'title': 'Local outlier factor (LOF)',
    'description': 'The local outlier factor (LOF) algorithm computes a score indicating the degree of abnormality of each input (observation), in a set of such observations. It measures the local density deviation of a given data point with respect to its neighbors. It considers as outliers the samples that have a substantially lower density than their neighbors.',
    # FIX: typo 'outliter' -> 'outlier'
    'keywords': ['local outlier factor', 'LOF', 'outlier detection'],
    "jobControlOptions": [
        "sync-execute"
    ],
    'links': [{
        'type': 'text/html',
        'rel': 'canonical',
        'title': 'information',
        'href': 'https://scikit-learn.org/stable/modules/outlier_detection.html#local-outlier-factor',
        'hreflang': 'en-US'
    }],
    'inputs': {
        'dataset': {
            'title': 'Dataset',
            'description': 'geojson dataset of points, in one CRS, for which LOF scores should be computed. ',
            "schema": {"type": "string", "format": "url"},
            'minOccurs': 1,
            'maxOccurs': 1,
            'keywords': ['geojson ogc api features', 'point data']
        },
        'n_neighbors': {
            'title': 'Number of neighbors',
            'description': 'Number of neighbors to use by default for `kneighbors` queries. If `n_neighbors` is larger than the number of samples provided, all samples will be used.',
            'minOccurs': 0,
            'maxOccurs': 1,
            'schema': {
                'oneOf': ['integer'],
                'defaultValue': 20,
            }
        },
        'leaf_size': {
            'title': 'Leaf size',
            'description': 'Leaf size passed to BallTree or KDTree. This can affect the speed of the construction and query, as well as the memory required to store the tree. The optimal value depends on the nature of the problem.',
            'minOccurs': 0,
            'maxOccurs': 1,
            'schema': {
                'oneOf': ['integer'],
                'defaultValue': 30,
            },
        },
        # FIX: 'output_column' was a sibling of 'inputs' due to a misplaced
        # closing brace, so it was never advertised as a process input.
        'output_column': {
            'title': 'Output column name',
            'description': 'Name of the column in which to store output metric. If this column exists, an error will be thrown',
            'minOccurs': 0,
            'maxOccurs': 1,
            'schema': {
                'oneOf': ['string'],
                'defaultValue': 'abnormality',
            },
        },
    },
    'outputs': {
        'output_dataset': {
            'title': 'Output Dataset',
            'description': 'output',
            'schema': {
                'type': 'object',
                'contentMediaType': 'application/json'
            }
        },
    },
    'example': {}
}

# OpenTelemetry wiring: tag every span from this module with a service name
# (required by most backends) and ship spans over OTLP/HTTP to the
# collector container defined in docker-compose.yml.
resource = Resource(attributes={
    SERVICE_NAME: "pygeoapi.process.localoutlier.LOFProcessor"
})

provider = TracerProvider(resource=resource)
# Batch spans before export; swap in BatchSpanProcessor(ConsoleSpanExporter())
# to debug locally without a running collector.
provider.add_span_processor(
    BatchSpanProcessor(
        OTLPSpanExporter(endpoint="http://collector:4318/v1/traces")
    )
)

# Install as the global default so trace.get_tracer() resolves to it.
trace.set_tracer_provider(provider)

# Module-level tracer used by LOFProcessor.execute().
tracer = trace.get_tracer("my.tracer.name")

# Input keys consumed by this processor itself and therefore NOT forwarded
# as keyword arguments to sklearn.neighbors.LocalOutlierFactor.
LOF_OMIT = ['training_dataset', 'dataset', 'output_column']

class LOFProcessor(BaseProcessor):
    """Local outlier factor (LOF) processor.

    Scores every point of an input GeoJSON dataset with
    sklearn.neighbors.LocalOutlierFactor and emits OpenTelemetry spans:
    one parent span per execution plus one child span per feature.
    """

    def __init__(self, processor_def):
        """
        Initialize object

        :param processor_def: provider definition

        :returns: pygeoapi.process.localoutlier.LOFProcessor
        """
        super().__init__(processor_def, PROCESS_METADATA)

    def execute(self, data):
        """
        Run the LOF computation.

        :param data: dict of execution inputs; 'dataset' (URL) is required,
                     'n_neighbors', 'leaf_size', 'p' and 'output_column'
                     are optional; remaining keys are forwarded to
                     LocalOutlierFactor
        :returns: tuple of (mimetype, outputs dict with GeoJSON string)
        :raises ProcessorExecuteError: when no dataset is supplied or the
                                       output column already exists
        """
        with tracer.start_as_current_span("LocalOutlierFactor") as span:  # parent
            # create a parent log record
            span.set_attribute("dpl.core.processing_activity_id",
                               "http://localhost:5000/processes/localoutlier")
            span.set_attribute("dpl.core.data_subject_id", 'not_set')
            # FIX: set_status() takes a Status/StatusCode, not the string
            # "Ok" (the original comment noted it "does not work yet")
            span.set_status(trace.StatusCode.OK)

            data['p'] = int(data.get('p', 2))
            data['leaf_size'] = int(data.get('leaf_size', 30))
            data['n_neighbors'] = int(data.get('n_neighbors', 20))
            colName = data.get('output_column', 'abnormality')
            dataset = data.get("dataset")

            if dataset is None:
                raise ProcessorExecuteError('Cannot process without input dataset')

            gdf = gpd.read_file(dataset)
            # FIX: fail fast (before the expensive fit) and raise the
            # processor-specific error type instead of bare Exception
            if colName in gdf.columns:
                raise ProcessorExecuteError(
                    f'{colName} exists in input and will not be overwritten')

            # setup the sklearn classifier; pass through everything that is
            # not consumed by this processor itself
            clf = LocalOutlierFactor(
                novelty=False,
                **{k: v for k, v in data.items() if k not in LOF_OMIT}
            )

            # perform the actual classification on the point coordinates
            X = vstack([gdf.geometry.x, gdf.geometry.y]).T
            gdf[colName] = clf.fit_predict(X)

            # loop through dataframe to create a log record for each object
            # referring to the parent operation
            for row in gdf.itertuples():
                with tracer.start_as_current_span("LocalOutlierFactor_items") as cs:  # child
                    cs.set_attribute("dpl.core.data_subject_id", row.STN)

            # timestamp does not serialize properly to json, so for now do a
            # subset as workaround.
            # NOTE(review): 'STN'/'TYPE' are specific to the KNMI
            # meetstations demo dataset — confirm before reuse.
            # FIX: use colName here (was hard-coded to 'abnormality', which
            # broke any custom 'output_column' value)
            gdf_out = gdf[['STN', 'TYPE', 'geometry', colName]]
            mimetype = 'application/geo+json'
            outputs = {
                'id': 'output_dataset',
                'value': gdf_out.to_json()
            }

            return mimetype, outputs

    def __repr__(self):
        return '<LOFProcessor> {}'.format(self.name)
Loading

0 comments on commit 682805e

Please sign in to comment.