forked from omniti-labs/pg_extractor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpg_extractor.py
executable file
·1528 lines (1377 loc) · 86.4 KB
/
pg_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
import argparse
import errno
import fileinput
import os
import os.path
import random
import re
import shutil
import subprocess
import sys
import tempfile
import time
from multiprocessing import Process
from multiprocessing import ProcessError
class PGExtractor:
"""
A class object for the PG Extractor PostgreSQL dump filter script.
Some public methods are available for individual use outside this script's normal function,
but many of its advanced features are only available via the command line interface to the script.
"""
def __init__(self):
    """
    Set up an extractor that has no options loaded and empty bookkeeping lists.
    """
    # Bookkeeping lists start out empty. create_extract_files() appends the names of the
    # temporary restore list files it creates to temp_filelist.
    self.error_list = []
    self.temp_filelist = []
    # False means "no options loaded"; the other methods test "self.args and ..." before
    # reading an option. Presumably replaced by the parsed command line options - confirm
    # against the argument parsing code.
    self.args = False
    self.version = "2.4.0"
######################################################################################
#
# PUBLIC METHODS
#
######################################################################################
def build_main_object_list(self, restore_file="#default#"):
    """
    Build a list of all objects contained in the dump file

    * restore_file: full path to a custom format (-Fc) pg_dump file.
        When left at its default, self.tmp_dump_file.name is used.

    Returns a list containing a dictionary object for each recognized line obtained when running
    pg_restore -l. Every dictionary has objid and objtype plus the columns captured for that kind
    of line (objschema, objname, objowner, objsubtype, ...). Functions, aggregates and the ACLs and
    comments that belong to them also get objbasename: objname cut off at its first "(".

    Lines starting with ";", lines whose type is not in the known type list and comments on the
    database itself are skipped.

    Prints an error and exits with status 2 if pg_restore returns a non-zero status.
    Raises ValueError if a line of a known type does not have the expected columns.
    """
    if restore_file == "#default#":
        restore_file = self.tmp_dump_file.name
    pg_restore_cmd = ["pg_restore", "--list", restore_file]
    try:
        restore_object_list = subprocess.check_output(pg_restore_cmd, universal_newlines=True).splitlines()
    except subprocess.CalledProcessError as e:
        print("Error in pg_restore when generating main object list: " + str(e.cmd))
        sys.exit(2)

    # Every pattern below is a raw string: "\d" and "\s" are not valid escape sequences in a
    # normal string literal.
    p_objid = r'\d+;\s\d+\s\d+'
    # Actual types extracted is controlled in create_extract_files(). This is list format mapping choices.
    # Order of this list matters if the string starts with the same word (ex TABLE DATA before TABLE).
    # NOTE(review): an older note here said the last entry cannot contain a space, yet
    # USER MAPPING is last - confirm whether that restriction still applies.
    p_types = (r"ACL|AGGREGATE|COMMENT|CONSTRAINT|DATABASE|DEFAULT\sACL|DEFAULT|"
               r"DOMAIN|EXTENSION|FK\sCONSTRAINT|FOREIGN\sTABLE|FUNCTION|"
               r"INDEX|RULE|SCHEMA|SEQUENCE\sOWNED\sBY|SEQUENCE\sSET|SEQUENCE|"
               r"TABLE\sDATA|TABLE|TRIGGER|TYPE|VIEW|MATERIALIZED\sVIEW\sDATA|MATERIALIZED\sVIEW|"
               r"SERVER|USER\sMAPPING")

    def col(name, expr=r'\S+'):
        """Return a named group matching one column of a pg_restore -l line."""
        return '(?P<' + name + '>' + expr + ')'

    def columns(*cols, trailing=''):
        """Compile a pattern of columns separated by one whitespace character each."""
        return re.compile(r'\s'.join(cols) + trailing)

    # Column building blocks shared by the line formats
    objid = col('objid', p_objid)
    known_type = col('objtype', p_types)
    any_type = col('objtype')
    comment_type = col('objtype', 'COMMENT')
    schema = col('objschema')
    dash_schema = col('objschema', r'\-')
    subtype = col('objsubtype')
    name = col('objname')
    # Function/aggregate names include their argument list, which may contain spaces
    signature = col('objname', r'.*\)')
    owner = col('objowner')

    p_main_object_type = re.compile(p_objid + r'\s(?P<type>' + p_types + r')')
    p_object_mapping = columns(objid, known_type, schema, name, owner)
    # Extension lines are matched without an owner column but need the whitespace after the name
    p_extension_mapping = columns(objid, known_type, schema, name, trailing=r'\s')
    p_function_mapping = columns(objid, any_type, schema, signature, owner)
    p_comment_mapping = columns(objid, any_type, schema, subtype, name, owner)
    p_comment_extension_mapping = columns(objid, comment_type, schema, subtype, name, trailing=r'\s')
    p_comment_function_mapping = columns(objid, comment_type, schema, subtype, signature, owner)
    p_comment_dash_mapping = columns(objid, comment_type, dash_schema, subtype, name, owner)
    p_comment_for_db_dash_mapping = columns(objid, comment_type, dash_schema, name, owner)
    p_comment_on_mapping = columns(objid, comment_type, schema, subtype, name,
                                   col('objsource', r'ON\s\S+'), owner)
    p_default_acl_mapping = columns(objid, col('objtype', 'DEFAULT ACL'), schema,
                                    col('objstatement', 'DEFAULT PRIVILEGES FOR'), subtype,
                                    col('objrole'))
    p_96_rule_mapping = columns(objid, col('objtype', 'RULE'), schema, col('objtable'), name, owner)
    p_server_mapping = columns(objid, col('objtype', 'SERVER'), schema, name, owner)
    p_user_mapping_mapping = columns(objid, col('objtype', 'USER MAPPING'), schema,
                                     col('objstatement', 'USER MAPPING'), col('objusermapping'),
                                     col('objserverstatement'), col('objservername'), owner)

    # Tests used to tell the different COMMENT line layouts apart
    p_function_comment = re.compile(p_objid + r'\sCOMMENT\s\S+\s(FUNCTION|AGGREGATE)')
    p_extension_comment = re.compile(p_objid + r'\sCOMMENT\s\-\sEXTENSION')
    p_dash_comment = re.compile(p_objid + r'\sCOMMENT\s\-\s')
    p_rule_comment = re.compile(p_objid + r'\sCOMMENT\s\S+\sRULE')

    # Keys stored for each line layout, in the order they are added to the dictionary
    common_keys = ('objid', 'objtype', 'objschema', 'objname', 'objowner')
    function_keys = ('objid', 'objtype', 'objschema', 'objname', 'objbasename', 'objowner')
    extension_keys = ('objid', 'objtype', 'objschema', 'objname')
    comment_keys = ('objid', 'objtype', 'objschema', 'objsubtype', 'objname', 'objowner')
    comment_function_keys = ('objid', 'objtype', 'objschema', 'objsubtype', 'objname',
                             'objbasename', 'objowner')
    comment_extension_keys = ('objid', 'objtype', 'objschema', 'objsubtype', 'objname')
    # NOTE(review): objschema is captured for these comments but not stored - confirm that
    # leaving it out is intended, since other code compares objschema values.
    comment_dash_keys = ('objid', 'objtype', 'objsubtype', 'objname', 'objowner')
    comment_on_keys = ('objid', 'objtype', 'objschema', 'objsubtype', 'objname', 'objsource',
                       'objowner')
    default_acl_keys = ('objid', 'objtype', 'objschema', 'objstatement', 'objsubtype', 'objrole')
    rule_96_keys = ('objid', 'objtype', 'objschema', 'objtable', 'objname', 'objowner')
    user_mapping_keys = ('objid', 'objtype', 'objschema', 'objstatement', 'objusermapping',
                         'objserverstatement', 'objservername', 'objowner')

    def build_entry(mapping, line, keys):
        """Turn the column match of a line into the dictionary stored in the result list."""
        if mapping is None:
            raise ValueError('unexpected line in pg_restore list: {!r}'.format(line))
        entry = {}
        for key in keys:
            if key == 'objbasename':
                # Name without the argument list, identical for every overload
                objname = mapping.group('objname')
                entry[key] = objname[:objname.find("(")]
            else:
                entry[key] = mapping.group(key)
        return entry

    if self.args and self.args.debug:
        self._debug_print("\nPG_RESTORE LIST:")
        for o in restore_object_list:
            self._debug_print(o)

    main_object_list = []
    # Result of the pg_restore version check; looked up on the first RULE line only
    rule_has_table_column = None
    for o in restore_object_list:
        if o.startswith(';'):
            continue
        obj_type = p_main_object_type.match(o)
        if obj_type is None:
            continue
        kind = obj_type.group('type').strip()

        if kind in ("FUNCTION", "AGGREGATE") or (kind == "ACL" and re.search(r'\(.*\)', o)):
            # Matches function/agg or the ACL for them
            mapping, keys = p_function_mapping.match(o), function_keys
        elif kind == "EXTENSION":
            mapping, keys = p_extension_mapping.match(o), extension_keys
        elif kind == "COMMENT":
            if p_function_comment.match(o):
                mapping, keys = p_comment_function_mapping.match(o), comment_function_keys
            elif p_extension_comment.match(o):
                mapping, keys = p_comment_extension_mapping.match(o), comment_extension_keys
            elif p_dash_comment.match(o):
                mapping, keys = p_comment_dash_mapping.match(o), comment_dash_keys
                if mapping is None:
                    if p_comment_for_db_dash_mapping.match(o) is None:
                        raise ValueError('unexpected line in pg_restore list: {!r}'.format(o))
                    # The comment on the database itself is deliberately not saved
                    continue
            elif p_rule_comment.match(o):
                mapping, keys = p_comment_on_mapping.match(o), comment_on_keys
            else:
                mapping, keys = p_comment_mapping.match(o), comment_keys
        elif kind == "DEFAULT ACL":
            mapping, keys = p_default_acl_mapping.match(o), default_acl_keys
        elif kind == "SERVER":
            mapping, keys = p_server_mapping.match(o), common_keys
        elif kind == "USER MAPPING":
            mapping, keys = p_user_mapping_mapping.match(o), user_mapping_keys
        else:
            # all the other common object formats
            mapping, keys = p_object_mapping.match(o), common_keys
            if kind == "RULE":
                if rule_has_table_column is None:
                    # "== True" so that only a result equal to True selects the 9.6 format
                    rule_has_table_column = self._check_bin_version("pg_restore", "9.6") == True
                if rule_has_table_column:
                    if self.args and self.args.debug:
                        print("VERSION EXCEPTION: 9.6 rule build_main_object_list")
                    # The pg_restore -l line changed in 9.6 for RULES
                    mapping, keys = p_96_rule_mapping.match(o), rule_96_keys

        main_object_list.append(build_entry(mapping, o, keys))

    if self.args and self.args.debug:
        self._debug_print("\nMAIN OBJECT LIST")
        for o in main_object_list:
            self._debug_print(o)
    return main_object_list
