-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
4032798
commit a741352
Showing
18 changed files
with
443 additions
and
5 deletions.
There are no files selected for viewing
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
name: Run lint & unit tests | ||
|
||
on: | ||
push: | ||
branches: [ "main" ] | ||
pull_request: | ||
branches: [ "main" ] | ||
|
||
jobs: | ||
build: | ||
name: "Run unit tests" | ||
runs-on: ubuntu-latest | ||
|
||
steps: | ||
- uses: actions/checkout@v3 | ||
|
||
- name: Install dependencies | ||
run: pip install -r requirements-dev.txt | ||
|
||
- name: Run lint | ||
run: | | ||
make lint-check | ||
- name: Run unit tests | ||
run: make test |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,8 @@ | ||
module.tar.gz: requirements.txt *.sh src/*.py | ||
tar czf module.tar.gz $^ | ||
build: | ||
./build.sh | ||
|
||
test: | ||
PYTHONPATH=./src pytest | ||
|
||
dist/archive.tar.gz: | ||
tar -czvf dist/archive.tar.gz dist/main |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
pillow == 10.0.1 | ||
viam-sdk | ||
viam-sdk | ||
pyinstaller | ||
opencv-python == 4.9.0.80 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,263 @@ | ||
from io import BytesIO | ||
from typing import ClassVar, List, Mapping, Sequence, Any, Dict, Optional, Union, cast | ||
from typing_extensions import Self | ||
|
||
from viam.components.camera import Camera | ||
from viam.media.video import ViamImage, CameraMimeType | ||
from viam.media.utils import pil | ||
from viam.proto.service.vision import Classification, Detection | ||
from viam.services.vision import Vision, CaptureAllResult | ||
from viam.module.types import Reconfigurable | ||
from viam.proto.app.robot import ServiceConfig | ||
from viam.proto.common import PointCloudObject, ResourceName | ||
from viam.resource.base import ResourceBase | ||
from viam.resource.types import Model, ModelFamily | ||
from viam.utils import ValueTypes | ||
from viam.logging import getLogger | ||
|
||
import cv2 | ||
import io | ||
import math | ||
import numpy as np | ||
from PIL import Image | ||
|
||
|
||
LOGGER = getLogger("MotionDetectorLogger") | ||
|
||
class MotionDetector(Vision, Reconfigurable): | ||
""" | ||
MotionDetector implements a vision service that only supports detections | ||
and classifications. | ||
It inherits from the built-in resource subtype Vision and conforms to the | ||
``Reconfigurable`` protocol, which signifies that this component can be | ||
reconfigured. Additionally, it specifies a constructor function | ||
``MotionDetector.new_service`` which confirms to the | ||
``resource.types.ResourceCreator`` type required for all models. | ||
""" | ||
|
||
# Here is where we define our new model's colon-delimited-triplet | ||
# (viam:vision:motion-detector) viam = namespace, vision = family, motion-detector = model name. | ||
MODEL: ClassVar[Model] = Model(ModelFamily("viam", "vision"), "motion-detector") | ||
|
||
def __init__(self, name: str): | ||
super().__init__(name=name) | ||
|
||
# Constructor | ||
@classmethod | ||
def new_service(cls, | ||
config: ServiceConfig, | ||
dependencies: Mapping[ResourceName, ResourceBase]) -> Self: | ||
service = cls(config.name) | ||
service.reconfigure(config, dependencies) | ||
return service | ||
|
||
# Validates JSON Configuration | ||
@classmethod | ||
def validate_config(cls, config: ServiceConfig) -> Sequence[str]: | ||
source_cam = config.attributes.fields["cam_name"].string_value | ||
min_boxsize = config.attributes.fields["min_box_size"].number_value | ||
if min_boxsize < 0: | ||
raise Exception( | ||
"Minimum bounding box size should be a positive integer") | ||
sensitivity = config.attributes.fields["sensitivity"].number_value | ||
if sensitivity < 0 or sensitivity > 1: | ||
raise Exception( | ||
"Sensitivity should be a number between 0 and 1") | ||
return [source_cam] | ||
|
||
|
||
# Handles attribute reconfiguration | ||
def reconfigure(self, | ||
config: ServiceConfig, | ||
dependencies: Mapping[ResourceName, ResourceBase]): | ||
|
||
self.cam_name = config.attributes.fields["cam_name"].string_value | ||
self.camera = dependencies[Camera.get_resource_name(self.cam_name)] | ||
self.sensitivity = config.attributes.fields["sensitivity"].number_value | ||
if self.sensitivity == 0: | ||
self.sensitivity = 0.9 | ||
self.min_box_size = config.attributes.fields["min_box_size"].number_value | ||
|
||
|
||
""" | ||
Implement the methods the Viam RDK defines for the vision service API | ||
(rdk:service:vision) | ||
""" | ||
|
||
# This will be the main method implemented in this module. | ||
# Given a camera. Perform frame differencing and return how much of the image is moving | ||
async def get_classifications(self, | ||
image: ViamImage, | ||
count: int, | ||
*, | ||
extra: Optional[Dict[str, Any]] = None, | ||
timeout: Optional[float] = None, | ||
**kwargs) -> List[Classification]: | ||
|
||
# Grab and grayscale 2 images | ||
input1 = await self.camera.get_image(mime_type=CameraMimeType.JPEG) | ||
if input1.mime_type not in [CameraMimeType.JPEG, CameraMimeType.PNG]: | ||
raise Exception("image mime type must be PNG or JPEG, not ", input1.mime_type) | ||
img1 = pil.viam_to_pil_image(input1) | ||
gray1 = cv2.cvtColor(np.array(img1), cv2.COLOR_BGR2GRAY) | ||
|
||
input2 = await self.camera.get_image() | ||
if input2.mime_type not in [CameraMimeType.JPEG, CameraMimeType.PNG]: | ||
raise Exception("image mime type must be PNG or JPEG, not ", input2.mime_type) | ||
img2 = pil.viam_to_pil_image(input2) | ||
gray2 = cv2.cvtColor(np.array(img2), cv2.COLOR_BGR2GRAY) | ||
|
||
return self.classification_from_gray_imgs(gray1=gray1, gray2=gray2) | ||
|
||
|
||
async def get_classifications_from_camera(self, | ||
camera_name: str, | ||
count: int, | ||
*, | ||
extra: Optional[Dict[str, Any]] = None, | ||
timeout: Optional[float] = None, | ||
**kwargs) -> List[Classification]: | ||
if camera_name != self.cam_name: | ||
raise Exception( | ||
"Camera name passed to method:",camera_name, "is not the configured 'cam_name'", self.cam_name) | ||
image = await self.camera.get_image() | ||
return await self.get_classifications(image=image, count=count) | ||
|
||
# Not implemented for now. Eventually want this to return the location of the movement | ||
async def get_detections(self, | ||
image: ViamImage, | ||
*, | ||
extra: Optional[Dict[str, Any]] = None, | ||
timeout: Optional[float] = None, | ||
**kwargs) -> List[Detection]: | ||
|
||
# Grab and grayscale 2 images | ||
input1 = await self.camera.get_image(mime_type=CameraMimeType.JPEG) | ||
if input1.mime_type not in [CameraMimeType.JPEG, CameraMimeType.PNG]: | ||
raise Exception("image mime type must be PNG or JPEG, not ", input1.mime_type) | ||
img1 = pil.viam_to_pil_image(input1) | ||
gray1 = cv2.cvtColor(np.array(img1), cv2.COLOR_BGR2GRAY) | ||
|
||
input2 = await self.camera.get_image() | ||
if input2.mime_type not in [CameraMimeType.JPEG, CameraMimeType.PNG]: | ||
raise Exception("image mime type must be PNG or JPEG, not ", input2.mime_type) | ||
img2 = pil.viam_to_pil_image(input2) | ||
gray2 = cv2.cvtColor(np.array(img2), cv2.COLOR_BGR2GRAY) | ||
|
||
return self.detections_from_gray_imgs(gray1, gray2) | ||
|
||
async def get_detections_from_camera(self, | ||
camera_name: str, | ||
*, | ||
extra: Optional[Dict[str, Any]] = None, | ||
timeout: Optional[float] = None, | ||
**kwargs) -> List[Detection]: | ||
|
||
if camera_name != self.cam_name: | ||
raise Exception( | ||
"Camera name passed to method:",camera_name, "is not the configured 'cam_name':", self.cam_name) | ||
return await self.get_detections(image=None) | ||
|
||
async def get_object_point_clouds(self, | ||
camera_name: str, | ||
*, | ||
extra: Optional[Dict[str, Any]] = None, | ||
timeout: Optional[float] = None, | ||
**kwargs) -> List[PointCloudObject]: | ||
raise NotImplementedError | ||
|
||
async def get_properties( | ||
self, | ||
*, | ||
extra: Optional[Mapping[str, Any]] = None, | ||
timeout: Optional[float] = None) -> Vision.Properties: | ||
return Vision.Properties( | ||
classifications_supported=True, | ||
detections_supported=True, | ||
object_point_clouds_supported=False, | ||
) | ||
|
||
async def capture_all_from_camera( | ||
self, | ||
camera_name: str, | ||
return_image: bool = False, | ||
return_classifications: bool = False, | ||
return_detections: bool = False, | ||
return_object_point_clouds: bool = False, | ||
*, | ||
extra: Optional[Mapping[str, Any]] = None, | ||
timeout: Optional[float] = None, | ||
) -> CaptureAllResult: | ||
result = CaptureAllResult() | ||
if (camera_name != self.cam_name) and (camera_name != ""): | ||
raise Exception( | ||
"Camera name passed to method:",camera_name, "is not the configured 'cam_name':", self.cam_name) | ||
img = await self.camera.get_image(mime_type=CameraMimeType.JPEG) | ||
if return_image: | ||
result.image = img | ||
if return_classifications: | ||
classifs = await self.get_classifications(img, 1) | ||
result.classifications = classifs | ||
if return_detections: | ||
dets = await self.get_detections(img) | ||
result.detections = dets | ||
# No object point clouds | ||
return result | ||
|
||
async def do_command(self, | ||
command: Mapping[str, ValueTypes], | ||
*, | ||
timeout: Optional[float] = None, | ||
**kwargs): | ||
raise NotImplementedError | ||
|
||
def classification_from_gray_imgs(self, gray1, gray2): | ||
# Frame difference | ||
diff = cv2.absdiff(gray2,gray1) | ||
|
||
# Simple noise filtering via threshold (~10% of 255) | ||
k = math.floor((1-self.sensitivity) * 255) | ||
diff[diff<k] = 0 | ||
diff[diff>k] = 1 | ||
|
||
# Confidence = percent of activated pixels (after thresholding) | ||
conf = np.sum(diff) / (gray1.shape[0] * gray1.shape[1]) | ||
|
||
classifications = [{"class_name": "motion", "confidence": conf}] | ||
return classifications | ||
|
||
|
||
def detections_from_gray_imgs(self, gray1, gray2): | ||
detections = [] | ||
# Frame difference | ||
diff = cv2.absdiff(gray2,gray1) | ||
|
||
# Simple noise filtering via threshold (~10% of 255) | ||
k = math.floor((1-self.sensitivity) * 255) | ||
diff[diff<k] = 0 | ||
diff[diff>k] = 255 | ||
|
||
# Morphological operations to remove noise and blob | ||
kernel = np.ones((3, 3), np.uint8) | ||
kernel2 = np.ones((15, 15), np.uint8) | ||
img = cv2.erode(diff, kernel) | ||
img2 = cv2.dilate(img, kernel) | ||
img3 = cv2.dilate(img2, kernel2) | ||
imgOut = cv2.erode(img3, kernel2) | ||
|
||
# List points around the remaining blobs | ||
contours, h = cv2.findContours(imgOut, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) | ||
|
||
# Make boxes from the contours | ||
for c in contours: | ||
# Each contour should be a box. | ||
xs = [pt[0][0] for pt in c] | ||
ys = [pt[0][1] for pt in c] | ||
xmin, xmax, ymin, ymax = min(xs), max(xs), min(ys), max(ys) | ||
# Add to list of detections if big enough | ||
if (ymax-ymin)*(xmax-xmin) > self.min_box_size: | ||
detections.append({ "confidence": 0.5, "class_name": "motion", | ||
"x_min": int(xmin), "y_min": int(ymin), "x_max": int(xmax), "y_max": int(ymax) }) | ||
|
||
return detections |
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from typing import Any, Coroutine, Final, List, Optional, Tuple, Dict | ||
from viam.components.camera import Camera | ||
from viam.gen.component.camera.v1.camera_pb2 import GetPropertiesResponse | ||
from viam.media.video import NamedImage, ViamImage, CameraMimeType | ||
from viam.media.utils import pil | ||
from viam.proto.common import ResponseMetadata | ||
from PIL import Image | ||
|
||
|
||
|
||
class FakeCamera(Camera): | ||
|
||
def __init__(self, name: str): | ||
super().__init__(name=name) | ||
self.count = -1 | ||
img1 = Image.open("tests/img1.jpg") | ||
img2 = Image.open("tests/img2.jpg") | ||
self.images = [img1, img2] | ||
|
||
async def get_image(self, mime_type: str = "") -> Coroutine[Any, Any, ViamImage]: | ||
self.count +=1 | ||
return pil.pil_to_viam_image(self.images[self.count%2], CameraMimeType.JPEG) | ||
|
||
async def get_images(self) -> Coroutine[Any, Any, Tuple[List[NamedImage] | ResponseMetadata]]: | ||
raise NotImplementedError | ||
|
||
async def get_properties(self) -> Coroutine[Any, Any, GetPropertiesResponse]: | ||
raise NotImplementedError | ||
|
||
async def get_point_cloud(self) -> Coroutine[Any, Any, Tuple[bytes | str]]: | ||
raise NotImplementedError |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Oops, something went wrong.