Skip to content

Commit 2711d7f

Browse files
committed
fix and make somewhat fast
1 parent 2b9f78c commit 2711d7f

9 files changed

Lines changed: 284 additions & 116 deletions

File tree

include/bucket.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ struct SparseBucket {
1818
return position >> 8;
1919
}
2020
inline uint16_t row() const {
21-
return position & 0xFFFF;
21+
return position & 0xFF;
2222
}
2323
inline void set_col(uint16_t col) {
2424
position = (col << 8) + row();

include/cc_sketch_alg.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,9 @@ class CCSketchAlg {
201201
* Specifically, the delta is in the form of a pointer to raw bucket data.
202202
* @param src_vertex The vertex where all the edges originate.
203203
* @param raw_buckets Pointer to the array of buckets from the delta sketch
204+
* @param num_buckets Size of raw_buckets array in number of buckets
204205
*/
205-
void apply_raw_buckets_update(node_id_t src_vertex, Bucket *raw_buckets);
206+
void apply_raw_buckets_update(node_id_t src_vertex, Bucket *raw_buckets, size_t num_buckets);
206207

207208
/**
208209
* The function performs a direct update to the associated sketch.

include/dense_sketch.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,18 +77,19 @@ class DenseSketch {
7777
* @param cols_per_sample [Optional] Number of sketch columns for each sample (default = 1)
7878
*/
7979
DenseSketch(vec_t vector_len, uint64_t seed, size_t num_samples = 1,
80-
size_t cols_per_sample = default_cols_per_sample);
80+
size_t cols_per_sample = default_cols_per_sample);
8181

8282
/**
8383
* Construct a sketch from a serialized stream
8484
* @param vector_len Length of the vector we are sketching
8585
* @param seed Random seed of the sketch
8686
* @param binary_in Stream holding serialized sketch object
87+
* @param num_buckets Number of buckets in serialized sketch
8788
* @param num_samples [Optional] Number of samples this sketch supports (default = 1)
8889
* @param cols_per_sample [Optional] Number of sketch columns for each sample (default = 1)
8990
*/
90-
DenseSketch(vec_t vector_len, uint64_t seed, std::istream& binary_in, size_t num_samples = 1,
91-
size_t cols_per_sample = default_cols_per_sample);
91+
DenseSketch(vec_t vector_len, uint64_t seed, std::istream& binary_in, size_t num_buckets,
92+
size_t num_samples = 1, size_t cols_per_sample = default_cols_per_sample);
9293

9394
/**
9495
* Sketch copy constructor
@@ -139,9 +140,10 @@ class DenseSketch {
139140
* Perform an in-place merge function without another Sketch and instead
140141
* use a raw bucket memory.
141142
* We also allow for only a portion of the buckets to be merged at once
142-
* @param raw_bucket Raw bucket data to merge into this sketch
143+
* @param raw_buckets Raw bucket data to merge into this sketch
144+
* @param n_raw_buckets Size of raw_buckets in number of Bucket data-structures
143145
*/
144-
void merge_raw_bucket_buffer(const Bucket *raw_buckets);
146+
void merge_raw_bucket_buffer(const Bucket *raw_buckets, size_t n_raw_buckets);
145147

146148
/**
147149
* Zero out all the buckets of a sketch.

include/sparse_sketch.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,14 @@ class SparseSketch {
8282
/**
8383
* Reallocates the bucket array if necessary to either grow or shrink the dense region
8484
*/
85-
void reallocate_if_needed();
85+
void reallocate_if_needed(int delta);
86+
void dense_realloc(size_t new_num_dense_rows);
8687

8788
// This variable lets us know how many Buckets to allocate to make space for the SparseBuckets
8889
// that will be using that space
8990
size_t sparse_data_size = ceil(double(sparse_capacity) * sizeof(SparseBucket) / sizeof(Bucket));
9091

91-
void update_sparse(uint16_t pos, vec_t update_idx, vec_hash_t checksum);
92+
void update_sparse(SparseBucket to_add, bool realloc_if_needed = true);
9293
SketchSample sample_sparse(size_t first_col, size_t end_col);
9394

9495
inline Bucket& deterministic_bucket() {
@@ -151,11 +152,12 @@ class SparseSketch {
151152
* @param vector_len Length of the vector we are sketching
152153
* @param seed Random seed of the sketch
153154
* @param binary_in Stream holding serialized sketch object
155+
* @param num_buckets Number of buckets in serialized sketch (dense + sparse_capacity)
154156
* @param num_samples [Optional] Number of samples this sketch supports (default = 1)
155157
* @param cols_per_sample [Optional] Number of sketch columns for each sample (default = 1)
156158
*/
157-
SparseSketch(vec_t vector_len, uint64_t seed, std::istream& binary_in, size_t num_samples = 1,
158-
size_t cols_per_sample = default_cols_per_sample);
159+
SparseSketch(vec_t vector_len, uint64_t seed, std::istream& binary_in, size_t num_buckets,
160+
size_t num_samples = 1, size_t cols_per_sample = default_cols_per_sample);
159161

160162
/**
161163
* SparseSketch copy constructor
@@ -206,9 +208,10 @@ class SparseSketch {
206208
* Perform an in-place merge function without another Sketch and instead
207209
* use a raw bucket memory.
208210
* We also allow for only a portion of the buckets to be merged at once
209-
* @param raw_bucket Raw bucket data to merge into this sketch
211+
* @param raw_buckets Raw bucket data to merge into this sketch
212+
* @param n_raw_buckets Size of raw_buckets in number of Bucket data-structures
210213
*/
211-
void merge_raw_bucket_buffer(const Bucket *raw_buckets);
214+
void merge_raw_bucket_buffer(const Bucket *raw_buckets, size_t n_raw_buckets);
212215

213216
/**
214217
* Zero out all the buckets of a sketch.
@@ -240,6 +243,7 @@ class SparseSketch {
240243
inline size_t get_columns() const { return num_columns; }
241244
inline size_t get_buckets() const { return num_buckets; }
242245
inline size_t get_num_samples() const { return num_samples; }
246+
inline size_t get_num_dense_rows() const { return num_dense_rows; }
243247

244248
static size_t calc_bkt_per_col(size_t n) { return ceil(log2(n)) + 1; }
245249

src/cc_alg_configuration.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ std::ostream& operator<< (std::ostream &out, const CCAlgConfiguration &conf) {
3434
#else
3535
out << " Sketching algorithm = CameoSketch" << std::endl;
3636
#endif
37+
#ifdef L0_FULLY_DENSE
38+
out << " Sketch storage = Dense Matrix" << std::endl;
39+
#else
40+
out << " Sketch storage = Hybrid Matrix" << std::endl;
41+
#endif
3742
#ifdef NO_EAGER_DSU
3843
out << " Using Eager DSU = False" << std::endl;
3944
#else

src/cc_sketch_alg.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ CCSketchAlg::CCSketchAlg(node_id_t num_vertices, size_t seed, std::ifstream &bin
5353

5454
for (node_id_t i = 0; i < num_vertices; ++i) {
5555
representatives->insert(i);
56-
sketches[i] = new Sketch(sketch_vec_len, seed, binary_stream, sketch_num_samples);
56+
size_t num_bkts_in_sketch;
57+
binary_stream.read((char *) &num_bkts_in_sketch, sizeof(num_bkts_in_sketch));
58+
sketches[i] =
59+
new Sketch(sketch_vec_len, seed, binary_stream, num_bkts_in_sketch, sketch_num_samples);
5760
}
5861
binary_stream.close();
5962

@@ -117,9 +120,10 @@ void CCSketchAlg::apply_update_batch(int thr_id, node_id_t src_vertex,
117120
sketches[src_vertex]->merge(delta_sketch);
118121
}
119122

120-
void CCSketchAlg::apply_raw_buckets_update(node_id_t src_vertex, Bucket *raw_buckets) {
123+
void CCSketchAlg::apply_raw_buckets_update(node_id_t src_vertex, Bucket *raw_buckets,
124+
size_t num_buckets) {
121125
std::lock_guard<std::mutex> lk(sketches[src_vertex]->mutex);
122-
sketches[src_vertex]->merge_raw_bucket_buffer(raw_buckets);
126+
sketches[src_vertex]->merge_raw_bucket_buffer(raw_buckets, num_buckets);
123127
}
124128

125129
// Note: for performance reasons route updates through the driver instead of calling this function
@@ -617,6 +621,8 @@ void CCSketchAlg::write_binary(const std::string &filename) {
617621
binary_out.write((char *)&num_vertices, sizeof(num_vertices));
618622
binary_out.write((char *)&config._sketches_factor, sizeof(config._sketches_factor));
619623
for (node_id_t i = 0; i < num_vertices; ++i) {
624+
size_t num_bkts_in_sketch = sketches[i]->get_buckets();
625+
binary_out.write((char*) &num_bkts_in_sketch, sizeof(num_bkts_in_sketch));
620626
sketches[i]->serialize(binary_out);
621627
}
622628
binary_out.close();

src/dense_sketch.cpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
#include "dense_sketch.h"
22

3+
#include <cassert>
34
#include <cstring>
5+
#include <exception>
46
#include <iostream>
57
#include <vector>
6-
#include <cassert>
78

89
DenseSketch::DenseSketch(vec_t vector_len, uint64_t seed, size_t _samples, size_t _cols)
910
: seed(seed),
@@ -22,13 +23,17 @@ DenseSketch::DenseSketch(vec_t vector_len, uint64_t seed, size_t _samples, size_
2223
}
2324
}
2425

25-
DenseSketch::DenseSketch(vec_t vector_len, uint64_t seed, std::istream &binary_in, size_t _samples,
26-
size_t _cols)
26+
DenseSketch::DenseSketch(vec_t vector_len, uint64_t seed, std::istream &binary_in,
27+
size_t num_buckets, size_t _samples, size_t _cols)
2728
: seed(seed),
2829
num_samples(_samples),
2930
cols_per_sample(_cols),
3031
num_columns(cols_per_sample * num_samples),
31-
bkt_per_col(calc_bkt_per_col(vector_len)) {
32+
bkt_per_col(calc_bkt_per_col(vector_len)),
33+
num_buckets(num_buckets) {
34+
if (num_buckets != num_columns * bkt_per_col + 1) {
35+
throw std::invalid_argument("Serial Constructor: Number of buckets does not match expectation");
36+
}
3237
num_buckets = num_columns * bkt_per_col + 1; // plus 1 for deterministic bucket
3338
buckets = new Bucket[num_buckets];
3439

@@ -188,7 +193,11 @@ void DenseSketch::range_merge(const DenseSketch &other, size_t start_sample, siz
188193
// std::cout << *this << std::endl;
189194
}
190195

191-
void DenseSketch::merge_raw_bucket_buffer(const Bucket *raw_buckets) {
196+
void DenseSketch::merge_raw_bucket_buffer(const Bucket *raw_buckets, size_t n_raw_buckets) {
197+
if (n_raw_buckets != num_buckets) {
198+
throw std::invalid_argument("Raw bucket buffer is not the same size as DenseSketch");
199+
}
200+
192201
for (size_t i = 0; i < num_buckets; i++) {
193202
buckets[i].alpha ^= raw_buckets[i].alpha;
194203
buckets[i].gamma ^= raw_buckets[i].gamma;

0 commit comments

Comments
 (0)