-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path sample_extremes.py
37 lines (27 loc) · 1.47 KB
/
sample_extremes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import argparse
from pyspark.sql.functions import explode
from lib.data_processor import DataProcessor
class Sampler(DataProcessor):
    """Prints data samples of extreme values of a chosen attribute."""

    def _run(self, dataFrames, config, spark):
        """Show the largest (or smallest) non-empty values of ``config.attribute``.

        Args:
            dataFrames: mapping of entity name -> Spark DataFrame
                (only ``config.entity`` is read here).
            config: parsed CLI namespace with ``entity``, ``attribute``,
                ``desc`` and ``sample_num``.
            spark: active SparkSession — presumably required by the
                DataProcessor interface; unused in this method.
        """
        dataFrame = dataFrames[config.entity]
        # Work on a local copy so the caller's config object is not mutated
        # (the original code rewrote config.attribute in place).
        attribute = config.attribute
        # Nested data: explode the top-level array column so the nested
        # field can still be addressed with dot notation.
        if "." in attribute:
            levels = attribute.split(".")
            dataFrame = dataFrame.withColumn("exploded", explode(levels[0]))
            levels[0] = "exploded"
            attribute = ".".join(levels)
        # Hoist the repeated column lookup; filter out null/empty values,
        # sort toward the requested extreme and print up to sample_num rows
        # (second show() arg truncates cell display at 100 chars).
        column = dataFrame[attribute]
        dataFrame.where(column.isNotNull() & (column != "")) \
            .select(attribute) \
            .sort(column.desc() if config.desc else column.asc()) \
            .show(config.sample_num, 100)
if "__main__" == __name__:
# init parameters
parser = argparse.ArgumentParser(prog='Sample Printer', description='Show samples of all data')
parser.add_argument('-n', '--sample_num', help='the number of samples to print', default=1000, type=int)
parser.add_argument('-e', '--entity', help='the entity to print', choices=['persons', 'works', 'orgUnits'], required=True)
parser.add_argument('-a', '--attribute', help='the attribute to print', required=True)
parser.add_argument('-d', '--desc', help='desc order', action='store_true')
parser.add_argument('-c', '--chain', help='the source data related to the transformation chain', default='initial')
Sampler().run(parser.parse_args())