# end build_main_object_list()
def build_type_object_list(self, object_list, list_types):
    """
    Build a list of objects only of the given types.

    * object_list - a list in the format created by build_main_object_list
    * list_types - a list of desired object types (objtype field in object_list)

    Each entry of list_types is applied as a regular expression anchored at both ends, so a plain
    type name only selects that exact type (ex. "SEQUENCE", not "SEQUENCE SET").

    Returns a filtered list in the same format as object_list. Objects keep the order they have in
    object_list, each object is included at most once and objects without objtype are left out.
    """
    # Compile every type pattern once instead of once per object/type pair
    type_patterns = [re.compile(r'^' + t + '$') for t in list_types]

    type_object_list = []
    for o in object_list:
        objtype = o.get('objtype')
        if objtype is None:
            # Nothing to compare; matching a pattern against None would raise TypeError
            continue
        # any() stops at the first pattern that matches, so an object is never added twice
        if any(pattern.match(objtype) for pattern in type_patterns):
            type_object_list.append(o)

    if self.args and self.args.debug:
        self._debug_print("\nTYPE OBJECT LIST " + str(list_types))
        for o in type_object_list:
            self._debug_print(o)
    return type_object_list
# end build_type_object_list()
def create_dir(self, dest_dir):
    """
    Create the given directory if it does not exist.
    Must be a full path and full directory tree will be created.

    * dest_dir: path of the directory to create

    Returns dest_dir if directory creation was successful, or the directory already exists.
    Prints an error message and exits with status 2 if the directory cannot be created,
    including when dest_dir already exists but is not a directory.
    """
    try:
        # exist_ok accepts a directory that is already there but still raises when the path
        # exists as something other than a directory
        os.makedirs(dest_dir, exist_ok=True)
    except OSError as e:
        # filename and strerror may be None on an OSError, so fall back to values that can
        # always be printed
        failed_path = e.filename if e.filename is not None else dest_dir
        reason = e.strerror if e.strerror is not None else str(e)
        print("Unable to create directory: " + str(failed_path) + ": " + str(reason))
        sys.exit(2)
    return dest_dir
