Skip to content

Commit

Permalink
Merge pull request #136 from spacetelescope/feature/delta_alliedvision
Browse files Browse the repository at this point in the history
Delta PR to #133
  • Loading branch information
ivalaginja authored Nov 10, 2023
2 parents 3578248 + aa297a6 commit 751b898
Showing 1 changed file with 90 additions and 143 deletions.
233 changes: 90 additions & 143 deletions catkit2/services/AlliedVision_camera/AlliedVision_camera.py
Original file line number Diff line number Diff line change
@@ -1,125 +1,102 @@
import threading
import time
import os
import numpy as np
import contextlib

from catkit2.testbed.service import Service

from vimba import *
import vimba

class AlliedVisionCamera(Service):

class AlliedVisionCamera(Service):
NUM_FRAMES = 20

def __init__(self, exit_stack):
super().__init__('allied_vision_camera')

def __init__(self):
super().__init__('AlliedVision_camera')
self.exit_stack = exit_stack

self.should_be_acquiring = threading.Event()
self.should_be_acquiring.set()

self.mutex = threading.Lock()


def open(self):
self.vimba = vimba.Vimba.get_instance()
self.exit_stack.enter(self.vimba)

with Vimba.get_instance() as vimba:

if len(vimba.get_all_cameras()) == 0:

raise RuntimeError("Not a single Allied Vision camera is connected.")

with vimba.get_all_cameras()[0] as cam:



# Set device values from config file (set width and height before offsets)
offset_x = self.config.get('offset_x', 0)
offset_y = self.config.get('offset_y', 0)


self.width = self.config.get('width', self.sensor_width - offset_x)
self.height = self.config.get('height', self.sensor_height - offset_y)
self.offset_x = offset_x
self.offset_y = offset_y

self.gain = self.config.get('gain', 0)
self.exposure_time_step_size = self.config.get('exposure_time_step_size', 1)
self.exposure_time_offset_correction = self.config.get('exposure_time_offset_correction', 0)
self.exposure_time_base_step = self.config.get('exposure_time_base_step', 1)
self.exposure_time = self.config.get('exposure_time', 1000)

# Create datastreams
# Use the full sensor size here to always allocate enough shared memory.
self.images = self.make_data_stream('images', 'float32', [self.height, self.width],
self.NUM_FRAMES)
print(self.sensor_height)
print(self.sensor_width )
print( self.height )
print( self.width )
self.temperature = self.make_data_stream('temperature', 'float64', [1], self.NUM_FRAMES)

self.is_acquiring = self.make_data_stream('is_acquiring', 'int8', [1], self.NUM_FRAMES)
self.is_acquiring.submit_data(np.array([0], dtype='int8'))

# Create properties
def make_property_helper(name, read_only=False, requires_stopped_acquisition=False):
if read_only:
self.make_property(name, lambda: getattr(self, name))
else:
if requires_stopped_acquisition:
def setter(val):
with StoppedAcquisition(self):
setattr(self, name, val)
else:
def setter(val):
setattr(self, name, val)

self.make_property(name, lambda: getattr(self, name), setter)

make_property_helper('exposure_time')
make_property_helper('gain')
make_property_helper('brightness')

make_property_helper('width', requires_stopped_acquisition=True)
make_property_helper('height', requires_stopped_acquisition=True)
make_property_helper('offset_x')
make_property_helper('offset_y')

make_property_helper('sensor_width', read_only=True)
make_property_helper('sensor_height', read_only=True)

make_property_helper('device_name', read_only=True)

self.make_command('start_acquisition', self.start_acquisition)
self.make_command('end_acquisition', self.end_acquisition)
camera_id = self.config.get('camera_id', 0)
try:
self.cam = self.vimba.get_camera_by_id(camera_id)
except vimba.VimbaCameraError:
raise RuntimeError(f'Could not find camera with ID {camera_id}')
self.exit_stack.enter(self.cam)

# Set device values from config file (set width and height before offsets)
offset_x = self.config.get('offset_x', 0)
offset_y = self.config.get('offset_y', 0)

self.width = self.config.get('width', self.sensor_width - offset_x)
self.height = self.config.get('height', self.sensor_height - offset_y)
self.offset_x = offset_x
self.offset_y = offset_y

self.gain = self.config.get('gain', 0)
self.exposure_time = self.config.get('exposure_time', 1000)

# Create datastreams
# Use the full sensor size here to always allocate enough shared memory.
self.images = self.make_data_stream('images', 'float32', [self.sensor_height, self.sensor_width], self.NUM_FRAMES)
print(self.sensor_height)
print(self.sensor_width)
print(self.height)
print(self.width)
self.temperature = self.make_data_stream('temperature', 'float64', [1], self.NUM_FRAMES)

