forked from djhedges/exit_speed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlabjack.py
executable file
·84 lines (76 loc) · 2.95 KB
/
labjack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/python3
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Library for reading values from labjack."""
import sys
import traceback
import multiprocessing
from typing import Dict
from typing import List
from typing import Text
from typing import Tuple
from absl import logging
import u3
class Labjack(object):
  """Interface for the labjack DAQ.

  One u3 feedback command is built per entry of ``config['labjack']``. The
  most recent calibrated voltage of every entry is published through a shared
  ``multiprocessing.Value`` so it can be read from outside the reader process.

  Attributes:
    config: Configuration mapping. Its optional 'labjack' entry maps input
      names to proto field names. The first three characters of an input name
      select the u3 command class and the trailing digits select the channel.
    u3: The u3.U3 device handle; None until Loop() opens the device.
    commands: List of feedback commands, one per configured input.
    command_proto_field: Maps each feedback command to its proto field name.
    voltage_values: Maps each proto field name to a shared double.
    process: The reader process, or None when start_process was False.
  """

  def __init__(self, config, start_process=True):
    """Builds the commands and shared values, optionally starting the reader.

    Args:
      config: Configuration mapping, see the class docstring.
      start_process: When True a daemon process running Loop() is started.
    """
    self.config = config
    self.u3 = None
    # Always define the attribute so callers can test it without hasattr().
    self.process = None
    self.commands, self.command_proto_field = self.BuildCommands()
    self.voltage_values = self.BuildValues()
    if start_process:
      self.process = multiprocessing.Process(target=self.Loop, daemon=True)
      self.process.start()

  @staticmethod
  def _ParseInputName(input_name: Text) -> Tuple[Text, int]:
    """Splits an input name into a u3 command class name and a channel.

    Args:
      input_name: Configured input name such as 'ain0' or 'ain12'.

    Returns:
      A (command class name, channel) tuple, e.g. ('AIN', 12).

    Raises:
      ValueError: If input_name does not end with a decimal channel number.
    """
    # Take every trailing digit, not just the last character, so that
    # multi-digit channels are not truncated to their final digit.
    prefix = input_name.rstrip('0123456789')
    digits = input_name[len(prefix):]
    if not digits:
      raise ValueError(
          'Labjack input name %r must end with a channel number' % input_name)
    return input_name[0:3].upper(), int(digits)

  def BuildCommands(self) -> Tuple[List['u3.FeedbackCommand'],
                                   Dict['u3.FeedbackCommand', Text]]:
    """Builds the list of feedback commands to send to the labjack device.

    Returns:
      A tuple of (commands, command_proto_field) where commands holds one
      feedback command per configured input and command_proto_field maps each
      of those commands to the proto field it feeds. Both are empty when the
      config has no 'labjack' entry.

    Raises:
      ValueError: If a configured input name has no trailing channel number.
    """
    commands = []
    command_proto_field = {}
    labjack_config = self.config.get('labjack') or {}
    for input_name, proto_field in labjack_config.items():
      class_name, channel = self._ParseInputName(input_name)
      command = getattr(u3, class_name)(channel)
      commands.append(command)
      command_proto_field[command] = proto_field
    return commands, command_proto_field

  def BuildValues(self) -> Dict[Text, multiprocessing.Value]:
    """Creates one shared double, initialised to 0.0, per proto field.

    Returns:
      A dict mapping each configured proto field name to its shared value.
      Empty when the config has no 'labjack' entry.
    """
    labjack_config = self.config.get('labjack') or {}
    return {proto_field: multiprocessing.Value('d', 0.0)
            for proto_field in labjack_config.values()}

  def ReadValues(self):
    """Reads the labjack voltages.

    Sends all feedback commands in a single request, converts every raw
    result to a calibrated voltage and stores it in the matching shared value.
    A u3.LabJackException is logged (at most once every 10 seconds) instead of
    being propagated.
    """
    try:
      results = self.u3.getFeedback(*self.commands)
      # Results are consumed positionally, one per command.
      for command, result in zip(self.commands, results):
        # NOTE(review): isLowVoltage=False is applied to every channel;
        # confirm that all configured inputs are high-voltage inputs.
        voltage = self.u3.binaryToCalibratedAnalogVoltage(
            result,
            isLowVoltage=False,
            channelNumber=command.positiveChannel)
        proto_field = self.command_proto_field[command]
        self.voltage_values[proto_field].value = voltage
    except u3.LabJackException:
      logging.log_every_n_seconds(logging.ERROR,
                                  'Error reading labjack values\n%s',
                                  10,
                                  traceback.format_exc())

  def Loop(self):
    """Opens the U3 device and reads values forever.

    Used as the target of the reader process started by __init__.
    """
    self.u3 = u3.U3()
    while True:
      self.ReadValues()