Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple compressor message from AllReduceSynchronizer #46

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions autodist/kernel/graph_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ def _initialize_synchronizers(self):
for part in node.part_config:
self._synchronizers[part.var_name] = \
Synchronizer.create(part.WhichOneof('synchronizer'),
getattr(part, part.WhichOneof('synchronizer')))
part)
else:
self._synchronizers[node.var_name] = \
Synchronizer.create(node.WhichOneof('synchronizer'),
getattr(node, node.WhichOneof('synchronizer')))
node)

config = self._strategy.graph_config.replicas
replica_devices = {device_spec.DeviceSpecV2.from_string(s) for s in config}
Expand Down
33 changes: 25 additions & 8 deletions autodist/kernel/synchronization/all_reduce_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@
replica_prefix, get_control_consumers, update_control_consumers
from autodist.kernel.common.utils import get_op_name
from autodist.kernel.synchronization.collective_key import get_collective_keys
from autodist.kernel.synchronization.compressor import Compressor, CollectiveOpsConfig
# from autodist.kernel.synchronization.compressor import Compressor, CollectiveOpsConfig
from autodist.kernel.synchronization.compressor import Compressor
from autodist.kernel.synchronization.synchronizer import Synchronizer
from autodist.proto import synchronizers_pb2
from autodist.proto import synchronizers_pb2, compressor_pb2, strategy_pb2
from autodist.utils import logging


class CollectiveOpsConfig:
    """Config for using Collective Ops.

    A plain attribute bag: instances get their attributes assigned
    dynamically, and ``conf.__dict__`` is splatted verbatim as the keyword
    arguments of ``collective_ops.all_reduce`` (see
    ``AllReduceSynchronizer._all_reduce``), so the attribute names below
    must match that API's parameter names exactly.
    """

    group_size: int    # number of members in the collective group (per TF collective_ops API)
    group_key: str     # identifies the device group performing the reduction
    instance_key: str  # identifies this particular collective instance
    merge_op: str      # op used to combine per-member tensors (e.g. 'Add')
    final_op: str      # op applied to the merged result (e.g. 'Id' or 'Div')

class AllReduceSynchronizer(Synchronizer):
"""
AllReduce Synchronizer.
Expand All @@ -50,21 +61,27 @@ class AllReduceSynchronizer(Synchronizer):
2. any other types of hybrid reduction of PS and AllReduce.
"""

def __init__(self, config: synchronizers_pb2.AllReduceSynchronizer):
self._spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Name(config.spec)
def __init__(self, config: strategy_pb2.Strategy.Node):
compressor_value = getattr(config, 'compressor')
syncer_config = getattr(config, config.WhichOneof('synchronizer'))
self._spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Name(syncer_config.spec)
if autodist.float_major_minor_tf_version < 1.15 or autodist.float_major_minor_tf_version < 2.1:
logging.warning('Collective synchronizer spec "{}" a.k.a communication_hint has no effect '
'until tensorflow-gpu 1.x>= 1.15 or 2.x>=2.1. It may cause error currently.'
.format(self._spec))
self._spec = None

self._compressor_type = synchronizers_pb2.AllReduceSynchronizer.Compressor.Name(config.compressor)

# Collective ops within the same group will be merged by the scoped optimizer.
# Normally the group index shall be smaller than the number of variables in the graph; this kernel assumes
# the strategy will validate the group assignments are legitimate.
self._group = config.group
self._group = syncer_config.group
super().__init__()
if compressor_value:
self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value)

def _all_reduce(self, tensor: Tensor, conf: CollectiveOpsConfig):
    """
    AllReduce the given tensor using TensorFlow collective ops.

    Args:
        tensor (Tensor): the tensor to all-reduce.
        conf (CollectiveOpsConfig): attribute bag whose ``__dict__`` is
            forwarded verbatim as keyword arguments to
            ``collective_ops.all_reduce``.

    Returns:
        The all-reduced Tensor.
    """
    return collective_ops.all_reduce(tensor, **conf.__dict__)

