Skip to content

Commit

Permalink
util: fix various issues with VGM converter script
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-rodrigues committed Apr 1, 2021
1 parent 6c95486 commit 84a4081
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 61 deletions.
8 changes: 4 additions & 4 deletions scripts/usb_ctrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ def poll_status(stopping_event, status_ep, data_ep, processed_vgm):
status_data = status_ep.read(STATUS_TOTAL_LENGTH, 250)
print("Received status data: ", binascii.hexlify(status_data))

header = int.from_bytes(status_data[0 : 4], byteorder='little')
header = int.from_bytes(status_data[0 : 4], 'little')
if header != BUFFERING_REQUEST_HEADER:
print("Ignoring request with header: ", header)
continue

buffer_target_offset = int.from_bytes(status_data[4 : 8], byteorder='little')
vgm_start_offset = int.from_bytes(status_data[8 : 12], byteorder='little')
vgm_chunk_length = int.from_bytes(status_data[12 : 16], byteorder='little')
buffer_target_offset = int.from_bytes(status_data[4 : 8], 'little')
vgm_start_offset = int.from_bytes(status_data[8 : 12], 'little')
vgm_chunk_length = int.from_bytes(status_data[12 : 16], 'little')

print("Sending VGM chunk to buffer @ {:X}, VGM offset: {:X}, Length: {:X}"\
.format(buffer_target_offset, vgm_start_offset, vgm_chunk_length))
Expand Down
2 changes: 1 addition & 1 deletion scripts/vgm_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

vgm = VGMReader.read(input_path)
processor = VGMPreprocessor()
processed_vgm = processor.preprocess(vgm)
processed_vgm = processor.preprocess(vgm, rewrite_pcm=True, byteswap_pcm=False)

# Write converted output

Expand Down
196 changes: 140 additions & 56 deletions scripts/vgm_preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,41 @@ def remap_pcm_bank_byte(self, bank_byte):

return None

def preprocess_pcm(self, pcm_bank_address_indexes, total_size):
# PCM blocks need sorting according to their position and type..
assume_unified_pcm = (total_size >= 0x400000)
self.sort_pcm_blocks(assume_unified_pcm)
# ..then possibly merged into contiguous blocks..
self.merge_contiguous_pcm_blocks()
# ..then offsets need adjusting from a zero-base..
self.rebase_pcm_blocks()
# ..then the address high bytes need adjusting according to the newly sorted position
for bank_index in pcm_bank_address_indexes:
bank_byte = self.data[bank_index]
remapped_bank_byte = self.remap_pcm_bank_byte(bank_byte)
if remapped_bank_byte is None:
print("Couldn't find matching PCM bank byte: {:X}".format(bank_byte))
continue

self.data[bank_index] = remapped_bank_byte

def write_pcm_blocks(self, start_index, total_size):
# Inserting all PCM blocks in sequence, presumably at start
pcm_commands = bytearray()

for block in self.pcm_blocks:
# Block header (generic)
pcm_commands.extend([0x67, 0x66])
pcm_commands.append(0x82 if block.type == PCMType.A else 0x83)
pcm_commands.extend((len(block.data) + 8).to_bytes(4, 'little'))
# PCM data block (sample ROM)
pcm_commands.extend(total_size.to_bytes(4, 'little'))
pcm_commands.extend(block.remapped_offset.to_bytes(4, 'little'))
pcm_commands.extend(block.data)

self.data[start_index:start_index] = pcm_commands
return len(pcm_commands)

# This doesn't necessarily work for cases where (loop index < start index)
# (Any examples of this?)

Expand All @@ -162,19 +197,20 @@ def __init__(self, assumed_clock=8000000):

def included_chips(self, vgm):
chips = []
clock_mask = ~0x80000000

for t in Chip.ATTRIBUTES:
index = t[1]
clock = int.from_bytes(vgm[index : index + 4], byteorder='little')
if (clock == 0) or (clock > 8000000):
clock = int.from_bytes(vgm[index : index + 4], 'little')
if (clock == 0) or ((clock & clock_mask) > 8000000):
continue

presence_mask = t[2]
if (presence_mask != 0) and ((clock & presence_mask) == 0):
continue

chip_type = t[0]
chips.append(Chip(chip_type, index, clock))
chips.append(Chip(chip_type, index, clock & clock_mask))

return chips

Expand Down Expand Up @@ -217,15 +253,56 @@ def write_opnb(self, vgm, actions):
write_cmd = 0x59 if action.address >= 0x100 else 0x58
vgm.extend([write_cmd, action.address & 0xff, action.data])

