Kubernetes: permanent port-forwarding (or close enough...)


Introduction

In this article, we'll look at how to build and use a tool for port-forwarding in Kubernetes. The main idea is to declare the port-forwards we need in a config file and have all of them running with a single command.

If you want to try it, it's called kube-forward and you can find it here. Check the releases page, since there are prebuilt binaries built with GitHub Actions. I've only tested it on x86_64, so give me a shout if you try the other builds and they work!

A couple of things: make sure the local ports you're using are free, and that you have a working Kubernetes configuration exported, because the client autodetects the default context.
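If you want to check up front that a local port is free, the simplest test is to try binding it yourself, which is essentially what kube-forward does when it binds 127.0.0.1 and reports an AddrInUse error. A minimal std-only sketch; `port_is_free` is a hypothetical helper, not part of kube-forward:

```rust
use std::net::{Ipv4Addr, SocketAddrV4, TcpListener};

/// Hypothetical helper: returns true if 127.0.0.1:`port` can be bound right now.
/// The listener is dropped immediately, so this is only a snapshot: another
/// process may still grab the port before kube-forward starts.
fn port_is_free(port: u16) -> bool {
    TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)).is_ok()
}

fn main() {
    // Local ports used in the example configuration
    for port in [8080, 5434] {
        if port_is_free(port) {
            println!("port {port} is free");
        } else {
            println!("port {port} is already in use, pick a different local port");
        }
    }
}
```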


Configuration

First, let's take a look at the configuration to see what it looks like and how we declare what we want to connect to…

- name: "argocd-ui"                                 # identifier used in the logs
  target: "argocd-server.argocd"                    # deploy_name.namespace
  ports:
    local: 8080                                     # local port
    remote: 8080                                    # remote port
  options:
    retry_interval: 5s                              # retry every x seconds
    max_retries: 30                                 # give up after x failed connection attempts
    health_check_interval: 10s                      # check every x seconds that the connection still works
  pod_selector:
    label: "app.kubernetes.io/name=argocd-server"   # use labels to select your pod

- name: "postgres"
  target: "postgres.tr"
  ports:
    local: 5434
    remote: 5432
  options:
    retry_interval: 5s
    max_retries: 30
    health_check_interval: 10s
  pod_selector:
    label: "app=postgres"

I added some comments to make it easier to follow. The configuration will probably get a bit simpler over time, but for now this is how it works.
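The config packs two small string formats: the target as `name.namespace` and the selector as `key=value`. kube-forward's own util::resolve_service isn't shown in this post, so the following is only a sketch, assuming those two formats, of how they can be split; `split_target` and `split_selector` are hypothetical helpers:

```rust
/// Hypothetical helper: split a target such as "argocd-server.argocd"
/// into (name, namespace), splitting at the last dot.
fn split_target(target: &str) -> Option<(&str, &str)> {
    let (name, namespace) = target.rsplit_once('.')?;
    if name.is_empty() || namespace.is_empty() {
        return None;
    }
    Some((name, namespace))
}

/// Hypothetical helper: split a selector such as "app=postgres"
/// into (key, value), splitting at the first '='.
fn split_selector(selector: &str) -> Option<(&str, &str)> {
    let (key, value) = selector.split_once('=')?;
    if key.is_empty() {
        return None;
    }
    Some((key, value))
}

fn main() {
    println!("{:?}", split_target("postgres.tr")); // Some(("postgres", "tr"))
    println!("{:?}", split_selector("app.kubernetes.io/name=argocd-server"));
}
```

Splitting the target at the last dot works because namespace names can't contain dots, while some resource names (Deployments, for example) can; splitting the selector at the first `=` keeps any `=` that appears inside the value.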


Testing it

What should we see?

 RUST_LOG=info kube-forward -c config.yaml -e
2025-01-10T23:53:38.711212Z  INFO kube_forward: Setting up port-forward for argocd-ui
2025-01-10T23:53:39.346540Z  INFO kube_forward: Setting up port-forward for postgres
2025-01-10T23:53:39.464369Z  INFO kube_forward::forward: Port-forward established for argocd-ui
2025-01-10T23:53:39.465744Z  INFO kube_forward::forward: New connection for argocd-ui peer_addr=127.0.0.1:37494
2025-01-10T23:53:39.793116Z  INFO kube_forward::forward: Port-forward established for postgres
2025-01-10T23:53:39.794432Z  INFO kube_forward::forward: New connection for postgres peer_addr=127.0.0.1:57104

With RUST_LOG we control how much information the tool prints; if it's unset, we'd only see the occasional error (which can usually be ignored). With -e you expose Prometheus metrics on port 9292 (changeable with -m/--metrics-port), and -c is fairly self-explanatory: it's the path to the config file, config.yaml by default.


The metrics would look something like this (completely unnecessary):

 curl localhost:9292/metrics
# TYPE port_forward_connection_successes_total counter
port_forward_connection_successes_total{service="kube-forward",forward="postgres"} 21
port_forward_connection_successes_total{service="kube-forward",forward="argocd-ui"} 21

# TYPE port_forward_connection_attempts_total counter
port_forward_connection_attempts_total{service="kube-forward",forward="argocd-ui"} 21
port_forward_connection_attempts_total{service="kube-forward",forward="postgres"} 21

# TYPE port_forward_connected gauge
port_forward_connected{service="kube-forward",forward="argocd-ui"} 1
port_forward_connected{service="kube-forward",forward="postgres"} 1
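To read those numbers: the two `_total` series are counters that only ever go up, and `port_forward_connected` is a gauge that shows 1 while the forward is connected. kube-forward exports them through metrics_exporter_prometheus (see main.rs); the std-only sketch below only illustrates the same text exposition format using a hypothetical `ForwardCounters` struct built on atomics, so it is not how the tool works internally:

```rust
use std::fmt::Write;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// Hypothetical per-forward state, one field per series in the output above.
struct ForwardCounters {
    name: &'static str,
    attempts: AtomicU64,   // counter: only ever increases
    successes: AtomicU64,  // counter: only ever increases
    connected: AtomicBool, // gauge: exported as 1 or 0
}

impl ForwardCounters {
    fn new(name: &'static str) -> Self {
        Self {
            name,
            attempts: AtomicU64::new(0),
            successes: AtomicU64::new(0),
            connected: AtomicBool::new(false),
        }
    }
}

/// Render every forward in the Prometheus text format, using the same
/// `service` and `forward` labels that appear in kube-forward's output.
fn render(forwards: &[ForwardCounters]) -> String {
    // (metric name, metric type, how to read the value from one forward)
    let series: [(&str, &str, fn(&ForwardCounters) -> u64); 3] = [
        ("port_forward_connection_successes_total", "counter", |f| {
            f.successes.load(Ordering::Relaxed)
        }),
        ("port_forward_connection_attempts_total", "counter", |f| {
            f.attempts.load(Ordering::Relaxed)
        }),
        ("port_forward_connected", "gauge", |f| {
            u64::from(f.connected.load(Ordering::Relaxed))
        }),
    ];

    let mut out = String::new();
    for (metric, kind, value) in series {
        writeln!(out, "# TYPE {} {}", metric, kind).unwrap();
        for f in forwards {
            writeln!(
                out,
                "{}{{service=\"kube-forward\",forward=\"{}\"}} {}",
                metric,
                f.name,
                value(f)
            )
            .unwrap();
        }
        out.push('\n');
    }
    out
}

fn main() {
    let postgres = ForwardCounters::new("postgres");
    postgres.attempts.fetch_add(1, Ordering::Relaxed);
    postgres.successes.fetch_add(1, Ordering::Relaxed);
    postgres.connected.store(true, Ordering::Relaxed);
    print!("{}", render(&[postgres]));
}
```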

The code

At the time of writing, this is what it looks like. I'll only include the two main files, main.rs and forward.rs; there are more files with types and other bits, but all the logic and the important parts are here.

use anyhow::Result;
use clap::Parser;
use kube::Client;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::path::PathBuf;
use tracing::{error, info};

use kube_forward::{config::ForwardConfig, forward::PortForwardManager, util::resolve_service};

#[derive(Parser)]
#[command(author, version, about)]
struct Cli {
    #[arg(short, long, default_value = "config.yaml")]
    config: PathBuf,

    #[arg(short, long)]
    expose_metrics: bool,

    #[arg(short, long, default_value = "9292")]
    metrics_port: u16,
}

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging
    tracing_subscriber::fmt::init();

    // Parse command line arguments
    let cli = Cli::parse();

    // Initialize metrics
    if cli.expose_metrics {
        let builder = PrometheusBuilder::new();
        builder
            .with_http_listener(([0, 0, 0, 0], cli.metrics_port))
            .add_global_label("service", "kube-forward")
            .install()?;
    }

    // Load configuration
    let config_content = tokio::fs::read_to_string(&cli.config).await?;
    let config: Vec<ForwardConfig> = serde_yaml::from_str(&config_content)?;

    // Initialize Kubernetes client
    let client = Client::try_default().await?;

    // Create port-forward manager
    let manager = PortForwardManager::new(client.clone());

    // Set up signal handling
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1);
    let shutdown_tx_clone = shutdown_tx.clone();

    ctrlc::set_handler(move || {
        let _ = shutdown_tx_clone.send(());
    })?;

    // Start port-forwards
    for forward_config in config {
        info!("Setting up port-forward for {}", forward_config.name);

        match resolve_service(client.clone(), &forward_config.target).await {
            Ok(service_info) => {
                if let Err(e) = manager
                    .add_forward(forward_config.clone(), service_info)
                    .await
                {
                    error!(
                        "Failed to set up port-forward {}: {}",
                        forward_config.name, e
                    );
                }
            }
            Err(e) => {
                error!(
                    "Failed to resolve service for {}: {}",
                    forward_config.name, e
                );
            }
        }
    }

    // Wait for shutdown signal
    shutdown_rx.recv().await?;
    info!("Shutting down...");

    // Stop all port-forwards
    manager.stop_all().await;

    Ok(())
}

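Basically, main.rs sets everything up: it parses the CLI flags, optionally starts the Prometheus exporter, loads the YAML config, creates the Kubernetes client, hands each entry to the PortForwardManager, and then waits for Ctrl+C to stop all the port-forwards.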
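The shutdown path is a fan-out: Ctrl+C produces one signal, and stop_all passes it on to every port-forward. Here's a std-only sketch of that pattern using threads and std::sync::mpsc with one channel per worker, a swapped-in technique standing in for tokio's broadcast channel, so it is not kube-forward's code; `spawn_worker` and `Manager` are hypothetical names:

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for one port-forward task: it keeps working until it
/// receives a shutdown message (or its sender is dropped).
fn spawn_worker(name: &'static str, shutdown: Receiver<()>) -> JoinHandle<()> {
    thread::spawn(move || loop {
        match shutdown.recv_timeout(Duration::from_millis(10)) {
            Ok(()) | Err(RecvTimeoutError::Disconnected) => {
                println!("{name}: shutting down");
                return;
            }
            // Nothing received yet: a real forward would keep serving and health-checking
            Err(RecvTimeoutError::Timeout) => {}
        }
    })
}

/// Hypothetical manager holding one shutdown sender per worker.
struct Manager {
    workers: Vec<(Sender<()>, JoinHandle<()>)>,
}

impl Manager {
    /// Signal every worker and wait for it to finish. Returns how many workers
    /// actually received the signal. `send` fails when a worker's receiver is
    /// already gone, so that case is skipped instead of unwrapped.
    fn stop_all(self) -> usize {
        let mut delivered = 0;
        for (tx, handle) in self.workers {
            if tx.send(()).is_ok() {
                delivered += 1;
            }
            handle.join().expect("worker thread panicked");
        }
        delivered
    }
}

fn main() {
    let mut workers = Vec::new();
    for name in ["argocd-ui", "postgres"] {
        let (tx, rx) = mpsc::channel();
        workers.push((tx, spawn_worker(name, rx)));
    }
    // In kube-forward this step is triggered by the Ctrl+C handler
    let delivered = Manager { workers }.stop_all();
    println!("shutdown delivered to {delivered} workers");
}
```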


The most critical file is forward.rs, which looks something like this; this is where everything gets wired up, thanks to kube-rs:

use socket2::{SockRef, TcpKeepalive};

use kube::{
    api::{Api, DeleteParams, PostParams},
    runtime::wait::{await_condition, conditions::is_pod_running},
    Client, ResourceExt,
};

use crate::{
    config::ForwardConfig,
    config::PodSelector,
    error::{PortForwardError, Result},
    metrics::ForwardMetrics,
    util::ServiceInfo,
};
use chrono::DateTime;
use chrono::Utc;
use k8s_openapi::api::core::v1::Pod;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};

use tracing::{debug, error, info, warn};

use futures::TryStreamExt;

use std::net::SocketAddr;
use tokio::{
    io::{AsyncRead, AsyncWrite},
    net::TcpListener,
};
use tokio_stream::wrappers::TcpListenerStream;

#[derive(Debug)]
pub struct HealthCheck {
    pub last_check: Arc<RwLock<Option<DateTime<Utc>>>>,
    pub failures: Arc<RwLock<u32>>,
}

impl HealthCheck {
    pub fn new() -> Self {
        Self {
            last_check: Arc::new(RwLock::new(None)),
            failures: Arc::new(RwLock::new(0)),
        }
    }

    pub async fn check_connection(&self, local_port: u16) -> bool {
        use tokio::net::TcpStream;

        match TcpStream::connect(format!("127.0.0.1:{}", local_port)).await {
            Ok(_) => {
                *self.failures.write().await = 0;
                *self.last_check.write().await = Some(Utc::now());
                true
            }
            Err(_) => {
                let mut failures = self.failures.write().await;
                *failures += 1;
                false
            }
        }
    }
}

// Represents the state of a port-forward
#[derive(Debug, Clone, PartialEq)]
pub enum ForwardState {
    Starting,
    Connected,
    Disconnected,
    Failed(String),
    Stopping,
}

#[derive(Debug, Clone)]
pub struct PortForward {
    pub config: ForwardConfig,
    pub service_info: ServiceInfo,
    pub state: Arc<RwLock<ForwardState>>,
    pub shutdown: broadcast::Sender<()>,
    pub metrics: ForwardMetrics,
}

impl PortForward {
    pub fn new(config: ForwardConfig, service_info: ServiceInfo) -> Self {
        let (shutdown_tx, _) = broadcast::channel(1);
        Self {
            metrics: ForwardMetrics::new(config.name.clone()),
            config,
            service_info,
            state: Arc::new(RwLock::new(ForwardState::Starting)),
            shutdown: shutdown_tx,
        }
    }

    pub async fn start(&self, client: Client) -> Result<()> {
        let mut retry_count = 0;
        let mut shutdown_rx = self.shutdown.subscribe();

        loop {
            if retry_count >= self.config.options.max_retries {
                let err_msg = "Max retry attempts reached".to_string();
                *self.state.write().await = ForwardState::Failed(err_msg.clone());
                return Err(PortForwardError::ConnectionError(err_msg));
            }

            self.metrics.record_connection_attempt();

            match self.establish_forward(&client).await {
                Ok(()) => {
                    *self.state.write().await = ForwardState::Connected;
                    self.metrics.record_connection_success();
                    self.metrics.set_connection_status(true);
                    info!("Port-forward established for {}", self.config.name);

                    // Monitor the connection
                    tokio::select! {
                        _ = shutdown_rx.recv() => {
                            info!("Received shutdown signal for {}", self.config.name);
                            break;
                        }
                        _ = self.monitor_connection(&client) => {
                            warn!("Connection lost for {}, attempting to reconnect", self.config.name);
                            *self.state.write().await = ForwardState::Disconnected;
                        }
                    }
                }
                Err(e) => {
                    warn!(
                        "Failed to establish port-forward for {}: {}",
                        self.config.name, e
                    );
                    self.metrics.record_connection_failure();
                    self.metrics.set_connection_status(false);
                    retry_count += 1;
                    tokio::time::sleep(self.config.options.retry_interval).await;
                    continue;
                }
            }
        }

        Ok(())
    }

    async fn monitor_connection(&self, client: &Client) -> Result<()> {
        let health_check = HealthCheck::new();
        let mut interval = tokio::time::interval(self.config.options.health_check_interval);

        loop {
            interval.tick().await;

            // Check TCP connection
            if !health_check.check_connection(self.config.ports.local).await {
                return Err(PortForwardError::ConnectionError(
                    "Connection health check failed".to_string(),
                ));
            }

            // Check pod status
            if let Ok(pod) = self.get_pod(client).await {
                if let Some(status) = &pod.status {
                    if let Some(phase) = &status.phase {
                        if phase != "Running" {
                            return Err(PortForwardError::ConnectionError(
                                "Pod is no longer running".to_string(),
                            ));
                        }
                    }
                }
            } else {
                return Err(PortForwardError::ConnectionError(
                    "Pod not found".to_string(),
                ));
            }
        }
    }

    async fn establish_forward(&self, client: &Client) -> Result<()> {
        self.metrics.record_connection_attempt();
        // Get pod for the service
        let pod = self.get_pod(client).await?;
        // Clone the name to avoid lifetime issues
        let pod_name = pod.metadata.name.clone().ok_or_else(|| {
            self.metrics.record_connection_failure();
            PortForwardError::ConnectionError("Pod name not found".to_string())
        })?;

        // Create Api instance for the namespace
        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.service_info.namespace);

        // Create TCP listener for the local port
        debug!(
            "Creating TCP listener for the local port: {}",
            self.config.ports.local
        );
        let addr = SocketAddr::from(([127, 0, 0, 1], self.config.ports.local));
        let listener = TcpListener::bind(addr).await.map_err(|e| {
            self.metrics.record_connection_failure();
            match e.kind() {
                std::io::ErrorKind::AddrInUse => PortForwardError::ConnectionError(format!(
                    "Port {} is already in use. Please choose a different local port",
                    self.config.ports.local
                )),
                _ => PortForwardError::ConnectionError(format!("Failed to bind to port: {}", e)),
            }
        })?;

        // Set TCP keepalive on the listening socket (30s idle time before probes)
        let ka = TcpKeepalive::new().with_time(std::time::Duration::from_secs(30));
        let sf = SockRef::from(&listener);
        let _ = sf.set_tcp_keepalive(&ka);

        // Set state to connected
        *self.state.write().await = ForwardState::Connected;
        self.metrics.record_connection_success();
        self.metrics.set_connection_status(true);

        // Clone values needed for the async task
        let state = self.state.clone();
        let name = self.config.name.clone();
        let remote_port = self.config.ports.remote;
        let mut shutdown = self.shutdown.subscribe();
        let metrics = self.metrics.clone(); // Clone metrics for the task

        // Spawn the main forwarding task
        tokio::spawn(async move {
            let mut listener_stream = TcpListenerStream::new(listener);
            let name = name.as_str(); // Use as_str() to get a &str we can copy

            loop {
                tokio::select! {
                    // Handle new connections
                    Ok(Some(client_conn)) = listener_stream.try_next() => {
                        if let Ok(peer_addr) = client_conn.peer_addr() {
                            info!(%peer_addr, "New connection for {}", name);
                            metrics.record_connection_attempt();
                        }
                        let pods = pods.clone();
                        let pod_name = pod_name.clone();
                        let metrics = metrics.clone(); // Clone metrics for the connection task

                        tokio::spawn(async move {
                            if let Err(e) = Self::forward_connection(&pods, pod_name, remote_port, client_conn).await {
                                error!("Failed to forward connection: {}", e);
                                metrics.record_connection_failure();
                            } else {
                                metrics.record_connection_success();
                            }
                        });
                    }

                    // Handle shutdown signal
                    _ = shutdown.recv() => {
                        info!("Received shutdown signal for {}", name);
                        *state.write().await = ForwardState::Disconnected;
                        metrics.set_connection_status(false);
                        break;
                    }

                    else => {
                        error!("Port forward {} listener closed", name);
                        *state.write().await = ForwardState::Failed("Listener closed unexpectedly".to_string());
                        metrics.set_connection_status(false);
                        metrics.record_connection_failure();
                        break;
                    }
                }
            }
        });

        Ok(())
    }

    async fn forward_connection(
        pods: &Api<Pod>,
        pod_name: String,
        port: u16,
        mut client_conn: impl AsyncRead + AsyncWrite + Unpin,
    ) -> anyhow::Result<()> {
        debug!("Starting port forward for port {}", port);

        // Create port forward
        let mut pf = pods
            .portforward(&pod_name, &[port])
            .await
            .map_err(|e| anyhow::anyhow!("Failed to create portforward: {}", e))?;

        // Get the stream for our port
        let mut upstream_conn = pf
            .take_stream(port) // Streams are keyed by the port passed to portforward()
            .ok_or_else(|| {
                anyhow::anyhow!("Failed to get port forward stream for port {}", port)
            })?;

        debug!("Port forward stream established for port {}", port);

        // Copy data in both directions until either side closes the connection.
        // No overall timeout is applied: a timeout around this call would also cut
        // off long-lived sessions (an open psql client, for example) once it expires.
        if let Err(e) = tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn).await {
            warn!("Error during data transfer for port {}: {}", port, e);
            return Err(anyhow::anyhow!("Data transfer error: {}", e));
        }

        debug!("Connection closed normally for port {}", port);

        // Clean up
        drop(upstream_conn);

        // Wait for the port forwarder to finish
        if let Err(e) = pf.join().await {
            warn!("Port forwarder join error: {}", e);
        }

        Ok(())
    }

    async fn get_pod(&self, client: &Client) -> Result<Pod> {
        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.service_info.namespace);

        // Get all pods in the namespace
        let pod_list = pods
            .list(&kube::api::ListParams::default())
            .await
            .map_err(|e| PortForwardError::KubeError(e))?;

        for pod in pod_list.items {
            if self.matches_pod_selector(&pod, &self.config.pod_selector) {
                if let Some(status) = &pod.status {
                    if let Some(phase) = &status.phase {
                        if phase == "Running" {
                            return Ok(pod);
                        }
                    }
                }
            }
        }

        Err(PortForwardError::ConnectionError(format!(
            "No ready pods found matching selector for service {}",
            self.service_info.name
        )))
    }

    // Borrows self instead of consuming it, so callers don't need to clone the whole PortForward
    pub fn matches_pod_selector(&self, pod: &Pod, selector: &PodSelector) -> bool {
        // If no selector is specified, fall back to checking if service name is in any label
        if selector.label.is_none() && selector.annotation.is_none() {
            return pod.metadata.labels.as_ref().map_or(false, |labels| {
                labels.values().any(|v| v == &self.service_info.name)
            });
        }

        // Check label if specified
        if let Some(label_selector) = &selector.label {
            let (key, value) = self.parse_selector(label_selector);
            if !pod.metadata.labels.as_ref().map_or(false, |labels| {
                labels.get(key).map_or(false, |v| v == value)
            }) {
                return false;
            }
        }

        // Check annotation if specified
        if let Some(annotation_selector) = &selector.annotation {
            let (key, value) = self.parse_selector(annotation_selector);
            if !pod
                .metadata
                .annotations
                .as_ref()
                .map_or(false, |annotations| {
                    annotations.get(key).map_or(false, |v| v == value)
                })
            {
                return false;
            }
        }

        true
    }

    // Split "key=value" at the first '=' so values that contain '=' stay intact.
    // Returns empty strings if the format is invalid (no '=' at all).
    pub fn parse_selector<'a>(&self, selector: &'a str) -> (&'a str, &'a str) {
        selector.split_once('=').unwrap_or(("", ""))
    }
}

// Manager to handle multiple port-forwards
pub struct PortForwardManager {
    forwards: Arc<RwLock<Vec<Arc<PortForward>>>>,
    client: Client,
}

impl PortForwardManager {
    pub fn new(client: Client) -> Self {
        Self {
            forwards: Arc::new(RwLock::new(Vec::new())),
            client,
        }
    }

    pub async fn add_forward(
        &self,
        config: ForwardConfig,
        service_info: ServiceInfo,
    ) -> Result<()> {
        let forward = Arc::new(PortForward::new(config, service_info));
        self.forwards.write().await.push(forward.clone());

        // Start the port-forward in a separate task
        let client = self.client.clone();
        tokio::spawn(async move {
            if let Err(e) = forward.start(client).await {
                error!("Port-forward failed: {}", e);
            }
        });

        Ok(())
    }

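    pub async fn stop_all(&self) {
        for forward in self.forwards.read().await.iter() {
            // `send` fails when a forward has no active receivers (e.g. it already gave up
            // after max_retries), so ignore the error instead of panicking on shutdown
            let _ = forward.shutdown.send(());
        }
    }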
}

As you can see, there's a lot to unpack, but to keep it simple, it:

  • Iterates over the configuration.
  • Finds the pods that match the selector.
  • Tries to establish a connection.
  • Keeps the connection alive and automatically reconnects if it fails for any reason.
  • Sends a shutdown signal to every port-forward when you stop it (Ctrl+C).
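The "keep it alive and reconnect" part boils down to two pieces: a cheap TCP probe of the local port (roughly what HealthCheck::check_connection does with tokio's TcpStream) and a retry loop bounded by max_retries and spaced by retry_interval (what PortForward::start does). A synchronous, std-only sketch of both; `probe` and `retry` are hypothetical names, and the real code is async and also checks that the pod is still Running:

```rust
use std::net::{SocketAddr, TcpStream};
use std::thread;
use std::time::Duration;

/// Hypothetical health probe: can a TCP connection to 127.0.0.1:`port`
/// be opened within `timeout`?
fn probe(port: u16, timeout: Duration) -> bool {
    let addr = SocketAddr::from(([127, 0, 0, 1], port));
    TcpStream::connect_timeout(&addr, timeout).is_ok()
}

/// Hypothetical retry loop: call `attempt` until it succeeds, sleeping `interval`
/// after each failure and giving up after `max_retries` failed attempts
/// (a value of 0 behaves like 1: the first attempt always runs).
fn retry<T, E>(
    max_retries: u32,
    interval: Duration,
    mut attempt: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut failures = 0;
    loop {
        match attempt() {
            Ok(value) => return Ok(value),
            Err(e) => {
                failures += 1;
                if failures >= max_retries {
                    return Err(e);
                }
                thread::sleep(interval);
            }
        }
    }
}

fn main() {
    let mut calls = 0;
    // Simulate a pod that only becomes reachable on the third attempt
    let result: Result<&str, &str> = retry(3, Duration::from_millis(10), || {
        calls += 1;
        if calls < 3 { Err("pod not ready") } else { Ok("connected") }
    });
    println!("{result:?} after {calls} attempts");
    println!("port 5434 reachable: {}", probe(5434, Duration::from_millis(200)));
}
```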

I'll probably make a video soon to explain it in a bit more detail and show the basic usage. Give it a try and let me know how it goes! See you next time!



by Gabriel Garrido