diff --git a/src/object/srv_internal.h b/src/object/srv_internal.h index 3d3fbc15447..25e9894b512 100644 --- a/src/object/srv_internal.h +++ b/src/object/srv_internal.h @@ -1,6 +1,6 @@ /** * (C) Copyright 2016-2024 Intel Corporation. - * (C) Copyright 2025 Hewlett Packard Enterprise Development LP + * (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -26,7 +26,29 @@ extern struct dss_module_key obj_module_key; -struct migr_res_manager; +/* anchor point of resource waiter + * NB: resource control can be a independent library in the future. + */ +struct migr_res_waiter { + struct migrate_pool_tls *rw_tls; + /* link chain on resource manager */ + d_list_t rw_link; + /* quantity of resource being demanded */ + uint64_t rw_units; + /* start to wait since... */ + uint64_t rw_wait_since; + /* eventual to wait on */ + ABT_eventual rw_eventual; + /* for eventual */ + int *rw_rc; +}; + +/* resource handle */ +struct migr_res_handle { + int rh_type; + int rh_bkt; + uint64_t rh_units; +}; /* Per pool attached to the migrate tls(per xstream) */ struct migrate_pool_tls { @@ -80,8 +102,6 @@ struct migrate_pool_tls { /* The current in-flight data size */ uint64_t mpt_inflight_size; - struct migr_res_manager *mpt_rmg; - /* reference count for the structure */ uint64_t mpt_refcount; uint32_t mpt_opc; @@ -96,6 +116,11 @@ struct migrate_pool_tls { /* migration init error */ int mpt_init_err; + + /* Watchdog: track progress to detect complete rebuild hang */ + uint64_t mpt_last_progress_obj_count; /* obj_count at last check */ + uint64_t mpt_last_progress_rec_count; /* rec_count at last check */ + uint64_t mpt_last_progress_ts; /* time of last observed progress */ }; struct migrate_cont_hdl { diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 2ccda79e4c2..a4e74e0f045 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -31,77 +31,147 @@ #pragma GCC diagnostic 
ignored "-Wframe-larger-than=" #endif -/* Max in-flight transfer size per xstream */ -/* Set the total in-flight size to be 50% of MAX DMA size for - * the moment, will adjust it later if needed. +enum migr_res_type { + MIGR_OBJ = 0, + MIGR_KEY, + MIGR_DATA, + MIGR_HULK, + MIGR_MAX, +}; + +/* Number of concurrent objects being migrated per engine, consider we have 8 resource + * buckets, this number divided by 8 is the per-bucket concurrency. + * + * Default=1600 means an engine can concurrently rebuild 100 EC(16+P) objects, which + * has high enough concurrency and wouldn't cause network resource exhaustion. */ -#define MIGR_TGT_INF_DATA (1 << 29) +enum { + MIGR_OBJ_CONCUR_MIN = 800, + MIGR_OBJ_CONCUR_DEF = 1600, + MIGR_OBJ_CONCUR_MAX = 3200, +}; +#define MIGRATE_OBJ_CONCUR_ENV "D_MIGRATE_OBJ_CONCUR" -/* Threshold for very large transfers. - * This may exceed the MIGR_TGT_INF_DATA limit to prevent starvation. - * Only one such transfer is allowed at a time. +/* OBJ_CUR + KEY_CUR is less than 1/3 of HW limit, the reason of being conservative + * is because we don't have precise control and some data pattern (fragmented RDMA) + * can lead to peak * workload. */ -#define MIGR_INF_DATA_HULK (1 << 28) +enum { + MIGR_KEY_CONCUR_MIN = (1600 - MIGR_OBJ_CONCUR_MIN), + MIGR_KEY_CONCUR_DEF = (4800 - MIGR_OBJ_CONCUR_DEF), + MIGR_KEY_CONCUR_MAX = (6400 - MIGR_OBJ_CONCUR_MAX), +}; +#define MIGRATE_KEY_CONCUR_ENV "D_MIGRATE_KEY_CONCUR" -/* Low water mark for DMA buffer usage, hulk transfer is allowed in this case. +/* Max in-flight data transfer size (in MB), default is 6GB. + * Giving there are 8 buckets by default, it's 1GB each bucket. 
*/ -#define MIGR_INF_DATA_LWM (1 << 28) +enum { + MIGR_DATA_MB_MIN = (2ULL << 10), + MIGR_DATA_MB_DEF = (8ULL << 10), + MIGR_DATA_MB_MAX = (16ULL << 10), +}; +#define MIGRATE_DATA_MB_ENV "D_MIGRATE_DATA_MB" -#define ENV_MIGRATE_ULT_CNT "D_MIGRATE_ULT_CNT" +struct migr_res_manager; -/* Number of migration ULTs per target */ -#define MIGR_TGT_ULTS_MIN 100 -#define MIGR_TGT_ULTS_DEF 500 -#define MIGR_TGT_ULTS_MAX 2000 +/* resource consumed by migration */ +struct migr_resource { + /* back reference to res_manager */ + struct migr_res_manager *res_rmg; + /* upper limit of the resource */ + uint64_t res_limit; + /* used resource amount in "unit" */ + uint64_t res_used; + /* last time logging starveling */ + uint64_t res_log_since; + /* my bucket ID */ + int res_bkt; + /* active ULTs */ + int res_holders; + /* number of waiters on this resource */ + int res_waiters; + /* list head of all waiters */ + d_list_t res_waitq; + /* serialization */ + ABT_mutex res_mutex; + /* members for specific resource type */ + union { + /* MIGR_DATA only */ + struct { + /* total number of occurred memory errors */ + unsigned long mem_err; + /* waited more than 10 minutes, serious errors */ + unsigned long mem_ser_err; + /* number of revived ULTs after running into memory error */ + unsigned long mem_revived; + /* number of waiting ULTs */ + unsigned long mem_waiting; + /* TODO: add timeout error counter */ + } res_data; + /* may add other members for MIGR_OBJ and MIGR_KEY */ + }; +}; -/* 1/3 object ults, 2/3 key ULTs */ -#define MIGR_OBJ_ULT_PERCENT 33 +/* Distribute the global resource into 8 buckets to avoid high lock contention, meanwhile, + * units owned by each bucket can remain the same for engine configuration with different + * number of targets. 
+ */ +enum { + MIGR_RES_BKT_MIN = 1, + MIGR_RES_BKT_DEF = 8, + MIGR_RES_BKT_MAX = 64, +}; +#define MIGRATE_RES_BUCKETS_ENV "D_MIGRATE_RES_BUCKETS" -#define MIGR_TGT_OBJ_ULTS(ults) ((ults * MIGR_OBJ_ULT_PERCENT) / 100) -#define MIGR_TGT_KEY_ULTS(ults) (ults - MIGR_TGT_OBJ_ULTS(ults)) +static unsigned int migr_res_buckets = MIGR_RES_BKT_DEF; enum { - MIGR_OBJ = 0, - MIGR_KEY, - MIGR_DATA, - MIGR_MAX, + MIGR_HULK_INF_MIN = 0, /* disable hulk data */ + MIGR_HULK_INF_DEF = 2, + MIGR_HULK_INF_MAX = 16, }; +#define MIGRATE_HULK_INF_ENV "D_MIGRATE_HULK_INF" -/* resource consumed by migration */ -struct migr_resource { - const char *res_name; - /* upper limit of the resource */ - long res_limit; - /* resource amount in "unit" */ - long res_units; - /* number of waiters on this resource */ - int res_waiters; - /* Only used by MIGR_DATA, it always allows exactly one ULT to use unbounded - * buffer for super large value (rare). - */ - int res_hulk; - /* ABT_cond for waiters */ - ABT_cond res_cond; +enum migr_bucket_type { + MIGR_BUCKET_MAP, /* shared, N to 1 map */ + MIGR_BUCKET_ROTATE, /* shared, rotating on buckets */ }; /* migration resources manager */ struct migr_res_manager { - ABT_mutex rmg_mutex; - struct migr_resource rmg_resources[MIGR_MAX]; + enum migr_res_type rmg_res_type; + /* type of bucket, see migr_bucket_type */ + enum migr_bucket_type rmg_bkt_type; + /* number of resource sharing buckets */ + int rmg_bkt_nr; + /* number of targets sharing the same resource bucket */ + int rmg_bkt_size; + /* round-robin bucket selector */ + ATOMIC int rmg_bkt_selector; + /* resource name */ + char *rmg_name; + /* bucket type name */ + char *rmg_bkt_tname; + /* all resource buckets */ + struct migr_resource *rmg_res_buckets; }; -struct migr_engine_res { - /* total ULTs per target, it a tunable which can be set by admin */ - unsigned int er_max_ults; - /* dss_tgt_nr resource managers */ - struct migr_res_manager *er_rmgs; -}; +/* per-engine resources */ +static struct 
migr_res_manager migr_res_managers[MIGR_MAX]; +static uint64_t migr_res_wd_since; +static uuid_t migr_uuid_null; -static struct migr_engine_res migr_eng_res; +static struct migr_resource * +migr_hdl2res(struct migr_res_handle *rsh); +static void +migr_res_wakeup_pool(uuid_t pool_uuid); struct migrate_one { struct migrate_pool_tls *mo_tls; struct iter_obj_arg *mo_obj_arg; + struct migr_res_handle mo_key_rsh; + struct migr_res_handle mo_data_rsh; daos_key_t mo_dkey; uint64_t mo_dkey_hash; uuid_t mo_pool_uuid; @@ -166,6 +236,8 @@ struct iter_cont_arg { uuid_t pool_hdl_uuid; uuid_t cont_uuid; uuid_t cont_hdl_uuid; + daos_handle_t cont_hdl; + struct cont_props cont_props; struct tree_cache_root *cont_root; unsigned int yield_freq; uint64_t *snaps; @@ -180,9 +252,15 @@ struct iter_obj_arg { uuid_t pool_uuid; uuid_t cont_uuid; daos_unit_oid_t oid; + struct migr_res_handle ioa_rsh; daos_handle_t ioa_oh; + daos_handle_t ioa_coh; int ioa_obj_ref; struct daos_oclass_attr ioa_oca; + /* RPC fanout of degraded I/O and enumeration, it should be deemed as + * amplification factor for resource management + */ + int ioa_fanout; daos_epoch_t epoch; daos_epoch_t punched_epoch; unsigned int shard; @@ -193,6 +271,10 @@ struct iter_obj_arg { uint32_t generation; }; +static int +migrate_try_obj_insert(struct migrate_pool_tls *tls, uuid_t co_uuid, daos_unit_oid_t oid, + daos_epoch_t epoch, daos_epoch_t punched_epoch, unsigned int shard, + unsigned int tgt_idx); void migrate_pool_tls_put(struct migrate_pool_tls *tls); @@ -685,14 +767,23 @@ mrone_recx_vos2_daos(struct migrate_one *mrone, int shard, daos_iod_t *iods, int mrone_recx_daos_vos_internal(mrone, false, shard, iods, iods_num); } +enum { + MEM_NO_WAIT, + MEM_WAIT, + MEM_LONG_WAIT, +}; + static int mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_t *sgls, daos_iod_t *iods, int iod_num, daos_epoch_t eph, uint32_t flags, d_iov_t *csum_iov_fetch, struct migrate_pool_tls *tls) { + struct migr_resource *res = 
migr_hdl2res(&mrone->mo_data_rsh); uint32_t *extra_arg = NULL; - int waited = 0; + uint64_t then = 0; + uint64_t now; int rc; + int wait = MEM_NO_WAIT; /* pass rebuild epoch by extra_arg */ if (flags & DIOF_FETCH_EPOCH_EC_AGG_BOUNDARY) { @@ -700,6 +791,7 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ mrone->mo_epoch); extra_arg = (uint32_t *)mrone->mo_epoch; } + D_ASSERT(res != NULL); retry: rc = dsc_obj_fetch(oh, eph, &mrone->mo_dkey, iod_num, iods, sgls, NULL, flags, extra_arg, csum_iov_fetch); @@ -713,21 +805,40 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ /* If pool map does not change, then let's retry for timeout, instead of * fail out. */ - DL_WARN(rc, DF_RB ": retry " DF_UOID, DP_RB_MPT(tls), DP_UOID(mrone->mo_oid)); - if (rc == -DER_NOMEM) { - /* sleep 10 seconds before retry, give other layers a chance to - * release resources. - */ - dss_sleep(10 * 1000); - if (waited != 0 && waited % 3600 == 0) { - DL_ERROR(rc, DF_RB ": waited memory for %d hour(s)", - DP_RB_MRO(mrone), waited / 3600); - } + if (rc != -DER_NOMEM) { + DL_WARN(rc, DF_RB ": retry " DF_UOID, DP_RB_MPT(tls), + DP_UOID(mrone->mo_oid)); + dss_sleep(1000); + D_GOTO(retry, rc); + } + + now = daos_gettime_coarse(); + if (wait == MEM_NO_WAIT) { + wait = MEM_WAIT; + res->res_data.mem_waiting++; + res->res_data.mem_err++; + then = now; + } + /* sleep a few seconds before retry, give other layers a chance to + * release resources. 
+ */ + dss_sleep((10 + rand() % 20) * 1000); + if (wait != MEM_LONG_WAIT && now - then >= 600) { + wait = MEM_LONG_WAIT; /* flagged as long waiter */ + res->res_data.mem_ser_err++; /* counted as serious error */ + DL_ERROR(rc, + DF_RB " waited for 10 minutes, total memory errors: %lu/%lu," + " total waiters: %lu, total revived: %lu\n", + DP_RB_MRO(mrone), res->res_data.mem_ser_err, res->res_data.mem_err, + res->res_data.mem_waiting, res->res_data.mem_revived); } - waited += 10; D_GOTO(retry, rc); } - + if (wait != MEM_NO_WAIT) { + D_ASSERT(res->res_data.mem_waiting > 0); + res->res_data.mem_revived++; + res->res_data.mem_waiting--; + } return rc; } @@ -1883,99 +1994,194 @@ migrate_one_destroy(struct migrate_one *mrone) } static bool -migr_res_is_hulk(int res_type, long units) +migr_res_has_starveling(struct migr_resource *res, uint64_t now) { - return res_type == MIGR_DATA && units >= MIGR_INF_DATA_HULK; + static unsigned int starving_threshold = 30; /* 30 seconds */ + struct migr_res_waiter *waiter; + + if (res->res_waiters == 0) + return false; + + D_ASSERT(!d_list_empty(&res->res_waitq)); + waiter = d_list_entry(res->res_waitq.next, struct migr_res_waiter, rw_link); + + if (now - waiter->rw_wait_since < starving_threshold) + return false; + + /* If someone has already waited for more than 30 seconds, we should prioritize it and + * queue the current ULT. Log this waiter every 10 minutes so we can find out if it's + * blocked permentally. 
+ */ + if (now - res->res_log_since > 600) { + D_DEBUG(DB_REBUILD, + DF_RB + " starving: res=%s, since=%lu, ask:limit=%lu:%lu, holders:waiters=%d:%d\n", + DP_RB_MPT(waiter->rw_tls), res->res_rmg->rmg_name, waiter->rw_wait_since, + waiter->rw_units, res->res_limit, res->res_holders, res->res_waiters); + res->res_log_since = now; + } + return true; } -static int -migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *yielded) +static void +migr_res_wakeup(struct migr_resource *res, uint64_t units, uuid_t pool_id) +{ + struct migr_res_waiter *waiter, *tmp; + + d_list_for_each_entry_safe(waiter, tmp, &res->res_waitq, rw_link) { + if (waiter->rw_units > units) + break; + + if (units == -1ULL) { + D_ASSERT(!uuid_is_null(pool_id)); + if (uuid_compare(pool_id, waiter->rw_tls->mpt_pool_uuid) != 0) + continue; + /* otherwise caller tries to abort rebuild for specified pool */ + } else { + D_ASSERT(uuid_is_null(pool_id)); + units -= waiter->rw_units; + } + + /* NB: for -1ULL case we should just wakeup who have the same uuid of the pool, + * however, ABT_cond can't support removing waiter from the queue, so we just + * wakeup everyone. 
+ */ + res->res_waiters--; + d_list_del_init(&waiter->rw_link); + ABT_eventual_set(waiter->rw_eventual, NULL, 0); + } +} + +static bool +migr_res_is_hulk(uint64_t size) +{ + return size >= migr_res_managers[MIGR_DATA].rmg_res_buckets[0].res_limit; +} + +static struct migr_resource * +migr_type2res(int res_type) { - struct dss_module_info *dmi = dss_get_module_info(); struct migr_res_manager *rmg; - struct migr_resource *res; - bool is_hulk; - bool waited = false; - int rc = 0; + int bkt; - D_ASSERT(dmi->dmi_xs_id != 0); + rmg = &migr_res_managers[res_type]; + if (rmg->rmg_bkt_type == MIGR_BUCKET_MAP) { + struct dss_module_info *dmi = dss_get_module_info(); - rmg = &migr_eng_res.er_rmgs[dmi->dmi_tgt_id]; - if (tls->mpt_rmg == NULL) { - tls->mpt_rmg = rmg; + bkt = dmi->dmi_tgt_id / rmg->rmg_bkt_size; + D_ASSERT(bkt < rmg->rmg_bkt_nr); } else { - D_ASSERTF(tls->mpt_rmg == rmg, "target=%d, rmg_off=%d\n", dmi->dmi_tgt_id, - (int)(tls->mpt_rmg - &migr_eng_res.er_rmgs[0])); + /* MIGR_BUCKET_ROTATE */ + bkt = atomic_fetch_add(&rmg->rmg_bkt_selector, 1); + bkt %= rmg->rmg_bkt_nr; } + return &rmg->rmg_res_buckets[bkt]; +} + +static struct migr_resource * +migr_hdl2res(struct migr_res_handle *rsh) +{ + struct migr_res_manager *rmg; - res = &rmg->rmg_resources[res_type]; - is_hulk = migr_res_is_hulk(res_type, units); + rmg = &migr_res_managers[rsh->rh_type]; + D_ASSERT(rsh->rh_bkt < rmg->rmg_bkt_nr); + + return &rmg->rmg_res_buckets[rsh->rh_bkt]; +} + +static int +migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, + struct migr_res_handle *rsh) +{ + struct migr_resource *res; + struct migr_res_waiter waiter; + uint64_t now; + bool waited = false; + int rc = 0; + + waiter.rw_tls = tls; + waiter.rw_units = units; + waiter.rw_wait_since = 0; + waiter.rw_eventual = ABT_EVENTUAL_NULL; + + res = migr_type2res(res_type); + /* otherwise rebuild will hang forever */ + D_ASSERTF(units <= res->res_limit, "res=%s, units=%lu, limit=%lu\n", res->res_rmg->rmg_name, + 
units, res->res_limit); + + ABT_mutex_lock(res->res_mutex); while (1) { if (tls->mpt_fini) { rc = migrate_pool_tls_get_status(tls); D_GOTO(out, rc); } - if (is_hulk && res->res_hulk == 0 && res->res_units < MIGR_INF_DATA_LWM) { - /* skip the limit check and allow (only) one hulk transfer at a time */ - res->res_units += units; - res->res_hulk = 1; - break; - - } else if (!is_hulk && res->res_units + units <= res->res_limit) { - res->res_units += units; + now = daos_gettime_coarse(); + if ((waited || !migr_res_has_starveling(res, now)) && + (res->res_used + units <= res->res_limit)) { + res->res_used += units; break; } - ABT_mutex_lock(rmg->rmg_mutex); + if (waiter.rw_eventual == ABT_EVENTUAL_NULL) + rc = ABT_eventual_create(sizeof(*waiter.rw_rc), &waiter.rw_eventual); + else + rc = ABT_eventual_reset(waiter.rw_eventual); + + if (rc != ABT_SUCCESS) + D_GOTO(out, rc = dss_abterr2der(rc)); + res->res_waiters++; - if (res->res_waiters >= 100 && res->res_waiters % 100 == 0) { - D_DEBUG(DB_REBUILD, - "%d waiters are waiting on res=%s (target=%d, unit=%lu)\n", - res->res_waiters, res->res_name, dmi->dmi_tgt_id, units); + if (!waited) { + waiter.rw_wait_since = now; + waited = true; } + d_list_add_tail(&waiter.rw_link, &res->res_waitq); + ABT_mutex_unlock(res->res_mutex); + /* NB: we've released mutex at here, it's safe because Argobots can guarantee the + * wakeup event can be properly detected even if it happened before the wait call. 
+ */ + ABT_eventual_wait(waiter.rw_eventual, (void **)&waiter.rw_rc); - ABT_cond_wait(res->res_cond, rmg->rmg_mutex); - res->res_waiters--; - ABT_mutex_unlock(rmg->rmg_mutex); - waited = true; + ABT_mutex_lock(res->res_mutex); + D_ASSERT(d_list_empty(&waiter.rw_link)); } - if (yielded) - *yielded = waited; + res->res_holders++; /* per-pool counters for rebuild status tracking */ if (res_type == MIGR_OBJ) tls->mpt_tgt_obj_ult_cnt++; else if (res_type == MIGR_KEY) tls->mpt_tgt_dkey_ult_cnt++; - else + else if (res_type == MIGR_DATA || res_type == MIGR_HULK) tls->mpt_inflight_size += units; D_DEBUG(DB_REBUILD, - "res=%s, hold=%lu, used=%lu, limit=%lu, waited=%d)\n" DF_RB - " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", - res->res_name, units, res->res_units, res->res_limit, waited, DP_RB_MPT(tls), + DF_RB " " + "res=%s units:total:limit=%lu:%lu:%lu waiters:holders=%d:%d, waited=%s, " + " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", + DP_RB_MPT(tls), res->res_rmg->rmg_name, units, res->res_used, res->res_limit, + res->res_waiters, res->res_holders, !!waiter.rw_wait_since ? 
"yes" : "no", tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_inflight_size); + + rsh->rh_bkt = res->res_bkt; + rsh->rh_type = res_type; + rsh->rh_units = units; out: + ABT_mutex_unlock(res->res_mutex); + if (waiter.rw_eventual != ABT_EVENTUAL_NULL) + ABT_eventual_free(&waiter.rw_eventual); return rc; } static void -migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) +migrate_res_release(struct migrate_pool_tls *tls, struct migr_res_handle *rsh) { - struct migr_res_manager *rmg; - struct migr_resource *res; + struct migr_resource *res; + int res_type = rsh->rh_type; - rmg = tls->mpt_rmg; - D_ASSERT(rmg != NULL); - - res = &rmg->rmg_resources[res_type]; - - D_DEBUG(DB_REBUILD, - "%s: release=%lu, used=%lu, limit=%lu\n" DF_RB - " obj_ults=%u, key_ults=%u, inf_data=" DF_U64 ")\n", - res->res_name, units, res->res_units, res->res_limit, DP_RB_MPT(tls), - tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_inflight_size); + res = migr_hdl2res(rsh); + ABT_mutex_lock(res->res_mutex); if (res_type == MIGR_OBJ) { D_ASSERT(tls->mpt_tgt_obj_ult_cnt > 0); @@ -1983,32 +2189,30 @@ migrate_res_release(struct migrate_pool_tls *tls, int res_type, long units) } else if (res_type == MIGR_KEY) { D_ASSERT(tls->mpt_tgt_dkey_ult_cnt > 0); tls->mpt_tgt_dkey_ult_cnt--; - } else { - D_ASSERT(tls->mpt_inflight_size >= units); - tls->mpt_inflight_size -= units; + } else if (res_type == MIGR_DATA || res_type == MIGR_HULK) { + D_ASSERT(tls->mpt_inflight_size >= rsh->rh_units); + tls->mpt_inflight_size -= rsh->rh_units; } - D_ASSERT(res->res_units >= units); - res->res_units -= units; - - if (migr_res_is_hulk(res_type, units)) { - D_ASSERT(res->res_hulk == 1); - res->res_hulk = 0; - } + D_ASSERT(res->res_used >= rsh->rh_units); + res->res_used -= rsh->rh_units; + D_ASSERT(res->res_holders > 0); + res->res_holders--; - if (res->res_waiters > 0) { - ABT_mutex_lock(rmg->rmg_mutex); - ABT_cond_signal(res->res_cond); - ABT_mutex_unlock(rmg->rmg_mutex); 
- } + D_ASSERT(res->res_waiters > 0 || d_list_empty(&res->res_waitq)); + if (res->res_waiters > 0) + migr_res_wakeup(res, res->res_limit - res->res_used, migr_uuid_null); + ABT_mutex_unlock(res->res_mutex); } static void migrate_one_ult(void *arg) { struct migrate_one *mrone = arg; + struct iter_obj_arg *ioa = mrone->mo_obj_arg; struct migrate_pool_tls *tls; - daos_size_t data_size; + daos_size_t data_size; + uint64_t data_units; int rc = 0; while (daos_fail_check(DAOS_REBUILD_TGT_REBUILD_HANG)) @@ -2021,21 +2225,24 @@ migrate_one_ult(void *arg) } data_size = daos_iods_len(mrone->mo_iods, mrone->mo_iod_num); - data_size += daos_iods_len(mrone->mo_iods_from_parity, - mrone->mo_iods_num_from_parity); + data_size += daos_iods_len(mrone->mo_iods_from_parity, mrone->mo_iods_num_from_parity); D_DEBUG(DB_TRACE, DF_RB ": mrone %p data size is " DF_U64 " %d/%d\n", DP_RB_MPT(tls), mrone, data_size, mrone->mo_iod_num, mrone->mo_iods_num_from_parity); D_ASSERT(data_size != (daos_size_t)-1); - rc = migrate_res_hold(tls, MIGR_DATA, data_size, NULL); + data_units = data_size * ioa->ioa_fanout; + if (migr_res_is_hulk(data_units)) + rc = migrate_res_hold(tls, MIGR_HULK, data_units, &mrone->mo_data_rsh); + else + rc = migrate_res_hold(tls, MIGR_DATA, data_units, &mrone->mo_data_rsh); if (rc) D_GOTO(out, rc); rc = migrate_dkey(tls, mrone, data_size); - migrate_res_release(tls, MIGR_DATA, data_size); + migrate_res_release(tls, &mrone->mo_data_rsh); D_DEBUG(DB_REBUILD, DF_RB ": " DF_UOID " layout %u migrate dkey " DF_KEY " inflight_size " DF_U64 @@ -2059,7 +2266,7 @@ migrate_one_ult(void *arg) tls->mpt_fini = 1; } out: - migrate_res_release(tls, MIGR_KEY, 1); + migrate_res_release(tls, &mrone->mo_key_rsh); migrate_one_destroy(mrone); } @@ -2852,7 +3059,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) continue; } - rc = migrate_res_hold(tls, MIGR_KEY, 1, NULL); + rc = migrate_res_hold(tls, MIGR_KEY, arg->ioa_fanout, &mrone->mo_key_rsh); if (rc) break; 
d_list_del_init(&mrone->mo_list); @@ -2866,7 +3073,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) rc = dss_ult_create(migrate_one_ult, mrone, DSS_XS_SELF, 0, MIGRATE_STACK_SIZE, NULL); if (rc) { - migrate_res_release(tls, MIGR_KEY, 1); + migrate_res_release(tls, &mrone->mo_key_rsh); migrate_one_destroy(mrone); break; } @@ -2882,8 +3089,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) * Iterate akeys/dkeys of the object */ static int -migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls, - struct iter_obj_arg *arg) +migrate_obj_epoch(struct migrate_pool_tls *tls, struct iter_obj_arg *arg, daos_epoch_range_t *epr) { daos_anchor_t anchor; daos_anchor_t dkey_anchor; @@ -3128,7 +3334,8 @@ struct migrate_stop_arg { uuid_t pool_uuid; unsigned int version; unsigned int generation; - unsigned int stop_count; + unsigned int tls_stopped; + unsigned int res_stopped; ABT_mutex stop_lock; }; @@ -3137,29 +3344,28 @@ migrate_fini_one_ult(void *data) { struct migrate_stop_arg *arg = data; struct migrate_pool_tls *tls; + bool last_one; int rc; - tls = migrate_pool_tls_lookup(arg->pool_uuid, arg->version, arg->generation); - if (tls == NULL) - return 0; - D_ASSERT(dss_get_module_info()->dmi_xs_id != 0); - tls->mpt_fini = 1; + tls = migrate_pool_tls_lookup(arg->pool_uuid, arg->version, arg->generation); ABT_mutex_lock(arg->stop_lock); - arg->stop_count++; + if (tls != NULL) { + arg->tls_stopped++; + tls->mpt_fini = 1; + } + + /* need to check "last_one" even if TLS of this target is gone (NULL) */ + arg->res_stopped++; + last_one = (arg->res_stopped == dss_tgt_nr); ABT_mutex_unlock(arg->stop_lock); - if (tls->mpt_rmg) { - struct migr_res_manager *rmg = tls->mpt_rmg; - int i; + if (last_one) + migr_res_wakeup_pool(arg->pool_uuid); - /* NB: no big deal but ULTs of all pools will be waken up */ - ABT_mutex_lock(rmg->rmg_mutex); - for (i = 0; i < MIGR_MAX; i++) - ABT_cond_broadcast(rmg->rmg_resources[i].res_cond); - 
ABT_mutex_unlock(rmg->rmg_mutex); - } + if (tls == NULL) + return 0; migrate_pool_tls_put(tls); /* lookup */ rc = ABT_eventual_wait(tls->mpt_done_eventual, NULL); @@ -3171,9 +3377,8 @@ migrate_fini_one_ult(void *data) rc = 0; } - migrate_pool_tls_put(tls); /* destroy */ - D_INFO("migrate fini one ult "DF_UUID"\n", DP_UUID(arg->pool_uuid)); + migrate_pool_tls_put(tls); /* destroy */ return rc; } @@ -3182,12 +3387,13 @@ void ds_migrate_stop(struct ds_pool *pool, unsigned int version, unsigned int generation) { struct migrate_stop_arg arg; - int rc; + int rc; uuid_copy(arg.pool_uuid, pool->sp_uuid); arg.version = version; arg.generation = generation; - arg.stop_count = 0; + arg.tls_stopped = 0; + arg.res_stopped = 0; rc = ABT_mutex_create(&arg.stop_lock); if (rc != ABT_SUCCESS) { D_ERROR(DF_UUID " migrate stop: %d\n", DP_UUID(pool->sp_uuid), rc); @@ -3195,11 +3401,11 @@ ds_migrate_stop(struct ds_pool *pool, unsigned int version, unsigned int generat } rc = ds_pool_thread_collective(pool->sp_uuid, 0, migrate_fini_one_ult, &arg, 0); - if (rc) + if (rc != 0) D_ERROR(DF_UUID" migrate stop: %d\n", DP_UUID(pool->sp_uuid), rc); - D_ASSERT(atomic_load(&pool->sp_rebuilding) >= arg.stop_count); - atomic_fetch_sub(&pool->sp_rebuilding, arg.stop_count); + D_ASSERT(atomic_load(&pool->sp_rebuilding) >= arg.tls_stopped); + atomic_fetch_sub(&pool->sp_rebuilding, arg.tls_stopped); ABT_mutex_free(&arg.stop_lock); D_INFO(DF_UUID" migrate stopped\n", DP_UUID(pool->sp_uuid)); @@ -3232,8 +3438,7 @@ migrate_obj_ult(void *data) struct migrate_pool_tls *tls = NULL; daos_epoch_range_t epr; daos_epoch_t stable_epoch = 0; - daos_handle_t coh = DAOS_HDL_INVAL; - struct cont_props props; + daos_handle_t coh = arg->ioa_coh; int i; int rc = 0; @@ -3244,27 +3449,6 @@ migrate_obj_ult(void *data) D_GOTO(free_notls, rc); } - /* Only reintegrating targets/pool needs to discard the object, - * if sp_need_discard is 0, either the target does not need to - * discard, or discard has been done. 
spc_discard_done means - * discarding has been done in the current VOS target. - */ - if (tls->mpt_pool->spc_pool->sp_need_discard) { - while(!tls->mpt_pool->spc_discard_done) { - D_DEBUG(DB_REBUILD, DF_RB ": wait for discard to finish.\n", - DP_RB_MPT(tls)); - dss_sleep(2 * 1000); - if (tls->mpt_fini) - D_GOTO(free_notls, rc); - } - if (tls->mpt_pool->spc_pool->sp_discard_status) { - rc = tls->mpt_pool->spc_pool->sp_discard_status; - D_DEBUG(DB_REBUILD, DF_RB ": discard failure: " DF_RC "\n", DP_RB_MPT(tls), - DP_RC(rc)); - D_GOTO(out, rc); - } - } - if (tls->mpt_reintegrating) { struct ds_cont_child *cont_child = NULL; @@ -3289,33 +3473,12 @@ migrate_obj_ult(void *data) ds_cont_child_put(cont_child); } - rc = dsc_pool_open(tls->mpt_pool_uuid, tls->mpt_poh_uuid, 0, NULL, - tls->mpt_pool->spc_pool->sp_map, &tls->mpt_svc_list, &tls->mpt_pool_hdl); - if (rc) { - DL_ERROR(rc, DF_RB ": dsc_pool_open failed", DP_RB_MPT(tls)); - D_GOTO(out, rc); - } - - rc = migrate_cont_open(tls, arg->cont_uuid, 0, &coh); - if (rc) { - DL_ERROR(rc, DF_RB ": migrate_cont_open failed", DP_RB_MPT(tls)); - D_GOTO(out, rc); - } - rc = dsc_obj_open(coh, arg->oid.id_pub, DAOS_OO_RO, &arg->ioa_oh); if (rc) { DL_ERROR(rc, DF_RB ": dsc_obj_open failed", DP_RB_MPT(tls)); D_GOTO(out, rc); } - dsc_cont_get_props(coh, &props); - rc = dsc_obj_id2oc_attr(arg->oid.id_pub, &props, &arg->ioa_oca); - if (rc) { - DL_ERROR(rc, DF_RB ": unknown object class: %u", DP_RB_MPT(tls), - daos_obj_id2class(arg->oid.id_pub)); - D_GOTO(out, rc); - } - for (i = 0; i < arg->snap_cnt; i++) { daos_epoch_t lower_epoch = 0; @@ -3335,7 +3498,7 @@ migrate_obj_ult(void *data) epr.epr_hi = arg->snaps[i]; D_DEBUG(DB_REBUILD, DF_RB ": rebuild_snap %d " DF_X64 "-" DF_X64 "\n", DP_RB_MPT(tls), i, epr.epr_lo, epr.epr_hi); - rc = migrate_one_epoch_object(&epr, tls, arg); + rc = migrate_obj_epoch(tls, arg, &epr); if (rc) D_GOTO(free, rc); } @@ -3351,7 +3514,7 @@ migrate_obj_ult(void *data) D_ASSERT(tls->mpt_max_eph != 0); epr.epr_hi = 
tls->mpt_max_eph; if (arg->epoch > 0) { - rc = migrate_one_epoch_object(&epr, tls, arg); + rc = migrate_obj_epoch(tls, arg, &epr); } else { /* The obj has been punched for this range */ D_DEBUG(DB_REBUILD, @@ -3390,7 +3553,7 @@ migrate_obj_ult(void *data) DP_RB_MPT(tls), DP_UOID(arg->oid), arg->shard, tls->mpt_tgt_obj_ult_cnt, tls->mpt_tgt_dkey_ult_cnt, tls->mpt_obj_count, DP_RC(rc)); free_notls: - migrate_res_release(tls, MIGR_OBJ, 1); + migrate_res_release(tls, &arg->ioa_rsh); migrate_obj_put(arg); } @@ -3403,10 +3566,9 @@ struct migrate_obj_val { /* This is still running on the main migration ULT */ static int -migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_eph, - unsigned int shard, unsigned int tgt_idx, void *data) +migrate_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_eph, unsigned int shard, + unsigned int tgt_idx, struct iter_cont_arg *cont_arg) { - struct iter_cont_arg *cont_arg = data; struct iter_obj_arg *obj_arg; struct migrate_pool_tls *tls = cont_arg->pool_tls; daos_handle_t toh = tls->mpt_migrated_root_hdl; @@ -3433,6 +3595,7 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e uuid_copy(obj_arg->cont_uuid, cont_arg->cont_uuid); obj_arg->version = cont_arg->pool_tls->mpt_version; obj_arg->generation = cont_arg->pool_tls->mpt_generation; + obj_arg->ioa_coh = cont_arg->cont_hdl; if (cont_arg->snaps) { D_ALLOC(obj_arg->snaps, sizeof(*cont_arg->snaps) * cont_arg->snap_cnt); @@ -3444,10 +3607,30 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e sizeof(*obj_arg->snaps) * cont_arg->snap_cnt); } + rc = dsc_obj_id2oc_attr(oid.id_pub, &cont_arg->cont_props, &obj_arg->ioa_oca); + if (rc) { + DL_ERROR(rc, DF_RB ": unknown object class: %u", DP_RB_MPT(tls), + daos_obj_id2class(oid.id_pub)); + D_GOTO(free, rc); + } + + if (daos_oclass_is_ec(&obj_arg->ioa_oca)) /* RPC fanout has to be considered */ + obj_arg->ioa_fanout = MIN(16, 
obj_ec_data_tgt_nr(&obj_arg->ioa_oca)); + else + obj_arg->ioa_fanout = 1; + + rc = migrate_res_hold(cont_arg->pool_tls, MIGR_OBJ, obj_arg->ioa_fanout, &obj_arg->ioa_rsh); + if (rc != 0) { + DL_ERROR(rc, DF_UUID " enter migrate failed.", DP_UUID(cont_arg->cont_uuid)); + goto free; + } + D_ASSERT(tgt_idx == dss_get_module_info()->dmi_tgt_id); rc = dss_ult_create(migrate_obj_ult, obj_arg, DSS_XS_SELF, 0, MIGRATE_STACK_SIZE, NULL); - if (rc) + if (rc) { + migrate_res_release(cont_arg->pool_tls, &obj_arg->ioa_rsh); goto free; + } val.epoch = eph; val.shard = shard; @@ -3471,14 +3654,12 @@ static int migrate_obj_iter_cb(daos_handle_t ih, d_iov_t *key_iov, d_iov_t *val_iov, void *data) { struct iter_cont_arg *arg = data; - daos_unit_oid_t *oid = key_iov->iov_buf; + daos_unit_oid_t oid = *(daos_unit_oid_t *)key_iov->iov_buf; struct migrate_obj_val *obj_val = val_iov->iov_buf; daos_epoch_t epoch = obj_val->epoch; daos_epoch_t punched_epoch = obj_val->punched_epoch; unsigned int tgt_idx = obj_val->tgt_idx; - unsigned int shard = obj_val->shard; - d_iov_t tmp_iov; - bool yielded = false; + unsigned int shard = obj_val->shard; int rc; if (arg->pool_tls->mpt_fini) @@ -3486,38 +3667,26 @@ migrate_obj_iter_cb(daos_handle_t ih, d_iov_t *key_iov, d_iov_t *val_iov, void * D_DEBUG(DB_REBUILD, DF_RB ": obj migrate " DF_UUID "/" DF_UOID " %" PRIx64 " eph " DF_U64 " start\n", - DP_RB_MPT(arg->pool_tls), DP_UUID(arg->cont_uuid), DP_UOID(*oid), ih.cookie, epoch); + DP_RB_MPT(arg->pool_tls), DP_UUID(arg->cont_uuid), DP_UOID(oid), ih.cookie, epoch); - rc = migrate_res_hold(arg->pool_tls, MIGR_OBJ, 1, &yielded); + rc = dbtree_iter_delete(ih, NULL); if (rc) { - DL_ERROR(rc, DF_RB ": " DF_UUID " enter migrate failed.", DP_RB_MPT(arg->pool_tls), - DP_UUID(arg->cont_uuid)); + DL_ERROR(rc, DF_RB ": dbtree_iter_delete failed", DP_RB_MPT(arg->pool_tls)); return rc; } - rc = migrate_one_object(*oid, epoch, punched_epoch, shard, tgt_idx, arg); + rc = migrate_object(oid, epoch, punched_epoch, shard, 
tgt_idx, arg); if (rc != 0) { DL_ERROR(rc, DF_RB ": obj " DF_UOID " migration failed", DP_RB_MPT(arg->pool_tls), - DP_UOID(*oid)); - migrate_res_release(arg->pool_tls, MIGR_OBJ, 1); - return rc; - } - - /* migrate_res_hold possibly yielded the ULT, let's re-probe before delete */ - if (yielded) { - d_iov_set(&tmp_iov, oid, sizeof(*oid)); - rc = dbtree_iter_probe(ih, BTR_PROBE_EQ, DAOS_INTENT_MIGRATION, &tmp_iov, NULL); - if (rc) { - D_ASSERT(rc != -DER_NONEXIST); - DL_ERROR(rc, DF_RB ": obj " DF_UOID " probe failed", - DP_RB_MPT(arg->pool_tls), DP_UOID(*oid)); - return rc; + DP_UOID(oid)); + if (rc != -DER_SHUTDOWN) { + /* NB: rebuild failed anyway, it doesn't even matter to do this for now, + * but it could be necessary if we want to track rebuild progress + * and support resume in the future. + */ + migrate_try_obj_insert(arg->pool_tls, arg->cont_uuid, oid, epoch, + punched_epoch, shard, tgt_idx); } - } - - rc = dbtree_iter_delete(ih, NULL); - if (rc) { - DL_ERROR(rc, DF_RB ": dbtree_iter_delete failed", DP_RB_MPT(arg->pool_tls)); return rc; } @@ -3526,9 +3695,8 @@ migrate_obj_iter_cb(daos_handle_t ih, d_iov_t *key_iov, d_iov_t *val_iov, void * dss_sleep(0); } - /* re-probe the dbtree after deletion */ - rc = dbtree_iter_probe(ih, BTR_PROBE_FIRST, DAOS_INTENT_MIGRATION, - NULL, NULL); + /* re-probe the dbtree, because it has already deleted an element or yielded */ + rc = dbtree_iter_probe(ih, BTR_PROBE_FIRST, DAOS_INTENT_MIGRATION, NULL, NULL); if (rc == -DER_NONEXIST) return 1; else if (rc != 0) @@ -3628,14 +3796,25 @@ migrate_cont_iter_cb(daos_handle_t ih, d_iov_t *key_iov, arg.snap_cnt = fetch_arg.snap_cnt; arg.pool_tls = tls; uuid_copy(arg.cont_uuid, cont_uuid); + + rc = migrate_cont_open(tls, cont_uuid, 0, &arg.cont_hdl); + if (rc) { + DL_ERROR(rc, DF_RB ": migrate_cont_open failed", DP_RB_MPT(tls)); + D_GOTO(free, rc); + } + + dsc_cont_get_props(arg.cont_hdl, &arg.cont_props); while (!dbtree_is_empty(root->tcr_root_hdl)) { if (tls->mpt_fini) break; rc = 
dbtree_iterate(root->tcr_root_hdl, DAOS_INTENT_MIGRATION, false, migrate_obj_iter_cb, &arg); - if (rc || tls->mpt_fini) + if (rc || tls->mpt_fini) { + if (tls->mpt_status == 0) + tls->mpt_status = rc ?: -DER_SHUTDOWN; break; + } } D_DEBUG(DB_REBUILD, DF_RB ": iter cont " DF_UUID "/%" PRIx64 " finish.\n", DP_RB_MPT(tls), @@ -3689,20 +3868,51 @@ static void migrate_ult(void *arg) { struct migrate_pool_tls *pool_tls = arg; - int rc; + struct ds_pool_child *pool; + int rc = 0; D_ASSERT(pool_tls != NULL); + pool = pool_tls->mpt_pool; + /* Only reintegrating targets/pool needs to discard the object, if sp_need_discard is 0, + * either the target does not need to discard, or discard has been done. + * spc_discard_done means discarding has been done in the current VOS target. + */ + if (pool->spc_pool->sp_need_discard) { + while (!pool->spc_discard_done) { + D_DEBUG(DB_REBUILD, DF_RB " wait for discard to finish.\n", + DP_RB_MPT(pool_tls)); + dss_sleep(2 * 1000); + if (pool_tls->mpt_fini) + D_GOTO(out, rc); + } + if (pool->spc_pool->sp_discard_status) { + rc = pool->spc_pool->sp_discard_status; + D_DEBUG(DB_REBUILD, DF_RB " discard failure: " DF_RC, DP_RB_MPT(pool_tls), + DP_RC(rc)); + D_GOTO(out, rc); + } + } + + rc = + dsc_pool_open(pool_tls->mpt_pool_uuid, pool_tls->mpt_poh_uuid, 0, NULL, + pool->spc_pool->sp_map, &pool_tls->mpt_svc_list, &pool_tls->mpt_pool_hdl); + if (rc) { + DL_ERROR(rc, DF_RB ": dsc_pool_open failed", DP_RB_MPT(pool_tls)); + D_GOTO(out, rc); + } + while (!dbtree_is_empty(pool_tls->mpt_root_hdl) && !pool_tls->mpt_fini) { rc = dbtree_iterate(pool_tls->mpt_root_hdl, DAOS_INTENT_PURGE, false, migrate_cont_iter_cb, pool_tls); if (rc < 0) { DL_ERROR(rc, DF_RB ": dbtree iterate failed", DP_RB_MPT(pool_tls)); - if (pool_tls->mpt_status == 0) - pool_tls->mpt_status = rc; - break; + goto out; } } +out: + if (pool_tls->mpt_status == 0) + pool_tls->mpt_status = rc; pool_tls->mpt_ult_running = 0; migrate_pool_tls_put(pool_tls); @@ -4270,11 +4480,86 @@ struct 
migrate_query_arg { uint32_t generation; uint32_t ult_running; daos_rebuild_opc_t rebuild_op; - uint32_t mpt_reintegrating:1, - reint_post_start:1, - reint_post_processing:1; + uint32_t mpt_reintegrating : 1, reint_post_start : 1, reint_post_processing : 1, + res_watchdog : 1; }; +#define MIGR_STALL_LOG_INTERVAL 120 /* log at most once per 120 s */ +#define MIGR_STALL_DETECT_SECS 60 /* declare stall after 60 s without progress */ + +static bool +migr_tls_trigger_watchdog(struct migrate_pool_tls *tls) +{ + uint64_t now = daos_gettime_coarse(); + int tgt_id = dss_get_module_info()->dmi_tgt_id; + + if (tls->mpt_obj_count == 0) { /* no activity */ + tls->mpt_last_progress_ts = now; + return false; + } + + /* made some progress */ + if (tls->mpt_obj_count != tls->mpt_last_progress_obj_count || + tls->mpt_rec_count != tls->mpt_last_progress_rec_count) { + tls->mpt_last_progress_obj_count = tls->mpt_obj_count; + tls->mpt_last_progress_rec_count = tls->mpt_rec_count; + tls->mpt_last_progress_ts = now; + return false; + } + + /* no progress, but wait a little longer */ + if (now - tls->mpt_last_progress_ts < MIGR_STALL_DETECT_SECS || + now < migr_res_wd_since + MIGR_STALL_LOG_INTERVAL) + return false; + + /* time to trigger watchdog */ + D_WARN(DF_RB " TLS res watchdog: tgt=%d no progress for %lu secs (obj/rec both frozen)," + " obj_ults=%u key_ults=%u inflight_size=" DF_U64 " obj_count=" DF_U64 + " rec_count=" DF_U64 "\n", + DP_RB_MPT(tls), tgt_id, now - tls->mpt_last_progress_ts, tls->mpt_tgt_obj_ult_cnt, + tls->mpt_tgt_dkey_ult_cnt, tls->mpt_inflight_size, tls->mpt_obj_count, + tls->mpt_rec_count); + return true; +} + +static void +migr_res_watchdog(void) +{ + uint64_t now; + int i; + int j; + + now = daos_gettime_coarse(); + D_WARN("Rebuild resource watchdog warning:\n"); + for (i = 0; i < MIGR_MAX; i++) { + struct migr_res_manager *rmg = &migr_res_managers[i]; + struct migr_resource *res; + struct migr_res_waiter *waiter; + + D_WARN("resource=%s (%s)\n", rmg->rmg_name, 
rmg->rmg_bkt_tname); + for (j = 0; j < rmg->rmg_bkt_nr; j++) { + res = &rmg->rmg_res_buckets[j]; + + ABT_mutex_lock(res->res_mutex); + if (d_list_empty(&res->res_waitq)) { + D_WARN(" bucket=%d: limit=%lu, used=0, waiter=\n", j, + res->res_limit); + ABT_mutex_unlock(res->res_mutex); + continue; + } + + waiter = d_list_entry(res->res_waitq.next, struct migr_res_waiter, rw_link); + D_WARN(" bucket=%d: limit=" DF_U64 ", used=" DF_U64 ", waiters=%d," + " head=(RB=" DF_RB ", units=" DF_U64 ", waited=" DF_U64 ")\n", + j, res->res_limit, res->res_used, res->res_waiters, + DP_RB_MPT(waiter->rw_tls), waiter->rw_units, + now - waiter->rw_wait_since); + ABT_mutex_unlock(res->res_mutex); + } + } + migr_res_wd_since = now; +} + static int migrate_check_one(void *data) { @@ -4290,7 +4575,9 @@ migrate_check_one(void *data) return 0; ult_cnt = tls->mpt_tgt_obj_ult_cnt + tls->mpt_tgt_dkey_ult_cnt; + ABT_mutex_lock(arg->status_lock); + arg->dms.dm_rec_count += tls->mpt_rec_count; arg->dms.dm_obj_count += tls->mpt_obj_count; arg->dms.dm_total_size += tls->mpt_size; @@ -4315,7 +4602,11 @@ migrate_check_one(void *data) arg->reint_post_processing = 1; } } + if (migr_tls_trigger_watchdog(tls)) + arg->res_watchdog = true; + ABT_mutex_unlock(arg->status_lock); + D_DEBUG(DB_REBUILD, DF_RB " status %d/%d/ ult %u/%u rec/obj/size " DF_U64 "/" DF_U64 "/" DF_U64 "\n", DP_RB_MQA(arg), tls->mpt_status, arg->dms.dm_status, tls->mpt_tgt_obj_ult_cnt, @@ -4381,6 +4672,9 @@ ds_migrate_query_status(uuid_t pool_uuid, uint32_t ver, unsigned int generation, D_GOTO(out, rc); } + if (arg.res_watchdog) + migr_res_watchdog(); + if (!gl_scan_done || arg.total_ult_cnt > 0 || arg.ult_running || arg.reint_post_processing) arg.dms.dm_migrating = 1; else @@ -4526,14 +4820,17 @@ ds_object_migrate_send(struct ds_pool *pool, uuid_t pool_hdl_uuid, uuid_t cont_h } static int -migr_res_init(struct migr_resource *res, const char *name, long limit) +migr_res_init(struct migr_res_manager *rmg, int bkt, uint64_t limit) { + 
struct migr_resource *res = &rmg->rmg_res_buckets[bkt]; int rc; memset(res, 0, sizeof(*res)); - res->res_name = name; + D_INIT_LIST_HEAD(&res->res_waitq); + res->res_rmg = rmg; + res->res_bkt = bkt; res->res_limit = limit; - rc = ABT_cond_create(&res->res_cond); + rc = ABT_mutex_create(&res->res_mutex); return (rc != ABT_SUCCESS) ? dss_abterr2der(rc) : 0; } @@ -4541,53 +4838,165 @@ migr_res_init(struct migr_resource *res, const char *name, long limit) static void migr_res_fini(struct migr_resource *res) { - if (res->res_cond) - ABT_cond_free(&res->res_cond); + D_ASSERT(!res->res_rmg || d_list_empty(&res->res_waitq)); /* unset or drained */ + if (res->res_mutex) + ABT_mutex_free(&res->res_mutex); } -int -obj_migrate_init(void) +static void +migr_res_wakeup_pool(uuid_t pool_uuid) { - unsigned int ults = MIGR_TGT_ULTS_DEF; - int i; - int rc = 0; - - D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_LWM); - D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_HULK); + int i; + int j; - d_getenv_uint(ENV_MIGRATE_ULT_CNT, &ults); - if (ults < MIGR_TGT_ULTS_MIN) - ults = MIGR_TGT_ULTS_MIN; - if (ults > MIGR_TGT_ULTS_MAX) - ults = MIGR_TGT_ULTS_MAX; + for (i = 0; i < MIGR_MAX; i++) { + struct migr_res_manager *rmg = &migr_res_managers[i]; - memset(&migr_eng_res, 0, sizeof(migr_eng_res)); - migr_eng_res.er_max_ults = ults; + /* on behalf of all target xstreams, wakeup ULTs for shared resources. 
*/ + for (j = 0; j < rmg->rmg_bkt_nr; j++) { + struct migr_resource *res = &rmg->rmg_res_buckets[j]; - D_ASSERT(dss_tgt_nr > 0); - D_ALLOC(migr_eng_res.er_rmgs, sizeof(struct migr_res_manager) * dss_tgt_nr); - if (!migr_eng_res.er_rmgs) - return -DER_NOMEM; + /* locking is required by shared resource */ + ABT_mutex_lock(res->res_mutex); + migr_res_wakeup(res, -1ULL, pool_uuid); + ABT_mutex_unlock(res->res_mutex); + } + } +} - for (i = 0; i < dss_tgt_nr; i++) { - struct migr_res_manager *rmg = &migr_eng_res.er_rmgs[i]; +static int +migr_rmg_init(int type, uint64_t units) +{ + struct migr_res_manager *rmg; + char *name; + int bkt_nr; + int i; + int rc; + + if (type == MIGR_OBJ) { + name = "OBJ"; + } else if (type == MIGR_KEY) { + name = "KEY"; + } else if (type == MIGR_DATA) { + name = "DATA"; + } else if (type == MIGR_HULK) { + name = "HULK"; + } else { + name = "UNKNOWN"; + D_GOTO(failed, rc = -DER_INVAL); + } + bkt_nr = (type == MIGR_HULK) ? 1 : migr_res_buckets; + + rmg = &migr_res_managers[type]; + rmg->rmg_res_type = type; + rmg->rmg_name = name; + rmg->rmg_bkt_nr = bkt_nr; + rmg->rmg_bkt_size = dss_tgt_nr / bkt_nr; + if (dss_tgt_nr % bkt_nr == 0) { + rmg->rmg_bkt_type = MIGR_BUCKET_MAP; + rmg->rmg_bkt_tname = "map"; + } else { + rmg->rmg_bkt_type = MIGR_BUCKET_ROTATE; + rmg->rmg_bkt_tname = "rotate"; + } - rc = ABT_mutex_create(&rmg->rmg_mutex); - if (rc != ABT_SUCCESS) - D_GOTO(out, rc = dss_abterr2der(rc)); + D_ALLOC(rmg->rmg_res_buckets, bkt_nr * sizeof(*rmg->rmg_res_buckets)); + if (!rmg->rmg_res_buckets) + D_GOTO(failed, rc = -DER_NOMEM); - rc = migr_res_init(&rmg->rmg_resources[MIGR_OBJ], "OBJ", MIGR_TGT_OBJ_ULTS(ults)); + for (i = 0; i < bkt_nr; i++) { + rc = migr_res_init(rmg, i, units / bkt_nr); if (rc) - D_GOTO(out, rc); + D_GOTO(failed, rc); + } + return 0; +failed: + D_ERROR("Failed to initialize resource %s, error=%d\n", name, rc); + return rc; +} - rc = migr_res_init(&rmg->rmg_resources[MIGR_KEY], "KEY", MIGR_TGT_KEY_ULTS(ults)); - if (rc) - 
D_GOTO(out, rc); +static void +migr_rmg_fini(int type) +{ + struct migr_res_manager *rmg; + int i; - rc = migr_res_init(&rmg->rmg_resources[MIGR_DATA], "DATA", MIGR_TGT_INF_DATA); - if (rc) - D_GOTO(out, rc); + rmg = &migr_res_managers[type]; + if (!rmg->rmg_res_buckets) + return; + + for (i = 0; i < rmg->rmg_bkt_nr; i++) + migr_res_fini(&rmg->rmg_res_buckets[i]); + D_FREE(rmg->rmg_res_buckets); + memset(rmg, 0, sizeof(*rmg)); +} + +int +obj_migrate_init(void) +{ + uint32_t obj_units = MIGR_OBJ_CONCUR_DEF; + uint32_t key_units = MIGR_KEY_CONCUR_DEF; + uint64_t data_mb = MIGR_DATA_MB_DEF; + uint32_t hulk_inf = MIGR_HULK_INF_DEF; + int rc; + + uuid_clear(migr_uuid_null); + + rc = d_getenv_uint(MIGRATE_RES_BUCKETS_ENV, &migr_res_buckets); + if (rc == 0) { + if (migr_res_buckets < MIGR_RES_BKT_MIN) + migr_res_buckets = MIGR_RES_BKT_MIN; + if (migr_res_buckets > MIGR_RES_BKT_MAX) + migr_res_buckets = MIGR_RES_BKT_MAX; + } + D_ASSERT(dss_tgt_nr > 0); + if (migr_res_buckets > dss_tgt_nr) + migr_res_buckets = dss_tgt_nr; + + rc = d_getenv_uint(MIGRATE_OBJ_CONCUR_ENV, &obj_units); + if (rc == 0) { + if (obj_units < MIGR_OBJ_CONCUR_MIN) + obj_units = MIGR_OBJ_CONCUR_MIN; + if (obj_units > MIGR_OBJ_CONCUR_MAX) + obj_units = MIGR_OBJ_CONCUR_MAX; + } + rc = migr_rmg_init(MIGR_OBJ, obj_units); + if (rc) + D_GOTO(out, rc); + + rc = d_getenv_uint(MIGRATE_KEY_CONCUR_ENV, &key_units); + if (rc == 0) { + if (key_units < MIGR_KEY_CONCUR_MIN) + key_units = MIGR_KEY_CONCUR_MIN; + if (key_units > MIGR_KEY_CONCUR_MAX) + key_units = MIGR_KEY_CONCUR_MAX; } + rc = migr_rmg_init(MIGR_KEY, key_units); + if (rc) + D_GOTO(out, rc); + + data_mb = MIGR_DATA_MB_DEF; + rc = d_getenv_uint64_t(MIGRATE_DATA_MB_ENV, &data_mb); + if (rc == 0) { + if (data_mb < MIGR_DATA_MB_MIN) + data_mb = MIGR_DATA_MB_MIN; + if (data_mb > MIGR_DATA_MB_MAX) + data_mb = MIGR_DATA_MB_MAX; + } + rc = migr_rmg_init(MIGR_DATA, (data_mb << 20)); /* convert MiB into bytes */ + if (rc) + D_GOTO(out, rc); + + rc = 
d_getenv_uint(MIGRATE_HULK_INF_ENV, &hulk_inf); + if (rc == 0) { + /* NB: can be zero */ + if (hulk_inf > MIGR_HULK_INF_MAX) + hulk_inf = MIGR_HULK_INF_MAX; + } + rc = migr_rmg_init(MIGR_HULK, hulk_inf); + if (rc) + D_GOTO(out, rc); + return 0; out: obj_migrate_fini(); @@ -4598,18 +5007,7 @@ void obj_migrate_fini(void) { int i; - int j; - - if (migr_eng_res.er_rmgs) { - for (i = 0; i < dss_tgt_nr; i++) { - struct migr_res_manager *rmg = &migr_eng_res.er_rmgs[i]; - for (j = 0; j < MIGR_MAX; j++) - migr_res_fini(&rmg->rmg_resources[j]); - if (rmg->rmg_mutex) - ABT_mutex_free(&rmg->rmg_mutex); - } - D_FREE(migr_eng_res.er_rmgs); - } - memset(&migr_eng_res, 0, sizeof(migr_eng_res)); + for (i = 0; i < MIGR_MAX; i++) + migr_rmg_fini(i); }