From f5a4e1001e0a5c06d39136e33465690d9417cb7c Mon Sep 17 00:00:00 2001 From: Matti Airas Date: Thu, 6 Jun 2024 10:57:31 +0300 Subject: [PATCH] Implement join and zip "transforms" --- examples/join_and_zip.cpp | 152 +++++++++++++++++++++++ src/sensesp/transforms/join.h | 199 ++++++++++++++++++++++++++++++ src/sensesp/transforms/zip.h | 221 ++++++++++++++++++++++++++++++++++ 3 files changed, 572 insertions(+) create mode 100644 examples/join_and_zip.cpp create mode 100644 src/sensesp/transforms/join.h create mode 100644 src/sensesp/transforms/zip.h diff --git a/examples/join_and_zip.cpp b/examples/join_and_zip.cpp new file mode 100644 index 000000000..db9a22db5 --- /dev/null +++ b/examples/join_and_zip.cpp @@ -0,0 +1,152 @@ +/** + * @file join_and_zip.cpp + * @brief Example of Join and Zip transforms. + * + * Join and Zip transforms combine multiple input values into a single output. + * + * Try running this code with the serial monitor open ("Upload and Monitor" + * in PlatformIO menu). The program will produce capital letters every second, + * lowercase letters every 3 seconds, and integers every 10 seconds. Comment + * out and enable Join and zip transforms to see how they affect the + * output. + * + */ + +#include "sensesp.h" + +#include + +#include "sensesp/sensors/sensor.h" +#include "sensesp/transforms/join.h" +#include "sensesp/transforms/lambda_transform.h" +#include "sensesp/transforms/zip.h" +#include "sensesp_minimal_app_builder.h" + +using namespace sensesp; + +ReactESP app; + +SensESPMinimalApp* sensesp_app; + +void setup() { + SetupLogging(); + + // Note: SensESPMinimalAppBuilder is used to build the app. This creates + // a minimal app with no networking or other bells and whistles which + // would be distracting in this example. In normal use, this is not what + // you would use. Unless, of course, you know that is what you want. + SensESPMinimalAppBuilder builder; + + sensesp_app = builder.get_app(); + + // Produce capital letters every second + auto sensor_A = new RepeatSensor(1000, []() { + static char value = 'Z'; + if (value == 'Z') { + value = 'A'; + } else { + value += 1; + } + return value; + }); + + sensor_A->connect_to(new LambdaConsumer( + [](char value) { ESP_LOGD("App", " %c", value); })); + + // Produce lowercase letters every 3 seconds + auto sensor_a = new RepeatSensor(3000, []() { + static char value = 'z'; + if (value == 'z') { + value = 'a'; + } else { + value += 1; + } + return value; + }); + + sensor_a->connect_to(new LambdaConsumer( + [](char value) { ESP_LOGD("App", " %c", value); })); + + // Produce integers every 10 seconds + auto sensor_int = new RepeatSensor(10000, []() { + static int value = 0; + value += 1; + return value; + }); + + sensor_int->connect_to(new LambdaConsumer( + [](int value) { ESP_LOGD("App", " %d", value); })); + + // Join the three producer outputs into one tuple. A tuple is a data + // structure that can hold multiple values of different types. The resulting + // tuple can be consumed by consumers to process the values together. + + auto* merged = new Join3(5000); + + // The Join transform will emit a tuple whenever any of the producers emit a + // new value, as long as all values are less than max_age milliseconds old. + + // Once an integer is produced, the Join transform produces tuples for all + // new letter input until the last integer value is over 5000 milliseconds + // old. + + // Next, try commenting out the Join transform and enabling the Zip transform + // below to see how it affects the output. + + // auto* merged = new Zip3(5000); + + // The Zip transform will emit a tuple only when all producers have emitted a + // new value within max_age milliseconds. This has the effect of synchronizing + // the producers' outputs, at the cost of potentially waiting for all + // producers to emit a new value. + + // Below, the sensors are connected to the consumers of the Join/Zip + // transform. The syntax here is a bit more complex and warrants some + // explanation. + + // `merged` is our variable holding a pointer to the Join or Zip transform. + // The `consumers` member of the transform is a tuple of LambdaConsumers + // that consume and process the values of the producers. Subscripts [] can + // only be used to access elements of a same type, but our LambdaConsumers + // are of potentially different types - hence the tuple. The `std::get<>()` + // function is used to access the elements of the tuple. The first argument + // is the index of the element in the tuple, starting from 0. + + // `connect_to()` expects a pointer to a `ValueConsumer`, but `std::get` + // returns a reference to the tuple element. The `&` operator is used to + // get the address of the tuple element, which is then passed to + // `connect_to()`. + + // TL;DR: We connect each sensor to the corresponding consumer Join or + // Zip transform. + + sensor_A->connect_to(&(std::get<0>(merged->consumers))); + sensor_a->connect_to(&(std::get<1>(merged->consumers))); + sensor_int->connect_to(&(std::get<2>(merged->consumers))); + + // Here, we have a LambdaTransform that takes the tuple of values produced + // by the Join/Zip transform and converts it into a string. Note the template + // arguments: the transform input is a tuple of char, char, and int, and the + // output is a String. The same input type needs to be defined in our lambda + // function, starting with []. + + auto merged_string = new LambdaTransform, String>( + [](std::tuple values) { + return String(std::get<0>(values)) + " " + String(std::get<1>(values)) + + " " + String(std::get<2>(values)); + }); + + // Remember to connect the Join/Zip transform to the LambdaTransform: + + merged->connect_to(merged_string); + + // Finally, we connect the LambdaTransform to a consumer that will print the + // merged values to the console. + + merged_string->connect_to(new LambdaConsumer( + [](String value) { + ESP_LOGD("App", "Merged: %s", value.c_str()); + })); +} + +void loop() { app.tick(); } diff --git a/src/sensesp/transforms/join.h b/src/sensesp/transforms/join.h new file mode 100644 index 000000000..b2f182f87 --- /dev/null +++ b/src/sensesp/transforms/join.h @@ -0,0 +1,199 @@ +#ifndef SENSESP_SRC_SENSESP_TRANSFORMS_JOIN_H_ +#define SENSESP_SRC_SENSESP_TRANSFORMS_JOIN_H_ + +#include +#include + +#include "sensesp/system/lambda_consumer.h" +#include "sensesp/system/valueproducer.h" +#include "transform.h" + +namespace sensesp { + +//////////////////////////// +// Base class for Join. This is needed until Arduino ESP32 Core supports C++14 +// and template pack indices: https://stackoverflow.com/a/55807564/2999754 + +template +class JoinBase { + public: + JoinBase(long max_age = 0) : max_age_{max_age} { + for (int i = 0; i < N; i++) { + age_[i] = max_age; + } + } + + protected: + elapsedMillis age_[N]; + long max_age_; + + virtual void emit_tuple() = 0; + + void check_emit() { + for (int i = 0; i < N; i++) { + if (max_age_ != 0 && age_[i] > max_age_) { + return; + } + } + emit_tuple(); + } +}; + +/** + * @brief Join two producer values into a tuple. + * + * Joins the connected producers' values into a tuple. The tuple is emitted + * when any producer emits a new value and none of the values have aged more + * than max_age milliseconds. + * + */ +template +class Join : public JoinBase<2>, public ValueProducer> { + public: + Join(long max_age = 0) : JoinBase<2>(max_age) {} + + std::tuple, LambdaConsumer> consumers = { + LambdaConsumer([this](T1 value) { + std::get<0>(values) = value; + age_[0] = 0; + check_emit(); + }), + LambdaConsumer([this](T2 value) { + std::get<1>(values) = value; + age_[1] = 0; + check_emit(); + })}; + + protected: + std::tuple values; + + void emit_tuple() override { this->emit(values); } +}; + +/** + * @brief Merge three producer values into a tuple. + * + * Merges the connected producers' values into a tuple. The tuple is emitted + * once all producers have emitted a new value within max_age milliseconds. + * + */ +template +class Join3 : public JoinBase<3>, public ValueProducer> { + public: + Join3(long max_age = 0) : JoinBase<3>(max_age) {} + + std::tuple, LambdaConsumer, LambdaConsumer> + consumers = {LambdaConsumer([this](T1 value) { + std::get<0>(values) = value; + age_[0] = 0; + check_emit(); + }), + LambdaConsumer([this](T2 value) { + std::get<1>(values) = value; + age_[1] = 0; + check_emit(); + }), + LambdaConsumer([this](T3 value) { + std::get<2>(values) = value; + age_[2] = 0; + check_emit(); + })}; + + protected: + std::tuple values; + + void emit_tuple() override { this->emit(values); } +}; + +/** + * @brief Merge four producer values into a tuple. + * + * Merges the connected producers' values into a tuple. The tuple is emitted + * once all producers have emitted a new value within max_age milliseconds. + * + */ +template +class Join4 : public JoinBase<4>, + public ValueProducer> { + public: + Join4(long max_age = 0) : JoinBase<4>(max_age) {} + + std::tuple, LambdaConsumer, LambdaConsumer, + LambdaConsumer> + consumers = {LambdaConsumer([this](T1 value) { + std::get<0>(values) = value; + age_[0] = 0; + check_emit(); + }), + LambdaConsumer([this](T2 value) { + std::get<1>(values) = value; + age_[1] = 0; + check_emit(); + }), + LambdaConsumer([this](T3 value) { + std::get<2>(values) = value; + age_[2] = 0; + check_emit(); + }), + LambdaConsumer([this](T4 value) { + std::get<3>(values) = value; + age_[3] = 0; + check_emit(); + })}; + + protected: + std::tuple values; + + void emit_tuple() override { this->emit(values); } +}; + +/** + * @brief Merge five producer values into a tuple. + * + * Merges the connected producers' values into a tuple. The tuple is emitted + * once all producers have emitted a new value within max_age milliseconds. + * + */ +template +class Join5 : public JoinBase<5>, + public ValueProducer> { + public: + Join5(long max_age = 0) : JoinBase<5>(max_age) {} + + std::tuple, LambdaConsumer, LambdaConsumer, + LambdaConsumer, LambdaConsumer> + consumers = {LambdaConsumer([this](T1 value) { + std::get<0>(values) = value; + age_[0] = 0; + check_emit(); + }), + LambdaConsumer([this](T2 value) { + std::get<1>(values) = value; + age_[1] = 0; + check_emit(); + }), + LambdaConsumer([this](T3 value) { + std::get<2>(values) = value; + age_[2] = 0; + check_emit(); + }), + LambdaConsumer([this](T4 value) { + std::get<3>(values) = value; + age_[3] = 0; + check_emit(); + }), + LambdaConsumer([this](T5 value) { + std::get<4>(values) = value; + age_[4] = 0; + check_emit(); + })}; + + protected: + std::tuple values; + + void emit_tuple() override { this->emit(values); } +}; + +} // namespace sensesp + +#endif diff --git a/src/sensesp/transforms/zip.h b/src/sensesp/transforms/zip.h new file mode 100644 index 000000000..409627b9e --- /dev/null +++ b/src/sensesp/transforms/zip.h @@ -0,0 +1,221 @@ +#ifndef SENSESP_SRC_SENSESP_TRANSFORMS_ZIP_H_ +#define SENSESP_SRC_SENSESP_TRANSFORMS_ZIP_H_ + +#include +#include + +#include "sensesp/system/lambda_consumer.h" +#include "sensesp/system/valueproducer.h" +#include "transform.h" + +namespace sensesp { + +//////////////////////////// +// Base class for Zip. This is needed until Arduino ESP32 Core supports C++14 +// and template pack indices: https://stackoverflow.com/a/55807564/2999754 + +template +class ZipBase { + public: + ZipBase(long max_age = 0) : max_age_{max_age} { + for (int i = 0; i < N; i++) { + age_[i] = max_age; + fresh_[i] = false; + } + } + + protected: + elapsedMillis age_[N]; + bool fresh_[N]; + long max_age_; + + virtual void emit_tuple() = 0; + + virtual void check_emit() { + for (int i = 0; i < N; i++) { + if ((max_age_ != 0 && age_[i] > max_age_) || !fresh_[i]) { + return; + } + } + emit_tuple(); + + for (int i = 0; i < N; i++) { + fresh_[i] = false; + } + } +}; + +/** + * @brief Zip two producer values into a tuple. + * + * Zips the connected producers' values into a tuple. The tuple is emitted + * once all producers have emitted a new value within max_age milliseconds. + * All producers must emit a new value before the next tuple is emitted. + * + */ +template +class Zip : public ZipBase<2>, public ValueProducer> { + public: + Zip(long max_age = 0) : ZipBase<2>(max_age) {} + + std::tuple, LambdaConsumer> consumers = { + LambdaConsumer([this](T1 value) { + std::get<0>(values) = value; + age_[0] = 0; + fresh_[0] = true; + check_emit(); + }), + LambdaConsumer([this](T2 value) { + std::get<1>(values) = value; + age_[1] = 0; + fresh_[1] = true; + check_emit(); + })}; + + protected: + std::tuple values; + + void emit_tuple() override { this->emit(values); } +}; + +/** + * @brief Zip three producer values into a tuple. + * + * Zips the connected producers' values into a tuple. The tuple is emitted + * once all producers have emitted a new value within max_age milliseconds. + * + */ +template +class Zip3 : public ZipBase<3>, public ValueProducer> { + public: + Zip3(long max_age = 0) : ZipBase<3>(max_age) {} + + std::tuple, LambdaConsumer, LambdaConsumer> + consumers = {LambdaConsumer([this](T1 value) { + std::get<0>(values) = value; + age_[0] = 0; + fresh_[0] = true; + check_emit(); + }), + LambdaConsumer([this](T2 value) { + std::get<1>(values) = value; + age_[1] = 0; + fresh_[1] = true; + check_emit(); + }), + LambdaConsumer([this](T3 value) { + std::get<2>(values) = value; + age_[2] = 0; + fresh_[2] = true; + check_emit(); + })}; + + protected: + std::tuple values; + + void emit_tuple() override { this->emit(values); } +}; + +/** + * @brief Zip four producer values into a tuple. + * + * Zips the connected producers' values into a tuple. The tuple is emitted + * once all producers have emitted a new value within max_age milliseconds. + * + */ +template +class Zip4 : public ZipBase<4>, + public ValueProducer> { + public: + Zip4(long max_age = 0) : ZipBase<4>(max_age) {} + + std::tuple, LambdaConsumer, LambdaConsumer, + LambdaConsumer> + consumers = {LambdaConsumer([this](T1 value) { + std::get<0>(values) = value; + age_[0] = 0; + fresh_[0] = true; + check_emit(); + }), + LambdaConsumer([this](T2 value) { + std::get<1>(values) = value; + age_[1] = 0; + fresh_[1] = true; + check_emit(); + }), + LambdaConsumer([this](T3 value) { + std::get<2>(values) = value; + age_[2] = 0; + fresh_[2] = true; + check_emit(); + }), + LambdaConsumer([this](T4 value) { + std::get<3>(values) = value; + age_[3] = 0; + fresh_[3] = true; + check_emit(); + })}; + + protected: + std::tuple values; + + void emit_tuple() override { this->emit(values); } +}; + +/** + * @brief Zip five producer values into a tuple. + * + * Zips the connected producers' values into a tuple. The tuple is emitted + * once all producers have emitted a new value within max_age milliseconds. + * + */ +template +class Zip5 : public ZipBase<5>, + public ValueProducer> { + public: + Zip5(long max_age = 0) : ZipBase<5>(max_age) {} + + // The consumers member is a tuple of LambdaConsumers. Each LambdaConsumer + // has a different type, hence necessitating the use of a tuple. + std::tuple, LambdaConsumer, LambdaConsumer, + LambdaConsumer, LambdaConsumer> + consumers = {LambdaConsumer([this](T1 value) { + std::get<0>(values) = value; + age_[0] = 0; + fresh_[0] = true; + check_emit(); + }), + LambdaConsumer([this](T2 value) { + std::get<1>(values) = value; + age_[1] = 0; + fresh_[1] = true; + check_emit(); + }), + LambdaConsumer([this](T3 value) { + std::get<2>(values) = value; + age_[2] = 0; + fresh_[2] = true; + check_emit(); + }), + LambdaConsumer([this](T4 value) { + std::get<3>(values) = value; + age_[3] = 0; + fresh_[3] = true; + check_emit(); + }), + LambdaConsumer([this](T5 value) { + std::get<4>(values) = value; + age_[4] = 0; + fresh_[4] = true; + check_emit(); + })}; + + protected: + std::tuple values; + + void emit_tuple() override { this->emit(values); } +}; + +} // namespace sensesp + +#endif