-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGWiz.py
executable file
·929 lines (813 loc) · 39.2 KB
/
GWiz.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
#!/usr/bin/env python3
# vim: number
from __future__ import annotations # NOTE: what does this actually do?
import os
import sys
import logging
import logging.config
logging.config.fileConfig(fname='logging.ini', disable_existing_loggers=False)
logger = logging.getLogger('stderrLogger')
try:
import serial
except ImportError:
raise Exception("module 'pyserial' is required")
import urwid
try:
    # Helper used by read_from_serial() to render undecodable replies in log output.
    import bytes_as_braille as bab
except RuntimeError as e:
    # The message needs the f-prefix, otherwise the literal text "{e}" is
    # logged and the actual reason is lost.
    logger.error(f"not using bytes_as_braille ({e})")
from collections import deque
from time import sleep
from proghelp import *
EXTRA_DEBUG = False
"""
TODOs:
# TODO allow inserting command at top of pile or somewhere in the middle
* implement replies to
- !! / Error: / fatal:
- rs / Resend
- busy:<reason>
in general: https://reprap.org/wiki/G-code#Replies_from_the_RepRap_machine_to_the_host_computer
https://reprap.org/wiki/G-code#Action_commands
* force sending M108, M112, M410, M876 even if buffer throttling active (EMERGENCY_PARSER)
* commands added to "User input pile" not on the bottom, but on top!
* allow read commands from pipe (or command ie. python)
* allow '\n' in user input to send more than one command at once
* use ':' to prefix command mode, sort-of like vim
* python-format config?
* automatic machine detection based on report UUID and machine name reported from firmware
* automatically extract and cache G-Code command usage from https://raw.githubusercontent.com/MarlinFirmware/MarlinDocumentation/master/_gcode/ and list of commands when compiling firmware (requires Marlin patch)
* don't hang waiting for the printer to reply something when a lot of commands are in the pile! (`force` ; must be fixed)
* multiple gcode files on cmd line
* parallel G-Code with z-based interpolation, partial cancel
* display commands number (for history) ; don't consider comments and status messages as commands
* better coloring in piles ; color command in progress (the one on "top" of WIP pile)
* don't redraw all internal widgets?
* mouse control with XY, XZ, YZ plane selection and position reporting (mouse or from serial device)
* full power is 127%... not cool XD ; honor MAX_BED_POWER (must be set in config, if possible automatically when compiling firmware)
* it turns out this entire thing is quite slow... on small segments the printer will stutter; we are nowhere near the 1MBPs on the serial connection.
* also see inline TODOs
* pause: do more than just stop to send instructions to the machine ; purge the WIP pile so we can continue sending instructions manually
Weird:
- Marlin omits 'C:' prefix to coordinates?
"""
# Shared application/UI state. Every name starts out as None and is rebound by
# main() (through its `global` statement) once the urwid widgets exist.
loop, wai_pile, wip_pile, ack_pile, edit, machine_pos, messages, tbars, info_dic, machine_status, gcode_piles, watch_pipe, div, cmd_pile, all_wai, editmap = [None for _ in range(16)]
# While True, read_from_serial() does not flush the preloaded G-code piles.
# Toggled from the UI ('ctrl p', and the 'run' / 'pause' commands).
PRINT_PAUSED = True
# Capacity of the WIP pile (commands sent but not acknowledged yet): main()
# passes it as both max_content_len and display_size of wip_pile.
MAX_COMMANDS_IN_WIP = 5#12
# max lines to show in piles
DISP_ACK_LEN = 30
DISP_WAI_LEN = 10
# exceptions and error messages:
# GotTempReport is raised inside read_from_serial() to jump to its temperature-report handler.
class GotTempReport(Exception): pass
# FormatError is not raised anywhere in the visible part of this file.
class FormatError(Exception): pass
# Ready-made (widget, options) entry that read_from_serial() puts on top of
# `messages` when writing to watch_pipe fails with OSError.
watch_pipe_error = (urwid.Text(('error','OSError on watch_pipe ; display refresh will suffer')), ('pack',None))
import pendulum
# strftime format of the timestamps shown next to queued/acknowledged lines
TIME_FMT = "%Y-%m-%d %H:%M:%S"
#TIME_FMT = "%H:%M:%S.%s"
# fixed column width reserved for a rendered timestamp (rendered length + 1)
TIME_LEN = len(pendulum.now().strftime(TIME_FMT))+1
# the list of commands that the machine supports ; populated later
# (filled from the machine configuration file in the __main__ section as command -> description)
valid_commands = {}
class WQueue:
    """
    Widgeted queue

    basically a list (stored as a collections.deque), and a viewport on that list
    override `widget` and `subwidget` if your UI differs from urwid

    Keyword options (all optional):
        display_size     number of items shown by `widget` (default 10)
        paused           stored on the instance; not read by this class (default True)
        style            palette name used for the title line (default 'qTitle')
        show_title       prepend a title line carrying `name` (default True)
        max_content_len  capacity used by `is_saturated`; negative means unbounded (default -1)
        color            palette name used by `linecolor` (default 'wait')
        viewport_start   index of the first displayed item once the queue holds at
                         least `display_size` items; -1 shows the tail (default -1)
    """
    def __init__(self, name, content = (), **kwargs ):
        self.name = name
        # always copy into a deque: pop() relies on rotate()/popleft()
        self.content = deque(content)
        self.display_size = kwargs.pop('display_size', 10)
        self.paused = kwargs.pop('paused', True)
        self.style = kwargs.pop('style', 'qTitle')
        self.show_title = kwargs.pop('show_title', True)
        self.max_content_len = kwargs.pop('max_content_len', -1)
        self.color = kwargs.pop('color', 'wait')
        self.viewport_start = kwargs.pop('viewport_start', -1 )

    def __str__(self):
        return f"<WQueue: {self.name} ({len(self.content)} lines)>"

    def subwidget(self, text, *args, **kwargs):
        """Build the widget for one queue item (a colored urwid.Text)."""
        return urwid.Text( self.linecolor(text), *args, **kwargs)

    def _visible(self):
        """
        Return the items currently inside the viewport.

        Raises IndexError when `viewport_start + display_size` runs past the
        end of the queue (callers that redraw already handle that).
        """
        if len(self.content) < self.display_size:
            return list(self.content)
        # -1 means "follow the tail": negative indices -display_size .. -1
        first = -self.display_size if self.viewport_start == -1 else self.viewport_start
        return [ self.content[first+i] for i in range(self.display_size) ]

    @property
    def widget(self, preload = ()):
        """
        A fresh urwid.Pile made of: optional title, `preload`, visible items.

        NOTE: as a property this is always evaluated with the default `preload`.
        """
        rows = [ *preload, *[ self.subwidget(item) for item in self._visible() ]]
        if self.show_title:
            rows.insert(0, urwid.Text( (self.style, self.name) ))
        # every branch unpacks the item widgets; nesting the list inside the
        # Pile's widget list (as the title-less branches used to do) is not a
        # valid Pile content
        return urwid.Pile( rows )

    def append(self, item, pos = -1):
        """
        Add `item` to the queue.

        pos == -1 appends at the end, pos == 0 at the front; any other value
        inserts the item right after index `pos`.
        """
        if pos == -1:
            self.content.append(item)
        elif pos == 0:
            self.content.appendleft(item)
        else:
            # deque does not support slicing; insert() keeps `content` a deque
            self.content.insert(pos+1, item)

    def pop(self, pos):
        """
        Remove and return the item at index `pos`.

        Raises IndexError when the queue is empty.
        """
        # deque.rotate() is speedy
        self.content.rotate(-pos)
        item = self.content.popleft()
        self.content.rotate(pos)
        return item

    def __len__(self):
        return len(self.content)

    @property
    def is_saturated(self):
        """ True when the content (NOT the displayed subset) has reached max_content_len """
        if self.max_content_len < 0:
            # negative capacity (the default) means "no limit"
            return False
        return len(self) >= self.max_content_len

    def linecolor(self, line, color = None):
        """
        Turn a G-code line into urwid markup.

        A line starting with ';' is entirely a comment; otherwise the part
        before the first ';' gets `color` (default: the queue's color) and the
        trailing comment, if any, gets the 'comment' attribute.
        """
        if not isinstance(line, bytes):
            if isinstance(line, urwid.Text):
                line = line.get_text()[0]
            if isinstance(line, str):
                # the tests below work on bytes
                line = line.encode()
        attr = self.color if color is None else color
        if line.startswith(b';'):
            return ('comment',line)
        parts = line.split(b';', 1)
        if len(parts) == 2:
            return [(attr,parts[0]),('comment',b';'+parts[1])]
        return (attr,parts[0])
