Skip to content

Commit

Permalink
Updates from_functions and from_molues to use the same base "compile"
Browse files Browse the repository at this point in the history
function

This makes it so that we reduce code paths -- we'll eventually use
compile for everything. This also allows us to push functions up to the
user level, enabling `with_functions`
  • Loading branch information
elijahbenizzy committed Feb 4, 2025
1 parent a288e84 commit 7d7fbfe
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 21 deletions.
4 changes: 3 additions & 1 deletion hamilton/async_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
import typing
import uuid
from types import ModuleType
from types import FunctionType, ModuleType
from typing import Any, Dict, Optional, Tuple

import hamilton.lifecycle.base as lifecycle_base
Expand Down Expand Up @@ -199,6 +199,7 @@ def __init__(
result_builder: Optional[base.ResultMixin] = None,
adapters: typing.List[lifecycle.LifecycleAdapter] = None,
allow_module_overrides: bool = False,
functions: typing.List[FunctionType],
):
"""Instantiates an asynchronous driver.
Expand Down Expand Up @@ -249,6 +250,7 @@ def __init__(
*async_adapters, # note async adapters will not be called during synchronous execution -- this is for access later
],
allow_module_overrides=allow_module_overrides,
functions=functions,
)
self.initialized = False

Expand Down
24 changes: 20 additions & 4 deletions hamilton/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import typing
import uuid
from datetime import datetime
from types import ModuleType
from types import FunctionType, ModuleType
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -402,6 +402,7 @@ def __init__(
self,
config: Dict[str, Any],
*modules: ModuleType,
functions: List[FunctionType] = None,
adapter: Optional[
Union[lifecycle_base.LifecycleAdapter, List[lifecycle_base.LifecycleAdapter]]
] = None,
Expand Down Expand Up @@ -435,13 +436,15 @@ def __init__(
if adapter.does_hook("pre_do_anything", is_async=False):
adapter.call_all_lifecycle_hooks_sync("pre_do_anything")
error = None
self.graph_functions = functions if functions is not None else []
self.graph_modules = modules
try:
self.graph = graph.FunctionGraph.from_modules(
*modules,
self.graph = graph.FunctionGraph.compile(
modules=list(modules),
functions=functions,
config=config,
adapter=adapter,
allow_module_overrides=allow_module_overrides,
allow_node_overrides=allow_module_overrides,
)
if _materializers:
materializer_factories, extractor_factories = self._process_materializers(
Expand Down Expand Up @@ -1866,6 +1869,7 @@ def __init__(self):
# common fields
self.config = {}
self.modules = []
self.functions = []
self.materializers = []

# Allow later modules to override nodes of the same name
Expand Down Expand Up @@ -1927,6 +1931,17 @@ def with_modules(self, *modules: ModuleType) -> "Builder":
self.modules.extend(modules)
return self

def with_functions(self, *functions: FunctionType) -> "Builder":
"""Adds the specified functions to the list.
This can be called multiple times. If you have allow_module_overrides
set this will enabl overwriting modules or previously added functions.
:param functions:
:return: self
"""
self.functions.extend(functions)
return self

def with_adapter(self, adapter: base.HamiltonGraphAdapter) -> "Builder":
"""Sets the adapter to use.
Expand Down Expand Up @@ -2168,6 +2183,7 @@ def build(self) -> Driver:
_graph_executor=graph_executor,
_use_legacy_adapter=False,
allow_module_overrides=self._allow_module_overrides,
functions=self.functions,
)

def copy(self) -> "Builder":
Expand Down
59 changes: 45 additions & 14 deletions hamilton/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import pathlib
import uuid
from enum import Enum
from types import ModuleType
from types import FunctionType, ModuleType
from typing import Any, Callable, Collection, Dict, FrozenSet, List, Optional, Set, Tuple, Type

