Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion activator/activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"net"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -74,7 +75,15 @@ func parsePidFromNetNS(nn ns.NetNS) int {
return pid
}

var ErrMapNotFound = errors.New("bpf map could not be found")
// Names of the network interfaces referenced by DefaultIfaces.
const (
// IfaceETH0 is the interface name "eth0".
IfaceETH0 = "eth0"
// IfaceLoopback is the interface name "lo".
IfaceLoopback = "lo"
)

var (
// ErrMapNotFound indicates that a bpf map could not be found.
ErrMapNotFound = errors.New("bpf map could not be found")
// DefaultIfaces holds the loopback and eth0 interface names, in that order.
// NOTE(review): presumably the interfaces the activator attaches to when
// none are given explicitly — confirm against callers.
DefaultIfaces = []string{IfaceLoopback, IfaceETH0}
)

func (s *Server) Start(ctx context.Context, connHook ConnHook, restoreHook RestoreHook, ports ...uint16) error {
s.connHook = connHook
Expand Down Expand Up @@ -107,6 +116,17 @@ func (s *Server) Start(ctx context.Context, connHook ConnHook, restoreHook Resto
return nil
}

// AttachActivatorFlag is the first argument AttachExec passes when it
// re-executes the current binary; it is followed by the sandbox pid.
const AttachActivatorFlag = "-zeropod-attach-activator"

// AttachExec attaches the activator by re-executing the current binary
// (os.Args[0]) with AttachActivatorFlag followed by the sandbox pid.
//
// It blocks until the child process exits. On failure the returned error
// wraps the underlying exec error, so callers can inspect it with errors.As
// (e.g. for *exec.ExitError), and includes the child's combined
// stdout/stderr output.
func (s *Server) AttachExec() error {
	cmd := exec.Command(os.Args[0], AttachActivatorFlag, strconv.Itoa(s.sandboxPid))
	out, err := cmd.CombinedOutput()
	if err != nil {
		// %w instead of %s keeps the error chain intact while producing the
		// same message text.
		return fmt.Errorf("executing external attach: %w: %s", err, out)
	}
	return nil
}

// Started reports the current value of the server's started flag.
func (s *Server) Started() bool {
return s.started
}
Expand Down Expand Up @@ -465,6 +485,10 @@ func (s *Server) initActivityTracker() error {
return nil
}

// netNSPath returns the procfs path of the network namespace of the process
// with the given pid, i.e. /proc/<pid>/ns/net.
func netNSPath(pid int) string {
	procDir := fmt.Sprintf("/proc/%d", pid)
	return procDir + "/ns/net"
}

// convertBPFTime takes the value of bpf_ktime_get_ns and converts it to a
// time.Time.
func convertBPFTime(t uint64) (time.Time, error) {
Expand Down
95 changes: 85 additions & 10 deletions activator/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"strconv"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/btf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/rlimit"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
)
Expand All @@ -28,6 +30,7 @@ const (
PodKubeletAddrsMapv4 = "kubelet_addrs_v4"
PodKubeletAddrsMapv6 = "kubelet_addrs_v6"
trackerIgnoreLocalhostVariable = "tracker_ignore_localhost"
taskCommOffsetVariable = "task_comm_offset"
tcxIngressPinName = "tcx_ingress"
tcxEgressPinName = "tcx_egress"
)
Expand Down Expand Up @@ -89,8 +92,8 @@ func InitBPF(pid int, log *slog.Logger, opts ...BPFOpts) (*BPF, error) {
return nil, err
}

// as a single shim process can host multiple containers, we store the map
// in a directory per shim process.
// as a single shim process can host multiple pods, we store the map in a
// directory per sandbox pid.
path := PinPath(pid)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return nil, fmt.Errorf("failed to create bpf fs subpath: %w", err)
Expand All @@ -109,12 +112,24 @@ func InitBPF(pid int, log *slog.Logger, opts ...BPFOpts) (*BPF, error) {
}
binName := [probeBinaryNameMaxLength]byte{}
copy(binName[:], cfg.probeBinaryName[:])
if err := spec.Variables[probeBinaryNameVariable].Set(binName); err != nil {
if err := setVar(spec, probeBinaryNameVariable, binName); err != nil {
return nil, fmt.Errorf("setting probe binary variable: %w", err)
}
if err := spec.Variables[trackerIgnoreLocalhostVariable].Set(cfg.trackerIgnoreLocalhost); err != nil {
if err := setVar(spec, trackerIgnoreLocalhostVariable, cfg.trackerIgnoreLocalhost); err != nil {
return nil, fmt.Errorf("setting trackerIgnoreLocalhost variable: %w", err)
}
// in case you wonder why we do this instead of just accessing comm directly
// in the task struct in eBPF: this would make the btf loading way more
// expensive, at least the way cilium/ebpf behaves at the moment. It is
// about 3x slower and uses 3x the amount of memory. This is rather ugly but
// I think the performance gains are worth it.
commOffset, err := getTaskCommOffset()
if err != nil {
return nil, err
}
if err := setVar(spec, taskCommOffsetVariable, commOffset); err != nil {
return nil, fmt.Errorf("setting probe binary variable: %w", err)
}

for mapName, size := range cfg.mapSizes {
spec.Maps[mapName].MaxEntries = size
Expand All @@ -131,6 +146,46 @@ func InitBPF(pid int, log *slog.Logger, opts ...BPFOpts) (*BPF, error) {
return &BPF{pid: pid, log: log, objs: &objs, noPin: cfg.disablePinning}, nil
}

// TCXPinned reports whether the TCX link pin paths for pid exist for both the
// ingress and the egress direction of every interface in ifaces. It returns
// false as soon as one of the expected pin paths cannot be stat'ed, and true
// if ifaces is empty.
func TCXPinned(pid int, ifaces ...string) bool {
	directions := [...]ebpf.AttachType{ebpf.AttachTCXIngress, ebpf.AttachTCXEgress}
	for _, ifaceName := range ifaces {
		for _, dir := range directions {
			if _, err := os.Stat(tcxLinkPath(pid, ifaceName, dir)); err != nil {
				return false
			}
		}
	}
	return true
}

// setVar assigns value to the variable called name in spec. It returns an
// error if spec has no variable with that name or if setting the value fails.
func setVar(spec *ebpf.CollectionSpec, name string, value any) error {
	variable, found := spec.Variables[name]
	if !found {
		return fmt.Errorf("could not find var %s in spec", name)
	}
	if err := variable.Set(value); err != nil {
		return fmt.Errorf("setting spec variable: %w", err)
	}
	return nil
}

// getTaskCommOffset looks up task_struct in the kernel BTF and returns the
// byte offset of its comm member. It fails if the kernel BTF cannot be
// loaded, task_struct is not found, or task_struct has no comm member.
func getTaskCommOffset() (uint32, error) {
	spec, err := btf.LoadKernelSpec()
	if err != nil {
		return 0, fmt.Errorf("failed to load kernel BTF: %w", err)
	}

	var task *btf.Struct
	if err = spec.TypeByName("task_struct", &task); err != nil {
		return 0, fmt.Errorf("task_struct not found in kernel BTF: %w", err)
	}

	for i := range task.Members {
		if task.Members[i].Name != "comm" {
			continue
		}
		return task.Members[i].Offset.Bytes(), nil
	}
	return 0, errors.New("comm field not found in task_struct")
}

func (bpf *BPF) Cleanup() error {
errs := []error{}
for _, link := range bpf.links {
Expand All @@ -157,6 +212,22 @@ func (bpf *BPF) Cleanup() error {
return errors.Join(errs...)
}

// AttachInNetNS attaches the redirector to ifaces from within the network
// namespace of the process with the given pid.
//
// The namespace is opened through its /proc/<pid>/ns/net path and the handle
// is closed again before returning. If attaching fails, Cleanup is run and its
// error (if any) is joined with the attach error.
func (bpf *BPF) AttachInNetNS(pid int, ifaces ...string) error {
	netNS, err := ns.GetNS(netNSPath(pid))
	if err != nil {
		return fmt.Errorf("opening network namespace of pid %d: %w", pid, err)
	}
	// GetNS keeps a file descriptor open on the namespace; release it when we
	// are done so repeated attaches do not leak descriptors. A close failure is
	// not actionable here, so its error is deliberately ignored.
	defer netNS.Close()

	if err := netNS.Do(func(ns.NetNS) error {
		return bpf.AttachRedirector(ifaces...)
	}); err != nil {
		return errors.Join(err, bpf.Cleanup())
	}
	return nil
}

func (bpf *BPF) AttachRedirector(ifaces ...string) error {
for _, ifaceName := range ifaces {
iface, err := net.InterfaceByName(ifaceName)
Expand Down Expand Up @@ -193,11 +264,7 @@ func (bpf *BPF) attachTCX(iface *net.Interface) error {
}

func (bpf *BPF) loadOrAttachTCXLink(iface *net.Interface, program *ebpf.Program, attach ebpf.AttachType) (link.Link, error) {
name := tcxIngressPinName
if attach == ebpf.AttachTCXEgress {
name = tcxEgressPinName
}
pinPath := filepath.Join(PinPath(bpf.pid), fmt.Sprintf("%s_%s", name, iface.Name))
pinPath := tcxLinkPath(bpf.pid, iface.Name, attach)
l, err := link.LoadPinnedLink(pinPath, nil)
if err == nil {
return l, nil
Expand All @@ -208,14 +275,22 @@ func (bpf *BPF) loadOrAttachTCXLink(iface *net.Interface, program *ebpf.Program,
Attach: attach,
})
if err != nil {
return nil, fmt.Errorf("could not attach TCX %s: %w", name, err)
return nil, fmt.Errorf("could not attach TCX %s: %w", pinPath, err)
}
if bpf.noPin {
return l, nil
}
return l, l.Pin(pinPath)
}

// tcxLinkPath returns the path under PinPath(pid) at which the TCX link for
// the given interface and attach type is pinned. The file name is the egress
// pin name for ebpf.AttachTCXEgress and the ingress pin name for every other
// attach type, followed by "_" and the interface name.
func tcxLinkPath(pid int, ifaceName string, attach ebpf.AttachType) string {
	prefix := tcxIngressPinName
	switch attach {
	case ebpf.AttachTCXEgress:
		prefix = tcxEgressPinName
	}
	return filepath.Join(PinPath(pid), prefix+"_"+ifaceName)
}

func (bpf *BPF) attachQdisc(iface *net.Interface) error {
qdisc := &netlink.GenericQdisc{
QdiscAttrs: netlink.QdiscAttrs{
Expand Down
2 changes: 2 additions & 0 deletions activator/bpf_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified activator/bpf_bpfeb.o
Binary file not shown.
2 changes: 2 additions & 0 deletions activator/bpf_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified activator/bpf_bpfel.o
Binary file not shown.
9 changes: 5 additions & 4 deletions activator/redirector.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct {

const volatile char probe_binary_name[TASK_COMM_LEN] = "";
const volatile bool tracker_ignore_localhost = false;
const volatile __u32 task_comm_offset = 0;

static __always_inline int track_activity(__be16 dport) {
__u64 time = bpf_ktime_get_ns();
Expand Down Expand Up @@ -178,8 +179,8 @@ static __always_inline __be32 lookup_kubelet_ip_v4(struct iphdr *ip) {
}
// bpf_get_current_comm is not available in a tc program on arm64, so we use
// bpf_get_current_task to get the comm.
struct task_struct *task = (void *)bpf_get_current_task();
BPF_CORE_READ_STR_INTO(&comm, task, comm);
char *task = (char *)bpf_get_current_task();
bpf_probe_read_kernel(&comm, sizeof(comm), task + task_comm_offset);
if (bpf_strncmp(comm, TASK_COMM_LEN, (char *)probe_binary_name) == 0) {
// bpf_printk("found kubelet addr v4: %pI4", &ip->saddr);
bpf_map_update_elem(kubelet_addrs, &key, &ip->saddr, BPF_ANY);
Expand Down Expand Up @@ -212,8 +213,8 @@ static __always_inline struct in6_addr* lookup_kubelet_ip_v6(struct ipv6hdr *ip)
}
// bpf_get_current_comm is not available in a tc program on arm64, so we use
// bpf_get_current_task to get the comm.
struct task_struct *task = (void *)bpf_get_current_task();
BPF_CORE_READ_STR_INTO(&comm, task, comm);
char *task = (char *)bpf_get_current_task();
bpf_probe_read_kernel(&comm, sizeof(comm), task + task_comm_offset);
if (bpf_strncmp(comm, TASK_COMM_LEN, (char *)probe_binary_name) == 0) {
// bpf_printk("found kubelet addr v6: %pI6", &ip->saddr);
bpf_map_update_elem(kubelet_addrs, &key, &ip->saddr, BPF_ANY);
Expand Down
62 changes: 55 additions & 7 deletions shim/config.go → api/shim/v1/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package shim
package v1

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"runtime"
"slices"
"strconv"
Expand All @@ -15,6 +18,8 @@ import (
)

const (
ConfigDir = "etc"
ConfigFileName = "shim.json"
NodeLabel = "zeropod.ctrox.dev/node"
PortsAnnotationKey = "zeropod.ctrox.dev/ports-map"
ContainerNamesAnnotationKey = "zeropod.ctrox.dev/container-names"
Expand Down Expand Up @@ -42,9 +47,31 @@ const (
mappingDelim = ";"
mapDelim = "="
defaultContainerdNS = "k8s.io"
// DefaultProbeBufferSize should be able to fit kube-probe HTTP requests with
// reasonable path and header sizes but should still be small enough to not
// impact performance.
DefaultProbeBufferSize = 1024
DefaultProbeBinaryName = "kubelet"
DefaultTrackerIgnoreLocalhost = true
)

type Config struct {
// ContainerdAnnotations lists the zeropod annotation keys together with the
// "io.containerd.runc.v2.group" annotation.
// NOTE(review): presumably the set of pod annotations containerd has to pass
// through to the shim — confirm against where this list is consumed.
var ContainerdAnnotations = []string{
PortsAnnotationKey,
ContainerNamesAnnotationKey,
ScaleDownDurationAnnotationKey,
DisableCheckpoiningAnnotationKey,
PreDumpAnnotationKey,
MigrateAnnotationKey,
LiveMigrateAnnotationKey,
DisableProbeDetectAnnotationKey,
ProbeBufferSizeAnnotationKey,
ProxyTimeoutAnnotationKey,
ConnectTimeoutAnnotationKey,
DisableMigrateDataAnnotationKey,
"io.containerd.runc.v2.group",
}

type AnnotationConfig struct {
ZeropodContainerNames []string
Ports []uint16
ScaleDownDuration time.Duration
Expand All @@ -63,7 +90,13 @@ type Config struct {
ProxyTimeout time.Duration
ConnectTimeout time.Duration
DisableMigrateData bool
spec *specs.Spec
Spec *specs.Spec
}

// Config is the shim configuration. ProbeBinaryName and TrackerIgnoreLocalhost
// carry JSON field names, while the embedded AnnotationConfig is excluded from
// JSON encoding and decoding by its "-" tag.
type Config struct {
// ProbeBinaryName is read from/written to the "probeBinaryName" JSON key.
ProbeBinaryName string `json:"probeBinaryName"`
// TrackerIgnoreLocalhost is read from/written to the
// "trackerIgnoreLocalhost" JSON key.
TrackerIgnoreLocalhost bool `json:"trackerIgnoreLocalhost"`
// AnnotationConfig is embedded, so its fields are promoted onto Config.
AnnotationConfig `json:"-"`
}

// NewConfig uses the annotations from the container spec to create a new
Expand Down Expand Up @@ -156,7 +189,7 @@ func NewConfig(ctx context.Context, spec *specs.Spec) (*Config, error) {
}
}

probeBufferSize := defaultProbeBufferSize
probeBufferSize := DefaultProbeBufferSize
probeBufferSizeValue := spec.Annotations[ProbeBufferSizeAnnotationKey]
if probeBufferSizeValue != "" {
probeBufferSize, err = strconv.Atoi(probeBufferSizeValue)
Expand Down Expand Up @@ -191,8 +224,22 @@ func NewConfig(ctx context.Context, spec *specs.Spec) (*Config, error) {
return nil, err
}
}
cfg := &Config{
ProbeBinaryName: DefaultProbeBinaryName,
TrackerIgnoreLocalhost: DefaultTrackerIgnoreLocalhost,
}
e, err := os.Executable()
if err != nil {
return nil, fmt.Errorf("getting executable dir: %w", err)
}
b, err := os.ReadFile(filepath.Join(filepath.Dir(e), "..", ConfigDir, ConfigFileName))
if err == nil {
if err := json.Unmarshal(b, cfg); err != nil {
return nil, err
}
}

return &Config{
cfg.AnnotationConfig = AnnotationConfig{
Ports: containerPorts,
ScaleDownDuration: dur,
DisableCheckpointing: disableCheckpointing,
Expand All @@ -211,8 +258,9 @@ func NewConfig(ctx context.Context, spec *specs.Spec) (*Config, error) {
ProxyTimeout: proxyTimeout,
ConnectTimeout: connectTimeout,
DisableMigrateData: disableMigrateData,
spec: spec,
}, nil
Spec: spec,
}
return cfg, nil
}

func (cfg Config) IsZeropodContainer() bool {
Expand Down
Loading
Loading