你有没有发现:PostgreSQL可以做到Redis能做的一切

Polliog 2026-04-13 10:20:56

我之前用的是一套很典型的Web应用技术栈:

 

  • PostgreSQL负责持久化数据存储
  • Redis负责缓存、发布订阅以及后台任务处理

 

两个数据库,两个体系需要管理,也意味着多了两处故障风险点。

 

后来我意识到:PostgreSQL可以做到Redis能做的一切。

 

于是我彻底移除了Redis,迁移过程是这样的。

 

一、设置:我使用Redis的目的

 

在替换之前,Redis主要处理三件事:

 

1、缓存(使用率70%)

 

// Cache API responses
await redis.set(`user:${id}`JSON.stringify(user), 'EX'3600);

 

2、发布订阅(使用率20%)

 

// Real-time notifications
redis.publish('notifications'JSON.stringify({ userId, message }));

 

3、后台消息队列(使用率10%)

 

// Using Bull/BullMQ
queue.add('send-email', { to, subject, body });

 

痛点:

 

  • 需要备份两个数据库
  • Redis使用内存(规模化时成本很高)
  • Redis持久化机制……很复杂
  • Postgres和Redis之间还存在一次网络跳转开销

 

二、我为什么考虑替换Redis

 

原因一:成本

 

我的Redis配置:

 

  • AWS ElastiCache(2GB):每月45美元
  • 若扩容至 5GB,每月费用将增至110美元

 

PostgreSQL:

 

  • 已付费使用RDS(20GB存储):每月50美元
  • 即便增加5GB数据流量:每月仅需0.5美元

 

节省成本:每月约100美元

 

原因二:运行复杂性

 

使用 Redis:

 

Postgres backup ✅
Redis backup ❓ (RDB? AOF? Both?)
Postgres monitoring ✅
Redis monitoring ❓
Postgres failover ✅
Redis Sentinel/Cluster ❓

 

不使用Redis:

 

Postgres backup ✅
Postgres monitoring ✅
Postgres failover

 

系统依赖组件更少。

 

原因三:数据一致性

 

经典问题:

 

// Update database
await db.query('UPDATE users SET name = $1 WHERE id = $2', [name, id]);


// Invalidate cache
await redis.del(`user:${id}`);


// ⚠️ What if Redis is down?
// ⚠️ What if this fails?
// Now cache and DB are out of sync

 

在PostgreSQL中,这类问题通过事务即可解决。

 

三、PostgreSQL特性

 

1、使用非日志表进行缓存

 

Redis:

 

await redis.set('session:abc123', JSON.stringify(sessionData), 'EX'3600);

 

PostgreSQL:

 

CREATE UNLOGGED TABLE cache (
  key TEXT PRIMARY KEY,
  value JSONB NOT NULL,
  expires_at TIMESTAMPTZ NOT NULL
);


CREATE INDEX idx_cache_expires ON cache(expires_at);

 

插入:

 

INSERT INTO cache (keyvalue, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '1 hour')
ON CONFLICT (key) DO UPDATE
  SET value = EXCLUDED.value,
      expires_at = EXCLUDED.expires_at;

 

读:

 

SELECT value FROM cache
WHERE key = $1 AND expires_at > NOW();

 

清理(定期运行):

 

DELETE FROM cache WHERE expires_at < NOW();

 

什么是非日志表?

 

  • 跳过预写式日志(WAL)
  • 写入性能大幅提升
  • 崩溃后数据不保留(非常适合用作缓存!)

 

表现:

 

Redis SET: 0.05ms
Postgres UNLOGGED INSERT: 0.08ms

 

用作缓存已经完全够用。

 

2、基于LISTEN或NOTIFY实现发布订阅功能

 

接下来就精彩了。

 

PostgreSQL具有原生的发布订阅功能,但大多数开发人员并不了解。

 

1)Redis的发布订阅功能

 

// Publisher
redis.publish('notifications', JSON.stringify({ userId123msg'Hello' }));


// Subscriber
redis.subscribe('notifications');
redis.on('message', (channel, message) => {
  console.log(message);
});

 

2)PostgreSQL的发布订阅功能

 

-- Publisher
NOTIFY notifications, '{"userId": 123, "msg""Hello"}';

 

// Subscriber (Node.js with pg)
const client = new Client({ connectionString: process.env.DATABASE_URL });
await client.connect();


