Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
cb55f9e
initial draft of base reader
Sep 27, 2024
8d7d9e2
updates to base reader
Sep 30, 2024
ceae939
base reader working without cct creation
Oct 3, 2024
8f7880a
Base Reader for single threaded reading
Oct 13, 2024
65e4332
changes to make core reader more parallel-friendly
Oct 13, 2024
b959fee
updates to have strided unique_id, take care of instant events, and t…
Oct 14, 2024
1fd2bc9
style updates
Oct 14, 2024
4c15683
Merge branch 'develop' into abstract-base-reader
movsesyanae Oct 18, 2024
0aa4669
Implement otf2 reader with core reader
zoreoo Nov 4, 2024
4501825
Fix metrics for non-paired events
zoreoo Nov 4, 2024
e615e01
Fix another thing with metrics
zoreoo Nov 4, 2024
5a0157e
Ensure metrics_dict is reset
zoreoo Nov 4, 2024
02a00e5
minor update
Nov 8, 2024
27e95f8
updated non-existing int values to -1
Nov 8, 2024
83939a5
core_reader -- initial implementation to use narwhals df and support …
Nov 22, 2024
a434f2b
otf2_reader -- converting to use nw frame, failing mp, so forcing 1 p…
Nov 22, 2024
8e31839
trace -- updated constructor to use nw frame
Nov 22, 2024
5b2b08b
otf2 reader -- removing forced 1 process
Nov 22, 2024
7c6107b
requirements.txt -- added narwhals as dependency
Nov 22, 2024
a6ea249
otf2_reader -- default num processes restored to None
Nov 22, 2024
8ac685f
otf2_reader -- updated matching timestamp to account for shift
Nov 23, 2024
c0a7567
core_reader -- removing int cast for matching timestamp
Nov 23, 2024
a98cf1f
trace -- updating match_events to use nw dfs (polars api)
Nov 23, 2024
54e7437
trace -- updated match_caller_callee to use polars API with nw frame
Nov 23, 2024
80d898e
trace -- updated calc_inc_metrics to use nw frame
Nov 23, 2024
4674b35
trace -- updated match_exc_events for nw and minor update to match_in…
Nov 25, 2024
fe92ed7
trace -- fixed minor bug in match events and caller/callee
Dec 3, 2024
eae91c1
trace -- minor bugfix for calc_exc and reduced to 1 join op
Dec 3, 2024
9e8940a
trace and core_reader -- changing match_event to shared matching_even…
Dec 3, 2024
6860bcd
trace and core_reader -- moved back to matching_event and inc metrics…
Dec 3, 2024
2c2da02
trace -- update idle_time to use narwhals
Dec 4, 2024
c3dcf13
trace -- updated comm_matrix to use nw
Dec 4, 2024
91e78d3
trace -- updated time_profile to nw, still [WIP]
Dec 5, 2024
8f14d56
trace -- time_profile working with nw
Dec 18, 2024
c2dc2d9
trace -- flat profile converted to using narwhals
Dec 18, 2024
887def8
trace -- time profile bugfix
Dec 18, 2024
a660444
trace -- load imbalanced ported to narwhals
Dec 19, 2024
bfcebba
trace -- message histogram updated
Dec 19, 2024
e26cf53
trace -- updated calc_metrix to only calculate new metrics
Dec 19, 2024
cd13da2
trace -- changing exc to stop computation early when necessary
Dec 20, 2024
91e0635
otf2_reader -- updated to require frame backend
Dec 20, 2024
82dd153
trace -- updated from_otf2 to accept frame backend
Dec 20, 2024
5976498
trace -- updated comm_over_time to use nw
Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions pipit/readers/core_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
from typing import List, Dict

import numpy as np
import pandas
import numpy

from pipit.trace import Trace
import narwhals as nw
from narwhals.typing import IntoDataFrameT, FrameT
import narwhals.selectors as ncs

class CoreTraceReader:
"""
Helper Object to read traces from different sources and convert them into a common
format
"""

def __init__(self, start: int = 0, stride: int = 1, frame_backend=pandas.DataFrame):
"""
Should be called by each process to create an empty trace per process in the
reader. Creates the following data structures to represent an empty trace:
- events: Dict[int, Dict[int, List[Dict]]]
- stacks: Dict[int, Dict[int, List[int]]]
"""
# keep stride for how much unique id should be incremented
self.stride = stride

# keep track of a unique id for each event
self.unique_id = start - self.stride

