K8s port forward
Table of Contents
1. How to use
kubectl port-forward postgresql-0 15432:5432 --address="0.0.0.0"
2. User manual
kubectl port-forward --help
Forward one or more local ports to a pod. This command requires the node to have 'socat' installed.

Use resource type/name such as deployment/mydeployment to select a pod. Resource type defaults to 'pod' if omitted.

If there are multiple pods matching the criteria, a pod will be selected automatically. The forwarding session ends when the selected pod terminates, and rerun of the command is needed to resume forwarding.

Examples:
  # Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in the pod
  kubectl port-forward pod/mypod 5000 6000

  # Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in a pod selected by the deployment
  kubectl port-forward deployment/mydeployment 5000 6000

  # Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in a pod selected by the service
  kubectl port-forward service/myservice 5000 6000

  # Listen on port 8888 locally, forwarding to 5000 in the pod
  kubectl port-forward pod/mypod 8888:5000

  # Listen on port 8888 on all addresses, forwarding to 5000 in the pod
  kubectl port-forward --address 0.0.0.0 pod/mypod 8888:5000

  # Listen on port 8888 on localhost and selected IP, forwarding to 5000 in the pod
  kubectl port-forward --address localhost,10.19.21.23 pod/mypod 8888:5000

  # Listen on a random port locally, forwarding to 5000 in the pod
  kubectl port-forward pod/mypod :5000

Options:
    --address=[localhost]: Addresses to listen on (comma separated). Only accepts IP addresses or localhost as a value. When localhost is supplied, kubectl will try to bind on both 127.0.0.1 and ::1 and will fail if neither of these addresses are available to bind.
    --pod-running-timeout=1m0s: The length of time (like 5s, 2m, or 3h, higher than zero) to wait until at least one pod is running

Usage:
  kubectl port-forward TYPE/NAME [options] [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]
3. How it works
3.1. Client
执行 kubectl port-forward 时，kubectl 会依次向 API server 发送如下两个请求（先获取 pod 信息，再发起 portforward 请求）：
GET https://10.10.16.36:16443/api/v1/namespaces/default/pods/postgresql-0
POST https://10.10.16.36:16443/api/v1/namespaces/default/pods/postgresql-0/portforward
kubectl 端的代码位于 kubernetes/staging/src/k8s.io/kubectl/pkg/cmd/portforward，可自行查看。
本文主要讲解 server 端的实现。
3.2. Source
3.2.1. 从 kubelet 开始
在 cmd/kubelet/app/server.go L:489 开始的 run 函数中，完成了对 CRI 客户端的初始化：
// run is an excerpt of the kubelet's run function (cmd/kubelet/app/server.go);
// "..." marks code elided from this listing. The statement kept here sets up
// the CRI runtime and image service clients on kubeDeps.
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
...
// Create the remote CRI runtime/image service clients from the kubelet configuration.
err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps)
...
}
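在 pkg/kubelet/kubelet.go L:305 的 PreInitRuntimeService 中可以看到实际的初始化逻辑：分别创建 runtime service 和 image service 的远程客户端；如果没有单独配置 ImageServiceEndpoint，image service 会复用 ContainerRuntimeEndpoint。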
// PreInitRuntimeService creates the remote CRI clients used by the kubelet and
// stores them on kubeDeps:
//   - RemoteRuntimeService connects to kubeCfg.ContainerRuntimeEndpoint;
//   - RemoteImageService connects to kubeCfg.ImageServiceEndpoint, or to the
//     runtime endpoint when no dedicated image endpoint is configured.
//
// Both clients use kubeCfg.RuntimeRequestTimeout and kubeDeps.TracerProvider.
// The first constructor error is returned unchanged. On success it also records
// whether legacy cAdvisor stats apply to the configured runtime endpoint.
func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies) error {
	runtimeEndpoint := kubeCfg.ContainerRuntimeEndpoint
	requestTimeout := kubeCfg.RuntimeRequestTimeout.Duration

	// Fall back to the runtime endpoint for image operations. When both are
	// empty the image endpoint simply stays "".
	imageEndpoint := kubeCfg.ImageServiceEndpoint
	if imageEndpoint == "" {
		imageEndpoint = runtimeEndpoint
	}

	// The fields are assigned before the error check, exactly as the
	// constructors return them.
	var err error
	kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(runtimeEndpoint, requestTimeout, kubeDeps.TracerProvider)
	if err != nil {
		return err
	}

	kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(imageEndpoint, requestTimeout, kubeDeps.TracerProvider)
	if err != nil {
		return err
	}

	kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(runtimeEndpoint)
	return nil
}
pkg/kubelet/cri/remote/remote_runtime.go L:71
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.RuntimeService, error) {
这段初始化代码的目的是建立一个 gRPC 的 ClientConn。
之后发生 API 调用时，kubelet 会通过这个 gRPC 连接访问实际 CRI 实现中的相应接口。
pkg/kubelet/cri/streaming/server.go L:221
// GetPortForward does not forward any traffic itself. It rejects requests that
// lack a pod sandbox ID, stores the request in the server's cache under a
// freshly issued token, and answers with a "portforward" URL built from that
// token. Errors from the cache are returned unchanged.
func (s *server) GetPortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
	if len(req.PodSandboxId) == 0 {
		return nil, status.Errorf(codes.InvalidArgument, "missing required pod_sandbox_id")
	}

	tok, insertErr := s.cache.Insert(req)
	if insertErr != nil {
		return nil, insertErr
	}

	resp := &runtimeapi.PortForwardResponse{Url: s.buildURL("portforward", tok)}
	return resp, nil
}
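可以看到，GetPortForward 本身并不转发数据：它只校验请求中是否带有 pod_sandbox_id，把请求存入缓存，并返回一个带 token 的 portforward URL。真正的 port forward 逻辑需要到具体的 CRI 实现（例如 containerd）中阅读。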
3.2.2. Containerd 中的实现
pkg/cri/server/sandbox_portforward_linux.go L:34
// portForward proxies data between stream and TCP port `port` on localhost
// inside the network namespace of sandbox `id`.
//
// If the sandbox uses the host network (NamespaceMode_NODE), the connection is
// dialed directly without switching namespaces. Otherwise, the sandbox's network
// namespace must still be open, and the dial and copy run inside it via NetNS.Do.
//
// Once the forwarding callback runs, it closes both stream and the dialed
// connection on return. The result is nil when forwarding ends cleanly.
// Otherwise it is the sandbox-lookup or namespace-check error, or a wrapped
// dial, copy, or context-cancellation error.
func (c *criService) portForward(ctx context.Context, id string, port int32, stream io.ReadWriteCloser) error {
	s, err := c.sandboxStore.Get(id)
	if err != nil {
		return errors.Wrapf(err, "failed to find sandbox %q in store", id)
	}

	var netNSDo func(func(ns.NetNS) error) error
	// netNSPath is the network namespace path for logging.
	var netNSPath string
	securityContext := s.Config.GetLinux().GetSecurityContext()
	hostNet := securityContext.GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE
	if !hostNet {
		if closed, err := s.NetNS.Closed(); err != nil {
			return errors.Wrapf(err, "failed to check network namespace closed for sandbox %q", id)
		} else if closed {
			return errors.Errorf("network namespace for sandbox %q is closed", id)
		}
		netNSDo = s.NetNS.Do
		netNSPath = s.NetNS.GetPath()
	} else {
		// Run the function directly for host network.
		netNSDo = func(do func(_ ns.NetNS) error) error {
			return do(nil)
		}
		netNSPath = "host"
	}

	log.G(ctx).Infof("Executing port forwarding in network namespace %q", netNSPath)
	err = netNSDo(func(_ ns.NetNS) error {
		defer stream.Close()
		// localhost can resolve to both IPv4 and IPv6 addresses in dual-stack systems
		// but the application can be listening in one of the IP families only.
		// golang has enabled RFC 6555 Fast Fallback (aka HappyEyeballs) by default in 1.12
		// It means that if a host resolves to both IPv6 and IPv4, it will try to connect to any
		// of those addresses and use the working connection.
		// However, the implementation uses goroutines to start both connections in parallel,
		// and this causes the connection to be made outside the namespace, so we try to connect
		// serially.
		// We try IPv4 first to keep current behavior and we fall back to IPv6 if the connection fails.
		// xref https://github.com/golang/go/issues/44922
		addr := fmt.Sprintf("localhost:%d", port)
		conn, errV4 := net.Dial("tcp4", addr)
		if errV4 != nil {
			var errV6 error
			conn, errV6 = net.Dial("tcp6", addr)
			if errV6 != nil {
				return fmt.Errorf("failed to connect to %s inside namespace %q, IPv4: %v IPv6: %v", addr, id, errV4, errV6)
			}
		}
		defer conn.Close()

		// Capacity 2 lets both copy goroutines deliver their result and exit
		// even after this callback has already returned.
		errCh := make(chan error, 2)
		// Copy from the namespace port connection to the client stream
		go func() {
			log.G(ctx).Debugf("PortForward copying data from namespace %q port %d to the client stream", id, port)
			_, err := io.Copy(stream, conn)
			errCh <- err
		}()

		// Copy from the client stream to the namespace port connection
		go func() {
			log.G(ctx).Debugf("PortForward copying data from client stream to namespace %q port %d", id, port)
			_, err := io.Copy(conn, stream)
			errCh <- err
		}()

		// Wait until the first error is returned by one of the connections
		// we use errFwd to store the result of the port forwarding operation
		// if the context is cancelled close everything and return
		var errFwd error
		select {
		case errFwd = <-errCh:
			log.G(ctx).Debugf("PortForward stop forwarding in one direction in network namespace %q port %d: %v", id, port, errFwd)
		case <-ctx.Done():
			log.G(ctx).Debugf("PortForward cancelled in network namespace %q port %d: %v", id, port, ctx.Err())
			return ctx.Err()
		}
		// give a chance to terminate gracefully or timeout
		// after 1s
		// https://linux.die.net/man/1/socat
		const timeout = time.Second
		select {
		case e := <-errCh:
			// Keep the first direction's error if it had one.
			if errFwd == nil {
				errFwd = e
			}
			log.G(ctx).Debugf("PortForward stopped forwarding in both directions in network namespace %q port %d: %v", id, port, e)
		case <-time.After(timeout):
			log.G(ctx).Debugf("PortForward timed out waiting to close the connection in network namespace %q port %d", id, port)
		case <-ctx.Done():
			log.G(ctx).Debugf("PortForward cancelled in network namespace %q port %d: %v", id, port, ctx.Err())
			errFwd = ctx.Err()
		}

		return errFwd
	})
	if err != nil {
		return errors.Wrapf(err, "failed to execute portforward in network namespace %q", netNSPath)
	}
	log.G(ctx).Infof("Finish port forwarding for %q port %d", id, port)
	return nil
}
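- 通过 ID 获取 sandbox 结构体，从而拿到 security context 与 network namespace
- 根据 security context 中的 namespace options 判断是否为 host network，以此确定 network namespace 的路径（用于日志）和进入 namespace 的函数；如果不是 host network，还会先检查该 network namespace 是否已关闭
- 进入相应的 network namespace，用 net.Dial 连接 localhost 上的目标端口（先尝试 IPv4，失败后回退到 IPv6），再在该连接与传入的 stream 之间双向拷贝数据，从而达到 proxy 的效果。注意这里并不会监听新的端口，而是主动连接容器内已在监听的端口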
如果想了解进入 namespace 执行函数的具体实现，可以参考 openshift 的 containernetworking-plugins 库中 pkg/ns 的代码：
https://github.com/openshift/containernetworking-plugins/blob/master/pkg/ns/ns_linux.go#L170