def in_graph_apply(self, graph_item, var_name):
"""
Expand Down Expand Up @@ -122,7 +139,7 @@ def _collect_dense_gradients(self, graph_item, var_op_name):
# "\/" is added for name scope reuse
with ops.name_scope(replica_prefix(i) + "/collective-group-{}/".format(self._group)):
with ops.colocate_with(grad.op):
reduced_grad = compressors[i].reduce(grad, conf)
reduced_grad = compressors[i].compress(grad, conf)
update_consumers(grad_consumers, grad, reduced_grad)
# TODO(Hao): update grad, target pair here or not?

Expand Down
181 changes: 111 additions & 70 deletions autodist/kernel/synchronization/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@
from abc import ABC, abstractmethod
from tensorflow.python.framework import dtypes
from tensorflow.python.framework.ops import Tensor
from tensorflow.python.ops import collective_ops, math_ops

# from tensorflow.python.ops import collective_ops, math_ops
from tensorflow.python.ops import math_ops
#from tensorflow.python.ops import array_ops, collective_ops, linalg_ops, math_ops, random_ops
#from autodist.kernel.synchronization.collective_key import get_collective_keys
#from autodist.utils import logging


class CollectiveOpsConfig:
"""Config for using Collective Ops."""
# class CollectiveOpsConfig:
# """Config for using Collective Ops."""

group_size: int
group_key: str
instance_key: str
merge_op: str
final_op: str
# group_size: int
# group_key: str
# instance_key: str
# merge_op: str
# final_op: str


class Compressor(ABC):
Expand All @@ -44,21 +44,21 @@ class Compressor(ABC):
def __init__(self, var_op_name):
    """
    Initialize the compressor.

    Args:
        var_op_name: name of the variable op whose gradients this
            compressor handles.
    """
    self.var_op_name = var_op_name

@abstractmethod
def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig):
"""
Compress, reduce, and decompress a given tensor.
# @abstractmethod
# def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig):
# """
# Compress, reduce, and decompress a given tensor.

Args:
tensor (Tensor): the Tensor to reduce.
conf (CollectiveOpsConfig): the config for Collective Ops.
# Args:
# tensor (Tensor): the Tensor to reduce.
# conf (CollectiveOpsConfig): the config for Collective Ops.

Returns:
Reduced Tensor
"""
# Returns:
# Reduced Tensor
# """

@abstractmethod
def _compress(self, tensor: Tensor):
def compress(self, tensor: Tensor):
"""
Compress a given tensor.

Expand All @@ -70,7 +70,7 @@ def _compress(self, tensor: Tensor):
"""

@abstractmethod
def _decompress(self, compressed_tensor: Tensor):
def decompress(self, compressed_tensor: Tensor):
"""
Decompress a given tensor.

Expand All @@ -81,19 +81,19 @@ def _decompress(self, compressed_tensor: Tensor):
Tensor, Context
"""

@staticmethod
def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig):
"""
Using CollectiveOps, AllReduce the given tensor.
# @staticmethod
# def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig):
# """
# Using CollectiveOps, AllReduce the given tensor.

Args:
tensor (Tensor): the tensor to all-reduce
conf (CollectiveOpsConfig): the config for CollectiveOps
# Args:
# tensor (Tensor): the tensor to all-reduce
# conf (CollectiveOpsConfig): the config for CollectiveOps

Returns:
The All-Reduced Tensor
"""
return collective_ops.all_reduce(tensor, **conf.__dict__)
# Returns:
# The All-Reduced Tensor
# """
# return collective_ops.all_reduce(tensor, **conf.__dict__)

@classmethod
def create(cls, name, *args, **kwargs):
Expand Down Expand Up @@ -124,45 +124,68 @@ def __init__(self, var_op_name):
self.error = None
super().__init__(var_op_name)

def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig):
"""
Compress, reduce, and decompress a given tensor.

Args:
tensor (Tensor): the Tensor to reduce.
conf (CollectiveOpsConfig): the config for Collective Ops.

Returns:
Reduced Tensor
"""
# def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig):
# """
# Compress, reduce, and decompress a given tensor.

# Args:
# tensor (Tensor): the Tensor to reduce.
# conf (CollectiveOpsConfig): the config for Collective Ops.

# Returns:
# Reduced Tensor
# """
# if self.error is not None:
# tensor += self.error
# compressed_tensor = self._compress(tensor)
# self.error = tensor - self._decompress(compressed_tensor)
# reduced = self._all_reduce(compressed_tensor, conf)
# return self._decompress(reduced)

def _compute_error(self, tensor: Tensor):
if self.error is not None:
tensor += self.error
compressed_tensor = self._compress(tensor)
self.error = tensor - self._decompress(compressed_tensor)
reduced = self._all_reduce(compressed_tensor, conf)
return self._decompress(reduced)

compressed_tensor = self.compress(tensor)
self.error = tensor - self.decompress(compressed_tensor)

class NoneCompressor(Compressor):
"""An identity Compressor."""

def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig):
# def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig):
# """
# Compress, reduce, and decompress a given tensor.

# Args:
# tensor (Tensor): the Tensor to reduce.
# conf (CollectiveOpsConfig): the config for Collective Ops.

# Returns:
# Reduced Tensor
# """
# return self._all_reduce(tensor, conf)

def compress(self, tensor: Tensor):
"""
Compress, reduce, and decompress a given tensor.
Compress a given tensor.

Args:
tensor (Tensor): the Tensor to reduce.
conf (CollectiveOpsConfig): the config for Collective Ops.
tensor (Tensor): the Tensor to compress.

Returns:
Reduced Tensor
Tensor
"""
return self._all_reduce(tensor, conf)

def _compress(self, tensor: Tensor):
return tensor

def _decompress(self, compressed_tensor: Tensor):
def decompress(self, compressed_tensor: Tensor):
    """
    Decompress a given tensor.

    Identity operation: ``NoneCompressor`` performs no compression, so
    the tensor is returned unchanged.

    Args:
        compressed_tensor (Tensor): the Tensor to decompress.

    Returns:
        Tensor: the input tensor, unchanged.
    """
    return compressed_tensor


Expand All @@ -173,22 +196,31 @@ def __init__(self, var_op_name):
self.dtype = None
super().__init__(var_op_name)

def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig):
# def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig):
# """
# Compress, reduce, and decompress a given tensor.

# Args:
# tensor (Tensor): the Tensor to reduce.
# conf (CollectiveOpsConfig): the config for Collective Ops.

# Returns:
# Reduced Tensor
# """
# compressed_tensor = self._compress(tensor)
# reduced = self._all_reduce(compressed_tensor, conf)
# return self._decompress(reduced)

def compress(self, tensor: Tensor):
"""
Compress, reduce, and decompress a given tensor.
Compress a given tensor.

Args:
tensor (Tensor): the Tensor to reduce.
conf (CollectiveOpsConfig): the config for Collective Ops.
tensor (Tensor): the Tensor to compress.

Returns:
Reduced Tensor
Tensor
"""
compressed_tensor = self._compress(tensor)
reduced = self._all_reduce(compressed_tensor, conf)
return self._decompress(reduced)

def _compress(self, tensor: Tensor):
self.dtype = tensor.dtype
tensor_compressed = tensor
if tensor.dtype.is_floating:
Expand All @@ -197,12 +229,21 @@ def _compress(self, tensor: Tensor):
tensor_compressed = math_ops.cast(tensor, dtypes.float32)
return tensor_compressed

def _decompress(self, compressed_tensor: Tensor):
def decompress(self, compressed_tensor: Tensor):
    """
    Decompress a given tensor.

    Casts the tensor back to ``self.dtype``, the dtype recorded by the
    matching ``_compress`` call on this instance.

    Args:
        compressed_tensor (Tensor): the Tensor to decompress.

    Returns:
        Tensor: ``compressed_tensor`` cast back to the original dtype.
    """
    return math_ops.cast(compressed_tensor, self.dtype)


class HorovodCompressorEF(CompressorEF, HorovodCompressor): # This works because of Method Resolution Order
"""Horovod's Compression but with Error Feedback."""
# class HorovodCompressorEF(CompressorEF, HorovodCompressor): # This works because of Method Resolution Order
# """Horovod's Compression but with Error Feedback."""


# class PowerSGDCompressor(CompressorEF):
Expand Down
17 changes: 11 additions & 6 deletions autodist/kernel/synchronization/ps_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
remove_from_control_consumers, get_index_from_tensor_name, update_colocation_group
from autodist.kernel.common.variable_utils import get_read_var_ops
from autodist.kernel.synchronization.synchronizer import Synchronizer
from autodist.proto import synchronizers_pb2
# from autodist.proto import synchronizers_pb2, strategy_pb2
from autodist.proto import strategy_pb2, compressor_pb2


class PSSynchronizer(Synchronizer):
Expand All @@ -53,15 +54,19 @@ class PSSynchronizer(Synchronizer):
for each variable for the workers to mark when their variable update is complete.
"""

def __init__(self, config: synchronizers_pb2.PSSynchronizer):
self.target_device = config.reduction_destination if config.reduction_destination else ""
self._local_replication = config.local_replication
self._sync = config.sync
self._staleness = config.staleness
def __init__(self, config: strategy_pb2.Strategy.Node):
    """
    Initialize the PS synchronizer from a per-variable strategy node.

    Args:
        config (strategy_pb2.Strategy.Node): strategy node whose active
            `synchronizer` oneof carries the PS settings, and whose
            `compressor` field (if nonzero) selects a compressor type.
    """
    # Resolve the concrete synchronizer sub-message selected by the oneof;
    # the attribute name is dynamic, so getattr is required here.
    syncer_config = getattr(config, config.WhichOneof('synchronizer'))
    # Plain attribute access: getattr with a constant literal name is
    # redundant (flake8-bugbear B009).
    compressor_value = config.compressor
    self.target_device = syncer_config.reduction_destination if syncer_config.reduction_destination else ""
    self._local_replication = syncer_config.local_replication
    self._sync = syncer_config.sync
    self._staleness = syncer_config.staleness

    self._var_op_to_agg_grad = {}
    self._var_op_to_accum_apply_op = {}
    super().__init__()
    # NOTE(review): proto enum value 0 (the field default) is falsy, so the
    # base class's _compressor_type stays None in that case — presumably
    # "no compressor configured"; confirm against compressor_pb2.
    if compressor_value:
        self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value)

def in_graph_apply(self, graph_item, var_name):
"""
Expand Down
1 change: 1 addition & 0 deletions autodist/kernel/synchronization/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(self):
self.var_op_to_accum_apply_op = None
self.is_chief = None
self.all_canonical_replica_devices = None
self._compressor_type = None

# pylint: disable=too-many-arguments
def assign_cluster_information(self,
Expand Down
Loading