-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathrelay.py
100 lines (90 loc) · 2.33 KB
/
relay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# Relay module
from threading import Condition
import config
import os
import glob
import time, serial
# Class for USB relay device handling
class serUSB:
dev = None
ser = None
def __init__(self):
u = os.popen("lsusb|egrep 'QinHeng Electronics HL-340' | awk '{print $6}'").read()
self.dev = None
if u == '':
return
id = u.rstrip().split(':')
a = glob.glob("/dev/ttyUSB*")
if len(a) == 0:
return
a.sort()
for f in a:
s1 = f.split('/')
s2 = os.popen("grep PRODUCT= /sys/bus/usb-serial/devices/"+s1[2]+"/../uevent").read()
if s2 == '' or s2 is None:
print("No usb")
return
s3 = s2.split('=')
ii = s3[1].split('/')
if ii[0] == id[0] and ii[1] == id[1]:
self.dev = f
if self.dev is None:
return
self.open(self.dev)
def open(self, dev):
self.ser = serial.Serial(dev, 9600, timeout=0)
self.ser.flushInput()
return self.ser
def off(self):
if not self.ser.isOpen():
print("Not Opened")
return
self.ser.write(b'\xA0\x01\x01\xA2')
return
def on(self):
if not self.ser.isOpen():
print("Not Opened")
return
self.ser.write(b'\xA0\x01\x00\xA1')
return
# Inter-thread communication, to signal the Relay Thread for operation
class sig_relay:
condv = None
source = None
def __init__(self):
self.condv = Condition()
self.source = None
def signal(self, data):
self.condv.acquire()
self.source = data
self.condv.notify()
self.condv.release()
def wait(self):
global done
self.condv.acquire()
while not config.done:
if self.source != None:
self.source = None
break
self.condv.wait()
self.condv.release()
relay_on()
# function to set relay ON
def relay_on():
global ser
#setOff(1)
config.ser.off()
time.sleep(2)
#setOn(1)
config.ser.on()
# Relay Thread
def relay_loop():
global relay, done
config.relay = sig_relay()
while not config.done:
try:
config.relay.wait()
except KeyboardInterrupt:
config.done = True
print("relay_loop interrupted")
break