From 06062a9133407596ae8fee6d19300becce86948a Mon Sep 17 00:00:00 2001 From: Maximilian Stefanac <137301184+corporatemax@users.noreply.github.com> Date: Mon, 25 Nov 2024 09:58:22 +0100 Subject: [PATCH] add error message in applog-stream for syslog-drain validation --- jobs/loggr-syslog-agent/templates/bpm.yml.erb | 1 + jobs/loggr-syslog-binding-cache/spec | 31 ++ .../templates/agent.crt.erb | 1 + .../templates/agent.key.erb | 1 + .../templates/agent_ca.crt.erb | 1 + .../templates/bpm.yml.erb | 15 + src/cmd/syslog-agent/app/config.go | 36 +- src/cmd/syslog-agent/app/syslog_agent.go | 21 +- .../app/syslog_agent_mtls_test.go | 6 +- src/cmd/syslog-agent/app/syslog_agent_test.go | 24 +- src/cmd/syslog-agent/main.go | 6 +- src/cmd/syslog-binding-cache/app/config.go | 20 +- .../app/syslog_binding_cache.go | 64 ++- .../app/syslog_binding_cache_test.go | 31 +- .../testhelper/spy_app_log_emitter.go | 25 + src/internal/testhelper/spy_log_client.go | 82 +++ .../bindingfakes}/fake_ipchecker.go | 6 +- .../blacklist}/blacklist_ranges.go | 2 +- .../blacklist}/blacklist_ranges_test.go | 50 +- .../binding/blacklist/blacklist_suite_test.go | 13 + src/pkg/binding/poller.go | 267 ++++++++- src/pkg/binding/poller_test.go | 510 +++++++++++++++--- src/pkg/egress/syslog/retry_writer_test.go | 77 --- src/pkg/egress/syslog/syslog_connector.go | 51 +- .../egress/syslog/syslog_connector_test.go | 32 +- src/pkg/egress/syslog/tcp.go | 9 + src/pkg/egress/syslog/tcp_test.go | 16 + src/pkg/egress/syslog/tls.go | 3 + src/pkg/egress/syslog/tls_test.go | 5 + src/pkg/egress/syslog/writer_factory.go | 8 +- src/pkg/egress/syslog/writer_factory_test.go | 22 +- src/pkg/ingress/applog/app_log_stream.go | 54 ++ src/pkg/ingress/applog/app_log_stream_test.go | 66 +++ src/pkg/ingress/applog/app_log_suite_test.go | 13 + .../bindings/filtered_binding_fetcher.go | 11 +- .../bindings/filtered_binding_fetcher_test.go | 7 +- 36 files changed, 1255 insertions(+), 332 deletions(-) create mode 100644 jobs/loggr-syslog-binding-cache/templates/agent.crt.erb create mode 100644 jobs/loggr-syslog-binding-cache/templates/agent.key.erb create mode 100644 jobs/loggr-syslog-binding-cache/templates/agent_ca.crt.erb create mode 100644 src/internal/testhelper/spy_app_log_emitter.go create mode 100644 src/internal/testhelper/spy_log_client.go rename src/pkg/{ingress/bindings/bindingsfakes => binding/bindingfakes}/fake_ipchecker.go (97%) rename src/pkg/{ingress/bindings => binding/blacklist}/blacklist_ranges.go (99%) rename src/pkg/{ingress/bindings => binding/blacklist}/blacklist_ranges_test.go (65%) create mode 100644 src/pkg/binding/blacklist/blacklist_suite_test.go create mode 100644 src/pkg/ingress/applog/app_log_stream.go create mode 100644 src/pkg/ingress/applog/app_log_stream_test.go create mode 100644 src/pkg/ingress/applog/app_log_suite_test.go diff --git a/jobs/loggr-syslog-agent/templates/bpm.yml.erb b/jobs/loggr-syslog-agent/templates/bpm.yml.erb index e56828fe6..4a1b5bb0f 100644 --- a/jobs/loggr-syslog-agent/templates/bpm.yml.erb +++ b/jobs/loggr-syslog-agent/templates/bpm.yml.erb @@ -37,6 +37,7 @@ "PPROF_PORT" => "#{p("metrics.pprof_port")}", "USE_RFC3339" => "#{p("logging.format.timestamp") == "rfc3339"}", "WARN_ON_INVALID_DRAINS" => "#{p("warn_on_invalid_drains")}", + "LOGGREGATOR_AGENT_ADDR" => "localhost:#{p('port')}", } } if_p("drain_cipher_suites") do | ciphers | diff --git a/jobs/loggr-syslog-binding-cache/spec b/jobs/loggr-syslog-binding-cache/spec index d09c781b8..9a60a025a 100644 --- a/jobs/loggr-syslog-binding-cache/spec +++ b/jobs/loggr-syslog-binding-cache/spec @@ -14,6 +14,9 @@ templates: metrics.key.erb: config/certs/metrics.key aggregate_drains.yml.erb: config/aggregate_drains.yml prom_scraper_config.yml.erb: config/prom_scraper_config.yml + agent.crt.erb: config/certs/agent.crt + agent.key.erb: config/certs/agent.key + agent_ca.crt.erb: config/certs/agent_ca.crt packages: - binding-cache @@ -134,3 +137,31 @@ properties: logging.format.timestamp: description: "Format for timestamp in component logs. Valid values are 'deprecated' and 'rfc3339'." default: "deprecated" + + agent.port: + description: "Port the agent is serving gRPC via mTLS" + default: 3458 + agent.ca_cert: + description: | + TLS loggregator root CA certificate. It is required for key/cert + verification. + agent.cert: + description: "TLS certificate for Syslog Binding Cache signed by the loggregator CA" + agent.key: + description: "TLS private key for Syslog Binding Cache signed by the loggregator CA" + agent.cipher_suites: + description: | + An ordered list of supported SSL cipher suites. Allowed cipher suites are + TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 and TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384. + default: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256:TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" + + blacklisted_syslog_ranges: + description: | + A list of IP address ranges that are not allowed to be specified in + syslog drain binding URLs. + default: [] + example: [{start: 10.10.10.1, end: 10.10.10.10}] + + warn_on_invalid_drains: + description: "Whether to output log warnings on invalid drains" + default: true diff --git a/jobs/loggr-syslog-binding-cache/templates/agent.crt.erb b/jobs/loggr-syslog-binding-cache/templates/agent.crt.erb new file mode 100644 index 000000000..f01f3182a --- /dev/null +++ b/jobs/loggr-syslog-binding-cache/templates/agent.crt.erb @@ -0,0 +1 @@ +<%= p("agent.cert") %> diff --git a/jobs/loggr-syslog-binding-cache/templates/agent.key.erb b/jobs/loggr-syslog-binding-cache/templates/agent.key.erb new file mode 100644 index 000000000..211918fdd --- /dev/null +++ b/jobs/loggr-syslog-binding-cache/templates/agent.key.erb @@ -0,0 +1 @@ +<%= p("agent.key") %> diff --git a/jobs/loggr-syslog-binding-cache/templates/agent_ca.crt.erb b/jobs/loggr-syslog-binding-cache/templates/agent_ca.crt.erb new file mode 100644 index 000000000..19f95732d --- /dev/null +++ b/jobs/loggr-syslog-binding-cache/templates/agent_ca.crt.erb @@ -0,0 +1 @@ +<%= p("agent.ca_cert") %> diff --git a/jobs/loggr-syslog-binding-cache/templates/bpm.yml.erb b/jobs/loggr-syslog-binding-cache/templates/bpm.yml.erb index a91c6fe2b..9dce92fe6 100644 --- a/jobs/loggr-syslog-binding-cache/templates/bpm.yml.erb +++ b/jobs/loggr-syslog-binding-cache/templates/bpm.yml.erb @@ -1,4 +1,9 @@ <% + blacklisted_ranges = p("blacklisted_syslog_ranges") + blacklisted_ips = blacklisted_ranges.map do |range| + "#{range['start']}-#{range['end']}" + end.join(",") + certs_dir = "/var/vcap/jobs/loggr-syslog-binding-cache/config/certs" api_url = link("cloud_controller").address if_p("api.override_url") { @@ -32,6 +37,16 @@ "DEBUG_METRICS" => "#{p("metrics.debug")}", "PPROF_PORT" => "#{p("metrics.pprof_port")}", "USE_RFC3339" => "#{p("logging.format.timestamp") == "rfc3339"}", + + "AGENT_CA_FILE_PATH" => "#{certs_dir}/agent_ca.crt", + "AGENT_CERT_FILE_PATH" => "#{certs_dir}/agent.crt", + "AGENT_KEY_FILE_PATH" => "#{certs_dir}/agent.key", + "AGENT_CIPHER_SUITES" => "#{p("agent.cipher_suites").split(":").join(",")}", + "AGENT_PORT" => "#{p("agent.port")}", + "AGENT_ADDR" => "localhost:#{p("agent.port")}", + + "BLACKLISTED_SYSLOG_RANGES" => "#{blacklisted_ips}", + "WARN_ON_INVALID_DRAINS" => "#{p("warn_on_invalid_drains")}", } } bpm = {"processes" => [process] } diff --git a/src/cmd/syslog-agent/app/config.go b/src/cmd/syslog-agent/app/config.go index 47231d3ea..a452f5fd9 100644 --- a/src/cmd/syslog-agent/app/config.go +++ b/src/cmd/syslog-agent/app/config.go @@ -5,10 +5,9 @@ import ( "strings" "time" - "code.cloudfoundry.org/loggregator-agent-release/src/pkg/config" - "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/bindings" - "code.cloudfoundry.org/go-envstruct" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding/blacklist" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/config" ) // GRPC stores the configuration for the router as a server using a PORT @@ -22,25 +21,26 @@ type GRPC struct { } type Cache struct { - URL string `env:"CACHE_URL, report"` - CAFile string `env:"CACHE_CA_FILE_PATH, report"` - CertFile string `env:"CACHE_CERT_FILE_PATH, report"` - KeyFile string `env:"CACHE_KEY_FILE_PATH, report"` - CommonName string `env:"CACHE_COMMON_NAME, report"` - PollingInterval time.Duration `env:"CACHE_POLLING_INTERVAL, report"` - Blacklist bindings.BlacklistRanges `env:"BLACKLISTED_SYSLOG_RANGES, report"` + URL string `env:"CACHE_URL, report"` + CAFile string `env:"CACHE_CA_FILE_PATH, report"` + CertFile string `env:"CACHE_CERT_FILE_PATH, report"` + KeyFile string `env:"CACHE_KEY_FILE_PATH, report"` + CommonName string `env:"CACHE_COMMON_NAME, report"` + PollingInterval time.Duration `env:"CACHE_POLLING_INTERVAL, report"` + Blacklist blacklist.BlacklistRanges `env:"BLACKLISTED_SYSLOG_RANGES, report"` } // Config holds the configuration for the syslog agent type Config struct { - UseRFC3339 bool `env:"USE_RFC3339"` - BindingsPerAppLimit int `env:"BINDING_PER_APP_LIMIT, report"` - DrainSkipCertVerify bool `env:"DRAIN_SKIP_CERT_VERIFY, report"` - DrainCipherSuites string `env:"DRAIN_CIPHER_SUITES, report"` - DrainTrustedCAFile string `env:"DRAIN_TRUSTED_CA_FILE, report"` - DefaultDrainMetadata bool `env:"DEFAULT_DRAIN_METADATA, report"` - IdleDrainTimeout time.Duration `env:"IDLE_DRAIN_TIMEOUT, report"` - WarnOnInvalidDrains bool `env:"WARN_ON_INVALID_DRAINS, report"` + UseRFC3339 bool `env:"USE_RFC3339"` + BindingsPerAppLimit int `env:"BINDING_PER_APP_LIMIT, report"` + DrainSkipCertVerify bool `env:"DRAIN_SKIP_CERT_VERIFY, report"` + DrainCipherSuites string `env:"DRAIN_CIPHER_SUITES, report"` + DrainTrustedCAFile string `env:"DRAIN_TRUSTED_CA_FILE, report"` + DefaultDrainMetadata bool `env:"DEFAULT_DRAIN_METADATA, report"` + IdleDrainTimeout time.Duration `env:"IDLE_DRAIN_TIMEOUT, report"` + WarnOnInvalidDrains bool `env:"WARN_ON_INVALID_DRAINS, report"` + LoggregatorIngressAddr string `env:"LOGGREGATOR_AGENT_ADDR, report, required"` GRPC GRPC Cache Cache diff --git a/src/cmd/syslog-agent/app/syslog_agent.go b/src/cmd/syslog-agent/app/syslog_agent.go index 277bbdeb1..96b3ec363 100644 --- a/src/cmd/syslog-agent/app/syslog_agent.go +++ b/src/cmd/syslog-agent/app/syslog_agent.go @@ -13,6 +13,7 @@ import ( gendiodes "code.cloudfoundry.org/go-diodes" "code.cloudfoundry.org/go-loggregator/v10" metrics "code.cloudfoundry.org/go-metric-registry" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" "code.cloudfoundry.org/tlsconfig" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding" @@ -56,19 +57,14 @@ func NewSyslogAgent( cfg Config, m Metrics, l *log.Logger, + appLogStreamFactory applog.AppLogStreamFactory, ) *SyslogAgent { internalTlsConfig, externalTlsConfig := drainTLSConfig(cfg) - writerFactory := syslog.NewWriterFactory( - internalTlsConfig, - externalTlsConfig, - syslog.NetworkTimeoutConfig{ - Keepalive: 10 * time.Second, - DialTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - }, - m, - ) - + writerFactory := syslog.NewWriterFactory(internalTlsConfig, externalTlsConfig, syslog.NetworkTimeoutConfig{ + Keepalive: 10 * time.Second, + DialTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + }, m) ingressTLSConfig, err := loggregator.NewIngressTLSConfig( cfg.GRPC.CAFile, cfg.GRPC.CertFile, @@ -81,6 +77,7 @@ func NewSyslogAgent( logClient, err := loggregator.NewIngressClient( ingressTLSConfig, loggregator.WithLogger(log.New(os.Stderr, "", log.LstdFlags)), + loggregator.WithAddr(cfg.LoggregatorIngressAddr), ) if err != nil { l.Panicf("failed to create log client for syslog connector: %q", err) @@ -91,7 +88,7 @@ func NewSyslogAgent( timeoutwaitgroup.New(time.Minute), writerFactory, m, - syslog.WithLogClient(logClient, "syslog_agent"), + syslog.WithAppLogStream(appLogStreamFactory.NewAppLogStream(logClient, "syslog_agent")), ) var cacheClient *cache.CacheClient diff --git a/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go b/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go index 3d3f9cef7..7332deeb8 100644 --- a/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go +++ b/src/cmd/syslog-agent/app/syslog_agent_mtls_test.go @@ -10,6 +10,8 @@ import ( "os" "time" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -154,7 +156,9 @@ var _ = Describe("SyslogAgent with mTLS", func() { agentCfg.Cache.PollingInterval = 10 * time.Millisecond } - agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr) + factory := applog.NewAppLogStreamFactory() + + agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, &factory) go agent.Run() }) diff --git a/src/cmd/syslog-agent/app/syslog_agent_test.go b/src/cmd/syslog-agent/app/syslog_agent_test.go index 46a6d2c24..4c1eaaa62 100644 --- a/src/cmd/syslog-agent/app/syslog_agent_test.go +++ b/src/cmd/syslog-agent/app/syslog_agent_test.go @@ -13,6 +13,9 @@ import ( "strings" "time" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding/blacklist" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -24,7 +27,6 @@ import ( "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/config" - "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/bindings" "code.cloudfoundry.org/tlsconfig" ) @@ -48,6 +50,8 @@ var _ = Describe("SyslogAgent", func() { agentMetrics *metricsHelpers.SpyMetricsRegistry agentLogr *log.Logger agent *app.SyslogAgent + + factory applog.AppLogStreamFactory ) BeforeEach(func() { @@ -134,7 +138,9 @@ var _ = Describe("SyslogAgent", func() { agentCfg.Cache.PollingInterval = 10 * time.Millisecond } - agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr) + factory := applog.NewAppLogStreamFactory() + + agent = app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, &factory) go agent.Run() }) @@ -238,6 +244,14 @@ var _ = Describe("SyslogAgent", func() { Eventually(agentMetrics.GetDebugMetricsEnabled).Should(BeFalse()) }) + It("configures appLogStream", func() { + spyFactory := testhelper.SpyAppLogStreamFactory{} + app.NewSyslogAgent(agentCfg, agentMetrics, agentLogr, &spyFactory) + + Expect(spyFactory.SourceIndex()).Should(Equal("syslog_agent")) + Expect(spyFactory.LogClient()).ShouldNot(BeNil()) + }) + Context("when debug configuration is enabled", func() { BeforeEach(func() { agentCfg.MetricsServer.DebugMetrics = true @@ -272,8 +286,8 @@ var _ = Describe("SyslogAgent", func() { BeforeEach(func() { url, err := url.Parse(appHTTPSDrain.server.URL) Expect(err).NotTo(HaveOccurred()) - agentCfg.Cache.Blacklist = bindings.BlacklistRanges{ - Ranges: []bindings.BlacklistRange{ + agentCfg.Cache.Blacklist = blacklist.BlacklistRanges{ + Ranges: []blacklist.BlacklistRange{ { Start: url.Hostname(), End: url.Hostname(), @@ -423,7 +437,7 @@ var _ = Describe("SyslogAgent", func() { cfgCopy.GRPC.KeyFile = "invalid" msg := `failed to configure client TLS: "failed to load keypair: open invalid: no such file or directory"` - Expect(func() { app.NewSyslogAgent(cfgCopy, agentMetrics, agentLogr) }).To(PanicWith(msg)) + Expect(func() { app.NewSyslogAgent(cfgCopy, agentMetrics, agentLogr, factory) }).To(PanicWith(msg)) }) }) }) diff --git a/src/cmd/syslog-agent/main.go b/src/cmd/syslog-agent/main.go index 24fe4f5d8..db26a4325 100644 --- a/src/cmd/syslog-agent/main.go +++ b/src/cmd/syslog-agent/main.go @@ -5,6 +5,8 @@ import ( _ "net/http/pprof" //nolint:gosec "os" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" + metrics "code.cloudfoundry.org/go-metric-registry" "code.cloudfoundry.org/loggregator-agent-release/src/cmd/syslog-agent/app" @@ -33,5 +35,7 @@ func main() { ), ) - app.NewSyslogAgent(cfg, m, logger).Run() + factory := applog.NewAppLogStreamFactory() + + app.NewSyslogAgent(cfg, m, logger, &factory).Run() } diff --git a/src/cmd/syslog-binding-cache/app/config.go b/src/cmd/syslog-binding-cache/app/config.go index 0a7f00d76..5f80fe10f 100644 --- a/src/cmd/syslog-binding-cache/app/config.go +++ b/src/cmd/syslog-binding-cache/app/config.go @@ -4,9 +4,9 @@ import ( "log" "time" - "code.cloudfoundry.org/loggregator-agent-release/src/pkg/config" - envstruct "code.cloudfoundry.org/go-envstruct" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding/blacklist" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/config" ) // Config holds the configuration for the syslog binding cache @@ -31,6 +31,22 @@ type Config struct { CachePort int `env:"CACHE_PORT, required, report"` MetricsServer config.MetricsServer + + AgentAddress string `env:"AGENT_ADDR, required, report"` + GRPC GRPC + Blacklist blacklist.BlacklistRanges `env:"BLACKLISTED_SYSLOG_RANGES, report"` + + WarnOnInvalidDrains bool `env:"WARN_ON_INVALID_DRAINS, report"` +} + +// GRPC stores the configuration for the router as a server using a PORT +// with mTLS certs and as a client. +type GRPC struct { + Port int `env:"AGENT_PORT, report"` + CAFile string `env:"AGENT_CA_FILE_PATH, required, report"` + CertFile string `env:"AGENT_CERT_FILE_PATH, required, report"` + KeyFile string `env:"AGENT_KEY_FILE_PATH, required, report"` + CipherSuites []string `env:"AGENT_CIPHER_SUITES, report"` } // LoadConfig will load the configuration for the syslog binding cache from the diff --git a/src/cmd/syslog-binding-cache/app/syslog_binding_cache.go b/src/cmd/syslog-binding-cache/app/syslog_binding_cache.go index ad2dba59d..40047330f 100644 --- a/src/cmd/syslog-binding-cache/app/syslog_binding_cache.go +++ b/src/cmd/syslog-binding-cache/app/syslog_binding_cache.go @@ -4,12 +4,16 @@ import ( "crypto/tls" "fmt" "log" + "net" "net/http" _ "net/http/pprof" //nolint:gosec + "os" "sync" "time" + "code.cloudfoundry.org/go-loggregator/v10" metrics "code.cloudfoundry.org/go-metric-registry" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" "code.cloudfoundry.org/tlsconfig" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding" @@ -19,13 +23,20 @@ import ( "github.com/go-chi/chi/v5" ) +type IPChecker interface { + ResolveAddr(host string) (net.IP, error) + CheckBlacklist(ip net.IP) error +} + type SyslogBindingCache struct { - config Config - pprofServer *http.Server - server *http.Server - log *log.Logger - metrics Metrics - mu sync.Mutex + config Config + pprofServer *http.Server + server *http.Server + log *log.Logger + metrics Metrics + mu sync.Mutex + appLogStream applog.AppLogStream + checker IPChecker } type Metrics interface { @@ -34,11 +45,33 @@ type Metrics interface { RegisterDebugMetrics() } -func NewSyslogBindingCache(config Config, metrics Metrics, log *log.Logger) *SyslogBindingCache { +func NewSyslogBindingCache(config Config, metrics Metrics, logger *log.Logger) *SyslogBindingCache { + ingressTLSConfig, err := loggregator.NewIngressTLSConfig( + config.GRPC.CAFile, + config.GRPC.CertFile, + config.GRPC.KeyFile, + ) + if err != nil { + logger.Panicf("failed to configure client TLS: %q", err) + } + + logClient, err := loggregator.NewIngressClient( + ingressTLSConfig, + loggregator.WithLogger(log.New(os.Stderr, "", log.LstdFlags)), + loggregator.WithAddr(config.AgentAddress), + ) + if err != nil { + logger.Panicf("failed to create logger client for syslog binding cache: %q", err) + } + factory := applog.NewAppLogStreamFactory() + appLogStream := factory.NewAppLogStream(logClient, "syslog_binding_cache") + return &SyslogBindingCache{ - config: config, - log: log, - metrics: metrics, + config: config, + log: logger, + metrics: metrics, + appLogStream: appLogStream, + checker: &config.Blacklist, } } @@ -54,7 +87,16 @@ func (sbc *SyslogBindingCache) Run() { } store := binding.NewStore(sbc.metrics) aggregateStore := binding.NewAggregateStore(sbc.config.AggregateDrainsFile) - poller := binding.NewPoller(sbc.apiClient(), sbc.config.APIPollingInterval, store, sbc.metrics, sbc.log) + poller := binding.NewPoller( + sbc.apiClient(), + sbc.config.APIPollingInterval, + store, + sbc.metrics, + sbc.log, + sbc.appLogStream, + &sbc.config.Blacklist, + sbc.config.WarnOnInvalidDrains, + ) go poller.Poll() diff --git a/src/cmd/syslog-binding-cache/app/syslog_binding_cache_test.go b/src/cmd/syslog-binding-cache/app/syslog_binding_cache_test.go index d02e7d904..c0f007618 100644 --- a/src/cmd/syslog-binding-cache/app/syslog_binding_cache_test.go +++ b/src/cmd/syslog-binding-cache/app/syslog_binding_cache_test.go @@ -47,34 +47,34 @@ var _ = Describe("App", func() { BeforeEach(func() { r := results{ { - Url: "syslog://drain-a", + Url: "syslog://localhost:1000", Credentials: []binding.Credentials{ { - Cert: "cert", Key: "key", Apps: []binding.App{{Hostname: "org.space.app-name-1", AppID: "app-id-1"}}, + Apps: []binding.App{{Hostname: "org.space.app-name-1", AppID: "app-id-1"}}, }, }, }, { - Url: "syslog://drain-b", + Url: "syslog://localhost:1001", Credentials: []binding.Credentials{ { - Cert: "cert", Key: "key", Apps: []binding.App{{Hostname: "org.space.app-name-1", AppID: "app-id-1"}}, + Apps: []binding.App{{Hostname: "org.space.app-name-1", AppID: "app-id-1"}}, }, }, }, { - Url: "syslog://drain-c", + Url: "syslog://localhost:1002", Credentials: []binding.Credentials{ { - Cert: "cert", Key: "key", Apps: []binding.App{{Hostname: "org.space.app-name-2", AppID: "app-id-2"}}, + Apps: []binding.App{{Hostname: "org.space.app-name-2", AppID: "app-id-2"}}, }, }, }, { - Url: "syslog://drain-d", + Url: "syslog://localhost:1003", Credentials: []binding.Credentials{ { - Cert: "cert", Key: "key", Apps: []binding.App{{Hostname: "org.space.app-name-2", AppID: "app-id-2"}}, + Apps: []binding.App{{Hostname: "org.space.app-name-2", AppID: "app-id-2"}}, }, }, }, @@ -107,6 +107,7 @@ var _ = Describe("App", func() { err = aggDrainFile.Close() Expect(err).ToNot(HaveOccurred()) sbcCerts = testhelper.GenerateCerts("binding-cache-ca") + grpcPort := 30000 + GinkgoParallelProcess() sbcCfg = app.Config{ APIURL: capi.URL, APIPollingInterval: 10 * time.Millisecond, @@ -128,6 +129,12 @@ var _ = Describe("App", func() { KeyFile: sbcCerts.Key("metron"), PprofPort: uint16(pprofPort), }, + GRPC: app.GRPC{ + Port: grpcPort, + CAFile: sbcCerts.CA(), + CertFile: sbcCerts.Cert("metron"), + KeyFile: sbcCerts.Key("metron"), + }, } sbcMetrics = metricsHelpers.NewMetricsRegistry() sbcLogr = log.New(GinkgoWriter, "", log.LstdFlags) @@ -189,13 +196,13 @@ var _ = Describe("App", func() { Expect(results).To(HaveLen(4)) b := findBindings(results, "app-id-1") - Expect(b[0].Url).To(Equal("syslog://drain-a")) - Expect(b[1].Url).To(Equal("syslog://drain-b")) + Expect(b[0].Url).To(Equal("syslog://localhost:1000")) + Expect(b[1].Url).To(Equal("syslog://localhost:1001")) Expect(b[0].Credentials[0].Apps[0].Hostname).To(Equal("org.space.app-name-1")) b = findBindings(results, "app-id-2") - Expect(b[0].Url).To(Equal("syslog://drain-c")) - Expect(b[1].Url).To(Equal("syslog://drain-d")) + Expect(b[0].Url).To(Equal("syslog://localhost:1002")) + Expect(b[1].Url).To(Equal("syslog://localhost:1003")) Expect(b[0].Credentials[0].Apps[0].Hostname).To(Equal("org.space.app-name-2")) }) diff --git a/src/internal/testhelper/spy_app_log_emitter.go b/src/internal/testhelper/spy_app_log_emitter.go new file mode 100644 index 000000000..cb13955ee --- /dev/null +++ b/src/internal/testhelper/spy_app_log_emitter.go @@ -0,0 +1,25 @@ +package testhelper + +import ( + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" +) + +type SpyAppLogStreamFactory struct { + logClient applog.LogClient + sourceIndex string +} + +func (factory *SpyAppLogStreamFactory) LogClient() applog.LogClient { + return factory.logClient +} + +func (factory *SpyAppLogStreamFactory) SourceIndex() string { + return factory.sourceIndex +} + +func (factory *SpyAppLogStreamFactory) NewAppLogStream(logClient applog.LogClient, sourceIndex string) applog.AppLogStream { + factory.logClient = logClient + factory.sourceIndex = sourceIndex + logStreamFactory := applog.NewAppLogStreamFactory() + return logStreamFactory.NewAppLogStream(logClient, sourceIndex) +} diff --git a/src/internal/testhelper/spy_log_client.go b/src/internal/testhelper/spy_log_client.go new file mode 100644 index 000000000..6fbbd9f5d --- /dev/null +++ b/src/internal/testhelper/spy_log_client.go @@ -0,0 +1,82 @@ +package testhelper + +import ( + "code.cloudfoundry.org/go-loggregator/v10" + v2 "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" + "sync" +) + +type spyLogClient struct { + mu sync.Mutex + _message []string + _appID []string + + // We use maps to ensure that we can query the keys + _sourceType map[string]struct{} + _sourceInstance map[string]struct{} +} + +func NewSpyLogClient() *spyLogClient { + return &spyLogClient{ + _sourceType: make(map[string]struct{}), + _sourceInstance: make(map[string]struct{}), + } +} + +func (s *spyLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) { + s.mu.Lock() + defer s.mu.Unlock() + + env := &v2.Envelope{ + Tags: make(map[string]string), + } + + for _, o := range opts { + o(env) + } + + s._message = append(s._message, message) + s._appID = append(s._appID, env.SourceId) + s._sourceType[env.GetTags()["source_type"]] = struct{}{} + s._sourceInstance[env.GetInstanceId()] = struct{}{} +} + +func (s *spyLogClient) Message() []string { + s.mu.Lock() + defer s.mu.Unlock() + + return s._message +} + +func (s *spyLogClient) AppID() []string { + s.mu.Lock() + defer s.mu.Unlock() + + return s._appID +} + +func (s *spyLogClient) SourceType() map[string]struct{} { + s.mu.Lock() + defer s.mu.Unlock() + + // Copy map so the original does not escape the mutex and induce a race. + m := make(map[string]struct{}) + for k := range s._sourceType { + m[k] = struct{}{} + } + + return m +} + +func (s *spyLogClient) SourceInstance() map[string]struct{} { + s.mu.Lock() + defer s.mu.Unlock() + + // Copy map so the original does not escape the mutex and induce a race. + m := make(map[string]struct{}) + for k := range s._sourceInstance { + m[k] = struct{}{} + } + + return m +} diff --git a/src/pkg/ingress/bindings/bindingsfakes/fake_ipchecker.go b/src/pkg/binding/bindingfakes/fake_ipchecker.go similarity index 97% rename from src/pkg/ingress/bindings/bindingsfakes/fake_ipchecker.go rename to src/pkg/binding/bindingfakes/fake_ipchecker.go index 251bb4f7d..4e6e77bf0 100644 --- a/src/pkg/ingress/bindings/bindingsfakes/fake_ipchecker.go +++ b/src/pkg/binding/bindingfakes/fake_ipchecker.go @@ -1,11 +1,11 @@ // Code generated by counterfeiter. DO NOT EDIT. -package bindingsfakes +package bindingfakes import ( "net" "sync" - "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/bindings" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding" ) type FakeIPChecker struct { @@ -188,4 +188,4 @@ func (fake *FakeIPChecker) recordInvocation(key string, args []interface{}) { fake.invocations[key] = append(fake.invocations[key], args) } -var _ bindings.IPChecker = new(FakeIPChecker) +var _ binding.IPChecker = new(FakeIPChecker) diff --git a/src/pkg/ingress/bindings/blacklist_ranges.go b/src/pkg/binding/blacklist/blacklist_ranges.go similarity index 99% rename from src/pkg/ingress/bindings/blacklist_ranges.go rename to src/pkg/binding/blacklist/blacklist_ranges.go index fe208264f..ec77c124d 100644 --- a/src/pkg/ingress/bindings/blacklist_ranges.go +++ b/src/pkg/binding/blacklist/blacklist_ranges.go @@ -1,4 +1,4 @@ -package bindings +package blacklist import ( "bytes" diff --git a/src/pkg/ingress/bindings/blacklist_ranges_test.go b/src/pkg/binding/blacklist/blacklist_ranges_test.go similarity index 65% rename from src/pkg/ingress/bindings/blacklist_ranges_test.go rename to src/pkg/binding/blacklist/blacklist_ranges_test.go index f1be90b56..693662265 100644 --- a/src/pkg/ingress/bindings/blacklist_ranges_test.go +++ b/src/pkg/binding/blacklist/blacklist_ranges_test.go @@ -1,9 +1,9 @@ -package bindings_test +package blacklist_test import ( "net" - "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/bindings" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding/blacklist" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -11,44 +11,44 @@ import ( var _ = Describe("BlacklistRanges", func() { Describe("validates", func() { It("accepts valid IP address range", func() { - _, err := bindings.NewBlacklistRanges( - bindings.BlacklistRange{Start: "127.0.2.2", End: "127.0.2.4"}, + _, err := blacklist.NewBlacklistRanges( + blacklist.BlacklistRange{Start: "127.0.2.2", End: "127.0.2.4"}, ) Expect(err).ToNot(HaveOccurred()) }) It("returns an error with an invalid start address", func() { - _, err := bindings.NewBlacklistRanges( - bindings.BlacklistRange{Start: "127.0.2.2.1", End: "127.0.2.4"}, + _, err := blacklist.NewBlacklistRanges( + blacklist.BlacklistRange{Start: "127.0.2.2.1", End: "127.0.2.4"}, ) Expect(err).To(MatchError("invalid IP Address for Blacklist IP Range: 127.0.2.2.1")) }) It("returns an error with an invalid end address", func() { - _, err := bindings.NewBlacklistRanges( - bindings.BlacklistRange{Start: "127.0.2.2", End: "127.0.2.4.3"}, + _, err := blacklist.NewBlacklistRanges( + blacklist.BlacklistRange{Start: "127.0.2.2", End: "127.0.2.4.3"}, ) Expect(err).To(HaveOccurred()) }) It("validates multiple blacklist ranges", func() { - _, err := bindings.NewBlacklistRanges( - bindings.BlacklistRange{Start: "127.0.2.2", End: "127.0.2.4"}, - bindings.BlacklistRange{Start: "127.0.2.2", End: "127.0.2.4.5"}, + _, err := blacklist.NewBlacklistRanges( + blacklist.BlacklistRange{Start: "127.0.2.2", End: "127.0.2.4"}, + blacklist.BlacklistRange{Start: "127.0.2.2", End: "127.0.2.4.5"}, ) Expect(err).To(HaveOccurred()) }) It("validates start IP is before end IP", func() { - _, err := bindings.NewBlacklistRanges( - bindings.BlacklistRange{Start: "10.10.10.10", End: "10.8.10.12"}, + _, err := blacklist.NewBlacklistRanges( + blacklist.BlacklistRange{Start: "10.10.10.10", End: "10.8.10.12"}, ) Expect(err).To(MatchError("invalid Blacklist IP Range: Start 10.10.10.10 has to be before End 10.8.10.12")) }) It("accepts start and end as the same", func() { - _, err := bindings.NewBlacklistRanges( - bindings.BlacklistRange{Start: "127.0.2.2", End: "127.0.2.2"}, + _, err := blacklist.NewBlacklistRanges( + blacklist.BlacklistRange{Start: "127.0.2.2", End: "127.0.2.2"}, ) Expect(err).ToNot(HaveOccurred()) }) @@ -56,15 +56,15 @@ var _ = Describe("BlacklistRanges", func() { Describe("CheckBlacklist()", func() { It("allows all urls for empty blacklist range", func() { - ranges, _ := bindings.NewBlacklistRanges() + ranges, _ := blacklist.NewBlacklistRanges() err := ranges.CheckBlacklist(net.ParseIP("127.0.0.1")) Expect(err).ToNot(HaveOccurred()) }) It("returns an error when the IP is in the blacklist range", func() { - ranges, err := bindings.NewBlacklistRanges( - bindings.BlacklistRange{Start: "127.0.1.2", End: "127.0.3.4"}, + ranges, err := blacklist.NewBlacklistRanges( + blacklist.BlacklistRange{Start: "127.0.1.2", End: "127.0.3.4"}, ) Expect(err).ToNot(HaveOccurred()) @@ -75,7 +75,7 @@ var _ = Describe("BlacklistRanges", func() { Describe("ResolveAddr()", func() { It("does not return an error when able to resolve", func() { - ranges, _ := bindings.NewBlacklistRanges() + ranges, _ := blacklist.NewBlacklistRanges() ip, err := ranges.ResolveAddr("localhost") Expect(err).ToNot(HaveOccurred()) @@ -83,7 +83,7 @@ var _ = Describe("BlacklistRanges", func() { }) It("can resolve addresses with port", func() { - ranges, _ := bindings.NewBlacklistRanges() + ranges, _ := blacklist.NewBlacklistRanges() ip, err := ranges.ResolveAddr("localhost:8080") Expect(err).ToNot(HaveOccurred()) @@ -91,7 +91,7 @@ var _ = Describe("BlacklistRanges", func() { }) It("returns an error when it fails to resolve", func() { - ranges, _ := bindings.NewBlacklistRanges() + ranges, _ := blacklist.NewBlacklistRanges() _, err := ranges.ResolveAddr("vcap.me.junky-garbage") Expect(err).To(HaveOccurred()) @@ -100,24 +100,24 @@ var _ = Describe("BlacklistRanges", func() { Describe("UnmarshalEnv", func() { It("returns an error for non-valid input", func() { - bl := &bindings.BlacklistRanges{} + bl := &blacklist.BlacklistRanges{} Expect(bl.UnmarshalEnv("invalid")).ToNot(Succeed()) Expect(bl.UnmarshalEnv("10.244.0.32-10")).ToNot(Succeed()) }) It("parses the given IP ranges", func() { - bl := &bindings.BlacklistRanges{} + bl := &blacklist.BlacklistRanges{} Expect(bl.UnmarshalEnv("10.0.0.4-10.0.0.8,123.4.5.6-123.4.5.7")).To(Succeed()) - Expect(bl.Ranges).To(Equal([]bindings.BlacklistRange{ + Expect(bl.Ranges).To(Equal([]blacklist.BlacklistRange{ {Start: "10.0.0.4", End: "10.0.0.8"}, {Start: "123.4.5.6", End: "123.4.5.7"}, })) }) It("does not return an error for an empty list", func() { - bl := &bindings.BlacklistRanges{} + bl := &blacklist.BlacklistRanges{} Expect(bl.UnmarshalEnv("")).To(Succeed()) }) }) diff --git a/src/pkg/binding/blacklist/blacklist_suite_test.go b/src/pkg/binding/blacklist/blacklist_suite_test.go new file mode 100644 index 000000000..04b1b6e9a --- /dev/null +++ b/src/pkg/binding/blacklist/blacklist_suite_test.go @@ -0,0 +1,13 @@ +package blacklist + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestBlacklist(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Blacklist Suite") +} diff --git a/src/pkg/binding/poller.go b/src/pkg/binding/poller.go index 188f9b18d..4227741bf 100644 --- a/src/pkg/binding/poller.go +++ b/src/pkg/binding/poller.go @@ -1,12 +1,19 @@ package binding import ( + "crypto/tls" + "crypto/x509" "encoding/json" + "fmt" "log" + "net" "net/http" + "net/url" "time" metrics "code.cloudfoundry.org/go-metric-registry" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/simplecache" ) type Poller struct { @@ -17,6 +24,18 @@ type Poller struct { logger *log.Logger bindingRefreshErrorCounter metrics.Counter lastBindingCount metrics.Gauge + invalidDrains metrics.Gauge + blacklistedDrains metrics.Gauge + appLogStream applog.AppLogStream + checker IPChecker + failedHostsCache *simplecache.SimpleCache[string, bool] + warn bool +} + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . IPChecker +type IPChecker interface { + ResolveAddr(host string) (net.IP, error) + CheckBlacklist(ip net.IP) error } type client interface { @@ -51,7 +70,20 @@ type Setter interface { Set(bindings []Binding, bindingCount int) } -func NewPoller(ac client, pi time.Duration, s Setter, m Metrics, logger *log.Logger) *Poller { +var allowedSchemes = []string{"syslog", "syslog-tls", "https", "https-batch"} + +func NewPoller( + ac client, + pi time.Duration, + s Setter, + m Metrics, + logger *log.Logger, + logStream applog.AppLogStream, + checker IPChecker, + warn bool, +) *Poller { + opt := metrics.WithMetricLabels(map[string]string{"unit": "total"}) + p := &Poller{ apiClient: ac, pollingInterval: pi, @@ -65,6 +97,20 @@ func NewPoller(ac client, pi time.Duration, s Setter, m Metrics, logger *log.Log "last_binding_refresh_count", "Current number of bindings received from binding provider during last refresh.", ), + invalidDrains: m.NewGauge( + "invalid_drains", + "Count of invalid drains encountered in last binding fetch. Includes blacklisted drains.", + opt, + ), + blacklistedDrains: m.NewGauge( + "blacklisted_drains", + "Count of blacklisted drains encountered in last binding fetch.", + opt, + ), + appLogStream: logStream, + checker: checker, + failedHostsCache: simplecache.New[string, bool](120 * time.Second), + warn: warn, } p.poll() return p @@ -88,7 +134,10 @@ func (p *Poller) poll() { return } if resp.StatusCode != http.StatusOK { - p.logger.Printf("unexpected response from internal bindings endpoint. status code: %d", resp.StatusCode) + p.logger.Printf( + "unexpected response from internal bindings endpoint. status code: %d", + resp.StatusCode, + ) return } @@ -107,9 +156,219 @@ func (p *Poller) poll() { } } - bindingCount := CalculateBindingCount(bindings) + filteredBindings := checkBindings( + bindings, + &p.appLogStream, + p.checker, + p.logger, + p.failedHostsCache, + p.blacklistedDrains, + p.invalidDrains, + p.warn, + ) + + bindingCount := CalculateBindingCount(filteredBindings) p.lastBindingCount.Set(float64(bindingCount)) - p.store.Set(bindings, bindingCount) + p.store.Set(filteredBindings, bindingCount) +} + +func checkBindings( + bindings []Binding, + logStream *applog.AppLogStream, + checker IPChecker, + logger *log.Logger, + failedHostsCache *simplecache.SimpleCache[string, bool], + blacklistedDrainsGauge metrics.Gauge, + invalidDrainsGauge metrics.Gauge, + warn bool, +) []Binding { + logger.Printf("checking bindings - found %d bindings", len(bindings)) + var invalidDrains float64 = 0 + var blacklistedDrains float64 = 0 + var filteredBindings []Binding + for _, b := range bindings { + if len(b.Credentials) == 0 { + logger.Print("No credentials - which include appIDs - for a binding. Check the bindings in the cloud controller.") + continue + } + + u, err := url.Parse(b.Url) + if err != nil { + invalidDrains += 1 + if warn { + for _, cred := range b.Credentials { + sendAppLogMessage( + "Cannot parse syslog drain URL.", + cred.Apps, + logStream, + logger, + ) + + } + } + continue + } + + anonymousUrl := *u + anonymousUrl.User = nil + anonymousUrl.RawQuery = "" + + if invalidScheme(u.Scheme) { + invalidDrains += 1 + if warn { + for _, cred := range b.Credentials { + sendAppLogMessage( + fmt.Sprintf("Invalid Scheme for syslog drain url %s", anonymousUrl.String()), + cred.Apps, + logStream, + logger, + ) + } + } + continue + } + + if len(u.Host) == 0 { + invalidDrains += 1 + if warn { + for _, cred := range b.Credentials { + sendAppLogMessage( + fmt.Sprintf("No hostname found in syslog drain url %s", anonymousUrl.String()), + cred.Apps, + logStream, + logger, + ) + } + } + continue + } + + _, exists := failedHostsCache.Get(u.Host) + if exists { + invalidDrains += 1 + if warn { + for _, cred := range b.Credentials { + sendAppLogMessage( + fmt.Sprintf( + "Skipped resolve ip address for syslog drain with url %s due to prior failure", + anonymousUrl.String(), + ), + cred.Apps, + logStream, + logger, + ) + } + } + continue + } + + ip, err := checker.ResolveAddr(u.Host) + if err != nil { + invalidDrains += 1 + failedHostsCache.Set(u.Host, true) + if warn { + for _, cred := range b.Credentials { + sendAppLogMessage( + fmt.Sprintf( + "Cannot resolve ip address for syslog drain with url %s", + anonymousUrl.String(), + ), + cred.Apps, + logStream, + logger, + ) + } + } + continue + } + + err = checker.CheckBlacklist(ip) + if err != nil { + invalidDrains += 1 + blacklistedDrains += 1 + if warn { + for _, cred := range b.Credentials { + sendAppLogMessage( + fmt.Sprintf( + "Resolved ip address for syslog drain with url %s is blacklisted", + anonymousUrl.String(), + ), + cred.Apps, + logStream, + logger, + ) + } + } + continue + } + + var validCredentials []Credentials + for _, cred := range b.Credentials { + if len(cred.Cert) > 0 && len(cred.Key) > 0 { + _, err := tls.X509KeyPair([]byte(cred.Cert), []byte(cred.Key)) + if err != nil { + if warn { + sendAppLogMessage( + fmt.Sprintf("failed to load certificate for %s", anonymousUrl.String()), + cred.Apps, + logStream, + logger, + ) + } + continue + } + } + + if len(cred.CA) > 0 { + certPool := x509.NewCertPool() + ok := certPool.AppendCertsFromPEM([]byte(cred.CA)) + if !ok { + if warn { + sendAppLogMessage( + fmt.Sprintf("failed to load root CA for %s", anonymousUrl.String()), + cred.Apps, + logStream, + logger, + ) + } + continue + } + } + + validCredentials = append(validCredentials, cred) + } + + if len(validCredentials) > 0 { + filteredBindings = append(filteredBindings, Binding{ + Url: b.Url, + Credentials: validCredentials, + }) + } + } + blacklistedDrainsGauge.Set(blacklistedDrains) + invalidDrainsGauge.Set(invalidDrains) + return filteredBindings +} + +func sendAppLogMessage(msg string, apps []App, logStream *applog.AppLogStream, logger *log.Logger) { + for _, app := range apps { + appId := app.AppID + if appId == "" { + continue + } + logStream.Emit(msg, appId) + logger.Printf("%s for app %s", msg, appId) + } +} + +func invalidScheme(scheme string) bool { + for _, s := range allowedSchemes { + if s == scheme { + return false + } + } + + return true } func CalculateBindingCount(bindings []Binding) int { diff --git a/src/pkg/binding/poller_test.go b/src/pkg/binding/poller_test.go index bc6212e37..3db867613 100644 --- a/src/pkg/binding/poller_test.go +++ b/src/pkg/binding/poller_test.go @@ -1,39 +1,48 @@ -package binding_test +package binding import ( "bytes" "encoding/json" "errors" + "fmt" "io" "log" + "net" "net/http" "sync/atomic" "time" metricsHelpers "code.cloudfoundry.org/go-metric-registry/testhelpers" + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding/blacklist" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/simplecache" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - - "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding" ) var _ = Describe("Poller", func() { var ( - apiClient *fakeAPIClient - store *fakeStore - metrics *metricsHelpers.SpyMetricsRegistry - logger = log.New(GinkgoWriter, "", 0) + apiClient *fakeAPIClient + store *fakeStore + metrics *metricsHelpers.SpyMetricsRegistry + logger = log.New(GinkgoWriter, "", 0) + appLogStream applog.AppLogStream + logClient = testhelper.NewSpyLogClient() ) BeforeEach(func() { apiClient = newFakeAPIClient() store = newFakeStore() metrics = metricsHelpers.NewMetricsRegistry() + factory := applog.NewAppLogStreamFactory() + logClient = testhelper.NewSpyLogClient() + appLogStream = factory.NewAppLogStream(logClient, "test") }) It("polls for bindings on an interval", func() { - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger) + p := NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger, appLogStream, &dummyIPChecker{}, false) go p.Poll() Eventually(apiClient.called).Should(BeNumerically(">=", 2)) @@ -41,15 +50,15 @@ var _ = Describe("Poller", func() { It("calls the api client and stores the result", func() { apiClient.bindings <- response{ - Results: []binding.Binding{ + Results: []Binding{ { - Url: "drain-0", - Credentials: []binding.Credentials{ + Url: "syslog://drain-0", + Credentials: []Credentials{ { - Cert: "cert0", Key: "key0", CA: "ca0", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, }, { - Cert: "cert1", Key: "key1", CA: "ca1", Apps: []binding.App{ + Apps: []App{ {Hostname: "app-hostname1", AppID: "app-id-1"}, {Hostname: "app-hostname2", AppID: "app-id-2"}, }, @@ -57,30 +66,30 @@ var _ = Describe("Poller", func() { }, }, { - Url: "drain-1", - Credentials: []binding.Credentials{ + Url: "syslog://drain-1", + Credentials: []Credentials{ { - Cert: "cert2", Key: "key2", CA: "ca2", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, }, }, }, }, } - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger) + p := NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger, appLogStream, &dummyIPChecker{}, false) go p.Poll() - var expectedBindings []binding.Binding + var expectedBindings []Binding Eventually(store.bindings).Should(Receive(&expectedBindings)) - Expect(expectedBindings).To(ConsistOf([]binding.Binding{ + Expect(expectedBindings).To(ConsistOf([]Binding{ { - Url: "drain-0", - Credentials: []binding.Credentials{ + Url: "syslog://drain-0", + Credentials: []Credentials{ { - Cert: "cert0", Key: "key0", CA: "ca0", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, }, { - Cert: "cert1", Key: "key1", CA: "ca1", Apps: []binding.App{ + Apps: []App{ {Hostname: "app-hostname1", AppID: "app-id-1"}, {Hostname: "app-hostname2", AppID: "app-id-2"}, }, @@ -88,10 +97,10 @@ var _ = Describe("Poller", func() { }, }, { - Url: "drain-1", - Credentials: []binding.Credentials{ + Url: "syslog://drain-1", + Credentials: []Credentials{ { - Cert: "cert2", Key: "key2", CA: "ca2", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, }, }, }, @@ -101,20 +110,20 @@ var _ = Describe("Poller", func() { It("fetches the next page of bindings and stores the result", func() { apiClient.bindings <- response{ NextID: 2, - Results: []binding.Binding{ + Results: []Binding{ { - Url: "drain-0", - Credentials: []binding.Credentials{ + Url: "syslog://drain-0", + Credentials: []Credentials{ { - Cert: "cert0", Key: "key0", CA: "ca0", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, }, }, }, { - Url: "drain-1", - Credentials: []binding.Credentials{ + Url: "syslog://drain-1", + Credentials: []Credentials{ { - Cert: "cert1", Key: "key1", CA: "ca1", Apps: []binding.App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, + Apps: []App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, }, }, }, @@ -122,62 +131,62 @@ var _ = Describe("Poller", func() { } apiClient.bindings <- response{ - Results: []binding.Binding{ + Results: []Binding{ { - Url: "drain-2", - Credentials: []binding.Credentials{ + Url: "syslog://drain-2", + Credentials: []Credentials{ { - Cert: "cert2", Key: "key2", CA: "ca2", Apps: []binding.App{{Hostname: "app-hostname2", AppID: "app-id-2"}}, + Apps: []App{{Hostname: "app-hostname2", AppID: "app-id-2"}}, }, }, }, { - Url: "drain-3", - Credentials: []binding.Credentials{ + Url: "syslog://drain-3", + Credentials: []Credentials{ { - Cert: "cert3", Key: "key3", CA: "ca3", Apps: []binding.App{{Hostname: "app-hostname3", AppID: "app-id-3"}}, + Apps: []App{{Hostname: "app-hostname3", AppID: "app-id-3"}}, }, }, }, }, } - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger) + p := NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger, appLogStream, &dummyIPChecker{}, false) go p.Poll() - var expectedBindings []binding.Binding + var expectedBindings []Binding Eventually(store.bindings).Should(Receive(&expectedBindings)) Expect(expectedBindings).To(ConsistOf( - []binding.Binding{ + []Binding{ { - Url: "drain-0", - Credentials: []binding.Credentials{ + Url: "syslog://drain-0", + Credentials: []Credentials{ { - Cert: "cert0", Key: "key0", CA: "ca0", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, }, }, }, { - Url: "drain-1", - Credentials: []binding.Credentials{ + Url: "syslog://drain-1", + Credentials: []Credentials{ { - Cert: "cert1", Key: "key1", CA: "ca1", Apps: []binding.App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, + Apps: []App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, }, }, }, { - Url: "drain-2", - Credentials: []binding.Credentials{ + Url: "syslog://drain-2", + Credentials: []Credentials{ { - Cert: "cert2", Key: "key2", CA: "ca2", Apps: []binding.App{{Hostname: "app-hostname2", AppID: "app-id-2"}}, + Apps: []App{{Hostname: "app-hostname2", AppID: "app-id-2"}}, }, }, }, { - Url: "drain-3", - Credentials: []binding.Credentials{ + Url: "syslog://drain-3", + Credentials: []Credentials{ { - Cert: "cert3", Key: "key3", CA: "ca3", Apps: []binding.App{{Hostname: "app-hostname3", AppID: "app-id-3"}}, + Apps: []App{{Hostname: "app-hostname3", AppID: "app-id-3"}}, }, }, }, @@ -188,7 +197,7 @@ var _ = Describe("Poller", func() { }) It("tracks the number of API errors", func() { - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger) + p := NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger, appLogStream, &dummyIPChecker{}, false) go p.Poll() apiClient.errors <- errors.New("expected") @@ -201,7 +210,7 @@ var _ = Describe("Poller", func() { It("does not update the stores if the response code is bad", func() { apiClient.statusCode <- 404 - p := binding.NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger) + p := NewPoller(apiClient, 10*time.Millisecond, store, metrics, logger, appLogStream, &dummyIPChecker{}, false) go p.Poll() Eventually(store.bindings).Should(BeEmpty()) @@ -209,76 +218,393 @@ var _ = Describe("Poller", func() { It("tracks the number of bindings returned from CAPI", func() { apiClient.bindings <- response{ - Results: []binding.Binding{ + Results: []Binding{ + { + Url: "syslog://drain-0.example.com", + Credentials: []Credentials{ + { + Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + }, + }, + }, + { + Url: "syslog://drain-1.example.com", + Credentials: []Credentials{ + { + Apps: []App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, + }, + }, + }, + }, + } + NewPoller(apiClient, time.Hour, store, metrics, logger, appLogStream, &dummyIPChecker{}, true) + + Expect(metrics.GetMetric("last_binding_refresh_count", nil).Value()). + To(BeNumerically("==", 2)) + }) + + It("filters invalid bindings", func() { + apiClient.bindings <- response{ + Results: []Binding{ { Url: "drain-0", - Credentials: []binding.Credentials{ + Credentials: []Credentials{ { - Cert: "cert0", Key: "key0", CA: "ca0", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + Cert: "cert0", Key: "key0", CA: "ca0", Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, }, }, }, { Url: "drain-1", - Credentials: []binding.Credentials{ + Credentials: []Credentials{ { - Cert: "cert1", Key: "key1", CA: "ca1", Apps: []binding.App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, + Cert: "cert1", Key: "key1", CA: "ca1", Apps: []App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, + }, + }, + }, + { + Url: "syslog://drain-2.example.com", + Credentials: []Credentials{ + { + Apps: []App{{Hostname: "app-hostname2", AppID: "app-id-2"}}, }, }, }, }, } - binding.NewPoller(apiClient, time.Hour, store, metrics, logger) + NewPoller(apiClient, time.Hour, store, metrics, logger, appLogStream, &dummyIPChecker{}, false) Expect(metrics.GetMetric("last_binding_refresh_count", nil).Value()). - To(BeNumerically("==", 2)) + To(BeNumerically("==", 1)) }) It("tracks the isolated CalculateBindingsCount call", func() { - noBinding := []binding.Binding{} - singleBinding := []binding.Binding{ + noBinding := []Binding{} + singleBinding := []Binding{ { Url: "drain-0", - Credentials: []binding.Credentials{ + Credentials: []Credentials{ { - Cert: "cert0", Key: "key0", CA: "ca0", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + Cert: "cert0", Key: "key0", CA: "ca0", Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, }, }, }, { Url: "drain-1", - Credentials: []binding.Credentials{ + Credentials: []Credentials{ { - Cert: "cert1", Key: "key1", CA: "ca1", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + Cert: "cert1", Key: "key1", CA: "ca1", Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, }, }, }, } - multipleBindings := []binding.Binding{ + multipleBindings := []Binding{ { Url: "drain-0", - Credentials: []binding.Credentials{ + Credentials: []Credentials{ { - Cert: "cert0", Key: "key0", CA: "ca0", Apps: []binding.App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + Cert: "cert0", Key: "key0", CA: "ca0", Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, }, }, }, { Url: "drain-1", - Credentials: []binding.Credentials{ + Credentials: []Credentials{ { - Cert: "cert1", Key: "key1", CA: "ca1", Apps: []binding.App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, + Cert: "cert1", Key: "key1", CA: "ca1", Apps: []App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, }, }, }, } - Expect(binding.CalculateBindingCount(noBinding)). + Expect(CalculateBindingCount(noBinding)). To(BeNumerically("==", 0)) - Expect(binding.CalculateBindingCount(singleBinding)). + Expect(CalculateBindingCount(singleBinding)). To(BeNumerically("==", 1)) - Expect(binding.CalculateBindingCount(multipleBindings)). + Expect(CalculateBindingCount(multipleBindings)). To(BeNumerically("==", 2)) }) + + Describe("checkBindings", func() { + It("returns no binding which contains an invalid URL and increases invalid_drains", func() { + bindings := []Binding{ + { + Url: "syslog:/invalid-url-a-slash-is-missing", + Credentials: []Credentials{ + { + Cert: "cert0", Key: "key0", CA: "ca0", Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + }, + }, + }, + } + cache := simplecache.New[string, bool](120 * time.Second) + blacklistedDrainsGauge := metrics.NewGauge( + "blacklisted_drains", + "Count of blacklisted drains encountered in last binding fetch.", + ) + invalidDrainsGauge := metrics.NewGauge( + "invalid_drains", + "Count of invalid drains encountered in last binding fetch. Includes blacklisted drains.", + ) + + filteredBindings := checkBindings( + bindings, + &appLogStream, + &dummyIPChecker{}, + logger, + cache, + blacklistedDrainsGauge, + invalidDrainsGauge, + true, + ) + + Expect(filteredBindings).To(BeEmpty()) + Expect(logClient.Message()).To(ContainElement(Equal("No hostname found in syslog drain url syslog:/invalid-url-a-slash-is-missing"))) + Expect(metrics.GetMetricValue("invalid_drains", map[string]string{})).To(BeNumerically("==", 1)) + Expect(metrics.GetMetricValue("blacklisted_drains", map[string]string{})).To(BeNumerically("==", 0)) + }) + + It("returns no binding which contains an invalid scheme in URL and increases invalid_drains", func() { + bindings := []Binding{ + { + Url: "syslog-ssl://drain-0", + Credentials: []Credentials{ + { + Cert: "cert0", Key: "key0", CA: "ca0", Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + }, + }, + }, + } + cache := simplecache.New[string, bool](120 * time.Second) + blacklistedDrainsGauge := metrics.NewGauge( + "blacklisted_drains", + "Count of blacklisted drains encountered in last binding fetch.", + ) + invalidDrainsGauge := metrics.NewGauge( + "invalid_drains", + "Count of invalid drains encountered in last binding fetch. Includes blacklisted drains.", + ) + + filteredBindings := checkBindings( + bindings, + &appLogStream, + &dummyIPChecker{}, + logger, + cache, + blacklistedDrainsGauge, + invalidDrainsGauge, + true, + ) + + Expect(filteredBindings).To(BeEmpty()) + Expect(logClient.Message()).To(ContainElement(Equal("Invalid Scheme for syslog drain url syslog-ssl://drain-0"))) + Expect(metrics.GetMetricValue("invalid_drains", map[string]string{})).To(BeNumerically("==", 1)) + Expect(metrics.GetMetricValue("blacklisted_drains", map[string]string{})).To(BeNumerically("==", 0)) + }) + + It("returns no binding with unresolvable URL and increases invalid_drains", func() { + bindings := []Binding{ + { + Url: "syslog://syslog-drain-test-37c4f6db-12e2-4206-8bb2-c8d6f440d4d2.example.com", + Credentials: []Credentials{ + { + Cert: "cert0", Key: "key0", CA: "ca0", Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + }, + }, + }, + } + cache := simplecache.New[string, bool](120 * time.Second) + blacklistedDrainsGauge := metrics.NewGauge( + "blacklisted_drains", + "Count of blacklisted drains encountered in last binding fetch.", + ) + invalidDrainsGauge := metrics.NewGauge( + "invalid_drains", + "Count of invalid drains encountered in last binding fetch. Includes blacklisted drains.", + ) + + filteredBindings := checkBindings( + bindings, + &appLogStream, + &unresolvableIPChecker{}, + logger, + cache, + blacklistedDrainsGauge, + invalidDrainsGauge, + true, + ) + + Expect(filteredBindings).To(BeEmpty()) + Expect(logClient.Message()).To(ContainElement(Equal("Cannot resolve ip address for syslog drain with url syslog://syslog-drain-test-37c4f6db-12e2-4206-8bb2-c8d6f440d4d2.example.com"))) + Expect(metrics.GetMetricValue("invalid_drains", map[string]string{})).To(BeNumerically("==", 1)) + Expect(metrics.GetMetricValue("blacklisted_drains", map[string]string{})).To(BeNumerically("==", 0)) + }) + + It("returns no binding which has a blacklisted IP and increases invalid_drains and blacklisted drains", func() { + bindings := []Binding{ + { + Url: "syslog://192.168.188.15", + Credentials: []Credentials{ + { + Cert: "cert0", Key: "key0", CA: "ca0", Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + }, + }, + }, + } + cache := simplecache.New[string, bool](120 * time.Second) + blacklistedDrainsGauge := metrics.NewGauge( + "blacklisted_drains", + "Count of blacklisted drains encountered in last binding fetch.", + ) + invalidDrainsGauge := metrics.NewGauge( + "invalid_drains", + "Count of invalid drains encountered in last binding fetch. Includes blacklisted drains.", + ) + blacklistRanges, _ := blacklist.NewBlacklistRanges( + blacklist.BlacklistRange{Start: "192.168.188.1", End: "192.168.188.255"}, + ) + + filteredBindings := checkBindings( + bindings, + &appLogStream, + blacklistRanges, + logger, + cache, + blacklistedDrainsGauge, + invalidDrainsGauge, + true, + ) + + Expect(filteredBindings).To(BeEmpty()) + Expect(logClient.Message()).To(ContainElement(Equal("Resolved ip address for syslog drain with url syslog://192.168.188.15 is blacklisted"))) + Expect(metrics.GetMetricValue("invalid_drains", map[string]string{})).To(BeNumerically("==", 1)) + Expect(metrics.GetMetricValue("blacklisted_drains", map[string]string{})).To(BeNumerically("==", 1)) + }) + + It("returns no binding when there is a prior IP checking failure for URL and increases invalid_drains", func() { + bindings := []Binding{ + { + Url: "syslog://syslog-drain-test-37c4f6db-12e2-4206-8bb2-c8d6f440d4d2.example.com", + Credentials: []Credentials{ + { + Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + }, + }, + }, + { + Url: "syslog://syslog-drain-test-37c4f6db-12e2-4206-8bb2-c8d6f440d4d2.example.com", + Credentials: []Credentials{ + { + Apps: []App{{Hostname: "app-hostname1", AppID: "app-id-1"}}, + }, + }, + }, + } + cache := simplecache.New[string, bool](120 * time.Second) + blacklistedDrainsGauge := metrics.NewGauge( + "blacklisted_drains", + "Count of blacklisted drains encountered in last binding fetch.", + ) + invalidDrainsGauge := metrics.NewGauge( + "invalid_drains", + "Count of invalid drains encountered in last binding fetch. Includes blacklisted drains.", + ) + blacklistRanges, _ := blacklist.NewBlacklistRanges( + blacklist.BlacklistRange{Start: "192.168.188.1", End: "192.168.188.255"}, + ) + + filteredBindings := checkBindings( + bindings, + &appLogStream, + blacklistRanges, + logger, + cache, + blacklistedDrainsGauge, + invalidDrainsGauge, + true, + ) + + Expect(filteredBindings).To(BeEmpty()) + Expect(logClient.Message()).To(ContainElement(Equal("Skipped resolve ip address for syslog drain with url syslog://syslog-drain-test-37c4f6db-12e2-4206-8bb2-c8d6f440d4d2.example.com due to prior failure"))) + Expect(metrics.GetMetricValue("invalid_drains", map[string]string{})).To(BeNumerically("==", 2)) + Expect(metrics.GetMetricValue("blacklisted_drains", map[string]string{})).To(BeNumerically("==", 0)) + }) + + It("returns no binding when key pair cannot be loaded and does not increase invalid_drains and blacklisted drains", func() { + bindings := []Binding{ + { + Url: "syslog-tls://drain-0", + Credentials: []Credentials{ + { + Cert: "cert0", Key: "key0", Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + }, + }, + }, + } + cache := simplecache.New[string, bool](120 * time.Second) + blacklistedDrainsGauge := metrics.NewGauge( + "blacklisted_drains", + "Count of blacklisted drains encountered in last binding fetch.", + ) + invalidDrainsGauge := metrics.NewGauge( + "invalid_drains", + "Count of invalid drains encountered in last binding fetch. Includes blacklisted drains.", + ) + + filteredBindings := checkBindings( + bindings, + &appLogStream, + &dummyIPChecker{}, + logger, + cache, + blacklistedDrainsGauge, + invalidDrainsGauge, + true, + ) + + Expect(filteredBindings).To(BeEmpty()) + Expect(logClient.Message()).To(ContainElement(Equal("failed to load certificate for syslog-tls://drain-0"))) + Expect(metrics.GetMetricValue("invalid_drains", map[string]string{})).To(BeNumerically("==", 0)) + Expect(metrics.GetMetricValue("blacklisted_drains", map[string]string{})).To(BeNumerically("==", 0)) + }) + + It("returns no binding when CA cannot be loaded and does not increase invalid_drains and blacklisted drains", func() { + bindings := []Binding{ + { + Url: "syslog-tls://drain-0", + Credentials: []Credentials{ + { + CA: "ca0", Apps: []App{{Hostname: "app-hostname0", AppID: "app-id-0"}}, + }, + }, + }, + } + cache := simplecache.New[string, bool](120 * time.Second) + blacklistedDrainsGauge := metrics.NewGauge( + "blacklisted_drains", + "Count of blacklisted drains encountered in last binding fetch.", + ) + invalidDrainsGauge := metrics.NewGauge( + "invalid_drains", + "Count of invalid drains encountered in last binding fetch. Includes blacklisted drains.", + ) + + filteredBindings := checkBindings( + bindings, + &appLogStream, + &dummyIPChecker{}, + logger, + cache, + blacklistedDrainsGauge, + invalidDrainsGauge, + true, + ) + + Expect(filteredBindings).To(BeEmpty()) + Expect(logClient.Message()).To(ContainElement(Equal("failed to load root CA for syslog-tls://drain-0"))) + Expect(metrics.GetMetricValue("invalid_drains", map[string]string{})).To(BeNumerically("==", 0)) + Expect(metrics.GetMetricValue("blacklisted_drains", map[string]string{})).To(BeNumerically("==", 0)) + }) + }) }) type fakeAPIClient struct { @@ -329,20 +655,40 @@ func (c *fakeAPIClient) called() int64 { } type fakeStore struct { - bindings chan []binding.Binding + bindings chan []Binding } func newFakeStore() *fakeStore { return &fakeStore{ - bindings: make(chan []binding.Binding, 100), + bindings: make(chan []Binding, 100), } } -func (c *fakeStore) Set(b []binding.Binding, bindingCount int) { +func (c *fakeStore) Set(b []Binding, bindingCount int) { c.bindings <- b } type response struct { - Results []binding.Binding + Results []Binding NextID int `json:"next_id"` } + +type dummyIPChecker struct{} + +func (d *dummyIPChecker) ResolveAddr(host string) (net.IP, error) { + return net.IPv4(127, 0, 0, 1), nil +} + +func (*dummyIPChecker) CheckBlacklist(ip net.IP) error { + return nil +} + +type unresolvableIPChecker struct{} + +func (d *unresolvableIPChecker) ResolveAddr(host string) (net.IP, error) { + return nil, fmt.Errorf("unable to resolve DNS entry: %s", host) +} + +func (*unresolvableIPChecker) CheckBlacklist(ip net.IP) error { + return nil +} diff --git a/src/pkg/egress/syslog/retry_writer_test.go b/src/pkg/egress/syslog/retry_writer_test.go index e44bae451..45b348394 100644 --- a/src/pkg/egress/syslog/retry_writer_test.go +++ b/src/pkg/egress/syslog/retry_writer_test.go @@ -3,11 +3,9 @@ package syslog_test import ( "errors" "net/url" - "sync" "sync/atomic" "time" - "code.cloudfoundry.org/go-loggregator/v10" v2 "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" @@ -165,81 +163,6 @@ func (s *spyWriteCloser) WriteAttempts() int { return int(atomic.LoadInt64(&s.writeAttempts)) } -type spyLogClient struct { - mu sync.Mutex - _message []string - _appID []string - - // We use maps to ensure that we can query the keys - _sourceType map[string]struct{} - _sourceInstance map[string]struct{} -} - -func newSpyLogClient() *spyLogClient { - return &spyLogClient{ - _sourceType: make(map[string]struct{}), - _sourceInstance: make(map[string]struct{}), - } -} - -func (s *spyLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) { - s.mu.Lock() - defer s.mu.Unlock() - - env := &v2.Envelope{ - Tags: make(map[string]string), - } - - for _, o := range opts { - o(env) - } - - s._message = append(s._message, message) - s._appID = append(s._appID, env.SourceId) - s._sourceType[env.GetTags()["source_type"]] = struct{}{} - s._sourceInstance[env.GetInstanceId()] = struct{}{} -} - -func (s *spyLogClient) message() []string { - s.mu.Lock() - defer s.mu.Unlock() - - return s._message -} - -func (s *spyLogClient) appID() []string { - s.mu.Lock() - defer s.mu.Unlock() - - return s._appID -} - -func (s *spyLogClient) sourceType() map[string]struct{} { - s.mu.Lock() - defer s.mu.Unlock() - - // Copy map so the orig does not escape the mutex and induce a race. - m := make(map[string]struct{}) - for k := range s._sourceType { - m[k] = struct{}{} - } - - return m -} - -func (s *spyLogClient) sourceInstance() map[string]struct{} { - s.mu.Lock() - defer s.mu.Unlock() - - // Copy map so the orig does not escape the mutex and induce a race. - m := make(map[string]struct{}) - for k := range s._sourceInstance { - m[k] = struct{}{} - } - - return m -} - func buildDelay(multiplier time.Duration) func(int) time.Duration { return func(attempt int) time.Duration { return time.Duration(attempt) * multiplier diff --git a/src/pkg/egress/syslog/syslog_connector.go b/src/pkg/egress/syslog/syslog_connector.go index a13321c60..1cca277e0 100644 --- a/src/pkg/egress/syslog/syslog_connector.go +++ b/src/pkg/egress/syslog/syslog_connector.go @@ -6,11 +6,10 @@ import ( "context" - metrics "code.cloudfoundry.org/go-metric-registry" - "code.cloudfoundry.org/go-diodes" - "code.cloudfoundry.org/go-loggregator/v10" + metrics "code.cloudfoundry.org/go-metric-registry" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" ) type Binding struct { @@ -33,32 +32,20 @@ type Credentials struct { CA string `json:"ca"` } -// LogClient is used to emit logs. -type LogClient interface { - EmitLog(message string, opts ...loggregator.EmitLogOption) -} - -// nullLogClient ensures that the LogClient is in fact optional. -type nullLogClient struct{} - -// EmitLog drops all messages into /dev/null. -func (nullLogClient) EmitLog(message string, opts ...loggregator.EmitLogOption) { -} - type writerFactory interface { - NewWriter(*URLBinding) (egress.WriteCloser, error) + NewWriter(*URLBinding, applog.AppLogStream) (egress.WriteCloser, error) } // SyslogConnector creates the various egress syslog writers. type SyslogConnector struct { skipCertVerify bool - logClient LogClient wg egress.WaitGroup - sourceIndex string writerFactory writerFactory + metricClient metricClient - metricClient metricClient droppedMetric metrics.Counter + + appLogStream applog.AppLogStream } // NewSyslogConnector configures and returns a new SyslogConnector. @@ -78,7 +65,6 @@ func NewSyslogConnector( sc := &SyslogConnector{ skipCertVerify: skipCertVerify, wg: wg, - logClient: nullLogClient{}, writerFactory: f, metricClient: m, @@ -93,12 +79,11 @@ func NewSyslogConnector( // ConnectorOption allows a syslog connector to be customized. type ConnectorOption func(*SyslogConnector) -// WithLogClient returns a ConnectorOption that will set up logging for any +// WithAppLogStream returns a ConnectorOption that will set up logging for any // information about a binding. -func WithLogClient(logClient LogClient, sourceIndex string) ConnectorOption { +func WithAppLogStream(appLogStream applog.AppLogStream) ConnectorOption { return func(sc *SyslogConnector) { - sc.logClient = logClient - sc.sourceIndex = sourceIndex + sc.appLogStream = appLogStream } } @@ -110,7 +95,7 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer return nil, err } - writer, err := w.writerFactory.NewWriter(urlBinding) + writer, err := w.writerFactory.NewWriter(urlBinding, w.appLogStream) if err != nil { return nil, err } @@ -138,8 +123,8 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer w.droppedMetric.Add(float64(missed)) drainDroppedMetric.Add(float64(missed)) - w.emitLoggregatorErrorLog(b.AppId, fmt.Sprintf("%d messages lost for application %s in user provided syslog drain with url %s", missed, b.AppId, anonymousUrl.String())) w.emitStandardOutErrorLog(b.AppId, urlBinding.Scheme(), anonymousUrl.String(), missed) + w.appLogStream.Emit(fmt.Sprintf("%d messages lost for application %s in user provided syslog drain with url %s", missed, b.AppId, anonymousUrl.String()), b.AppId) }), w.wg) filteredWriter, err := NewFilteringDrainWriter(b, dw) @@ -151,20 +136,6 @@ func (w *SyslogConnector) Connect(ctx context.Context, b Binding) (egress.Writer return filteredWriter, nil } -func (w *SyslogConnector) emitLoggregatorErrorLog(appID, message string) { - if appID == "" { - return - } - option := loggregator.WithAppInfo(appID, "LGR", "") - w.logClient.EmitLog(message, option) - - option = loggregator.WithAppInfo( - appID, - "SYS", - w.sourceIndex, - ) - w.logClient.EmitLog(message, option) -} func (w *SyslogConnector) emitStandardOutErrorLog(appID, scheme, url string, missed int) { errorAppOrAggregate := fmt.Sprintf("for %s's app drain", appID) if appID == "" { diff --git a/src/pkg/egress/syslog/syslog_connector_test.go b/src/pkg/egress/syslog/syslog_connector_test.go index 37dbef5b6..de7438bee 100644 --- a/src/pkg/egress/syslog/syslog_connector_test.go +++ b/src/pkg/egress/syslog/syslog_connector_test.go @@ -1,6 +1,9 @@ package syslog_test import ( + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" + "errors" "fmt" "sync/atomic" @@ -176,13 +179,14 @@ var _ = Describe("SyslogConnector", func() { }) It("emits a LGR and SYS log to the log client about logs that have been dropped", func() { - logClient := newSpyLogClient() + logClient := testhelper.NewSpyLogClient() + factory := applog.NewAppLogStreamFactory() connector := syslog.NewSyslogConnector( true, spyWaitGroup, writerFactory, sm, - syslog.WithLogClient(logClient, "3"), + syslog.WithAppLogStream(factory.NewAppLogStream(logClient, "3")), ) binding := syslog.Binding{AppId: "app-id", @@ -205,26 +209,27 @@ var _ = Describe("SyslogConnector", func() { } }(writer) - Eventually(logClient.message).Should(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) - Eventually(logClient.appID).Should(ContainElement("app-id")) + Eventually(logClient.Message).Should(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) + Eventually(logClient.AppID).Should(ContainElement("app-id")) - Eventually(logClient.sourceType).Should(HaveLen(2)) - Eventually(logClient.sourceType).Should(HaveKey("LGR")) - Eventually(logClient.sourceType).Should(HaveKey("SYS")) + Eventually(logClient.SourceType).Should(HaveLen(2)) + Eventually(logClient.SourceType).Should(HaveKey("LGR")) + Eventually(logClient.SourceType).Should(HaveKey("SYS")) - Eventually(logClient.sourceInstance).Should(HaveLen(2)) - Eventually(logClient.sourceInstance).Should(HaveKey("")) - Eventually(logClient.sourceInstance).Should(HaveKey("3")) + Eventually(logClient.SourceInstance).Should(HaveLen(2)) + Eventually(logClient.SourceInstance).Should(HaveKey("")) + Eventually(logClient.SourceInstance).Should(HaveKey("3")) }) It("doesn't emit LGR and SYS log to the log client about aggregate drains drops", func() { - logClient := newSpyLogClient() + logClient := testhelper.NewSpyLogClient() + factory := applog.NewAppLogStreamFactory() connector := syslog.NewSyslogConnector( true, spyWaitGroup, writerFactory, sm, - syslog.WithLogClient(logClient, "3"), + syslog.WithAppLogStream(factory.NewAppLogStream(logClient, "3")), ) binding := syslog.Binding{Drain: syslog.Drain{Url: "dropping://"}} @@ -243,7 +248,7 @@ var _ = Describe("SyslogConnector", func() { } }(writer) - Consistently(logClient.message).ShouldNot(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) + Consistently(logClient.Message()).ShouldNot(ContainElement(MatchRegexp("\\d messages lost for application (.*) in user provided syslog drain with url"))) }) It("does not panic on unknown dropped metrics", func() { @@ -280,6 +285,7 @@ type stubWriterFactory struct { func (f *stubWriterFactory) NewWriter( urlBinding *syslog.URLBinding, + appLogStream applog.AppLogStream, ) (egress.WriteCloser, error) { f.called = true return f.writer, f.err diff --git a/src/pkg/egress/syslog/tcp.go b/src/pkg/egress/syslog/tcp.go index 9e4edd46b..0e7c6e2a0 100644 --- a/src/pkg/egress/syslog/tcp.go +++ b/src/pkg/egress/syslog/tcp.go @@ -13,6 +13,7 @@ import ( "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" metrics "code.cloudfoundry.org/go-metric-registry" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" ) // DialFunc represents a method for creating a connection, either TCP or TLS. @@ -32,6 +33,8 @@ type TCPWriter struct { syslogConverter *Converter egressMetric metrics.Counter + + appLogStream applog.AppLogStream } // NewTCPWriter creates a new TCP syslog writer. @@ -40,6 +43,7 @@ func NewTCPWriter( netConf NetworkTimeoutConfig, egressMetric metrics.Counter, c *Converter, + appLogStream applog.AppLogStream, ) egress.WriteCloser { dialer := &net.Dialer{ Timeout: netConf.DialTimeout, @@ -58,6 +62,7 @@ func NewTCPWriter( scheme: "syslog", egressMetric: egressMetric, syslogConverter: c, + appLogStream: appLogStream, } return w @@ -104,6 +109,10 @@ func (w *TCPWriter) connection() (net.Conn, error) { func (w *TCPWriter) connect() (net.Conn, error) { conn, err := w.dialFunc(w.url.Host) if err != nil { + appLogMessage := fmt.Sprintf("Failed to connect to %s", w.url.String()) + w.appLogStream.Emit(appLogMessage, w.appID) + platformLogMessage := fmt.Sprintf("%s for app %s", appLogMessage, w.appID) + log.Print(platformLogMessage) return nil, err } w.conn = conn diff --git a/src/pkg/egress/syslog/tcp_test.go b/src/pkg/egress/syslog/tcp_test.go index b40d5c334..c2e49020c 100644 --- a/src/pkg/egress/syslog/tcp_test.go +++ b/src/pkg/egress/syslog/tcp_test.go @@ -2,6 +2,10 @@ package syslog_test import ( "bufio" + + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" + "fmt" "io" "net" @@ -51,12 +55,16 @@ var _ = Describe("TCPWriter", func() { BeforeEach(func() { var err error egressCounter = &metricsHelpers.SpyMetric{} + factory := applog.NewAppLogStreamFactory() + logClient := testhelper.NewSpyLogClient() + appLogStream := factory.NewAppLogStream(logClient, "test_source") writer = syslog.NewTCPWriter( binding, netConf, egressCounter, syslog.NewConverter(), + appLogStream, ) Expect(err).ToNot(HaveOccurred()) }) @@ -183,12 +191,16 @@ var _ = Describe("TCPWriter", func() { It("write returns an error", func() { env := buildLogEnvelope("APP", "2", "just a test", loggregator_v2.Log_OUT) binding.URL, _ = url.Parse("syslog://localhost-garbage:9999") + factory := applog.NewAppLogStreamFactory() + logClient := testhelper.NewSpyLogClient() + appLogStream := factory.NewAppLogStream(logClient, "test_source") writer := syslog.NewTCPWriter( binding, netConf, &metricsHelpers.SpyMetric{}, syslog.NewConverter(), + appLogStream, ) errs := make(chan error, 1) @@ -208,11 +220,15 @@ var _ = Describe("TCPWriter", func() { Context("with a happy dialer", func() { BeforeEach(func() { var err error + factory := applog.NewAppLogStreamFactory() + logClient := testhelper.NewSpyLogClient() + logStream := factory.NewAppLogStream(logClient, "test_source") writer = syslog.NewTCPWriter( binding, netConf, &metricsHelpers.SpyMetric{}, syslog.NewConverter(), + logStream, ) Expect(err).ToNot(HaveOccurred()) diff --git a/src/pkg/egress/syslog/tls.go b/src/pkg/egress/syslog/tls.go index 1610efc4f..9d76d0129 100644 --- a/src/pkg/egress/syslog/tls.go +++ b/src/pkg/egress/syslog/tls.go @@ -7,6 +7,7 @@ import ( metrics "code.cloudfoundry.org/go-metric-registry" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" ) // TLSWriter represents a syslog writer that connects over unencrypted TCP. @@ -27,6 +28,7 @@ func NewTLSWriter( tlsConf *tls.Config, egressMetric metrics.Counter, syslogConverter *Converter, + appLogStream applog.AppLogStream, ) egress.WriteCloser { dialer := &net.Dialer{ @@ -48,6 +50,7 @@ func NewTLSWriter( scheme: "syslog-tls", egressMetric: egressMetric, syslogConverter: syslogConverter, + appLogStream: appLogStream, }, } diff --git a/src/pkg/egress/syslog/tls_test.go b/src/pkg/egress/syslog/tls_test.go index 0f3adf4c4..119c51f62 100644 --- a/src/pkg/egress/syslog/tls_test.go +++ b/src/pkg/egress/syslog/tls_test.go @@ -11,6 +11,7 @@ import ( metricsHelpers "code.cloudfoundry.org/go-metric-registry/testhelpers" "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" "code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2" @@ -55,6 +56,9 @@ var _ = Describe("TLSWriter", func() { Hostname: "test-hostname", URL: url, } + factory := applog.NewAppLogStreamFactory() + logClient := testhelper.NewSpyLogClient() + appLogStream := factory.NewAppLogStream(logClient, "3") writer := syslog.NewTLSWriter( binding, netConf, @@ -63,6 +67,7 @@ var _ = Describe("TLSWriter", func() { }, egressCounter, syslog.NewConverter(), + appLogStream, ) defer writer.Close() diff --git a/src/pkg/egress/syslog/writer_factory.go b/src/pkg/egress/syslog/writer_factory.go index a5d871c6f..e9cd00245 100644 --- a/src/pkg/egress/syslog/writer_factory.go +++ b/src/pkg/egress/syslog/writer_factory.go @@ -7,6 +7,7 @@ import ( metrics "code.cloudfoundry.org/go-metric-registry" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" ) type metricClient interface { @@ -52,7 +53,7 @@ func NewWriterFactory(internalTlsConfig *tls.Config, externalTlsConfig *tls.Conf } } -func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { +func (f WriterFactory) NewWriter(ub *URLBinding, appLogStream applog.AppLogStream) (egress.WriteCloser, error) { tlsCfg := f.externalTlsConfig.Clone() if ub.InternalTls { tlsCfg = f.internalTlsConfig.Clone() @@ -60,7 +61,8 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { if len(ub.Certificate) > 0 && len(ub.PrivateKey) > 0 { cert, err := tls.X509KeyPair(ub.Certificate, ub.PrivateKey) if err != nil { - err = NewWriterFactoryErrorf(ub.URL, "failed to load certificate: %s", err.Error()) + errorMessage := err.Error() + err = NewWriterFactoryErrorf(ub.URL, "failed to load certificate: %s", errorMessage) return nil, err } tlsCfg.Certificates = []tls.Certificate{cert} @@ -120,6 +122,7 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { f.netConf, egressMetric, converter, + appLogStream, ) case "syslog-tls": w = NewTLSWriter( @@ -128,6 +131,7 @@ func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error) { tlsCfg, egressMetric, converter, + appLogStream, ) } diff --git a/src/pkg/egress/syslog/writer_factory_test.go b/src/pkg/egress/syslog/writer_factory_test.go index 47e761efa..5c9294261 100644 --- a/src/pkg/egress/syslog/writer_factory_test.go +++ b/src/pkg/egress/syslog/writer_factory_test.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "net/url" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -13,8 +14,9 @@ import ( var _ = Describe("EgressFactory", func() { var ( - f syslog.WriterFactory - sm *metricsHelpers.SpyMetricsRegistry + f syslog.WriterFactory + sm *metricsHelpers.SpyMetricsRegistry + appLogStream applog.AppLogStream ) BeforeEach(func() { @@ -30,7 +32,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, appLogStream) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -49,7 +51,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, appLogStream) Expect(err).ToNot(HaveOccurred()) _, ok := writer.(*syslog.HTTPSBatchWriter) @@ -65,7 +67,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, appLogStream) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -84,7 +86,7 @@ var _ = Describe("EgressFactory", func() { URL: url, } - writer, err := f.NewWriter(urlBinding) + writer, err := f.NewWriter(urlBinding, appLogStream) Expect(err).ToNot(HaveOccurred()) retryWriter, ok := writer.(*syslog.RetryWriter) @@ -103,7 +105,7 @@ var _ = Describe("EgressFactory", func() { Certificate: []byte("invalid-certificate"), } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, appLogStream) Expect(err).ToNot(HaveOccurred()) }) }) @@ -117,7 +119,7 @@ var _ = Describe("EgressFactory", func() { PrivateKey: []byte("invalid-private-key"), } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, appLogStream) Expect(err).ToNot(HaveOccurred()) }) }) @@ -140,7 +142,7 @@ var _ = Describe("EgressFactory", func() { urlBinding.CA = []byte("invalid-ca") } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, appLogStream) Expect(err).To(MatchError(expectedErr)) }, @@ -166,7 +168,7 @@ var _ = Describe("EgressFactory", func() { AppID: appID, } - _, err = f.NewWriter(urlBinding) + _, err = f.NewWriter(urlBinding, appLogStream) Expect(err).ToNot(HaveOccurred()) metric := sm.GetMetric("egress", tags) diff --git a/src/pkg/ingress/applog/app_log_stream.go b/src/pkg/ingress/applog/app_log_stream.go new file mode 100644 index 000000000..97a1866f3 --- /dev/null +++ b/src/pkg/ingress/applog/app_log_stream.go @@ -0,0 +1,54 @@ +package applog + +import ( + "code.cloudfoundry.org/go-loggregator/v10" +) + +// LogClient is used to emit logs - i.e. Ingress Client. +type LogClient interface { + EmitLog(message string, opts ...loggregator.EmitLogOption) +} + +// AppLogStream abstracts the sending of a log to the application log stream. +type AppLogStream struct { + logClient LogClient + sourceIndex string +} + +// Emit writes a message in the application log stream using a LogClient. +func (appLogStream *AppLogStream) Emit(message string, appID string) { + if appLogStream.logClient == nil || appID == "" { + return + } + + logclientOption := loggregator.WithAppInfo(appID, "LGR", "") + appLogStream.logClient.EmitLog(message, logclientOption) + + logclientOption = loggregator.WithAppInfo( + appID, + "SYS", + appLogStream.sourceIndex, + ) + appLogStream.logClient.EmitLog(message, logclientOption) +} + +// AppLogStreamFactory is used to create new instances of AppLogStream +type AppLogStreamFactory interface { + NewAppLogStream(logClient LogClient, sourceIndex string) AppLogStream +} + +// DefaultLogStreamFactory implementation of AppLogStreamFactory. +type DefaultLogStreamFactory struct { +} + +// NewAppLogStream creates a new AppLogStream. +func (factory *DefaultLogStreamFactory) NewAppLogStream(logClient LogClient, sourceIndex string) AppLogStream { + return AppLogStream{ + logClient: logClient, + sourceIndex: sourceIndex, + } +} + +func NewAppLogStreamFactory() DefaultLogStreamFactory { + return DefaultLogStreamFactory{} +} diff --git a/src/pkg/ingress/applog/app_log_stream_test.go b/src/pkg/ingress/applog/app_log_stream_test.go new file mode 100644 index 000000000..414474403 --- /dev/null +++ b/src/pkg/ingress/applog/app_log_stream_test.go @@ -0,0 +1,66 @@ +package applog_test + +import ( + "code.cloudfoundry.org/loggregator-agent-release/src/internal/testhelper" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/applog" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Loggregator Egress", func() { + Describe("Log Stream", func() { + It("should emit a log message for an application with LGR and SYS source type with the provided app-ID", func() { + logClient := testhelper.NewSpyLogClient() + factory := applog.NewAppLogStreamFactory() + logStream := factory.NewAppLogStream(logClient, "0") + + logStream.Emit("some-message", "app-id") + + messages := logClient.Message() + appIDs := logClient.AppID() + sourceTypes := logClient.SourceType() + Expect(messages).To(HaveLen(2)) + Expect(messages[0]).To(Equal("some-message")) + Expect(messages[1]).To(Equal("some-message")) + Expect(appIDs[0]).To(Equal("app-id")) + Expect(appIDs[1]).To(Equal("app-id")) + Expect(sourceTypes).To(HaveKey("LGR")) + Expect(sourceTypes).To(HaveKey("SYS")) + }) + + It("should not emit a log message when the app-ID is empty", func() { + logClient := testhelper.NewSpyLogClient() + factory := applog.NewAppLogStreamFactory() + logStream := factory.NewAppLogStream(logClient, "0") + + logStream.Emit("some-message", "") + + Expect(logClient.Message()).To(BeEmpty()) + }) + }) + + Describe("DefaultLogStreamFactory", func() { + It("should produce a AppLogStream which emits to the provided LogClient", func() { + factory := applog.NewAppLogStreamFactory() + logClient := testhelper.NewSpyLogClient() + sourceIndex := "test-index" + + logStream := factory.NewAppLogStream(logClient, sourceIndex) + logStream.Emit("some-message", "app-id") + + messages := logClient.Message() + appIDs := logClient.AppID() + sourceTypes := logClient.SourceType() + sourceInstance := logClient.SourceInstance() + Expect(messages).To(HaveLen(2)) + Expect(messages[0]).To(Equal("some-message")) + Expect(messages[1]).To(Equal("some-message")) + Expect(appIDs[0]).To(Equal("app-id")) + Expect(appIDs[1]).To(Equal("app-id")) + Expect(sourceTypes).To(HaveKey("LGR")) + Expect(sourceTypes).To(HaveKey("SYS")) + Expect(sourceInstance).To(HaveKey("")) + Expect(sourceInstance).To(HaveKey("test-index")) + }) + }) +}) diff --git a/src/pkg/ingress/applog/app_log_suite_test.go b/src/pkg/ingress/applog/app_log_suite_test.go new file mode 100644 index 000000000..62af8e2f3 --- /dev/null +++ b/src/pkg/ingress/applog/app_log_suite_test.go @@ -0,0 +1,13 @@ +package applog + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestBinding(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "App Log Suite") +} diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher.go b/src/pkg/ingress/bindings/filtered_binding_fetcher.go index c496ba901..db0dbc7c7 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher.go @@ -2,7 +2,6 @@ package bindings import ( "log" - "net" "net/url" "time" @@ -14,19 +13,13 @@ import ( var allowedSchemes = []string{"syslog", "syslog-tls", "https", "https-batch"} -//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . IPChecker -type IPChecker interface { - ResolveAddr(host string) (net.IP, error) - CheckBlacklist(ip net.IP) error -} - // Metrics is the client used to expose gauge and counter metricsClient. type metricsClient interface { NewGauge(name, helpText string, opts ...metrics.MetricOption) metrics.Gauge } type FilteredBindingFetcher struct { - ipChecker IPChecker + ipChecker binding.IPChecker br binding.Fetcher warn bool logger *log.Logger @@ -35,7 +28,7 @@ type FilteredBindingFetcher struct { failedHostsCache *simplecache.SimpleCache[string, bool] } -func NewFilteredBindingFetcher(c IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger) *FilteredBindingFetcher { +func NewFilteredBindingFetcher(c binding.IPChecker, b binding.Fetcher, m metricsClient, warn bool, lc *log.Logger) *FilteredBindingFetcher { opt := metrics.WithMetricLabels(map[string]string{"unit": "total"}) invalidDrains := m.NewGauge( diff --git a/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go b/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go index a17d25fdb..fbc5e2cc5 100644 --- a/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go +++ b/src/pkg/ingress/bindings/filtered_binding_fetcher_test.go @@ -7,10 +7,9 @@ import ( "net" metricsHelpers "code.cloudfoundry.org/go-metric-registry/testhelpers" + "code.cloudfoundry.org/loggregator-agent-release/src/pkg/binding/bindingfakes" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress/syslog" "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/bindings" - "code.cloudfoundry.org/loggregator-agent-release/src/pkg/ingress/bindings/bindingsfakes" - . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -196,13 +195,13 @@ var _ = Describe("FilteredBindingFetcher", func() { Context("when the drain host fails to resolve", func() { var logBuffer bytes.Buffer var warn bool - var mockic *bindingsfakes.FakeIPChecker + var mockic *bindingfakes.FakeIPChecker BeforeEach(func() { logBuffer = bytes.Buffer{} log.SetOutput(&logBuffer) warn = true - mockic = &bindingsfakes.FakeIPChecker{} + mockic = &bindingfakes.FakeIPChecker{} mockic.ResolveAddrReturns(net.IP{}, errors.New("oof ouch ip not resolved")) })