Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion code/sonalyze/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ SUBDIRS=application \
common \
daemon \
data/card data/common data/cpusample data/gpusample data/node data/sample \
data/slurmjob data/slurmnode data/slurmpart \
data/samplejob data/slurmjob data/slurmnode data/slurmpart \
db db/errs db/filedb db/parse db/repr db/special \
table

Expand Down
116 changes: 100 additions & 16 deletions code/sonalyze/application/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ package application
import (
"fmt"
"io"
"time"

. "sonalyze/cmd"
"sonalyze/cmd/jobs"
. "sonalyze/common"
"sonalyze/data/sample"
"sonalyze/db"
"sonalyze/db/repr"
"sonalyze/db/special"
)

// Clearly, for `jobs` the file list thing is tricky b/c the list can be *either* sample data *or*
Expand All @@ -18,22 +23,7 @@ import (

func LocalSampleOperation(command SampleAnalysisCommand, _ io.Reader, stdout, stderr io.Writer) error {
args := command.SampleAnalysisFlags()

var filter sample.QueryFilter
filter.AllUsers, filter.SkipSystemUsers, filter.ExcludeSystemCommands, filter.ExcludeHeartbeat =
command.DefaultRecordFilters()
filter.HaveFrom = args.SourceArgs.HaveFrom
filter.FromDate = args.SourceArgs.FromDate
filter.HaveTo = args.SourceArgs.HaveTo
filter.ToDate = args.SourceArgs.ToDate
filter.Host = args.HostArgs.Host
filter.ExcludeSystemJobs = args.RecordFilterArgs.ExcludeSystemJobs
filter.User = args.RecordFilterArgs.User
filter.ExcludeUser = args.RecordFilterArgs.ExcludeUser
filter.Command = args.RecordFilterArgs.Command
filter.ExcludeCommand = args.RecordFilterArgs.ExcludeCommand
filter.Job = args.RecordFilterArgs.Job
filter.ExcludeJob = args.RecordFilterArgs.ExcludeJob
filter := BuildSampleFilter(command)

theLog, err := db.OpenReadOnlyDB(
command.ConfigFile(),
Expand All @@ -53,3 +43,97 @@ func LocalSampleOperation(command SampleAnalysisCommand, _ io.Reader, stdout, st

return command.Perform(stdout, cfg, theLog, filter, hosts, recordFilter)
}

type FileListJobsDataProvider struct {
provider db.DataProvider
isSample bool
}

func (fljdp *FileListJobsDataProvider) ReadSamples(
fromDate, toDate time.Time,
hosts *Hosts,
verbose bool,
) (
sampleBlobs [][]*repr.Sample,
softErrors int,
err error,
) {
if fljdp.isSample {
return fljdp.provider.ReadSamples(fromDate, toDate, hosts, verbose)
}
return nil, 0, nil
}

func (fljdp *FileListJobsDataProvider) ReadSacctData(
fromDate, toDate time.Time,
verbose bool,
) (
recordBlobs [][]*repr.SacctInfo,
softErrors int,
err error,
) {
if !fljdp.isSample {
return fljdp.provider.ReadSacctData(fromDate, toDate, verbose)
}
return nil, 0, nil
}

var _ = jobs.JobsDataProvider((*FileListJobsDataProvider)(nil))

func LocalJobsOperation(command *jobs.JobsCommand, _ io.Reader, stdout, stderr io.Writer) error {
args := command.SampleAnalysisFlags()
filter := BuildSampleFilter(command)
cfg, err := special.MaybeGetConfig(command.ConfigFile())
if err != nil {
return err
}
hosts, recordFilter, err := sample.BuildSampleFilter(cfg, filter, args.Verbose)
if err != nil {
return fmt.Errorf("Failed to create record filter: %v", err)
}

var theLog jobs.JobsDataProvider
if len(args.LogFiles) > 0 {
// We default to sample data, fall back to sacct data under a switch.
fljdp := &FileListJobsDataProvider{}
theLog = fljdp
if command.SlurmJobData {
fljdp.provider, err = db.OpenFileListDB(db.FileListSlurmJobData, args.LogFiles, cfg)
} else {
fljdp.isSample = true
fljdp.provider, err = db.OpenFileListDB(db.FileListSampleData, args.LogFiles, cfg)
}
} else {
if args.DataDir == "" {
return fmt.Errorf("Must have either dataDir or logFiles")
}
theLog, err = db.OpenPersistentDirectoryDB(args.DataDir, cfg)
}
if err != nil {
return fmt.Errorf("Failed to open log store: %v", err)
}

return command.Perform(stdout, cfg, theLog, filter, hosts, recordFilter)
}

func BuildSampleFilter(params SampleAnalysisParameters) sample.QueryFilter {
args := params.SampleAnalysisFlags()

var filter sample.QueryFilter
filter.AllUsers, filter.SkipSystemUsers, filter.ExcludeSystemCommands, filter.ExcludeHeartbeat =
params.DefaultRecordFilters()
filter.HaveFrom = args.SourceArgs.HaveFrom
filter.FromDate = args.SourceArgs.FromDate
filter.HaveTo = args.SourceArgs.HaveTo
filter.ToDate = args.SourceArgs.ToDate
filter.Host = args.HostArgs.Host
filter.ExcludeSystemJobs = args.RecordFilterArgs.ExcludeSystemJobs
filter.User = args.RecordFilterArgs.User
filter.ExcludeUser = args.RecordFilterArgs.ExcludeUser
filter.Command = args.RecordFilterArgs.Command
filter.ExcludeCommand = args.RecordFilterArgs.ExcludeCommand
filter.Job = args.RecordFilterArgs.Job
filter.ExcludeJob = args.RecordFilterArgs.ExcludeJob

return filter
}
8 changes: 4 additions & 4 deletions code/sonalyze/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ import (

// Standard method for cleaning an InputStreamSet relative to a config: the config must have a
// definition for each host. No config at all is a fatal error. No config for a host means we
// remove the host from the set, we return the modified set.
// remove the host from the set (imperatively).
//
// Over time this may become more complicated, as the config becomes time-dependent.
func EnsureConfigForInputStreams(
cfg *config.ClusterConfig,
streams sample.InputStreamSet,
reason string,
) (sample.InputStreamSet, error) {
) error {
// Bail if there's no config data at all.
if cfg == nil {
return nil, fmt.Errorf("Configuration file required: %s", reason)
return fmt.Errorf("Configuration file required: %s", reason)
}

// Remove streams for which we have no config data.
Expand All @@ -37,5 +37,5 @@ func EnsureConfigForInputStreams(
delete(streams, b)
}

return streams, nil
return nil
}
Loading