class ACKPile(WQueue):
    """
    Pile of processed traffic: commands with the reply that acknowledged them,
    plus unsolicited machine messages.

    append() stores entries as `(item[0], (timestamp, item[1]))`.
    """
    def subwidget(self, *args, **kwargs):
        """Render one stored entry as a row of columns."""
        entry = args[0]
        if len(entry) != 2:
            # single-element entry: timestamp + text only
            #logger.info(f"short ACK message {tup[0][0]} --- {tup[0][1]}")
            return urwid.Columns([
                (TIME_LEN, urwid.Text( ('timestamp', entry[0][0].strftime(TIME_FMT)) )),
                urwid.Text( entry[0][1] ),
            ])
        if entry[0] is None:
            # message that is not tied to a sent command: left timestamp column stays blank
            #logger.debug(f"no-ACK message {tup[0]} --- {tup[1]}")
            return urwid.Columns([
                (TIME_LEN, urwid.Text( '' )),
                #urwid.Text( self.linecolor( tup[1][1] )),
                urwid.Text( entry[1][1] ),
                (TIME_LEN, urwid.Text( ('timestamp', entry[1][0].strftime(TIME_FMT)) )),
            ])
        try:
            if type(entry[0]) is pendulum.DateTime:
                # the first element is a bare timestamp instead of a (timestamp, command) pair
                return urwid.Columns([urwid.Text(f"OOPS: {entry[0]}")])
            return urwid.Columns([
                (TIME_LEN, urwid.Text( ('timestamp', entry[0][0].strftime(TIME_FMT)) )),
                urwid.Text( entry[0][1] ),
                urwid.Text( entry[1][1] ),
                (TIME_LEN, urwid.Text( ('timestamp', entry[1][0].strftime(TIME_FMT)) )),
            ])
        except IndexError:
            logger.info(f"IndexError ; full ACK message {entry[0]} --- {entry[1]}")
            raise
        except TypeError:
            logger.info(f"TypError ; full ACK message {entry[0]} --- {entry[1]}")
            raise

    def append(self, item, where = None):
        """
        Store `item` (a 2-element sequence) with the current time and mirror it
        to the machine output logger `result`.

        `where` is an optional tag that is only used in the debug log line.
        """
        now = pendulum.now()
        if where:
            logger.debug(f"ACK: appending {(item[0], (now), item[1])} ({where})")
        self.content.append( (item[0], (now, item[1])) )
        # TODO add machines names?
        #case 'greeter':
        #    result.critical(f"{now}: Gwiz started")
        try:
            payload = item[1][1]
            if payload.startswith(b'ok'):
                # acknowledged command: write the command itself
                result.error(item[0][1].decode())
            elif item[0] is not None:
                logger.info('!!! '+str(item))
            elif payload.startswith(b';'):
                # this maybe a comment we sent? don't need to over-commment
                result.warning(payload.decode())
            elif item[1][0] in ('status_msg',):
                # communication from printer ; can't be replayed as-is in gcode so we double-comment it
                result.info(';; '+payload.decode())
                # 'echo:Cold extrudes are disabled (min temp 170C)' 'misc_status (//)'
            else:
                logger.info('??? '+str(item))
        except TypeError:
            # reached when the payload is a str (startswith() was given bytes)
            if item[1][0] == 'error':
                logger.error(f"{item[1][1]}:{item[0][1].decode()}")
                result.debug(f";!! ERROR ({machine_name}): {item[1][1]}:{item[0][1].decode()}")
            else:
                logger.critical(f"TODO (FJ482HD7): >>>{item}<<<")
        except Exception as e:
            logger.critical(f"{e} (FK582H5H): {item}")
