Source code for musisep.dictsep.dictlearn

#!python3

"""
Module for the training of the dictionary.  When invoked, a performance test
on artificial data is performed.
"""

from __future__ import absolute_import, division, print_function

import numpy as np
import scipy
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pickle

from . import exptool
from . import pursuit
from . import adam_b
from ..audio import wav
from ..audio import performance

beta = 0.9

[docs]def make_closures(fsigma): """ Build the functions that give the bounds and initial values. Parameters ---------- fsigma : float Standard deviation of the Gaussian in the time domain. Returns ------- make_bounds : lambda (length) Lambda that gives the bounds for `length` peaks make_inits : lambda (length) Lambda that gives the initial values `length` peaks """ make_bounds = (lambda length : [(0, None)] * length + [(None, None)] * length + [(fsigma*0.8, fsigma*1.5)] * length + [(0, 2e-3)] * length) make_inits = (lambda length : (np.repeat(fsigma, length), np.zeros(length))) return make_bounds, make_inits
[docs]def stoch_grad(y, inst_dict, tone_num, adam, fsigma, harscale, baseshift, inst_spect, pexp, qexp): """ Perform a dictionary training step. Parameters ---------- y : array_like Log-frequency spectrum to represent inst_dict : ndarray Dictionary containing the relative amplitudes of the harmonics tone_num : int Maximum number of simultaneous tones for each instrument adam : Adam_B Container object for the ADAM optimizer fsigma : float Standard deviation (frequency) harscale : float Scaling factor baseshift : int Length to add to the spectrum in order to avoid circular convolution inst_spect : array_like Spectra of the instruments, in the columns pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum Returns ------- inst_dict : ndarray Updated dictionary reconstruction : ndarray Synthesized spectrum inst_amps : ndarray Summed amplitudes for each instruments """ y = np.asarray(y) fixed_params = (inst_dict, harscale) make_bounds, make_inits = make_closures(fsigma) peaks, reconstruction = pursuit.peak_pursuit( y, tone_num, 1, tone_num*inst_dict.shape[1]*2, inst_dict.shape[1], pursuit.inst_shift, pursuit.inst_shift_obj, pursuit.inst_shift_grad, make_bounds, make_inits, fixed_params, pursuit.fft_selector, (baseshift, inst_spect, qexp), pexp, qexp, beta) inst_amps = np.zeros(inst_dict.shape[1]) for i in range(inst_dict.shape[1]): idcs, = np.where(peaks.insts == i) inst_amps[i] = np.sum(peaks.amps[idcs]) grad = pursuit.inst_shift_dict_grad(peaks.get_array(), peaks.insts, fixed_params, pexp, qexp, y.size, y) abserr_old = np.linalg.norm(reconstruction**qexp - y**qexp) print("abserr (before): %g" % abserr_old) inst_dict = adam.step(-grad) fixed_params = (inst_dict, harscale) reconstruction_new = pursuit.inst_shift(peaks, fixed_params, pexp, y.size) abserr_new = np.linalg.norm(reconstruction_new**qexp - y**qexp) print("abserr (after): %g" % abserr_new) if abserr_new >= abserr_old: print("residual increased") return inst_dict, reconstruction, inst_amps
[docs]def gen_random_inst(har): """ Generate random harmonic amplitudes according to a Par(1,2) distribution. Parameters ---------- har : int Number of harmonics Returns ------- inst : ndarray Harmonic amplitudes for one instrument, unified to an interval of [0,1] """ e = - np.random.pareto(0.5) coeffs = np.random.rand(har) inst = np.arange(1, har+1) ** e * coeffs return inst / np.amax(inst)
[docs]def gen_random_inst_dict(har, inst_num): """ Generate a random instrument dictionary according to a Par(1,2) distribution. Parameters ---------- har : int Number of harmonics inst_num : int Number of instruments Returns ------- inst_dict : ndarray Dictionary with instruments in columns, unified to an interval of [0,1] """ inst_dict = np.zeros((har, inst_num)) for j in range(inst_num): inst_dict[:, j] = gen_random_inst(har) return inst_dict
[docs]class Learner: """ Container object for the dictionary learning process. Parameters ---------- fsigma : float Standard deviation (frequency) tone_num : int Maximum number of simultaneous tones for each instrument inst_num : int Number of instruments in the dictionary har : int Number of harmonics m : int Height of the log-frequency spectrogram minfreq : float Minimum frequency to be represented (included) maxfreq : float Maximum frequency to be represented (excluded) lifetime : int Number of steps after which to renew the dictionary pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum init : array_like Initial value for the dictionary """ def __init__(self, fsigma, tone_num, inst_num, har, m, minfreq, maxfreq, lifetime, pexp, qexp, init=None): self.fsigma = fsigma self.tone_num = tone_num self.inst_num = inst_num self.har = har self.m = m self.lifetime = lifetime self.baseshift = m self.harscale = pursuit.calc_harscale(minfreq, maxfreq, m) if init is None: self.inst_dict = gen_random_inst_dict(har, inst_num) else: self.inst_dict = np.asarray(init) self.adam = adam_b.Adam_B(self.inst_dict, alpha=1e-3) self.inst_amps = np.zeros(inst_num) self.inst_cnt = np.zeros(inst_num) self.pexp = pexp self.qexp = qexp self.cnt = 0
[docs] def learn(self, y): """ Learning step. Automatically renews the dictionary. Parameters ---------- y : array_like Log-frequency spectrum Returns ------- reconstruction : ndarray Synthesized spectrum """ y = np.asarray(y) fixed_params = (self.inst_dict, self.harscale) inst_spect = pursuit.gen_inst_spect( self.baseshift, self.fsigma, fixed_params, self.pexp, self.qexp, self.baseshift + self.m, self.inst_dict.shape[1]) self.inst_dict, reconstruction, inst_amps_new = \ stoch_grad(y, self.inst_dict, self.tone_num, self.adam, self.fsigma, self.harscale, self.baseshift, inst_spect, self.pexp, self.qexp) self.inst_amps += inst_amps_new self.inst_cnt += 1 self.cnt += 1 if self.cnt % self.lifetime == 0: self.renew_dict(self.lifetime//2, self.inst_num//2) return reconstruction
[docs] def renew_dict(self, headstart, newinsts): """ Renew the dictionary. Parameters ---------- headstart : int Headstart in the lifetime counter (to help new instruments) newinsts : int Number of instruments to be renewed """ idcs = np.argsort(self.inst_amps / (self.inst_cnt - headstart))[:newinsts] for idx in idcs: self.inst_amps[idx] = 0 self.inst_cnt[idx] = 0 self.inst_dict[:, idx] = gen_random_inst(self.har) self.adam.reset(idx)
[docs] def get_dict(self): """ Get the active part of the dictionary. Returns ------- inst_dict : ndarray Dictionary with `inst_num` columns """ idcs, = np.where(self.inst_cnt >= self.lifetime) return self.inst_dict[:, idcs]
[docs]def synth_spect(spect, tone_num, inst_dict, fsigma, spectheight, pexp, qexp, minfreq, maxfreq, stretch=1): """ Separate and synthesize the spectrograms from the original spectrogram. Parameters ---------- spect : array_like Original log-frequency spectrogram of the recording tone_num : int Maximum number of simultaneous tones for each instrument inst_dict : ndarray Dictionary containing the relative amplitudes of the harmonics fsigma : float Standard deviation (frequency) spectheight : int Height of the linear-frequency spectrograms pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum minfreq : float Minimum frequency to be represented (included) (normalized to the sampling frequency) maxfreq : float Maximum frequency to be represented (excluded) (normalized to the sampling frequency) Returns ------- dict_spectrum : ndarray Synthesized log-frequency spectrogram with all instruments inst_spectrums : list of ndarray List of synthesized log-frequency spectrograms for the instruments dict_spectrum_lin : ndarray Synthesized linear-frequency spectrogram with all instruments inst_spectrums_lin : list of ndarray List of synthesized linear-frequency spectrograms for the instruments """ spect = np.asarray(spect) inst_dict = np.asarray(inst_dict) tone_num = np.asarray(tone_num) minfreq = minfreq * (2 * spectheight) maxfreq = maxfreq * (2 * spectheight) m = spect.shape[0] har = inst_dict.shape[0] baseshift = m harscale = pursuit.calc_harscale(minfreq, maxfreq, m) fixed_params = (inst_dict, harscale) inst_spect = pursuit.gen_inst_spect( baseshift, fsigma, fixed_params, pexp, qexp, baseshift + m, inst_dict.shape[1]) dict_spectrum = np.zeros(spect.shape) dict_spectrum_lin = np.zeros((spectheight, spect.shape[1])) inst_dict_size = inst_dict.shape[1] inst_spectrums = [] inst_spectrums_lin = [] for i in range(inst_dict_size): inst_spectrums.append(np.zeros(spect.shape)) inst_spectrums_lin.append(np.zeros((spectheight, spect.shape[1]))) numfreqs = spect.shape[0] make_bounds, make_inits = make_closures(fsigma) for j in range(spect.shape[1]): print("reconstruction {} out of {}".format(j, spect.shape[1])) y = spect[:, j] peaks, reconstruction = \ pursuit.peak_pursuit(y, tone_num, 1, tone_num*inst_dict.shape[1]*2, inst_dict.shape[1], pursuit.inst_shift, pursuit.inst_shift_obj, pursuit.inst_shift_grad, make_bounds, make_inits, fixed_params, pursuit.fft_selector, (baseshift, inst_spect, qexp), pexp, qexp, beta) dict_spectrum[:, j] = reconstruction lin_peaks = peaks.copy() print(peaks) lin_peaks.shifts = (np.exp(peaks.shifts * (np.log(maxfreq / minfreq) / numfreqs)) * minfreq) lin_peaks.params[0, :] = peaks.params[0, :] / stretch reconstruction = pursuit.inst_scale(lin_peaks, inst_dict, pexp, spectheight) dict_spectrum_lin[:, j] = reconstruction for i in range(inst_dict_size): idcs, = np.where(peaks.insts == i) reconstruction = pursuit.inst_shift(peaks[idcs], fixed_params, pexp, m) inst_spectrums[i][:, j] = reconstruction reconstruction = pursuit.inst_scale(lin_peaks[idcs], inst_dict, pexp, spectheight) inst_spectrums_lin[i][:, j] = reconstruction return dict_spectrum, inst_spectrums, dict_spectrum_lin, inst_spectrums_lin
[docs]def mask_spectrums(spects, orig_spect): """ Mask the synthesized spectrograms with the original spectrogram. Parameters ---------- spects : list of array_like List of synthesized spectrograms orig_spect : array_like Original spectrogram Returns ------- spectrums : list of ndarray Masked spectrograms mask_spect : ndarray Array mask """ spects = [np.asarray(s) for s in spects] total_spect = sum([s for s in spects]) mask_spect = orig_spect / (total_spect + 1e-40) for i in range(len(spects)): spects[i] = spects[i] * mask_spect return spects, mask_spect
[docs]def learn_spect_dict(spect, fsigma, tone_num, inst_num, pexp, qexp, har, minfreq, maxfreq, runs, lifetime): """ Train the dictionary containing the relative amplitudes of the harmonics. Parameters ---------- spect : array_like Original log-frequency spectrogram of the recording fsigma : float Standard deviation (frequency) tone_num : int Maximum number of simultaneous tones for each instrument inst_num : int Number of instruments in the dictionary pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum har : int Number of harmonics minfreq : float Minimum frequency in Hz to be represented (included) maxfreq : float Maximum frequency in Hz to be represented (excluded) runs : int Number of training iterations to perform lifetime : int Number of steps after which to renew the dictionary Returns ------- inst_dict : ndarray Dictionary containing the relative amplitudes of the harmonics """ spect = np.asarray(spect) m = spect.shape[0] harscale = pursuit.calc_harscale(minfreq, maxfreq, m) dl = Learner(fsigma, tone_num, inst_num, har, m, minfreq, maxfreq, lifetime, pexp, qexp) errnormsq = [] signormsq = [] for k in range(runs): print("training iteration {}".format(k)) idx = np.random.randint(spect.shape[1]) y = spect[:, idx] reconstruction = dl.learn(y) errnormsq.append(np.linalg.norm(reconstruction)) signormsq.append(np.linalg.norm(y)) if dl.cnt % lifetime == 0: inst_dict = dl.get_dict() print(inst_dict) return dl.get_dict()
[docs]def test_learn(fsigma, tone_num, inst_num, pexp, qexp, har, m, runs, test_samples, lifetime, inst_dict): """ Evaluate the performance of the dictionary learning algorithm via artificial spectra. Parameters ---------- fsigma : float Width of the Gaussians in the log-frequency spectrogram tone_num : int Maximum number of simultaneous tones for each instrument inst_num : int Number of instruments in the dictionaries pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum har : int Number of harmonics m : int Height of the log-frequency spectrogram runs : int Number of training iterations to perform test_samples : int Number of test spectra to generate lifetime : int Number of steps after which to renew the dictionary inst_dict : array_like Dictionary containing the relative amplitudes of the harmonics Returns ------- measures : ndarray Array containing, in that order, the SDR, SIR, SAR with the original dictionary and the SDR, SID, SAR with the trained dictionary """ inst_dict = np.asarray(inst_dict) harscale = pursuit.calc_harscale(20, 20480, m) fixed_params = (inst_dict, harscale) insts = np.repeat(np.arange(inst_num), tone_num) print(insts) print(inst_dict) dl = Learner(fsigma, tone_num, inst_num * 2, har, m, 20, 20480, lifetime, pexp, qexp) for k in range(runs): print("iteration {}".format(k)) params = np.vstack((np.ones(tone_num * inst_num) * fsigma, np.zeros(tone_num * inst_num))) peaks = pursuit.Peaks(np.random.rand(tone_num * inst_num), np.random.rand(tone_num * inst_num) * 500, params, insts) y = np.asarray(pursuit.inst_shift(peaks, fixed_params, pexp, m)) dl.learn(y) test_spect = np.zeros((m, test_samples)) test_inst_spects = [] for i in range(inst_num): test_inst_spects.append(np.zeros((m, test_samples))) for k in range(test_samples): params = np.vstack((np.ones(tone_num * inst_num) * fsigma, np.zeros(tone_num * inst_num))) peaks = pursuit.Peaks(np.random.rand(tone_num * inst_num), np.random.rand(tone_num * inst_num) * 500, params, insts) test_spect[:, k] = pursuit.inst_shift(peaks, fixed_params, pexp, m) for i in range(inst_num): idcs, = np.where(peaks.insts == i) test_inst_spects[i][:, k] = pursuit.inst_shift( peaks[idcs], fixed_params, pexp, m) inst_dict_learn = dl.get_dict() print(inst_dict_learn.shape) _, inst_spectrums_orig, _, _ = \ synth_spect(test_spect, tone_num, inst_dict, fsigma, m, pexp, qexp, 20, 20480) _, inst_spectrums_trained, _, _ = \ synth_spect(test_spect, tone_num, inst_dict_learn, fsigma, m, pexp, qexp, 20, 20480) test_spects = np.vstack([np.ravel(spect) for spect in test_inst_spects]) orig_spects = np.vstack([np.ravel(spect) for spect in inst_spectrums_orig]) trained_spects = np.vstack([np.ravel(spect) for spect in inst_spectrums_trained]) return np.hstack((performance.select_perm( *performance.measures(orig_spects, test_spects))[1].T, performance.select_perm( *performance.measures(trained_spects, test_spects))[1].T))
[docs]def test_learn_multi(fsigma, tone_num, inst_num, pexp, qexp, har, m, runs, test_samples, lifetime, num_dicts): """ Evaluate the performance of the dictionary learning algorithm via artificial spectra. Parameters ---------- fsigma : float Width of the Gaussians in the log-frequency spectrogram tone_num : int Maximum number of simultaneous tones for each instrument inst_num : int Number of instruments in the dictionaries pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum har : int Number of harmonics m : int Height of the log-frequency spectrogram runs : int Number of training iterations to perform test_samples : int Number of test spectra to generate lifetime : int Number of steps after which to renew the dictionary num_dicts : int Number of different dictionaries to generate and train Returns ------- measures : ndarray Array containing, in the rows, the SDR, SIR, SAR with the original dictionary and the SDR, SID, SAR with the trained dictionary """ measures = [] for i in range(num_dicts): np.random.seed(i) inst_dict = gen_random_inst_dict(har, inst_num) measures.append(test_learn(fsigma, tone_num, inst_num, pexp, qexp, har, m, runs, test_samples, lifetime, inst_dict)) measures = np.vstack(measures) print(measures) print("Averages:") print(np.mean(measures, axis=0)) print("Standard deviations:") print(np.std(measures, axis=0, ddof=1)) return measures
if __name__ == '__main__': pexp = 1 qexp = 1/2 np.savetxt('artificial.txt', test_learn_multi(6/np.pi, 1, 2, pexp, qexp, 25, 1024, 10000, 10000, 500, 10))