-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathhotstrings
executable file
·260 lines (207 loc) · 9.65 KB
/
hotstrings
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
#!/usr/bin/env python
import argparse
import collections
import json
import os
import signal
import subprocess
import sys
import Xlib
import Xlib.X
import Xlib.XK
import Xlib.display
import Xlib.ext.record
import Xlib.protocol
# Exit status passed to argument_parser.exit() when start-up fails in main()
EXIT_FAILURE = 1
# Positional arguments that main() unpacks into record_create_context().
# NOTE(review): presumably (datum_flags, clients, ranges) as in python-xlib's RECORD API - confirm.
RECORD_CONTEXT_ARGUMENTS = (
0,
(Xlib.ext.record.AllClients,),
({
'core_requests': (0, 0),
'core_replies': (0, 0),
'ext_requests': (0, 0, 0, 0),
'ext_replies': (0, 0, 0, 0),
'delivered_events': (0, 0),
# The only non-zero range: key press and key release events
'device_events': (Xlib.X.KeyPress, Xlib.X.KeyRelease),
'errors': (0, 0),
'client_started': False,
'client_died': False
},)
)
# Load xkb to access XK_ISO_Level3_Shift
Xlib.XK.load_keysym_group('xkb')
# Shared field parser used by parse_event_fields() to decode events from raw reply data
event_field = Xlib.protocol.rq.EventField(None)
def parse_event_fields(data, display):
    """Yield the events encoded in ``data`` one after another.

    Each pass hands the still unparsed data to the module-level ``event_field``
    parser, which returns the decoded event together with the remaining data.
    Iteration stops once nothing is left to parse.
    """
    remaining = data
    while remaining:
        parsed = event_field.parse_binary_value(remaining, display, None, None)
        event, remaining = parsed
        yield event
def get_xdg_config_home():
    """Return the user's configuration directory.

    ``$XDG_CONFIG_HOME`` is used when it is set to an absolute path; in every
    other case the expanded ``~/.config`` is returned.
    """
    configured = os.environ.get('XDG_CONFIG_HOME')
    if configured is None or not os.path.isabs(configured):
        return os.path.expanduser('~/.config')
    return configured
# Command-line interface. The arguments are parsed right here at module level so
# that `arguments` is available to verbose() and main().
argument_parser = argparse.ArgumentParser()
argument_parser.add_argument(
    'path',
    nargs='?',
    metavar='PATH',
    default=os.path.join(get_xdg_config_home(), 'hotstrings.json'),
    help='Path to JSON file containing hotstring definitions. Default: %(default)s'
)
argument_parser.add_argument('--verbose', '-v', action='store_true')
arguments = argument_parser.parse_args()
class RecordHandler:
    """Callable that turns recorded key events into characters.

    An instance is handed to ``record_enable_context`` in ``main()``. For every
    key press it looks up the character for the pressed key and passes it to
    ``callback``; key releases are only used to keep track of the AltGr state.
    """

    MODIFIER_KEY_MASKS = {
        'Shift': Xlib.X.ShiftMask,
        'Lock': Xlib.X.LockMask,
        'Control': Xlib.X.ControlMask,
        'Alt': Xlib.X.Mod1Mask,
        'Mod1': Xlib.X.Mod1Mask,
        'Mod2': Xlib.X.Mod2Mask,
        'Mod3': Xlib.X.Mod3Mask,
        'Mod4': Xlib.X.Mod4Mask,
        'Mod5': Xlib.X.Mod5Mask
    }

    def __init__(self, connection, record_connection, callback):
        """Store both display connections and the per-character callback."""
        self.connection = connection
        self.record_connection = record_connection
        self.callback = callback

        # Support for XK_ISO_Level3_Shift/AltGr:
        self.alt_gr_pressed = False
        level3_entries = self.connection.keysym_to_keycodes(Xlib.XK.XK_ISO_Level3_Shift)
        self.alt_gr_keycodes = {entry[0] for entry in level3_entries}

    def get_modifier_state_index(self, state):
        """Map the modifier ``state`` of an event to a keysym lookup index.

        None = 0, Shift = 1, Alt = 2, Alt + Shift = 3, AltGr = 4, AltGr + Shift = 5
        """
        def is_down(name):
            mask = self.MODIFIER_KEY_MASKS[name]
            return (state & mask) == mask

        index = 1 if is_down('Shift') else 0
        if is_down('Alt'):
            index |= 2
        if self.alt_gr_pressed:
            index |= 4
        return index

    def key_pressed(self, event):
        """Handle a key press: update the AltGr state and report the typed character."""
        keycode = event.detail
        # AltGr is tracked by hand because it is not encoded in the event.state byte
        if keycode in self.alt_gr_keycodes:
            self.alt_gr_pressed = True

        lookup_index = self.get_modifier_state_index(event.state)
        keysym = self.connection.keycode_to_keysym(keycode, lookup_index)
        character = self.connection.lookup_string(keysym)
        if not character:
            return
        self.callback(character)

    def key_released(self, event):
        """Handle a key release: clear the AltGr flag when an AltGr key goes up."""
        if event.detail in self.alt_gr_keycodes:
            self.alt_gr_pressed = False

    def __call__(self, reply):
        """Dispatch every event contained in a record ``reply``."""
        # Replies of any other category can't be parsed by parse_event_fields, so skip them
        if reply.category != Xlib.ext.record.FromServer:
            return

        display = self.record_connection.display
        for event in parse_event_fields(reply.data, display):
            handle = self.key_pressed if event.type == Xlib.X.KeyPress else self.key_released
            handle(event)