class WIPPile(WQueue):
    """
    Work-in-progress pile: lines handed to pop_to_serial() that have not been
    acknowledged yet. Entries are stored as (timestamp, line).
    """
    def subwidget(self, *args, **kwargs):
        """Render one (timestamp, line) entry as timestamp column + colored text."""
        stamp, line = args[0][0], args[0][1]
        return urwid.Columns([
            ( TIME_LEN, urwid.Text( ('timestamp', stamp.strftime(TIME_FMT)) )),
            urwid.Text( self.linecolor( line, kwargs.pop('color', self.color) )),
        ])

    #def widget(self):
    #    return super().widget()

    def append(self, item, where = None):
        """
        Timestamp `item` and add it to the pile.

        If the oldest entry is a comment (starts with ';') it is moved to the
        ACK pile first. `where` is only used in the debug log line.
        """
        if where:
            logger.debug(f"WIP: appending {(pendulum.now(), item)} ({where})")
        # only the look at the head can legitimately fail (empty pile)
        try:
            head_is_comment = self.content[0][1].startswith(b';')
        except IndexError:
            head_is_comment = False
        if head_is_comment:
            # pop from this instance rather than from the module-level wip_pile
            ack_pile.append( (None, self.pop(0)) )
        self.content.append( (pendulum.now(), item) )
def pop_to_serial(s, pile):
    """
    write to serial

    Move the first line of `pile` to the WIP pile and send it to the machine.

    s     -- object with a write(bytes) method (the serial connection)
    pile  -- queue whose pop(0) returns the next line as bytes

    Comment lines (starting with ';'), blank and empty lines are still recorded
    in the WIP pile but are not written to `s`. Does nothing when `pile` is
    empty.
    """
    #logger.debug('pop_to_serial()', pile)
    try:
        cmd = pile.pop(0)
    except IndexError:
        # nothing queued: without this early return `cmd` would be unbound below
        return
    wip_pile.append( cmd, 'serial' )
    #logger.debug('pop_to_serial()', cmd)
    # strip comments and invalid commands
    if cmd.strip().startswith(b';') or cmd.isspace() or len(cmd) == 0:
        return
    s.write((cmd+b'\n'))
    if EXTRA_DEBUG: logger.debug(f">>> {cmd}")
