diff --git a/agentMET4FOF/signal_processing.py b/agentMET4FOF/signal_processing.py new file mode 100644 index 00000000..0c6d557e --- /dev/null +++ b/agentMET4FOF/signal_processing.py @@ -0,0 +1,49 @@ +import matplotlib +import matplotlib.pyplot as plt +import numpy as np +from scipy import fftpack + +from agentMET4FOF.agents import AgentMET4FOF + +matplotlib.use("Agg") # for sending plots as images + +class FFT_Agent(AgentMET4FOF): + """ + Buffers the received signal and apply fft on the collected signal upon reaching the `buffer_size` limit. + The buffer then empties and the fft spectrum data and its plot are sent out on `default` and `plot` channel respectively. + + Parameters + ---------- + s_freq : int + Actual sampling frequency for plotting the x-axis + """ + + def init_parameters(self, s_freq=1): + self.s_freq = s_freq + + def on_received_message(self, message): + single_measurement = message["data"] + agent_from = message["from"] + self.buffer.store(data=single_measurement, agent_from=agent_from) + + if self.buffer.buffer_filled(agent_from): + # extract collected signal from buffer + buffered_sig = self.buffer[agent_from] + + # apply fft and convert to amplitudes + sig_fft = fftpack.fft(buffered_sig) + power_amp = np.abs(sig_fft) ** 2 + + # get the corresponding frequencies + sample_freq = fftpack.fftfreq(len(buffered_sig), d=1/self.s_freq) + self.send_output(power_amp) + self.send_plot(self.plot_fft(power_amp, sample_freq)) + self.buffer_clear(agent_from) + + def plot_fft(self, amplitudes, sample_freq): + N = len(amplitudes) + fig = plt.figure() + plt.plot(sample_freq[1:N//2], amplitudes[1:N//2]) + plt.xlabel("Frequency (Hz)") + plt.ylabel("Power Amplitude") + return fig diff --git a/agentMET4FOF_tutorials/signal_processing/fft_example.py b/agentMET4FOF_tutorials/signal_processing/fft_example.py new file mode 100644 index 00000000..62815233 --- /dev/null +++ b/agentMET4FOF_tutorials/signal_processing/fft_example.py @@ -0,0 +1,28 @@ +from agentMET4FOF.agents import AgentNetwork, SineGeneratorAgent, MonitorAgent +from agentMET4FOF.signal_processing import FFT_Agent + + +def main(): + # start agent network server + agentNetwork = AgentNetwork(backend="mesa") + # init agents + gen_agent = agentNetwork.add_agent(agentType=SineGeneratorAgent) + fft_agent = agentNetwork.add_agent(agentType=FFT_Agent, buffer_size=50, s_freq=100) + monitor_agent = agentNetwork.add_agent(agentType=MonitorAgent) + + # connect agents : We can connect multiple agents to any particular agent + agentNetwork.bind_agents(gen_agent, fft_agent) + + # connect + agentNetwork.bind_agents(gen_agent, monitor_agent) + agentNetwork.bind_agents(fft_agent, monitor_agent, channel=["plot"]) + + # set all agents states to "Running" + agentNetwork.set_running_state() + + # allow for shutting down the network after execution + return agentNetwork + + +if __name__ == "__main__": + main() \ No newline at end of file