Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 77 additions & 10 deletions cmd/gsw_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"

Expand All @@ -21,6 +22,11 @@ import (
_ "net/http/pprof"
)

type resolvedDBConfig struct {
v1 *db.InfluxDBV1Config
v2 *db.InfluxDBV2Config
}

var (
shmDir = flag.String("shm", "/dev/shm", "directory to use for shared memory")
configFilepath = flag.String("c", "gsw_service", "name of config file")
Expand Down Expand Up @@ -99,27 +105,86 @@ func decomInitialize(ctx context.Context, wg *sync.WaitGroup) map[int]chan []byt
return channelMap
}

func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, host string, port int, wg *sync.WaitGroup) error {
dbHandler := db.InfluxDBV1Handler{}
if err := dbHandler.Initialize(host, port); err != nil {
return err
func dbInitialize(ctx context.Context, channelMap map[int]chan []byte, cfg resolvedDBConfig, wg *sync.WaitGroup) error {
var handler db.Handler

if cfg.v2 != nil {
h := &db.InfluxDBV2Handler{}
if err := h.InitializeWithConfig(*cfg.v2); err != nil {
return fmt.Errorf("initializing InfluxDB V2: %w", err)
}
handler = h
logger.Info("Using InfluxDB V2 handler with batching",
zap.Uint("batchSize", cfg.v2.BatchSize),
zap.Uint("flushIntervalMs", cfg.v2.FlushInterval),
)
} else if cfg.v1 != nil {
h := &db.InfluxDBV1Handler{}
if err := h.InitializeWithConfig(*cfg.v1); err != nil {
return fmt.Errorf("initializing InfluxDB V1: %w", err)
}
handler = h
logger.Info("Using InfluxDB V1 handler (UDP)")
} else {
return nil
}

for _, packet := range proc.GswConfig.TelemetryPackets {
wg.Add(1)
go func(packet tlm.TelemetryPacket, ch chan []byte) {
defer wg.Done()
proc.DatabaseWriter(ctx, &dbHandler, packet, ch)
proc.DatabaseWriter(ctx, handler, packet, ch)
}(packet, channelMap[packet.Port])
}
return nil
}

func resolveDBConfig(config *viper.Viper) (resolvedDBConfig, error) {
v2Map := config.GetStringMap("database_v2")
if len(v2Map) > 0 {
precision, err := db.ParsePrecision(config.GetString("database_v2.precision"))
if err != nil {
return resolvedDBConfig{}, fmt.Errorf("invalid database_v2.precision: %w", err)
}

v2cfg := db.InfluxDBV2Config{
URL: config.GetString("database_v2.url"),
Token: config.GetString("database_v2.token"),
Org: config.GetString("database_v2.org"),
Bucket: config.GetString("database_v2.bucket"),
BatchSize: uint(config.GetInt("database_v2.batch_size")),
FlushInterval: uint(config.GetInt("database_v2.flush_interval_ms")),
Precision: precision,
}

if v2cfg.URL == "" || v2cfg.Org == "" || v2cfg.Bucket == "" {
return resolvedDBConfig{}, errors.New("database_v2.url, database_v2.org, and database_v2.bucket are required when database_v2 is set")
}

return resolvedDBConfig{v2: &v2cfg}, nil
}

hostSet := config.IsSet("database_host_name")
portSet := config.IsSet("database_port_number")
if hostSet || portSet {
host := config.GetString("database_host_name")
port := config.GetInt("database_port_number")
if host == "" || port <= 0 {
return resolvedDBConfig{}, errors.New("database_host_name and database_port_number must both be set for InfluxDB V1")
}
v1cfg := db.InfluxDBV1Config{Host: host, Port: port}
return resolvedDBConfig{v1: &v1cfg}, nil
}

return resolvedDBConfig{}, nil
}

func readConfig() (*viper.Viper, int) {
config := viper.New()
config.SetConfigName(*configFilepath)
config.SetConfigType("yaml")
config.SetEnvPrefix("GSW")
config.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
config.AutomaticEnv()
config.AddConfigPath("data/config/")
err := config.ReadInConfig()
Expand Down Expand Up @@ -179,13 +244,15 @@ func main() {
// Start decom writers
channelMap := decomInitialize(ctx, &wg)

// Start DB writers
if config.IsSet("database_host_name") && config.IsSet("database_port_number") {
if err = dbInitialize(ctx, channelMap, config.GetString("database_host_name"), config.GetInt("database_port_number"), &wg); err != nil {
logger.Warn("DB Initialization failed, telemetry packets will not be published to the database", zap.Error(err))
resolvedDB, err := resolveDBConfig(config)
if err != nil {
logger.Warn("Database configuration is invalid; telemetry packets will not be published to the database", zap.Error(err))
} else if resolvedDB.v1 != nil || resolvedDB.v2 != nil {
if err = dbInitialize(ctx, channelMap, resolvedDB, &wg); err != nil {
logger.Warn("DB initialization failed, telemetry packets will not be published to the database", zap.Error(err))
}
} else {
logger.Warn("database_host_name or database_port_number is not set, telemetry packets will not be published to the database")
logger.Info("No database configuration found; telemetry packets will not be published to the database")
}

// Wait for shutdown signal
Expand Down
23 changes: 17 additions & 6 deletions data/config/gsw_service.yaml.example
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
# path to telemetry definitions
# Path to telemetry definitions
telemetry_config: data/config/backplane.yaml

# database defines the hostname and port of an InfluxDB v1 UDP input
database_host_name: localhost
database_port_number: 8089
# InfluxDB V1 (UDP)
# database_host_name: localhost
# database_port_number: 8089

# path to gsw_service logging config
logging_config: data/config/logger.yaml
# InfluxDB V2 (batched HTTP)
# If database_v2 is set, V2 will be used and V1 settings are ignored
database_v2:
url: http://localhost:8086
token: your-token-here
org: gsw
bucket: gsw
batch_size: 100 # points buffered before auto-flush
flush_interval_ms: 1000 # max ms before flushing partial batch
precision: ns # ns | us | ms | s

# Path to GSW service logging config
logging_config: data/config/logger.yaml
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/gdamore/tcell/v2 v2.7.1
github.com/google/gopacket v1.1.19
github.com/gorilla/websocket v1.5.3
github.com/influxdata/influxdb-client-go/v2 v2.14.0
github.com/joho/godotenv v1.5.1
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922
github.com/spf13/viper v1.19.0
Expand All @@ -17,13 +18,17 @@ require (
)

require (
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gdamore/encoding v1.0.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/oapi-codegen/runtime v1.0.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
Expand Down
15 changes: 15 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand All @@ -14,14 +18,21 @@ github.com/gdamore/tcell/v2 v2.7.1 h1:TiCcmpWHiAU7F0rA2I3S2Y4mmLmO9KHxJ7E1QhYzQb
github.com/gdamore/tcell/v2 v2.7.1/go.mod h1:dSXtXTSK0VsW1biw65DZLZ2NKr7j0qP/0J7ONmsraWg=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4=
github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -34,6 +45,8 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo=
github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -61,10 +74,12 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI=
github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
Expand Down
14 changes: 9 additions & 5 deletions lib/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package db

// Handler is an interface for database access implementations
type Handler interface {
// Initialize sets up the database client
Initialize(host string, port int) error
// Insert sends the measurement data to the database
// Insert sends the measurement data to the database.
Insert(measurements MeasurementGroup) error
// CreateQuery generates the database query for measurementGroup
// CreateQuery generates the database query for measurementGroup.
CreateQuery(measurements MeasurementGroup) string
// Close closes the database client when done
// Close closes the database client when done.
Close() error
}

// BatchHandler extends Handler with batch write support
type BatchHandler interface {
Handler
Flush() error
}

// MeasurementGroup is a group of measurements to be sent to the database
type MeasurementGroup struct {
DatabaseName string // Name of the database
Expand Down
13 changes: 12 additions & 1 deletion lib/db/influx_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,20 @@ type InfluxDBV1Handler struct {
addr string // IP address and port of InfluxDB
}

// InfluxDBV1Config holds the fields needed for UDP writes.
type InfluxDBV1Config struct {
Host string
Port int
}

// Initialize sets up the InfluxDB UDP connection
func (h *InfluxDBV1Handler) Initialize(host string, port int) error {
h.addr = fmt.Sprintf("%s:%d", host, port)
return h.InitializeWithConfig(InfluxDBV1Config{Host: host, Port: port})
}

// InitializeWithConfig sets up the InfluxDB UDP connection.
func (h *InfluxDBV1Handler) InitializeWithConfig(cfg InfluxDBV1Config) error {
h.addr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
addr, err := net.ResolveUDPAddr("udp", h.addr)
if err != nil {
return fmt.Errorf("resolving db address: %w", err)
Expand Down
Loading
Loading