# end create_dir()
def create_extract_files(self, object_list, target_dir="#default#"):
    """
    Create extracted DDL files in an organized folder structure.
    Many of the additional folder & filter options are not available when this is called directly.
    pg_dump command uses environment variables for several settings (add list to docstring).

    * object_list - a list in the format created by build_main_object_list
    * target_dir - full path to a directory to use as output for extracted files.
        Will be created if it doesn't exist.
        Used in same manner as --basedir option to command line version.
        When left at its default, self.args.basedir is used.

    Tables and views are handed to _run_pg_dump(). Every other object is handed to
    _run_pg_restore() together with a temporary file listing the object ids that belong in its
    output file. The names of those temporary files are appended to self.temp_filelist.
    Sequences, default privileges and the "or replace" pass only run when the matching option is
    set on self.args.

    Returns the list of output file paths, one entry per extracted object. Overloaded functions
    share one output file, so the same path can appear more than once.
    """
    extract_file_list = []
    # Process objects that were created but not started yet (only filled when --jobs is used)
    pending = []

    if target_dir == "#default#":
        # Allows direct calls to this function to be able to have a working base directory
        target_dir = self.args.basedir

    def debug(message):
        """Send message to the debug output when options are set and --debug is on."""
        if self.args and self.args.debug:
            self._debug_print(message)

    def announce(objects, message):
        """Print a progress message when there is work, options are set and --quiet is off."""
        if len(objects) > 0 and self.args and not self.args.quiet:
            print(message)

    def schema_dir(obj):
        """Return the folder in which the type folder for obj is created."""
        # With --schemadir every schema gets its own folder; objects whose schema column
        # is "-" stay directly in target_dir
        if self.args and self.args.schemadir and obj.get('objschema') != "-":
            return self.create_dir(os.path.join(target_dir, obj.get('objschema')))
        return target_dir

    def hexify(name):
        """Make name usable in a file name: every \\W character goes through replace_char_with_hex."""
        return re.sub(r'\W', self.replace_char_with_hex, name)

    def sql_file(directory, *name_parts):
        """Return directory/<part>.<part>.sql built from the file-name-safe name parts."""
        return os.path.join(directory, ".".join(hexify(part) for part in name_parts) + ".sql")

    def same_object(left, right, name_key):
        """True when both entries have the same schema and the same value under name_key."""
        return (left.get('objschema') == right.get('objschema')
                and left.get(name_key) == right.get(name_key))

    def write_restore_list(objids):
        """Write the object ids, one per line, to a new temporary file and return its path."""
        temp_dir = self.args.temp if self.args else None
        # The file is written through the handle that created it and closed when the with
        # block ends; delete=False keeps it on disk for pg_restore.
        with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8', newline='\n',
                                         prefix='pg_extractor_restore_list',
                                         dir=temp_dir, delete=False) as fh:
            # Recorded before anything is written so the file is tracked even if a write fails
            self.temp_filelist.append(fh.name)
            for objid in objids:
                fh.write(objid + '\n')
        return fh.name

    def run_pending(label, reason):
        """Start all queued processes, wait for them and empty the queue."""
        nonlocal pending
        debug(label + " PROCESS " + reason + ": " + str(len(pending)))
        self._start_jobs(pending)
        self._wait_jobs(pending)
        pending = []

    def submit(label, target, *job_args):
        """Call target(*job_args) right away, or queue it as a Process when --jobs is above 0."""
        if self.args and self.args.jobs > 0:
            p = Process(target=target, args=job_args)
            debug(label + " PROCESS CREATED: " + str(p.name))
            pending.append(p)
            if (len(pending) % self.args.jobs) == 0:
                run_pending(label, "RUN JOB COUNT REACHED")
        else:
            target(*job_args)

    def finish(label):
        """Run whatever is still queued once a group of objects has been handled."""
        # If --jobs value was not reached, finish off any that were left in the queue
        if len(pending) > 0:
            run_pending(label, "RUN REMAINING JOBS")

    acl_list = self.build_type_object_list(object_list, ["ACL"])
    comment_list = self.build_type_object_list(object_list, ["COMMENT"])

    # Objects extracted with pg_dump
    pgdump_list = self.build_type_object_list(object_list, ["TABLE", "MATERIALIZED VIEW", "VIEW", "FOREIGN TABLE"])
    announce(pgdump_list, "Extracting tables...")
    for o in pgdump_list:
        output_dir = schema_dir(o)
        if o.get('objtype') in ("TABLE", "FOREIGN TABLE"):
            output_dir = self.create_dir(os.path.join(output_dir, "tables"))
        elif o.get('objtype') in ("VIEW", "MATERIALIZED VIEW"):
            output_dir = self.create_dir(os.path.join(output_dir, "views"))
        else:
            print("Invalid dump type in create_extract_files() module")
            sys.exit(2)
        output_file = sql_file(output_dir, o.get('objschema'), o.get('objname'))
        extract_file_list.append(output_file)
        submit("PG_DUMP", self._run_pg_dump, o, output_file)
    finish("PG_DUMP")

    # Objects that can be overloaded
    func_agg_list = self.build_type_object_list(object_list, ["FUNCTION", "AGGREGATE"])
    announce(func_agg_list, "Extracting functions & aggregates...")
    for o in func_agg_list:
        output_dir = schema_dir(o)
        if o.get('objtype') == "FUNCTION":
            output_dir = self.create_dir(os.path.join(output_dir, 'functions'))
        elif o.get('objtype') == "AGGREGATE":
            output_dir = self.create_dir(os.path.join(output_dir, 'aggregates'))
        else:
            print("Invalid object type found while creating function/aggregate extraction files: " + o.get('objtype'))
        output_file = sql_file(output_dir, o.get('objschema'), o.get('objbasename'))
        extract_file_list.append(output_file)

        # The file is named after the base name, so every overload goes into the same file:
        # loop over same list to find overloaded functions
        restore_ids = [d.get('objid') for d in func_agg_list if same_object(o, d, 'objbasename')]
        # Should grab all overloaded ACL & COMMENTS since it's matching on basename
        restore_ids += [a.get('objid') for a in acl_list
                        if "objbasename" in a and same_object(o, a, 'objbasename')]
        restore_ids += [c.get('objid') for c in comment_list
                        if re.match(r'(FUNCTION|AGGREGATE)', c.get('objsubtype'))
                        and same_object(o, c, 'objbasename')]
        submit("PG_RESTORE FUNCTIONS", self._run_pg_restore, write_restore_list(restore_ids), output_file)
    finish("PG_RESTORE FUNCTIONS")

    # Handle if --orreplace is set with --schemadir. This must be done after view & function files have been exported.
    # self.args is False on direct calls, so it is tested before the option is read
    if self.args and self.args.orreplace:
        for o in self.build_type_object_list(object_list, ["SCHEMA"]):
            target_dir_funcs = os.path.join(target_dir, o.get('objname'), "functions")
            target_dir_views = os.path.join(target_dir, o.get('objname'), "views")
            self.or_replace(target_dir_funcs, target_dir_views)

    # Sequences are special little snowflakes
    if self.args and self.args.getsequences:
        sequence_list = self.build_type_object_list(object_list, ["SEQUENCE"])
        sequence_extras = self.build_type_object_list(object_list, ["SEQUENCE SET", "SEQUENCE OWNED BY"])
        announce(sequence_list, "Extracting sequences...")
        for o in sequence_list:
            output_dir = self.create_dir(os.path.join(schema_dir(o), 'sequences'))
            output_file = sql_file(output_dir, o.get('objschema'), o.get('objname'))
            extract_file_list.append(output_file)

            restore_ids = [o.get('objid')]
            restore_ids += [d.get('objid') for d in sequence_extras if same_object(o, d, 'objname')]
            restore_ids += [a.get('objid') for a in acl_list if same_object(o, a, 'objname')]
            restore_ids += [c.get('objid') for c in comment_list
                            if re.search(r'SEQUENCE', c.get('objsubtype'))
                            and same_object(o, c, 'objname')]
            submit("PG_RESTORE SEQUENCE", self._run_pg_restore, write_restore_list(restore_ids), output_file)
        finish("PG_RESTORE SEQUENCE")

    # Default privileges for roles
    if self.args and self.args.getdefaultprivs:
        acl_default_list = self.build_type_object_list(object_list, ["DEFAULT ACL"])
        announce(acl_default_list, "Extracting default privileges...")
        for o in acl_default_list:
            output_dir = self.create_dir(os.path.join(target_dir, "roles"))
            output_file = os.path.join(output_dir, o.get('objrole') + ".sql")
            extract_file_list.append(output_file)
            # One file per role, holding every default privilege entry of that role
            restore_ids = [d.get('objid') for d in acl_default_list
                           if o.get('objrole') == d.get('objrole')]
            submit("PG_RESTORE DEFAULT PRIVS", self._run_pg_restore, write_restore_list(restore_ids), output_file)
        finish("PG_RESTORE DEFAULT PRIVS")

    # All other objects extracted via _run_pg_restore()
    other_object_list = self.build_type_object_list(object_list, ["RULE", "SCHEMA", "TRIGGER", "TYPE", "EXTENSION", "DOMAIN", "SERVER", "USER MAPPING"])
    if len(other_object_list) > 0:
        announce(other_object_list, "Extracting remaining objects...")
        for o in other_object_list:
            objtype = o.get('objtype')
            output_dir = schema_dir(o)
            if objtype == "RULE":
                output_dir = self.create_dir(os.path.join(output_dir, 'rules'))
                output_file = sql_file(output_dir, o.get('objschema'), o.get('objname'))
            elif objtype == "SCHEMA":
                if self.args and self.args.schemadir:
                    # The schema definition goes into the folder of the schema itself
                    output_dir = self.create_dir(os.path.join(output_dir, o.get('objname')))
                else:
                    output_dir = self.create_dir(os.path.join(output_dir, 'schemata'))
                output_file = sql_file(output_dir, o.get('objname'))
            elif objtype == "TRIGGER":
                output_dir = self.create_dir(os.path.join(output_dir, 'triggers'))
                output_file = sql_file(output_dir, o.get('objschema'), o.get('objname'))
            elif objtype == "TYPE" or objtype == "DOMAIN":
                output_dir = self.create_dir(os.path.join(output_dir, 'types'))
                output_file = sql_file(output_dir, o.get('objschema'), o.get('objname'))
            elif objtype == "EXTENSION":
                output_dir = self.create_dir(os.path.join(output_dir, 'extensions'))
                output_file = sql_file(output_dir, o.get('objname'))
            elif objtype == "SERVER":
                output_dir = self.create_dir(os.path.join(output_dir, 'servers'))
                output_file = sql_file(output_dir, o.get('objname'))
            elif objtype == "USER MAPPING":
                output_dir = self.create_dir(os.path.join(output_dir, 'user_mappings'))
                objusermapping_filename = hexify(o.get('objusermapping'))
                objservername_filename = hexify(o.get('objservername'))
                output_file = os.path.join(output_dir, objusermapping_filename + "_" + objservername_filename + ".sql")
            else:
                # No file name rule for this type; the folder path itself is used
                output_file = output_dir
            extract_file_list.append(output_file)

            restore_ids = [o.get('objid')]
            restore_ids += [a.get('objid') for a in acl_list if same_object(o, a, 'objname')]
            restore_ids += [c.get('objid') for c in comment_list
                            if re.search(r'(RULE|SCHEMA|TRIGGER|TYPE|EXTENSION|DOMAIN)', c.get('objsubtype'))
                            and same_object(o, c, 'objname')]
            submit("PG_RESTORE", self._run_pg_restore, write_restore_list(restore_ids), output_file)
        finish("PG_RESTORE")
    # end if block for other_object_list

    debug("\nEXTRACT FILE LIST")
    for f in extract_file_list:
        debug(f)
    return extract_file_list
