Skip to content

Commit

Permalink
HiFi Decode Updates (oyvindln#119)
Browse files Browse the repository at this point in the history
* Adds stdin input using - as input filename.
Fixes denominator limits that may cause bad samplerate conversion.
Applies LogCompander.expand() only to sidechain signal.

* Reconfigures NR sidechain  gain to higher value.
Adds audio_rate param (not working yet)

* Fixes --audio_rate parameter.
Some speed optimizations.

* Updates HiFiDecode header text comment

* Overrides file close method for stdin to do nothing.

---------

Signed-off-by: Sebastian Wilwerth <[email protected]>
Signed-off-by: VideoMem <[email protected]>
  • Loading branch information
VideoMem authored and JuniorIsAJitterbug committed Feb 22, 2024
1 parent 3dba49e commit 07d9620
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 61 deletions.
77 changes: 52 additions & 25 deletions vhsdecode/hifi/HiFiDecode.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# This currently decodes raw vhs HiFi RF, but it could do beta, CED, LD and others stereo FM variants
# Also, it implements an interpretation of the noise reduction like described on IEC60774-2/1999
# This currently decodes raw VHS and Video8 HiFi RF.
# It could also do Beta HiFi, CED, LD and other stereo AFM variants,
# It has an interpretation of the noise reduction method described on IEC60774-2/1999

import sys
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from fractions import Fraction
from math import log, pi
from typing import Tuple

import numpy as np
from numba import njit
from pyhht.utils import inst_freq
from scipy.signal import iirpeak, iirnotch
from scipy.signal.signaltools import hilbert
Expand All @@ -15,7 +19,9 @@
from vhsdecode.addons.FMdeemph import FMDeEmphasisC
from vhsdecode.addons.chromasep import samplerate_resample
from vhsdecode.addons.gnuradioZMQ import ZMQSend
from vhsdecode.utils import firdes_lowpass, firdes_highpass, FiltersClass, gen_wave_at_frequency, StackableMA
from vhsdecode.utils import firdes_lowpass, firdes_highpass, FiltersClass, StackableMA

DEFAULT_NR_GAIN_ = 66


@dataclass
Expand Down Expand Up @@ -46,11 +52,15 @@ def __init__(self, filters_params, sample_rate):
self.filter_hi = FiltersClass(iir_hi[0], iir_hi[1], self.samp_rate)

def work(self, data):
return self.filter_lo.filtfilt(self.filter_hi.filtfilt(data))
try:
return self.filter_lo.lfilt(self.filter_hi.filtfilt(data))
except ValueError as e:
print('ERROR: Cannot decode because a read size mismatch. Maybe EOF reached')
sys.exit(1)


class LpFilter:
def __init__(self, sample_rate, cut=22e3, transition=10e3):
def __init__(self, sample_rate, cut=20e3, transition=10e3):
self.samp_rate = sample_rate
self.cut = cut

Expand Down Expand Up @@ -146,7 +156,7 @@ def __init__(self, filters_params, sample_rate, channel=0):
)
iir_notch_image = iirnotch(
self.filter_params.RCarrierRef - d,
QR,
QL,
fs=self.samp_rate
)

Expand All @@ -166,10 +176,8 @@ class FMdemod:
def __init__(self, sample_rate, carrier_freerun, type=0):
self.samp_rate = sample_rate
self.type = type
self.wave = gen_wave_at_frequency(carrier_freerun, sample_rate, num_samples=sample_rate)
self.carrier = carrier_freerun
self.offset = 0
# self.offset = np.mean(self.work(self.wave))

def hhtdeFM(self, data):
instf, t = inst_freq(data)
Expand All @@ -180,13 +188,28 @@ def htdeFM(data, samp_rate):
return unwrap_hilbert(hilbert(data), samp_rate)

@staticmethod
def inst_freq(signal, sample_rate):
analytic_signal = hilbert(signal.real)
instantaneous_phase = np.unwrap(np.angle(analytic_signal))
@njit(cache=True, fastmath=True, nogil=True)
def unwrap(p: np.array, discont: float = pi):
dd = np.diff(p)
ddmod = np.mod(dd + pi, 2 * pi) - pi
to_pi_locations = np.where(np.logical_and(ddmod == -pi, dd > 0))
ddmod[to_pi_locations] = pi
ph_correct = ddmod - dd
to_zero_locations = np.where(np.abs(dd) < discont)
ph_correct[to_zero_locations] = 0
return p[1] + np.cumsum(ph_correct)

@staticmethod
def unwrap_hilbert(analytic_signal: np.array, sample_rate: int):
instantaneous_phase = FMdemod.unwrap(np.angle(analytic_signal))
instantaneous_frequency = (np.diff(instantaneous_phase) /
(2.0 * np.pi) * sample_rate)
(2.0 * pi) * sample_rate)
return instantaneous_frequency

@staticmethod
def inst_freq(signal: np.ndarray, sample_rate: int):
return FMdemod.unwrap_hilbert(hilbert(signal.real), sample_rate)

def work(self, data):

if self.type == 2:
Expand Down Expand Up @@ -238,17 +261,20 @@ def tau_as_freq(tau):

class NoiseReduction:

def __init__(self, notch_freq, side_gain, discard_size=0):
self.audio_rate = 192000
def __init__(self, notch_freq: float,
side_gain: float,
discard_size: int = 0,
audio_rate: int = 192000):
self.audio_rate = audio_rate
self.discard_size = discard_size
self.hfreq = notch_freq