def preprocess(self, vgm_in):
def preprocess(self, vgm_in, rewrite_pcm=False, byteswap_pcm=True):
flag_writes_removed = 0

processed_vgm = ProcessedVGM()
vgm_out = bytearray()

def copy(length):
nonlocal index, vgm_in, vgm_out

vgm_out.extend(vgm_in[index : index + length])
index += length

# Header adjustments:

def write_header_offset(header_index, file_index):
nonlocal vgm_out

file_offset = file_index - header_index
vgm_out[header_index : header_index + 4] = file_offset.to_bytes(4, 'little')
return file_offset

def displace_header_offset(header_index, delta):
nonlocal vgm_out

file_offset_bytes = vgm_out[header_index : header_index + 4]
file_offset = int.from_bytes(file_offset_bytes, 'little') + header_index
file_offset += delta
write_header_offset(header_index, file_offset)

def read_header_offset(header_index):
nonlocal vgm_in

file_offset_bytes = vgm_in[header_index : header_index + 4]
file_offset = int.from_bytes(file_offset_bytes, 'little')
return file_offset + header_index if file_offset > 0 else 0

def write_header_word(header_index, word):
nonlocal vgm_out

vgm_out[header_index : header_index + 4] = word.to_bytes(4, 'little')

# Read source indexes (from relative offsets):
relative_offset_index = 0x34
relative_offset = int.from_bytes(vgm_in[relative_offset_index : relative_offset_index + 4], byteorder='little')
start_index = relative_offset_index + relative_offset if relative_offset else 0x40
start_index = read_header_offset(relative_offset_index)
print("VGM start index: {:X}".format(start_index))

index = start_index

loop_offset_index = 0x1c
loop_offset = int.from_bytes(vgm_in[loop_offset_index : loop_offset_index + 4], byteorder='little')
loop_index = loop_offset + loop_offset_index if loop_offset else 0
loop_index = read_header_offset(loop_offset_index)
print("VGM loop index: {:X}".format(loop_index))

# What chips are included in this VGM?
Expand All @@ -234,41 +311,41 @@ def preprocess(self, vgm_in):
for chip in chips:
print("Found {:s} @ {:d}Hz".format(chip.chip_type.name, chip.clock))

ym2610_chip = next(filter(lambda c: c.chip_type == ChipType.YM2610, chips), None)
ym2610_chip = next(filter(lambda c: c.chip_type in [ChipType.YM2610, ChipType.YM2610B], chips), None)
ym2612_chip = next(filter(lambda c: c.chip_type == ChipType.YM2612, chips), None)
psg_chip = next(filter(lambda c: c.chip_type == ChipType.SN76489, chips), None)

if (ym2610_chip is None) and (ym2612_chip is None):
print("Error: expected either YM2610 or YM2612")
sys.exit(1)

index = start_index
flag_writes_removed = 0

# VGM generation:

processed_vgm = ProcessedVGM()
vgm_out = bytearray()

# Copy existing header which will be updated later
vgm_out.extend(vgm_in[0x00 : 0x100])

# Some tracks have a very low start index that overlaps with the header
# Bump the index if needed
# Clear any leftover junk in the header incase
if start_index < 0x100:
start_index = 0x100
start_index_bytes = (start_index - relative_offset_index).to_bytes(4, 'little')
vgm_out[relative_offset_index : relative_offset_index + 4] = start_index_bytes
vgm_out[start_index : 0x100] = [0] * (0x100 - start_index)
# Preserve existing loop params if present
if start_index < 0x80:
loop_base_index = 0x7e
loop_modifier_index = 0x7f
vgm_out[loop_base_index] = 0
vgm_out[loop_modifier_index] = 0

# Bump version to 1.70 as some versions predate YM2610(B) support
version_index = 0x08
output_version = 0x00000170
write_header_word(version_index, output_version)

# Bump the index to 0x100 to make space for full-size header, if needed
# Some tracks may have a shorter header by setting a lower start-offset
minimum_start_index = 0x100
if start_index < minimum_start_index:
start_index = minimum_start_index
write_header_offset(relative_offset_index, start_index)

# Loop index needs adjusting based on data being added / removed
loop_index_adjusted = None

def copy(length):
nonlocal index, vgm_in, vgm_out

vgm_out.extend(vgm_in[index : index + length])
index += length

# PCM address remapping:

pcm_address_indexes = []
Expand Down Expand Up @@ -372,7 +449,7 @@ def record_reg_write():
# Data block (PCM)