"""
reads output from machine and takes action
TODO add some formatting and timestamps
"""
def read_from_serial(s):
do_update_ack = None
cmd_errors = deque()
# NOTE this depends on firmware and
# it makes GWiz wait for a 'start' from the printer
MACHINE_READY=True
# STOP Restart with flush so we get an ersatz of a 'start' (apparently it depends on serial port type :-s) ; this is
#wip_pile.append(b'STOP Restart')
#wip_pile.append(b'M999 S0')
#s.write(b'M999 S0\n')
# trying the same with reboot.. FAIL
#wip_pile.append(b'REBOOT')
#wip_pile.append(b'M997')
#s.write(b'M997\n')
# NOTE the above commented code is a crap workaround to the issue that CDC serial re-enumerates on reset
# and also with disabled/0 NO_TIMEOUTS if connection is lost and the machine's buffer is empty by then.
# kept for reference.
try:
while True:
reply = s.readline().rstrip(b'\n')
if EXTRA_DEBUG: logger.debug(f"<<< {reply}")
try:
if reply.startswith(b'ok'):
skip = False
while True:
try:
if (last_wip_command_with_ts := wip_pile.pop(0))[1].startswith(b';'):
ack_pile.append( last_wip_command_with_ts, '0' )
else:
break
#except TypeError:
# logger.info(f"read_from_serial(): queue was empty! [1] {reply}")
# #raise
# break
except IndexError:
logger.info(f"read_from_serial(): received '{reply}' but queue was empty")
skip = True
break
if not skip:
try:
if last_wip_command_with_ts[1] == cmd_errors[0]:
ack_pile.append( (last_wip_command_with_ts, ('error','Unknown command') ), '1')
cmd_errors.popleft()
else:
raise IndexError
except IndexError:
# normal command here, nothing special
# TODO it would be nice to split and color trailing comments
#logger.info('whoops', last_wip_command_with_ts)
ack_pile.append( (last_wip_command_with_ts, ('ack_msg',reply)), '2' )
# TODO update position if last command is one of G0-G5 ?
#except TypeError:
# logger.info("read_from_serial(): queue was empty! [2]")
# #raise
if reply.startswith(b'ok T:'):
reply = reply[2:]
raise GotTempReport
# else: TODO throttling and "skip" in cas of missed ACK message
# see also https://reprap.org/wiki/GCODE_buffer_multiline_proposal
elif reply in [b'wait',b'echo:busy: processing']:
# ignore this shit, we don't need that as a "clock" XD
# see HOST_KEEPALIVE_FEATURE DEFAULT_KEEPALIVE_INTERVAL BUSY_WHILE_HEATING NO_TIMEOUTS
MACHINE_READY = True
pass
elif reply.startswith(b'X:'):
machine_pos.set_text(reply.split(b' Count ',1)[0])
elif reply.startswith(b' T:'):
raise GotTempReport
elif reply.startswith(b'echo:'):
if reply.startswith(b'echo:Unknown command:'):
cmd_errors.append( reply.lstrip(b'echo:Unknown command:').split(b'"',2)[1] )
logger.debug(f"{cmd_errors[-1] = }")
else:
ack_pile.append( (None, ('echo', reply)), '3' )
elif reply.startswith(b'//'):
#add_to_ack( (pendulum.now(), (reply.decode().rstrip('\n'), 'misc_status')) )
ack_pile.append( (None, ('misc_status', reply)), '4' )
else:
# TODO works with Marlin 2.1.x, not 1.x, other firmwres untested (put in config?)
if reply == b'pages_ready':
MACHINE_READY = True
logger.info("machine ready")
messages.contents = [ (urwid.Text(('',b'Machine ready :-)')), ('pack',None)), *messages.contents ]
elif reply == b'start':
machine_status.set_text((b'status_OK',machine_status.get_text()[0]))
ack_pile.append( (None, ('status_msg', reply)), '5' )
except GotTempReport:
#messages.set_text(reply.decode().rstrip('\n'))
temps = reply.split(b' ')[1:]
for t in range(len(temps)//3):
try:
label, temp = temps[2*t].split(b':')
target = float(temps[2*t+1].lstrip(b'/'))
pwr = int(temps[2*len(temps)//3+t].split(b':')[1])
#messages.set_text(f"{label}: {temp}/{target}°C @{pwr}")
tbars[label][0].set_completion(pwr)
tbars[label][1].set_completion(target)
tbars[label][2].set_completion(float(temp))
except (ValueError, IndexError) as e:
logger.info(f"{e} (GJE72JDH): {bab.to_braille(reply)}")
# downshift commands if required (send to printer)
if MACHINE_READY:
while len(wai_pile) and not wip_pile.is_saturated:
pop_to_serial(s, wai_pile )
if not PRINT_PAUSED:
for gco_pile in gcode_piles.keys():
if EXTRA_DEBUG: logger.debug(f"flushing pile {gco_pile}")
while len(gcode_piles[gco_pile]) and not wip_pile.is_saturated:
#logger.info(f"will pop {gcode_piles[gco_pile].content[0]}")
pop_to_serial(s, gcode_piles[gco_pile] )
#logger.info(f"flushing pile {gcode_piles[gco_pile]}")
#sleep(1)
try:
os.write( watch_pipe, b'nop\n' )
except TypeError:
pass
except OSError:
machine_status.set_text((b'status_UNK',machine_status.get_text()[0]))
try:
if messages.contents[0] is not watch_pipe_error:
messages.contents = [ watch_pipe_error, *messages.contents ]
except KeyError:
messages.contents = [ watch_pipe_error, *messages.contents ]
finally:
serial_comm_still_ok(b'OSError\n')
except serial.serialutil.SerialException:
machine_status.set_text(('status_ERR',machine_status.get_text()[0]))
messages.contents = [ (urwid.Text(('error',b'connection to machine was lost')), ('pack',None)), *messages.contents ]
def serial_comm_still_ok(data):
    """
    Callback registered on the urwid watch pipe (see main()).

    `data` is the raw bytes written to the pipe: newline-separated tokens.
    b'nop' tokens only request a redraw of the command piles; b'OSError'
    reports that the pipe is unusable; any other token is shown in the
    message area.

    Returns False after handling b'OSError' (urwid then removes the watch),
    True otherwise.
    """
    global cmd_pile, all_wai
    # NOTE we could do something with 'wait' and 'echo:busy: processing'...
    try:
        data = [ d for d in data.rstrip(b'\n').split(b'\n') if d != b'nop' ]
    except Exception as e:
        raise Exception( e, data) from e
    if len(data):
        for d in data:
            # tokens are bytes: comparing against the str 'OSError' never matched
            if d == b'OSError':
                try: os.close( watch_pipe )
                except OSError: pass
                finally:
                    messages.contents = [ (urwid.Text(('error','watch_pipe was lost ; display refresh issues ahead')), ('pack',None)), *messages.contents ]
                    return False
            else:
                # decode the token, not the whole list
                messages.contents = [ (urwid.Text(('',f'watch_pipe: {d.decode()}')), ('pack',None)), *messages.contents ]
    else:
        attempts = 0
        while True:
            try:
                # this must be forced / redefined, because the internal widgets change and we're not recycling widgets (TODO: FIX!)
                all_wai = urwid.Columns([wai_pile.widget, *[pile.widget for pile in gcode_piles.values()]])
                cmd_pile.contents = [ (w,('pack',None)) for w in [ ack_pile.widget, wip_pile.widget, all_wai, editmap ]]
                break
            except IndexError as e:
                # deque index out of range
                logger.warning(f"{e} (id:EH47SAJ3)")
                break
            except RuntimeError as e:
                # presumably a pile changed while it was being rendered: retry shortly
                attempts += 1
                if attempts % 10 == 0:
                    logger.info(f"{e} (message repeated 10 times) (id:KCD4JD72G)")
                else:
                    logger.debug(f"{e} (id:KCD4JD72F)")
                sleep(.1)
    return True
# This is ACK pile: all instructions here have been processed.
# Initial content handed to ACKPile by main(); each entry mimics what
# ACKPile.append() stores: one or two (timestamp, text-or-markup) elements.
# add_to_ack
commands_ack = [
    # palette names are str: the bytes name b'greeter' never matched the 'greeter' entry
    ((pendulum.now(), ('greeter', " Welcome to G-Code Wizard! ")),),
    ((pendulum.now(), b"You may cast your spells now."), (pendulum.now(), b'pooof!')),
    #('G28', 'ok'),
    #('G0 Z300', 'ok',),
]

# This is WAIT pile: instructions that are scheduled to be sent to the machine.
# It is possible to change order and/or insert commands and/or delete commands from this pile.
# Commands added here will be auto-sent to printer as soon as it is ready.
# Limitation: try not to exceed DISP_WAI_LEN or weird things may happen.
# TODO put this in printer config
commands_wai = [
    b'M155 S1', # temperatures auto-report
    #b'G28', # Home all axii
    #b'G29 H270', # get levelling mesh
    #b'M420 S1', # use bed leveling mesh ; needs EEPROM to work properly :-/
    # retract/recover (will be set in firmware)
    # ABS
    #b'M207 S1.45 F2500 Z1', # firmware retraction
    #b'M208 S-.5 F6000',
    b'M900 T0 L0.02', # linear advance ; incompatible with step-daemon
    b'M900 T0 S0',
    #b'M900 T0 S1',
    # motion settings
    #b'M201 X5000 Y3500 ; set max accel values',
    #b'M203 X200 Y180 Z40 E6; set max feedrates',
]
# the start-up list must fit in the viewport of the wait pile
if len(commands_wai) > DISP_WAI_LEN:
    raise NotImplementedError
# urwid palette handed to urwid.MainLoop in main(): (name, foreground, background)
# entries; the 6-element 'div' entry also carries mono and high-color settings.
# NOTE(review): 'acked' is listed twice with identical values.
palette = [
    ('banner', 'black,bold', 'light gray'),
    ('greeter', 'black,bold', 'light gray'),
    ('streak', 'black', 'dark blue'),
    ('acked', 'dark green', ''),
    ('ack_msg', 'black,bold', ''),
    ('status_msg','light green', ''),
    ('wip', 'light green', ''),
    ('wip0', 'white', 'dark blue'), # topmost line of wip_pile (currently executing)
    ('wait', 'dark cyan', ''),
    ('acked', 'dark green', ''),
    ('echo', 'light magenta', ''),
    ('error', 'light red', 'black'),
    ('comment', 'white', ''),
    ('misc_status', 'dark magenta', ''),
    ('prompt','dark green', 'black'),
    ('input','',''),
    ('timestamp', 'black', ''),
    ('HL0', 'light green', ''),
    ('HL1', 'light cyan', ''),
    ('status_OK', 'black,bold', 'dark green'),
    ('status_UNK', 'black,bold', 'brown'),
    ('status_ERR', 'black,bold', 'dark red'),
    ('div', '', '', '', 'g7', '#d06'),
    # progress bars (text_color, bar_color)
    ('pb_pwr', 'black', 'light gray'),
    ('pb_tgt', 'light magenta', 'dark blue'),
    ('pb_temp', 'black,bold', 'dark red'),
    ('qTitle', 'light gray', 'black'),
]
# header line of the UI (used as the Frame header in main());
# PROGNAME is presumably provided by the `proghelp` star import - confirm
title = urwid.Text(('banner',' '+PROGNAME+' '), align='center')
titlemap = urwid.AttrMap(title, 'streak')
# current mode of the input line, switched by UserInput.keypress()
# one of 'normal', 'search', 'history', 'command'
EDIT_MODE = 'normal'
def commands_by_index(i):
    """
    Look up a previous command by its distance `i` from the newest one.

    Not implemented: the function raises NotImplementedError unconditionally,
    so everything below the `raise` is unreachable. The draft below searches
    the wait pile first, then the WIP pile, then the ACK pile, and falls back
    to '' when the index is out of range everywhere.
    """
    raise NotImplementedError
    try:
        return wai_pile[-i].get_text()[0]
    except IndexError:
        #messages.contents = [ (urwid.Text(('','lookback mode searching wip pile')), ('pack',None)), *messages.contents ]
        try:
            return wip_pile[-(i-len(wai_pile))].get_text()[0]
        except IndexError:
            #messages.contents = [ (urwid.Text(('','lookback mode searching ack pile')), ('pack',None)), *messages.contents ]
            try:
                return ack_pile[-(i-len(wai_pile)-len(wip_pile))].contents[1][0].get_text()[0]
            except IndexError:
                #messages.contents = [ (urwid.Text(('','lookback mode returns nothing')), ('pack',None)), *messages.contents ]
                return ''
class UserInput(urwid.Padding):
global wai_pile, loop
def keypress(self, size, key):
global EDIT_MODE, PRINT_PAUSED
#if key == 'enter':
# raise Exception(f"{key = }, {size = }")
def widget_to_append( text ):
#raise Exception('widget_to_append', text)
return urwid.Columns([
(6, urwid.Text(text[0])),
urwid.Text(text[1]),
])
def append_to_pile( widget ):
#raise Exception(widget)
info_dic.contents.append( (widget,('pack', None)) )
#if key.startswith('ctrl'):
# edit.edit_text = key
# TODO also match on partial strings if there is no ambiguity or obvious precedence
match key:
case 'esc':
EDIT_MODE = 'normal'
edit.set_caption('>>> ')
edit.edit_text = ''
# NOTE both that follow should be a single clause.... too bad we can't just not add "break;" like in C
case 'meta s':
EDIT_MODE = 'search'
edit.set_caption('??? ')
case '?':
EDIT_MODE = 'search'
edit.set_caption('??? ')
case ':':
EDIT_MODE = 'command'
edit.set_caption('### ')
edit.edit_text = '' # TODO why this doesn't work?
case '!':
if edit.edit_text == '':
EDIT_MODE = 'history'
edit.set_caption('>>> ')
case 'ctrl p':
PRINT_PAUSED = not PRINT_PAUSED
if key == 'enter' and edit.edit_text != '':
match EDIT_MODE:
case 'normal':
try:
edit.edit_text = commands_by_index( int(edit.edit_text) )
edit.edit_pos = len(edit.edit_text)
#messages.contents = [ (urwid.Text(('','entering lookback mode')), ('pack',None)), *messages.contents ]
# integer index of a previously typed command ; require confirmation with another 'enter'
return
except ValueError:
# normal command or comment
wai_pile.append(bytes(edit.edit_text,'utf-8'), 0)
edit.edit_text = ''
info_dic.contents = []
case 'search':
edit.edit_text = ''
EDIT_MODE = 'normal'
pass
case 'history':
raise NotImplementedError
wai_pile.append(bytes(edit.edit_text,'utf-8'))
edit.edit_text = ''
case 'command':
match edit.edit_text:
case 'run':
PRINT_PAUSED = False
wai_pile.append(b'M75 ; Print Job Timer start', 0) # see also PRINTJOB_TIMER_AUTOSTART
case 'pause':
PRINT_PAUSED = True
case 'force':
wip_pile.append(b'NOP')
logger.debug("appended 'NOP' to wip_pile") # TODO this is not the correct way to do it!
case 'debug':
logger.debug(ack_pile)
logger.debug(wip_pile)
logger.debug(wai_pile)
case 'quit':
logger.info("quit on user request")
raise SystemExit
case _:
messages.contents = [ (urwid.Text(('error',f"uh? `{edit.edit_text}`")), ('pack',None)), *messages.contents ]
edit.edit_text = ''
else:
match EDIT_MODE:
case 'normal':
if key == ' ':
try:
# TODO don't show this, show a more verbose info from online/cached .md
info_dic.contents = [ (urwid.Text( valid_commands[edit.edit_text.split(' ')[0]] ), ('pack',None))]
except KeyError:
info_dic.contents = []
case 'search':
info_dic.contents = []
#subcommands = []
for command in valid_commands:
search_and_highlight( (edit.edit_text+key).split(' '), f"{command}\t{valid_commands[command]}", widget_to_append, append_to_pile )
#if (cmd := search_and_highlight( edit.edit_text.split(' '), f"{command}\t{valid_commands[command]}", widget_to_append, append_to_pile ))
# subcommands.append( cmd )
if len(info_dic.contents) == 1:
#raise Exception(info_dic.contents[0][0].contents[0][0].get_text())
edit.edit_text = info_dic.contents[0][0].contents[0][0].get_text()[0]
edit.edit_pos = len(edit.edit_text)
EDIT_MODE = 'normal'
edit.set_caption('>>> ')
return
#return super().keypress(119, 'enter')
case 'command':
info_dic.contents = [
(urwid.Text('Available commands:'),('pack',None)),
(urwid.Text('run (start flushing G-code piles)'),('pack',None)),
(urwid.Text('pause'),('pack',None)),
(urwid.Text('load <filename.gcode> TODO'),('pack',None)),
(urwid.Text('reload <filename.gcode> TODO'),('pack',None)),
(urwid.Text('offset X Y <filename.gcode> TODO'),('pack',None)),
(urwid.Text('save <filename.gcode> TODO'),('pack',None)),
(urwid.Text("flush (abort print & clear 'wait' pile) TODO"),('pack',None)),
(urwid.Text("force (push one more command into WIP queue... sometimes bad reporting! TODO)"),('pack',None)),
(urwid.Text('connect <port> TODO'),('pack',None)),
(urwid.Text('buffsize <int> TODO'),('pack',None)),
(urwid.Text('debug'),('pack',None)),
(urwid.Text('quit'),('pack',None)),
]
return super().keypress(size, key)
try:
    from hello_world import dummy_compiler
except ImportError:
    def dummy_compiler( data, ttable = None, oddLine = True ):
        """
        Fallback used when `hello_world` is not installed.

        Returns `data` as a single segment together with one flag per segment
        (a single None). `ttable` and `oddLine` are accepted for signature
        compatibility and ignored.
        """
        # one flag per returned segment: sizing the flags by len(data) left the
        # list empty for an empty string, so flags[0] raised IndexError
        return [data], [None]
def search_and_highlight( needle, stack, widget = tuple, target = print ):
    """
    Emit a highlighted entry when every word of `needle` occurs in the description.

    needle -- iterable of search words
    stack  -- "<command>\\t<description>"
    widget -- callable turning (command, markup) into the object to emit
    target -- callable receiving that object

    Nothing is emitted (and None is returned) when a word is missing.
    """
    #stack = [s[::-1] for s in stack[::-1].split('\t',1)[::-1]]
    cmd, desc = stack.split('\t',1)
    if not all(word in desc for word in needle):
        return
    segments, flags = dummy_compiler( desc, needle )
    markup = []
    unflagged = 0
    for pos, segment in enumerate(segments):
        if flags[pos]:
            markup.append( ('', segment ) )
            continue
        # segments without a flag alternate between the two highlight colors
        attr = 'HL0' if unflagged%2 else 'HL1'
        markup.append( (attr, segment ) )
        unflagged += 1
    target( widget( (cmd, markup) ) )
def main(SER, machine_name, serial_port, maxtemp, gcodes):
    """
    Build the urwid interface, start the serial reader thread and run the UI loop.

    SER          -- open serial connection handed to read_from_serial()
    machine_name -- shown in the status line
    serial_port  -- accepted but not used in this function
    maxtemp      -- maxtemp[0] is the full scale of the temperature bars,
                    maxtemp[1] the full scale of the power bars
    gcodes       -- iterable of G-code file names to preload, or None
    """
    global loop, edit, ack_pile, wip_pile, wai_pile, machine_pos, messages, tbars, info_dic, watch_pipe, machine_status, gcode_piles, div, cmd_pile, all_wai, editmap
    from threading import Thread

    # created now, started only once the widgets and the watch pipe exist
    reader = Thread(target=read_from_serial, args=(SER,), daemon = True )

    edit = urwid.Edit(('prompt',">>> "))
    div = urwid.Divider('-')

    ack_pile = ACKPile( 'ACK Pile', commands_ack, display_size=DISP_ACK_LEN, color='acked' )
    # this is WIP pile, instructions have been sent to the machine but not acked yet
    wip_pile = WIPPile( 'Processing...', max_content_len=MAX_COMMANDS_IN_WIP, display_size=MAX_COMMANDS_IN_WIP, color='wip' )
    wai_pile = WQueue( 'User input pile', commands_wai, display_size=DISP_WAI_LEN, viewport_start=0 )

    # one waiting pile per preloaded file: empty lines dropped, tabs turned into spaces
    gcode_piles = {}
    for gcode in (gcodes or ()):
        logger.info(f"Loading file: {gcode}")
        with open(gcode,'rb') as g:
            lines = [ raw.rstrip(b'\n').replace(b'\t', b' ') for raw in g if raw != b'\n' ]
        gcode_piles[gcode] = WQueue( gcode, lines, display_size=DISP_WAI_LEN, viewport_start=0 )

    all_wai = urwid.Columns([wai_pile.widget, *[pile.widget for pile in gcode_piles.values()]])
    editmap = urwid.AttrMap( UserInput(edit), 'prompt' )
    cmd_pile = urwid.Pile([ ack_pile.widget, wip_pile.widget, all_wai, editmap ])
    machine_pos = urwid.Text("")

    class AbsoluteBar(urwid.ProgressBar):
        """Progress bar whose label is the raw value wrapped in prefix/suffix."""
        def __init__(self, *args, prefix = '', suffix = '°C', **kwargs):
            super().__init__(*args, **kwargs)
            self.prefix = prefix
            self.suffix = suffix
        def get_text(self):
            return self.prefix + str(self.current) + self.suffix

    def heater_bars():
        # order matters: read_from_serial() addresses the bars by index
        return [
            AbsoluteBar('', 'pb_pwr', done = maxtemp[1], prefix = 'pwr: ', suffix ='%' ), # power setting
            AbsoluteBar('', 'pb_tgt', done = maxtemp[0], prefix = 'set: ' ), # target temperature
            AbsoluteBar('', 'pb_temp', done = maxtemp[0], prefix = 'read: ' ), # current temperature
        ]

    # TODO make this more dynamic, from config settings
    tbars = { b'T': heater_bars(), b'B': heater_bars() }

    # temperature bars
    def temps_pile(dic):
        rows = []
        for label, bars in dic.items():
            rows.append( urwid.Text(label) )
            rows.extend( bars )
        return urwid.Pile( rows )

    info_dic = urwid.Pile([urwid.Text(PROGHELP)])
    machine_status = urwid.Text(('status_UNK', ' '+machine_name+' '), 'center')
    messages = urwid.Pile(deque([]))
    context_pile = urwid.Pile([
        machine_status, div,
        machine_pos, div,
        temps_pile(tbars), div,
        info_dic, div,
        messages,
    ])

    # left: command piles and input line ; right: status, temperatures, help, messages
    cols = urwid.Columns([cmd_pile,context_pile])
    frame = urwid.Frame(urwid.Filler(cols, "top"), header=titlemap)
    loop = urwid.MainLoop(frame, palette)
    watch_pipe = loop.watch_pipe(serial_comm_still_ok)
    reader.start()
    loop.run()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(
prog=PROGNAME,
description=PROGDESC,
)
parser.add_argument("-c", "--config", help="machine configuration", default = None, metavar="file")
parser.add_argument("-g", "--gcode", help="gcode to preload", default = None, metavar="file", nargs='*')
parser.add_argument("-p", "--port", default = None, help="serial port override", metavar="device")
parser.add_argument("-b", "--baudrate", default = None, type=int, help="baud rate override", metavar="int")
parser.add_argument("--log-level", default = None, help="log level", metavar="str")
# TODO doesn't seem to work with config file
#parser.add_argument("-l", "--log", default = '/var/log/GWiz/GWiz.log', help="write log to file", metavar="file")
#parser.add_argument("--log-mode", default = 'a', help="open log in mode [w|a]", metavar="str")
parser.add_argument("-o", "--out", default = None, help="write machine I/O to file", metavar="file")
parser.add_argument("--out-level", default = 'DEBUG', help="machine output level", metavar="str")
#TODO parser.add_argument("--out-mode", default = 'w', help="open machine output file in mode [w|a]", metavar="str")
args = parser.parse_args()
if args.log_level:
logger.setLevel(args.log_level)
logger.debug(f"Logging initialized: {__name__}")
if args.config is None:
#print('need to specify machine configuration with "--config"', file=stderr)
logger.critical('need to specify machine configuration with "--config"')
sys.exit()
#out_formatter = logging.Formatter('%(levelname)s\t%(message)s') # keep for debugging..
out_formatter = logging.Formatter('%(message)s')
"""
read machine config
"""
with open(args.config) as machineconf:
while True:
line = machineconf.readline().split('=')
match line[0]:
case 'machine_name':
machine_name = line[1].rstrip('\n')
case 'serial_port':
serial_port = line[1].rstrip('\n') if args.port is None else args.port
case 'baudrate':
baudrate = int(line[1].rstrip('\n')) if args.baudrate is None else args.baudrate
case 'maxtemp':
maxtemp = [int(i) for i in line[1].rstrip('\n').split(',')]
case '# G-Code starts here\n':
break
case other:
if not line[0].startswith('#') and line[0] != '\n':
logger.info(f"unrecognized config option: {line}")
for line in machineconf.readlines():
if not line.startswith('#'):
command, desc = line.rstrip('\n').split('=')
valid_commands[command] = desc
#logger.info("GOT COMMAND:",command,desc)
result = logging.getLogger(machine_name) # TODO looks like this inherits from root logger because it seems to use handler_streamHandler that prints CRITICAL messages on stderr and always
f_handler = logging.FileHandler( machine_name+'.out' if args.out is None else args.out )
f_handler.setFormatter( out_formatter )
result.addHandler(f_handler)
result.setLevel(args.out_level)
result.info(f";{pendulum.now()}:Logging initialized for {machine_name}")
# Validate GCODE input
if args.gcode:
for gcode in args.gcode:
if gcode is not None and os.path.exists(gcode):
if os.path.isfile(gcode):
if gcode.strip().lower().endswith(".gcode"):
continue
else:
logger.critical(f"{gcode} does not have .gcode extension.")
sys.exit()
else:
logger.critical(f"{gcode} is not a file.")
sys.exit()
elif gcode is not None:
logger.critical(f"{gcode} does not exist.")
sys.exit()
for line in BANNER.split('\n'):
logger.info(line)
#try:
# termwidth = os.get_terminal_size().columns-1
#except OSError:
termwidth = 80
for line in f"""{termwidth*'='}
Machine name: {machine_name}
Port name: {serial_port}
Baud rate: {baudrate}
G-Code: {args.gcode}
{termwidth*'='}""".split('\n'):
logger.info(line)
SER = serial.Serial(serial_port, baudrate)
main(
SER,
machine_name, serial_port,
maxtemp,
args.gcode,
)