diff --git a/mlperf_logging/result_summarizer/result_summarizer.py b/mlperf_logging/result_summarizer/result_summarizer.py index 1198236..131d208 100644 --- a/mlperf_logging/result_summarizer/result_summarizer.py +++ b/mlperf_logging/result_summarizer/result_summarizer.py @@ -7,18 +7,22 @@ import os import re import sys +import traceback import itertools import pandas as pd import yaml +import numpy as np import hashlib import math import operator import uuid as uuidlib +import copy from ..compliance_checker import mlp_compliance from ..compliance_checker.mlp_compliance import usage_choices, rule_choices from ..compliance_checker.mlp_parser import parse_file +from ..rcp_checker import rcp_checker from ..benchmark_meta import get_allowed_benchmarks, get_result_file_counts @@ -263,12 +267,23 @@ def _get_weak_scaling_metric_schema(): } -def _get_empty_summary(usage, ruleset, weak_scaling=False): +def _get_strong_scaling_metric_schema(): + return { + 'time_to_train': float, + 'Energy': float, + 'GBS': float, + 'epochs': float, + 'RCP': str, + 'rcp_scaling_factor': float, + } + + +def _get_empty_summary(usage, ruleset, weak_scaling=False, detailed=False): return Summary( - _get_column_schema(usage, ruleset, weak_scaling=weak_scaling).keys()) + _get_column_schema(usage, ruleset, weak_scaling=weak_scaling, detailed=detailed).keys()) -def _get_column_schema(usage, ruleset, weak_scaling=False): +def _get_column_schema(usage, ruleset, weak_scaling=False, detailed=False): schema = { 'division': str, 'availability': str, @@ -289,9 +304,17 @@ def _get_column_schema(usage, ruleset, weak_scaling=False): for metric, dtype in _get_weak_scaling_metric_schema().items(): schema['{}:{}'.format(benchmark, metric)] = dtype else: - schema.update( - {b: float - for b in get_allowed_benchmarks(usage, ruleset)}) + if detailed: + benchmarks = get_allowed_benchmarks(usage, ruleset) + for benchmark in benchmarks: + for metric, dtype in _get_strong_scaling_metric_schema().items(): + 
schema['{}:{}'.format(benchmark, metric)] = dtype + else: + schema.update( + { + b: float for b in get_allowed_benchmarks(usage, ruleset) + } + ) schema.update({'details_url': str, 'code_url': str}) return schema @@ -404,8 +427,8 @@ def _compute_strong_score_standalone( power_score = olympic_avg power_score *= scaling_factor if return_full_scores: - return scores_track, power_scores_track, score, power_score - return score, power_score + return scores_track, power_scores_track, score, power_score, scaling_factor + return score, power_score, scaling_factor def _compute_weak_score_standalone(benchmark, system, has_power, benchmark_folder, usage, ruleset, desc = {"submitter": None}): @@ -474,31 +497,106 @@ def _compute_weak_score_standalone(benchmark, system, has_power, benchmark_folde -def _compute_strong_scaling_scores(desc, system_folder, usage, ruleset): +def _compute_strong_scaling_scores(desc, system_folder, usage, ruleset, division, rcp_bypass=False): # Collect scores for benchmarks. benchmark_scores = {} - benchmark_power_scores = {} - has_power = None + detailed_bechmark_scores = {} benchmark_folder_parent = os.path.join( system_folder, 'strong') if usage == 'hpc' else system_folder if not os.path.isdir(benchmark_folder_parent): - return benchmark_scores, benchmark_power_scores + return benchmark_scores, {} for benchmark_folder in _get_sub_folders(benchmark_folder_parent): folder_parts = benchmark_folder.split('/') # Check if this benchmark has power results has_power = _has_power(benchmark_folder) benchmark = _benchmark_alias(folder_parts[-1]) system = folder_parts[-3] if usage == 'hpc' else folder_parts[-2] - # Read scores from result files. 
- score, power_score = _compute_strong_score_standalone(benchmark, system, has_power, benchmark_folder, usage, ruleset, desc) + # Compute base perf/power scores + score, power_score, rcp_scaling_factor = _compute_strong_score_standalone( + benchmark, system, has_power, benchmark_folder, usage, ruleset, desc + ) + + # RCP/GBS/Epochs additions for closed division + benchmark_gbs = None + benchmark_epochs = None + benchmark_rcp = None + if division == 'closed': + pattern = '{folder}/result_*.txt'.format(folder=benchmark_folder) + result_files = glob.glob(pattern, recursive=True) + try: + # RCP check + verbose = False + bert_train_samples = False + rcp_pass, rcp_msg, _ = rcp_checker.check_directory( + benchmark_folder, + usage, + ruleset, + verbose, + bert_train_samples, + rcp_file=None, + rcp_pass='pruned_rcps', + rcp_bypass=rcp_bypass, + set_scaling=True, + ) + if not rcp_pass: + print( + 'ERROR: RCP Test Failed on {}/{}/{} with message: {}.'.format( + desc['submitter'], system, benchmark, rcp_msg + ) + ) + if rcp_msg == 'RCP found': + benchmark_rcp = 'Fail' + elif rcp_msg == 'RCP Interpolation': + benchmark_rcp = 'Interp. 
Fail' + elif 'Missing' in rcp_msg: + benchmark_rcp = 'Missing' + elif rcp_msg == 'Cannot find any RCPs': + benchmark_rcp = 'No RCP' + else: + benchmark_rcp = 'Unknown state' + else: + benchmark_rcp = 'Pass' + + # GBS and epochs + benchmark_gbs, subm_epochs, _ = rcp_checker.get_submission_epochs( + result_files, ruleset, bert_train_samples=False + ) + subm_epochs.sort() + samples_rejected = 1 + if len(subm_epochs) >= 2 * samples_rejected + 1: + benchmark_epochs = float( + np.mean( + subm_epochs[ + samples_rejected : len(subm_epochs) - samples_rejected + ] + ) + ) + except Exception as e: + print( + f"WARNING: RCP/GBS computation failed for {benchmark_folder}: {e}" + ) + traceback.print_exc() + + # Map into metric-suffixed keys for schema + detailed_bechmark_scores[f"{benchmark}:rcp_scaling_factor"] = float( + rcp_scaling_factor + ) if score is not None: - benchmark_scores[benchmark] = score + detailed_bechmark_scores[f"{benchmark}:time_to_train"] = score + if benchmark_gbs is not None: + detailed_bechmark_scores[f"{benchmark}:GBS"] = float(benchmark_gbs) + if benchmark_epochs is not None: + detailed_bechmark_scores[f"{benchmark}:epochs"] = float(benchmark_epochs) + if benchmark_rcp is not None: + detailed_bechmark_scores[f"{benchmark}:RCP"] = benchmark_rcp if power_score is not None: - benchmark_power_scores[benchmark] = power_score - _fill_empty_benchmark_scores(benchmark_scores, usage, ruleset) - if len(benchmark_power_scores) > 0: - _fill_empty_benchmark_scores(benchmark_power_scores, usage, ruleset) - return benchmark_scores, benchmark_power_scores + detailed_bechmark_scores[f"{benchmark}:Energy"] = power_score + benchmark_scores[f"{benchmark}"] = score + # NOTE: the main summary keeps time-to-train as the benchmark value; + # the RCP scaling factor is reported only in the detailed table + _fill_empty_benchmark_scores(benchmark_scores, usage, ruleset, detailed=False) + _fill_empty_benchmark_scores(detailed_bechmark_scores, usage, ruleset, detailed=True) + return benchmark_scores, detailed_bechmark_scores def _compute_weak_scaling_scores(desc, system_folder, 
usage, ruleset): @@ -693,6 +791,7 @@ def _fill_empty_benchmark_scores( usage, ruleset, weak_scaling=False, + detailed=False, ): for benchmark in get_allowed_benchmarks(usage, ruleset): if weak_scaling: @@ -702,8 +801,19 @@ def _fill_empty_benchmark_scores( benchmark_scores[k] = None else: - if benchmark not in benchmark_scores: - benchmark_scores[benchmark] = None + if detailed: + strong_schema = _get_strong_scaling_metric_schema() + for metric, dtype in strong_schema.items(): + k = '{}:{}'.format(benchmark, metric) + if dtype is str: + if k not in benchmark_scores or benchmark_scores[k] is None: + benchmark_scores[k] = '' + else: + if k not in benchmark_scores: + benchmark_scores[k] = None + else: + if benchmark not in benchmark_scores: + benchmark_scores[benchmark] = None def _get_id_from_sysinfo(summary): @@ -841,7 +951,7 @@ def summarize_results(folder, usage, ruleset, csv_file=None, **kwargs): weak_scaling_summary = _get_empty_summary(usage, ruleset, weak_scaling=True) - power_summary = _get_empty_summary(usage, ruleset) + detailed_strong_scaling_summary = _get_empty_summary(usage, ruleset, detailed=True) power_weak_scaling_summary = _get_empty_summary(usage, ruleset, weak_scaling=True) for system_folder in _get_sub_folders(results_folder): folder_parts = system_folder.split('/') @@ -924,8 +1034,8 @@ def _check_and_update_system_specs(desc_keys, column_name, query=None): continue # Compute the scores. 
- strong_scaling_scores, power_scores = _compute_strong_scaling_scores( - desc, system_folder, usage, ruleset) + strong_scaling_scores, detailed_strong_scaling_scores = _compute_strong_scaling_scores( + desc, system_folder, usage, ruleset, system_specs["division"], rcp_bypass=False) if usage == 'hpc': weak_scaling_scores, power_scores_weak_scaling = _compute_weak_scaling_scores( desc, system_folder, usage, ruleset) @@ -950,17 +1060,18 @@ def _check_and_update_system_specs(desc_keys, column_name, query=None): urls.items(), ): weak_scaling_summary.push(column_name, value) - if len(power_scores) > 0: + if len(detailed_strong_scaling_scores) > 0: for column_name, value in itertools.chain( system_specs.items(), - power_scores.items(), + detailed_strong_scaling_scores.items(), urls.items(), ): - power_summary.push(column_name, value) - if column_name in strong_scaling_scores: - power_summary.push(column_name, strong_scaling_scores[column_name]) - else: - power_summary.push(column_name, value) + merged = ( + detailed_strong_scaling_scores[column_name] + if column_name in detailed_strong_scaling_scores + else value + ) + detailed_strong_scaling_summary.push(column_name, merged) if usage == 'hpc' and len(power_scores_weak_scaling) > 0: for column_name, value in itertools.chain( system_specs.items(), @@ -975,13 +1086,13 @@ def _check_and_update_system_specs(desc_keys, column_name, query=None): if len(weak_scaling_summary) > 0: weak_scaling_summary = weak_scaling_summary.to_dataframe().sort_values( _get_sort_by_column_names()).reset_index(drop=True) - if len(power_summary) > 0: - power_summary = power_summary.to_dataframe().sort_values( + if len(detailed_strong_scaling_summary) > 0: + detailed_strong_scaling_summary = detailed_strong_scaling_summary.to_dataframe().sort_values( _get_sort_by_column_names()).reset_index(drop=True) if len(power_weak_scaling_summary) > 0: power_weak_scaling_summary = power_weak_scaling_summary.to_dataframe().sort_values( 
_get_sort_by_column_names()).reset_index(drop=True) - return strong_scaling_summary, weak_scaling_summary, power_summary, power_weak_scaling_summary + return strong_scaling_summary, weak_scaling_summary, detailed_strong_scaling_summary, power_weak_scaling_summary @@ -1039,7 +1150,7 @@ def main(): strong_scaling_summaries = [] weak_scaling_summaries = [] - power_summaries = [] + detailed_strong_scaling_summaries = [] power_weak_scaling_summaries = [] def _update_summaries(folder): @@ -1047,7 +1158,7 @@ def _update_summaries(folder): config_path = os.path.join(os.path.dirname(__file__), "config.yaml") with open(config_path, "r") as f: config = yaml.safe_load(f) - strong_scaling_summary, weak_scaling_summary, power_summary, power_weak_scaling_summary = summarize_results( + strong_scaling_summary, weak_scaling_summary, detailed_strong_scaling_summary, power_weak_scaling_summary = summarize_results( folder, args.usage, args.ruleset, @@ -1055,7 +1166,7 @@ def _update_summaries(folder): generate_private_ids = args.generate_private_ids, ) else: - strong_scaling_summary, weak_scaling_summary, power_summary, power_weak_scaling_summary = summarize_results( + strong_scaling_summary, weak_scaling_summary, detailed_strong_scaling_summary, power_weak_scaling_summary = summarize_results( folder, args.usage, args.ruleset, @@ -1064,8 +1175,8 @@ def _update_summaries(folder): strong_scaling_summaries.append(strong_scaling_summary) if len(weak_scaling_summary) > 0: weak_scaling_summaries.append(weak_scaling_summary) - if len(power_summary) > 0: - power_summaries.append(power_summary) + if len(detailed_strong_scaling_summary) > 0: + detailed_strong_scaling_summaries.append(detailed_strong_scaling_summary) if len(power_weak_scaling_summary) > 0: power_weak_scaling_summaries.append(power_weak_scaling_summary) @@ -1180,13 +1291,14 @@ def _summaries_to_xlsx(summaries: pd.DataFrame, path, version): writer.save() # Print and write back results. 
- def _print_and_write(summaries, weak_scaling=False, mode='w', power = False): + def _print_and_write(summaries, weak_scaling=False, mode='w', power = False, detailed = False): if len(summaries) > 0: summaries = pd.concat(summaries).astype( _get_column_schema( args.usage, args.ruleset, weak_scaling=weak_scaling, + detailed=detailed ) ) if weak_scaling: @@ -1208,6 +1320,9 @@ def _print_and_write(summaries, weak_scaling=False, mode='w', power = False): specs_and_notes = [c for c in summaries.columns if c not in benchmarks] csv = csv.replace(".csv", "_power.csv") summaries.groupby(specs_and_notes).apply(lambda x: agg_columns_fn(x, benchmarks)).to_csv(csv, mode=mode) + elif detailed: + csv = csv.replace(".csv", "_detailed.csv") + summaries.to_csv(csv, index=False, mode=mode) else: summaries.to_csv(csv, index=False, mode=mode) json_path = "summary.json" if args.csv is None else f"""{csv.replace(".csv", ".json")}""" @@ -1224,7 +1339,7 @@ def _print_and_write(summaries, weak_scaling=False, mode='w', power = False): None, 'display.max_colwidth', None): _print_and_write(strong_scaling_summaries) _print_and_write(weak_scaling_summaries, weak_scaling=True, mode='a') - _print_and_write(power_summaries, mode='a', power=True) + _print_and_write(detailed_strong_scaling_summaries, mode='a', detailed=True) _print_and_write(power_weak_scaling_summaries, weak_scaling=True, mode='a', power=True)