Skip to content

Commit

Permalink
Add disable eventhandler option
Browse files Browse the repository at this point in the history
  • Loading branch information
jjh-kim committed Jan 9, 2025
1 parent 0ccb8b8 commit 7ecae91
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 3 deletions.
5 changes: 5 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ If you set this parameter to `True`, SB-OSC will skip the bulk import stage and
### disable_apply_dml_events
If you set this parameter to `True`, SB-OSC will pause before `apply_dml_events` stage. This is useful when you have additional steps to perform manually before applying DML events.

### disable_eventhandler
If you set this parameter to `True`, SB-OSC will disable eventhandler, which means it will not process binlog events. Only bulk import will be performed.

After `bulk_import_validation` stage it will move directly to `done` stage. So, `add_index` stage will be skipped since `apply_dml_events` stage will not be executed.


## Chunk
### max_chunk_count & min_chunk_size
Expand Down
1 change: 1 addition & 0 deletions src/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class Config:
USE_BATCH_SIZE_MULTIPLIER = False

# EventHandler config
DISABLE_EVENTHANDLER = False
EVENTHANDLER_THREAD_COUNT = 4
EVENTHANDLER_THREAD_TIMEOUT_IN_SECONDS = 300
INIT_BINLOG_FILE: str = None
Expand Down
5 changes: 4 additions & 1 deletion src/sbosc/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def start(self):
if action:
action()

# TODO: Add Redis data validation if needed
time.sleep(self.interval)

# Close db connection
Expand Down Expand Up @@ -166,6 +165,10 @@ def validate_bulk_import(self):
self.redis_data.set_current_stage(Stage.BULK_IMPORT_VALIDATION_FAILED)
self.slack.send_message(message="Bulk import validation failed", color="danger")
else:
if not config.DISABLE_EVENTHANDLER:
self.redis_data.set_current_stage(Stage.APPLY_DML_EVENTS)
else:
self.redis_data.set_current_stage(Stage.DONE)
self.redis_data.set_current_stage(Stage.APPLY_DML_EVENTS)
self.slack.send_message(message="Bulk import validation succeeded", color="good")
except StopFlagSet:
Expand Down
5 changes: 4 additions & 1 deletion src/sbosc/controller/initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ def fetch_metadata(self, redis_data):
self.logger.info("Saved total rows to Redis")

metadata.start_datetime = datetime.now()
redis_data.set_current_stage(Stage.START_EVENT_HANDLER)
if not config.DISABLE_EVENTHANDLER:
redis_data.set_current_stage(Stage.START_EVENT_HANDLER)
else:
redis_data.set_current_stage(Stage.BULK_IMPORT_CHUNK_CREATION)

def init_migration(self):
if not self.check_database_setup():
Expand Down
2 changes: 1 addition & 1 deletion src/sbosc/eventhandler/eventhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def start(self):
self.logger.info('Starting event handler')
while not self.stop_flag:
current_stage = self.redis_data.current_stage
if Stage.DONE > current_stage >= Stage.START_EVENT_HANDLER:
if Stage.DONE > current_stage >= Stage.START_EVENT_HANDLER and not config.DISABLE_EVENTHANDLER:
if self.log_file is None or self.log_pos is None:
self.logger.info('Initializing event handler')
self.init_event_handler()
Expand Down
3 changes: 3 additions & 0 deletions src/sbosc/monitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ def check_migration_status(self):
bulk_import_progress = inserted_rows / self.redis_data.metadata.max_pk * 100
self.metric_sender.submit('sb_osc_bulk_import_progress', bulk_import_progress)

if config.DISABLE_EVENTHANDLER:
return

self.submit_event_handler_timestamps()

# remaining_binlog_size
Expand Down

0 comments on commit 7ecae91

Please sign in to comment.