From 0282c6fcd1cde067c0eadfd1ec8e4f022bf01f96 Mon Sep 17 00:00:00 2001 From: Scott Fleming Date: Wed, 15 Jan 2025 23:12:25 -0500 Subject: [PATCH] lynting series.py --- astronify/series/series.py | 522 +++++++++++++++++++------------------ 1 file changed, 267 insertions(+), 255 deletions(-) diff --git a/astronify/series/series.py b/astronify/series/series.py index 01e8866..0bb07e6 100644 --- a/astronify/series/series.py +++ b/astronify/series/series.py @@ -49,7 +49,7 @@ def __init__(self, pitch_func=data_to_pitch, **pitch_args): "pitch_range": [100, 10000], "center_pitch": 440, "zero_point": "median", - "stretch": "linear" + "stretch": "linear", } self.pitch_map_func = pitch_func @@ -113,6 +113,7 @@ def pitch_map_args(self, new_args): self._pitch_map_args = new_args self._check_func_args() + class SoniSeries: def __init__(self, data, time_col="time", val_col="flux", preview_type="scan"): @@ -186,7 +187,7 @@ def time_col(self): @time_col.setter def time_col(self, value): - assert isinstance(value, str), 'Time column name must be a string.' + assert isinstance(value, str), "Time column name must be a string." self._time_col = value @property @@ -254,7 +255,9 @@ def sonify(self): data["asf_pitch"] = self.pitch_mapper(data[self.val_col]) data["asf_onsets"] = [ - x for x in (data[self.time_col] - data[self.time_col][0]) / exptime*self.note_spacing + x for x in (data[self.time_col] - data[self.time_col][0]) + / exptime + * self.note_spacing ] def play(self): @@ -282,13 +285,14 @@ def play(self): (0.01, 1), (duration - 0.1, 1), (duration - 0.05, 0.5), - (duration - 0.005, 0) + (duration - 0.005, 0), ], - mul=[self.gain for i in range(len(pitches))]).play( - delay=list(delays), dur=duration) + mul=[self.gain for i in range(len(pitches))], + ).play(delay=list(delays), dur=duration) self.streams = pyo.Sine(list(pitches), 0, env).out( - delay=list(delays), dur=duration) + delay=list(delays), dur=duration + ) def stop(self): """ @@ -326,266 +330,274 @@ def write(self, filepath): (0.1, 1), (duration - 0.1, 1), (duration - 0.05, 0.5), - (duration - 0.005, 0)], - mul=[self.gain for i in range(len(pitches))]).play( - delay=list(delays), dur=duration) - sine = pyo.Sine(list(pitches), 0, env).out(delay=list(delays), dur=duration) # noqa: F841 + (duration - 0.005, 0), + ], + mul=[self.gain for i in range(len(pitches))], + ).play(delay=list(delays), dur=duration) + sine = pyo.Sine(list(pitches), 0, env).out( + delay=list(delays), dur=duration + ) # noqa: F841 self.server.start() # Clean up self.server.shutdown() self.server.reinit(audio="portaudio") + class SeriesPreviews: + """ + Previews (or snapshots) of 1d spectra by binning the data into five equal pieces by assigning a sound to each piece. + """ + + def __init__(self, soniseries): + # Allows access to SoniSeries class methods and variables + self._soniseries = soniseries + # Define the frequencies to use for each section. + self.pitch_values = [500] * 5 + if self._soniseries.preview_type == "ensemble": + self.pitch_values = [300, 400, 500, 600, 700] + # TODO: Make robust + self.n_pitch_values = len(self.pitch_values) + # Amplitudes will be stored as a % between 0-1. + self.amplitudes = np.zeros(self.n_pitch_values) + # Tremolo values will be stored as a number, typically ranging from some small number + # (avoid 0.0, e.g., 0.1) through ~10. + self.tremolo_vals = np.zeros(self.n_pitch_values) + + def area_of_pieces(self, ydata_bins, xdata_bins): + """ + Given pieces of a series of 1D data, calculate the area-under-the-curve of each piece + such that the total area of all the pieces equals the total area of the entire curve. + """ + area_vals = [] + for idx, (ydata_bin, xdata_bin) in enumerate(zip(ydata_bins, xdata_bins)): + if idx < len(ydata_bins) - 1: + # Then you need to include the first (x,y) point from the NEXT bin as well + # when calculating the trapezoidal area so the pieces all add up to the total. + list(ydata_bin).append(ydata_bins[idx + 1][0]) + list(xdata_bin).append(xdata_bins[idx + 1][0]) + + # Taking the absolute value so that emission lines and absorption lines + # have the same amplitude + area_vals.append(np.abs(np.trapz(ydata_bin, xdata_bin))) + return area_vals + + def plot_preview(self, xdata_bin_ranges): + plt.plot( + self._soniseries.data[self._soniseries.time_col], + self._soniseries.data[self._soniseries.val_col], + color="k" + ) + + plt.axvspan( + xdata_bin_ranges[0][0], + xdata_bin_ranges[0][1], + color="royalblue", + alpha=0.5, + lw=0 + ) + + plt.axvspan( + xdata_bin_ranges[1][0], + xdata_bin_ranges[1][1], + color="green", + alpha=0.5, + lw=0 + ) + + plt.axvspan( + xdata_bin_ranges[2][0], + xdata_bin_ranges[2][1], + color="yellow", + alpha=0.5, + lw=0 + ) + + plt.axvspan( + xdata_bin_ranges[3][0], + xdata_bin_ranges[3][1], + color="orange", + alpha=0.5, + lw=0) + + plt.axvspan( + xdata_bin_ranges[4][0], + xdata_bin_ranges[4][1], + color="red", + alpha=0.5, + lw=0) + + plt.show() + + def sonify_preview(self, plotting=True, verbose=False): + """ + Make a "preview-style" sonification. The data is split into even pieces. Each piece + gets assigned a specific frequency. The amplitude is defined by the area under the curve + in this piece, normalized by the total area under the curve. The tremolo is defined + by the standard deviation of data in this piece, normalized by the maximum standard + deviation across all pieces. """ - Previews (or snapshots) of 1d spectra by binning the data into five equal pieces by assigning a sound to each piece. + # Get a copy of the 'y' and 'x' data. + ydata = np.asarray(self._soniseries.data[self._soniseries.val_col]) + xdata = np.asarray(self._soniseries.data[self._soniseries.time_col]) + + # Normalize the y-data by the maximum to constrain values from 0-1. + ydata_norm = ydata / max(ydata) + + # Split the data into `n_pitch_values` equal-sized pieces. + bin_size = int(np.round(len(xdata) // self.n_pitch_values, 1)) + # Split the y-values into pieces. + ydata_bins = [ydata_norm[i : i+bin_size] for i in range(0, len(ydata_norm), bin_size)] + # Split the x-values into pieces. + xdata_bins = [xdata[i : i + bin_size] for i in range(0, len(xdata), bin_size)] + + # Calculate the total area under the curve, used to normalize the areas in each piece. + total_area = np.trapz(ydata_norm, xdata) + + # Loop through each piece and calculate the standard deviation of the y-data + # and the area under the curve in each piece. + std_vals, xdata_bin_ranges = [], [] + for xdata_bin, ydata_bin in zip(xdata_bins, ydata_bins): + + xdata_bin_ranges.append((min(xdata_bin), max(xdata_bin))) + # Calculate standard deviation error and add to the list. + _, _, _, _, std_err = stats.linregress(xdata_bin, ydata_bin) + std_vals.append(std_err) + + # Plot the spectra and ranges if in troubleshooting mode + if plotting: + self.plot_preview(xdata_bin_ranges) + + # Calculate the area under the curve for each piece. + area_vals = self.area_of_pieces(ydata_bins, xdata_bins) + + # Normalize the standard deviations in each piece by this factor. + std_dev_norm = max(std_vals) + + # Set the amplitude of each pitch to the area under the curve normalized by the total + # area. + self.amplitudes = np.asarray(area_vals) / total_area + + if std_dev_norm == 0.0: std_dev_norm = 1.0 + + # Set the tremolo values based on the standard deviation of the piece normalized by the + # `std_dev_norm` factor. + + # TODO: Might be worth trying a different way of calculating the tremolo values other + # than the normalized standard dev. Maybe using RMS vals? + # To more accurately represent all forms of data. + + # The final calculated tremolo values are multiplied by a factor of 10 for auditory + # purposes + self.tremolo_vals = (np.asarray(std_vals) / std_dev_norm) * 10 + + # Constraint added to keep tremolo values at or below 15, otherwise oscillations are + # more difficult to hear + # self.tremolo_vals[self.tremolo_vals > 15] = 15 + + if verbose: + print("Total Expected area = {0:0f}".format(total_area)) + print(" ") + print("Area Values = ", np.asarray(area_vals)) + print(" ") + # print("Total Calculated area = {0:0f}".format(np.sum(str(area_vals).split(" ")))) + print(" ") + print("Amplitudes = ", self.amplitudes) + print(" ") + print("Standard Dev. Error Vals = ", np.asarray(std_vals)) + print(" ") + print("Standard Dev. Error MAX = ", std_dev_norm) + print(" ") + print("Tremolo Vals (x10) = ", self.tremolo_vals) + + def play_preview(self): + """ Play the sound of a "preview-style" sonification. + + The assigned pitch for each section of the spectra will begin + to play, with the calculated amplitude and frequency, one + at a time until all pitches are playing together for the full + audio preview of the spectra. """ - def __init__(self, soniseries): - # Allows access to SoniSeries class methods and variables - self._soniseries = soniseries - # Define the frequencies to use for each section. - self.pitch_values = [500] * 5 - if self._soniseries.preview_type == "ensemble": - self.pitch_values = [300, 400, 500, 600, 700] - # TODO: Make robust - self.n_pitch_values = len(self.pitch_values) - # Amplitudes will be stored as a % between 0-1. - self.amplitudes = np.zeros(self.n_pitch_values) - # Tremolo values will be stored as a number, typically ranging from some small number - # (avoid 0.0, e.g., 0.1) through ~10. - self.tremolo_vals = np.zeros(self.n_pitch_values) - - def area_of_pieces(self, ydata_bins, xdata_bins): - """ - Given pieces of a series of 1D data, calculate the area-under-the-curve of each piece - such that the total area of all the pieces equals the total area of the entire curve. - """ - area_vals = [] - for idx, (ydata_bin, xdata_bin) in enumerate(zip(ydata_bins, xdata_bins)): - if idx < len(ydata_bins) - 1: - # Then you need to include the first (x,y) point from the NEXT bin as well - # when calculating the trapezoidal area so the pieces all add up to the total. - list(ydata_bin).append(ydata_bins[idx + 1][0]) - list(xdata_bin).append(xdata_bins[idx + 1][0]) - - # Taking the absolute value so that emission lines and absorption lines - # have the same amplitude - area_vals.append(np.abs(np.trapz(ydata_bin, xdata_bin))) - return area_vals - - def plot_preview(self, xdata_bin_ranges): - plt.plot( - self._soniseries.data[self._soniseries.time_col], - self._soniseries.data[self._soniseries.val_col], - color="k" - ) - - plt.axvspan( - xdata_bin_ranges[0][0], - xdata_bin_ranges[0][1], - color="royalblue", - alpha=0.5, - lw=0 - ) - plt.axvspan( - xdata_bin_ranges[1][0], - xdata_bin_ranges[1][1], - color="green", - alpha=0.5, - lw=0 - ) - plt.axvspan( - xdata_bin_ranges[2][0], - xdata_bin_ranges[2][1], - color="yellow", - alpha=0.5, - lw=0 - ) - plt.axvspan( - xdata_bin_ranges[3][0], - xdata_bin_ranges[3][1], - color="orange", - alpha=0.5, - lw=0) - plt.axvspan( - xdata_bin_ranges[4][0], - xdata_bin_ranges[4][1], - color="red", - alpha=0.5, - lw=0) - - plt.show() - - def sonify_preview(self, plotting=True, verbose=False): - """ - Make a "preview-style" sonification. The data is split into even pieces. Each piece - gets assigned a specific frequency. The amplitude is defined by the area under the curve - in this piece, normalized by the total area under the curve. The tremolo is defined - by the standard deviation of data in this piece, normalized by the maximum standard - deviation across all pieces. - """ - # Get a copy of the 'y' and 'x' data. - ydata = np.asarray(self._soniseries.data[self._soniseries.val_col]) - xdata = np.asarray(self._soniseries.data[self._soniseries.time_col]) - - # Normalize the y-data by the maximum to constrain values from 0-1. - ydata_norm = ydata / max(ydata) - - # Split the data into `n_pitch_values` equal-sized pieces. - bin_size = int(np.round(len(xdata) // self.n_pitch_values, 1)) - # Split the y-values into pieces. - ydata_bins = [ydata_norm[i : i+bin_size] for i in range(0, len(ydata_norm), bin_size)] - # Split the x-values into pieces. - xdata_bins = [xdata[i : i + bin_size] for i in range(0, len(xdata), bin_size)] - - # Calculate the total area under the curve, used to normalize the areas in each piece. - total_area = np.trapz(ydata_norm, xdata) - - # Loop through each piece and calculate the standard deviation of the y-data - # and the area under the curve in each piece. - std_vals, xdata_bin_ranges = [], [] - for xdata_bin, ydata_bin in zip(xdata_bins, ydata_bins): - - xdata_bin_ranges.append((min(xdata_bin), max(xdata_bin))) - # Calculate standard deviation error and add to the list. - _, _, _, _, std_err = stats.linregress(xdata_bin, ydata_bin) - std_vals.append(std_err) - - # Plot the spectra and ranges if in troubleshooting mode - if plotting: - self.plot_preview(xdata_bin_ranges) - - # Calculate the area under the curve for each piece. - area_vals = self.area_of_pieces(ydata_bins, xdata_bins) - - # Normalize the standard deviations in each piece by this factor. - std_dev_norm = max(std_vals) - - # Set the amplitude of each pitch to the area under the curve normalized by the total - # area. - self.amplitudes = np.asarray(area_vals) / total_area - - if std_dev_norm == 0.0: std_dev_norm = 1.0 - - # Set the tremolo values based on the standard deviation of the piece normalized by the - # `std_dev_norm` factor. - - # TODO: Might be worth trying a different way of calculating the tremolo values other - # than the normalized standard dev. Maybe using RMS vals? - # To more accurately represent all forms of data. - - # The final calculated tremolo values are multiplied by a factor of 10 for auditory - # purposes - self.tremolo_vals = (np.asarray(std_vals) / std_dev_norm) * 10 - - # Constraint added to keep tremolo values at or below 15, otherwise oscillations are - # more difficult to hear - # self.tremolo_vals[self.tremolo_vals > 15] = 15 - - if verbose: - print("Total Expected area = {0:0f}".format(total_area)) - print(" ") - print("Area Values = ", np.asarray(area_vals)) - print(" ") - # print("Total Calculated area = {0:0f}".format(np.sum(str(area_vals).split(" ")))) - print(" ") - print("Amplitudes = ", self.amplitudes) - print(" ") - print("Standard Dev. Error Vals = ", np.asarray(std_vals)) - print(" ") - print("Standard Dev. Error MAX = ", std_dev_norm) - print(" ") - print("Tremolo Vals (x10) = ", self.tremolo_vals) - - def play_preview(self): - """ Play the sound of a "preview-style" sonification. - - The assigned pitch for each section of the spectra will begin - to play, with the calculated amplitude and frequency, one - at a time until all pitches are playing together for the full - audio preview of the spectra. - """ - - if self._soniseries.server.getIsBooted(): - self._soniseries.server.shutdown() - - self._soniseries.server.boot() - self._soniseries.server.start() - - # TODO: Generalize the self.delays list - # `step` must go into `stop` 5 times, since we have 5 pitches - self.delays = [0.0, 2.0, 4.0, 6.0, 8.0] - - # `total_duration` is in seconds - self.total_duration = 8.0 - - self.amplitudes = [amp / max(self.amplitudes) for amp in self.amplitudes] - - a = pyo.Phasor(self.pitch_values[0], mul=np.pi * 2) - b = pyo.Phasor(self.pitch_values[1], mul=np.pi * 2) - c = pyo.Phasor(self.pitch_values[2], mul=np.pi * 2) - d = pyo.Phasor(self.pitch_values[3], mul=np.pi * 2) - e = pyo.Phasor(self.pitch_values[4], mul=np.pi * 2) - - - # TODO: Make everything below iterable to it's cleaner and takes up less lines - lfo1 = ( - pyo.Sine(float(self.tremolo_vals[0]), 0, float(self.amplitudes[0]), 0) - if self.tremolo_vals[0] > 0 - else pyo.Cos(a, mul=float(self.amplitudes[0])) - ) - lfo2 = ( - pyo.Sine(float(self.tremolo_vals[1]), 0, float(self.amplitudes[1]), 0) - if self.tremolo_vals[1] > 0 - else pyo.Cos(b, mul=float(self.amplitudes[1])) - ) - lfo3 = ( - pyo.Sine(float(self.tremolo_vals[2]), 0, float(self.amplitudes[2]), 0) - if self.tremolo_vals[2] > 0 - else pyo.Cos(c, mul=float(self.amplitudes[2])) - ) - lfo4 = ( - pyo.Sine(float(self.tremolo_vals[3]), 0, float(self.amplitudes[3]), 0) - if self.tremolo_vals[3] > 0 - else pyo.Cos(d, mul=float(self.amplitudes[3])) - ) - lfo5 = ( - pyo.Sine(float(self.tremolo_vals[4]), 0, float(self.amplitudes[4]), 0) - if self.tremolo_vals[4] > 0 - else pyo.Cos(e, mul=float(self.amplitudes[4])) - ) - - self.stream1 = pyo.Sine( + if self._soniseries.server.getIsBooted(): + self._soniseries.server.shutdown() + + self._soniseries.server.boot() + self._soniseries.server.start() + + # TODO: Generalize the self.delays list + # `step` must go into `stop` 5 times, since we have 5 pitches + self.delays = [0.0, 2.0, 4.0, 6.0, 8.0] + + # `total_duration` is in seconds + self.total_duration = 8.0 + + self.amplitudes = [amp / max(self.amplitudes) for amp in self.amplitudes] + + a = pyo.Phasor(self.pitch_values[0], mul=np.pi * 2) + b = pyo.Phasor(self.pitch_values[1], mul=np.pi * 2) + c = pyo.Phasor(self.pitch_values[2], mul=np.pi * 2) + d = pyo.Phasor(self.pitch_values[3], mul=np.pi * 2) + e = pyo.Phasor(self.pitch_values[4], mul=np.pi * 2) + + + # TODO: Make everything below iterable to it's cleaner and takes up less lines + lfo1 = ( + pyo.Sine(float(self.tremolo_vals[0]), 0, float(self.amplitudes[0]), 0) + if self.tremolo_vals[0] > 0 + else pyo.Cos(a, mul=float(self.amplitudes[0])) + ) + lfo2 = ( + pyo.Sine(float(self.tremolo_vals[1]), 0, float(self.amplitudes[1]), 0) + if self.tremolo_vals[1] > 0 + else pyo.Cos(b, mul=float(self.amplitudes[1])) + ) + lfo3 = ( + pyo.Sine(float(self.tremolo_vals[2]), 0, float(self.amplitudes[2]), 0) + if self.tremolo_vals[2] > 0 + else pyo.Cos(c, mul=float(self.amplitudes[2])) + ) + lfo4 = ( + pyo.Sine(float(self.tremolo_vals[3]), 0, float(self.amplitudes[3]), 0) + if self.tremolo_vals[3] > 0 + else pyo.Cos(d, mul=float(self.amplitudes[3])) + ) + lfo5 = ( + pyo.Sine(float(self.tremolo_vals[4]), 0, float(self.amplitudes[4]), 0) + if self.tremolo_vals[4] > 0 + else pyo.Cos(e, mul=float(self.amplitudes[4])) + ) + + self.stream1 = pyo.Sine( + freq=[self.pitch_values[0], self.pitch_values[0]], mul=lfo1 + ).out(delay=self.delays[0], dur=2.0) + self.stream2 = pyo.Sine( + freq=[self.pitch_values[1], self.pitch_values[1]], mul=lfo2 + ).out(delay=self.delays[1], dur=2.0) + self.stream3 = pyo.Sine( + freq=[self.pitch_values[2], self.pitch_values[2]], mul=lfo3 + ).out(delay=self.delays[2], dur=2.0) + self.stream4 = pyo.Sine( + freq=[self.pitch_values[3], self.pitch_values[3]], mul=lfo4 + ).out(delay=self.delays[3], dur=2.0) + self.stream5 = pyo.Sine( + freq=[self.pitch_values[4], self.pitch_values[4]], mul=lfo5 + ).out(delay=self.delays[4], dur=2.0) + + # All together, if in ensemble mode. + if self._soniseries.preview_type == "ensemble": + self.stream6 = pyo.Sine( freq=[self.pitch_values[0], self.pitch_values[0]], mul=lfo1 - ).out(delay=self.delays[0], dur=2.0) - self.stream2 = pyo.Sine( + ).out(delay=10, dur=4) + self.stream7 = pyo.Sine( freq=[self.pitch_values[1], self.pitch_values[1]], mul=lfo2 - ).out(delay=self.delays[1], dur=2.0) - self.stream3 = pyo.Sine( + ).out(delay=10, dur=4) + self.stream8 = pyo.Sine( freq=[self.pitch_values[2], self.pitch_values[2]], mul=lfo3 - ).out(delay=self.delays[2], dur=2.0) - self.stream4 = pyo.Sine( + ).out(delay=10, dur=4) + self.stream9 = pyo.Sine( freq=[self.pitch_values[3], self.pitch_values[3]], mul=lfo4 - ).out(delay=self.delays[3], dur=2.0) - self.stream5 = pyo.Sine( + ).out(delay=10, dur=4) + self.stream10 = pyo.Sine( freq=[self.pitch_values[4], self.pitch_values[4]], mul=lfo5 - ).out(delay=self.delays[4], dur=2.0) - - # All together, if in ensemble mode. - if self._soniseries.preview_type == "ensemble": - self.stream6 = pyo.Sine( - freq=[self.pitch_values[0], self.pitch_values[0]], mul=lfo1 - ).out(delay=10, dur=4) - self.stream7 = pyo.Sine( - freq=[self.pitch_values[1], self.pitch_values[1]], mul=lfo2 - ).out(delay=10, dur=4) - self.stream8 = pyo.Sine( - freq=[self.pitch_values[2], self.pitch_values[2]], mul=lfo3 - ).out(delay=10, dur=4) - self.stream9 = pyo.Sine( - freq=[self.pitch_values[3], self.pitch_values[3]], mul=lfo4 - ).out(delay=10, dur=4) - self.stream10 = pyo.Sine( - freq=[self.pitch_values[4], self.pitch_values[4]], mul=lfo5 - ).out(delay=10, dur=4) + ).out(delay=10, dur=4)