-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmig_task.py
39 lines (30 loc) · 1.35 KB
/
mig_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from mig_source_es import MigSourceEs
from mig_target_es import MigTargetEs
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
class MigTask:
    """Copy the documents of a source index into a target index.

    The source is read page by page through its scroll interface and every
    page is handed to the target's bulk indexer, which receives a thread pool
    and a progress callback.
    """

    def __init__(self, conf):
        """Create the source/target clients described by ``conf``.

        Args:
            conf: Mapping with a ``'source'`` entry (passed to ``MigSourceEs``),
                a ``'target'`` entry (passed to ``MigTargetEs``) and an
                optional ``'copy_mapping'`` flag that defaults to ``True``.
        """
        self.source_es = MigSourceEs(conf['source'])
        self.target_es = MigTargetEs(conf['target'])
        self.copy_mapping = conf.get('copy_mapping', True)
        # Created lazily by set_process_bar() once the total is known.
        self.process_bar = None

    def set_process_bar(self, total):
        """Create the progress bar for ``total`` expected documents."""
        self.process_bar = tqdm(total=total)

    def update_process_bar(self, value):
        """Advance the progress bar by ``value``.

        Does nothing when no progress bar has been created yet, so the
        callback is safe to invoke before ``set_process_bar``.
        """
        if self.process_bar is not None:
            self.process_bar.update(value)

    @staticmethod
    def _total_hits(resp):
        """Return the total hit count of a search response.

        Elasticsearch 7+ reports ``hits.total`` as an object of the form
        ``{"value": N, "relation": ...}`` while older versions report a plain
        integer; both shapes are accepted.
        """
        total = resp['hits']['total']
        if isinstance(total, dict):
            return total.get('value')
        return total

    def migrate(self):
        """Run the migration from the source to the target.

        Optionally copies the mapping first, then scrolls through the source
        until a page comes back empty, bulk-indexing every page into the
        target. The progress bar is closed when the run ends, whether it
        succeeded or raised.
        """
        if self.copy_mapping:
            source_mapping = self.source_es.get_mapping()
            self.target_es.put_mapping(source_mapping)

        resp = self.source_es.init_scroll()
        self.set_process_bar(self._total_hits(resp))
        try:
            with ThreadPoolExecutor(self.target_es.index_thread_num) as executor:
                continue_scroll, scroll_id = self.bulk_index(executor, resp)
                while continue_scroll:
                    resp = self.source_es.scroll(scroll_id)
                    continue_scroll, scroll_id = self.bulk_index(executor, resp)
        finally:
            # Close only after the executor has shut down: worker callbacks
            # may still be updating the bar until then.
            if self.process_bar is not None:
                self.process_bar.close()

    def bulk_index(self, executor, resp):
        """Index one scroll page into the target.

        Args:
            executor: Executor forwarded to the target's ``bulk_index``.
            resp: Scroll response containing ``hits.hits`` and ``_scroll_id``.

        Returns:
            A ``(has_docs, scroll_id)`` tuple; ``has_docs`` is ``False`` once
            the page is empty, which signals the end of the scroll.
        """
        docs = resp['hits']['hits']
        # The page is forwarded as-is, even when it is empty.
        self.target_es.bulk_index(docs, executor, self.update_process_bar)
        return len(docs) > 0, resp['_scroll_id']