# noise reduction envelope tracking constants (this ones might need tweaking)
self.NR_envelope_gain = side_gain

# values in seconds
NRenv_attack = 3e-3
NRenv_release = 70e-3
NRenv_attack = 4e-3
NRenv_release = 80e-3

self.NR_weighting_attack_Lo_cut = tau_as_freq(NRenv_attack)
self.NR_weighting_attack_Lo_transition = 1e3
Expand All @@ -266,7 +292,7 @@ def __init__(self, notch_freq, side_gain, discard_size=0):
self.tau = 56e-6

# final audio bandwidth limiter
self.finalLo_cut = 22e3
self.finalLo_cut = 20e3
self.finalLo_transition = 10e3

env_hi_trans = tau_as_freq(self.NR_weighting_T2) - tau_as_freq(self.NR_weighting_T1)
Expand Down Expand Up @@ -312,15 +338,16 @@ def audio_notch(samp_rate: int, freq: float, audio):
def audio_notch_stereo(samp_rate: int, freq: float, audioL, audioR):
return NoiseReduction.audio_notch(samp_rate, freq, audioL), NoiseReduction.audio_notch(samp_rate, freq, audioR)

def rs_envelope(self, data, channel=0):
def rs_envelope(self, raw_data, channel=0):
data = np.array([LogCompander.expand(x) for x in raw_data], dtype='float64')
hi_part = self.envelopeHighpassL.lfilt(data) if channel == 0 else self.envelopeHighpassR.lfilt(data)
lo_part = data - hi_part
env_part = self.envelopeVoicepassL.lfilt(hi_part + lo_part / 2) if channel == 0 else self.envelopeVoicepassR.lfilt(hi_part + lo_part / 2)
return np.abs(env_part)

def noise_reduction(self, audio, comb, channel=0):
# takes the RMS envelope of each audio channel
audio_env = self.rs_envelope(audio, channel)
audio_env = self.rs_envelope(comb, channel)

rsaC = self.envelope_attack_LowpassL.lfilt(audio_env) if channel == 0 else self.envelope_attack_LowpassR.lfilt(audio_env)
rsrC = self.envelope_release_LowpassL.lfilt(audio_env) if channel == 0 else self.envelope_release_LowpassR.lfilt(audio_env)
Expand Down Expand Up @@ -359,7 +386,7 @@ def stereo(self, audioL, audioR):

def lopassCompand(self, audio, channel=0):
audioX = self.finalLoL.work(audio) if channel == 0 else self.finalLoR.work(audio)
return [LogCompander.expand(x) for x in audioX]
return audioX


class HiFiDecode:
Expand Down Expand Up @@ -429,13 +456,13 @@ def __init__(self, options=None):

def getResamplingRatios(self):
samplerate2ifrate = self.if_rate / self.sample_rate
self.ifresample_numerator = Fraction(samplerate2ifrate).limit_denominator(1000).numerator
self.ifresample_denominator = Fraction(samplerate2ifrate).limit_denominator(1000).denominator
self.ifresample_numerator = Fraction(samplerate2ifrate).numerator
self.ifresample_denominator = Fraction(samplerate2ifrate).denominator
assert self.ifresample_numerator > 0, f'IF resampling numerator got 0; sample_rate {self.sample_rate}'
assert self.ifresample_denominator > 0, f'IF resampling denominator got 0; sample_rate {self.sample_rate}'
audiorate2ifrate = self.audio_rate / self.if_rate
self.audioRes_numerator = Fraction(audiorate2ifrate).limit_denominator(1000).numerator
self.audioRes_denominator = Fraction(audiorate2ifrate).limit_denominator(1000).denominator
self.audioRes_numerator = Fraction(audiorate2ifrate).numerator
self.audioRes_denominator = Fraction(audiorate2ifrate).denominator
return self.ifresample_numerator, self.ifresample_denominator, self.audioRes_numerator, self.audioRes_denominator

def updateDemod(self):
Expand Down Expand Up @@ -552,7 +579,7 @@ def guessBiases(self, blocks):
def carrierOffsets(self, standard, cL, cR):
return standard.LCarrierRef - cL, standard.RCarrierRef - cR

def block_decode(self, raw_data, block_count=0):
def block_decode(self, raw_data: np.array, block_count: int = 0) -> Tuple[int, np.array, np.array]:
lo_data = self.lopassRF.work(raw_data)
data = samplerate_resample(lo_data, self.ifresample_numerator, self.ifresample_denominator)
dcL, dcR, audioL, audioR, preL, preR = self.demodblock(data)
Expand Down
13 changes: 7 additions & 6 deletions vhsdecode/hifi/TimeProgressBar.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ def __init__(self, max, time, w=40, label='Progress'):
self.time = time

def print(self, v):
sys.stdout.write(self.label + " [")
c = round(v * self.w / self.max)
d = self.w - c
sys.stdout.write("#" * c)
sys.stdout.write(" " * d)
sys.stdout.write("] %.02f%%\n" % (v * 100.0 / self.max))
if self.max > 0:
sys.stdout.write(self.label + " [")
c = round(v * self.w / self.max)
d = self.w - c
sys.stdout.write("#" * c)
sys.stdout.write(" " * d)
sys.stdout.write("] %.02f%%\n" % (v * 100.0 / self.max))

Loading

0 comments on commit 07d9620

Please sign in to comment.