-
Well, in general, for a good plugin architecture, below are the design, implementation, and code structure considerations we could take.

Design Considerations

Implementation Considerations

Code Structure
```python
from abc import ABC, abstractmethod
import importlib


class PluginBase(ABC):
    @abstractmethod
    def execute(self, data):
        pass


class PluginManager:
    def __init__(self):
        self.plugins = {}

    def load_plugin(self, plugin_name):
        module = importlib.import_module(plugin_name)
        self.plugins[plugin_name] = module.Plugin()

    def unload_plugin(self, plugin_name):
        del self.plugins[plugin_name]

    def execute_plugins(self, data):
        for plugin in self.plugins.values():
            plugin.execute(data)


class MyPlugin(PluginBase):
    def execute(self, data):
        print(f"plugin executed with data: {data}")
```

Example implementation for our use case in Spark-Expectations could be as below:

**Define a Standard Reader Interface**

First, define an abstract base class that all reader plugins must implement. This ensures that they all expose the same interface.

```python
from abc import ABC, abstractmethod


class ReaderPlugin(ABC):
    @abstractmethod
    def read(self, *args, **kwargs):
        pass
```

**Implement the Default Reader**

The default reader for reading from Delta can be a concrete implementation of this interface.

```python
class DeltaReader(ReaderPlugin):
    def read(self, path):
        # Delta reading logic here
        ...
```

**Create a Reader Manager**

Create a manager class that will dynamically load the appropriate reader plugin at runtime.

```python
import importlib


class ReaderManager:
    def __init__(self):
        self.readers = {'delta': DeltaReader()}

    def register_reader(self, name, reader_class_path):
        module_name, class_name = reader_class_path.rsplit('.', 1)
        module = importlib.import_module(module_name)
        reader_class = getattr(module, class_name)
        if not issubclass(reader_class, ReaderPlugin):
            raise ValueError("Invalid reader plugin")
        self.readers[name] = reader_class()

    def get_reader(self, name):
        return self.readers.get(name)
```

**User-Defined Reader Plugin**

Users can create their own reader plugins by implementing the `ReaderPlugin` interface.

```python
class SnowflakeReader(ReaderPlugin):
    def read(self, query, connection_params):
        # Snowflake reading logic here
        ...
```

**Registering User-Defined Plugins**

Users can register their custom reader plugins with the `ReaderManager`.

```python
reader_manager = ReaderManager()
reader_manager.register_reader('snowflake', 'my_package.SnowflakeReader')
```

**Using the Reader at Runtime**

When reading data, the library looks up the registered reader for the requested source type and dispatches to it.

```python
def read_data(source_type, *args, **kwargs):
    reader = reader_manager.get_reader(source_type)
    if reader:
        return reader.read(*args, **kwargs)
    else:
        raise ValueError(f"No reader registered for {source_type}")
```

This way, our library remains open for extension but closed for modification, adhering to the Open/Closed Principle. Users can easily extend its functionality to read from different data sources without having to modify its core logic.
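
For completeness, a call site could then look like the following; the query and connection parameters here are purely hypothetical:

```python
# Hypothetical usage: dispatches to the SnowflakeReader registered above
df = read_data(
    'snowflake',
    query="SELECT * FROM employees",
    connection_params={"account": "...", "warehouse": "..."},
)
```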
-
I completely agree with the design considerations, but for implementation I was thinking more of a service-loader approach:

```python
import importlib
import pkgutil
import inspect
from abc import ABC, abstractmethod


class SparkExpectationsSource(ABC):
    @abstractmethod
    def read(self, *args, **kwargs):
        pass

    @abstractmethod
    def write(self, *args, **kwargs):
        pass


class ServiceLoader:
    def __init__(self, base_class):
        self.base_class = base_class
        self.providers = self._load_all_providers()

    def _load_all_providers(self):
        providers = {}
        for importer, modname, ispkg in pkgutil.iter_modules():
            try:
                module = importlib.import_module(modname)
                for name, obj in inspect.getmembers(module):
                    if (
                        inspect.isclass(obj)
                        and issubclass(obj, self.base_class)
                        and obj is not self.base_class
                    ):
                        # Each concrete source is expected to declare a
                        # `type` class attribute used as its registry key
                        providers[obj.type] = obj()
            except ImportError:
                pass
        return providers

    def get_provider(self, type_name):
        return self.providers.get(type_name)


# The needed source type can be obtained from the config
provider = ServiceLoader(SparkExpectationsSource).get_provider("user needed type")
provider.read()
provider.write()
```
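
For what it's worth, a concrete source this loader would discover might look like the sketch below; the class itself is hypothetical and follows the `type` class-attribute convention assumed in `_load_all_providers`:

```python
class DeltaSource(SparkExpectationsSource):
    # Registry key used by ServiceLoader when indexing providers
    type = "delta"

    def read(self, *args, **kwargs):
        # Delta read logic would go here
        ...

    def write(self, *args, **kwargs):
        # Delta write logic would go here
        ...
```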
-
The default implementation can come from the input DataFrame itself (check the lineage and fetch the source), assuming reading and writing will happen against the same system.
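
As a rough sketch of that idea (`inputFiles()` is a real DataFrame method, but the inference helper itself is hypothetical and only covers file-based sources):

```python
from pyspark.sql import DataFrame


def infer_source_path(df: DataFrame) -> str:
    # inputFiles() returns a best-effort list of the files backing the
    # DataFrame; it can be empty for non-file sources such as JDBC,
    # so callers must handle that case separately.
    files = df.inputFiles()
    if not files:
        raise ValueError("Could not infer a file-based source from the DataFrame lineage")
    return files[0]
```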
-
I think
-
I liked both the approaches proposed by @newfront and @phanikumarvemuri, so I tried to club them together in the original pattern. Let me know your thoughts.

```python
## Import required modules
from abc import ABC, abstractmethod
from pyspark.sql import SparkSession
import importlib
import pkgutil
import inspect


## Define the abstract reader class (ReaderPlugin)
class ReaderPlugin(ABC):
    def __init__(self, spark_session, config):
        self.spark_session = spark_session
        self.config = config

    @abstractmethod
    def read(self, *args, **kwargs):
        pass


## Implement the Delta reader
class DeltaReader(ReaderPlugin):
    def read(self, path):
        print(f"Reading data from Delta at {path}")


## As per Phani's suggestion, modified a bit - an optimized ServiceLoader class:
## - we only scan required packages from a known class path rather than all modules
## - we lazily load the required plugin via the get_provider function
## - we cache providers that have already been loaded
class ServiceLoader:
    def __init__(self, base_class, scan_packages=None):
        self.base_class = base_class
        self.scan_packages = scan_packages or []
        self.providers = {}

    def _load_providers(self, package_name):
        # Import the package by name so scan_packages can be plain strings
        package = importlib.import_module(package_name)
        for importer, modname, ispkg in pkgutil.iter_modules(package.__path__):
            try:
                module = importlib.import_module(f"{package.__name__}.{modname}")
                for name, obj in inspect.getmembers(module):
                    if (
                        inspect.isclass(obj)
                        and issubclass(obj, self.base_class)
                        and obj is not self.base_class
                    ):
                        self.providers[obj.__name__] = obj
            except ImportError:
                pass

    def get_provider(self, name):
        if name not in self.providers:
            for package in self.scan_packages:
                self._load_providers(package)
        return self.providers.get(name)


## As per Scott's suggestion, creating a new spark_session per reader
class ReaderManager:
    def __init__(self, base_spark_session=None, scan_packages=None):
        self.base_spark_session = base_spark_session or SparkSession.builder.getOrCreate()
        self.loader = ServiceLoader(ReaderPlugin, scan_packages=scan_packages)
        self.readers = {}

    def get_reader(self, name, config=None):
        if name not in self.readers:
            reader_class = self.loader.get_provider(name)
            if reader_class:
                new_session = self.base_spark_session.newSession()  # Create a new session
                self.readers[name] = reader_class(new_session, config or {})  # Instantiate the reader with the new session
        return self.readers.get(name)  # Return the cached reader if it exists, otherwise None
```

User code could look like this with the above implementation:

```python
@se.with_expectations(
    reader_manager.get_reader('ReaderType', reader_config).get_rules_from_table(
        product_rules_table="pilot_nonpub.dq.dq_rules",
        table_name="pilot_nonpub.dq_employee.employee",
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    # ... (rest of the existing code)
)
def build_new() -> DataFrame:
    # ... (rest of the existing code)
```

**Sample Implementation for Snowflake**

```python
class SnowflakeReader(ReaderPlugin):
    def __init__(self, spark_session, config):
        super().__init__(spark_session, config)

    def read(self, query: str) -> DataFrame:
        url = self.config.get('url')
        user = self.config.get('user')
        password = self.config.get('password')
        database = self.config.get('database')
        schema = self.config.get('schema')
        warehouse = self.config.get('warehouse')
        return self.spark_session.read \
            .format("snowflake") \
            .option("sfUrl", url) \
            .option("sfDatabase", database) \
            .option("sfWarehouse", warehouse) \
            .option("sfSchema", schema) \
            .option("sfRole", "PUBLIC") \
            .option("sfFetchSize", 1000) \
            .option("user", user) \
            .option("password", password) \
            .option("dbtable", query) \
            .load()
```

**User Code for Snowflake**

```python
# Initialize ReaderManager
reader_manager = ReaderManager(base_spark_session=spark, scan_packages=['package_containing_SnowflakeReader'])

# Snowflake configuration
snowflake_config = {
    'url': 'snowflake_url',
    'user': 'username',
    'password': 'password',
    'database': 'database',
    'schema': 'schema',
    'warehouse': 'warehouse'
}

@se.with_expectations(
    reader_manager.get_reader('SnowflakeReader', snowflake_config).get_rules_from_table(
        product_rules_table="pilot_nonpub.dq.dq_rules",
        table_name="pilot_nonpub.dq_employee.employee",
        dq_stats_table_name="pilot_nonpub.dq.dq_stats"
    ),
    # ... (rest of the existing code)
)
def build_new() -> DataFrame:
    # ... (rest of the existing code)
```
-
@phanikumarvemuri and I met today and discussed the implementation. We rethought the usage of plugins, and maybe plugins are not needed at all if we use the native Spark DataFrameWriter / DataFrameWriterV2 APIs.
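
For illustration, writing through the native V2 API could look like the sketch below; the helper, table name, and format are hypothetical, not a settled Spark-Expectations interface:

```python
from pyspark.sql import DataFrame


def write_dq_stats(df: DataFrame, table: str = "catalog.dq.dq_stats") -> None:
    # DataFrameWriterV2 targets a catalog table, so the configured catalog
    # determines the underlying system -- no custom writer plugin required.
    df.writeTo(table).using("delta").createOrReplace()
```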
Decisions discussed:
-
What requirements are needed to create a good plugin architecture?