-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathacpic.py
executable file
·151 lines (118 loc) · 3.72 KB
/
acpic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
import argparse
import logging
import os
import re
import socket
import subprocess
import time
# Module-wide logger used by every function in this file.
logger = logging.getLogger("acpic")

# Base directory for per-user configuration: $XDG_CONFIG_HOME when it is set
# and non-empty, otherwise ~/.config.  expanduser() is used rather than
# os.environ["HOME"] so that importing the module does not raise KeyError
# when HOME is missing from the environment.
xdg_config_home = os.environ.get("XDG_CONFIG_HOME") or os.path.join(
    os.path.expanduser("~"), ".config"
)

# Directory that is scanned for rule files.
events_dir = os.path.join(xdg_config_home, "acpi", "events")
def rule_files():
    """Yield the path of every rule file in the event handlers directory.

    Hidden entries (names starting with ".") and entries that are not
    regular files are skipped.  Paths are yielded relative to the current
    working directory, in sorted name order so that rules are always
    visited in the same, predictable sequence.

    Nothing is yielded (and a warning is logged) when the directory does
    not exist or cannot be listed.
    """
    try:
        # Listing directly instead of checking isdir() first avoids a race
        # with the directory disappearing in between; sorted() is needed
        # because os.listdir() returns entries in arbitrary order.
        names = sorted(os.listdir(events_dir))
    except (FileNotFoundError, NotADirectoryError):
        logger.warning("Event handlers directory does not exist")
        return
    except OSError as e:
        logger.warning("Could not list event handlers directory: %r", e)
        return
    for name in names:
        if name.startswith("."):
            continue
        path = os.path.relpath(os.path.join(events_dir, name))
        if os.path.isfile(path):
            yield path