# end create_extract_files()
def delete_files(self, keep_file_list, target_dir="#default#"):
    """
    Delete files with .sql extension that don't exist in a list of given files.
    Delete folders in a given path if they are empty.
    Hidden files and hidden folders (name starts with '.') are never deleted,
    and hidden folders are never descended into.
    * keep_file_list: list object containing full paths to files that SHOULD REMAIN
    * target_dir: full path to target directory of files to clean up.
        When left at its default, self.args.basedir is used.
        The target directory itself is never removed.
    """
    if target_dir == "#default#":
        target_dir = self.args.basedir
    debug = bool(self.args and self.args.debug)
    if debug:
        self._debug_print("\nDELETE LIST")

    # A set gives constant-time membership tests however long the keep list is
    keep_files = set(keep_file_list)
    visited_dirs = []
    for root, dirs, files in os.walk(target_dir):
        dirs[:] = [d for d in dirs if not d.startswith('.')]  # ignore hidden dirs
        if root != target_dir:
            visited_dirs.append(root)
        for name in files:
            if name.startswith('.') or not name.endswith('.sql'):
                continue  # ignore hidden files and anything that is not a .sql file
            full_file_name = os.path.join(root, name)
            if full_file_name in keep_files:
                continue
            if debug:
                self._debug_print("DELETE FILE: " + full_file_name)
            os.remove(full_file_name)

    # Clean up empty folders excluding top root.
    # A top-down walk lists parents before their children, so going through the
    # list in reverse empties the deepest folders first; a parent whose only
    # content was an empty folder is then removed in the same pass.
    for path in reversed(visited_dirs):
        # Check what is really on disk: a folder that still holds hidden
        # entries is not empty and os.rmdir() would fail on it.
        if os.listdir(path):
            continue
        if debug:
            self._debug_print("DELETE EMPTY DIR: " + path)
        os.rmdir(path)
