forked from membase/ep-engine
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmutation_log_compactor.cc
More file actions
108 lines (94 loc) · 3.84 KB
/
mutation_log_compactor.cc
File metadata and controls
108 lines (94 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "config.h"
#include "mutation_log_compactor.hh"
#include "ep.hh"
/**
* Visit all the items in memory and dump them into a new mutation log file.
*/
class LogCompactionVisitor : public VBucketVisitor {
public:
LogCompactionVisitor(MutationLog &log, EPStats &st)
: mutationLog(log), stats(st), numItemsLogged(0), totalItemsLogged(0)
{ /* EMPTY */ }
void visit(StoredValue *v) {
if (!v->isDeleted() && v->hasId()) {
++numItemsLogged;
mutationLog.newItem(currentBucket->getId(), v->getKey(), v->getId());
}
}
bool visitBucket(RCPtr<VBucket> &vb) {
update();
return VBucketVisitor::visitBucket(vb);
}
void update() {
if (numItemsLogged > 0) {
mutationLog.commit1();
mutationLog.commit2();
getLogger()->log(EXTENSION_LOG_INFO, NULL,
"Mutation log compactor: Dumped %ld items from VBucket %d "
"into a new mutation log file.\n",
numItemsLogged, currentBucket->getId());
totalItemsLogged += numItemsLogged;
numItemsLogged = 0;
}
}
void complete() {
update();
getLogger()->log(EXTENSION_LOG_INFO, NULL,
"Mutation log compactor: Completed by dumping total %ld items "
"into a new mutation log file.\n", totalItemsLogged);
}
private:
MutationLog &mutationLog;
EPStats &stats;
size_t numItemsLogged;
size_t totalItemsLogged;
};
bool MutationLogCompactor::callback(Dispatcher &d, TaskId t) {
size_t num_new_items = mutationLog.itemsLogged[ML_NEW];
size_t num_del_items = mutationLog.itemsLogged[ML_DEL];
size_t num_logged_items = num_new_items + num_del_items;
size_t num_unique_items = num_new_items - num_del_items;
size_t queue_size = stats.queue_size.get() + stats.flusher_todo.get();
bool rv = true;
bool schedule_compactor =
mutationLog.logSize > compactorConfig.getMaxLogSize() &&
num_logged_items > (num_unique_items * compactorConfig.getMaxEntryRatio()) &&
queue_size < compactorConfig.getQueueCap();
if (schedule_compactor) {
std::string compact_file = mutationLog.getLogFile() + ".compact";
if (access(compact_file.c_str(), F_OK) == 0 &&
remove(compact_file.c_str()) != 0) {
getLogger()->log(EXTENSION_LOG_WARNING, NULL,
"Can't remove the existing compacted log file \"%s\"\n",
compact_file.c_str());
return false;
}
BlockTimer timer(&stats.mlogCompactorHisto, "klogCompactorTime", stats.timingLog);
epStore->pauseFlusher();
try {
MutationLog new_log(compact_file, mutationLog.getBlockSize());
new_log.open();
assert(new_log.isEnabled());
new_log.setSyncConfig(mutationLog.getSyncConfig());
LogCompactionVisitor compact_visitor(new_log, stats);
epStore->visit(compact_visitor);
mutationLog.replaceWith(new_log);
} catch (MutationLog::ReadException e) {
getLogger()->log(EXTENSION_LOG_WARNING, NULL,
"Error in creating a new mutation log for compaction: %s\n",
e.what());
} catch (...) {
getLogger()->log(EXTENSION_LOG_WARNING, NULL,
"Fatal error caught in task \"%s\"\n", description().c_str());
}
if (!mutationLog.isOpen()) {
mutationLog.disable();
rv = false;
}
epStore->resumeFlusher();
++stats.mlogCompactorRuns;
}
d.snooze(t, MUTATION_LOG_COMPACTOR_FREQ);
return rv;
}