K8s port forward
Table of Contents
1. How to use
kubectl port-forward postgresql-0 15432:5432 --address="0.0.0.0"
2. User manual
```text
kubectl port-forward --help
Forward one or more local ports to a pod. This command requires the node to have 'socat' installed.

 Use resource type/name such as deployment/mydeployment to select a pod. Resource type defaults to 'pod' if omitted.

 If there are multiple pods matching the criteria, a pod will be selected automatically. The forwarding session ends when the selected pod terminates, and rerun of the command is needed to resume forwarding.

Examples:
  # Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in the pod
  kubectl port-forward pod/mypod 5000 6000

  # Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in a pod selected by the deployment
  kubectl port-forward deployment/mydeployment 5000 6000

  # Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in a pod selected by the service
  kubectl port-forward service/myservice 5000 6000

  # Listen on port 8888 locally, forwarding to 5000 in the pod
  kubectl port-forward pod/mypod 8888:5000

  # Listen on port 8888 on all addresses, forwarding to 5000 in the pod
  kubectl port-forward --address 0.0.0.0 pod/mypod 8888:5000

  # Listen on port 8888 on localhost and selected IP, forwarding to 5000 in the pod
  kubectl port-forward --address localhost,10.19.21.23 pod/mypod 8888:5000

  # Listen on a random port locally, forwarding to 5000 in the pod
  kubectl port-forward pod/mypod :5000

Options:
    --address=[localhost]:
        Addresses to listen on (comma separated). Only accepts IP addresses or localhost as a value. When localhost is supplied, kubectl will try to bind on both 127.0.0.1 and ::1 and will fail if neither of these addresses are available to bind.

    --pod-running-timeout=1m0s:
        The length of time (like 5s, 2m, or 3h, higher than zero) to wait until at least one pod is running

Usage:
  kubectl port-forward TYPE/NAME [options] [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]
```
3. How it works
3.1. Client
执行 `kubectl port-forward` 时，客户端会依次向如下两个 URL 发送请求(先 GET 目标 pod,再 POST 到该 pod 的 portforward 子资源):
```text
GET  https://10.10.16.36:16443/api/v1/namespaces/default/pods/postgresql-0
POST https://10.10.16.36:16443/api/v1/namespaces/default/pods/postgresql-0/portforward
```
kubectl 部分代码可以自行查看:kubernetes/staging/src/k8s.io/kubectl/pkg/cmd/portforward
本文主要讲 server 端实现
3.2. Source
3.2.1. 从 kubelet 开始
cmd/kubelet/app/server.go L:489 开始的 run 函数中进行了对 CRI 的初始化
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) { ... err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps) ... }
pkg/kubelet/kubelet.go L:305 中可以看到实际的初始化逻辑:
// PreInitRuntimeService builds the remote CRI clients the kubelet uses and
// stores them on kubeDeps:
//   - kubeDeps.RemoteRuntimeService via remote.NewRemoteRuntimeService,
//     dialing kubeCfg.ContainerRuntimeEndpoint;
//   - kubeDeps.RemoteImageService via remote.NewRemoteImageService,
//     dialing the image endpoint resolved below.
//
// Both clients use kubeCfg.RuntimeRequestTimeout and kubeDeps.TracerProvider.
// It also sets kubeDeps.useLegacyCadvisorStats from
// cadvisor.UsingLegacyCadvisorStats for the runtime endpoint.
//
// Returns the first error from either client constructor, otherwise nil.
func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies) error {
	remoteImageEndpoint := kubeCfg.ImageServiceEndpoint
	// No dedicated image endpoint configured: reuse the container runtime
	// endpoint for image operations as well.
	if remoteImageEndpoint == "" && kubeCfg.ContainerRuntimeEndpoint != "" {
		remoteImageEndpoint = kubeCfg.ContainerRuntimeEndpoint
	}
	var err error
	if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(kubeCfg.ContainerRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
		return err
	}
	if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
		return err
	}
	// The decision is keyed on the runtime endpoint, not the image endpoint.
	kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(kubeCfg.ContainerRuntimeEndpoint)
	return nil
}
pkg/kubelet/cri/remote/remote_runtime.go L:71
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.RuntimeService, error) {
这段初始化的代码的目的就是为了建立一个 grpc 的 ClientConn
当发生 api 调用时会通过 grpc 访问到实际的 CRI 相应接口
pkg/kubelet/cri/streaming/server.go L:221
// GetPortForward handles the CRI PortForward call on the streaming server.
//
// It forwards no data itself. It validates the request, stores it in
// s.cache under a freshly issued token, and returns a URL built from the
// "portforward" path and that token. Presumably the later streaming
// connection to that URL uses the token to look the cached request back up
// (NOTE(review): confirm against the handler in server.go).
//
// Returns codes.InvalidArgument when req.PodSandboxId is empty, or the error
// returned by s.cache.Insert.
func (s *server) GetPortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
	if req.PodSandboxId == "" {
		return nil, status.Errorf(codes.InvalidArgument, "missing required pod_sandbox_id")
	}
	token, err := s.cache.Insert(req)
	if err != nil {
		return nil, err
	}
	return &runtimeapi.PortForwardResponse{
		Url: s.buildURL("portforward", token),
	}, nil
}
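可以看到 GetPortForward 本身并不转发任何数据：它只是校验 PodSandboxId,把请求存入 cache,并返回一个带 token 的 portforward URL。真正的数据转发由 CRI 运行时自己实现，因此要了解实际的 portforward 逻辑，需要到具体的 CRI 实现(例如下文的 containerd)中阅读。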
3.2.2. Containerd 中的实现
pkg/cri/server/sandbox_portforward_linux.go L:34
// portForward proxies bytes between stream and TCP port `port` on localhost,
// as seen from the network namespace of sandbox `id`.
//
// Flow:
//   - Look the sandbox up in c.sandboxStore.
//   - Choose how to run the forwarding function:
//       - host-network sandbox (NamespaceMode_NODE): call it directly;
//       - otherwise: check the sandbox netns is still open and run it via
//         s.NetNS.Do.
//   - Inside that function, dial localhost:port (IPv4 first, then IPv6).
//     Two goroutines then copy bytes in each direction.
//   - Return once one direction finishes, after waiting up to 1s for the
//     other direction, or as soon as ctx is cancelled.
//
// stream and the dialed connection are both closed (deferred) when the
// forwarding function returns. Errors are wrapped with the sandbox id or the
// network namespace path.
func (c *criService) portForward(ctx context.Context, id string, port int32, stream io.ReadWriteCloser) error {
	s, err := c.sandboxStore.Get(id)
	if err != nil {
		return errors.Wrapf(err, "failed to find sandbox %q in store", id)
	}

	// netNSDo runs the given function in the namespace chosen below.
	var netNSDo func(func(ns.NetNS) error) error
	// netNSPath is the network namespace path for logging.
	var netNSPath string
	securityContext := s.Config.GetLinux().GetSecurityContext()
	hostNet := securityContext.GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE
	if !hostNet {
		// Refuse to forward into a sandbox whose netns has already been closed.
		// NOTE(review): "netwok" typo below is in the upstream error string; kept verbatim.
		if closed, err := s.NetNS.Closed(); err != nil {
			return errors.Wrapf(err, "failed to check netwok namespace closed for sandbox %q", id)
		} else if closed {
			return errors.Errorf("network namespace for sandbox %q is closed", id)
		}
		netNSDo = s.NetNS.Do
		netNSPath = s.NetNS.GetPath()
	} else {
		// Run the function directly for host network.
		netNSDo = func(do func(_ ns.NetNS) error) error {
			return do(nil)
		}
		netNSPath = "host"
	}

	log.G(ctx).Infof("Executing port forwarding in network namespace %q", netNSPath)
	err = netNSDo(func(_ ns.NetNS) error {
		defer stream.Close()
		// localhost can resolve to both IPv4 and IPv6 addresses in dual-stack systems
		// but the application can be listening in one of the IP families only.
		// golang has enabled RFC 6555 Fast Fallback (aka HappyEyeballs) by default in 1.12
		// It means that if a host resolves to both IPv6 and IPv4, it will try to connect to any
		// of those addresses and use the working connection.
		// However, the implementation uses go routines to start both connections in parallel,
		// and this causes the connection to be made outside the namespace, so we try to connect
		// serially.
		// We try IPv4 first to keep current behavior and we fallback to IPv6 if the connection fails.
		// xref https://github.com/golang/go/issues/44922
		var conn net.Conn
		conn, err := net.Dial("tcp4", fmt.Sprintf("localhost:%d", port))
		if err != nil {
			var errV6 error
			conn, errV6 = net.Dial("tcp6", fmt.Sprintf("localhost:%d", port))
			if errV6 != nil {
				return fmt.Errorf("failed to connect to localhost:%d inside namespace %q, IPv4: %v IPv6 %v ", port, id, err, errV6)
			}
		}
		defer conn.Close()

		// Buffered for both copy goroutines, so each can deliver its result
		// without blocking even after this function has stopped reading.
		errCh := make(chan error, 2)
		// Copy from the namespace port connection to the client stream
		go func() {
			log.G(ctx).Debugf("PortForward copying data from namespace %q port %d to the client stream", id, port)
			_, err := io.Copy(stream, conn)
			errCh <- err
		}()

		// Copy from the client stream to the namespace port connection
		go func() {
			log.G(ctx).Debugf("PortForward copying data from client stream to namespace %q port %d", id, port)
			_, err := io.Copy(conn, stream)
			errCh <- err
		}()

		// Wait until the first error is returned by one of the connections
		// we use errFwd to store the result of the port forwarding operation
		// if the context is cancelled close everything and return
		// (the deferred conn.Close / stream.Close above do the closing).
		var errFwd error
		select {
		case errFwd = <-errCh:
			log.G(ctx).Debugf("PortForward stop forwarding in one direction in network namespace %q port %d: %v", id, port, errFwd)
		case <-ctx.Done():
			log.G(ctx).Debugf("PortForward cancelled in network namespace %q port %d: %v", id, port, ctx.Err())
			return ctx.Err()
		}
		// give a chance to terminate gracefully or timeout
		// after 1s
		// https://linux.die.net/man/1/socat
		const timeout = time.Second
		select {
		case e := <-errCh:
			// Keep the first direction's error; only fall back to the second one.
			if errFwd == nil {
				errFwd = e
			}
			log.G(ctx).Debugf("PortForward stopped forwarding in both directions in network namespace %q port %d: %v", id, port, e)
		case <-time.After(timeout):
			log.G(ctx).Debugf("PortForward timed out waiting to close the connection in network namespace %q port %d", id, port)
		case <-ctx.Done():
			log.G(ctx).Debugf("PortForward cancelled in network namespace %q port %d: %v", id, port, ctx.Err())
			errFwd = ctx.Err()
		}

		return errFwd
	})

	if err != nil {
		return errors.Wrapf(err, "failed to execute portforward in network namespace %q", netNSPath)
	}
	log.G(ctx).Infof("Finish port forwarding for %q port %d", id, port)
	return nil
}
- 通过 ID 从 sandboxStore 中获取 sandbox 结构体，从而拿到 security context 与 network namespace
- 根据 network namespace 配置判断是否为 host network,据此确定 network namespace path 以及进入 namespace 执行的函数(host network 时直接执行;否则先确认 netns 未被关闭，再通过 NetNS.Do 进入)
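- 进入相应的 network namespace,在其中作为客户端连接(dial)localhost 上的目标端口(先尝试 IPv4,失败后回退到 IPv6)。然后用两个 goroutine 通过 io.Copy 在该连接与传入的流之间双向拷贝数据，从而达到 proxy 的效果。注意这里并不会监听新端口，而是去连接 pod 内应用已经在监听的端口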
如果想了解进入 namespace 执行函数(NetNS.Do)的具体逻辑，可以看 openshift 的 containernetworking-plugins 库中的 ns 包:
https://github.com/openshift/containernetworking-plugins/blob/master/pkg/ns/ns_linux.go#L170