def verbose(*args, **kwargs):
    """Forward all arguments to ``print`` when ``--verbose`` was given; otherwise do nothing."""
    if not arguments.verbose:
        return
    print(*args, **kwargs)
class HotstringProcessor:
    """Remember the most recently typed characters and expand hotstrings.

    Instances are called with one character per key press. The characters are
    kept in a bounded queue; as soon as the queue ends with one of the
    configured hotstrings the associated action is carried out. Actions that
    produce a replacement erase the typed hotstring with synthetic backspaces
    and type the replacement into the window that has the input focus.

    ``hotstrings`` maps each hotstring to a list ``[action, *arguments]`` where
    ``action`` is one of ``'replace'``, ``'run-replace'``, ``'run-replace-raw'``
    or ``'run'``.
    """

    BACKSPACE_CHARACTER = '\x08'

    def __init__(self, hotstrings, connection, queue_size):
        """Store the configuration and prepare the shared key event arguments.

        ``queue_size`` is the maximum number of typed characters remembered.
        """
        self.hotstrings = hotstrings
        self.connection = connection
        self.queue = collections.deque(maxlen=queue_size)
        self.root_window = self.connection.screen().root

        # These stay the same for all requests, so just keep a local copy
        self._default_key_press_event_arguments = dict(
            time=Xlib.X.CurrentTime, root=self.root_window, child=Xlib.X.NONE,
            root_x=0, root_y=0, event_x=0, event_y=0, same_screen=1)
        self._default_key_release_event_arguments = self._default_key_press_event_arguments

    def make_key_press_event(self, detail, state, window, **kwargs):
        """Build a KeyPress event for ``window``; ``kwargs`` override the default fields."""
        event_arguments = self._default_key_press_event_arguments.copy()
        event_arguments.update(kwargs)
        return Xlib.protocol.event.KeyPress(detail=detail, state=state, window=window, **event_arguments)

    def make_key_release_event(self, detail, state, window, **kwargs):
        """Build a KeyRelease event for ``window``; ``kwargs`` override the default fields."""
        event_arguments = self._default_key_release_event_arguments.copy()
        event_arguments.update(kwargs)
        return Xlib.protocol.event.KeyRelease(detail=detail, state=state, window=window, **event_arguments)

    # TODO: Figure out a way to find keycodes not assigned in the current keyboard mapping
    def string_to_keycodes(self, string_):
        """Yield one keycode entry per character of ``string_``.

        Every yielded value is the first entry returned by
        ``connection.keysym_to_keycodes`` and is later unpacked by
        ``type_keycode`` as ``(detail, state)``. Characters for which no entry
        is found are skipped; in verbose mode a message is written to stderr.
        """
        for character in string_:
            code_point = ord(character)
            # TODO: Take a look at other projects using python-xlib to improve this
            # See Xlib.XK.keysym_to_string
            # Try the code point itself as keysym first, then the 0xFF00 variant
            keycodes = tuple(self.connection.keysym_to_keycodes(code_point) or
                             self.connection.keysym_to_keycodes(0xFF00 | code_point))
            keycode = keycodes[0] if keycodes else None

            # TODO: Remap missing characters to available keycodes
            if not keycode:
                verbose('No keycode found for: %r.' % character, file=sys.stderr)
                continue

            yield keycode

    def type_keycode(self, keycode, window):
        """Send a key press followed by a key release for ``keycode`` to ``window``."""
        detail, state = keycode
        window.send_event(self.make_key_press_event(detail, state, window))
        window.send_event(self.make_key_release_event(detail, state, window))

    def type_keycodes(self, keycodes, window):
        """Type every keycode in ``keycodes`` into ``window``, flushing the connection after each."""
        for keycode in keycodes:
            self.type_keycode(keycode, window)
            self.connection.flush()

    def __call__(self, character):
        """Record ``character`` and carry out the action of a matching hotstring.

        A backspace removes the most recently recorded character and is never
        recorded itself. Hotstrings are checked in the iteration order of
        ``self.hotstrings``. Matching stops after the first replacement has been
        typed, because the queue is cleared at that point and any further match
        would be based on text that is no longer on screen.
        """
        if character == self.BACKSPACE_CHARACTER:
            # With nothing recorded there is nothing to undo; the backspace itself is never queued
            if self.queue:
                self.queue.pop()
        else:
            self.queue.append(character)

        queue_string = ''.join(self.queue)
        for hotstring, (action, *arguments) in self.hotstrings.items():
            if not queue_string.endswith(hotstring):
                continue

            if action == 'run':
                # To make use of various shell comforts simply run the command in a shell, for example:
                # "hotkey": ["run", "sh", "-c", "touch ~/Desktop/hello_world.txt"]
                subprocess.Popen(arguments)
                continue

            if action not in ('replace', 'run-replace', 'run-replace-raw'):
                verbose('Unrecognized action: %r.' % action)
                continue

            # Remember the focused window before a command gets the chance to change the focus
            window = self.connection.get_input_focus().focus

            if action == 'replace':
                replacement = arguments[0]
            else:
                # "run-replace" and "run-replace-raw" replace the hotstring with the stdout of the process
                with subprocess.Popen(arguments, stdout=subprocess.PIPE, universal_newlines=True) as process:
                    replacement = process.stdout.read()
                # Only "run-replace" strips the whitespace at the ends
                if action == 'run-replace':
                    replacement = replacement.strip()

            # Linefeeds don't seem to be sent by Xlib, so replace them with carriage returns: normalize \r\n to \r
            # first, then replace all remaining \n with \r
            replacement = replacement.replace('\r\n', '\r').replace('\n', '\r')

            # Remove typed hotstring before typing replacement
            backspace = tuple(self.string_to_keycodes(self.BACKSPACE_CHARACTER))
            self.type_keycodes(backspace * len(hotstring), window)
            self.type_keycodes(self.string_to_keycodes(replacement), window)
            self.queue.clear()
            # queue_string is stale now; do not let another (overlapping) hotstring fire on it
            break