import hamilton.lifecycle.base as lifecycle_base
Expand Down Expand Up @@ -142,17 +142,18 @@ def update_dependencies(
return nodes


def create_function_graph(
def compile_to_nodes(
*functions: List[Tuple[str, Callable]],
config: Dict[str, Any],
adapter: lifecycle_base.LifecycleAdapterSet = None,
fg: Optional["FunctionGraph"] = None,
allow_module_overrides: bool = False,
allow_node_leveL_overrides: bool = False,
) -> Dict[str, node.Node]:
"""Creates a graph of all available functions & their dependencies.
:param modules: A set of modules over which one wants to compute the function graph
:param config: Dictionary that we will inspect to get values from in building the function graph.
:param adapter: The adapter that adapts our node type checking based on the context.
:param allow_node_leveL_overrides: Whether or not to allow node names to override each other
:return: list of nodes in the graph.
If it needs to be more complicated, we'll return an actual networkx graph and get all the rest of the logic for free
"""
Expand All @@ -170,7 +171,7 @@ def create_function_graph(
for n in fm_base.resolve_nodes(f, config):
if n.name in config:
continue # This makes sure we overwrite things if they're in the config...
if n.name in nodes and not allow_module_overrides:
if n.name in nodes and not allow_node_leveL_overrides:
raise ValueError(
f"Cannot define function {n.name} more than once."
f" Already defined by function {f}"
Expand Down Expand Up @@ -713,13 +714,43 @@ def __init__(
self.nodes = nodes
self.adapter = adapter

@staticmethod
def compile(
modules: List[ModuleType],
functions: List[FunctionType],
config: Dict[str, Any],
adapter: lifecycle_base.LifecycleAdapterSet = None,
allow_node_overrides: bool = False,
) -> "FunctionGraph":
"""Base level static function for compiling a function graph. Note
that this can both use functions (E.G. passing them directly) and modules
(passing them in and crawling.
:param modules: Modules to use
:param functions: Functions to use
:param config: Config to use for setting up the DAG
:param adapter: Adapter to use for node resolution
:param allow_node_overrides: Whether or not to allow node level overrides.
:return: The compiled function graph
"""
module_functions = sum([find_functions(module) for module in modules], [])
nodes = compile_to_nodes(
*module_functions,
*functions,
config=config,
adapter=adapter,
allow_node_leveL_overrides=allow_node_overrides,
)
return FunctionGraph(nodes, config, adapter)

@staticmethod
def from_modules(
*modules: ModuleType,
config: Dict[str, Any],
adapter: lifecycle_base.LifecycleAdapterSet = None,
allow_module_overrides: bool = False,
):
additional_functions: List[FunctionType],
) -> "FunctionGraph":
"""Initializes a function graph from the specified modules. Note that this was the old
way we constructed FunctionGraph -- this is not a public-facing API, so we replaced it
with a constructor that takes in nodes directly. If you hacked in something using
Expand All @@ -732,28 +763,28 @@ def from_modules(
:return: a function graph.
"""

functions = sum([find_functions(module) for module in modules], [])
return FunctionGraph.from_functions(
*functions,
return FunctionGraph.compile(
modules=modules,
functions=[],
config=config,
adapter=adapter,
allow_module_overrides=allow_module_overrides,
allow_node_overrides=allow_module_overrides,
)

@staticmethod
def from_functions(
*functions,
*functions: FunctionType,
config: Dict[str, Any],
adapter: lifecycle_base.LifecycleAdapterSet = None,
allow_module_overrides: bool = False,
) -> "FunctionGraph":
nodes = create_function_graph(
*functions,
return FunctionGraph.compile(
modules=[],
functions=functions,
config=config,
adapter=adapter,
allow_module_overrides=allow_module_overrides,
allow_node_overrides=allow_module_overrides,
)
return FunctionGraph(nodes, config, adapter)

def with_nodes(self, nodes: Dict[str, Node]) -> "FunctionGraph":
"""Creates a new function graph with the additional specified nodes.
Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ docs = [
"diskcache",
# required for all the plugins
"dlt",
# furo -- install from main for now until the next release is out:
"furo @ git+https://github.com/pradyunsg/furo@main",
"furo",
"gitpython", # Required for parsing git info for generation of data-adapter docs
"grpcio-status",
"lightgbm",
Expand Down

0 comments on commit 7d7fbfe

Please sign in to comment.