block_type = vgm_in[index + 2]
block_size = int.from_bytes(vgm_in[index + 3 : index + 7], byteorder='little')
block_size = int.from_bytes(vgm_in[index + 3 : index + 7], 'little')
block_size -= 8
if block_size <= 0:
print("Expected PCM block size to be > 0")
Expand All @@ -383,27 +460,31 @@ def record_reg_write():
print("Unexpected block type: {:X}".format(block_type))
sys.exit(1)

block_total_size = int.from_bytes(vgm_in[index + 7 : index + 11], byteorder='little')
block_total_size = int.from_bytes(vgm_in[index + 7 : index + 11], 'little')
if block_total_size == 0:
print('Expected total_size to be > 0')
sys.exit(1)

total_size = max(block_total_size, total_size)

offset = int.from_bytes(vgm_in[index + 11 : index + 15], byteorder='little')
offset = int.from_bytes(vgm_in[index + 11 : index + 15], 'little')

is_adpcm_a = (block_type == 0x82)

print("Found block: type ", "A" if is_adpcm_a else "B")
print("Size: {:X}, offset: {:X}, total: {:X}".format(block_size, offset, block_total_size))


# Some PCM blocks are 0 size for whatever reason, just ignore them
if block_size > 0:
pcm_block = PCMBlock()
pcm_block.offset = offset
pcm_swapped = self.byte_swap(vgm_in[index + 15 : index + 15 + block_size])
pcm_block.data = pcm_swapped

pcm_data = vgm_in[index + 15 : index + 15 + block_size]
if byteswap_pcm:
pcm_block.data = self.byte_swap(pcm_data)
else:
pcm_block.data = pcm_data

pcm_block.type = PCMType.A if is_adpcm_a else PCMType.B
processed_vgm.pcm_blocks.append(pcm_block)

Expand All @@ -413,33 +494,36 @@ def record_reg_write():
print("Unrecognized command byte: {:X}".format(cmd))
sys.exit(1)

# PCM blocks need sorting according to their position and type..
assume_unified_pcm = (total_size >= 0x400000)
processed_vgm.sort_pcm_blocks(assume_unified_pcm)
# ..then possibly merged into contiguous blocks..
processed_vgm.merge_contiguous_pcm_blocks()
# ..then offsets need adjusting from a zero-base..
processed_vgm.rebase_pcm_blocks()
# ..then the address high bytes need adjusting according to the newly sorted position
for bank_index in pcm_address_indexes:
bank_byte = vgm_out[bank_index]
remapped_bank_byte = processed_vgm.remap_pcm_bank_byte(bank_byte)
if remapped_bank_byte is None:
print("Couldn't find matching PCM bank byte: {:X}".format(bank_byte))
continue

vgm_out[bank_index] = remapped_bank_byte

# Reassign loop index after possible displacement
if loop_index_adjusted is not None:
loop_index_adjusted -= loop_offset_index
vgm_out[loop_offset_index : loop_offset_index + 4] = loop_index_adjusted.to_bytes(4, 'little')
print("VGM adjusted loop offset: {:X}".format(loop_index_adjusted))
loop_offset_adjusted = write_header_offset(loop_offset_index, loop_index_adjusted)
print("VGM adjusted loop offset: {:X}".format(loop_offset_adjusted))

# Now that PCM blocks are extracted, they need preprocessing too
# FIXME: multiple references and updates to same bytearray isn't great, refactor as part of cleanup
processed_vgm.data = vgm_out
processed_vgm.preprocess_pcm(pcm_address_indexes, total_size)

if rewrite_pcm:
total_pcm_size = processed_vgm.write_pcm_blocks(start_index, total_size)
# Bump loop index which may have been assigned already
displace_header_offset(loop_offset_index, total_pcm_size)

# Remove GD3 data as it wasn't copied here (trying to save space, but could add it as an option)
gd3_offset_index = 0x14
gd3_input_index = read_header_offset(gd3_offset_index)
write_header_offset(gd3_offset_index, len(vgm_out))
# Copied GD3 region assumes it spans (index..<eof)
vgm_out.extend(vgm_in[gd3_input_index : gd3_input_index + len(vgm_in)])

# Reassign EOF offset (this particular hardware player doesn't check it but others do)
eof_offset_index = 0x04
write_header_offset(eof_offset_index, len(vgm_out))

# In all cases, the output is a YM2610(B) VGM regardless of original input
self.write_chip_header(vgm_out, ChipType.YM2610B, self.assumed_clock)
self.write_chip_header(vgm_out, ChipType.SN76489, 0)
self.write_chip_header(vgm_out, ChipType.YM2612, 0)

processed_vgm.data = bytes(vgm_out)
return processed_vgm
return processed_vgm

0 comments on commit 84a4081

Please sign in to comment.