Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -518,13 +518,13 @@ extern int gbl_physrep_i_am_metadb;
extern int gbl_physrep_keepalive_v2;
extern int gbl_physrep_keepalive_freq_sec;
extern int gbl_physrep_max_candidates;
extern int gbl_physrep_max_pending_replicants;
extern int gbl_physrep_reconnect_penalty;
extern int gbl_physrep_reconnect_interval;
extern int gbl_physrep_shuffle_host_list;
extern int gbl_physrep_ignore_queues;
extern int gbl_physrep_max_rollback;
extern int gbl_physrep_filter_by_class;
extern int gbl_physrep_filter_by_class_warn;
extern int gbl_physrep_pollms;

/* source-name / host is from lrl */
Expand Down
8 changes: 3 additions & 5 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -1856,10 +1856,6 @@ REGISTER_TUNABLE("physrep_max_candidates",
"new physical replicant during registration. (Default: 6)",
TUNABLE_INTEGER, &gbl_physrep_max_candidates, 0, NULL,
NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_max_pending_replicants",
"There can be no more than this many physical replicants in "
"pending state. (Default: 10)",
TUNABLE_INTEGER, &gbl_physrep_max_pending_replicants, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_metadb_host", "List of physical replication metadb cluster hosts.", TUNABLE_STRING,
&gbl_physrep_metadb_host, READONLY, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_metadb_name", "Physical replication metadb cluster name.",
Expand Down Expand Up @@ -1888,8 +1884,10 @@ REGISTER_TUNABLE("query_comdb2db_for_absent_physrep_source_dbnum",
&gbl_query_comdb2db_for_absent_physrep_source_dbnum, INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_ignore_queues", "Don't replicate queues.", TUNABLE_BOOLEAN, &gbl_physrep_ignore_queues,
READONLY, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_filter_by_class", "Filter physrep replication by class. (Default: on)", TUNABLE_BOOLEAN,
REGISTER_TUNABLE("physrep_filter_by_class", "Filter physrep replication by class. (Default: off)", TUNABLE_BOOLEAN,
&gbl_physrep_filter_by_class, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_filter_by_class_warn", "Print warning message when told to replicate from lower tier. (Default: on)",
TUNABLE_BOOLEAN, &gbl_physrep_filter_by_class_warn, 1, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_max_rollback", "Maximum logs physrep can rollback. (Default: 0)", TUNABLE_INTEGER,
&gbl_physrep_max_rollback, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_pollms", "Physical replicant poll interval in milliseconds. (Default: 50)", TUNABLE_INTEGER,
Expand Down
22 changes: 10 additions & 12 deletions db/phys_rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ int gbl_physrep_reconnect_penalty = 0;
int gbl_blocking_physrep = 1;
int gbl_physrep_fanout = 8;
int gbl_physrep_max_candidates = 6;
int gbl_physrep_max_pending_replicants = 10;
int gbl_deferred_phys_flag = 0;
int gbl_physrep_source_nodes_refresh_freq_sec = 10;
int gbl_physrep_slow_replicant_check_freq_sec = 10;
Expand All @@ -74,7 +73,8 @@ int gbl_physrep_revconn_check_interval = 60;
int gbl_physrep_update_registry_interval = 60;
int gbl_physrep_shuffle_host_list = 0;
int gbl_physrep_i_am_metadb = 0;
int gbl_physrep_filter_by_class = 1;
int gbl_physrep_filter_by_class = 0;
int gbl_physrep_filter_by_class_warn = 0;
int gbl_started_physrep_threads = 0;

unsigned int physrep_min_logfile;
Expand Down Expand Up @@ -805,16 +805,13 @@ static int physrep_class_allow_source(const char *hostname)
CLASS_PROD = 5,
*/
int rtn = (my_class <= src_class);
if (!rtn) {
physrep_logmsg(LOGMSG_INFO, "%s discarding source %s class %s (my class is %s)\n", __func__, hostname,
mach_class_class2tier(src_class), mach_class_class2tier(my_class));
}
return rtn;
}

int physrep_allowed_source(const char *dbname, const char *hostname)
{
bdb_state_type *bdb_state = gbl_bdb_state;
int filter_by_class = gbl_physrep_filter_by_class;

/* Exclude anything which is part of this cluster */
if (!strcmp(dbname, gbl_dbname)) {
Expand All @@ -831,13 +828,14 @@ int physrep_allowed_source(const char *dbname, const char *hostname)
}
}

/* Exclude any lower tier */
if (gbl_physrep_filter_by_class && !physrep_class_allow_source(hostname)) {
if (gbl_physrep_debug) {
enum mach_class class = get_mach_class(hostname);
physrep_logmsg(LOGMSG_USER, "%s: discard lower-tier source, %s\n", __func__, mach_class_class2tier(class));
/* Exclude or warn on lower tier source */
if (filter_by_class || gbl_physrep_filter_by_class_warn) {
int rtn = physrep_class_allow_source(hostname);
if (rtn == 0) {
logmsg(LOGMSG_ERROR, "%s: %s physrep source in lower tier, %s/%s\n",
__func__, filter_by_class ? "Blocking" : "Warning", dbname, hostname);
}
return 0;
return filter_by_class ? rtn : 1;
}
return 1;
}
Expand Down
16 changes: 0 additions & 16 deletions lua/lib/physrep_register_replicant.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,6 @@ local function main(dbname, hostname, lsn, source_dbname, source_hosts)
-- from the comdb2_physrep_connections table.
db:exec("DELETE FROM comdb2_physrep_connections WHERE dbname = '" .. dbname .. "' AND host = '" .. hostname .. "'")

-- Try not to allow more than 'physrep_max_pending_replicants' replicant registrations in flight.
-- The following check is not perfect as it might not work if many requests show up at the same time.
-- In which case they all could get the same list of potential leader hosts that they can connect
-- against. And if they have 'physrep_shuffle_host_list' turned off, then the first host in the
-- list might end up supporting all the replicants.
-- We try to alleviate this by adding the following sleep for a random duration to spread out
-- these requests.
db:exec("SELECT sleep(abs(random()%10))")

local rs, rc = db:exec("SELECT COUNT(*) AS cnt FROM comdb2_physreps WHERE state='Pending'")
local row = rs:fetch()
if row.cnt > tunables["physrep_max_pending_replicants"] then
db:commit()
return
end

local physrep_fanout = tunables["physrep_fanout"]
local physrep_max_candidates = tunables["physrep_max_candidates"]
local firstfile = tunables["firstfile"]
Expand Down
5 changes: 0 additions & 5 deletions lua/syssp.c.in
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,6 @@ static int db_comdb_delete_sc_history(Lua L)
}

extern int gbl_physrep_fanout;
extern int gbl_physrep_max_pending_replicants;
extern int gbl_physrep_max_candidates;

static int db_comdb_physrep_tunables(Lua L)
Expand All @@ -598,10 +597,6 @@ static int db_comdb_physrep_tunables(Lua L)
lua_pushinteger(L, gbl_physrep_max_candidates);
lua_settable(L, -3);

lua_pushstring(L, "physrep_max_pending_replicants");
lua_pushinteger(L, gbl_physrep_max_pending_replicants);
lua_settable(L, -3);

int firstfile = 0;

/* Temporary glue */
Expand Down
3 changes: 1 addition & 2 deletions tests/tunables.test/t00_all_tunables.expected
Original file line number Diff line number Diff line change
Expand Up @@ -736,15 +736,14 @@
(name='physrep_debug', description='Print extended physrep trace. (Default: off)', type='BOOLEAN', value='OFF', read_only='N')
(name='physrep_exit_on_invalid_logstream', description='Exit physreps on invalid logstream. (Default: off)', type='BOOLEAN', value='OFF', read_only='N')
(name='physrep_fanout', description='Maximum number of physical replicants that a node can service (Default: 8)', type='INTEGER', value='8', read_only='N')
(name='physrep_filter_by_class', description='Filter physrep replication by class. (Default: on)', type='BOOLEAN', value='ON', read_only='N')
(name='physrep_filter_by_class', description='Filter physrep replication by class. (Default: on)', type='BOOLEAN', value='OFF', read_only='N')
(name='physrep_hung_replicant_check_freq_sec', description='Check for hung physical replicant this often. (Default: 10)', type='INTEGER', value='10', read_only='N')
(name='physrep_hung_replicant_threshold', description='Report if the physical replicant has been inactive for this duration. (Default: 60)', type='INTEGER', value='60', read_only='N')
(name='physrep_i_am_metadb', description='I am physical replication metadb (Default: off)', type='BOOLEAN', value='OFF', read_only='N')
(name='physrep_ignore_queues', description='Don't replicate queues.', type='BOOLEAN', value='ON', read_only='Y')
(name='physrep_keepalive_freq_sec', description='Periodically send lsn to source node after this interval. (Default: 10)', type='INTEGER', value='10', read_only='N')
(name='physrep_keepalive_v2', description='Use version 2 of keepalive which includes first lsn. (Default: off)', type='BOOLEAN', value='OFF', read_only='N')
(name='physrep_max_candidates', description='Maximum number of candidates that should be returned to a new physical replicant during registration. (Default: 6)', type='INTEGER', value='6', read_only='N')
(name='physrep_max_pending_replicants', description='There can be no more than this many physical replicants in pending state. (Default: 10)', type='INTEGER', value='10', read_only='N')
(name='physrep_max_rollback', description='Maximum logs physrep can rollback. (Default: 0)', type='INTEGER', value='0', read_only='N')
(name='physrep_metadb_host', description='List of physical replication metadb cluster hosts.', type='STRING', value=NULL, read_only='Y')
(name='physrep_metadb_name', description='Physical replication metadb cluster name.', type='STRING', value=NULL, read_only='Y')
Expand Down
Loading