self.is_acquiring = self.make_data_stream('is_acquiring', 'int8', [1], self.NUM_FRAMES)
self.is_acquiring.submit_data(np.array([0], dtype='int8'))

# Create properties
def make_property_helper(name, read_only=False):
if read_only:
self.make_property(name, lambda: getattr(self, name))
else:
self.make_property(name, lambda: getattr(self, name), lambda val: setattr(self, name, val))

make_property_helper('exposure_time')
make_property_helper('gain')
make_property_helper('brightness')

make_property_helper('width')
make_property_helper('height')
make_property_helper('offset_x')
make_property_helper('offset_y')

make_property_helper('sensor_width', read_only=True)
make_property_helper('sensor_height', read_only=True)

make_property_helper('device_name', read_only=True)

self.make_command('start_acquisition', self.start_acquisition)
self.make_command('end_acquisition', self.end_acquisition)

def main(self):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
cam.start_streaming(handler=self.acquisition_loop, buffer_count=self.NUM_FRAMES)
cam.stop_streaming()
self.cam.start_streaming(handler=self.acquisition_loop, buffer_count=self.NUM_FRAMES)
self.cam.stop_streaming()

def close(self):
##close cam AV
pass
self.cam = None

def acquisition_loop(self, cam, frame):
# Make sure the data stream has the right size and datatype.
has_correct_parameters = np.allclose(self.images.shape, [self.height, self.width])

if not has_correct_parameters:

self.images.update_parameters('float32', [self.height, self.width], self.NUM_FRAMES)
self.images.update_parameters('float32', [self.sensor_height, self.sensor_width], self.NUM_FRAMES)

try:
while self.should_be_acquiring.is_set() and not self.should_shut_down:

if frame.get_status()== FrameStatus.Complete:
frame.convert_pixel_format(PixelFormat.Mono8) # TODO change
print(frame.as_numpy_ndarray().astype('float32').shape )
self.images.submit_data( frame.as_numpy_ndarray().astype('float32') )

if frame.get_status() == self.vimba.FrameStatus.Complete:
frame.convert_pixel_format(self.vimba.PixelFormat.Mono8) # TODO change
print(frame.as_numpy_ndarray().astype('float32').shape)
self.images.submit_data(frame.as_numpy_ndarray().astype('float32'))

finally:
# Stop acquisition.
Expand All @@ -133,100 +110,70 @@ def end_acquisition(self):

@property
def exposure_time(self):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
return cam.ExposureTime.get()
return self.cam.ExposureTime.get()

@exposure_time.setter
def exposure_time(self, exposure_time):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
cam.ExposureTime.set( exposure_time )
self.cam.ExposureTime.set(exposure_time)

@property
def gain(self):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
return cam.Gain.get()
return self.cam.Gain.get()

@gain.setter
def gain(self, gain):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
cam.Gain.set( gain )
self.cam.Gain.set(gain)

@property
def brightness(self):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
return cam.Brightness.get()
return self.cam.Brightness.get()

@brightness.setter
def brightness(self, brightness):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
cam.Brightness.set( brightness )
self.cam.Brightness.set(brightness)

@property
def sensor_width(self):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
return cam.SensorWidth.get()
return self.cam.SensorWidth.get()

@property
def sensor_height(self):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
return cam.SensorHeight.get()
return self.cam.SensorHeight.get()

@property
def width(self):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
return cam.Width.get()
return self.cam.Width.get()

@width.setter
def width(self, width):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
cam.Width.set( width )
self.cam.Width.set(width)

@property
def height(self):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
return cam.Height.get()
return self.cam.Height.get()

@height.setter
def height(self, height):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
cam.Height.set( height )
self.cam.Height.set(height)

@property
def offset_x(self):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
return cam.OffsetX.get()
return self.cam.OffsetX.get()

@offset_x.setter
def offset_x(self, offset_x):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
cam.OffsetX.set( offset_x )
self.cam.OffsetX.set(offset_x)

@property
def offset_y(self):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
return cam.OffsetY.get()
return self.cam.OffsetY.get()

@offset_y.setter
def offset_y(self, offset_y):
with Vimba.get_instance() as vimba:
with vimba.get_all_cameras()[0] as cam:
cam.OffsetY.set( offset_y )
self.cam.OffsetY.set(offset_y)


if __name__ == '__main__':
service = AlliedVisionCamera()
service.run()
with contextlib.ExitStack() as exit_stack:
service = AlliedVisionCamera(exit_stack)
service.run()

0 comments on commit 751b898

Please sign in to comment.