Kubernetes permanent port-forward (or close to that...)
In this article we will explore how to create and use a Kubernetes port-forward tool, the main objective is to declaratively configure the port-forwards that we need and have them going by firing a single command.
If you want to try it, it is called kube-forward
and you can find it here,
check the releases page as there are already binaries pre-compiled via github actions, I have only tested on x86_64, but
let me know if you try the others and they work!
A few notes, be sure that the ports you are using are free, also have the right Kubernetes configuration exported as the client will auto-detect the default context.
First, lets check the configuration to see what it looks like and declare what we want to connect to…
- name: "argocd-ui" # identifier for the logs
target: "argocd-server.argocd" # deployment_name.namespace
local: 8080 # custom local port
remote: 8080 # remote port
retry_interval: 5s # if the connection fails, retry automatically in x seconds
max_retries: 30 # max amount of retries
health_check_interval: 10s # check every x seconds if everything is still working
label: "app.kubernetes.io/name=argocd-server" # use labels to select your pods
- name: "postgres"
target: "postgres.tr"
local: 5434
remote: 5432
retry_interval: 5s
max_retries: 30
health_check_interval: 10s
label: "app=postgres"
I added some comments to make the config easier to understand, the config might get a bit simpler over time, but for now that’s what the cli tool needs.
Test it
What would it look to run it?
❯ RUST_LOG=info kube-forward -c config.yaml -e
2025-01-10T23:53:38.711212Z INFO kube_forward: Setting up port-forward for argocd-ui
2025-01-10T23:53:39.346540Z INFO kube_forward: Setting up port-forward for postgres
2025-01-10T23:53:39.464369Z INFO kube_forward::forward: Port-forward established for argocd-ui
2025-01-10T23:53:39.465744Z INFO kube_forward::forward: New connection for argocd-ui peer_addr=
2025-01-10T23:53:39.793116Z INFO kube_forward::forward: Port-forward established for postgres
2025-01-10T23:53:39.794432Z INFO kube_forward::forward: New connection for postgres peer_addr=
we can influence the amount of feedback we get, without it we will see some random connection errors
(which are safe to ignore, that’s the whole point of the tool), with -e
we can expose prometheus metrics if you are an
observability fan, and well, -c
is pretty self-explanatory.
That would look something like this:
❯ curl localhost:9292/metrics
# TYPE port_forward_connection_successes_total counter
port_forward_connection_successes_total{service="kube-forward",forward="postgres"} 21
port_forward_connection_successes_total{service="kube-forward",forward="argocd-ui"} 21
# TYPE port_forward_connection_attempts_total counter
port_forward_connection_attempts_total{service="kube-forward",forward="argocd-ui"} 21
port_forward_connection_attempts_total{service="kube-forward",forward="postgres"} 21
# TYPE port_forward_connected gauge
port_forward_connected{service="kube-forward",forward="argocd-ui"} 1
port_forward_connected{service="kube-forward",forward="postgres"} 1
The code
At the moment of this writing the code looks something like this (I will put only 2 files, there are more types, etc
but with these you will get the idea of what the code is doing and how), this is the main.rs
use anyhow::Result;
use clap::Parser;
use kube::Client;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::path::PathBuf;
use tracing::{error, info};
use kube_forward::{config::ForwardConfig, forward::PortForwardManager, util::resolve_service};
#[command(author, version, about)]
struct Cli {
#[arg(short, long, default_value = "config.yaml")]
config: PathBuf,
#[arg(short, long)]
expose_metrics: bool,
#[arg(short, long, default_value = "9292")]
metrics_port: u16,
async fn main() -> Result<()> {
// Initialize logging
// Parse command line arguments
let cli = Cli::parse();
// Initialize metrics
if cli.expose_metrics {
let builder = PrometheusBuilder::new();
.with_http_listener(([0, 0, 0, 0], cli.metrics_port))
.add_global_label("service", "kube-forward")
// Load configuration
let config_content = tokio::fs::read_to_string(&cli.config).await?;
let config: Vec<ForwardConfig> = serde_yaml::from_str(&config_content)?;
// Initialize Kubernetes client
let client = Client::try_default().await?;
// Create port-forward manager
let manager = PortForwardManager::new(client.clone());
// Set up signal handling
let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1);
let shutdown_tx_clone = shutdown_tx.clone();
ctrlc::set_handler(move || {
let _ = shutdown_tx_clone.send(());
// Start port-forwards
for forward_config in config {
info!("Setting up port-forward for {}", forward_config.name);
match resolve_service(client.clone(), &forward_config.target).await {
Ok(service_info) => {
if let Err(e) = manager
.add_forward(forward_config.clone(), service_info)
"Failed to set up port-forward {}: {}",
forward_config.name, e
Err(e) => {
"Failed to resolve service for {}: {}",
forward_config.name, e
// Wait for shutdown signal
info!("Shutting down...");
// Stop all port-forwards
Which basically sets everything up and start the process of set up the port-forwards, then the most critical file is
, which looks something like this:
In this file we have all the functions that do the heavy-lifting to setup and maintain the port-forwards thanks to
use socket2::{SockRef, TcpKeepalive};
use kube::{
api::{Api, DeleteParams, PostParams},
runtime::wait::{await_condition, conditions::is_pod_running},
Client, ResourceExt,
use crate::{
error::{PortForwardError, Result},
use anyhow;
use chrono::DateTime;
use chrono::Utc;
use k8s_openapi::api::core::v1::Pod;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, error, info, warn};
use futures::TryStreamExt;
use std::net::SocketAddr;
use tokio::{
io::{AsyncRead, AsyncWrite},
use tokio_stream::wrappers::TcpListenerStream;
pub struct HealthCheck {
pub last_check: Arc<RwLock<Option<DateTime<Utc>>>>,
pub failures: Arc<RwLock<u32>>,
impl HealthCheck {
pub fn new() -> Self {
Self {
last_check: Arc::new(RwLock::new(None)),
failures: Arc::new(RwLock::new(0)),
pub async fn check_connection(&self, local_port: u16) -> bool {
use tokio::net::TcpStream;
match TcpStream::connect(format!("{}", local_port)).await {
Ok(_) => {
*self.failures.write().await = 0;
*self.last_check.write().await = Some(Utc::now());
Err(_) => {
let mut failures = self.failures.write().await;
*failures += 1;
// Represents the state of a port-forward
#[derive(Debug, Clone, PartialEq)]
pub enum ForwardState {
#[derive(Debug, Clone)]
pub struct PortForward {
pub config: ForwardConfig,
pub service_info: ServiceInfo,
pub state: Arc<RwLock<ForwardState>>,
pub shutdown: broadcast::Sender<()>,
pub metrics: ForwardMetrics,
impl PortForward {
pub fn new(config: ForwardConfig, service_info: ServiceInfo) -> Self {
let (shutdown_tx, _) = broadcast::channel(1);
Self {
metrics: ForwardMetrics::new(config.name.clone()),
state: Arc::new(RwLock::new(ForwardState::Starting)),
shutdown: shutdown_tx,
pub async fn start(&self, client: Client) -> Result<()> {
let mut retry_count = 0;
let mut shutdown_rx = self.shutdown.subscribe();
loop {
if retry_count >= self.config.options.max_retries {
let err_msg = "Max retry attempts reached".to_string();
*self.state.write().await = ForwardState::Failed(err_msg.clone());
return Err(PortForwardError::ConnectionError(err_msg));
match self.establish_forward(&client).await {
Ok(()) => {
*self.state.write().await = ForwardState::Connected;
info!("Port-forward established for {}", self.config.name);
// Monitor the connection
tokio::select! {
_ = shutdown_rx.recv() => {
info!("Received shutdown signal for {}", self.config.name);
_ = self.monitor_connection(&client) => {
warn!("Connection lost for {}, attempting to reconnect", self.config.name);
*self.state.write().await = ForwardState::Disconnected;
Err(e) => {
"Failed to establish port-forward for {}: {}",
self.config.name, e
retry_count += 1;
async fn monitor_connection(&self, client: &Client) -> Result<()> {
let health_check = HealthCheck::new();
let mut interval = tokio::time::interval(self.config.options.health_check_interval);
loop {
// Check TCP connection
if !health_check.check_connection(self.config.ports.local).await {
return Err(PortForwardError::ConnectionError(
"Connection health check failed".to_string(),
// Check pod status
if let Ok(pod) = self.get_pod(client).await {
if let Some(status) = &pod.status {
if let Some(phase) = &status.phase {
if phase != "Running" {
return Err(PortForwardError::ConnectionError(
"Pod is no longer running".to_string(),
} else {
return Err(PortForwardError::ConnectionError(
"Pod not found".to_string(),
async fn establish_forward(&self, client: &Client) -> Result<()> {
// Get pod for the service
let pod = self.get_pod(client).await?;
// Clone the name to avoid lifetime issues
let pod_name = pod.metadata.name.clone().ok_or_else(|| {
PortForwardError::ConnectionError("Pod name not found".to_string())
// Create Api instance for the namespace
let pods: Api<Pod> = Api::namespaced(client.clone(), &self.service_info.namespace);
// Create TCP listener for the local port
"Creating TCP listener for the local port: {}",
let addr = SocketAddr::from(([127, 0, 0, 1], self.config.ports.local));
let listener = TcpListener::bind(addr).await.map_err(|e| {
match e.kind() {
std::io::ErrorKind::AddrInUse => PortForwardError::ConnectionError(format!(
"Port {} is already in use. Please choose a different local port",
_ => PortForwardError::ConnectionError(format!("Failed to bind to port: {}", e)),
// Set TCP keepalive
// let tcp = TcpStream::connect(&addr).await?;
let ka = TcpKeepalive::new().with_time(std::time::Duration::from_secs(30));
let sf = SockRef::from(&listener);
let _ = sf.set_tcp_keepalive(&ka);
// Set state to connected
*self.state.write().await = ForwardState::Connected;
// Clone values needed for the async task
let state = self.state.clone();
let name = self.config.name.clone();
let remote_port = self.config.ports.remote;
let mut shutdown = self.shutdown.subscribe();
let metrics = self.metrics.clone(); // Clone metrics for the task
// Spawn the main forwarding task
tokio::spawn(async move {
let mut listener_stream = TcpListenerStream::new(listener);
let name = name.as_str(); // Use as_str() to get a &str we can copy
loop {
tokio::select! {
// Handle new connections
Ok(Some(client_conn)) = listener_stream.try_next() => {
if let Ok(peer_addr) = client_conn.peer_addr() {
info!(%peer_addr, "New connection for {}", name);
let pods = pods.clone();
let pod_name = pod_name.clone();
let metrics = metrics.clone(); // Clone metrics for the connection task
tokio::spawn(async move {
if let Err(e) = Self::forward_connection(&pods, pod_name, remote_port, client_conn).await {
error!("Failed to forward connection: {}", e);
} else {
// Handle shutdown signal
_ = shutdown.recv() => {
info!("Received shutdown signal for {}", name);
*state.write().await = ForwardState::Disconnected;
else => {
error!("Port forward {} listener closed", name);
*state.write().await = ForwardState::Failed("Listener closed unexpectedly".to_string());
async fn forward_connection(
pods: &Api<Pod>,
pod_name: String,
port: u16,
mut client_conn: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
debug!("Starting port forward for port {}", port);
// Create port forward
let mut pf = pods
.portforward(&pod_name, &[port])
.map_err(|e| anyhow::anyhow!("Failed to create portforward: {}", e))?;
// Get the stream for our port
let mut upstream_conn = pf
.take_stream(port) // Use port instead of 0
.ok_or_else(|| {
anyhow::anyhow!("Failed to get port forward stream for port {}", port)
debug!("Port forward stream established for port {}", port);
// Copy data bidirectionally with timeout
match tokio::time::timeout(
std::time::Duration::from_secs(30), // 30 second timeout
tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn),
Ok(Ok(_)) => {
debug!("Connection closed normally for port {}", port);
Ok(Err(e)) => {
warn!("Error during data transfer for port {}: {}", port, e);
return Err(anyhow::anyhow!("Data transfer error: {}", e));
Err(_) => {
warn!("Connection timeout for port {}", port);
return Err(anyhow::anyhow!("Connection timeout"));
// Clean up
// Wait for the port forwarder to finish
if let Err(e) = pf.join().await {
warn!("Port forwarder join error: {}", e);
async fn get_pod(&self, client: &Client) -> Result<Pod> {
let pods: Api<Pod> = Api::namespaced(client.clone(), &self.service_info.namespace);
// Get all pods in the namespace
let pod_list = pods
.map_err(|e| PortForwardError::KubeError(e))?;
for pod in pod_list.items {
if self
.matches_pod_selector(&pod, &self.config.pod_selector)
if let Some(status) = &pod.status {
if let Some(phase) = &status.phase {
if phase == "Running" {
return Ok(pod);
"No ready pods found matching selector for service {}",
pub fn matches_pod_selector(self, pod: &Pod, selector: &PodSelector) -> bool {
// If no selector is specified, fall back to checking if service name is in any label
if selector.label.is_none() && selector.annotation.is_none() {
return pod.metadata.labels.as_ref().map_or(false, |labels| {
labels.values().any(|v| v == &self.service_info.name)
// Check label if specified
if let Some(label_selector) = &selector.label {
let (key, value) = self.clone().parse_selector(label_selector);
if !pod.metadata.labels.as_ref().map_or(false, |labels| {
labels.get(key).map_or(false, |v| v == value)
}) {
return false;
// Check annotation if specified
if let Some(annotation_selector) = &selector.annotation {
let (key, value) = self.clone().parse_selector(annotation_selector);
if !pod
.map_or(false, |annotations| {
annotations.get(key).map_or(false, |v| v == value)
return false;
pub fn parse_selector(self, selector: &str) -> (&str, &str) {
let parts: Vec<&str> = selector.split('=').collect();
match parts.as_slice() {
[key, value] => (*key, *value),
_ => ("", ""), // Return empty strings if format is invalid
// Manager to handle multiple port-forwards
pub struct PortForwardManager {
forwards: Arc<RwLock<Vec<Arc<PortForward>>>>,
client: Client,
impl PortForwardManager {
pub fn new(client: Client) -> Self {
Self {
forwards: Arc::new(RwLock::new(Vec::new())),
pub async fn add_forward(
config: ForwardConfig,
service_info: ServiceInfo,
) -> Result<()> {
let forward = Arc::new(PortForward::new(config, service_info));
// Start the port-forward in a separate task
let client = self.client.clone();
tokio::spawn(async move {
if let Err(e) = forward.start(client).await {
error!("Port-forward failed: {}", e);
pub async fn stop_all(&self) {
for forward in self.forwards.read().await.iter() {
// forward.stop().await;
As you can see there is a lot going on, but to simplify things over:
- Iterate over the config.
- Find the pods that matches the selector.
- Try to establish a connection.
- Keep the connection up and reconnect automatically if it fails for any reason.
- On drop or close, send a shutdown signal.
I will probably do a video soon about it to explain it in a bit more detail and show the basic usage, give it a try and let me know how it goes! Until next time!