# events are indexed by process number, then thread number
# stores a list of events
self.events: Dict[int, Dict[int, List[Dict]]] = {}

# stacks are indexed by process number, then thread number
# stores indices of events in the event list
self.stacks: Dict[int, Dict[int, List[int]]] = {}

# Set the frame backend
self.frame_backend = frame_backend

def add_event(self, event: Dict) -> None:
"""
Should be called to add each event to the trace. Will update the event lists and
stacks accordingly.
"""
# get process number -- if not present, set to 0
if "Process" in event:
process = event["Process"]
else:
process = 0

# get thread number -- if not present, set to 0
if "Thread" in event:
thread = event["Thread"]
else:
thread = 0
# event["Thread"] = 0

# assign a unique id to the event
event["unique_id"] = self.__get_unique_id()

# get event list
if process not in self.events:
self.events[process] = {}
if thread not in self.events[process]:
self.events[process][thread] = []
event_list = self.events[process][thread]

# get stack
if process not in self.stacks:
self.stacks[process] = {}
if thread not in self.stacks[process]:
self.stacks[process][thread] = []
stack: List[int] = self.stacks[process][thread]

# if the event is an enter event, add the event to the stack and update the
# parent-child relationships
if event["Event Type"] == "Enter":
self.__update_parent_child_relationships(event, stack, event_list, False)
elif event["Event Type"] == "Instant":
self.__update_parent_child_relationships(event, stack, event_list, True)
# if the event is a leave event, update the matching event and pop from the
# stack
elif event["Event Type"] == "Leave":
self.__update_match_event(event, stack, event_list)

# Finally add the event to the event list
event_list.append(event)

def finalize(self) -> IntoDataFrameT :
"""
Converts the events data structure into a pandas dataframe and returns it
"""
all_events = []
for process in self.events:
for thread in self.events[process]:
all_events.extend(self.events[process][thread])

# Create a trace_frame
trace_frame = nw.from_native(self.frame_backend(all_events))

# Fill null with -1 (since int32 does not allow nan)
trace_frame = trace_frame.with_columns([
nw.col(['_matching_event', '_parent', '_matching_timestamp']).fill_null(-1),
])

# Convert Name, Event Type, and Process to categorical memory savings
# Convert _matching_event, _parent, and _matching_timestamp to int, since they are indices
trace_frame = trace_frame.with_columns([
nw.col(['Name', 'Event Type', 'Process']).cast(nw.dtypes.Categorical),
nw.col(['_matching_event', '_parent']).cast(nw.dtypes.Int32),
])

# Return native because multiprocessing fails with narwhal frames
return trace_frame.to_native()

def __update_parent_child_relationships(
self, event: Dict, stack: List[int], event_list: List[Dict], is_instant: bool
) -> None:
"""
This method can be thought of the update upon an "Enter" event. It adds to the
stack and CCT
"""
if len(stack) == 0:
# root event
event["_parent"] = -1
else:
parent_event = event_list[stack[-1]]
event["_parent"] = parent_event["unique_id"]

# update stack
if not is_instant:
stack.append(len(event_list))

def __update_match_event(
self, leave_event: Dict, stack: List[int], event_list: List[Dict]
) -> None:
"""
This method can be thought of the update upon a "Leave" event. It pops from the
stack and updates the event list. We should look into using this function to add
artificial "Leave" events for unmatched "Enter" events
"""

while len(stack) > 0:

# popping matched events from the stack
enter_event = event_list[stack.pop()]

if enter_event["Name"] == leave_event["Name"]:
# matching event found

# update matching event ids
leave_event["_matching_event"] = enter_event["unique_id"]
enter_event["_matching_event"] = leave_event["unique_id"]

# update matching timestamps
leave_event["_matching_timestamp"] = enter_event["Timestamp (ns)"]
enter_event["_matching_timestamp"] = leave_event["Timestamp (ns)"]

break

def __get_unique_id(self) -> int:
self.unique_id += self.stride
return self.unique_id

def concat_trace_data(data_list: List[IntoDataFrameT]) -> FrameT:
"""
Concatenates the data from multiple trace readers into a single trace reader
"""
# Converting into nw frames
nw_frames = [nw.from_native(df) for df in data_list]

# Concatenating into a single frame
trace_frame: nw.DataFrame = nw.concat(nw_frames)

# Set index to unique_id
nw.maybe_set_index(trace_frame, "unique_id")

# Sort by timestamp and unique_id
trace_frame.sort("Timestamp (ns)", "unique_id")

# Return Narwhals frame
return trace_frame
Loading