await client.query('LISTEN notifications');


client.on('notification'(msg) => {
  const payload = JSON.parse(msg.payload);
  console.log(payload);
});

 

性能对比:

 

Redis pub/sub latency: 1-2ms
Postgres NOTIFY latency: 2-5ms

 

性能略低,但优势明显:

 

  • 无需额外部署中间件
  • 可在事务中使用
  • 可与查询语句结合使用

 

3)实际应用场景:实时日志追踪

 

在我的日志管理应用中,需要实现日志实时流式推送

 

使用Redis:

 

// When new log arrives
await db.query('INSERT INTO logs ...');
await redis.publish('logs:new', JSON.stringify(log));


// Frontend listens
redis.subscribe('logs:new');

 

问题:有两个操作,如果发布失败怎么办?

 

使用PostgreSQL:

 

CREATE FUNCTION notify_new_log() RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('logs_new', row_to_json(NEW)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;


CREATE TRIGGER log_inserted
AFTER INSERT ON logs
FOR EACH ROW EXECUTE FUNCTION notify_new_log();

 

现在整个操作是原子性的:插入数据与通知推送,要么同时生效,要么都不执行。

 

// Frontend (via SSE)
app.get('/logs/stream'async (req, res) => {
  const client = await pool.connect();


    res.writeHead(200, {
    'Content-Type''text/event-stream',
    'Cache-Control''no-cache',
  });


    await client.query('LISTEN logs_new');


      client.on('notification'(msg) => {
    res.write(`data: ${msg.payload}\n\n`);
  });
});

 

结果:无需Redis即可实现实时日志流传输。

 

3、基于SKIP LOCKED实现任务队列

 

Redis(使用Bull或者BullMQ):

 

queue.add('send-email', { to, subject, body });


queue.process('send-email'async (job) => {
  await sendEmail(job.data);
});

 

PostgreSQL:

 

CREATE TABLE jobs (
  id BIGSERIAL PRIMARY KEY,
  queue TEXT NOT NULL,
  payload JSONB NOT NULL,
  attempts INT DEFAULT 0,
  max_attempts INT DEFAULT 3,
  scheduled_at TIMESTAMPTZ DEFAULT NOW(),
  created_at TIMESTAMPTZ DEFAULT NOW()
);


CREATE INDEX idx_jobs_queue ON jobs(queue, scheduled_at) 
WHERE attempts < max_attempts;

 

入队:

 

INSERT INTO jobs (queue, payload)
VALUES ('send-email', '{"to": "user@example.com", "subject": "Hi"}');

 

工作进程(出队):

 

WITH next_job AS (
  SELECT id FROM jobs
  WHERE queue = $1
    AND attempts < max_attempts
    AND scheduled_at <= NOW()
  ORDER BY scheduled_at
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET attempts = attempts + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING *;

 

神奇之处:FOR UPDATE SKIP LOCKED

 

这让PostgreSQL成为了无锁队列

 

  • 多个工作进程可并发拉取任务
  • 任务不会被重复处理
  • 若工作进程崩溃,任务会自动重新变为可执行状态

 

表现:

 

Redis BRPOP: 0.1ms
Postgres SKIP LOCKED: 0.3ms

 

对于大多数业务负载而言,性能差异可以忽略不计。

 

4、限流

 

Redis(经典限流方案):

 

const key = `ratelimit:${userId}`;
const count = await redis.incr(key);
if (count === 1) {
  await redis.expire(key, 60); // 60 seconds
}


if (count > 100) {
  throw new Error('Rate limit exceeded');
}

 

PostgreSQL:

 

CREATE TABLE rate_limits (
  user_id INT PRIMARY KEY,
  request_count INT DEFAULT 0,
  window_start TIMESTAMPTZ DEFAULT NOW()
);


-- Check and increment
WITH current AS (
  SELECT 
    request_count,
    CASE 
      WHEN window_start < NOW() - INTERVAL '1 minute'
      THEN 1  -- Reset counter
      ELSE request_count + 1
    END AS new_count
  FROM rate_limits
  WHERE user_id = $1
  FOR UPDATE
)
UPDATE rate_limits
SET 
  request_count = (SELECT new_count FROM current),
  window_start = CASE
    WHEN window_start < NOW() - INTERVAL '1 minute'
    THEN NOW()
    ELSE window_start
  END
WHERE user_id = $1
RETURNING request_count;

 

或者用窗口函数更简单:

 

CREATE TABLE api_requests (
  user_id INT NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW()
);


-- Check rate limit
SELECT COUNT(*) FROM api_requests
WHERE user_id = $1
  AND created_at > NOW() - INTERVAL '1 minute';


  -- If under limit, insert
INSERT INTO api_requests (user_id) VALUES ($1);


-- Cleanup old requests periodically
DELETE FROM api_requests WHERE created_at < NOW() - INTERVAL '5 minutes';

 

Postgres的适用场景:

 

  • 需要基于复杂业务逻辑做限流(而非仅简单计数)
  • 希望限流数据与业务逻辑在同一事务中处理

 

Redis的适用场景:

 

  • 需要亚毫秒级限流
  • 极高吞吐量(每秒数百万请求)

 

5、基于JSONB实现会话存储

 

Redis:

 

await redis.set(`session:${sessionId}`JSON.stringify(sessionData), 'EX'86400);

 

PostgreSQL:

 

CREATE TABLE sessions (
  id TEXT PRIMARY KEY,
  data JSONB NOT NULL,
  expires_at TIMESTAMPTZ NOT NULL
);


CREATE INDEX idx_sessions_expires ON sessions(expires_at);


-- Insert/Update
INSERT INTO sessions (iddata, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '24 hours')
ON CONFLICT (idDO UPDATE
  SET data = EXCLUDED.data,
      expires_at = EXCLUDED.expires_at;


      -- Read
SELECT data FROM sessions
WHERE id = $1 AND expires_at > NOW();

 

附加内容:JSONB 运算符

 

你可以在会话内部进行查询:

 

-- Find all sessions for a specific user
SELECT * FROM sessions
WHERE data->>'userId' = '123';


-- Find sessions with specific role
SELECT * FROM sessions
WHERE data->'user'->>'role' = 'admin';

 

你用Redis做不到这一点!

 

四、实际生产环境基准测试

 

我用生产数据集完成了基准测试:

 

1、测试设置

 

  • 硬件: AWS RDS db.t3.medium(2个虚拟CPU,4GB内存)
  • 数据集:100万条缓存条目,1万个会话
  • 工具:pgbench(自定义脚本)

 

2、结果

 

操作

Redis

PostgreSQL

不同之处

缓存集

0.05毫秒

0.08毫秒

速度降低 60%

缓存获取

0.04毫秒

0.06毫秒

速度降低 50%

发布订阅

1.2毫秒

3.1毫秒

速度降低 158%

队列推送

0.08毫秒

0.15毫秒

速度降低 87%

队列弹出

0.12毫秒

0.31毫秒

速度降低 158%

 

PostgreSQL速度较慢,但是:

 

  • 所有操作耗时均保持在1毫秒以内
  • 省去了与Redis交互的网络开销
  • 降低基础设施复杂性

 

3、合并执行(真正的胜利)

 

场景:插入数据 + 缓存失效 + 通知订阅者

 

使用Redis

 

await db.query('INSERT INTO posts ...');       // 2ms
await redis.del('posts:latest');                // 1ms (network hop)
await redis.publish('posts:new', data);         // 1ms (network hop)
// Total: ~4ms

 

使用PostgreSQL:

 

BEGIN;
INSERT INTO posts ...;                          -- 2ms
DELETE FROM cache WHERE key = 'posts:latest';  -- 0.1ms (same connection)
NOTIFY posts_new, '...';                        -- 0.1ms (same connection)
COMMIT;
-- Total: ~2.2ms

 

当多个操作合并执行时,PostgreSQL速度更快。

 

五、哪些场景仍建议保留Redis

 

如果符合以下条件,请不要替换Redis:

 

1、需要极致的性能

 

Redis100,000+ ops/sec (single instance)
Postgres10,000-50,000 ops/sec

 

如果你每秒执行数百万次缓存读取操作,那就继续使用 Redis。

 

2、使用Redis特有的数据结构

 

Redis具备:

 

  • 有序集合(排行榜)
  • HyperLogLog(基数统计)
  • 地理空间索引
  • Streams(高级发布订阅)

 

PostgreSQL 虽有对应实现,但使用起来更为繁琐:

 

-- Leaderboard in Postgres (slower)
SELECT user_id, score
FROM leaderboard
ORDER BY score DESC
LIMIT 10;


-- vs Redis
ZREVRANGE leaderboard 0 9 WITHSCORES

 

3、架构需要独立缓存层

 

如果你的架构要求独立的缓存层(例如微服务架构),建议保留Redis。

 

六、迁移方案

 

不要一夜之间就彻底放弃Redis,以下是我的做法:

 

第一阶段:并排共存(第1周)

 

// Write to both
await redis.set(key, value);
await pg.query('INSERT INTO cache ...');


// Read from Redis (still primary)
let data = await redis.get(key);

 

监控:对比命中率、延迟。

 

第二阶段:从Postgres读取数据(第2周)

 

// Try Postgres first
let data = await pg.query('SELECT value FROM cache WHERE key = $1', [key]);


// Fallback to Redis
if (!data) {
  data = await redis.get(key);
}

 

监控:错误率、性能。

 

第三阶段:仅写入Postgres(第3周)

 

// Only write to Postgres
await pg.query('INSERT INTO cache ...');

 

监控:所有功能是否正常运行?

 

第四阶段:移除Redis(第4周)

 

# Turn off Redis
# Watch for errors
# Nothing breaks? Success!

 

七、代码示例:完整实现

 

1、缓存模块(PostgreSQL)

 

// cache.js
class PostgresCache {
  constructor(pool) {
    this.pool = pool;
  }


    async get(key) {
    const result = await this.pool.query(
      'SELECT value FROM cache WHERE key = $1 AND expires_at > NOW()',
      [key]
    );
    return result.rows[0]?.value;
  }


    async set(key, value, ttlSeconds = 3600) {
    await this.pool.query(
      `INSERT INTO cache (key, value, expires_at)
       VALUES ($1, $2, NOW() + INTERVAL '${ttlSeconds} seconds')
       ON CONFLICT (key) DO UPDATE
         SET value = EXCLUDED.value,
             expires_at = EXCLUDED.expires_at`,
      [key, value]
    );
  }


    async delete(key) {
    await this.pool.query('DELETE FROM cache WHERE key = $1', [key]);
  }


    async cleanup() {
    await this.pool.query('DELETE FROM cache WHERE expires_at < NOW()');
  }
}


module.exports = PostgresCache;

 

2、发布订阅模块

 

// pubsub.js
class PostgresPubSub {
  constructor(pool) {
    this.pool = pool;
    this.listeners = new Map();
  }


    async publish(channel, message) {
    const payload = JSON.stringify(message);
    await this.pool.query('SELECT pg_notify($1, $2)', [channel, payload]);
  }


    async subscribe(channel, callback) {
    const client = await this.pool.connect();
    await client.query(`LISTEN ${channel}`);
    client.on('notification'(msg) => {
      if (msg.channel === channel) {
        callback(JSON.parse(msg.payload));
      }
    });


        this.listeners.set(channel, client);
  }


    async unsubscribe(channel) {
    const client = this.listeners.get(channel);
    if (client) {
      await client.query(`UNLISTEN ${channel}`);
      client.release();
      this.listeners.delete(channel);
    }
  }
}


module.exports = PostgresPubSub;

 

3、任务队列模块

 

// queue.js
class PostgresQueue {
  constructor(pool) {
    this.pool = pool;
  }


    async enqueue(queue, payload, scheduledAt = new Date()) {
    await this.pool.query(
      'INSERT INTO jobs (queue, payload, scheduled_at) VALUES ($1, $2, $3)',
      [queue, payload, scheduledAt]
    );
  }


    async dequeue(queue) {
    const result = await this.pool.query(
      `WITH next_job AS (
        SELECT id FROM jobs
        WHERE queue = $1
          AND attempts < max_attempts
          AND scheduled_at <= NOW()
        ORDER BY scheduled_at
        LIMIT 1
        FOR UPDATE SKIP LOCKED
      )
      UPDATE jobs
      SET attempts = attempts + 1
      FROM next_job
      WHERE jobs.id = next_job.id
      RETURNING jobs.*`,
      [queue]
    );


        return result.rows[0];
  }


    async complete(jobId) {
    await this.pool.query('DELETE FROM jobs WHERE id = $1', [jobId]);
  }


    async fail(jobId, error) {
    await this.pool.query(
      `UPDATE jobs
       SET attempts = max_attempts,
           payload = payload || jsonb_build_object('error', $2)
       WHERE id = $1`,
      [jobId, error.message]
    );
  }
}


module.exports = PostgresQueue;

 

八、性能优化技巧

 

1、使用连接池

 

const { Pool } = require('pg');


const pool = new Pool({
  max20,  // Max connections
  idleTimeoutMillis30000,
  connectionTimeoutMillis2000,
});

 

2、添加合适的索引

 

CREATE INDEX CONCURRENTLY idx_cache_key ON cache(keyWHERE expires_at > NOW();
CREATE INDEX CONCURRENTLY idx_jobs_pending ON jobs(queue, scheduled_at) 
  WHERE attempts < max_attempts;

 

3、调整PostgreSQL配置

 

# postgresql.conf
shared_buffers = 2GB           # 25% of RAM
effective_cache_size = 6GB     # 75% of RAM
work_mem = 50MB                # For complex queries
maintenance_work_mem = 512MB   # For VACUUM

 

4、定期维护

 

-- Run daily
VACUUM ANALYZE cache;
VACUUM ANALYZE jobs;


-- Or enable autovacuum (recommended)
ALTER TABLE cache SET (autovacuum_vacuum_scale_factor = 0.1);

 

九、三个月后的结果

 

我省下了:

 

  • 每月100美元(不再使用 ElastiCache)
  • 备份复杂性降低50%
  • 少监控一项服务
  • 更简单的部署(减少一项依赖)

 

我失去了:

 

  • 缓存操作延迟则增加约0.5毫秒
  • Redis特有的数据结构(其实并不需要)

 

我会再次这样做吗?就这个业务场景而言:会。

 

是否推荐所有人都这么做?不推荐。

 

十、决策矩阵

 

如果满足以下条件,可用Postgres替换Redis:

 

  • 仅用Redis做简单缓存或者会话管理
  • 缓存命中率低于95%(写入次数过多)
  • 需要事务一致性
  • 可以接受操作速度慢0.1-1毫秒
  • 团队规模小,运维资源有限

 

以下场景建议保留Redis:

 

  • 每秒10万次以上的操作量
  • 使用Redis特有的数据结构(有序集合等)
  • 配备专业的运维团队
  • 亚毫秒级延迟为核心要求
  • 需要跨区域地理复制

 

十一、参考资料

 

1、PostgreSQL 特性

 

  • LISTEN/NOTIFY 官方文档
  • SKIP LOCKED 语法
  • UNLOGGED 表

 

2、工具

 

  • pgBouncer - 连接池
  • pg_stat_statements - 查询性能

 

3、其他解决方案

 

  • Graphile Worker - 基于Postgres的任务队列
  • pg-boss - 另一款Postgres队列实现

 

十二、最后

 

我用PostgreSQL替换了Redis的这些场景:

 

  • 缓存 → UNLOGGED 表
  • 发布订阅 → LISTEN/NOTIFY
  • 任务队列 → SKIP LOCKED
  • 会话存储 → JSONB 表

 

结果:

 

  • 每月节省100美元
  • 降低了运维复杂度
  • 性能略有下降(延迟增加 0.1–1ms),但可接受
  • 保证了事务一致性

 

适合这样做的场景:

 

  • 中小型应用
  • 简单的缓存需求
  • 希望减少系统组件、简化架构

 

不适合这样做的场景:

 

  • 性能要求高(每秒10万次以上操作)
  • 用Redis特有的功能
  • 配备专职运维团队

 

你是否用过Postgres替换Redis(或反过来用Redis替换Postgres)?实际体验如何?欢迎在评论区分享你的基准测试数据!

 

作者丨Polliog      编译丨dbaplus社群

来源丨网址:
https://dev.to/polliog/i-replaced-redis-with-postgresql-and-its-faster-4942

dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn

最新评论
访客 2024年04月08日

如果字段的最大可能长度超过255字节,那么长度值可能…

访客 2024年03月04日

只能说作者太用心了,优秀

访客 2024年02月23日

感谢详解

访客 2024年02月20日

一般干个7-8年(即30岁左右),能做到年入40w-50w;有…

访客 2023年08月20日

230721

活动预告