Merge pull request #37 from sendbird/release-1.2.0
Release v1.2.0
jjh-kim authored Jan 21, 2025
2 parents 5c37f79 + 4c89637 commit d076d00
Showing 18 changed files with 244 additions and 167 deletions.
8 changes: 7 additions & 1 deletion deploy/compose/docker-compose.yml
@@ -12,7 +12,8 @@ services:
command: ["python", "-m", "sbosc.controller.main"]
restart: always
depends_on:
- redis
redis:
condition: service_healthy

eventhandler:
<<: *component-base
@@ -49,6 +50,11 @@ services:
volumes:
- redis-data:/data
- ./redis.conf:/usr/local/etc/redis/redis.conf
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5

volumes:
redis-data:
8 changes: 8 additions & 0 deletions doc/config.md
@@ -9,6 +9,11 @@ If you set this parameter to `True`, SB-OSC will skip the bulk import stage and
### disable_apply_dml_events
If you set this parameter to `True`, SB-OSC will pause before the `apply_dml_events` stage. This is useful when you have additional steps to perform manually before applying DML events.

### disable_eventhandler
If you set this parameter to `True`, SB-OSC will disable the eventhandler, which means it will not process binlog events. Only the bulk import will be performed.

After the `bulk_import_validation` stage it will move directly to the `done` stage, so the `add_index` stage will be skipped since the `apply_dml_events` stage will not be executed.
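
A minimal sketch of where this option changes the stage flow (illustrative only; the stage names below are the lowercase strings used in this document, while the actual transitions use SB-OSC's `Stage` enum inside the initializer and controller):

```python
# Illustrative sketch only; the real implementation lives in SB-OSC's
# initializer and controller, not in this helper.

def stage_after_metadata_fetch(disable_eventhandler: bool) -> str:
    # The event handler is never started when it is disabled.
    return "bulk_import_chunk_creation" if disable_eventhandler else "start_event_handler"

def stage_after_bulk_import_validation(disable_eventhandler: bool) -> str:
    # DML events are never applied, so apply_dml_events (and therefore add_index) are skipped.
    return "done" if disable_eventhandler else "apply_dml_events"
```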


## Chunk
### max_chunk_count & min_chunk_size
@@ -34,3 +39,6 @@ These parameters control insert throughput of SB-OSC. `batch_size` and `thread_c

`LIMIT batch_size` is applied to the next query to prevent it from inserting too many rows at once.

**Note:** This option uses `cursor.lastrowid` to set `last_inserted_pk`, which returns a non-zero value only when the table has an **AUTO_INCREMENT** column
([MySQL documentation](https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlcursor-lastrowid.html)).
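
For illustration, a minimal snippet of that behavior (hypothetical connection parameters and table names, shown with `mysql-connector-python` as in the linked documentation; SB-OSC's own driver setup may differ):

```python
import mysql.connector  # assumes mysql-connector-python is installed

# Hypothetical connection parameters and table names, for illustration only.
conn = mysql.connector.connect(host="127.0.0.1", user="root", password="secret", database="test")
cursor = conn.cursor()

# Table with an AUTO_INCREMENT primary key: lastrowid returns the generated id (non-zero).
cursor.execute("INSERT INTO with_auto_inc (payload) VALUES ('a')")
print(cursor.lastrowid)  # e.g. 1, 2, ...

# Table whose primary key is not AUTO_INCREMENT: no value is generated, so lastrowid
# is 0 (or None, depending on the driver) and cannot be used for last_inserted_pk.
cursor.execute("INSERT INTO without_auto_inc (id, payload) VALUES (42, 'a')")
print(cursor.lastrowid)

conn.commit()
cursor.close()
conn.close()
```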

23 changes: 12 additions & 11 deletions doc/operation-class.md
@@ -27,16 +27,17 @@ class MessageRetentionOperation(BaseOperation):
INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
SELECT {self.source_columns}
FROM {self.source_db}.{self.source_table} AS source
WHERE source.id BETWEEN {start_pk} AND {end_pk}
WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
"""
def _get_not_imported_pks_query(self, start_pk, end_pk):
return f'''
SELECT source.id FROM {self.source_db}.{self.source_table} AS source
LEFT JOIN {self.source_db}.{self.destination_table} AS dest ON source.id = dest.id
WHERE source.id BETWEEN {start_pk} AND {end_pk}
SELECT source.{self.pk_column} FROM {self.source_db}.{self.source_table} AS source
LEFT JOIN {self.source_db}.{self.destination_table} AS dest
ON source.{self.pk_column} = dest.{self.pk_column}
WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND source.ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
AND dest.id IS NULL
AND dest.{self.pk_column} IS NULL
'''
```
@@ -48,20 +49,20 @@ class CrossClusterMessageRetentionOperation(CrossClusterBaseOperation):
def _select_batch_query(self, start_pk, end_pk):
return f'''
SELECT {self.source_columns} FROM {self.source_db}.{self.source_table}
WHERE id BETWEEN {start_pk} AND {end_pk}
WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
'''
def get_not_imported_pks(self, source_cursor, dest_cursor, start_pk, end_pk):
source_cursor.execute(f'''
SELECT id FROM {self.source_db}.{self.source_table}
WHERE id BETWEEN {start_pk} AND {end_pk}
SELECT {self.pk_column} FROM {self.source_db}.{self.source_table}
WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
''')
source_pks = [row[0] for row in source_cursor.fetchall()]
dest_cursor.execute(f'''
SELECT id FROM {self.destination_db}.{self.destination_table}
WHERE id BETWEEN {start_pk} AND {end_pk}
SELECT {self.pk_column} FROM {self.destination_db}.{self.destination_table}
WHERE {self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND ts > DATE_SUB(NOW(), INTERVAL 30 DAY)
''')
dest_pks = [row[0] for row in dest_cursor.fetchall()]
@@ -89,7 +90,7 @@ class MessageRetentionOperation(BaseOperation):
INSERT INTO {self.source_db}.{self.destination_table}({self.source_columns})
SELECT {self.source_columns}
FROM {self.source_db}.{self.source_table} AS source
WHERE source.id BETWEEN {start_pk} AND {end_pk}
WHERE source.{self.pk_column} BETWEEN {start_pk} AND {end_pk}
AND source.ts > DATE_SUB(NOW(), INTERVAL {self.operation_config.retention_days} DAY)
"""
```
1 change: 1 addition & 0 deletions src/config/config.py
@@ -84,6 +84,7 @@ class Config:
USE_BATCH_SIZE_MULTIPLIER = False

# EventHandler config
DISABLE_EVENTHANDLER = False
EVENTHANDLER_THREAD_COUNT = 4
EVENTHANDLER_THREAD_TIMEOUT_IN_SECONDS = 300
INIT_BINLOG_FILE: str = None
10 changes: 5 additions & 5 deletions src/modules/db.py
@@ -74,7 +74,10 @@ def cursor(self, cursorclass=None):
def ping(self):
if not self._conn:
self._conn = self.connect()
self._conn.ping()
try:
self._conn.ping()
except OperationalError:
self._conn = self.connect()

def close(self):
if self._conn:
@@ -104,10 +107,7 @@ def get_connection(self):

yield conn

try:
conn.ping()
except OperationalError:
conn = Connection(self.endpoint)
conn.ping()
if self.free_connections.full():
raise Exception("Connection pool full")
else:
3 changes: 2 additions & 1 deletion src/modules/redis/schema.py
@@ -28,7 +28,8 @@ class Metadata(Hash):
destination_db: str
destination_table: str
source_columns: str
max_id: int
pk_column: str
max_pk: int
start_datetime: datetime


75 changes: 39 additions & 36 deletions src/sbosc/controller/controller.py
@@ -52,7 +52,6 @@ def start(self):
if action:
action()

# TODO: Add Redis data validation if needed
time.sleep(self.interval)

# Close db connection
@@ -63,14 +62,14 @@ def create_bulk_import_chunks(self):
self.redis_data.remove_all_chunks()

metadata = self.redis_data.metadata
max_id = metadata.max_id
max_pk = metadata.max_pk

# chunk_count is determined by min_chunk_size and max_chunk_count
# Each chunk will have min_chunk_size rows and the number of chunks should not exceed max_chunk_count
min_chunk_size = config.MIN_CHUNK_SIZE
max_chunk_count = config.MAX_CHUNK_COUNT # Number of chunks means max number of worker threads
chunk_count = min(max_id // min_chunk_size, max_chunk_count)
chunk_size = max_id // chunk_count
chunk_count = min(max_pk // min_chunk_size, max_chunk_count)
chunk_size = max_pk // chunk_count

# Create chunks
# Each chunk will have a range of primary key values [start_pk, end_pk]
@@ -79,7 +78,7 @@ def create_bulk_import_chunks(self):
start_pk = i * chunk_size + 1
end_pk = (i + 1) * chunk_size
if i == chunk_count - 1:
end_pk = max_id
end_pk = max_pk

chunk_id = f"{self.migration_id}-{i}"
chunk_info = self.redis_data.get_chunk_info(chunk_id)
@@ -112,7 +111,7 @@ def create_bulk_import_chunks(self):
self.redis_data.set_current_stage(Stage.BULK_IMPORT)
self.slack.send_message(
subtitle="Bulk import started",
message=f"Max id: {max_id}\n"
message=f"Max PK: {max_pk}\n"
f"Chunk count: {chunk_count}\n"
f"Chunk size: {chunk_size}\n"
f"Batch size: {config.MIN_BATCH_SIZE}\n"
@@ -166,7 +165,10 @@ def validate_bulk_import(self):
self.redis_data.set_current_stage(Stage.BULK_IMPORT_VALIDATION_FAILED)
self.slack.send_message(message="Bulk import validation failed", color="danger")
else:
self.redis_data.set_current_stage(Stage.APPLY_DML_EVENTS)
if not config.DISABLE_EVENTHANDLER:
self.redis_data.set_current_stage(Stage.APPLY_DML_EVENTS)
else:
self.redis_data.set_current_stage(Stage.DONE)
self.slack.send_message(message="Bulk import validation succeeded", color="good")
except StopFlagSet:
return
@@ -213,45 +215,46 @@ def add_index(self):
finished_all_creation = False
while not self.stop_flag:
finished_creation = False
with self.db.cursor() as cursor:
cursor: Cursor
with self.db.cursor(role='reader') as source_cursor:
source_cursor: Cursor

index_info = None
cursor.execute(f'''
source_cursor.execute(f'''
SELECT index_name FROM {config.SBOSC_DB}.index_creation_status
WHERE migration_id = %s AND ended_at IS NULL AND started_at IS NOT NULL
''', (self.migration_id,))

if cursor.rowcount > 0:
index_names = [row[0] for row in cursor.fetchall()]
if source_cursor.rowcount > 0:
index_names = [row[0] for row in source_cursor.fetchall()]
self.slack.send_message(
subtitle="Found unfinished index creation", message=f"Indexes: {index_names}", color="warning")

while True:
if self.stop_flag:
return
cursor.execute(f'''
SELECT DISTINCT database_name, table_name, index_name FROM mysql.innodb_index_stats
WHERE database_name = %s AND table_name = %s
AND index_name IN ({','.join(['%s'] * len(index_names))})
''', [metadata.destination_db, metadata.destination_table] + index_names)
if cursor.rowcount == len(index_names):
finished_creation = True
break
with self.db.cursor(host='dest', role='reader') as dest_cursor:
dest_cursor.execute(f'''
SELECT DISTINCT database_name, table_name, index_name FROM mysql.innodb_index_stats
WHERE database_name = %s AND table_name = %s
AND index_name IN ({','.join(['%s'] * len(index_names))})
''', [metadata.destination_db, metadata.destination_table] + index_names)
if dest_cursor.rowcount == len(index_names):
finished_creation = True
break
self.logger.info("Waiting for index creation to finish")
time.sleep(60)

else:
cursor.execute(f'''
source_cursor.execute(f'''
SELECT index_name, index_columns, is_unique FROM {config.SBOSC_DB}.index_creation_status
WHERE migration_id = %s AND ended_at IS NULL LIMIT {config.INDEX_CREATED_PER_QUERY}
''', (self.migration_id,))

if cursor.rowcount == 0:
if source_cursor.rowcount == 0:
finished_all_creation = True
break

index_info = cursor.fetchall()
index_info = source_cursor.fetchall()
index_names = [index_name for index_name, *_ in index_info]

if index_info and not finished_creation:
@@ -260,30 +263,30 @@ def add_index(self):

# update ended_at
started_at = datetime.now()
with self.db.cursor() as cursor:
cursor: Cursor
cursor.executemany(f'''
with self.db.cursor() as source_cursor:
source_cursor: Cursor
source_cursor.executemany(f'''
UPDATE {config.SBOSC_DB}.index_creation_status SET started_at = %s
WHERE migration_id = %s AND index_name = %s
''', [(started_at, self.migration_id, index_name) for index_name in index_names])

# add index
with self.db.cursor(host='dest') as cursor:
cursor: Cursor
with self.db.cursor(host='dest') as dest_cursor:
dest_cursor: Cursor

# set session variables
if config.INNODB_DDL_BUFFER_SIZE is not None:
cursor.execute(f"SET SESSION innodb_ddl_buffer_size = {config.INNODB_DDL_BUFFER_SIZE}")
dest_cursor.execute(f"SET SESSION innodb_ddl_buffer_size = {config.INNODB_DDL_BUFFER_SIZE}")
self.logger.info(f"Set innodb_ddl_buffer_size to {config.INNODB_DDL_BUFFER_SIZE}")
if config.INNODB_DDL_THREADS is not None:
cursor.execute(f"SET SESSION innodb_ddl_threads = {config.INNODB_DDL_THREADS}")
dest_cursor.execute(f"SET SESSION innodb_ddl_threads = {config.INNODB_DDL_THREADS}")
self.logger.info(f"Set innodb_ddl_threads to {config.INNODB_DDL_THREADS}")
if config.INNODB_PARALLEL_READ_THREADS is not None:
cursor.execute(
dest_cursor.execute(
f"SET SESSION innodb_parallel_read_threads = {config.INNODB_PARALLEL_READ_THREADS}")
self.logger.info(f"Set innodb_parallel_read_threads to {config.INNODB_PARALLEL_READ_THREADS}")

cursor.execute(f'''
dest_cursor.execute(f'''
ALTER TABLE {metadata.destination_db}.{metadata.destination_table}
{', '.join([
f"ADD{' UNIQUE' if is_unique else ''} INDEX {index_name} ({index_columns})"
@@ -296,9 +299,9 @@ def add_index(self):
if finished_creation:
# update ended_at
ended_at = datetime.now()
with self.db.cursor() as cursor:
cursor: Cursor
cursor.executemany(f'''
with self.db.cursor() as source_cursor:
source_cursor: Cursor
source_cursor.executemany(f'''
UPDATE {config.SBOSC_DB}.index_creation_status SET ended_at = %s
WHERE migration_id = %s AND index_name = %s
''', [(ended_at, self.migration_id, index_name) for index_name in index_names])
@@ -343,7 +346,7 @@ def swap_tables(self):
old_source_table = f"{metadata.source_db}.{self.redis_data.old_source_table}"
cursor.execute(f"RENAME TABLE {source_table} TO {old_source_table}")
after_rename_table_timestamp = time.time()
cursor.execute(f"SELECT MAX(id) FROM {old_source_table}")
cursor.execute(f"SELECT MAX({metadata.pk_column}) FROM {old_source_table}")
final_max_id = cursor.fetchone()[0]

with self.validator.migration_operation.override_source_table(self.redis_data.old_source_table):
28 changes: 23 additions & 5 deletions src/sbosc/controller/initializer.py
@@ -174,14 +174,32 @@ def fetch_metadata(self, redis_data):
metadata.source_columns = cursor.fetchone()[0]
self.logger.info("Saved source column schema to Redis")

# Get max id
cursor.execute("SELECT MAX(id) FROM %s.%s" % (metadata.source_db, metadata.source_table))
max_id = cursor.fetchone()[0]
metadata.max_id = max_id
# Get pk column
cursor.execute(f'''
SELECT COLUMN_NAME FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = '{metadata.source_db}' AND TABLE_NAME = '{metadata.source_table}'
AND COLUMN_KEY = 'PRI' AND DATA_TYPE IN ('int', 'bigint')
''')
if cursor.rowcount == 0:
raise Exception("Integer primary key column not found")
metadata.pk_column = f"`{cursor.fetchone()[0]}`"
self.logger.info("Saved primary key column to Redis")

# Get max PK
cursor.execute('''
SELECT MAX(%s) FROM %s.%s
''' % (metadata.pk_column, metadata.source_db, metadata.source_table))
max_pk = cursor.fetchone()[0]
if max_pk is None:
raise Exception("No data in source table")
metadata.max_pk = max_pk
self.logger.info("Saved total rows to Redis")

metadata.start_datetime = datetime.now()
redis_data.set_current_stage(Stage.START_EVENT_HANDLER)
if not config.DISABLE_EVENTHANDLER:
redis_data.set_current_stage(Stage.START_EVENT_HANDLER)
else:
redis_data.set_current_stage(Stage.BULK_IMPORT_CHUNK_CREATION)

def init_migration(self):
if not self.check_database_setup():