Skip to content

Commit

Permalink
Projected views in katana.local [KAT-2411] (#918)
Browse files Browse the repository at this point in the history
* Adds iterators to DynamicBitset because I'm sick of not having them.
* Cleans up the PropertyGraph::MakeProjectedGraph method to accept type IDs and allow null selection.
* Add Graph.project to python to create a projected graph view.
  • Loading branch information
arthurp authored Feb 22, 2022
1 parent 77b2655 commit 95ad4cf
Show file tree
Hide file tree
Showing 27 changed files with 370 additions and 210 deletions.
14 changes: 14 additions & 0 deletions libgalois/include/katana/DynamicBitset.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <boost/mpl/has_xxx.hpp>

#include "katana/AtomicWrapper.h"
#include "katana/DynamicBitsetSlow.h"
#include "katana/Galois.h"
#include "katana/PODVector.h"
#include "katana/config.h"
Expand All @@ -45,6 +46,7 @@ namespace katana {
class KATANA_EXPORT DynamicBitset {
public: // types
using TItem = katana::CopyableAtomic<uint64_t>;
using iterator = DynamicBitsetIterator<DynamicBitset>;

private: // variables
katana::PODVector<TItem> bitvec_;
Expand Down Expand Up @@ -92,6 +94,18 @@ class KATANA_EXPORT DynamicBitset {
*/
auto& get_vec() { return bitvec_; }

/// Returns an iterator positioned on bit 0 when that bit is set; otherwise
/// the iterator is advanced once so it lands on the next set bit found by
/// operator++, or on end() when there is none.
iterator begin() const {
  iterator first{this, 0, 0};
  // Only probe bit 0 when the bitset actually has one. test(0) presumably
  // indexes the first storage word, which an empty bitset does not have.
  if (size() > 0 && test(0)) {
    return first;
  }
  // Bit 0 is clear (or absent): let the increment search for the first set
  // bit. For an empty bitset this yields end().
  return ++first;
}

/// Past-the-end iterator: word index equal to the number of storage words,
/// bit offset 0. This is the state operator++ leaves an exhausted iterator in.
iterator end() const {
  return iterator{this, bitvec_.size(), 0};
}

/**
* Resizes the bitset.
*
Expand Down
17 changes: 14 additions & 3 deletions libgraph/include/katana/PropertyGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,23 @@ class KATANA_EXPORT PropertyGraph {
const katana::RDGManifest& rdg_manifest,
const katana::RDGLoadOptions& opts, katana::TxnContext* txn_ctx);

/// Make a projected graph from a property graph. Shares state with
/// the original graph.
static std::unique_ptr<PropertyGraph> MakeProjectedGraph(
/// Legacy projection entry point taking type names. An empty vector means
/// "no filtering" for that entity kind. Failures are not returned to the
/// caller; the implementation checks the result with an assertion.
///
/// \deprecated Prefer the overloads taking std::optional selections, which
/// return a Result.
[[deprecated(
    "use the MakeProjectedGraph overloads taking std::optional type "
    "selections")]] static std::unique_ptr<PropertyGraph>
MakeProjectedGraph(
    const PropertyGraph& pg, const std::vector<std::string>& node_types,
    const std::vector<std::string>& edge_types);

/// Make a projected graph from a property graph. Shares state with
/// the original graph.
///
/// Type names are resolved to entity type IDs through the graph's node and
/// edge type managers before projecting.
///
/// \param pg the graph to project.
/// \param node_types names of the node types to keep, or std::nullopt to
///   keep all nodes.
/// \param edge_types names of the edge types to keep, or std::nullopt to
///   keep all edges between the selected nodes.
/// \return the projected graph, or the error produced while resolving the
///   type names or building the projection.
static Result<std::unique_ptr<PropertyGraph>> MakeProjectedGraph(
const PropertyGraph& pg,
std::optional<std::vector<std::string>> node_types,
std::optional<std::vector<std::string>> edge_types);

/// Make a projected graph from a property graph. Shares state with
/// the original graph.
///
/// \param pg the graph to project.
/// \param node_types set of node entity type IDs to keep, or std::nullopt to
///   keep all nodes. A node is kept when it has at least one of the types.
/// \param edge_types set of edge entity type IDs to keep, or std::nullopt to
///   keep all edges between the selected nodes.
/// \return the projected graph, or an error.
static Result<std::unique_ptr<PropertyGraph>> MakeProjectedGraph(
const PropertyGraph& pg, std::optional<SetOfEntityTypeIDs> node_types,
std::optional<SetOfEntityTypeIDs> edge_types);

/// \return A copy of this with the same set of properties. The copy shares no
/// state with this.
Result<std::unique_ptr<PropertyGraph>> Copy(
Expand Down
54 changes: 35 additions & 19 deletions libgraph/src/PropertyGraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,36 @@ std::unique_ptr<katana::PropertyGraph>
katana::PropertyGraph::MakeProjectedGraph(
    const PropertyGraph& pg, const std::vector<std::string>& node_types,
    const std::vector<std::string>& edge_types) {
  // Legacy convention: an empty name list selects everything. The
  // optional-based overload expresses that as std::nullopt.
  using TypeNames = std::vector<std::string>;

  std::optional<TypeNames> node_selection;
  if (!node_types.empty()) {
    node_selection = node_types;
  }

  std::optional<TypeNames> edge_selection;
  if (!edge_types.empty()) {
    edge_selection = edge_types;
  }

  auto projected = MakeProjectedGraph(
      pg, std::move(node_selection), std::move(edge_selection));

  // This signature has no error channel, so a failed projection is reported
  // through the logging assertion instead of being returned.
  KATANA_LOG_VASSERT(projected.has_value(), "{}", projected.error());
  return std::move(projected.value());
}

/// Projection by type name: resolves each provided name list to a set of
/// entity type IDs and forwards to the ID-based overload. A missing
/// (std::nullopt) list is forwarded as a missing ID set.
katana::Result<std::unique_ptr<katana::PropertyGraph>>
katana::PropertyGraph::MakeProjectedGraph(
    const PropertyGraph& pg, std::optional<std::vector<std::string>> node_types,
    std::optional<std::vector<std::string>> edge_types) {
  std::optional<SetOfEntityTypeIDs> node_type_ids;
  std::optional<SetOfEntityTypeIDs> edge_type_ids;

  // KATANA_CHECKED hands a failed name lookup back to the caller.
  if (node_types.has_value()) {
    node_type_ids =
        KATANA_CHECKED(pg.GetNodeTypeManager().GetEntityTypeIDs(*node_types));
  }
  if (edge_types.has_value()) {
    edge_type_ids =
        KATANA_CHECKED(pg.GetEdgeTypeManager().GetEntityTypeIDs(*edge_types));
  }

  return MakeProjectedGraph(pg, node_type_ids, edge_type_ids);
}

/// Make a projected graph from a property graph. Shares state with
/// the original graph.
katana::Result<std::unique_ptr<katana::PropertyGraph>>
katana::PropertyGraph::MakeProjectedGraph(
const PropertyGraph& pg, std::optional<SetOfEntityTypeIDs> node_types,
std::optional<SetOfEntityTypeIDs> edge_types) {
const auto& topology = pg.topology();
if (topology.empty()) {
return std::make_unique<PropertyGraph>();
Expand All @@ -366,7 +396,7 @@ katana::PropertyGraph::MakeProjectedGraph(
NUMAArray<Node> original_to_projected_nodes_mapping;
original_to_projected_nodes_mapping.allocateInterleaved(topology.NumNodes());

if (node_types.empty()) {
if (!node_types) {
num_new_nodes = topology.NumNodes();
// set all nodes
katana::do_all(katana::iterate(topology.Nodes()), [&](auto src) {
Expand All @@ -378,21 +408,14 @@ katana::PropertyGraph::MakeProjectedGraph(
original_to_projected_nodes_mapping.begin(),
original_to_projected_nodes_mapping.end(), Node{0});

std::set<katana::EntityTypeID> node_entity_type_ids;

for (auto node_type : node_types) {
auto entity_type_id = pg.GetNodeEntityTypeID(node_type);
node_entity_type_ids.insert(entity_type_id);
}

katana::GAccumulator<uint32_t> accum_num_new_nodes;

katana::do_all(katana::iterate(topology.Nodes()), [&](auto src) {
for (auto type : node_entity_type_ids) {
for (auto type : node_types.value()) {
if (pg.DoesNodeHaveType(src, type)) {
accum_num_new_nodes += 1;
bitset_nodes.set(src);
// this sets the correspondign entry in the array to 1
// this sets the corresponding entry in the array to 1
// will perform a prefix sum on this array later on
original_to_projected_nodes_mapping[src] = 1;
return;
Expand Down Expand Up @@ -444,7 +467,7 @@ katana::PropertyGraph::MakeProjectedGraph(
// initializes the edge-index array to all zeros
katana::ParallelSTL::fill(out_indices.begin(), out_indices.end(), Edge{0});

if (edge_types.empty()) {
if (!edge_types) {
katana::GAccumulator<uint32_t> accum_num_new_edges;
// set all edges incident to projected nodes
katana::do_all(
Expand All @@ -464,13 +487,6 @@ katana::PropertyGraph::MakeProjectedGraph(

num_new_edges = accum_num_new_edges.reduce();
} else {
std::set<katana::EntityTypeID> edge_entity_type_ids;

for (auto edge_type : edge_types) {
auto entity_type_id = pg.GetEdgeEntityTypeID(edge_type);
edge_entity_type_ids.insert(entity_type_id);
}

katana::GAccumulator<uint32_t> accum_num_new_edges;

katana::do_all(
Expand All @@ -481,7 +497,7 @@ katana::PropertyGraph::MakeProjectedGraph(
for (Edge e : topology.OutEdges(old_src)) {
auto dest = topology.OutEdgeDst(e);
if (bitset_nodes.test(dest)) {
for (auto type : edge_entity_type_ids) {
for (auto type : edge_types.value()) {
if (pg.DoesEdgeHaveTypeFromTopoIndex(e, type)) {
accum_num_new_edges += 1;
bitset_edges.set(e);
Expand Down
10 changes: 8 additions & 2 deletions libgraph/test/projection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,14 @@ main(int argc, char** argv) {
std::vector<std::string> edge_types;
SplitString(edgeTypes, &edge_types);

auto pg_view = katana::PropertyGraph::MakeProjectedGraph(
full_graph, node_types, edge_types);
auto pg_view_res = katana::PropertyGraph::MakeProjectedGraph(
full_graph,
node_types.empty() ? std::nullopt : std::make_optional(node_types),
edge_types.empty() ? std::nullopt : std::make_optional(edge_types));
if (!pg_view_res) {
KATANA_LOG_FATAL("Failed to construct projection: {}", pg_view_res.error());
}
auto pg_view = std::move(pg_view_res.value());

katana::analytics::TemporaryPropertyGuard temp_node_property{
full_graph.NodeMutablePropertyView()};
Expand Down
9 changes: 7 additions & 2 deletions libgraph/test/transformation-view-optional-topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,13 @@ main(int argc, char** argv) {
std::vector<std::string> edge_types;
SplitString(edgeTypes, &edge_types);

auto pg_view =
katana::PropertyGraph::MakeProjectedGraph(pg, node_types, edge_types);
auto pg_view_res = katana::PropertyGraph::MakeProjectedGraph(
pg, node_types.empty() ? std::nullopt : std::make_optional(node_types),
edge_types.empty() ? std::nullopt : std::make_optional(edge_types));
if (!pg_view_res) {
KATANA_LOG_FATAL("Failed to construct projection: {}", pg_view_res.error());
}
auto pg_view = std::move(pg_view_res.value());

TestOptionalTopologyGenerationEdgeShuffleTopology(*pg_view);
TestOptionalTopologyGenerationShuffleTopology(*pg_view);
Expand Down
40 changes: 40 additions & 0 deletions libkatana_python_native/src/PropertyGraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,46 @@ DefPropertyGraph(py::module& m) {
katana::DefWithNumba<&PropertyGraph::NumNodes>(cls, "num_nodes");
katana::DefWithNumba<&PropertyGraph::NumEdges>(cls, "num_edges");

cls.def(
"project",
// Python-facing projection. Each argument is either None (no filtering for
// that entity kind) or an iterable of EntityType objects whose type_id
// selects a bit in the corresponding ID set.
[](PropertyGraph& self, py::object node_types,
py::object edge_types) -> Result<std::shared_ptr<PropertyGraph>> {
// Left empty when node_types is None.
std::optional<katana::SetOfEntityTypeIDs> node_type_ids;
if (!node_types.is_none()) {
node_type_ids = katana::SetOfEntityTypeIDs();
// One bit per node entity type known to this graph's type manager.
node_type_ids->resize(self.GetNodeTypeManager().GetNumEntityTypes());
for (auto& t : node_types) {
// NOTE(review): assumes every EntityType comes from this graph, so that
// type_id is below the size set above -- confirm set() is range-checked.
node_type_ids->set(py::cast<EntityType>(t).type_id);
}
}
// Left empty when edge_types is None.
std::optional<katana::SetOfEntityTypeIDs> edge_type_ids;
if (!edge_types.is_none()) {
edge_type_ids = katana::SetOfEntityTypeIDs();
// One bit per edge entity type known to this graph's type manager.
edge_type_ids->resize(self.GetEdgeTypeManager().GetNumEntityTypes());
for (auto& t : edge_types) {
// NOTE(review): same assumption as for node types above.
edge_type_ids->set(py::cast<EntityType>(t).type_id);
}
}

// Release the GIL only for the projection itself, which may copy or load
// data. Every access to the Python arguments (is_none, iteration, py::cast)
// has already happened above.
py::gil_scoped_release
guard;
return KATANA_CHECKED(PropertyGraph::MakeProjectedGraph(
self, node_type_ids, edge_type_ids));
},
py::arg("node_types") = py::none(), py::arg("edge_types") = py::none(),
R"""(
Get a projected view of the graph which only contains nodes or edges of
specific types.
:type node_types: Optional[Iterable[EntityType]]
:param node_types: A set of node types to include in the projected graph,
or ``None`` to keep all nodes.
:type edge_types: Optional[Iterable[EntityType]]
:param edge_types: A set of edge types to include in the projected graph,
or ``None`` to keep all edges on the selected nodes.
)""");

// GetLocalNodeID(NodeHandle) -> LocalNodeID - local node ID
cls.def(
"get_local_node_id",
Expand Down
105 changes: 105 additions & 0 deletions libsupport/include/katana/DynamicBitsetSlow.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,97 @@

namespace katana {

/// An iterator over dynamic bitsets.
///
/// Increment is an optimized linear search over the bitset, so iteration is
/// O(N) where N is the size of the bitset, not O(|S|) where |S| is the number
/// of set bits in the bit set.

// TODO(amp): This is a template so it can be used for both DynamicBitsetSlow
// and DynamicBitset. This can be made a simple class once DynamicBitsetSlow
// is removed.
template <typename DynamicBitsetType>
class KATANA_EXPORT DynamicBitsetIterator
    : public std::iterator<
          std::forward_iterator_tag, uint64_t, int64_t, const uint64_t*,
          uint64_t> {
  /// Bitset being iterated; not owned by the iterator.
  const DynamicBitsetType* underlying_;
  /// Index of the current word in the bitset's backing vector.
  uint64_t array_index_;
  /// Offset of the current bit within that word.
  uint8_t bit_offset_;

public:
  /// Creates an iterator at bit \p bit_offset of word \p array_index of
  /// \p underlying. The position is taken as given; no search is performed.
  DynamicBitsetIterator(
      const DynamicBitsetType* underlying, uint64_t array_index,
      uint8_t bit_offset)
      : underlying_(underlying),
        array_index_(array_index),
        bit_offset_(bit_offset) {}

  /// Advances to the next set bit strictly after the current position. When
  /// there is none, the iterator becomes {number of words, 0}, i.e. the
  /// past-the-end position.
  DynamicBitsetIterator& operator++() {
    const uint64_t bits_per_word = DynamicBitsetType::kNumBitsInUint64;
    const auto& bitvec = underlying_->get_vec();
    const uint64_t num_bits = underlying_->size();
    const uint64_t num_words = bitvec.size();

    // First candidate position: one bit past the current one. Valid offsets
    // are [0, bits_per_word), so reaching bits_per_word rolls over to the
    // start of the next word (this keeps every shift below in range).
    uint64_t word_index = array_index_;
    uint64_t offset = uint64_t{bit_offset_} + 1;
    if (offset >= bits_per_word) {
      offset = 0;
      ++word_index;
    }

    // Scan word by word so that all-zero words are skipped with a single
    // comparison. The offset is reset whenever we move to another word,
    // whether or not the word just left was zero; only the first word
    // examined starts mid-word.
    for (; word_index < num_words; ++word_index, offset = 0) {
      const uint64_t word = bitvec[word_index].load(std::memory_order_relaxed);
      if (word == 0) {
        continue;
      }
      // bit_index is the absolute index of the candidate bit. It stops the
      // scan at the last real bit when the bitset size is not a multiple of
      // the word size.
      uint64_t bit_index = word_index * bits_per_word + offset;
      for (; offset < bits_per_word && bit_index < num_bits;
           ++offset, ++bit_index) {
        if (((word >> offset) & uint64_t{1}) != 0) {
          array_index_ = word_index;
          bit_offset_ = static_cast<uint8_t>(offset);
          return *this;
        }
      }
    }

    // No further set bit: become the past-the-end iterator.
    array_index_ = num_words;
    bit_offset_ = 0;
    return *this;
  }

  /// Post-increment: advances this iterator and returns its previous value.
  DynamicBitsetIterator operator++(int) {
    auto previous = *this;
    ++(*this);
    return previous;
  }

  /// Absolute index of the bit the iterator is positioned on.
  reference operator*() const {
    return array_index_ * DynamicBitsetType::kNumBitsInUint64 + bit_offset_;
  }

  /// Iterators are equal when they refer to the same bitset and position.
  bool operator==(const DynamicBitsetIterator& other) const {
    return underlying_ == other.underlying_ &&
           array_index_ == other.array_index_ &&
           bit_offset_ == other.bit_offset_;
  }

  bool operator!=(const DynamicBitsetIterator& other) const {
    return !(*this == other);
  }
};

//TODO(emcginnis): Remove this class entirely when DynamicBitset is available to libsupport
/**
* Concurrent, thread safe, serial implementation of a dynamically allocated bitset
Expand All @@ -24,6 +115,8 @@ class KATANA_EXPORT DynamicBitsetSlow {
size_t num_bits_{0};

public:
using iterator = DynamicBitsetIterator<DynamicBitsetSlow>;

static constexpr uint32_t kNumBitsInUint64 = sizeof(uint64_t) * CHAR_BIT;

explicit DynamicBitsetSlow(
Expand Down Expand Up @@ -75,6 +168,18 @@ class KATANA_EXPORT DynamicBitsetSlow {
*/
auto& get_vec() { return bitvec_; }

/// Returns an iterator positioned on bit 0 when that bit is set; otherwise
/// the iterator is advanced once so it lands on the next set bit found by
/// operator++, or on end() when there is none.
iterator begin() const {
  iterator first{this, 0, 0};
  // Only probe bit 0 when the bitset actually has one. test(0) presumably
  // indexes the first storage word, which an empty bitset does not have.
  if (size() > 0 && test(0)) {
    return first;
  }
  // Bit 0 is clear (or absent): let the increment search for the first set
  // bit. For an empty bitset this yields end().
  return ++first;
}

/// Past-the-end iterator: word index equal to the number of storage words,
/// bit offset 0. This is the state operator++ leaves an exhausted iterator in.
iterator end() const {
  return iterator{this, bitvec_.size(), 0};
}

/**
* Resizes the bitset.
*
Expand Down
3 changes: 2 additions & 1 deletion libsupport/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ function(add_unit_test name)
)
endfunction()

add_unit_test(arrow)
add_unit_test(array-from-scalars)
add_unit_test(arrow)
add_unit_test(bitmath)
add_unit_test(cache)
add_unit_test(disjoint_range_iterator)
add_unit_test(dynamic-bitset)
add_unit_test(env)
add_unit_test(experimental)
add_unit_test(logging)
Expand Down
Loading

0 comments on commit 95ad4cf

Please sign in to comment.