-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathbackdrop.py
3665 lines (2924 loc) · 169 KB
/
backdrop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
""" This module handles the UI, and starting the main program.
BackDrop is intended to be used as a data backup solution, to assist in
logically copying files from point A to point B. This is complete with
verification, and many other organization and integrity features.
"""
__version__ = '4.0.1'
import platform
import wx
import wx.adv
import wx.lib.inspection
from sys import exit
import shutil
import os
import subprocess
import webbrowser
import ctypes
from signal import signal, SIGINT
from datetime import datetime
import re
import pickle
import clipboard
from pynput import keyboard
if platform.system() == 'Windows':
import pythoncom
import win32api
import win32file
import wmi
import logging
from bin.fileutils import FileUtils, get_drive_list, human_filesize, get_directory_size, get_file_hash, do_delete
from bin.threadmanager import ThreadManager
from bin.config import Config
from bin.backup import Backup
from bin.repeatedtimer import RepeatedTimer
from bin.update import UpdateHandler
from bin.uielements import Color, RootWindow, ModalWindow, StatusBar, WarningPanel, FancyProgressBar, SelectionListCtrl, CopyListPanel, InlineLabel, Counter, DetailBlock, BackupDetailBlock, resource_path
from bin.status import Status
def on_press(key):
"""Do things when keys are pressed.
Args:
key (keyboard.Key): The key that was pressed.
"""
global keypresses
if key == keyboard.Key.alt_l:
keypresses['AltL'] = True
if key == keyboard.Key.alt_r:
keypresses['AltR'] = True
if key == keyboard.Key.alt_gr:
keypresses['AltGr'] = True
if keypresses['AltL'] or keypresses['AltR'] or keypresses['AltGr']:
keypresses['Alt'] = True
def on_release(key):
"""Do things when keys are pressed.
Args:
key (keyboard.Key): The key that was released.
"""
global keypresses
if key == keyboard.Key.alt_l:
keypresses['AltL'] = False
if key == keyboard.Key.alt_r:
keypresses['AltR'] = False
if key == keyboard.Key.alt_gr:
keypresses['AltGr'] = False
if not keypresses['AltL'] and not keypresses['AltR'] and not keypresses['AltGr']:
keypresses['Alt'] = False
# FIXME: post_event() probably shouldn't hardcode the wx.Frame
def post_event(evt_type, data=None, frame: wx.Frame = None):
"""Post a wx.PyEvent of a given type with optional data.
Args:
evt_type (wx.EventType): The event flag to use.
data (tuple[]): Any data to append to the event (optional).
frame (wx.Frame): The wxPython frame to bind the event to.
"""
if frame is None:
frame = main_frame
event = wx.PyEvent()
event.SetEventType(evt_type)
event.data = data
wx.PostEvent(frame, event)
def update_file_detail_lists(list_name: str, files: set):
"""Update the file lists for the detail file view.
Args:
list_name (String): The list name to update.
filename (set): The file path to add to the list.
"""
# Ignore empty filename sets
if not files:
return
file_detail_list[list_name].extend([{
'displayName': filename.split(os.path.sep)[-1],
'filename': filename
} for filename in files])
if list_name == FileUtils.LIST_TOTAL_DELETE:
file_details_pending_delete_counter.value = len(file_detail_list[FileUtils.LIST_TOTAL_DELETE])
file_details_pending_delete_counter_total.value = len(file_detail_list[FileUtils.LIST_TOTAL_DELETE])
file_details_pending_sizer.Layout()
elif list_name == FileUtils.LIST_TOTAL_COPY:
file_details_pending_copy_counter.value = len(file_detail_list[FileUtils.LIST_TOTAL_COPY])
file_details_pending_copy_counter_total.value = len(file_detail_list[FileUtils.LIST_TOTAL_COPY])
file_details_pending_sizer.Layout()
elif list_name in [FileUtils.LIST_DELETE_SUCCESS, FileUtils.LIST_DELETE_FAIL, FileUtils.LIST_SUCCESS, FileUtils.LIST_FAIL]:
# Remove file from pending list
file_detail_list_name = FileUtils.LIST_TOTAL_COPY if list_name in [FileUtils.LIST_SUCCESS, FileUtils.LIST_FAIL] else FileUtils.LIST_TOTAL_DELETE
file_detail_list[file_detail_list_name] = [file for file in file_detail_list[file_detail_list_name] if file['filename'] not in files]
# Update file counter
if list_name in [FileUtils.LIST_SUCCESS, FileUtils.LIST_FAIL]:
file_details_pending_copy_counter.value = len(file_detail_list[file_detail_list_name])
else:
file_details_pending_delete_counter.value = len(file_detail_list[file_detail_list_name])
file_details_pending_sizer.Layout()
# Update copy list scrollable
filenames = [filename.split(os.path.sep)[-1] for filename in files]
if list_name in [FileUtils.LIST_SUCCESS, FileUtils.LIST_DELETE_SUCCESS]:
file_details_success_panel.AddItems(items=filenames, color=Color.FADED if list_name == FileUtils.LIST_DELETE_SUCCESS else Color.TEXT_DEFAULT)
# Remove all but the most recent 250 items for performance reasons
# FIXME: See if truncating the list like this is needed in wxPython
# file_details_copied.show_items(250)
else:
file_details_failed_panel.AddItems(items=filenames, color=Color.FADED if list_name == FileUtils.LIST_DELETE_FAIL else Color.TEXT_DEFAULT)
# Update counter in status bar
FAILED_FILE_COUNT = len(file_detail_list[FileUtils.LIST_FAIL]) + len(file_detail_list[FileUtils.LIST_DELETE_FAIL])
status_bar.SetErrorCount(FAILED_FILE_COUNT)
# HACK: The scroll yview won't see the label instantly after it's packed.
# Sleeping for a brief time fixes that. This is acceptable as long as it's
# not run in the main thread, else the UI will hang.
# FIXME: See if truncating the list like this is needed in wxPython
# file_details_failed.show_items()
backup_error_log = []
def display_backup_progress(copied: int, total: int, display_filename: str = None, operation: int = None, display_index: int = None):
"""Display the copy progress of a transfer
Args:
copied (int): the number of bytes copied.
total (int): The total file size.
display_filename (String): The filename to display inthe GUI (optional).
operation (int): The mode to display the progress in (optional).
display_index (int): The index to display the item in the GUI (optional).
"""
if copied > total:
copied = total
if total > 0:
percent_copied = copied / total * 100
else:
percent_copied = 100
# If display index has been specified, write progress to GUI
if display_index is not None:
if operation == Status.FILE_OPERATION_DELETE:
progress_bar_master.SetValue(backup.progress['current'])
progress_bar_file.SetValue(copied)
progress_bar_file.SetRange(total)
cmd_info_blocks[display_index].SetLabel('progress', label=f"Deleted {display_filename}")
cmd_info_blocks[display_index].SetForegroundColour('progress', Color.TEXT_DEFAULT)
elif operation == Status.FILE_OPERATION_COPY:
progress_bar_master.SetValue(backup.progress['current'])
progress_bar_file.SetValue(copied)
progress_bar_file.SetRange(total)
cmd_info_blocks[display_index].SetLabel('progress', label=f"{percent_copied:.2f}% \u27f6 {human_filesize(copied)} of {human_filesize(total)}")
cmd_info_blocks[display_index].SetForegroundColour('progress', Color.TEXT_DEFAULT)
elif operation == Status.FILE_OPERATION_VERIFY:
progress_bar_master.SetValue(backup.progress['current'])
progress_bar_file.SetValue(copied)
progress_bar_file.SetRange(total)
cmd_info_blocks[display_index].SetLabel('progress', label=f"Verifying \u27f6 {percent_copied:.2f}% \u27f6 {human_filesize(copied)} of {human_filesize(total)}")
cmd_info_blocks[display_index].SetForegroundColour('progress', Color.BLUE)
cmd_info_blocks[display_index].Layout()
summary_details_sizer.Layout()
summary_details_box.Layout()
def get_backup_killflag() -> bool:
"""Get backup thread kill flag status.
Returns:
bool: The kill flag of the backup thread.
"""
# Backup is not defined or is not running
if not backup or not backup.running:
return False
return thread_manager.threadlist['Backup']['killFlag']
def display_backup_summary_chunk(title: str, payload: list, reset: bool = None):
"""Display a chunk of a backup analysis summary to the user.
Args:
title (String): The heading title of the chunk.
payload (tuple[]): The chunks of data to display.
payload tuple[0]: The subject of the data line.
payload tuple[1]: The data to associate to the subject.
reset (bool): Whether to clear the summary frame first (default: False).
"""
if reset is None:
reset = False
if reset:
summary_summary_sizer.Clear(True)
summary_summary_sizer.Layout()
heading_label = wx.StaticText(summary_summary_panel, -1, label=title, name='Backup summary chunk header label')
heading_label.SetFont(FONT_HEADING)
chunk_sizer = wx.GridBagSizer()
for i, item in enumerate(payload):
col1_label = wx.StaticText(summary_summary_panel, -1, label=item[0], name='Backup summary chunk name label')
col2_label = wx.StaticText(summary_summary_panel, -1, label='\u27f6', name='Backup summary chunk arrow label')
col3_label = wx.StaticText(summary_summary_panel, -1, label=item[1], name='Backup summary chunk summary label')
if len(item) > 2 and not item[2]:
col1_label.SetForegroundColour(Color.FADED)
col2_label.SetForegroundColour(Color.FADED)
col3_label.SetForegroundColour(Color.FADED)
chunk_sizer.Add(col1_label, (i, 0))
chunk_sizer.Add(col2_label, (i, 1))
chunk_sizer.Add(col3_label, (i, 2))
summary_summary_sizer.Add(heading_label, 0)
summary_summary_sizer.Add(chunk_sizer, 0)
summary_summary_sizer.Layout()
# QUESTION: Instead of the copy function handling display, can it just set variables, and have the timer handle all the UI stuff?
def update_backup_eta_timer(progress_info: dict):
"""Update the backup timer to show ETA.
Args:
progress_info (dict): The progress of the current backup
"""
if backup.status == Status.BACKUP_ANALYSIS_RUNNING or backup.status == Status.BACKUP_ANALYSIS_FINISHED:
backup_eta_label.SetLabel('Analysis in progress. Please wait...')
backup_eta_label.SetForegroundColour(Color.TEXT_DEFAULT)
backup_eta_label.Layout()
summary_sizer.Layout()
elif backup.status == Status.BACKUP_IDLE or backup.status == Status.BACKUP_ANALYSIS_ABORTED:
backup_eta_label.SetLabel('Please start a backup to show ETA')
backup_eta_label.SetForegroundColour(Color.TEXT_DEFAULT)
backup_eta_label.Layout()
summary_sizer.Layout()
elif backup.status == Status.BACKUP_BACKUP_RUNNING:
# Total is copy source, verify dest, so total data is 2 * copy
total_to_copy = progress_info['total']['total'] - progress_info['total']['delete_total']
running_time = backup.timer.elapsed
if total_to_copy > 0:
percent_copied = progress_info['total']['current'] / total_to_copy
else:
percent_copied = 0
if percent_copied > 0:
remaining_time = running_time / percent_copied - running_time
else:
# Show infinity symbol if no calculated ETA
remaining_time = '\u221e'
backup_eta_label.SetLabel(f'{str(running_time).split(".")[0]} elapsed \u27f6 {str(remaining_time).split(".")[0]} remaining')
backup_eta_label.SetForegroundColour(Color.TEXT_DEFAULT)
backup_eta_label.Layout()
summary_sizer.Layout()
elif backup.status == Status.BACKUP_BACKUP_ABORTED:
backup_eta_label.SetLabel(f'Backup aborted in {str(backup.timer.elapsed).split(".")[0]}')
backup_eta_label.SetForegroundColour(Color.FAILED)
backup_eta_label.Layout()
summary_sizer.Layout()
elif backup.status == Status.BACKUP_BACKUP_FINISHED:
backup_eta_label.SetLabel(f'Backup completed successfully in {str(backup.timer.elapsed).split(".")[0]}')
backup_eta_label.SetForegroundColour(Color.FINISHED)
backup_eta_label.Layout()
summary_sizer.Layout()
def display_backup_command_info(display_command_list: list) -> list:
"""Enumerate the display widget with command info after a backup analysis.
Args:
display_command_list (list): The command list to pull data from.
"""
global cmd_info_blocks
summary_details_sizer.Clear(True)
summary_details_sizer.Layout()
cmd_info_blocks = []
for i, item in enumerate(display_command_list):
if item['type'] == Backup.COMMAND_TYPE_FILE_LIST:
if item['mode'] == Status.FILE_OPERATION_DELETE:
cmd_header_text = f"Delete {len(item['list'])} files from {item['dest']}"
elif item['mode'] == Status.FILE_OPERATION_UPDATE:
cmd_header_text = f"Update {len(item['list'])} files on {item['dest']}"
elif item['mode'] == Status.FILE_OPERATION_COPY:
cmd_header_text = f"Copy {len(item['list'])} new files to {item['dest']}"
else:
cmd_header_text = f"Work with {len(item['list'])} files on {item['dest']}"
backup_summary_block = BackupDetailBlock(
parent=summary_details_panel,
title=cmd_header_text,
text_font=FONT_DEFAULT,
bold_font=FONT_BOLD
)
if item['type'] == Backup.COMMAND_TYPE_FILE_LIST:
# Handle list trimming
dc = wx.ScreenDC()
dc.SetFont(FONT_BOLD)
FILE_LIST_HEADER_WIDTH = dc.GetTextExtent('File list: ').GetWidth()
dc.SetFont(FONT_DEFAULT)
TOOLTIP_HEADER_WIDTH = dc.GetTextExtent('(Click to copy)').GetWidth()
trimmed_file_list = ', '.join(item['list'])[:250]
MAX_WIDTH = summary_details_panel.GetSize().GetWidth() - FILE_LIST_HEADER_WIDTH - TOOLTIP_HEADER_WIDTH - 2 * ITEM_UI_PADDING - 50 # Used to be 80%
actual_file_width = dc.GetTextExtent(trimmed_file_list).GetWidth()
if actual_file_width > MAX_WIDTH:
while actual_file_width > MAX_WIDTH and len(trimmed_file_list) > 1:
trimmed_file_list = trimmed_file_list[:-1]
actual_file_width = dc.GetTextExtent(f'{trimmed_file_list}...').GetWidth()
trimmed_file_list = f'{trimmed_file_list}...'
backup_summary_block.add_line('file_size', 'Total size', human_filesize(item['size']))
backup_summary_block.add_line('file_list', 'File list', trimmed_file_list, '\n'.join(item['list']))
backup_summary_block.add_line('current_file', 'Current file', 'Pending' if item['enabled'] else 'Skipped', fg=Color.PENDING if item['enabled'] else Color.FADED)
backup_summary_block.add_line('progress', 'Progress', 'Pending' if item['enabled'] else 'Skipped', fg=Color.PENDING if item['enabled'] else Color.FADED)
summary_details_sizer.Add(backup_summary_block, 0, wx.EXPAND)
summary_details_sizer.Layout()
summary_details_box.Layout()
cmd_info_blocks.append(backup_summary_block)
def backup_reset_ui():
"""Reset the UI when we run a backup analysis."""
# Empty backup error log
backup_error_log.clear()
# Empty backup summary and detail panes
summary_summary_sizer.Clear(True)
summary_summary_sizer.Layout()
summary_details_sizer.Clear(True)
summary_details_sizer.Layout()
# Clear file lists for file details pane
for list_name in file_detail_list.keys():
file_detail_list[list_name].clear()
# Reset file details counters
file_details_pending_delete_counter.value = 0
file_details_pending_delete_counter_total.value - 0
file_details_pending_copy_counter.value = 0
file_details_pending_copy_counter_total.value = 0
file_details_pending_sizer.Layout()
file_details_success_panel.Clear()
file_details_failed_panel.Clear()
def request_kill_analysis():
"""Kill a running analysis."""
if backup:
update_status_bar_action(Status.BACKUP_ANALYSIS_HALT_REQUESTED)
backup.kill(Backup.KILL_ANALYSIS)
def start_backup_analysis():
"""Start the backup analysis in a separate thread."""
global backup
# FIXME: If backup @analysis @thread is already running, it needs to be killed before it's rerun
# CAVEAT: This requires some way to have the @analysis @thread itself check for the kill flag and break if it's set.
if (backup and backup.running) or verification_running or not source_avail_drive_list:
return
# TODO: Move status bar error log counter reset to reset UI function?
backup_reset_ui()
status_bar.SetErrorCount(0)
update_ui_component(Status.UPDATEUI_CURRENT_FILE_DETAILS, data='')
backup = Backup(
config=config,
backup_config_dir=BACKUP_CONFIG_DIR,
backup_config_file=BACKUP_CONFIG_FILE,
analysis_pre_callback_fn=request_update_ui_pre_analysis,
analysis_callback_fn=request_update_ui_post_analysis,
backup_callback_fn=lambda cmd=None: post_event(evt_type=EVT_BACKUP_FINISHED, data=cmd)
)
thread_manager.start(ThreadManager.KILLABLE, target=backup.analyze, name='Backup Analysis', daemon=True)
def load_source():
"""Load the source destination and source lists, and display sources in the tree."""
global PREV_SOURCE_DRIVE
global source_avail_drive_list
global source_drive_default
# Don't load if backup is running
if backup and backup.running:
return
post_event(evt_type=EVT_PROGRESS_MASTER_START_INDETERMINATE)
# Empty tree in case this is being refreshed
source_tree.DeleteAllItems()
flags = 0
if prefs.get('selection', 'source_local_drives', default=True, data_type=Config.BOOLEAN):
flags = flags | FileUtils.LOCAL_DRIVE
if prefs.get('selection', 'source_network_drives', default=False, data_type=Config.BOOLEAN):
flags = flags | FileUtils.NETWORK_DRIVE
source_avail_drive_list = get_drive_list(
system_drive=SYSTEM_DRIVE,
flags=flags
)
if settings_source_mode in [Config.SOURCE_MODE_SINGLE_PATH, Config.SOURCE_MODE_MULTI_PATH] or source_avail_drive_list:
# Display empty selection sizes
source_selected_space.SetLabel('None')
source_selected_space.SetForegroundColour(Color.FADED)
source_selected_space.Layout()
source_total_space.SetLabel('~None')
source_total_space.SetForegroundColour(Color.FADED)
source_total_space.Layout()
source_dest_selection_info_sizer.Layout()
if source_warning_panel.IsShown():
source_warning_panel.Hide()
source_tree.Show()
source_src_sizer.Layout()
summary_sizer.Layout()
root_sizer.Layout()
source_src_control_dropdown.Clear()
selected_source_mode = prefs.get('selection', 'source_mode', Config.SOURCE_MODE_SINGLE_DRIVE, verify_data=Config.SOURCE_MODE_OPTIONS)
if selected_source_mode == Config.SOURCE_MODE_SINGLE_DRIVE:
config['source_path'] = prefs.get('selection', 'source_drive', source_avail_drive_list[0], verify_data=source_avail_drive_list)
source_drive_default = config['source_path']
PREV_SOURCE_DRIVE = config['source_path']
source_src_control_dropdown.Append(source_avail_drive_list)
source_src_control_dropdown.SetSelection(source_src_control_dropdown.FindString(config['source_path']))
# Enumerate list of paths in source
if SYS_PLATFORM == PLATFORM_WINDOWS:
config['source_path'] = config['source_path'] + os.path.sep
for directory in next(os.walk(config['source_path']))[1]:
source_tree.Append((directory, '', 'Unknown', 0))
elif selected_source_mode == Config.SOURCE_MODE_MULTI_DRIVE:
# Enumerate list of paths in source
for drive in source_avail_drive_list:
drive_name = prefs.get('source_names', drive, default=drive.split(':')[0])
source_tree.Append((drive, drive_name, 'Unknown', 0))
elif selected_source_mode == Config.SOURCE_MODE_SINGLE_PATH:
if config['source_path'] and os.path.isdir(config['source_path']):
for directory in next(os.walk(config['source_path']))[1]:
# QUESTION: Should files be allowed in custom source?
source_tree.Append((directory, '', 'Unknown', 0))
source_tree.Layout()
source_src_sizer.Layout()
elif settings_source_mode in [Config.SOURCE_MODE_SINGLE_DRIVE, Config.SOURCE_MODE_MULTI_DRIVE]:
source_drive_default = 'No drives available'
if not source_warning_panel.IsShown():
source_tree.Hide()
source_warning_panel.Show()
source_src_sizer.Layout()
summary_sizer.Layout()
root_sizer.Layout()
post_event(evt_type=EVT_PROGRESS_MASTER_STOP_INDETERMINATE)
def load_source_in_background():
"""Start a source refresh in a new thread."""
thread_manager.start(ThreadManager.KILLABLE, is_progress_thread=True, target=post_event(evt_type=EVT_REQUEST_LOAD_SOURCE), name='Load source', daemon=True)
def change_source_drive(e):
"""Change the source drive to pull sources from to a new selection."""
global PREV_SOURCE_DRIVE
global config
selection = source_src_control_dropdown.GetValue()
# If backup is running, ignore request to change
if backup and backup.running:
prev_source_index = source_src_control_dropdown.FindString(PREV_SOURCE_DRIVE)
source_src_control_dropdown.SetSelection(prev_source_index)
return
# Invalidate analysis validation
reset_analysis_output()
config['source_path'] = selection
PREV_SOURCE_DRIVE = selection
prefs.set('selection', 'source_drive', selection)
load_source_in_background()
# If a drive type is selected for both source and destination, reload
# destination so that the source drive doesn't show in destination list
if ((settings_show_drives_source_local and settings_show_drives_destination_local) # Local selected
or (settings_show_drives_source_network and settings_show_drives_destination_network)): # Network selected
load_dest_in_background()
def reset_analysis_output():
"""Reset the summary panel for running an analysis."""
summary_summary_sizer.Clear(True)
summary_summary_sizer.Layout()
summary_details_sizer.Clear(True)
summary_details_sizer.Layout()
summary_summary_sizer.Add(wx.StaticText(summary_summary_panel, -1, label="This area will summarize the backup that's been configured.", name='Backup summary placeholder tooltip 1'), 0)
summary_summary_sizer.Add(wx.StaticText(summary_summary_panel, -1, label='Please start a backup analysis to generate a summary.', name='Backup summary placeholder tooltip 2'), 0, wx.TOP, 5)
summary_summary_sizer.Layout()
summary_summary_box.Layout()
summary_details_sizer.Add(wx.StaticText(summary_details_panel, -1, label="This area will show details of the backup that's been configured.", name='Backup details placeholder tooltip 1'), 0)
summary_details_sizer.Add(wx.StaticText(summary_details_panel, -1, label='Please start a backup analysis to generate details.', name='Backup details placeholder tooltip 2'), 0, wx.TOP, 5)
summary_details_sizer.Layout()
summary_details_box.Layout()
def add_source_to_tree(data):
"""Add a source to the source tree.
Args:
data (tuple): The data to add.
"""
source_tree.Append(data)
def update_source_meta_control_label(label: str):
"""Update the source control label string.
Args:
label (String): The label to set.
"""
source_src_control_label.SetLabel(label=label)
source_src_control_label.Layout()
source_src_control_sizer.Layout()
def update_source_meta_selected_label(label: str):
"""Update the source selected space label string.
Args:
label (String): The label to set.
"""
source_selected_space.SetLabel(human_filesize(label))
source_selected_space.SetForegroundColour(Color.TEXT_DEFAULT if label > 0 else Color.FADED)
source_src_selection_info_sizer.Layout()
source_src_sizer.Layout()
def update_source_meta_total_space(label: str, color: wx.Colour):
"""Update the source selected space label string.
Args:
label (String): The label to set.
color (wx.Colour): The color to set the text to.
"""
source_total_space.SetLabel(label)
source_total_space.SetForegroundColour(color)
source_src_selection_info_sizer.Layout()
source_src_sizer.Layout()
def update_source_size(item: int):
"""Update source info for a given source.
Args:
item (String): The identifier for a source in the source tree to be calculated.
"""
# FIXME: This crashes if you change the source, and the number of items in the tree changes while it's calculating things
source_name = source_tree.GetItem(item, SOURCE_COL_PATH).GetText()
if settings_source_mode in [Config.SOURCE_MODE_SINGLE_DRIVE, Config.SOURCE_MODE_SINGLE_PATH]:
source_path = os.path.join(config['source_path'], source_name)
elif settings_source_mode in [Config.SOURCE_MODE_MULTI_DRIVE, Config.SOURCE_MODE_MULTI_PATH]:
source_path = source_name
source_dir_size = get_directory_size(source_path)
source_tree.SetItem(item, SOURCE_COL_SIZE, label=human_filesize(source_dir_size))
source_tree.SetItem(item, SOURCE_COL_RAWSIZE, label=str(source_dir_size))
# After calculating source info, update the meta info
selected_total = 0
selected_source_list = []
selected_item = source_tree.GetFirstSelected()
while selected_item != -1:
# Write selected sources to config
source_info = {
'size': int(source_tree.GetItem(selected_item, SOURCE_COL_RAWSIZE).GetText())
}
if settings_source_mode in [Config.SOURCE_MODE_MULTI_DRIVE, Config.SOURCE_MODE_MULTI_PATH]:
source_info['path'] = source_tree.GetItem(selected_item, SOURCE_COL_PATH).GetText()
if SYS_PLATFORM == PLATFORM_WINDOWS:
# Windows uses drive letters, so default name is letter
default_name = source_info['path'][0]
else:
# Linux uses mount points, so get last dir name
default_name = source_info['path'].split(os.path.sep)[-1]
source_info['dest_name'] = source_tree.GetItem(selected_item, SOURCE_COL_NAME).GetText() if source_tree.GetItem(selected_item, SOURCE_COL_NAME).GetText() else default_name
else:
# If single drive mode, use source name as dest name
source_info['dest_name'] = source_tree.GetItem(selected_item, SOURCE_COL_PATH).GetText()
source_info['path'] = os.path.join(config['source_path'], source_info['dest_name'])
selected_source_list.append(source_info)
# Add total space of selection
if source_tree.GetItem(selected_item, SOURCE_COL_SIZE).GetText() != 'Unknown':
# Add total space of selection
selected_total += int(source_tree.GetItem(selected_item, SOURCE_COL_RAWSIZE).GetText())
selected_item = source_tree.GetNextSelected(selected_item)
post_event(evt_type=EVT_UPDATE_SOURCE_META_SELECTED_LABEL, data=selected_total)
config['sources'] = selected_source_list
source_total = sum([int(source_tree.GetItem(item, SOURCE_COL_RAWSIZE).GetText()) for item in range(source_tree.GetItemCount())])
human_size_list = [source_tree.GetItem(item, SOURCE_COL_SIZE).GetText() for item in range(source_tree.GetItemCount())]
# Recalculate and display the selected total
event = wx.PyEvent()
event.SetEventType(EVT_UPDATE_SOURCE_META_TOTAL)
event.label = f'{"~" if "Unknown" in human_size_list else ""}{human_filesize(source_total)}'
event.color = Color.TEXT_DEFAULT if source_total > 0 else Color.FADED
wx.PostEvent(main_frame, event)
# If everything's calculated, enable analysis button to be clicked
selected_source_list = []
selected_item = source_tree.GetFirstSelected()
while selected_item != -1:
selected_source_list.append(selected_item)
selected_item = source_tree.GetNextSelected(selected_item)
source_size_list = [source_tree.GetItem(item, SOURCE_COL_SIZE).GetText() for item in selected_source_list]
if 'Unknown' not in source_size_list:
start_analysis_btn.Enable()
update_status_bar_selection()
post_event(evt_type=EVT_PROGRESS_MASTER_STOP_INDETERMINATE)
# IDEA: @Calculate total space of all @sources in background
def select_source():
"""Calculate and display the filesize of a selected source, if it hasn't been calculated.
This gets the selection in the source tree, and then calculates the filesize for
all sources selected that haven't yet been calculated. The summary of total
selection, and total source space is also shown below the tree.
"""
global prev_source_selection
global source_selection_total
global backup
# If tree is locked, ignore changes
if source_tree.locked:
return
if not backup or not backup.running:
post_event(evt_type=EVT_PROGRESS_MASTER_START_INDETERMINATE)
# If analysis was run, invalidate it
reset_analysis_output()
# Get selection delta to figure out what to calculate
item = source_tree.GetFirstSelected()
selected = []
while item != -1:
selected.append(item)
item = source_tree.GetNextSelected(item)
new_sources = []
if selected:
for item in selected:
source_info = {
'size': int(source_tree.GetItem(item, SOURCE_COL_RAWSIZE).GetText())
}
if settings_source_mode in [Config.SOURCE_MODE_MULTI_DRIVE, Config.SOURCE_MODE_MULTI_PATH]:
source_info['path'] = source_tree.GetItem(item, SOURCE_COL_PATH).GetText()
if SYS_PLATFORM == PLATFORM_WINDOWS:
# Windows uses drive letters, so default name is letter
default_name = source_info['path'][0]
else:
# Linux uses mount points, so get last dir name
default_name = source_info['path'].split(os.path.sep)[-1]
source_info['dest_name'] = source_tree.GetItem(item, SOURCE_COL_NAME).GetText() if source_tree.GetItem(item, SOURCE_COL_NAME).GetText() else default_name
else:
# If single drive mode, use source name as dest name
source_info['dest_name'] = source_tree.GetItem(item, SOURCE_COL_PATH).GetText()
source_info['path'] = os.path.join(config['source_path'], source_info['dest_name'])
new_sources.append(source_info)
else:
source_selected_space.SetLabel('None')
source_selected_space.SetForegroundColour(Color.FADED)
source_src_selection_info_sizer.Layout()
source_src_sizer.Layout()
config['sources'] = new_sources
update_status_bar_selection()
new_selected = [item for item in selected if item not in prev_source_selection]
# Mark new selections as pending in UI
for item in new_selected:
source_tree.SetItem(item, SOURCE_COL_SIZE, label='Calculating')
# Update selected meta info to known selection before calculating new selections
selection_known_size_items = [item for item in range(source_tree.GetItemCount()) if source_tree.IsSelected(item) and item not in new_selected]
selection_known_size = sum([int(source_tree.GetItem(item, SOURCE_COL_RAWSIZE).GetText()) for item in selection_known_size_items])
source_selected_space.SetLabel(human_filesize(selection_known_size))
source_selected_space.SetForegroundColour(Color.TEXT_DEFAULT if selection_known_size > 0 else Color.FADED)
source_src_selection_info_sizer.Layout()
source_src_sizer.Layout()
# For each selected item, calculate size and add to total
for item in new_selected:
update_status_bar_selection(Status.BACKUPSELECT_CALCULATING_SOURCE)
start_analysis_btn.Disable()
post_event(evt_type=EVT_UPDATE_SOURCE_SIZE, data=item)
# Set current selection to previous selection var to be referenced next call
prev_source_selection = selected
post_event(evt_type=EVT_PROGRESS_MASTER_STOP_INDETERMINATE)
else:
# Temporarily unbind selection handlers so this function doesn't keep
# running with every change
# IDEA: Is this better to replace select tree bindings with mouse down bindings?
source_tree.Lock()
for item in range(source_tree.GetItemCount()):
source_tree.Select(item, on=item in prev_source_selection)
if prev_source_selection:
source_tree.Focus(prev_source_selection[-1])
# Re-enable the selection handlers that were temporarily disabled
source_tree.Unlock()
def add_dest_to_tree(data):
"""Add a destination to the destination tree.
Args:
data (tuple): The data to add.
"""
dest_tree.Append(data)
def update_dest_meta_total_space(total):
"""Refresh the destination metadata.
Args:
total (int): The total to update.
"""
dest_total_space.SetLabel(human_filesize(total))
dest_total_space.Layout()
source_dest_selection_info_sizer.Layout()
def load_dest():
"""Load the destination path info, and display it in the tree."""
global dest_drive_master_list
# Don't load if backup is running
if backup and backup.running:
return
post_event(evt_type=EVT_PROGRESS_MASTER_START_INDETERMINATE)
# Empty tree in case this is being refreshed
dest_tree.DeleteAllItems()
if prefs.get('selection', 'dest_mode', default=Config.DEST_MODE_DRIVES, verify_data=Config.DEST_MODE_OPTIONS) == Config.DEST_MODE_DRIVES:
if SYS_PLATFORM == PLATFORM_WINDOWS:
logical_drive_list = win32api.GetLogicalDriveStrings().split('\000')[:-1]
logical_drive_list = [drive[:2] for drive in logical_drive_list]
# Associate logical drives with physical drives, and map them to physical serial numbers
logical_to_physical_map = {}
pythoncom.CoInitialize()
try:
for physical_disk in wmi.WMI().Win32_DiskDrive():
for partition in physical_disk.associators("Win32_DiskDriveToDiskPartition"):
logical_to_physical_map.update({logical_disk.DeviceID[0]: physical_disk.SerialNumber.strip() for logical_disk in partition.associators("Win32_LogicalDiskToPartition")})
finally:
pythoncom.CoUninitialize()
# Enumerate drive list to find info about all non-source drives
total_drive_space_available = 0
dest_drive_master_list = []
for drive in logical_drive_list:
if drive != config['source_path'] and drive != SYSTEM_DRIVE:
drive_type = win32file.GetDriveType(drive)
drive_type_list = []
if prefs.get('selection', 'destination_network_drives', default=False, data_type=Config.BOOLEAN):
drive_type_list.append(DRIVE_TYPE_REMOTE)
if prefs.get('selection', 'destination_local_drives', default=True, data_type=Config.BOOLEAN):
drive_type_list.append(DRIVE_TYPE_FIXED)
drive_type_list.append(DRIVE_TYPE_REMOVABLE)
if drive_type in drive_type_list:
try:
drive_size = shutil.disk_usage(drive).total
vsn = os.stat(drive).st_dev
vsn = '{:04X}-{:04X}'.format(vsn >> 16, vsn & 0xffff)
try:
serial = logical_to_physical_map[drive[0]]
except KeyError:
serial = 'Not Found'
drive_has_config_file = os.path.exists(os.path.join(drive, BACKUP_CONFIG_DIR, BACKUP_CONFIG_FILE)) and os.path.isfile(os.path.join(drive, BACKUP_CONFIG_DIR, BACKUP_CONFIG_FILE))
total_drive_space_available = total_drive_space_available + drive_size
post_event(evt_type=EVT_ADD_DEST_TO_TREE, data=(
drive,
'',
human_filesize(drive_size),
'Yes' if drive_has_config_file else '',
vsn,
serial,
drive_size
))
dest_drive_master_list.append({
'name': drive,
'vid': vsn,
'serial': serial,
'capacity': drive_size,
'hasConfig': drive_has_config_file
})
except (FileNotFoundError, OSError):
pass
else:
local_selected = prefs.get('selection', 'destination_local_drives', default=True, data_type=Config.BOOLEAN)
network_selected = prefs.get('selection', 'destination_network_drives', default=False, data_type=Config.BOOLEAN)
if network_selected and not local_selected:
cmd = ['df', ' -tcifs', '-tnfs', '--output=target']
elif local_selected and not network_selected:
cmd = ['df', ' -xtmpfs', '-xsquashfs', '-xdevtmpfs', '-xcifs', '-xnfs', '--output=target']
elif local_selected and network_selected:
cmd = ['df', ' -xtmpfs', '-xsquashfs', '-xdevtmpfs', '--output=target']
out = subprocess.run(cmd,
stdout=subprocess.PIPE,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
logical_drive_list = out.stdout.decode('utf-8').split('\n')[1:]
logical_drive_list = [mount for mount in logical_drive_list if mount and mount != config['source_path']]
total_drive_space_available = 0
dest_drive_master_list = []
for drive in logical_drive_list:
drive_name = f'"{drive}"'
out = subprocess.run("mount | grep " + drive_name + " | awk 'NR==1{print $1}' | sed 's/[0-9]*//g'",
stdout=subprocess.PIPE,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
shell=True)
physical_disk = out.stdout.decode('utf-8').split('\n')[0].strip()
# Only process mount point if it's not on the system drive
if physical_disk != SYSTEM_DRIVE and drive != '/':
drive_size = shutil.disk_usage(drive).total
# Get volume ID, remove dashes, and format the last 8 characters
out = subprocess.run(f"df {drive_name} --output=source | awk 'NR==2' | xargs lsblk -o uuid | awk 'NR==2'",
stdout=subprocess.PIPE,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
shell=True)
vsn = out.stdout.decode('utf-8').split('\n')[0].strip().replace('-', '').upper()
vsn = f'{vsn[-8:-4]}-{vsn[-4:]}'
# Get drive serial, if present
out = subprocess.run(f"lsblk -o serial '{physical_disk}' | awk 'NR==2'",
stdout=subprocess.PIPE,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
shell=True)
serial = out.stdout.decode('utf-8').split('\n')[0].strip()
# Set default if serial not found
serial = serial if serial else 'Not Found'
drive_has_config_file = os.path.exists(os.path.join(drive, BACKUP_CONFIG_DIR, BACKUP_CONFIG_FILE)) and os.path.isfile(os.path.join(drive, BACKUP_CONFIG_DIR, BACKUP_CONFIG_FILE))
total_drive_space_available += drive_size
add_dest_to_tree(evt_type=EVT_ADD_DEST_TO_TREE, data=(
drive,
'',
human_filesize(drive_size),
'Yes' if drive_has_config_file else '',
vsn,
serial,
drive_size
))
dest_drive_master_list.append({
'name': drive,
'vid': vsn,
'serial': serial,
'capacity': drive_size,
'hasConfig': drive_has_config_file
})
elif settings_dest_mode == Config.DEST_MODE_PATHS:
total_drive_space_available = 0
post_event(evt_type=EVT_UPDATE_DEST_META_TOTAL, data=total_drive_space_available)
post_event(evt_type=EVT_PROGRESS_MASTER_STOP_INDETERMINATE)
def load_dest_in_background():
"""Start the loading of the destination path info in a new thread."""