def parse_rule(filename):
    """Parse one rule file into a dict of its ``key=value`` settings.

    Blank lines and lines starting with "#" are ignored.  Every other line
    is split at the first "="; whitespace around the key and around the
    value is dropped, so ``event = button/power`` and ``event=button/power``
    produce the same entry.  A key that appears more than once keeps its
    last value.

    Args:
        filename: Path of the rule file to read.

    Returns:
        dict mapping each key to its value (both str).

    Raises:
        ValueError: If a non-comment line contains no "=".
        OSError: If the file cannot be opened or read.
    """
    logger.debug("Parsing file '%s'", filename)
    parsed_content = {}
    with open(filename) as datafile:
        for lineno, raw_line in enumerate(datafile, start=1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            key, separator, value = line.partition("=")
            if not separator:
                raise ValueError(
                    "line %d: expected 'key=value', got %r" % (lineno, line)
                )
            # Without stripping, "event = x" would be stored under the key
            # "event " and never be found by lookups of "event".
            parsed_content[key.strip()] = value.strip()
    logger.debug("Parsed content: %r", parsed_content)
    return parsed_content
def parsed_rules():
    """Yield the parsed contents of every usable rule file.

    A file that cannot be read or parsed is reported with an error log
    entry and skipped, so one broken rule file does not stop the others
    from being used.  Rules lacking an "event" or "action" entry are
    skipped the same way, because consumers index those two keys directly
    and would otherwise fail with KeyError while handling an event.

    Yields:
        dict: The key/value settings of one rule file.
    """
    for filename in rule_files():
        # Keep the try body down to the one call that is expected to fail.
        try:
            rule = parse_rule(filename)
        except Exception as e:
            logger.error("Parsing file '%s' failed: %r", filename, e)
            continue
        missing = [key for key in ("event", "action") if key not in rule]
        if missing:
            logger.error(
                "Parsing file '%s' failed: missing key(s): %s",
                filename,
                ", ".join(missing),
            )
            continue
        yield rule
def rule_applicable(rule, event):
    """Return True when the rule's "event" pattern matches the event text.

    The pattern is a regular expression searched anywhere in ``event``
    (``re.search``, not a full match).

    Args:
        rule: Mapping with an "event" entry holding the regex source.
        event: Event text to test.

    Returns:
        True if the pattern matches, False if it does not match or is not
        a valid regular expression (an error is logged in that case).

    Raises:
        KeyError: If ``rule`` has no "event" entry.
    """
    pattern = rule["event"]
    try:
        match = re.search(pattern, event)
    except re.error as e:
        # A typo in one rule file must not bring the whole process down.
        logger.error("Invalid event pattern %r: %s", pattern, e)
        return False
    if match is None:
        return False
    logger.debug("Matched string: %s", match.group())
    return True
def event_actions(event):
    """Yield the "action" entry of each parsed rule that applies to ``event``.

    Rules are re-read through parsed_rules() on every call and are
    evaluated lazily, one per item requested by the caller.
    """
    yield from (
        candidate["action"]
        for candidate in parsed_rules()
        if rule_applicable(candidate, event)
    )
def expand_action(action, event):
    """Expand ``%`` escapes in an action command line.

    ``%e`` is replaced by the event text.  Any other ``%<char>`` collapses
    to ``<char>``, so ``%%`` produces a literal ``%``.

    Args:
        action: Command-line template, possibly containing ``%`` escapes.
        event: Event text; a trailing line terminator is ignored.

    Returns:
        The action string with all escapes expanded.
    """
    # Events are read as whole lines, newline included.  Substituting that
    # newline into the middle of a command would split it into two shell
    # commands, so drop the line terminator before expanding.
    event_text = event.rstrip("\r\n")

    def repl(match_object):
        expand_argument = match_object.group()[-1]
        return event_text if expand_argument == "e" else expand_argument

    return re.sub(r"%.", repl, action)
def run_action(action):
    """Run an action command line through the shell and log the outcome.

    The call blocks until the command finishes.  A non-zero exit status is
    logged as a warning; failure to start the command at all is logged as
    an error.  Neither is raised to the caller.  Whatever the command wrote
    to stdout is logged at debug level.

    Args:
        action: Shell command line to execute.
    """
    logger.info("Calling: %s", action)
    try:
        # shell=True is deliberate: an action is a shell command line.
        output = subprocess.check_output(action, shell=True)
    except subprocess.CalledProcessError as e:
        logger.warning("Subprocess terminated with exit code %d", e.returncode)
        output = e.output
    except OSError as e:
        # E.g. the shell could not be spawned; report it and carry on.
        logger.error("Could not run action: %r", e)
        return
    if output:
        # check_output() returns bytes; decode so the log shows the text
        # itself rather than a b'...' representation.
        logger.debug("Subprocess stdout: %s", output.decode(errors="replace"))
def run_event_actions(event):
    """Run, in order, every action whose rule applies to ``event``.

    Each action template is expanded against the event before being run.
    """
    for template in event_actions(event):
        command = expand_action(template, event)
        run_action(command)
def acpid_events(socket_path="/var/run/acpid.socket"):
    """Yield events read from the acpid socket, one text line at a time.

    The generator never finishes on its own.  When connecting fails it
    logs the error and retries after one second; when the connection
    reaches end-of-file it reconnects.  Every socket is closed before the
    next attempt (and when the generator itself is closed), so retries do
    not accumulate open file descriptors.

    Args:
        socket_path: Filesystem path of the acpid UNIX stream socket.

    Yields:
        str: One event line, including its trailing newline.
    """
    while True:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            try:
                sock.connect(socket_path)
            except OSError as e:
                logger.error("Could not connect to acpid socket: %r", e)
                logger.info("Retrying in one second...")
                time.sleep(1)
                continue
            logger.debug("Connected to acpid socket")
            with sock.makefile() as stream:
                for event in stream:
                    logger.info("New event: %s", event.strip())
                    yield event
        # Reached only after end-of-file on an established connection.
        logger.debug("Connection to acpid socket closed, reconnecting")
def event_loop():
    """Handle acpid events one after another until interrupted.

    A KeyboardInterrupt ends the loop quietly instead of propagating.
    """
    try:
        for incoming in acpid_events():
            run_event_actions(incoming)
    except KeyboardInterrupt:
        # Ctrl-C is treated as a normal request to stop.
        return
def parse_args(argv=None):
    """Parse the command-line options.

    Args:
        argv: Argument list to parse, without the program name.  When None
            (the default) argparse reads the process's own command line.

    Returns:
        argparse.Namespace with boolean attributes ``verbose`` and
        ``debug``, both False unless the matching flag was given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Print what is being done"
    )
    parser.add_argument(
        "-d",
        "--debug",
        action="store_true",
        help="Print additional debug info (implies -v)",
    )
    return parser.parse_args(argv)
def setup_logging(args):
    """Configure root logging from the parsed command-line flags.

    ``args.debug`` selects DEBUG and takes precedence over ``args.verbose``,
    which selects INFO; with neither flag set the level is ERROR.
    """
    level = (
        logging.DEBUG
        if args.debug
        else logging.INFO
        if args.verbose
        else logging.ERROR
    )
    logging.basicConfig(level=level)
def main():
    """Script entry point: read the flags, set up logging, run the loop."""
    setup_logging(parse_args())
    event_loop()
# Start the event loop only when executed as a script, not when imported.
if __name__ == "__main__":
    main()