diff --git a/activator/activator.go b/activator/activator.go index 61f3d93..e30409e 100644 --- a/activator/activator.go +++ b/activator/activator.go @@ -12,6 +12,7 @@ import ( "io" "net" "os" + "os/exec" "path/filepath" "strconv" "strings" @@ -74,7 +75,15 @@ func parsePidFromNetNS(nn ns.NetNS) int { return pid } -var ErrMapNotFound = errors.New("bpf map could not be found") +const ( + IfaceETH0 = "eth0" + IfaceLoopback = "lo" +) + +var ( + ErrMapNotFound = errors.New("bpf map could not be found") + DefaultIfaces = []string{IfaceLoopback, IfaceETH0} +) func (s *Server) Start(ctx context.Context, connHook ConnHook, restoreHook RestoreHook, ports ...uint16) error { s.connHook = connHook @@ -107,6 +116,17 @@ func (s *Server) Start(ctx context.Context, connHook ConnHook, restoreHook Resto return nil } +const AttachActivatorFlag = "-zeropod-attach-activator" + +// AttachExec attaches the activator using exec on itself. +func (s *Server) AttachExec() error { + out, err := exec.Command(os.Args[0], AttachActivatorFlag, strconv.Itoa(s.sandboxPid)).CombinedOutput() + if err != nil { + return fmt.Errorf("executing external attach: %s: %s", err, out) + } + return nil +} + func (s *Server) Started() bool { return s.started } @@ -465,6 +485,10 @@ func (s *Server) initActivityTracker() error { return nil } +func netNSPath(pid int) string { + return fmt.Sprintf("/proc/%d/ns/net", pid) +} + // convertBPFTime takes the value of bpf_ktime_get_ns and converts it to a // time.Time. func convertBPFTime(t uint64) (time.Time, error) { diff --git a/activator/bpf.go b/activator/bpf.go index 41b0ac5..0b33f58 100644 --- a/activator/bpf.go +++ b/activator/bpf.go @@ -11,8 +11,10 @@ import ( "strconv" "github.com/cilium/ebpf" + "github.com/cilium/ebpf/btf" "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/rlimit" + "github.com/containernetworking/plugins/pkg/ns" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" ) @@ -28,6 +30,7 @@ const ( PodKubeletAddrsMapv4 = "kubelet_addrs_v4" PodKubeletAddrsMapv6 = "kubelet_addrs_v6" trackerIgnoreLocalhostVariable = "tracker_ignore_localhost" + taskCommOffsetVariable = "task_comm_offset" tcxIngressPinName = "tcx_ingress" tcxEgressPinName = "tcx_egress" ) @@ -89,8 +92,8 @@ func InitBPF(pid int, log *slog.Logger, opts ...BPFOpts) (*BPF, error) { return nil, err } - // as a single shim process can host multiple containers, we store the map - // in a directory per shim process. + // as a single shim process can host multiple pods, we store the map in a + // directory per sandbox pid. path := PinPath(pid) if err := os.MkdirAll(path, os.ModePerm); err != nil { return nil, fmt.Errorf("failed to create bpf fs subpath: %w", err) @@ -109,12 +112,24 @@ func InitBPF(pid int, log *slog.Logger, opts ...BPFOpts) (*BPF, error) { } binName := [probeBinaryNameMaxLength]byte{} copy(binName[:], cfg.probeBinaryName[:]) - if err := spec.Variables[probeBinaryNameVariable].Set(binName); err != nil { + if err := setVar(spec, probeBinaryNameVariable, binName); err != nil { return nil, fmt.Errorf("setting probe binary variable: %w", err) } - if err := spec.Variables[trackerIgnoreLocalhostVariable].Set(cfg.trackerIgnoreLocalhost); err != nil { + if err := setVar(spec, trackerIgnoreLocalhostVariable, cfg.trackerIgnoreLocalhost); err != nil { return nil, fmt.Errorf("setting trackerIgnoreLocalhost variable: %w", err) } + // in case you wonder why we do this instead of just accessing comm directly + // in the task struct in eBPF: this would make the btf loading way more + // expensive, at least the way cilium/ebpf behaves at the moment. It is + // about 3x slower and uses 3x the amount of memory. This is rather ugly but + // I think the performance gains are worth it. + commOffset, err := getTaskCommOffset() + if err != nil { + return nil, err + } + if err := setVar(spec, taskCommOffsetVariable, commOffset); err != nil { + return nil, fmt.Errorf("setting probe binary variable: %w", err) + } for mapName, size := range cfg.mapSizes { spec.Maps[mapName].MaxEntries = size @@ -131,6 +146,46 @@ func InitBPF(pid int, log *slog.Logger, opts ...BPFOpts) (*BPF, error) { return &BPF{pid: pid, log: log, objs: &objs, noPin: cfg.disablePinning}, nil } +// TCXPinned returns true if all TCX programs for the pid are pinned. +func TCXPinned(pid int, ifaces ...string) bool { + for _, iface := range ifaces { + for _, attach := range []ebpf.AttachType{ebpf.AttachTCXIngress, ebpf.AttachTCXEgress} { + _, err := os.Stat(tcxLinkPath(pid, iface, attach)) + if err != nil { + return false + } + } + } + return true +} + +func setVar(spec *ebpf.CollectionSpec, name string, value any) error { + if _, ok := spec.Variables[name]; !ok { + return fmt.Errorf("could not find var %s in spec", name) + } + if err := spec.Variables[name].Set(value); err != nil { + return fmt.Errorf("setting spec variable: %w", err) + } + return nil +} + +func getTaskCommOffset() (uint32, error) { + kernelSpec, err := btf.LoadKernelSpec() + if err != nil { + return 0, fmt.Errorf("failed to load kernel BTF: %w", err) + } + var taskStruct *btf.Struct + if err := kernelSpec.TypeByName("task_struct", &taskStruct); err != nil { + return 0, fmt.Errorf("task_struct not found in kernel BTF: %w", err) + } + for _, member := range taskStruct.Members { + if member.Name == "comm" { + return member.Offset.Bytes(), nil + } + } + return 0, fmt.Errorf("comm field not found in task_struct") +} + func (bpf *BPF) Cleanup() error { errs := []error{} for _, link := range bpf.links { @@ -157,6 +212,22 @@ func (bpf *BPF) Cleanup() error { return errors.Join(errs...) } +func (bpf *BPF) AttachInNetNS(pid int, ifaces ...string) error { + netNS, err := ns.GetNS(netNSPath(pid)) + if err != nil { + return err + } + if err := netNS.Do(func(nn ns.NetNS) error { + if err := bpf.AttachRedirector(ifaces...); err != nil { + return err + } + return err + }); err != nil { + return errors.Join(err, bpf.Cleanup()) + } + return nil +} + func (bpf *BPF) AttachRedirector(ifaces ...string) error { for _, ifaceName := range ifaces { iface, err := net.InterfaceByName(ifaceName) @@ -193,11 +264,7 @@ func (bpf *BPF) attachTCX(iface *net.Interface) error { } func (bpf *BPF) loadOrAttachTCXLink(iface *net.Interface, program *ebpf.Program, attach ebpf.AttachType) (link.Link, error) { - name := tcxIngressPinName - if attach == ebpf.AttachTCXEgress { - name = tcxEgressPinName - } - pinPath := filepath.Join(PinPath(bpf.pid), fmt.Sprintf("%s_%s", name, iface.Name)) + pinPath := tcxLinkPath(bpf.pid, iface.Name, attach) l, err := link.LoadPinnedLink(pinPath, nil) if err == nil { return l, nil @@ -208,7 +275,7 @@ func (bpf *BPF) loadOrAttachTCXLink(iface *net.Interface, program *ebpf.Program, Attach: attach, }) if err != nil { - return nil, fmt.Errorf("could not attach TCX %s: %w", name, err) + return nil, fmt.Errorf("could not attach TCX %s: %w", pinPath, err) } if bpf.noPin { return l, nil @@ -216,6 +283,14 @@ func (bpf *BPF) loadOrAttachTCXLink(iface *net.Interface, program *ebpf.Program, return l, l.Pin(pinPath) } +func tcxLinkPath(pid int, ifaceName string, attach ebpf.AttachType) string { + name := tcxIngressPinName + if attach == ebpf.AttachTCXEgress { + name = tcxEgressPinName + } + return filepath.Join(PinPath(pid), fmt.Sprintf("%s_%s", name, ifaceName)) +} + func (bpf *BPF) attachQdisc(iface *net.Interface) error { qdisc := &netlink.GenericQdisc{ QdiscAttrs: netlink.QdiscAttrs{ diff --git a/activator/bpf_bpfeb.go b/activator/bpf_bpfeb.go index 3f1521d..c83d75a 100644 --- a/activator/bpf_bpfeb.go +++ b/activator/bpf_bpfeb.go @@ -82,6 +82,7 @@ type bpfMapSpecs struct { // It can be passed ebpf.CollectionSpec.Assign. type bpfVariableSpecs struct { ProbeBinaryName *ebpf.VariableSpec `ebpf:"probe_binary_name"` + TaskCommOffset *ebpf.VariableSpec `ebpf:"task_comm_offset"` TrackerIgnoreLocalhost *ebpf.VariableSpec `ebpf:"tracker_ignore_localhost"` } @@ -131,6 +132,7 @@ func (m *bpfMaps) Close() error { // It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. type bpfVariables struct { ProbeBinaryName *ebpf.Variable `ebpf:"probe_binary_name"` + TaskCommOffset *ebpf.Variable `ebpf:"task_comm_offset"` TrackerIgnoreLocalhost *ebpf.Variable `ebpf:"tracker_ignore_localhost"` } diff --git a/activator/bpf_bpfeb.o b/activator/bpf_bpfeb.o index 80497e5..2c7f38a 100644 Binary files a/activator/bpf_bpfeb.o and b/activator/bpf_bpfeb.o differ diff --git a/activator/bpf_bpfel.go b/activator/bpf_bpfel.go index c5b3faf..e22f3da 100644 --- a/activator/bpf_bpfel.go +++ b/activator/bpf_bpfel.go @@ -82,6 +82,7 @@ type bpfMapSpecs struct { // It can be passed ebpf.CollectionSpec.Assign. type bpfVariableSpecs struct { ProbeBinaryName *ebpf.VariableSpec `ebpf:"probe_binary_name"` + TaskCommOffset *ebpf.VariableSpec `ebpf:"task_comm_offset"` TrackerIgnoreLocalhost *ebpf.VariableSpec `ebpf:"tracker_ignore_localhost"` } @@ -131,6 +132,7 @@ func (m *bpfMaps) Close() error { // It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. type bpfVariables struct { ProbeBinaryName *ebpf.Variable `ebpf:"probe_binary_name"` + TaskCommOffset *ebpf.Variable `ebpf:"task_comm_offset"` TrackerIgnoreLocalhost *ebpf.Variable `ebpf:"tracker_ignore_localhost"` } diff --git a/activator/bpf_bpfel.o b/activator/bpf_bpfel.o index 7061de3..bf39eb3 100644 Binary files a/activator/bpf_bpfel.o and b/activator/bpf_bpfel.o differ diff --git a/activator/redirector.c b/activator/redirector.c index d75ba6a..b6624ea 100644 --- a/activator/redirector.c +++ b/activator/redirector.c @@ -74,6 +74,7 @@ struct { const volatile char probe_binary_name[TASK_COMM_LEN] = ""; const volatile bool tracker_ignore_localhost = false; +const volatile __u32 task_comm_offset = 0; static __always_inline int track_activity(__be16 dport) { __u64 time = bpf_ktime_get_ns(); @@ -178,8 +179,8 @@ static __always_inline __be32 lookup_kubelet_ip_v4(struct iphdr *ip) { } // bpf_get_current_comm is not available in a tc program on arm64, so we use // bpf_get_current_task to get the comm. - struct task_struct *task = (void *)bpf_get_current_task(); - BPF_CORE_READ_STR_INTO(&comm, task, comm); + char *task = (char *)bpf_get_current_task(); + bpf_probe_read_kernel(&comm, sizeof(comm), task + task_comm_offset); if (bpf_strncmp(comm, TASK_COMM_LEN, (char *)probe_binary_name) == 0) { // bpf_printk("found kubelet addr v4: %pI4", &ip->saddr); bpf_map_update_elem(kubelet_addrs, &key, &ip->saddr, BPF_ANY); @@ -212,8 +213,8 @@ static __always_inline struct in6_addr* lookup_kubelet_ip_v6(struct ipv6hdr *ip) } // bpf_get_current_comm is not available in a tc program on arm64, so we use // bpf_get_current_task to get the comm. - struct task_struct *task = (void *)bpf_get_current_task(); - BPF_CORE_READ_STR_INTO(&comm, task, comm); + char *task = (char *)bpf_get_current_task(); + bpf_probe_read_kernel(&comm, sizeof(comm), task + task_comm_offset); if (bpf_strncmp(comm, TASK_COMM_LEN, (char *)probe_binary_name) == 0) { // bpf_printk("found kubelet addr v6: %pI6", &ip->saddr); bpf_map_update_elem(kubelet_addrs, &key, &ip->saddr, BPF_ANY); diff --git a/shim/config.go b/api/shim/v1/config.go similarity index 81% rename from shim/config.go rename to api/shim/v1/config.go index 91e8776..8dd2822 100644 --- a/shim/config.go +++ b/api/shim/v1/config.go @@ -1,8 +1,11 @@ -package shim +package v1 import ( "context" + "encoding/json" "fmt" + "os" + "path/filepath" "runtime" "slices" "strconv" @@ -15,6 +18,8 @@ import ( ) const ( + ConfigDir = "etc" + ConfigFileName = "shim.json" NodeLabel = "zeropod.ctrox.dev/node" PortsAnnotationKey = "zeropod.ctrox.dev/ports-map" ContainerNamesAnnotationKey = "zeropod.ctrox.dev/container-names" @@ -42,9 +47,31 @@ const ( mappingDelim = ";" mapDelim = "=" defaultContainerdNS = "k8s.io" + // DefaultProbeBufferSize should be able to fit kube-probe HTTP requests with + // reasonable path and header sizes but should still be small enough to not + // impact performance. + DefaultProbeBufferSize = 1024 + DefaultProbeBinaryName = "kubelet" + DefaultTrackerIgnoreLocalhost = true ) -type Config struct { +var ContainerdAnnotations = []string{ + PortsAnnotationKey, + ContainerNamesAnnotationKey, + ScaleDownDurationAnnotationKey, + DisableCheckpoiningAnnotationKey, + PreDumpAnnotationKey, + MigrateAnnotationKey, + LiveMigrateAnnotationKey, + DisableProbeDetectAnnotationKey, + ProbeBufferSizeAnnotationKey, + ProxyTimeoutAnnotationKey, + ConnectTimeoutAnnotationKey, + DisableMigrateDataAnnotationKey, + "io.containerd.runc.v2.group", +} + +type AnnotationConfig struct { ZeropodContainerNames []string Ports []uint16 ScaleDownDuration time.Duration @@ -63,7 +90,13 @@ type Config struct { ProxyTimeout time.Duration ConnectTimeout time.Duration DisableMigrateData bool - spec *specs.Spec + Spec *specs.Spec +} + +type Config struct { + ProbeBinaryName string `json:"probeBinaryName"` + TrackerIgnoreLocalhost bool `json:"trackerIgnoreLocalhost"` + AnnotationConfig `json:"-"` } // NewConfig uses the annotations from the container spec to create a new @@ -156,7 +189,7 @@ func NewConfig(ctx context.Context, spec *specs.Spec) (*Config, error) { } } - probeBufferSize := defaultProbeBufferSize + probeBufferSize := DefaultProbeBufferSize probeBufferSizeValue := spec.Annotations[ProbeBufferSizeAnnotationKey] if probeBufferSizeValue != "" { probeBufferSize, err = strconv.Atoi(probeBufferSizeValue) @@ -191,8 +224,22 @@ func NewConfig(ctx context.Context, spec *specs.Spec) (*Config, error) { return nil, err } } + cfg := &Config{ + ProbeBinaryName: DefaultProbeBinaryName, + TrackerIgnoreLocalhost: DefaultTrackerIgnoreLocalhost, + } + e, err := os.Executable() + if err != nil { + return nil, fmt.Errorf("getting executable dir: %w", err) + } + b, err := os.ReadFile(filepath.Join(filepath.Dir(e), "..", ConfigDir, ConfigFileName)) + if err == nil { + if err := json.Unmarshal(b, cfg); err != nil { + return nil, err + } + } - return &Config{ + cfg.AnnotationConfig = AnnotationConfig{ Ports: containerPorts, ScaleDownDuration: dur, DisableCheckpointing: disableCheckpointing, @@ -211,8 +258,9 @@ func NewConfig(ctx context.Context, spec *specs.Spec) (*Config, error) { ProxyTimeout: proxyTimeout, ConnectTimeout: connectTimeout, DisableMigrateData: disableMigrateData, - spec: spec, - }, nil + Spec: spec, + } + return cfg, nil } func (cfg Config) IsZeropodContainer() bool { diff --git a/shim/config_test.go b/api/shim/v1/config_test.go similarity index 99% rename from shim/config_test.go rename to api/shim/v1/config_test.go index e7a5ffe..572232b 100644 --- a/shim/config_test.go +++ b/api/shim/v1/config_test.go @@ -1,4 +1,4 @@ -package shim +package v1 import ( "context" diff --git a/cmd/installer/main.go b/cmd/installer/main.go index cabf27a..c092eee 100644 --- a/cmd/installer/main.go +++ b/cmd/installer/main.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/x509" + "encoding/json" "encoding/pem" "errors" "flag" @@ -21,6 +22,7 @@ import ( containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/cmd/containerd/server/config" "github.com/coreos/go-systemd/v22/dbus" + v1 "github.com/ctrox/zeropod/api/shim/v1" "github.com/ctrox/zeropod/manager/node" "github.com/pelletier/go-toml/v2" corev1 "k8s.io/api/core/v1" @@ -31,12 +33,14 @@ import ( ) var ( - criuImage = flag.String("criu-image", "ghcr.io/ctrox/zeropod-criu:v4.2", "criu image to use.") - runtime = flag.String("runtime", "containerd", "specifies which runtime to configure. containerd/k3s/rke2") - hostOptPath = flag.String("host-opt-path", defaultOptPath, "path where zeropod binaries are stored on the host") - uninstall = flag.Bool("uninstall", false, "uninstalls zeropod by cleaning up all the files the installer created") - installTimeout = flag.Duration("timeout", time.Minute, "duration the installer waits for the installation to complete") - versionFlag = flag.Bool("version", false, "output version and exit") + criuImage = flag.String("criu-image", "ghcr.io/ctrox/zeropod-criu:v4.2", "criu image to use.") + runtime = flag.String("runtime", "containerd", "specifies which runtime to configure. containerd/k3s/rke2") + hostOptPath = flag.String("host-opt-path", defaultOptPath, "path where zeropod binaries are stored on the host") + uninstall = flag.Bool("uninstall", false, "uninstalls zeropod by cleaning up all the files the installer created") + installTimeout = flag.Duration("timeout", time.Minute, "duration the installer waits for the installation to complete") + versionFlag = flag.Bool("version", false, "output version and exit") + probeBinaryName = flag.String("probe-binary-name", v1.DefaultProbeBinaryName, "set the probe binary name for probe detection") + trackerIgnoreLocalhost = flag.Bool("tracker-ignore-localhost", v1.DefaultTrackerIgnoreLocalhost, "set to ignore traffic from localhost in socket tracker") version = "" revision = "" @@ -60,15 +64,12 @@ const ( configBackupSuffix = ".original" templateSuffix = ".tmpl" caSecretName = "ca-cert" - defaultCriuBin = "criu" - criuIPTablesBin = "criu-iptables" criuConfig = `tcp-close skip-in-flight network-lock skip ` defaultOptPath = "/opt/zeropod" containerdOptKey = "io.containerd.internal.v1.opt" - criPluginKey = "io.containerd.grpc.v1.cri" zeropodRuntimeKey = "containerd.runtimes.zeropod" optPlugin = ` [plugins."io.containerd.internal.v1.opt"] @@ -80,21 +81,7 @@ network-lock skip [plugins."io.containerd.cri.v1.runtime".containerd.runtimes.zeropod] runtime_type = "io.containerd.runc.v2" runtime_path = "%s/bin/containerd-shim-zeropod-v2" - pod_annotations = [ - "zeropod.ctrox.dev/ports-map", - "zeropod.ctrox.dev/container-names", - "zeropod.ctrox.dev/scaledown-duration", - "zeropod.ctrox.dev/disable-checkpointing", - "zeropod.ctrox.dev/pre-dump", - "zeropod.ctrox.dev/migrate", - "zeropod.ctrox.dev/live-migrate", - "zeropod.ctrox.dev/disable-probe-detection", - "zeropod.ctrox.dev/probe-buffer-size", - "zeropod.ctrox.dev/disable-migrate-data", - "zeropod.ctrox.dev/connect-timeout", - "zeropod.ctrox.dev/proxy-timeout", - "io.containerd.runc.v2.group" - ] + pod_annotations = %s [plugins."io.containerd.cri.v1.runtime".containerd.runtimes.zeropod.options] # use systemd cgroup by default @@ -105,21 +92,7 @@ network-lock skip [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.zeropod] runtime_type = "io.containerd.runc.v2" runtime_path = "%s/bin/containerd-shim-zeropod-v2" - pod_annotations = [ - "zeropod.ctrox.dev/ports-map", - "zeropod.ctrox.dev/container-names", - "zeropod.ctrox.dev/scaledown-duration", - "zeropod.ctrox.dev/disable-checkpointing", - "zeropod.ctrox.dev/pre-dump", - "zeropod.ctrox.dev/migrate", - "zeropod.ctrox.dev/live-migrate", - "zeropod.ctrox.dev/disable-probe-detection", - "zeropod.ctrox.dev/probe-buffer-size", - "zeropod.ctrox.dev/disable-migrate-data", - "zeropod.ctrox.dev/connect-timeout", - "zeropod.ctrox.dev/proxy-timeout", - "io.containerd.runc.v2.group" - ] + pod_annotations = %s [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.zeropod.options] # use systemd cgroup by default @@ -239,10 +212,11 @@ func installRuntime(ctx context.Context, runtime containerRuntime) error { return fmt.Errorf("unable to connect to dbus: %w", err) } + opt := optPath(ctx, runtime) // note that if the shim binary already exists, we simply switch it out with // the new one but existing zeropods will have to be deleted to use the // updated shim. - shimDest := filepath.Join(optPath(ctx, runtime), binPath, shimBinaryName) + shimDest := filepath.Join(opt, binPath, shimBinaryName) if err := os.Remove(shimDest); err != nil { log.Printf("unable to remove shim binary, continuing with install: %s", err) } @@ -256,6 +230,20 @@ func installRuntime(ctx context.Context, runtime containerRuntime) error { return fmt.Errorf("unable to write shim file: %w", err) } + b, err := json.MarshalIndent(&v1.Config{ + ProbeBinaryName: *probeBinaryName, + TrackerIgnoreLocalhost: *trackerIgnoreLocalhost, + }, "", " ") + if err != nil { + return fmt.Errorf("marshaling config: %w", err) + } + if err := os.MkdirAll(filepath.Join(opt, v1.ConfigDir), os.ModePerm); err != nil { + return err + } + if err := os.WriteFile(filepath.Join(opt, v1.ConfigDir, v1.ConfigFileName), b, 0600); err != nil { + return fmt.Errorf("unable to write shim file: %w", err) + } + if runtime == runtimeK3S { // for some reason, k3s containerd only has access to the busybox tar by // default. This breaks criu checkpoint since it needs the full gnu tar. @@ -428,7 +416,11 @@ func configureContainerdv1(ctx context.Context, runtime containerRuntime, contai optPath = containerdOptPath } - if _, err := fmt.Fprintf(cfg, runtimeConfig, strings.TrimSuffix(optPath, "/")); err != nil { + if _, err := fmt.Fprintf( + cfg, runtimeConfig, + strings.TrimSuffix(optPath, "/"), + annotationsToml(), + ); err != nil { return false, err } @@ -526,7 +518,12 @@ func writeZeropodRuntimeConfig(containerdConfig, optPath string, existingOpt boo if version == 3 { zeropodRuntimeConfig = runtimeConfigV3 } - zeropodRuntimeConfig = fmt.Sprintf(zeropodRuntimeConfig, strings.TrimSuffix(optPath, "/")) + + zeropodRuntimeConfig = fmt.Sprintf( + zeropodRuntimeConfig, + strings.TrimSuffix(optPath, "/"), + annotationsToml(), + ) if !existingOpt { zeropodRuntimeConfig = zeropodRuntimeConfig + fmt.Sprintf(optPlugin, optPath) } @@ -536,6 +533,17 @@ func writeZeropodRuntimeConfig(containerdConfig, optPath string, existingOpt boo return nil } +func annotationsToml() string { + var buf bytes.Buffer + enc := toml.NewEncoder(&buf) + enc.SetArraysMultiline(true) + enc.SetIndentSymbol(" ") + if err := enc.Encode(v1.ContainerdAnnotations); err != nil { + return "[]" + } + return buf.String() +} + func restoreContainerdConfig(runtime containerRuntime, containerdConfigPath string) error { if _, err := os.Stat(containerdConfigPath + configBackupSuffix); err != nil { if errors.Is(err, os.ErrNotExist) { diff --git a/cmd/installer/main_test.go b/cmd/installer/main_test.go index 1442150..a816e37 100644 --- a/cmd/installer/main_test.go +++ b/cmd/installer/main_test.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "os" "testing" @@ -90,11 +91,12 @@ imports = [ "runtime_zeropod.toml", ] ` - containerdv1AlreadyConfigured = fullContainerdConfigV2 + runtimeConfig + ` +) + +var containerdv1AlreadyConfigured = fullContainerdConfigV2 + fmt.Sprintf(runtimeConfig, "", "[]") + ` [plugins."io.containerd.internal.v1.opt"] - path = "/opt/zeropod" + path = "/opt/zeropod" ` -) type testConfig struct { containerdConfig string diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 7dc3831..2ad856d 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -40,8 +40,8 @@ var ( inPlaceScaling = flag.Bool("in-place-scaling", false, "enable in-place resource scaling, requires InPlacePodVerticalScaling feature flag") statusLabels = flag.Bool("status-labels", false, "update pod labels to reflect container status") - probeBinaryName = flag.String("probe-binary-name", "kubelet", "set the probe binary name for probe detection") - trackerIgnoreLocalhost = flag.Bool("tracker-ignore-localhost", false, "set to ignore traffic from localhost in socket tracker") + probeBinaryName = flag.String("probe-binary-name", "kubelet", "set the probe binary name for probe detection (Deprecated: this has moved to the installer)") + trackerIgnoreLocalhost = flag.Bool("tracker-ignore-localhost", true, "set to ignore traffic from localhost in socket tracker (Deprecated: this has moved to the installer)") statusEvents = flag.Bool("status-events", false, "create status events to reflect container status") versionFlag = flag.Bool("version", false, "output version and exit") maxConcurrentReconciles = flag.Int("max-concurrent-reconciles", 10, "num reconciles the pod controller processes concurrently") diff --git a/cmd/shim/main.go b/cmd/shim/main.go index 61fbf89..31d1969 100644 --- a/cmd/shim/main.go +++ b/cmd/shim/main.go @@ -3,16 +3,28 @@ package main import ( "context" "io" + "log/slog" + "os" "path/filepath" + "strconv" "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/manager" "github.com/containerd/containerd/v2/pkg/shim" + "github.com/ctrox/zeropod/activator" shimv1 "github.com/ctrox/zeropod/api/shim/v1" zshim "github.com/ctrox/zeropod/shim" _ "github.com/ctrox/zeropod/shim/task/plugin" + "github.com/opencontainers/runtime-spec/specs-go" ) +func main() { + if attachActivator() { + return + } + shim.Run(context.Background(), newCompatManager()) +} + // compatManager is a wrapper around [shim.Manager] that allows us to control // the task API version. This makes it possible to use the containerd v2 shim // with containerd 1.7. @@ -56,6 +68,33 @@ func newCompatManager() shim.Manager { return &compatManager{mgr: manager.NewShimManager(zshim.RuntimeName)} } -func main() { - shim.Run(context.Background(), newCompatManager()) +func attachActivator() bool { + if len(os.Args) < 3 || os.Args[1] != activator.AttachActivatorFlag { + return false + } + log := slog.Default() + pid, err := strconv.Atoi(os.Args[2]) + if err != nil { + log.Error("converting pid", "error", err) + os.Exit(1) + } + cfg, err := shimv1.NewConfig(context.Background(), &specs.Spec{}) + if err != nil { + log.Error("loading config", "error", err) + os.Exit(1) + } + bpf, err := activator.InitBPF( + pid, log, + activator.ProbeBinaryName(cfg.ProbeBinaryName), + activator.TrackerIgnoreLocalhost(cfg.TrackerIgnoreLocalhost), + ) + if err != nil { + log.Error("unable to initialize BPF", "error", err) + os.Exit(1) + } + if err := bpf.AttachInNetNS(pid, activator.DefaultIfaces...); err != nil { + log.Error("attaching activator", "error", err) + os.Exit(1) + } + return true } diff --git a/config/k3s/kustomization.yaml b/config/k3s/kustomization.yaml index 4173476..99d3d9b 100644 --- a/config/k3s/kustomization.yaml +++ b/config/k3s/kustomization.yaml @@ -10,7 +10,7 @@ patches: kind: DaemonSet - patch: |- - op: add - path: /spec/template/spec/containers/0/args/- + path: /spec/template/spec/initContainers/0/args/- value: -probe-binary-name=k3s target: kind: DaemonSet diff --git a/config/tracker-ignore-localhost/kustomization.yaml b/config/tracker-ignore-localhost/kustomization.yaml index ecd7d72..6921ce0 100644 --- a/config/tracker-ignore-localhost/kustomization.yaml +++ b/config/tracker-ignore-localhost/kustomization.yaml @@ -3,7 +3,7 @@ kind: Component patches: - patch: |- - op: add - path: /spec/template/spec/containers/0/args/- + path: /spec/template/spec/initContainers/0/args/- value: -tracker-ignore-localhost=true target: kind: DaemonSet diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 8faa191..0a74c07 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -11,7 +11,6 @@ import ( v1 "github.com/ctrox/zeropod/api/shim/v1" "github.com/ctrox/zeropod/manager" - "github.com/ctrox/zeropod/shim" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" @@ -191,7 +190,7 @@ func TestE2E(t *testing.T) { "pod with large HTTP probe and increased buffer": { pod: testPod( scaleDownAfter(time.Second), - annotations(map[string]string{shim.ProbeBufferSizeAnnotationKey: "2048"}), + annotations(map[string]string{v1.ProbeBufferSizeAnnotationKey: "2048"}), addContainer("nginx", "nginx", nil, 80), livenessProbe(&corev1.Probe{ InitialDelaySeconds: 3, diff --git a/e2e/setup_test.go b/e2e/setup_test.go index 4eb22b7..0ce2ab8 100644 --- a/e2e/setup_test.go +++ b/e2e/setup_test.go @@ -23,7 +23,6 @@ import ( v1 "github.com/ctrox/zeropod/api/runtime/v1" shimv1 "github.com/ctrox/zeropod/api/shim/v1" "github.com/ctrox/zeropod/manager" - "github.com/ctrox/zeropod/shim" "github.com/go-logr/logr" "github.com/phayes/freeport" "github.com/pkg/errors" @@ -196,12 +195,12 @@ func startKind(t testing.TB, name, kubeconfig string, port int) (c *rest.Config, }, { Role: v1alpha4.WorkerRole, - Labels: map[string]string{shim.NodeLabel: "true"}, + Labels: map[string]string{shimv1.NodeLabel: "true"}, ExtraMounts: extraMounts, }, { Role: v1alpha4.WorkerRole, - Labels: map[string]string{shim.NodeLabel: "true"}, + Labels: map[string]string{shimv1.NodeLabel: "true"}, ExtraMounts: extraMounts, }, }, @@ -378,43 +377,43 @@ func annotations(annotations map[string]string) podOption { func preDump(preDump bool) podOption { return annotations(map[string]string{ - shim.PreDumpAnnotationKey: strconv.FormatBool(preDump), + shimv1.PreDumpAnnotationKey: strconv.FormatBool(preDump), }) } func disableCheckpointing(disable bool) podOption { return annotations(map[string]string{ - shim.DisableCheckpoiningAnnotationKey: strconv.FormatBool(disable), + shimv1.DisableCheckpoiningAnnotationKey: strconv.FormatBool(disable), }) } func scaleDownAfter(dur time.Duration) podOption { return annotations(map[string]string{ - shim.ScaleDownDurationAnnotationKey: dur.String(), + shimv1.ScaleDownDurationAnnotationKey: dur.String(), }) } func containerNamesAnnotation(names ...string) podOption { return annotations(map[string]string{ - shim.ContainerNamesAnnotationKey: strings.Join(names, ","), + shimv1.ContainerNamesAnnotationKey: strings.Join(names, ","), }) } func portsAnnotation(portsMap string) podOption { return annotations(map[string]string{ - shim.PortsAnnotationKey: portsMap, + shimv1.PortsAnnotationKey: portsMap, }) } func migrateAnnotation(container string) podOption { return annotations(map[string]string{ - shim.MigrateAnnotationKey: container, + shimv1.MigrateAnnotationKey: container, }) } func liveMigrateAnnotation(container string) podOption { return annotations(map[string]string{ - shim.LiveMigrateAnnotationKey: container, + shimv1.LiveMigrateAnnotationKey: container, }) } @@ -442,7 +441,7 @@ func readinessProbe(probe *corev1.Probe, index int) podOption { func disableDataMigration() podOption { return annotations(map[string]string{ - shim.DisableMigrateDataAnnotationKey: "true", + shimv1.DisableMigrateDataAnnotationKey: "true", }) } diff --git a/go.mod b/go.mod index 8bf42d7..fa500c1 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.25.0 require ( github.com/checkpoint-restore/go-criu/v7 v7.2.0 - github.com/cilium/ebpf v0.19.0 + github.com/cilium/ebpf v0.21.0 github.com/containerd/cgroups/v3 v3.0.5 github.com/containerd/containerd/api v1.9.0 github.com/containerd/containerd/v2 v2.1.6 diff --git a/go.sum b/go.sum index 0cee487..325cba7 100644 --- a/go.sum +++ b/go.sum @@ -61,8 +61,8 @@ github.com/checkpoint-restore/go-criu/v7 v7.2.0/go.mod h1:u0LCWLg0w4yqqu14aXhiB4 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/cilium/ebpf v0.19.0 h1:Ro/rE64RmFBeA9FGjcTc+KmCeY6jXmryu6FfnzPRIao= -github.com/cilium/ebpf v0.19.0/go.mod h1:fLCgMo3l8tZmAdM3B2XqdFzXBpwkcSTroaVqN08OWVY= +github.com/cilium/ebpf v0.21.0 h1:4dpx1J/B/1apeTmWBH5BkVLayHTkFrMovVPnHEk+l3k= +github.com/cilium/ebpf v0.21.0/go.mod h1:1kHKv6Kvh5a6TePP5vvvoMa1bclRyzUXELSs272fmIQ= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/containerd/cgroups/v3 v3.0.5 h1:44na7Ud+VwyE7LIoJ8JTNQOa549a8543BmzaJHo6Bzo= diff --git a/manager/redirector_attacher.go b/manager/redirector_attacher.go index 73472c7..e298399 100644 --- a/manager/redirector_attacher.go +++ b/manager/redirector_attacher.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" "log/slog" - "net" - "net/netip" "os" "path/filepath" "strconv" @@ -25,15 +23,9 @@ type Redirector struct { } type sandbox struct { - ips []netip.Addr activator *activator.BPF } -const ( - ifaceETH0 = "eth0" - ifaceLoopback = "lo" -) - // AttachRedirectors scans the zeropod maps path in the bpf file system for // directories named after the pid of the sandbox container. It does an // initial iteration over all directories and then starts a goroutine which @@ -72,6 +64,10 @@ func AttachRedirectors(ctx context.Context, log *slog.Logger, activatorOpts ...a continue } + if activator.TCXPinned(pid) { + r.log.Debug("skipping already pinned attach", "pid", pid) + continue + } errs = append(errs, r.attachRedirector(pid)) } @@ -105,6 +101,11 @@ func (r *Redirector) watchForSandboxPids(ctx context.Context) error { continue } + if activator.TCXPinned(pid, activator.DefaultIfaces...) { + r.log.Debug("skipping already pinned attach", "pid", pid) + continue + } + if err := statNetNS(pid); err != nil { r.log.Warn("ignoring pid as net ns was not found", "pid", pid) continue @@ -144,24 +145,19 @@ func (r *Redirector) attachRedirector(pid int) error { return err } - var sandboxIPs []netip.Addr if err := netNS.Do(func(nn ns.NetNS) error { - // TODO: is this really always eth0? - // as for loopback, this is required for port-forwarding to work - ifaces := []string{ifaceETH0, ifaceLoopback} - r.log.Info("attaching redirector for sandbox", "pid", pid, "links", ifaces) - if err := bpf.AttachRedirector(ifaces...); err != nil { + r.log.Info("attaching redirector for sandbox", "pid", pid, "links", activator.DefaultIfaces) + if err := bpf.AttachRedirector(activator.DefaultIfaces...); err != nil { return err } - sandboxIPs, err = getSandboxIPs(ifaceETH0) return err }); err != nil { return errors.Join(err, bpf.Cleanup()) } r.Lock() - r.sandboxes[pid] = sandbox{activator: bpf, ips: sandboxIPs} + r.sandboxes[pid] = sandbox{activator: bpf} r.Unlock() return nil @@ -213,36 +209,6 @@ func (r *Redirector) getSandboxPids() ([]int, error) { return intPids, nil } -func getSandboxIPs(ifaceName string) ([]netip.Addr, error) { - ips := []netip.Addr{} - iface, err := net.InterfaceByName(ifaceName) - if err != nil { - return ips, fmt.Errorf("could not get interface: %w", err) - } - addrs, err := iface.Addrs() - if err != nil { - return ips, fmt.Errorf("could not get interface addrs: %w", err) - } - for _, addr := range addrs { - if ipnet, ok := addr.(*net.IPNet); ok { - // no need to track link local addresses - if ipnet.IP.IsLinkLocalUnicast() { - continue - } - ip, ok := netip.AddrFromSlice(ipnet.IP) - if !ok { - return ips, fmt.Errorf("unable to convert net.IP to netip.Addr: %s", ipnet.IP) - } - // use Unmap as the ipv4 might be mapped in v6 - ips = append(ips, ip.Unmap()) - } - } - if len(ips) == 0 { - return ips, fmt.Errorf("sandbox IPs not found") - } - return ips, nil -} - func ignoredDir(dir string) bool { return dir == activator.SocketTrackerMap || dir == activator.PodKubeletAddrsMapv4 || diff --git a/shim/container.go b/shim/container.go index b21c9d2..7ca99c9 100644 --- a/shim/container.go +++ b/shim/container.go @@ -38,7 +38,7 @@ type Container struct { id string createOpts *anypb.Any activator *activator.Server - cfg *Config + cfg *v1.Config initialProcess process.Process process process.Process cgroup any @@ -60,9 +60,9 @@ type Container struct { runcVersion string } -func New(ctx context.Context, cfg *Config, r *taskAPI.CreateTaskRequest, pt stdio.Platform, events chan *v1.ContainerStatus) (*Container, error) { +func New(ctx context.Context, cfg *v1.Config, r *taskAPI.CreateTaskRequest, pt stdio.Platform, events chan *v1.ContainerStatus) (*Container, error) { // get network ns of our container and store it for later use - netNSPath, err := GetNetworkNS(cfg.spec) + netNSPath, err := GetNetworkNS(cfg.Spec) if err != nil { return nil, err } @@ -126,7 +126,7 @@ func (c *Container) Register(ctx context.Context, container *runc.Container) err return nil } -func (c *Container) Config() *Config { +func (c *Container) Config() *v1.Config { return c.cfg } @@ -440,12 +440,17 @@ func (c *Container) startActivator(ctx context.Context, ports ...uint16) error { if c.activator.Started() { return nil } + if err := c.activator.AttachExec(); err != nil { + log.G(ctx).WithError(err).Error("failed to attach activator") + return err + } + log.G(ctx).Infof("attached %s", time.Now().Format(time.RFC3339Nano)) if err := c.activator.Start(c.context, c.detectProbe(c.context), c.restoreHandler(c.context), ports...); err != nil { if errors.Is(err, activator.ErrMapNotFound) { return err } - log.G(ctx).Errorf("failed to start activator: %s", err) + log.G(ctx).WithError(err).Error("failed to start activator") return err } log.G(ctx).Printf("activator started") diff --git a/shim/log.go b/shim/log.go index d6141e2..d2c01e1 100644 --- a/shim/log.go +++ b/shim/log.go @@ -5,6 +5,8 @@ import ( "os" "path/filepath" "sort" + + v1 "github.com/ctrox/zeropod/api/shim/v1" ) // getLogPath gets the log path of the container by searching for the last log @@ -13,7 +15,7 @@ import ( // containerd only passes that to the sandbox container (pause). One possible // solution would be to implement log restoring in the sandbox container // instead of the zeropod. -func getLogPath(cfg *Config) (string, error) { +func getLogPath(cfg *v1.Config) (string, error) { logDir := fmt.Sprintf("/var/log/pods/%s_%s_%s/%s", cfg.PodNamespace, cfg.PodName, cfg.PodUID, cfg.ContainerName) dir, err := os.Open(logDir) diff --git a/shim/metrics.go b/shim/metrics.go index 11ad1e6..7ea99d6 100644 --- a/shim/metrics.go +++ b/shim/metrics.go @@ -4,7 +4,7 @@ import ( v1 "github.com/ctrox/zeropod/api/shim/v1" ) -func newMetrics(cfg *Config, running bool) *v1.ContainerMetrics { +func newMetrics(cfg *v1.Config, running bool) *v1.ContainerMetrics { return &v1.ContainerMetrics{ Name: cfg.ContainerName, PodName: cfg.PodName, diff --git a/shim/probe.go b/shim/probe.go index 42a2f25..9653e67 100644 --- a/shim/probe.go +++ b/shim/probe.go @@ -13,11 +13,6 @@ import ( "github.com/ctrox/zeropod/activator" ) -// defaultProbeBufferSize should be able to fit kube-probe HTTP requests with -// reasonable path and header sizes but should still be small enough to not -// impact performance. -const defaultProbeBufferSize = 1024 - func (c *Container) detectProbe(ctx context.Context) activator.ConnHook { if c.cfg.DisableProbeDetection { return func(conn net.Conn) (net.Conn, bool, error) { diff --git a/shim/probe_test.go b/shim/probe_test.go index 4cc4808..5b2568a 100644 --- a/shim/probe_test.go +++ b/shim/probe_test.go @@ -8,6 +8,7 @@ import ( "net/http" "testing" + v1 "github.com/ctrox/zeropod/api/shim/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -37,7 +38,7 @@ func TestDetectProbe(t *testing.T) { }, "probe request header bigger than buffer": { clientFunc: httpRequest("kube-probe/1.32", http.StatusOK, func(req *http.Request) { - rnd, err := randomData(defaultProbeBufferSize * 10) + rnd, err := randomData(v1.DefaultProbeBufferSize * 10) assert.NoError(t, err) req.Header.Set("random-stuff", base64.URLEncoding.EncodeToString(rnd)) }), @@ -45,7 +46,7 @@ func TestDetectProbe(t *testing.T) { }, "probe request path bigger than buffer": { clientFunc: httpRequest("kube-probe/1.32", http.StatusOK, func(req *http.Request) { - rnd, err := randomData(defaultProbeBufferSize * 10) + rnd, err := randomData(v1.DefaultProbeBufferSize * 10) assert.NoError(t, err) req.URL.Path = "/" + base64.URLEncoding.EncodeToString(rnd) }), @@ -56,7 +57,7 @@ func TestDetectProbe(t *testing.T) { probeDetected: false, }, "random TCP data bigger than buffer": { - clientFunc: writeRandomTCPData(defaultProbeBufferSize * 1024), + clientFunc: writeRandomTCPData(v1.DefaultProbeBufferSize * 1024), probeDetected: false, }, } { @@ -72,7 +73,7 @@ func TestDetectProbe(t *testing.T) { conn, err := l.Accept() require.NoError(t, err) - c := &Container{cfg: &Config{ProbeBufferSize: defaultProbeBufferSize}} + c := &Container{cfg: &v1.Config{AnnotationConfig: v1.AnnotationConfig{ProbeBufferSize: v1.DefaultProbeBufferSize}}} newConn, cont, err := c.detectProbe(ctx)(conn) require.NoError(t, err) if cont { diff --git a/shim/restore.go b/shim/restore.go index 8ae0475..c50d60f 100644 --- a/shim/restore.go +++ b/shim/restore.go @@ -208,7 +208,7 @@ func createContainerLoggers(ctx context.Context, logPath string, tty bool) (stdo // MigrationRestore requests a restore from the node. If a matching migration is // found, it sets the Checkpoint path in the CreateTaskRequest. -func MigrationRestore(ctx context.Context, r *task.CreateTaskRequest, cfg *Config) (skipStart bool, err error) { +func MigrationRestore(ctx context.Context, r *task.CreateTaskRequest, cfg *v1.Config) (skipStart bool, err error) { conn, err := net.Dial("unix", nodev1.SocketPath) if err != nil { return false, fmt.Errorf("%w: dialing node service: %w", ErrRestoreDial, err) @@ -300,7 +300,7 @@ func setCriuWorkPath(r *task.CreateTaskRequest, path string) error { return nil } -func FinishRestore(ctx context.Context, id string, cfg *Config, startTime time.Time) error { +func FinishRestore(ctx context.Context, id string, cfg *v1.Config, startTime time.Time) error { conn, err := net.Dial("unix", nodev1.SocketPath) if err != nil { return fmt.Errorf("dialing node service: %w", err) diff --git a/shim/task/service_zeropod.go b/shim/task/service_zeropod.go index bbe5406..b866d3b 100644 --- a/shim/task/service_zeropod.go +++ b/shim/task/service_zeropod.go @@ -123,7 +123,7 @@ func (w *wrapper) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * return nil, err } - cfg, err := zshim.NewConfig(ctx, spec) + cfg, err := v1.NewConfig(ctx, spec) if err != nil { return nil, err }