Introduction
Manual database sharding is a distributed systems nightmare. You're building your own routing layer, handling cross-shard transactions, managing rebalancing, and debugging production issues across dozens of database instances. Vitess and Citus solve this problem differently—one for MySQL, one for PostgreSQL—but both aim to make sharding transparent to your application.
Vitess powers YouTube's infrastructure, handling millions of queries per second across thousands of MySQL shards. Citus (developed by Citus Data, now part of Microsoft) extends PostgreSQL with distributed query planning, making existing PostgreSQL applications scale horizontally with minimal code changes. The choice between them usually follows from the database you already run, and beyond that from your query patterns and operational maturity.
This guide compares both systems architecturally, walks through production deployment, and covers the operational patterns that determine success or failure at scale.
Why Sharding Becomes Necessary
Before diving into Vitess and Citus, it's worth understanding when and why sharding becomes inevitable. A single database instance—no matter how powerful—has hard ceilings: disk I/O throughput, CPU cores for query execution, memory for buffer pools, and network bandwidth for replication. Vertical scaling (bigger machines) hits diminishing returns quickly, especially in cloud environments where instance sizes max out.
The typical progression for growing applications looks like this:
1. Single database: works fine for thousands of users
2. Read replicas: offload read traffic, but writes still bottleneck on a single primary
3. Connection pooling: reduces connection overhead, but doesn't increase storage or compute capacity
4. Application-level sharding: manually routing queries to multiple databases (fragile, error-prone)
5. Vitess or Citus: automated, transparent sharding with minimal application changes
The jump from step 3 to step 5 is where most teams struggle. Vitess and Citus eliminate step 4 entirely—they handle the routing, rebalancing, and query planning so your application code stays clean.
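To make the fragility of step 4 concrete, here is a minimal sketch of hand-rolled routing. It assumes a hypothetical `shardFor` helper and a toy FNV-1a hash standing in for whatever hash a real router would use. It is not how Vitess or Citus route queries; it only illustrates why naive `hash mod N` placement forces most rows to move when the shard count changes.

```typescript
// Toy 32-bit FNV-1a hash; a stand-in for a real router's hash function.
function fnv1a(key: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    // Multiply by the FNV prime (16777619) using shifts to stay within 32 bits.
    h = (h + (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24)) >>> 0;
  }
  return h >>> 0;
}

// Hypothetical manual router: shard index = hash(key) mod shardCount.
function shardFor(key: string, shardCount: number): number {
  return fnv1a(key) % shardCount;
}

// Count how many keys land on a different shard after the shard count changes.
function countMoved(keys: string[], before: number, after: number): number {
  let moved = 0;
  for (const key of keys) {
    if (shardFor(key, before) !== shardFor(key, after)) moved++;
  }
  return moved;
}

const sampleKeys: string[] = [];
for (let i = 0; i < 1000; i++) sampleKeys.push("user-" + i);

// Keys that would have to be copied to another database when growing from 4 to 5 shards.
const movedKeys = countMoved(sampleKeys, 4, 5);
```

With a uniform hash, a key keeps its shard only when `hash mod 4` equals `hash mod 5`, which happens for about one key in five. The remaining rows have to be relocated, and that is the rebalancing work the rest of this guide hands to Vitess or Citus.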
-- Without sharding: all data on one node
-- With Citus: distributed across workers, transparent to application
SELECT create_distributed_table('orders', 'user_id');
-- Now orders are automatically spread across N worker nodes
-- Application code doesn't change at all
Understanding Vitess: MySQL Sharding at Scale
Vitess was built at YouTube in 2010 to solve MySQL scaling limitations. It sits between your application and MySQL instances, providing transparent sharding, connection pooling, query routing, and online schema migrations.
Architecture
Vitess consists of several components:
- vtgate: The query router. Receives SQL from applications, determines which shard(s) to query, and merges results.
- vttablet: Runs alongside each MySQL instance, managing replication, backups, and health monitoring.
- Topology Service: Stores cluster metadata (etcd, ZooKeeper, or Consul).
- vtctld: The management daemon for administrative operations.
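The routing decision vtgate makes can be pictured as a key-range lookup. The sketch below assumes Vitess's convention of naming shards by hexadecimal keyspace ID ranges (for example `-80` and `80-`). The function names and the sample keyspace ID are illustrative only; real vtgate derives keyspace IDs through the vindex functions declared in the VSchema.

```typescript
interface KeyRange {
  shard: string; // shard name such as "-40", "40-80", "c0-"
  start: string; // inclusive lower bound in hex, "" means unbounded
  end: string;   // exclusive upper bound in hex, "" means unbounded
}

// Parse a shard name like "40-80" into its bounds.
function parseShard(shard: string): KeyRange {
  const dash = shard.indexOf("-");
  if (dash < 0) throw new Error("not a range-based shard name: " + shard);
  return {
    shard: shard,
    start: shard.substring(0, dash).toLowerCase(),
    end: shard.substring(dash + 1).toLowerCase(),
  };
}

// Lowercase hex strings compare like the bytes they encode,
// so plain string comparison is enough for this sketch.
function contains(range: KeyRange, keyspaceIdHex: string): boolean {
  const id = keyspaceIdHex.toLowerCase();
  const aboveStart = range.start === "" || id >= range.start;
  const belowEnd = range.end === "" || id < range.end;
  return aboveStart && belowEnd;
}

// Return the single shard that owns a keyspace ID.
function resolveShard(shards: string[], keyspaceIdHex: string): string {
  for (const shardName of shards) {
    if (contains(parseShard(shardName), keyspaceIdHex)) return shardName;
  }
  throw new Error("no shard covers keyspace id " + keyspaceIdHex);
}

const fourShards = ["-40", "40-80", "80-c0", "c0-"];
// Made-up keyspace ID; it starts with byte 0xa3, so it falls in "80-c0".
const owningShard = resolveShard(fourShards, "a3f1c2d4e5f60718");
```

A query that pins the sharding column resolves to one shard this way. A query that does not has no keyspace ID to look up, so it has to be sent to every shard.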
# Vitess VSchema: defining how tables are sharded
{
"sharded": true,
"vindexes": {
"hash_user_id": {
"type": "hash"
},
"lookup_email": {
"type": "consistent_lookup",
"params": {
"table": "email_to_user_idx",
"from": "email",
"to": "user_id"
},
"owner": "users"
}
},
"tables": {
"users": {
"column_vindexes": [
{ "column": "user_id", "name": "hash_user_id" },
{ "column": "email", "name": "lookup_email" }
]
},
"orders": {
"column_vindexes": [
{ "column": "user_id", "name": "hash_user_id" }
]
}
}
}
-- These queries are automatically routed by vtgate
-- Single-shard (vtgate routes to the correct shard)
SELECT * FROM users WHERE user_id = 12345;
SELECT * FROM orders WHERE user_id = 12345;
-- Cross-shard (vtgate scatter-gathers and merges)
SELECT COUNT(*) FROM orders WHERE created_at > '2024-01-01';
SELECT * FROM users ORDER BY created_at DESC LIMIT 100;
-- vtgate handles JOINs within the same shard automatically
SELECT u.name, o.total
FROM users u JOIN orders o ON o.user_id = u.user_id
WHERE u.user_id = 12345;
Vitess Online Schema Migration
One of Vitess's most useful features is non-blocking schema migrations:
-- Opt in to managed online DDL for this session (the default strategy is 'direct')
SET @@ddl_strategy = 'vitess';
-- The ALTER is now queued and runs in the background via VReplication without blocking writes
ALTER TABLE users ADD COLUMN last_login_at TIMESTAMP;
-- Track the progress of queued and running migrations
SHOW VITESS_MIGRATIONS;
-- VSchema changes are a separate, metadata-only operation (not online DDL)
ALTER VSCHEMA ON users ADD VINDEX lookup_email(email) USING consistent_lookup;
VReplication: The Resharding Engine
VReplication is Vitess's internal replication mechanism that enables online resharding, materialized views, and table movement between keyspaces. Unlike MySQL's native replication, VReplication can filter, transform, and route data at the row level.
-- VReplication streams are stored as rows in _vt.vreplication on each target primary.
-- They are normally created by workflow commands (Reshard, MoveTables, Materialize), not by hand.
-- For example, to split shard 0 into two shards while the application keeps serving traffic:
-- vtctldclient Reshard --workflow reshard_workflow --target-keyspace myapp create --source-shards '0' --target-shards '-80,80-'
-- Monitor VReplication progress on a target tablet
SELECT * FROM _vt.vreplication;
-- Shows: id, workflow, source, pos, stop_pos, state, message, ...
-- state: for example Copying, Running, Stopped, Error
-- message: status or error details
-- Verify data consistency before cutover
-- vtctldclient VDiff --workflow reshard_workflow --target-keyspace myapp create
-- Switch traffic to the new shards once the streams have caught up
-- vtctldclient Reshard --workflow reshard_workflow --target-keyspace myapp switchtraffic
VReplication also supports materialized views: pre-computed query results that stay in sync with the source tables:
-- Create a materialized view for analytics queries
CREATE TABLE user_order_summary AS
SELECT user_id, COUNT(*) as order_count, SUM(total) as total_spent
FROM orders GROUP BY user_id;
-- VReplication keeps the table in sync once it is the target of a Materialize workflow;
-- the CREATE TABLE statement alone only takes a one-time snapshot
-- Useful for cross-shard aggregations that would otherwise require scatter-gather
Connection Pooling in Vitess
Vitess provides built-in connection pooling. Applications connect to vtgate, vtgate talks to each vttablet over gRPC, and each vttablet maintains a bounded pool of connections to its MySQL instance. Thousands of application connections are therefore multiplexed onto a much smaller number of database connections.
// Application connects to vtgate (not directly to MySQL)
// vtgate handles connection multiplexing
import mysql from 'mysql2/promise';
// vtgate acts as a MySQL-compatible proxy
const pool = mysql.createPool({
host: 'vtgate-host',
port: 3306,
user: 'app',
password: process.env.VITESS_PASSWORD,
database: 'myapp@primary', // @primary for writes, @replica for reads
connectionLimit: 100, // client-side cap; Vitess multiplexes these onto far fewer MySQL connections
});
// With @primary this read is served by the primary; use a pool on 'myapp@replica' to read from replicas
const [rows] = await pool.execute(
'SELECT * FROM orders WHERE user_id = ?',
[userId]
);
Vitess's connection pooling means your MySQL instances see a manageable number of connections regardless of how many application instances are running. A cluster with 50 application pods connecting to vtgate with 100 connections each might only need 20 actual MySQL connections per shard.
Understanding Citus: PostgreSQL Sharding
Citus is a PostgreSQL extension that transforms a single PostgreSQL instance into a distributed database. Unlike Vitess (which is a separate proxy layer), Citus runs inside PostgreSQL as an extension.
Architecture
Citus uses a coordinator-worker model:
- Coordinator Node: Receives queries, plans distributed execution, and merges results.
- Worker Nodes: Store shards (distributed tables) and execute query fragments.
- Reference Tables: Small, frequently-joined tables replicated to all workers.
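The coordinator's "merges results" step can be sketched as combining per-worker partial aggregates. This is a simplified illustration, not Citus's actual planner or executor: the `PartialAgg` shape and the `mergePartials` function are hypothetical, and the sample numbers are made up. The point it shows is that averages cannot be merged directly, so each worker returns a sum and a count instead.

```typescript
// Partial aggregate a worker might return for one query fragment (hypothetical shape).
interface PartialAgg {
  worker: string;
  count: number; // COUNT(*) over the worker's shards
  sum: number;   // SUM(total) over the worker's shards
}

interface FinalAgg {
  count: number;
  sum: number;
  avg: number | null; // null when no rows matched on any worker
}

// Coordinator-side merge: add counts and sums, then derive AVG at the end.
function mergePartials(partials: PartialAgg[]): FinalAgg {
  let count = 0;
  let sum = 0;
  for (const p of partials) {
    count += p.count;
    sum += p.sum;
  }
  return { count: count, sum: sum, avg: count === 0 ? null : sum / count };
}

const mergedResult = mergePartials([
  { worker: "worker-1", count: 2, sum: 30 },
  { worker: "worker-2", count: 3, sum: 45 },
  { worker: "worker-3", count: 0, sum: 0 },
]);
// mergedResult.count is 5, mergedResult.sum is 75, mergedResult.avg is 15
```

Averaging the per-worker averages would weight every worker equally no matter how many rows it holds, which is why the fragments carry counts alongside sums.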
-- Install Citus extension
CREATE EXTENSION citus;
-- Add worker nodes
SELECT citus_add_node('worker-1', 5432);
SELECT citus_add_node('worker-2', 5432);
SELECT citus_add_node('worker-3', 5432);
-- Create distributed table
SELECT create_distributed_table('orders', 'user_id');
-- Reference table (replicated to all workers)
SELECT create_reference_table('products');
-- Now query as normal - Citus handles distribution
SELECT * FROM orders WHERE user_id = 12345; -- Single worker
SELECT COUNT(*) FROM orders; -- All workers, aggregated on coordinator
-- Citus handles distributed JOINs automatically
SELECT u.name, SUM(o.total) as total_spent
FROM users u
JOIN orders o ON o.user_id = u.id
GROUP BY u.name
ORDER BY total_spent DESC
LIMIT 10;
Citus Distributed Query Planning
Citus intercepts queries on the coordinator and creates a distributed plan:
-- View the distributed execution plan
EXPLAIN (ANALYZE, VERBOSE)
SELECT user_id, COUNT(*) as order_count
FROM orders
WHERE created_at > '2024-01-01'
GROUP BY user_id
ORDER BY order_count DESC
LIMIT 20;
-- Output shows per-worker execution:
-- Custom Scan (Citus Adaptive) (actual time=... rows=...)
-- Task Count: 32
-- Tuple data received from nodes: 1024 kB
-- -> Distributed Subplan
-- -> Sort on coordinator
-- -> HashAggregate on coordinator
-- -> Append
-- -> Task on worker-1 (query fragment)
-- -> Task on worker-2 (query fragment)
-- -> ...
Colocated Tables: The Key to Efficient JOINs
Citus's most important optimization is table colocation. When tables are distributed on the same column and colocated, JOINs between them execute on a single worker node—no cross-node data transfer required.
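Why colocation keeps a JOIN local can be shown with a small model. The sketch assumes that every table in a colocation group uses the same hash, the same shard count, and the same shard-to-worker mapping. The hash is a toy FNV-1a and `workerFor` is a hypothetical placement function, not Citus internals.

```typescript
// Toy 32-bit FNV-1a hash; a stand-in, not the hash Citus uses.
function fnv1a(key: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    // Multiply by the FNV prime (16777619) using shifts to stay within 32 bits.
    h = (h + (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24)) >>> 0;
  }
  return h >>> 0;
}

// Hypothetical placement shared by all tables in one colocation group:
// distribution value -> shard index -> worker.
function workerFor(distValue: string, shardCount: number, workerNames: string[]): string {
  const shardIndex = fnv1a(distValue) % shardCount;
  return workerNames[shardIndex % workerNames.length];
}

// Two rows can be joined without network transfer when they live on the same worker.
function joinIsLocal(leftDistValue: string, rightDistValue: string): boolean {
  const workerNames = ["worker-1", "worker-2", "worker-3"];
  const shardCount = 32;
  return (
    workerFor(leftDistValue, shardCount, workerNames) ===
    workerFor(rightDistValue, shardCount, workerNames)
  );
}

// users.id = 12345 and orders.user_id = 12345 hash identically, so the join is local.
const colocatedJoin = joinIsLocal("12345", "12345");
```

If `orders` were distributed on its own `id` instead, the two sides of the join would hash different values and could land on different workers. In that case rows have to cross the network before they can be matched.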
-- Distribute users first, then colocate the related tables with it
SELECT create_distributed_table('users', 'id');
SELECT create_distributed_table('orders', 'user_id', colocate_with => 'users');
SELECT create_distributed_table('order_items', 'user_id', colocate_with => 'orders');
-- This JOIN executes entirely on one worker because every join condition includes the distribution column
SELECT u.name, o.total, oi.product_name
FROM users u
JOIN orders o ON o.user_id = u.id
JOIN order_items oi ON oi.order_id = o.id AND oi.user_id = o.user_id
WHERE u.id = 12345;
-- Verify colocation: tables with the same colocation_id share a colocation group
SELECT table_name, colocation_id FROM citus_tables;
When tables aren't colocated, Citus must move data between nodes to evaluate JOINs. This is typically far slower and should be avoided whenever possible.
Columnar Storage for Analytics
Citus supports columnar storage using PostgreSQL's access method API, dramatically reducing storage for append-heavy workloads like event logging and analytics.
-- Create a columnar-distributed table for analytics
CREATE TABLE events (
id BIGSERIAL,
user_id UUID NOT NULL,
event_type TEXT NOT NULL,
properties JSONB,
created_at TIMESTAMPTZ DEFAULT now()
) USING columnar;
SELECT create_distributed_table('events', 'user_id');
-- Columnar compression can shrink storage several-fold; the actual ratio depends on the data
-- Queries that touch only a few columns read much less data
SELECT
event_type,
COUNT(*) as total,
COUNT(DISTINCT user_id) as unique_users,
date_trunc('hour', created_at) as hour
FROM events
WHERE created_at > now() - interval '24 hours'
GROUP BY event_type, date_trunc('hour', created_at)
ORDER BY hour DESC;
Shard Key Selection: The Most Critical Decision
The shard key (or distribution column) determines how data is spread across nodes. This decision is nearly irreversible for large datasets—changing it requires rewriting all data. Here are the common strategies:
Hash Sharding
Distributes rows uniformly across shards using a hash function. Best for even distribution when range-based queries aren't common.
-- Citus: hash-based distribution (default)
SELECT create_distributed_table('users', 'user_id');
-- Each shard owns a range of the 32-bit hash space; a row goes to the shard whose range contains hash(user_id)
-- Vitess: hash vindex
{
"vindexes": {
"hash_user_id": { "type": "hash" }
}
}
Range Sharding
Groups related data together on the same shard. Best for time-series data where queries typically filter by date range.
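Range placement amounts to finding which interval a value falls into. The sketch below assumes monthly boundaries and hypothetical shard names. Neither system exposes a function like this; it only illustrates the lookup and how a date-range query maps to a contiguous set of shards.

```typescript
interface RangeShard {
  shardName: string;
  startInclusive: string; // ISO-8601 date; lexicographic order equals time order
}

// Shards sorted by start date; each covers [its start, the next shard's start).
const rangeShards: RangeShard[] = [
  { shardName: "events_2024_01", startInclusive: "2024-01-01" },
  { shardName: "events_2024_02", startInclusive: "2024-02-01" },
  { shardName: "events_2024_03", startInclusive: "2024-03-01" },
];

// Binary search for the last shard whose start is <= the value.
function shardForDate(shards: RangeShard[], isoDate: string): string {
  let lo = 0;
  let hi = shards.length - 1;
  let found = -1;
  while (lo <= hi) {
    const mid = (lo + hi) >> 1;
    if (shards[mid].startInclusive <= isoDate) {
      found = mid;
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
  if (found < 0) throw new Error("no shard covers " + isoDate);
  return shards[found].shardName;
}

// Shards a date-range query must touch: everything from the shard owning
// `from` through the shard owning `to`.
function shardsForRange(shards: RangeShard[], from: string, to: string): string[] {
  const first = shardForDate(shards, from);
  const last = shardForDate(shards, to);
  const names: string[] = [];
  let inside = false;
  for (const s of shards) {
    if (s.shardName === first) inside = true;
    if (inside) names.push(s.shardName);
    if (s.shardName === last) break;
  }
  return names;
}

const touched = shardsForRange(rangeShards, "2024-01-20", "2024-02-10");
// touched is ["events_2024_01", "events_2024_02"]
```

The same property that makes range scans cheap also concentrates writes. With time as the key, every new row lands in the newest range, so that shard becomes the write hot spot.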
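-- Citus: range-based distribution (rarely used; hash distribution combined with
-- PostgreSQL time partitioning is the more common choice for time-series data)
SELECT create_distributed_table('events', 'created_at', 'range');
-- Vitess: shards own ranges of keyspace IDs, so a numeric vindex (which uses an
-- unsigned integer column's value as the keyspace ID) keeps adjacent values on the same shard
{
"vindexes": {
"numeric_id": {
"type": "numeric"
}
}
}
Lookup (Consistent) Sharding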
Maintains a mapping table so you can look up which shard a value belongs to. Essential when you need to query by multiple columns.
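Conceptually, a lookup vindex is a two-step resolution: secondary value to sharding key, then sharding key to shard. The sketch below models that with an in-memory table. The names (`emailToUserId`, `routeByEmail`) and the modulo placement are hypothetical simplifications; Vitess keeps the mapping in a real table, such as the `email_to_user_idx` table named in the VSchema.

```typescript
// In-memory stand-in for the lookup table (email -> user_id). Sample data is made up.
const emailToUserId: { [email: string]: number } = {
  "ada@example.com": 12345,
  "lin@example.com": 67890,
};

// Hypothetical primary placement: user_id modulo shard count.
function shardForUser(userId: number, shardCount: number): number {
  return userId % shardCount;
}

type EmailRoute =
  | { kind: "single"; shard: number }
  | { kind: "none" };

// Route a query filtered by email: consult the lookup first, then place by user_id.
function routeByEmail(email: string, shardCount: number): EmailRoute {
  if (!Object.prototype.hasOwnProperty.call(emailToUserId, email)) {
    return { kind: "none" }; // unknown email: no shard holds a matching row
  }
  return { kind: "single", shard: shardForUser(emailToUserId[email], shardCount) };
}

const emailRoute = routeByEmail("ada@example.com", 4); // 12345 % 4 === 1
```

Without the mapping, a filter on `email` carries no information about the sharding key, so the query would have to be sent to every shard.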
-- Vitess: consistent_lookup for secondary access patterns
{
"vindexes": {
"hash_user_id": { "type": "hash" },
"lookup_email": {
"type": "consistent_lookup",
"params": {
"table": "email_to_user_idx",
"from": "email",
"to": "user_id"
},
"owner": "users"
}
}
}
-- Now queries by email are routed efficiently to the correct shard
Step-by-Step Implementation
Deploying Vitess on Kubernetes
# vitess-cluster.yaml - Vitess operator configuration
apiVersion: planetscale.com/v2
kind: VitessCluster
metadata:
  name: my-cluster
spec:
  images:
    vtctld: vitess/lite:v19.0.0
    vtgate: vitess/lite:v19.0.0
    vttablet: vitess/lite:v19.0.0
    vtorc: vitess/lite:v19.0.0
    vtbackup: vitess/lite:v19.0.0
  cells:
    - name: zone1
      gateway:
        authentication:
          static:
            secret:
              name: cluster-secret
  keyspaces:
    - name: myapp
      vitessSchema:
        vschema: |
          {
            "sharded": true,
            "vindexes": { "hash": { "type": "hash" } },
            "tables": {
              "users": { "column_vindexes": [{ "column": "user_id", "name": "hash" }] },
              "orders": { "column_vindexes": [{ "column": "user_id", "name": "hash" }] }
            }
          }
      partitionings:
        - equal:
            parts: 4
            shardTemplate:
              databaseInitScriptSecret:
                name: init-script
              tabletPools:
                - cell: zone1
                  type: replica
                  replicas: 2
                  mysqld: {}
                  volumeClaimTemplates:
                    - name: data
                      spec:
                        resources:
                          requests:
                            storage: 50Gi
Deploying Citus on Azure
// Node.js application with Citus
import { Pool } from 'pg';
// Connect to the Citus coordinator (looks like a normal PostgreSQL connection)
const pool = new Pool({
host: 'citus-coordinator.postgres.database.azure.com',
port: 5432,
database: 'myapp',
ssl: { rejectUnauthorized: true }, // verify the server certificate; disabling this allows man-in-the-middle attacks
max: 20,
});
// All queries go to the coordinator - Citus handles distribution
async function getUserOrders(userId: string) {
const result = await pool.query(
`SELECT o.*, p.name as product_name
FROM orders o
JOIN products p ON p.id = o.product_id
WHERE o.user_id = $1
ORDER BY o.created_at DESC`,
[userId]
);
return result.rows;
}
// This cross-shard aggregation is handled automatically
async function getTopCustomers(limit: number) {
const result = await pool.query(
`SELECT u.name, u.email, SUM(o.total) as lifetime_value
FROM users u
JOIN orders o ON o.user_id = u.id
GROUP BY u.id, u.name, u.email
ORDER BY lifetime_value DESC
LIMIT $1`,
[limit]
);
return result.rows;
}
Migrating from Single-Node to Sharded
Migrating a live application from a single database to a sharded Vitess or Citus cluster is one of the hardest operational challenges. Both systems provide tools to make this easier, but the process requires careful planning.
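Whatever the tool, a safe cutover follows the same ordering discipline: copy, verify, move reads, then move writes. A small state machine makes that ordering explicit. The phase names below are hypothetical labels chosen for illustration, not commands from either system.

```typescript
type Phase =
  | "copying"
  | "verified"
  | "reads_switched"
  | "writes_switched"
  | "completed";

const phaseOrder: Phase[] = [
  "copying",
  "verified",
  "reads_switched",
  "writes_switched",
  "completed",
];

// Allow only a single step forward. Skipping verification, or switching
// writes before reads, is rejected.
function advance(current: Phase, next: Phase): Phase {
  const from = phaseOrder.indexOf(current);
  const to = phaseOrder.indexOf(next);
  if (to !== from + 1) {
    throw new Error("illegal transition: " + current + " -> " + next);
  }
  return next;
}

// Walk a migration through every phase in order.
let migrationPhase: Phase = "copying";
migrationPhase = advance(migrationPhase, "verified");
migrationPhase = advance(migrationPhase, "reads_switched");
migrationPhase = advance(migrationPhase, "writes_switched");
migrationPhase = advance(migrationPhase, "completed");
```

Encoding the order in a runbook or script like this is a design choice, not a requirement of either tool. It simply makes it harder to switch writes before the copy has been verified.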
Vitess Migration Path
Vitess uses MoveTables to migrate tables into a sharded keyspace without downtime:
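# Commands use vtctldclient syntax from recent Vitess releases; add --server <vtctld-address> for your cluster
# 1. Create the target keyspace (its shards and VSchema define how tables are sharded)
vtctldclient CreateKeyspace myapp_sharded
# 2. Start copying tables from the unsharded keyspace into the sharded one
vtctldclient MoveTables --workflow workflow1 --target-keyspace myapp_sharded create --source-keyspace myapp_unsharded --tables 'users,orders'
# 3. Once the copy has caught up, verify that source and target match
vtctldclient VDiff --workflow workflow1 --target-keyspace myapp_sharded create
# 4. Switch reads to the new keyspace
vtctldclient MoveTables --workflow workflow1 --target-keyspace myapp_sharded switchtraffic --tablet-types "rdonly,replica"
# 5. Verify application behavior with reads on the new keyspace
# 6. Switch writes (this is the critical cutover)
vtctldclient MoveTables --workflow workflow1 --target-keyspace myapp_sharded switchtraffic --tablet-types primary
# 7. Complete the workflow and clean up the source tables
vtctldclient MoveTables --workflow workflow1 --target-keyspace myapp_sharded complete
Citus Migration Path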
Citus migration is simpler because the coordinator handles distribution—your application doesn't need to change connection strings.
-- 1. Install Citus on your existing PostgreSQL instance
CREATE EXTENSION citus;
-- 2. Add worker nodes
SELECT citus_add_node('worker-1', 5432);
SELECT citus_add_node('worker-2', 5432);
-- 3. Distribute tables (this moves data to workers)
SELECT create_distributed_table('orders', 'user_id');
-- Citus handles the data movement and updates catalog tables.
-- Writes to the table are blocked while existing rows are copied; on Citus 11.1 and later,
-- create_distributed_table_concurrently('orders', 'user_id') avoids that for live tables
-- 4. Application continues connecting to the coordinator
-- No connection string changes needed
The Citus approach is less risky because the coordinator acts as a single point of contact. The Vitess approach gives more control but requires more operational expertise.
Real-World Use Cases
Use Case 1: Multi-Tenant SaaS with Citus
Citus excels at multi-tenant workloads where all queries are scoped to a tenant:
-- Distribute by tenant_id - all tenant data on same worker
SELECT create_distributed_table('accounts', 'tenant_id');
SELECT create_distributed_table('users', 'tenant_id');
SELECT create_distributed_table('orders', 'tenant_id');
SELECT create_distributed_table('events', 'tenant_id');
-- Tenant-scoped queries hit a single worker
SELECT * FROM orders WHERE tenant_id = 'tenant-123' ORDER BY created_at DESC;
-- Cross-tenant analytics run in parallel across all workers
SELECT tenant_id, COUNT(*) as order_count
FROM orders
WHERE created_at > now() - interval '30 days'
GROUP BY tenant_id;
This pattern is common enough that Citus colocates by default: hash-distributed tables whose distribution columns have the same type are placed in the same colocation group, so all of a tenant's rows across these tables land on the same worker.
Use Case 2: High-Throughput Write Workload with Vitess
A messaging platform processes 100,000 messages/second using Vitess hash sharding:
# VSchema for messages table
{
"sharded": true,
"vindexes": {
"hash_conversation": {
"type": "hash"
}
},
"tables": {
"messages": {
"column_vindexes": [
{ "column": "conversation_id", "name": "hash_conversation" }
],
"auto_increment": {
"column": "id",
"sequence": "messages_seq"
}
}
}
}
// Application code - queries are transparently routed
import mysql from 'mysql2/promise';
const connection = await mysql.createConnection({
host: 'vtgate-host',
port: 3306,
user: 'app',
password: process.env.VITESS_PASSWORD,
database: 'myapp',
});
// Insert: vtgate routes to the shard owning this conversation_id
await connection.execute(
'INSERT INTO messages (conversation_id, sender_id, content) VALUES (?, ?, ?)',
[conversationId, senderId, content]
);
// Query: single-shard lookup by conversation_id
const [messages] = await connection.execute(
'SELECT * FROM messages WHERE conversation_id = ? ORDER BY created_at DESC LIMIT 50',
[conversationId]
);
Use Case 3: Real-Time Analytics with Citus Columnar Storage
-- Combine distributed tables with columnar compression for analytics
CREATE TABLE events (
id BIGSERIAL,
user_id UUID NOT NULL,
event_type TEXT NOT NULL,
properties JSONB,
created_at TIMESTAMPTZ DEFAULT now()
) USING columnar;
SELECT create_distributed_table('events', 'user_id');
-- Columnar compression can reduce storage several-fold, depending on the data
-- Citus can push down aggregation to workers
SELECT
event_type,
COUNT(*) as total,
COUNT(DISTINCT user_id) as unique_users,
date_trunc('hour', created_at) as hour
FROM events
WHERE created_at > now() - interval '24 hours'
GROUP BY event_type, date_trunc('hour', created_at)
ORDER BY hour DESC;
Monitoring and Observability
Running a sharded database in production requires deep visibility into query routing, shard health, and replication lag. Both systems provide rich metrics.
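One way to act on replication-lag visibility is to let it drive read routing. The following sketch assumes the application already has a per-replica lag figure, for instance scraped from the monitoring stack. The `ReplicaStatus` shape, the threshold values, and `chooseReadTarget` are all hypothetical.

```typescript
interface ReplicaStatus {
  host: string;
  lagSeconds: number | null; // null when lag is unknown (for example, replication stopped)
}

interface ReadOptions {
  maxLagSeconds: number; // staleness the caller can tolerate
  requireFresh: boolean; // true for read-your-writes paths
}

// Pick the least-lagged replica within tolerance, otherwise fall back to the primary.
function chooseReadTarget(
  primary: string,
  replicas: ReplicaStatus[],
  opts: ReadOptions
): string {
  if (opts.requireFresh) return primary;
  let bestHost: string | null = null;
  let bestLag = Infinity;
  for (const r of replicas) {
    if (r.lagSeconds === null || r.lagSeconds > opts.maxLagSeconds) continue;
    if (r.lagSeconds < bestLag) {
      bestLag = r.lagSeconds;
      bestHost = r.host;
    }
  }
  return bestHost === null ? primary : bestHost;
}

const replicaFleet: ReplicaStatus[] = [
  { host: "replica-a", lagSeconds: 2 },
  { host: "replica-b", lagSeconds: 0.5 },
  { host: "replica-c", lagSeconds: null },
];

// Tolerating one second of staleness selects replica-b.
const readTarget = chooseReadTarget("primary-1", replicaFleet, {
  maxLagSeconds: 1,
  requireFresh: false,
});
```

Falling back to the primary when no replica qualifies trades extra primary load for correctness. A stricter design might return an error instead, so that the primary is protected during a lag spike.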
Vitess Monitoring
-- List tablets and their serving state
SHOW VITESS_TABLETS;
-- Shows: cell, keyspace, shard, tablet_type, state, hostname
-- Monitor replication health and lag across shards
SHOW VITESS_REPLICATION_STATUS;
-- Shows per-tablet replication status and lag
-- vtgate exposes Prometheus metrics at /metrics
-- Key metrics to watch:
-- vtgate_api_* - query counts, latencies by operation
-- vterrors_* - error counts by type
-- vttablet_* - tablet-level metrics
-- Check for scatter-gather queries (expensive)
-- vtgate logs these with a "scatter" prefix
# Prometheus alert for high replication lag
- alert: VitessReplicationLagHigh
  expr: mysql_slave_status_seconds_behind_master > 30
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Vitess replication lag > 30s on {{ $labels.instance }}"
# Alert for vtgate errors
- alert: VitessVtgateErrorRateHigh
  expr: rate(vterrors_code[5m]) > 0.01
  for: 2m
  labels:
    severity: critical
Citus Monitoring
-- Check which distributed queries run most often (join with pg_stat_statements on queryid for timings)
SELECT * FROM citus_stat_statements ORDER BY calls DESC LIMIT 20;
-- Inspect in-flight distributed queries across the cluster
SELECT * FROM citus_dist_stat_activity;
-- Check shard distribution balance
SELECT
nodename,
COUNT(*) as shard_count,
pg_size_pretty(SUM(shard_size)) as total_size
FROM citus_shards
GROUP BY nodename
ORDER BY shard_count DESC;
-- Monitor background tasks (rebalancing, cleanup)
SELECT * FROM citus_rebalance_status();
Best Practices for Production
- Choose Vitess for MySQL, Citus for PostgreSQL: Don't fight your ecosystem—pick the tool that matches your database.
- Use co-located tables: In both systems, tables sharded on the same key can be joined efficiently on the same shard.
- Use reference tables for small, static data: Citus's create_reference_table and Vitess reference tables (kept in sync by a Materialize workflow) place a copy of lookup data on every node.
- Monitor query routing: Log scatter-gather queries, since they are the usual performance killers. Optimize them with better shard keys or denormalization.
- Test schema migrations on staging first: Both systems have online DDL, but edge cases exist. Always test.
- Plan for node failures: Both systems handle replica failures automatically, but coordinator/vtgate failures need careful failover configuration.
- Use connection pooling: Citus benefits from PgBouncer; Vitess has connection pooling built in (applications connect through vtgate, and vttablet pools the MySQL connections).
- Benchmark with production-like data: Sharding performance characteristics only emerge at scale. Test with realistic data volumes.
- Avoid cross-shard transactions: Both systems support them, but they're significantly slower. Design your schema to keep related data on the same shard.
- Document your shard key decisions: Record why each table is distributed on a specific column. Future you will thank present you when debugging query performance.
Common Pitfalls and Solutions
| Pitfall | Impact | Solution |
|---|---|---|
| Cross-shard JOINs in Vitess | Query failure or full scatter | Shard on the JOIN column or use reference tables |
| Missing distributed index in Citus | Full table scan across all workers | Create distributed indexes on frequently filtered columns |
| Uneven shard distribution | Hot workers | Use hash vindex (Vitess) or check distribution column cardinality |
| Schema drift across shards | Inconsistent behavior | Use online DDL tools; verify schema parity |
| Large reference tables | Memory pressure on all workers | Keep reference tables small; shard large tables instead |
| Wrong shard key chosen early | Expensive migration later | Prototype with realistic data volumes before committing |
| Ignoring replication lag | Stale reads on replicas | Monitor lag and use primary reads for consistency-critical queries |
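Two of these pitfalls, uneven distribution and a shard key chosen too early, can be caught before committing by measuring how a candidate key spreads sample rows. The sketch below is a rough illustration: it uses a toy FNV-1a hash with modulo placement rather than either system's real placement logic, and the sample keys are synthetic.

```typescript
// Toy 32-bit FNV-1a hash; a stand-in for the real placement hash.
function fnv1a(key: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    // Multiply by the FNV prime (16777619) using shifts to stay within 32 bits.
    h = (h + (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24)) >>> 0;
  }
  return h >>> 0;
}

interface SkewReport {
  counts: number[];    // rows per shard
  maxOverMean: number; // 1.0 is perfectly even; larger means hotter shards
}

// Place every key on a shard and compare the fullest shard with the average.
function measureSkew(keys: string[], shardCount: number): SkewReport {
  const counts: number[] = [];
  for (let i = 0; i < shardCount; i++) counts.push(0);
  for (const key of keys) counts[fnv1a(key) % shardCount]++;
  let max = 0;
  for (const c of counts) if (c > max) max = c;
  const mean = keys.length / shardCount;
  return { counts: counts, maxOverMean: mean === 0 ? 0 : max / mean };
}

// Candidate A: high-cardinality key (one value per user).
const byUser: string[] = [];
// Candidate B: low-cardinality key (three countries), a classic hot-shard setup.
const byCountry: string[] = [];
const countryCodes = ["US", "DE", "JP"];
for (let i = 0; i < 3000; i++) {
  byUser.push("user-" + i);
  byCountry.push(countryCodes[i % 3]);
}

const userSkew = measureSkew(byUser, 8);
const countrySkew = measureSkew(byCountry, 8);
```

With only three distinct values, the country key can occupy at most three of the eight shards, so its `maxOverMean` is necessarily well above that of the per-user key. Running the same measurement against a sample of production data is a cheap check before a distribution column is locked in.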
Performance Optimization
-- Citus: Colocate related tables for efficient JOINs
SELECT create_distributed_table('orders', 'user_id', colocate_with => 'users');
SELECT create_distributed_table('order_items', 'user_id', colocate_with => 'orders');
-- Vitess: Use VReplication for resharding without downtime (one shard split into four; the workflow name is arbitrary)
-- vtctldclient Reshard --workflow split4 --target-keyspace myapp create --source-shards '0' --target-shards '-40,40-80,80-c0,c0-'
-- Citus: find the most frequently executed distributed queries and inspect in-flight ones
SELECT * FROM citus_stat_statements ORDER BY calls DESC LIMIT 20;
SELECT * FROM citus_dist_stat_activity;
-- Vitess: Enable query buffering during resharding
-- vtgate --enable_buffer=true --buffer_window=10s --buffer_size=1000
Comparison with Alternatives
| Feature | Vitess | Citus | CockroachDB | TiDB |
|---|---|---|---|---|
| Base database | MySQL | PostgreSQL | Custom (PG-compatible) | MySQL-compatible |
| Sharding transparency | High | High | Full | Full |
| Cross-shard transactions | Limited | Full | Full | Full |
| Online resharding | Yes | Yes | Automatic | Automatic |
| Operational complexity | High | Medium | Low | Medium |
| Connection pooling | Built-in (vtgate) | External (pgBouncer) | Built-in | Built-in |
| Columnar storage | No | Yes | No | TiFlash |
| Best for | MySQL at scale | PostgreSQL at scale | New projects | MySQL ecosystem |
Testing Strategies
describe('Vitess Integration', () => {
test('single-shard query uses correct shard', async () => {
const result = await connection.execute(
'SELECT * FROM users WHERE user_id = ?', [12345]
);
expect(result[0]).toHaveLength(1);
});
test('cross-shard query returns complete results', async () => {
const [rows] = await connection.execute(
'SELECT COUNT(*) as cnt FROM users'
);
expect(rows[0].cnt).toBeGreaterThan(0);
});
});
describe('Citus Integration', () => {
test('distributed JOIN works correctly', async () => {
const result = await pool.query(`
SELECT u.name, COUNT(o.id) as orders
FROM users u JOIN orders o ON o.user_id = u.id
GROUP BY u.name LIMIT 10
`);
expect(result.rows.length).toBeGreaterThan(0);
});
test('tenant isolation works across shards', async () => {
// Insert data for two different tenants
await pool.query(
'INSERT INTO orders (tenant_id, total) VALUES ($1, 100)',
['tenant-a']
);
await pool.query(
'INSERT INTO orders (tenant_id, total) VALUES ($1, 200)',
['tenant-b']
);
// Each tenant only sees their own data
const result = await pool.query(
'SELECT * FROM orders WHERE tenant_id = $1',
['tenant-a']
);
result.rows.forEach(row => {
expect(row.tenant_id).toBe('tenant-a');
});
});
});
Future Outlook
Vitess 24 (current stable) focuses on improved VReplication for zero-downtime resharding and better MySQL 8.0/8.4 compatibility. The Vitess operator for Kubernetes has matured significantly, making it easier to run Vitess in cloud-native environments. Citus 13 (Azure Cosmos DB for PostgreSQL) continues adding improved columnar storage, automatic index recommendations, and better integration with Azure services. Both projects are converging toward making distributed SQL as easy as single-node SQL.
The broader trend in distributed databases is convergence in capability rather than in ecosystem: Vitess remains focused on MySQL compatibility, and Citus remains a PostgreSQL extension. For teams evaluating these tools, the choice therefore usually comes down to which database ecosystem they're already invested in rather than technical merits alone.
Database sharding distributes data across multiple nodes to achieve horizontal scalability, but introduces complexity in query routing, cross-shard joins, and rebalancing operations.
Conclusion
Vitess and Citus solve the same problem—horizontal database scaling—for different ecosystems. Key takeaways:
- Vitess is battle-tested for MySQL at YouTube/Slack scale but requires operational expertise to run
- Citus extends PostgreSQL with minimal application changes, ideal for teams already invested in PostgreSQL
- Both require careful shard key selection—this decision is nearly irreversible
- Co-located tables and reference tables are essential patterns for efficient distributed queries
- Consider managed offerings (PlanetScale for Vitess, Azure Cosmos DB for PostgreSQL for Citus) to reduce operational burden
- Monitor replication lag, query routing, and shard balance continuously in production
- Plan your migration path early—moving from single-node to sharded is easier when the application is designed for it from the start
Start by evaluating whether your queries can be scoped to a single shard key. If yes, either tool will serve you well. If your workload is heavily cross-shard, consider CockroachDB or TiDB instead.