Kubernetes permanent port-forward (or close to that...)
- kubernetes
- rust
Introduction
In this article we will explore how to create and use a Kubernetes port-forward tool. The main objective is to declaratively configure the port-forwards we need and have all of them running by firing a single command.
If you want to try it, it is called kube-forward
and you can find it here.
Check the releases page, as there are already binaries pre-compiled via GitHub Actions. I have only tested the x86_64 ones, but
let me know if you try the others and they work!
A few notes: be sure that the ports you are using are free, and have the right Kubernetes configuration exported, as the client will auto-detect the default context.
Configuration
First, let's check the configuration to see what it looks like and declare what we want to connect to:
- name: "argocd-ui" # identifier for the logs
target: "argocd-server.argocd" # deployment_name.namespace
ports:
local: 8080 # custom local port
remote: 8080 # remote port
options:
retry_interval: 5s # if the connection fails, retry automatically in x seconds
max_retries: 30 # max amount of retries
health_check_interval: 10s # check every x seconds if everything is still working
pod_selector:
label: "app.kubernetes.io/name=argocd-server" # use labels to select your pods
- name: "postgres"
target: "postgres.tr"
ports:
local: 5434
remote: 5432
options:
retry_interval: 5s
max_retries: 30
health_check_interval: 10s
pod_selector:
label: "app=postgres"
I added some comments to make the config easier to understand. The config might get a bit simpler over time, but for now that's what the CLI tool needs.
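Before moving on, it can help to see the Rust types this YAML maps onto. This is just a sketch: the real definitions live in config.rs (not shown in this post), so the exact field types and the humantime_serde duration parsing are assumptions on my part.

use serde::Deserialize;
use std::time::Duration;

// Hypothetical shape of the config types; the real ones live in config.rs.
#[derive(Debug, Clone, Deserialize)]
pub struct ForwardConfig {
    pub name: String,   // identifier for the logs
    pub target: String, // "deployment_name.namespace"
    pub ports: PortMapping,
    pub options: ForwardOptions,
    pub pod_selector: PodSelector,
}

#[derive(Debug, Clone, Deserialize)]
pub struct PortMapping {
    pub local: u16,
    pub remote: u16,
}

#[derive(Debug, Clone, Deserialize)]
pub struct ForwardOptions {
    // Assumption: "5s"-style strings parsed via the humantime-serde crate
    #[serde(with = "humantime_serde")]
    pub retry_interval: Duration,
    pub max_retries: u32,
    #[serde(with = "humantime_serde")]
    pub health_check_interval: Duration,
}

#[derive(Debug, Clone, Deserialize)]
pub struct PodSelector {
    pub label: Option<String>,
    pub annotation: Option<String>,
}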
Test it
What would it look like to run it?
❯ RUST_LOG=info kube-forward -c config.yaml -e
2025-01-10T23:53:38.711212Z INFO kube_forward: Setting up port-forward for argocd-ui
2025-01-10T23:53:39.346540Z INFO kube_forward: Setting up port-forward for postgres
2025-01-10T23:53:39.464369Z INFO kube_forward::forward: Port-forward established for argocd-ui
2025-01-10T23:53:39.465744Z INFO kube_forward::forward: New connection for argocd-ui peer_addr=127.0.0.1:37494
2025-01-10T23:53:39.793116Z INFO kube_forward::forward: Port-forward established for postgres
2025-01-10T23:53:39.794432Z INFO kube_forward::forward: New connection for postgres peer_addr=127.0.0.1:57104
With RUST_LOG
we can influence the amount of feedback we get; without it we will see some random connection errors
(which are safe to ignore, that's the whole point of the tool). With -e
we can expose Prometheus metrics, if you are an
observability fan, and well, -c
is pretty self-explanatory.
That would look something like this:
❯ curl localhost:9292/metrics
# TYPE port_forward_connection_successes_total counter
port_forward_connection_successes_total{service="kube-forward",forward="postgres"} 21
port_forward_connection_successes_total{service="kube-forward",forward="argocd-ui"} 21
# TYPE port_forward_connection_attempts_total counter
port_forward_connection_attempts_total{service="kube-forward",forward="argocd-ui"} 21
port_forward_connection_attempts_total{service="kube-forward",forward="postgres"} 21
# TYPE port_forward_connected gauge
port_forward_connected{service="kube-forward",forward="argocd-ui"} 1
port_forward_connected{service="kube-forward",forward="postgres"} 1
The code
At the moment of this writing the code looks something like this. I will include only two files (there are more types, etc.,
but with these you will get the idea of what the code is doing and how). This is the main.rs
file:
use anyhow::Result;
use clap::Parser;
use kube::Client;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::path::PathBuf;
use tracing::{error, info};

use kube_forward::{config::ForwardConfig, forward::PortForwardManager, util::resolve_service};

#[derive(Parser)]
#[command(author, version, about)]
struct Cli {
    #[arg(short, long, default_value = "config.yaml")]
    config: PathBuf,

    #[arg(short, long)]
    expose_metrics: bool,

    #[arg(short, long, default_value = "9292")]
    metrics_port: u16,
}

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging
    tracing_subscriber::fmt::init();

    // Parse command line arguments
    let cli = Cli::parse();

    // Initialize metrics
    if cli.expose_metrics {
        let builder = PrometheusBuilder::new();
        builder
            .with_http_listener(([0, 0, 0, 0], cli.metrics_port))
            .add_global_label("service", "kube-forward")
            .install()?;
    }

    // Load configuration
    let config_content = tokio::fs::read_to_string(&cli.config).await?;
    let config: Vec<ForwardConfig> = serde_yaml::from_str(&config_content)?;

    // Initialize Kubernetes client
    let client = Client::try_default().await?;

    // Create port-forward manager
    let manager = PortForwardManager::new(client.clone());

    // Set up signal handling
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1);
    let shutdown_tx_clone = shutdown_tx.clone();

    ctrlc::set_handler(move || {
        let _ = shutdown_tx_clone.send(());
    })?;

    // Start port-forwards
    for forward_config in config {
        info!("Setting up port-forward for {}", forward_config.name);

        match resolve_service(client.clone(), &forward_config.target).await {
            Ok(service_info) => {
                if let Err(e) = manager
                    .add_forward(forward_config.clone(), service_info)
                    .await
                {
                    error!(
                        "Failed to set up port-forward {}: {}",
                        forward_config.name, e
                    );
                }
            }
            Err(e) => {
                error!(
                    "Failed to resolve service for {}: {}",
                    forward_config.name, e
                );
            }
        }
    }

    // Wait for shutdown signal
    shutdown_rx.recv().await?;
    info!("Shutting down...");

    // Stop all port-forwards
    manager.stop_all().await;

    Ok(())
}
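One thing main.rs relies on that I am not showing is util::resolve_service, which turns the "deployment_name.namespace" target into a ServiceInfo. Here is a hedged sketch of what it might do, assuming the real version simply splits the target and sanity-checks it against the cluster; the actual util.rs may do more (or less) validation.

use k8s_openapi::api::core::v1::Service;
use kube::{api::Api, Client};

// Hypothetical sketch; the real ServiceInfo lives in util.rs.
#[derive(Debug, Clone)]
pub struct ServiceInfo {
    pub name: String,
    pub namespace: String,
}

pub async fn resolve_service(client: Client, target: &str) -> anyhow::Result<ServiceInfo> {
    // "argocd-server.argocd" -> ("argocd-server", "argocd")
    let (name, namespace) = target
        .split_once('.')
        .ok_or_else(|| anyhow::anyhow!("target must be 'name.namespace', got '{}'", target))?;

    // Fail early if the Service doesn't exist in that namespace
    let services: Api<Service> = Api::namespaced(client, namespace);
    services.get(name).await?;

    Ok(ServiceInfo {
        name: name.to_string(),
        namespace: namespace.to_string(),
    })
}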
This basically sets everything up and starts the process of establishing the port-forwards. The most critical file is
forward.rs,
which contains all the functions that do the heavy lifting to set up and maintain the port-forwards thanks to
kube-rs:
use socket2::{SockRef, TcpKeepalive};

use kube::{
    api::{Api, DeleteParams, PostParams},
    runtime::wait::{await_condition, conditions::is_pod_running},
    Client, ResourceExt,
};

use crate::{
    config::ForwardConfig,
    config::PodSelector,
    error::{PortForwardError, Result},
    metrics::ForwardMetrics,
    util::ServiceInfo,
};
use anyhow;
use chrono::DateTime;
use chrono::Utc;
use k8s_openapi::api::core::v1::Pod;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, error, info, warn};

use futures::TryStreamExt;
use std::net::SocketAddr;
use tokio::{
    io::{AsyncRead, AsyncWrite},
    net::TcpListener,
};
use tokio_stream::wrappers::TcpListenerStream;
#[derive(Debug)]
pub struct HealthCheck {
    pub last_check: Arc<RwLock<Option<DateTime<Utc>>>>,
    pub failures: Arc<RwLock<u32>>,
}

impl HealthCheck {
    pub fn new() -> Self {
        Self {
            last_check: Arc::new(RwLock::new(None)),
            failures: Arc::new(RwLock::new(0)),
        }
    }

    pub async fn check_connection(&self, local_port: u16) -> bool {
        use tokio::net::TcpStream;

        match TcpStream::connect(format!("127.0.0.1:{}", local_port)).await {
            Ok(_) => {
                *self.failures.write().await = 0;
                *self.last_check.write().await = Some(Utc::now());
                true
            }
            Err(_) => {
                let mut failures = self.failures.write().await;
                *failures += 1;
                false
            }
        }
    }
}
// Represents the state of a port-forward
#[derive(Debug, Clone, PartialEq)]
pub enum ForwardState {
    Starting,
    Connected,
    Disconnected,
    Failed(String),
    Stopping,
}

#[derive(Debug, Clone)]
pub struct PortForward {
    pub config: ForwardConfig,
    pub service_info: ServiceInfo,
    pub state: Arc<RwLock<ForwardState>>,
    pub shutdown: broadcast::Sender<()>,
    pub metrics: ForwardMetrics,
}
impl PortForward {
    pub fn new(config: ForwardConfig, service_info: ServiceInfo) -> Self {
        let (shutdown_tx, _) = broadcast::channel(1);
        Self {
            metrics: ForwardMetrics::new(config.name.clone()),
            config,
            service_info,
            state: Arc::new(RwLock::new(ForwardState::Starting)),
            shutdown: shutdown_tx,
        }
    }

    pub async fn start(&self, client: Client) -> Result<()> {
        let mut retry_count = 0;
        let mut shutdown_rx = self.shutdown.subscribe();

        loop {
            if retry_count >= self.config.options.max_retries {
                let err_msg = "Max retry attempts reached".to_string();
                *self.state.write().await = ForwardState::Failed(err_msg.clone());
                return Err(PortForwardError::ConnectionError(err_msg));
            }

            self.metrics.record_connection_attempt();

            match self.establish_forward(&client).await {
                Ok(()) => {
                    *self.state.write().await = ForwardState::Connected;
                    self.metrics.record_connection_success();
                    self.metrics.set_connection_status(true);
                    info!("Port-forward established for {}", self.config.name);

                    // Monitor the connection
                    tokio::select! {
                        _ = shutdown_rx.recv() => {
                            info!("Received shutdown signal for {}", self.config.name);
                            break;
                        }
                        _ = self.monitor_connection(&client) => {
                            warn!("Connection lost for {}, attempting to reconnect", self.config.name);
                            *self.state.write().await = ForwardState::Disconnected;
                        }
                    }
                }
                Err(e) => {
                    warn!(
                        "Failed to establish port-forward for {}: {}",
                        self.config.name, e
                    );
                    self.metrics.record_connection_failure();
                    self.metrics.set_connection_status(false);
                    retry_count += 1;
                    tokio::time::sleep(self.config.options.retry_interval).await;
                    continue;
                }
            }
        }

        Ok(())
    }
    async fn monitor_connection(&self, client: &Client) -> Result<()> {
        let health_check = HealthCheck::new();
        let mut interval = tokio::time::interval(self.config.options.health_check_interval);

        loop {
            interval.tick().await;

            // Check TCP connection
            if !health_check.check_connection(self.config.ports.local).await {
                return Err(PortForwardError::ConnectionError(
                    "Connection health check failed".to_string(),
                ));
            }

            // Check pod status
            if let Ok(pod) = self.get_pod(client).await {
                if let Some(status) = &pod.status {
                    if let Some(phase) = &status.phase {
                        if phase != "Running" {
                            return Err(PortForwardError::ConnectionError(
                                "Pod is no longer running".to_string(),
                            ));
                        }
                    }
                }
            } else {
                return Err(PortForwardError::ConnectionError(
                    "Pod not found".to_string(),
                ));
            }
        }
    }
    async fn establish_forward(&self, client: &Client) -> Result<()> {
        self.metrics.record_connection_attempt();

        // Get pod for the service
        let pod = self.get_pod(client).await?;

        // Clone the name to avoid lifetime issues
        let pod_name = pod.metadata.name.clone().ok_or_else(|| {
            self.metrics.record_connection_failure();
            PortForwardError::ConnectionError("Pod name not found".to_string())
        })?;

        // Create Api instance for the namespace
        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.service_info.namespace);

        // Create TCP listener for the local port
        debug!(
            "Creating TCP listener for the local port: {}",
            self.config.ports.local
        );
        let addr = SocketAddr::from(([127, 0, 0, 1], self.config.ports.local));
        let listener = TcpListener::bind(addr).await.map_err(|e| {
            self.metrics.record_connection_failure();
            match e.kind() {
                std::io::ErrorKind::AddrInUse => PortForwardError::ConnectionError(format!(
                    "Port {} is already in use. Please choose a different local port",
                    self.config.ports.local
                )),
                _ => PortForwardError::ConnectionError(format!("Failed to bind to port: {}", e)),
            }
        })?;

        // Set TCP keepalive on the listener socket
        let ka = TcpKeepalive::new().with_time(std::time::Duration::from_secs(30));
        let sf = SockRef::from(&listener);
        let _ = sf.set_tcp_keepalive(&ka);

        // Set state to connected
        *self.state.write().await = ForwardState::Connected;
        self.metrics.record_connection_success();
        self.metrics.set_connection_status(true);

        // Clone values needed for the async task
        let state = self.state.clone();
        let name = self.config.name.clone();
        let remote_port = self.config.ports.remote;
        let mut shutdown = self.shutdown.subscribe();
        let metrics = self.metrics.clone(); // Clone metrics for the task

        // Spawn the main forwarding task
        tokio::spawn(async move {
            let mut listener_stream = TcpListenerStream::new(listener);
            let name = name.as_str(); // Use as_str() to get a &str we can copy

            loop {
                tokio::select! {
                    // Handle new connections
                    Ok(Some(client_conn)) = listener_stream.try_next() => {
                        if let Ok(peer_addr) = client_conn.peer_addr() {
                            info!(%peer_addr, "New connection for {}", name);
                            metrics.record_connection_attempt();
                        }

                        let pods = pods.clone();
                        let pod_name = pod_name.clone();
                        let metrics = metrics.clone(); // Clone metrics for the connection task

                        tokio::spawn(async move {
                            if let Err(e) = Self::forward_connection(&pods, pod_name, remote_port, client_conn).await {
                                error!("Failed to forward connection: {}", e);
                                metrics.record_connection_failure();
                            } else {
                                metrics.record_connection_success();
                            }
                        });
                    }
                    // Handle shutdown signal
                    _ = shutdown.recv() => {
                        info!("Received shutdown signal for {}", name);
                        *state.write().await = ForwardState::Disconnected;
                        metrics.set_connection_status(false);
                        break;
                    }
                    else => {
                        error!("Port forward {} listener closed", name);
                        *state.write().await = ForwardState::Failed("Listener closed unexpectedly".to_string());
                        metrics.set_connection_status(false);
                        metrics.record_connection_failure();
                        break;
                    }
                }
            }
        });

        Ok(())
    }
    async fn forward_connection(
        pods: &Api<Pod>,
        pod_name: String,
        port: u16,
        mut client_conn: impl AsyncRead + AsyncWrite + Unpin,
    ) -> anyhow::Result<()> {
        debug!("Starting port forward for port {}", port);

        // Create port forward
        let mut pf = pods
            .portforward(&pod_name, &[port])
            .await
            .map_err(|e| anyhow::anyhow!("Failed to create portforward: {}", e))?;

        // Get the stream for our port
        let mut upstream_conn = pf
            .take_stream(port) // Use port instead of 0
            .ok_or_else(|| anyhow::anyhow!("Failed to get port forward stream for port {}", port))?;

        debug!("Port forward stream established for port {}", port);

        // Copy data bidirectionally with timeout
        match tokio::time::timeout(
            std::time::Duration::from_secs(30), // 30 second timeout
            tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn),
        )
        .await
        {
            Ok(Ok(_)) => {
                debug!("Connection closed normally for port {}", port);
            }
            Ok(Err(e)) => {
                warn!("Error during data transfer for port {}: {}", port, e);
                return Err(anyhow::anyhow!("Data transfer error: {}", e));
            }
            Err(_) => {
                warn!("Connection timeout for port {}", port);
                return Err(anyhow::anyhow!("Connection timeout"));
            }
        }

        // Clean up
        drop(upstream_conn);

        // Wait for the port forwarder to finish
        if let Err(e) = pf.join().await {
            warn!("Port forwarder join error: {}", e);
        }

        Ok(())
    }
    async fn get_pod(&self, client: &Client) -> Result<Pod> {
        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.service_info.namespace);

        // Get all pods in the namespace
        let pod_list = pods
            .list(&kube::api::ListParams::default())
            .await
            .map_err(PortForwardError::KubeError)?;

        for pod in pod_list.items {
            if self.matches_pod_selector(&pod, &self.config.pod_selector) {
                if let Some(status) = &pod.status {
                    if let Some(phase) = &status.phase {
                        if phase == "Running" {
                            return Ok(pod);
                        }
                    }
                }
            }
        }

        Err(PortForwardError::ConnectionError(format!(
            "No ready pods found matching selector for service {}",
            self.service_info.name
        )))
    }
    pub fn matches_pod_selector(&self, pod: &Pod, selector: &PodSelector) -> bool {
        // If no selector is specified, fall back to checking if the service name
        // appears in any label value
        if selector.label.is_none() && selector.annotation.is_none() {
            return pod.metadata.labels.as_ref().map_or(false, |labels| {
                labels.values().any(|v| v == &self.service_info.name)
            });
        }

        // Check label if specified
        if let Some(label_selector) = &selector.label {
            let (key, value) = Self::parse_selector(label_selector);
            if !pod.metadata.labels.as_ref().map_or(false, |labels| {
                labels.get(key).map_or(false, |v| v == value)
            }) {
                return false;
            }
        }

        // Check annotation if specified
        if let Some(annotation_selector) = &selector.annotation {
            let (key, value) = Self::parse_selector(annotation_selector);
            if !pod
                .metadata
                .annotations
                .as_ref()
                .map_or(false, |annotations| {
                    annotations.get(key).map_or(false, |v| v == value)
                })
            {
                return false;
            }
        }

        true
    }

    pub fn parse_selector(selector: &str) -> (&str, &str) {
        let parts: Vec<&str> = selector.split('=').collect();
        match parts.as_slice() {
            [key, value] => (*key, *value),
            _ => ("", ""), // Return empty strings if the format is invalid
        }
    }
}
// Manager to handle multiple port-forwards
pub struct PortForwardManager {
    forwards: Arc<RwLock<Vec<Arc<PortForward>>>>,
    client: Client,
}

impl PortForwardManager {
    pub fn new(client: Client) -> Self {
        Self {
            forwards: Arc::new(RwLock::new(Vec::new())),
            client,
        }
    }

    pub async fn add_forward(
        &self,
        config: ForwardConfig,
        service_info: ServiceInfo,
    ) -> Result<()> {
        let forward = Arc::new(PortForward::new(config, service_info));
        self.forwards.write().await.push(forward.clone());

        // Start the port-forward in a separate task
        let client = self.client.clone();
        tokio::spawn(async move {
            if let Err(e) = forward.start(client).await {
                error!("Port-forward failed: {}", e);
            }
        });

        Ok(())
    }

    pub async fn stop_all(&self) {
        for forward in self.forwards.read().await.iter() {
            // Ignore send errors: a forward whose task already exited has no receivers left
            let _ = forward.shutdown.send(());
        }
    }
}
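The imports at the top of forward.rs also reference a custom error module that I am not showing. As a minimal sketch, assuming thiserror (the real error.rs may differ), the shape that matches how PortForwardError and the Result alias are used above would be:

use thiserror::Error;

// Hypothetical sketch of error.rs, shaped to match the usages in forward.rs.
#[derive(Debug, Error)]
pub enum PortForwardError {
    #[error("connection error: {0}")]
    ConnectionError(String),

    #[error("kubernetes error: {0}")]
    KubeError(#[from] kube::Error),
}

// Crate-local Result alias used throughout forward.rs
pub type Result<T> = std::result::Result<T, PortForwardError>;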
As you can see there is a lot going on, but to simplify, this is roughly what happens:
- Iterate over the config.
- Find the pods that match the selector.
- Try to establish a connection.
- Keep the connection up and reconnect automatically if it fails for any reason.
- On drop or close, send a shutdown signal.
I will probably do a video soon about it to explain it in a bit more detail and show the basic usage. Give it a try and let me know how it goes! Until next time!