def main():
    """Load the hotstring definitions and listen for key events until interrupted.

    Exits with EXIT_FAILURE when the definitions file does not exist, when the
    RECORD extension is not available or when no hotstrings are defined.
    """
    path = os.path.expanduser(arguments.path)
    if not os.path.exists(path):
        argument_parser.exit(EXIT_FAILURE, '{}: No such file or directory.\n'.format(path))

    # One connection for sending events and lookups, a second one for recording
    connection = Xlib.display.Display()
    record_connection = Xlib.display.Display()
    if not record_connection.has_extension('RECORD'):
        argument_parser.exit(EXIT_FAILURE, 'X Record Extension Library not found.\n')

    with open(path) as definitions_file:
        hotstrings = json.load(definitions_file)

    if not hotstrings:
        argument_parser.exit(EXIT_FAILURE, 'No hotstrings defined.\n')

    record_context = record_connection.record_create_context(*RECORD_CONTEXT_ARGUMENTS)

    # Only keep at maximum the amount of characters of the longest hotstring in the HotstringProcessor queue
    longest_hotstring = max(map(len, hotstrings.keys()))
    hotstring_processor = HotstringProcessor(hotstrings, connection, longest_hotstring)
    record_handler = RecordHandler(connection, record_connection, hotstring_processor)

    def clean_up(*args):
        # Free the record context, close both connections and leave the program
        record_connection.record_free_context(record_context)
        record_connection.close()
        connection.close()
        argument_parser.exit()

    # Make sure to free structs and close connections
    signal.signal(signal.SIGINT, clean_up)
    signal.signal(signal.SIGTERM, clean_up)

    verbose('Listening for hotstrings...')
    record_connection.record_enable_context(record_context, record_handler)


if __name__ == '__main__':
    main()