# end delete_files()
def extract_roles(self, output_dir="#default#"):
    """
    Dump the roles of the database cluster into a "roles.sql" file
    by running pg_dumpall --roles-only.
    * output_dir: full path to the folder that will hold roles.sql.
        When left at its default, the "roles" folder under self.args.basedir is used.
        The folder path is handed to self.create_dir() before use.
    Returns the full path to the roles.sql file.
    If pg_dumpall fails, its output is printed and the script exits with status 2.
    """
    dump_roles_cmd = ["pg_dumpall", "--roles-only"]

    # --database is only passed when pg_dumpall passes the 9.0 version check
    has_database_option = self._check_bin_version("pg_dumpall", "9.0")
    if has_database_option and self.args.dbname is not None:
        if self.args.debug:
            print("VERSION EXCEPTION: 9.0 pg_dumpall rule")
        dump_roles_cmd.append("--database=" + self.args.dbname)

    roles_dir = output_dir
    if output_dir == "#default#":
        roles_dir = os.path.join(self.args.basedir, "roles")
    output_file = os.path.join(self.create_dir(roles_dir), "roles.sql")
    dump_roles_cmd.append("--file=" + output_file)

    if self.args.debug:
        self._debug_print("\nEXTRACT ROLE STATEMENT: " + str(dump_roles_cmd))
    try:
        subprocess.check_output(dump_roles_cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        captured_output = str(e.output, encoding='utf-8').rstrip()
        print("Error in pg_dumpall command while extracting roles: " + captured_output + "\nSubprocess command called: " + str(e.cmd))
        sys.exit(2)
    return output_file
# end extract_roles()
def print_version(self):
    """ Write the value of self.version to standard output. """
    current_version = self.version
    print(current_version)
# end print_version()
def or_replace(self, target_dir_funcs="#default#", target_dir_views="#default#"):
    """
    Replace CREATE with CREATE OR REPLACE in view & function files in a given target dir.
    Only statements that begin at the very start of a line are changed.
    Hidden files and hidden folders (name starts with '.') are left alone.
    Files are read and written directly (never through sys.stdout) and a file
    is only written back when its content actually changed.
    * target_dir_funcs: target directory containing function sql files.
        Defaults to the "functions" folder under self.args.basedir.
    * target_dir_views: target directory containing view sql files.
        Defaults to the "views" folder under self.args.basedir.
    A target directory that does not exist is silently skipped.
    """
    if target_dir_funcs == "#default#":
        target_dir_funcs = os.path.join(self.args.basedir, "functions")
    if target_dir_views == "#default#":
        target_dir_views = os.path.join(self.args.basedir, "views")
    debug = bool(self.args and self.args.debug)
    if debug:
        self._debug_print("\nOR REPLACE LIST")

    def rewrite_tree(target_dir, pattern, replacement):
        """ Apply pattern -> replacement to each line of every non-hidden file under target_dir. """
        if not os.path.exists(target_dir):
            return
        for root, dirs, files in os.walk(target_dir):
            dirs[:] = [d for d in dirs if not d.startswith('.')]  # ignore hidden dirs
            for name in files:
                if name.startswith('.'):
                    continue  # ignore hidden files
                full_file_name = os.path.join(root, name)
                if debug:
                    self._debug_print(full_file_name)
                # Binary read/write keeps the line endings exactly as they are on disk
                with open(full_file_name, 'rb') as fh:
                    old_lines = fh.readlines()
                # Every line is decoded before anything is written, so a file
                # that cannot be decoded is left untouched.
                new_lines = [pattern.sub(replacement, line.decode()).encode() for line in old_lines]
                if new_lines != old_lines:
                    with open(full_file_name, 'wb') as fh:
                        fh.writelines(new_lines)

    rewrite_tree(target_dir_funcs, re.compile(r'^CREATE FUNCTION\b'), "CREATE OR REPLACE FUNCTION")
    # As of V9.4beta2 MATERIALIZED VIEWS cannot use the "CREATE OR REPLACE" syntax
    rewrite_tree(target_dir_views, re.compile(r'^CREATE VIEW\b'), "CREATE OR REPLACE VIEW")
# end or_replace()
def replace_char_with_hex(self, string):
    """
    Turn the single character captured by a regex match into its hex value.
    The hex value is lowercase, at least two digits wide and surrounded by
    commas on either side to distinguish it.
    * string: regex match object whose matched text (string.group()) is one character.
        Presumably handed in by re.sub() when this method is used as the
        replacement callback - confirm against the callers.
    Example when applied to every non-alphanumeric character of a string:
        str|ing -> str,7c,ing
    """
    matched_char = string.group()
    return "," + format(ord(matched_char), "02x") + ","
# end replace_char_with_hex()
def remove_passwords(self, role_file):
    """
    Remove the password hash from a role dump file created by pg_dumpall.
    Leaves the file as valid SQL, but without the PASSWORD parameter to ALTER ROLE.
    On a line starting with ALTER ROLE, everything from PASSWORD up to (but not
    including) the closing semicolon is dropped. All other lines are kept as they are.
    The file is read and written directly (never through sys.stdout) and is only
    written back when its content actually changed.
    * role_file: full path to the dump file.
        If it is not an existing file, a message is printed and nothing else happens.
    """
    if not os.path.isfile(role_file):
        print("Given role file does not exist: " + role_file)
        return

    # Binary read/write keeps the line endings exactly as they are on disk
    with open(role_file, 'rb') as fh:
        old_lines = fh.readlines()

    # Every line is decoded before anything is written, so a file that cannot
    # be decoded is left untouched instead of being truncated half way.
    new_lines = []
    for raw_line in old_lines:
        text = raw_line.decode()
        if re.match(r'ALTER ROLE', text):
            raw_line = re.sub(r'(.*)\sPASSWORD\s.*(;)$', r'\1\2', text).encode()
        new_lines.append(raw_line)

    if new_lines != old_lines:
        with open(role_file, 'wb') as fh:
            fh.writelines(new_lines)
# end remove_passwords()
def show_examples(self):
    """ Print a fixed block of example command lines for this script to standard output. """
    examples_text = """
Basic minimum usage.
This will extract all tables, functions/aggregates, views, types & roles.
It uses the directory that pg_extractor is run from as the base directory
(objects will be found in ./mydb/) and will also produce a permanent copy
of the pg_dump file that the objects were extracted from. It expects the
locations of the postgres binaries to be in the $PATH.
python3 pg_extractor.py -U postgres -d mydb --getall --keep_dump
Extract only functions from the "keith" schema
python3 pg_extractor.py -U postgres -d mydb --getfuncs -n keith
Extract only specifically named functions in the given filename (newline
separated list). Ensure the full function signature is given with only
the variable types for arguments. Since the functions desired are all
in one schema, setting the -n option speeds it up a little since it only
has to dump out a single schema to the temp dump file that is used.
python3 pg_extractor.py -U postgres --dbname=mydb --getfuncs
--include_functions_file=/home/postgres/func_incl -n dblink
func_incl file contains:
dblink.dblink_exec(text, text)
dblink.dblink_exec(text, text, boolean)
dblink.dblink_exec(text)
dblink.dblink_exec(text, boolean)
Extract only the tables listed in the given filename list) along
with the data in the pg_dump custom format.
python3 pg_extractor.py -U postgres --dbname=mydb --gettables -Fc
-tf /home/postgres/tbl_incl --getdata
Using an options file
python3 pg_extractor.py @options_file.txt
"""
    print(examples_text)
######################################################################################
#
# PRIVATE METHODS
#
######################################################################################
def _build_filter_list(self, list_type, list_items, list_prefix="#none#"):
"""
Build a list object based on script filter arguments
* list_type: Format that the list_items paramter is in ("csv" or "file")
* list_items: either a csv list of items or a file with line separated items
* list_prefix: a string that is placed on the front of every item in the result list
Ex: put "-n " before every item for schema filtering the pg_dump command
Returns list_items as a list object
"""
split_list = []
if list_type == "csv":
split_list = list_items.split(',')
elif list_type == "file":
try:
fh = open(list_items, 'r', encoding='utf-8')
for line in fh:
if not line.strip().startswith('#'):
split_list.append(line.strip())
except IOError as e:
print("Cannot access include/exclude file " + list_items + ": " + e.strerror)
sys.exit(2)
else:
print("Bad include/exclude list formatting")
sys.exit(2)
if list_prefix == "#none#":
# returns as an unaltered list object (used by _filter_object_list)
return split_list
else:
# returns a list with the 3rd parameter prepended to each item (used by pg_dump/restore commands)
return [(list_prefix + x) for x in split_list]
# end _build_filter_list()
def _check_bin_version(self, bin_file, min_version):
"""
Returns true if the major version of the given postgres binary is greater than or equal to the one given
Note that prior to PG10, the major version was the first 2 pieces of the version number (x.x).
* bin_file: binary postgres file that supports a --version argument (pg_dump, pg_dumpall, pg_restore)
with the output format: bin_file (PostgreSQL) x.x.x
* min_version: minimum major version (x.x) that this function will return true for
Returns true or false
"""
min_version_list = min_version.split(".")
min_ver1 = int(min_version_list[0])
if min_ver1 < 10:
min_ver2 = int(min_version_list[1])
bin_version = subprocess.check_output([bin_file, '--version'], universal_newlines = True).rstrip()
version_position = bin_version.index(")") + 1 # add one to remove the space after the paren close
bin_version_list = bin_version[version_position:].split(".")
bin_ver1 = int(bin_version_list[0])
if bin_ver1 < 10:
bin_ver2 = int(bin_version_list[1])
## This is really spammy, but no better place to put it
## Uncomment if needed for version debugging issues
#if self.args.debug:
# print("VERSION CHECK:")
# print("min_ver1: " + str(min_ver1) + ", min_ver2: " + str(min_ver2) )
# print("bin_ver1: " + str(bin_ver1) + ", bin_ver2: " + str(bin_ver2) )
# print()
if bin_ver1 < min_ver1:
return False
elif bin_ver1 < 10:
if bin_ver2 < min_ver2:
return False
return True
def _cleanup_temp_files(self):
"""
Cleanup temporary files left behind by pg_restore.
They are not cleaned up automatically because they must be referenced after
the file is closed for writing.
Processes in the script add to the the global list variable temp_filelist
declared in constructor.
"""
if self.args.debug:
self._debug_print("\nCLEANUP TEMP FILES")
for f in self.temp_filelist:
if self.args.debug:
self._debug_print(f)
if os.path.exists(f):
os.remove(f)
def _create_temp_dump(self):
"""
Create the temp dump file used for rest of script runtime.
"""
if not self.args.quiet:
print("Creating temp dump file...")
pg_dump_cmd = ["pg_dump"]
pg_dump_cmd.append("--format=custom")
# tmp_dump_file is created during _set_config() so it can be used elsewhere easily
pg_dump_cmd.append("--file=" + self.tmp_dump_file.name)
if not self.args.getdata:
# Some object data is only placed in dump file when data is include (ex: sequence values).
# So include all data even in temp dump so that can be obtained.
pg_dump_cmd.append("--schema-only")
if self.args.no_acl:
pg_dump_cmd.append("--no-acl")
if self.args.no_owner: