Skip to content

Commit

Permalink
rename: more readable (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
jasperzhong committed May 16, 2020
1 parent 300a06b commit 721b4ca
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 54 deletions.
59 changes: 31 additions & 28 deletions byteps/common/compressor/strategy/onebit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ OnebitCompressor::OnebitCompressor(bool use_scale) : _use_scale(use_scale){};

OnebitCompressor::~OnebitCompressor() = default;

template <typename T>
static size_t _Packing(T* data, size_t len) {
constexpr int PACKING_SIZE = sizeof(T) * 8;
template <typename scalar_t>
size_t OnebitCompressor::PackingImpl(scalar_t* data, size_t len) {
constexpr int PACKING_SIZE = sizeof(scalar_t) * 8;
size_t padding_len = (PACKING_SIZE - (len % PACKING_SIZE)) % PACKING_SIZE;
size_t chunk_size = (len + padding_len) / PACKING_SIZE;

Expand All @@ -53,22 +53,22 @@ static size_t _Packing(T* data, size_t len) {
}
}

return chunk_size * sizeof(T);
return chunk_size * sizeof(scalar_t);
}

static size_t Packing(void* data, size_t len, int dtype) {
size_t OnebitCompressor::Packing(void* data, size_t len, int dtype) {
switch (dtype) {
case BYTEPS_INT8:
case BYTEPS_UINT8:
return _Packing(reinterpret_cast<int8_t*>(data), len);
return PackingImpl(reinterpret_cast<int8_t*>(data), len);
case BYTEPS_FLOAT16:
return _Packing(reinterpret_cast<int16_t*>(data), len);
return PackingImpl(reinterpret_cast<int16_t*>(data), len);
case BYTEPS_INT32:
case BYTEPS_FLOAT32:
return _Packing(reinterpret_cast<int32_t*>(data), len);
return PackingImpl(reinterpret_cast<int32_t*>(data), len);
case BYTEPS_INT64:
case BYTEPS_FLOAT64:
return _Packing(reinterpret_cast<int64_t*>(data), len);
return PackingImpl(reinterpret_cast<int64_t*>(data), len);
default:
BPS_CHECK(0) << "Unsupported data type: " << dtype;
}
Expand All @@ -95,11 +95,13 @@ void OnebitCompressor::Compress(ByteBuf grad, int dtype, ByteBuf& compressed) {
compressed.size = compressed_size + sizeof(float);
}

template <typename T1, typename T2>
static size_t _Unpacking(T1* dst, const T2* src, size_t size) {
static_assert(sizeof(T1) == sizeof(T2), "T1 should be the same size as T2");
constexpr int PACKING_SIZE = sizeof(T2) * 8;
auto chunk_size = (size - sizeof(float)) / sizeof(T2);
template <typename scalar_t, typename packing_t>
size_t OnebitCompressor::UnpackingImpl(scalar_t* dst, const packing_t* src,
size_t size) {
static_assert(sizeof(scalar_t) == sizeof(packing_t),
"scalar_t should be the same size as packing_t");
constexpr int PACKING_SIZE = sizeof(packing_t) * 8;
auto chunk_size = (size - sizeof(float)) / sizeof(packing_t);

float scale;
auto pf = reinterpret_cast<const float*>(src + chunk_size);
Expand All @@ -118,30 +120,31 @@ static size_t _Unpacking(T1* dst, const T2* src, size_t size) {
return chunk_size;
}

static size_t Unpacking(void* dst, const void* src, size_t len, int dtype) {
size_t OnebitCompressor::Unpacking(void* dst, const void* src, size_t len,
int dtype) {
switch (dtype) {
case BYTEPS_INT8:
return _Unpacking(reinterpret_cast<int8_t*>(dst),
reinterpret_cast<const int8_t*>(src), len);
return UnpackingImpl(reinterpret_cast<int8_t*>(dst),
reinterpret_cast<const int8_t*>(src), len);
case BYTEPS_UINT8:
return _Unpacking(reinterpret_cast<uint8_t*>(dst),
reinterpret_cast<const int8_t*>(src), len);
return UnpackingImpl(reinterpret_cast<uint8_t*>(dst),
reinterpret_cast<const int8_t*>(src), len);
// TODO:
// case BYTEPS_FLOAT16:
// return _Unpacking(reinterpret_cast<uint16_t*>(dst),
// return UnpackingImpl(reinterpret_cast<uint16_t*>(dst),
// reinterpret_cast<const int16_t*>(src), len);
case BYTEPS_INT32:
return _Unpacking(reinterpret_cast<int32_t*>(dst),
reinterpret_cast<const int32_t*>(src), len);
return UnpackingImpl(reinterpret_cast<int32_t*>(dst),
reinterpret_cast<const int32_t*>(src), len);
case BYTEPS_FLOAT32:
return _Unpacking(reinterpret_cast<float*>(dst),
reinterpret_cast<const int32_t*>(src), len);
return UnpackingImpl(reinterpret_cast<float*>(dst),
reinterpret_cast<const int32_t*>(src), len);
case BYTEPS_INT64:
return _Unpacking(reinterpret_cast<int64_t*>(dst),
reinterpret_cast<const int64_t*>(src), len);
return UnpackingImpl(reinterpret_cast<int64_t*>(dst),
reinterpret_cast<const int64_t*>(src), len);
case BYTEPS_FLOAT64:
return _Unpacking(reinterpret_cast<double*>(dst),
reinterpret_cast<const int64_t*>(src), len);
return UnpackingImpl(reinterpret_cast<double*>(dst),
reinterpret_cast<const int64_t*>(src), len);
default:
BPS_CHECK(0) << "Unsupported data type: " << dtype;
}
Expand Down
13 changes: 12 additions & 1 deletion byteps/common/compressor/strategy/onebit.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace compressor {
*/
class OnebitCompressor : public BaseCompressor {
public:
OnebitCompressor(bool use_scale=false);
OnebitCompressor(bool use_scale = false);
virtual ~OnebitCompressor();

/*!
Expand All @@ -65,6 +65,17 @@ class OnebitCompressor : public BaseCompressor {
void Decompress(ByteBuf compressed, int dtype,
ByteBuf& decompressed) override;

private:
size_t Packing(void* data, size_t len, int dtype);

template <typename scalar_t>
size_t PackingImpl(scalar_t* data, size_t len);

size_t Unpacking(void* dst, const void* src, size_t len, int dtype);

template <typename scalar_t, typename packing_t>
size_t UnpackingImpl(scalar_t* dst, const packing_t* src, size_t size);

private:
bool _use_scale;
};
Expand Down
20 changes: 10 additions & 10 deletions byteps/common/compressor/strategy/randomk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ RandomkCompressor::RandomkCompressor(int k) : _k(k) { _gen.seed(_rd()); };

RandomkCompressor::~RandomkCompressor() = default;
template <typename index_t, typename scalar_t>
size_t RandomkCompressor::_Packing(index_t* dst, const scalar_t* src,
size_t RandomkCompressor::PackingImpl(index_t* dst, const scalar_t* src,
size_t len) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
Expand All @@ -61,22 +61,22 @@ size_t RandomkCompressor::_Packing(index_t* dst, const scalar_t* src,
size_t RandomkCompressor::Packing(const void* src, size_t size, int dtype) {
switch (dtype) {
case BYTEPS_INT8:
return _Packing(reinterpret_cast<int8_t*>(_buf.get()),
return PackingImpl(reinterpret_cast<int8_t*>(_buf.get()),
reinterpret_cast<const int8_t*>(src),
size / sizeof(int8_t));
case BYTEPS_UINT8:
return _Packing(reinterpret_cast<uint8_t*>(_buf.get()),
return PackingImpl(reinterpret_cast<uint8_t*>(_buf.get()),
reinterpret_cast<const uint8_t*>(src),
size / sizeof(uint8_t));
// case BYTEPS_FLOAT16:
// return _Packing(reinterpret_cast<int8_t*>(_buf.get()),
// reinterpret_cast<const int8_t*>(src), size);
case BYTEPS_FLOAT32:
return _Packing(reinterpret_cast<int32_t*>(_buf.get()),
return PackingImpl(reinterpret_cast<int32_t*>(_buf.get()),
reinterpret_cast<const float*>(src),
size / sizeof(int32_t));
case BYTEPS_FLOAT64:
return _Packing(reinterpret_cast<int64_t*>(_buf.get()),
return PackingImpl(reinterpret_cast<int64_t*>(_buf.get()),
reinterpret_cast<const double*>(src),
size / sizeof(int64_t));
default:
Expand All @@ -91,7 +91,7 @@ void RandomkCompressor::Compress(ByteBuf grad, int dtype, ByteBuf& compressed) {
}

template <typename index_t, typename scalar_t>
size_t RandomkCompressor::_Unpacking(scalar_t* dst, const index_t* src,
size_t RandomkCompressor::UnpackingImpl(scalar_t* dst, const index_t* src,
size_t len) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
Expand All @@ -116,22 +116,22 @@ size_t RandomkCompressor::Unpacking(void* dst, const void* src, size_t size,
int dtype) {
switch (dtype) {
case BYTEPS_INT8:
return _Unpacking(reinterpret_cast<int8_t*>(dst),
return UnpackingImpl(reinterpret_cast<int8_t*>(dst),
reinterpret_cast<const int8_t*>(src),
size / sizeof(int8_t) / 2);
case BYTEPS_UINT8:
return _Unpacking(reinterpret_cast<uint8_t*>(dst),
return UnpackingImpl(reinterpret_cast<uint8_t*>(dst),
reinterpret_cast<const uint8_t*>(src),
size / sizeof(uint8_t) / 2);
// case BYTEPS_FLOAT16:
// return _Unpacking(reinterpret_cast<int8_t*>(_buf.get()),
// reinterpret_cast<const int8_t*>(src), size);
case BYTEPS_FLOAT32:
return _Unpacking(reinterpret_cast<float*>(dst),
return UnpackingImpl(reinterpret_cast<float*>(dst),
reinterpret_cast<const int32_t*>(src),
size / sizeof(float) / 2);
case BYTEPS_FLOAT64:
return _Unpacking(reinterpret_cast<double*>(dst),
return UnpackingImpl(reinterpret_cast<double*>(dst),
reinterpret_cast<const int64_t*>(src),
size / sizeof(double) / 2);
default:
Expand Down
4 changes: 2 additions & 2 deletions byteps/common/compressor/strategy/randomk.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ class RandomkCompressor : public BaseCompressor {
size_t Packing(const void* src, size_t size, int dtype);

template <typename index_t, typename scalar_t>
size_t _Packing(index_t* dst, const scalar_t* src, size_t len);
size_t PackingImpl(index_t* dst, const scalar_t* src, size_t len);

size_t Unpacking(void* dst, const void* src, size_t size, int dtype);

template <typename index_t, typename scalar_t>
size_t _Unpacking(scalar_t* dst, const index_t* src, size_t len);
size_t UnpackingImpl(scalar_t* dst, const index_t* src, size_t len);

private:
int _k;
Expand Down
20 changes: 10 additions & 10 deletions byteps/common/compressor/strategy/topk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ TopkCompressor::TopkCompressor(int k) : _k(k){};
TopkCompressor::~TopkCompressor() = default;

template <typename index_t, typename scalar_t>
size_t TopkCompressor::_Packing(index_t* dst, const scalar_t* src, size_t len) {
size_t TopkCompressor::PackingImpl(index_t* dst, const scalar_t* src, size_t len) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
BPS_CHECK_LE(this->_k, len / 2);
Expand Down Expand Up @@ -78,22 +78,22 @@ size_t TopkCompressor::_Packing(index_t* dst, const scalar_t* src, size_t len) {
size_t TopkCompressor::Packing(const void* src, size_t size, int dtype) {
switch (dtype) {
case BYTEPS_INT8:
return _Packing(reinterpret_cast<int8_t*>(_buf.get()),
return PackingImpl(reinterpret_cast<int8_t*>(_buf.get()),
reinterpret_cast<const int8_t*>(src),
size / sizeof(int8_t));
case BYTEPS_UINT8:
return _Packing(reinterpret_cast<uint8_t*>(_buf.get()),
return PackingImpl(reinterpret_cast<uint8_t*>(_buf.get()),
reinterpret_cast<const uint8_t*>(src),
size / sizeof(uint8_t));
// case BYTEPS_FLOAT16:
// return _Packing(reinterpret_cast<int8_t*>(_buf.get()),
// reinterpret_cast<const int8_t*>(src), size);
case BYTEPS_FLOAT32:
return _Packing(reinterpret_cast<int32_t*>(_buf.get()),
return PackingImpl(reinterpret_cast<int32_t*>(_buf.get()),
reinterpret_cast<const float*>(src),
size / sizeof(int32_t));
case BYTEPS_FLOAT64:
return _Packing(reinterpret_cast<int64_t*>(_buf.get()),
return PackingImpl(reinterpret_cast<int64_t*>(_buf.get()),
reinterpret_cast<const double*>(src),
size / sizeof(int64_t));
default:
Expand All @@ -108,7 +108,7 @@ void TopkCompressor::Compress(ByteBuf grad, int dtype, ByteBuf& compressed) {
}

template <typename index_t, typename scalar_t>
size_t TopkCompressor::_Unpacking(scalar_t* dst, const index_t* src,
size_t TopkCompressor::UnpackingImpl(scalar_t* dst, const index_t* src,
size_t len) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
Expand All @@ -133,22 +133,22 @@ size_t TopkCompressor::Unpacking(void* dst, const void* src, size_t size,
int dtype) {
switch (dtype) {
case BYTEPS_INT8:
return _Unpacking(reinterpret_cast<int8_t*>(dst),
return UnpackingImpl(reinterpret_cast<int8_t*>(dst),
reinterpret_cast<const int8_t*>(src),
size / sizeof(int8_t) / 2);
case BYTEPS_UINT8:
return _Unpacking(reinterpret_cast<uint8_t*>(dst),
return UnpackingImpl(reinterpret_cast<uint8_t*>(dst),
reinterpret_cast<const uint8_t*>(src),
size / sizeof(uint8_t) / 2);
// case BYTEPS_FLOAT16:
// return _Unpacking(reinterpret_cast<int8_t*>(_buf.get()),
// reinterpret_cast<const int8_t*>(src), size);
case BYTEPS_FLOAT32:
return _Unpacking(reinterpret_cast<float*>(dst),
return UnpackingImpl(reinterpret_cast<float*>(dst),
reinterpret_cast<const int32_t*>(src),
size / sizeof(float) / 2);
case BYTEPS_FLOAT64:
return _Unpacking(reinterpret_cast<double*>(dst),
return UnpackingImpl(reinterpret_cast<double*>(dst),
reinterpret_cast<const int64_t*>(src),
size / sizeof(double) / 2);
default:
Expand Down
4 changes: 2 additions & 2 deletions byteps/common/compressor/strategy/topk.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ class TopkCompressor : public BaseCompressor {
size_t Packing(const void* src, size_t size, int dtype);

template <typename index_t, typename scalar_t>
size_t _Packing(index_t* dst, const scalar_t* src, size_t len);
size_t PackingImpl(index_t* dst, const scalar_t* src, size_t len);

size_t Unpacking(void* dst, const void* src, size_t size, int dtype);

template <typename index_t, typename scalar_t>
size_t _Unpacking(scalar_t* dst, const index_t* src, size_t len);
size_t UnpackingImpl(scalar_t* dst, const index_t* src, size_t len);

private:
int _k;
Expand Down
2 changes: 1 addition & 1 deletion byteps/common/global.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ void BytePSGlobal::Init() {
size_t pool_size = 4;
if (getenv("BYTEPS_THREADPOOL_SIZE")) {
pool_size = atoi(getenv("BYTEPS_THREADPOOL_SIZE"));
_thread_pool.reset(new ThreadPool(pool_size));
}
_thread_pool.reset(new ThreadPool(pool_size));
}

// ReadyTable for cross-PCIe-switch reduce
Expand Down

0 comments on commit 721b4ca

Please sign in to comment.