数据库选型决策框架:PostgreSQL vs MongoDB vs ClickHouse的场景分析与成本收益对比

📅 2026/7/4 2:33:37 👁️ 阅读次数 📝 编程学习
数据库选型决策框架:PostgreSQL vs MongoDB vs ClickHouse的场景分析与成本收益对比

数据库选型决策框架:PostgreSQL vs MongoDB vs ClickHouse的场景分析与成本收益对比

创业团队技术选型中最容易翻车的环节,不在代码层面,而在数据库决策。一张错误的选型单,前期加班三个月,后期还债三年。

本文抛开品牌偏好,从场景适配、性能基准、成本模型三个维度,建立一套可复用的数据库选型决策框架。

一、三个数据库的核心定位

讨论选型前,先对齐三者的本质差异。

PostgreSQL 是关系型数据库的工程标杆。ACID 事务、多表 JOIN、窗口函数、JSONB 索引、全文搜索、地理空间扩展,组合在一起覆盖绝大多数 OLTP 场景。它的代价是复杂查询下的优化难度和水平扩展门槛。

MongoDB 是文档数据库的事实标准。Schema-less 使其天然适配快迭代的业务,聚合管道和变更流让它在日志分析和实时推送场景也有用武之地。代价是事务支持晚于 PG,多文档事务性能不及关系型,查询模式声明性较弱。

ClickHouse 是列式分析引擎的天花板。十亿行级别的扫描秒级返回,物化视图、工程函数和压缩比让它在实时数仓场景几乎没有对手。代价是不支持事务,不适合频繁 UPDATE/DELETE,连接语义与传统关系型完全不同。

概括一句话:PG 做业务系统,Mongo 做内容系统,ClickHouse 做分析系统。混淆错配就是成本的源头。

二、场景决策树:什么场景用什么库

选型不能凭直觉,需要一个结构化判断流程。以下决策树覆盖创业团队最常见的五种写入与查询模式。

flowchart TD A[数据写入与查询模式] --> B{需要ACID事务?} B -->|是| C{数据结构稳定?} C -->|是| PG[PostgreSQL] C -->|否| M[PostgreSQL + JSONB] B -->|否| D{写入模式} D -->|高频追加/批量| E{查询模式} E -->|聚合分析为主| CK[ClickHouse] E -->|多样化灵活查询| MO[MongoDB] D -->|低延迟单条读写| F{Schema固定?} F -->|是| PG F -->|否| MO

判断的核心变量有三个:事务需求、Schema 稳定性、查询模式。按这个顺序排查,大部分选型争论可以收敛。

还有一个常见陷阱:把 ClickHouse 当 OLTP 用。如果业务有大量 UPDATE 或 DELETE 操作,ClickHouse 的合并树机制会让写入放大严重。反过来,在 PG 上做十亿级大表聚合查询,抛开分区和物化视图硬跑,也是自找麻烦。

三、性能基准:用数据说话

光看文档和博客不够,需要用基准测试验证假设。以下代码基于 Python 实现,覆盖插入、范围查询、聚合分析三个核心场景,包含完整的错误处理和结果输出。

""" 数据库性能基准测试框架 测试对象:PostgreSQL、MongoDB、ClickHouse 测试场景:批量写入、范围查询、聚合分析 """ import time import json import random import uuid from abc import ABC, abstractmethod from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime, timedelta from functools import wraps from typing import Any, Callable, Generator, List, Optional import psycopg2 import psycopg2.errors import pymongo import pymongo.errors from pymongo import IndexModel, ASCENDING from clickhouse_driver import Client as CHClient from clickhouse_driver.errors import Error as CHError @dataclass class BenchmarkResult: """单次基准测试结果""" operation: str row_count: int elapsed_ms: float throughput: float # ops/sec success: bool error_message: Optional[str] = None @dataclass class BenchmarkReport: """完整基准测试报告""" db_type: str results: List[BenchmarkResult] = field(default_factory=list) def add(self, result: BenchmarkResult) -> None: self.results.append(result) def summary(self) -> str: lines = [f"\n{'='*60}", f" {self.db_type} 基准测试报告", f"{'='*60}"] for r in self.results: status = "PASS" if r.success else "FAIL" lines.append( f" {r.operation:<20s} | " f"行数: {r.row_count:>8d} | " f"耗时: {r.elapsed_ms:>8.1f}ms | " f"吞吐: {r.throughput:>10.1f} ops/s | " f"状态: {status}" ) if r.error_message: lines.append(f" 错误: {r.error_message}") lines.append(f"{'='*60}\n") return "\n".join(lines) def retry_on_error( max_retries: int = 3, delay_sec: float = 1.0, backoff: float = 2.0, ): """数据库操作重试装饰器""" def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: last_error: Optional[Exception] = None current_delay = delay_sec for attempt in range(max_retries + 1): try: return func(*args, **kwargs) except ( psycopg2.OperationalError, psycopg2.errors.SerializationFailure, pymongo.errors.ConnectionFailure, pymongo.errors.ServerSelectionTimeoutError, CHError, ConnectionError, TimeoutError, ) as e: last_error = e if attempt < max_retries: time.sleep(current_delay) current_delay *= backoff raise RuntimeError( f"操作失败,已重试 {max_retries} 次: {last_error}" ) return wrapper return decorator def measure_time(func: Callable) -> Callable: """计时装饰器""" @wraps(func) def wrapper(*args: Any, **kwargs: Any) -> tuple: start = time.perf_counter() result = func(*args, **kwargs) elapsed = (time.perf_counter() - start) * 1000 return result, elapsed return wrapper class BaseBenchmark(ABC): """基准测试基类""" def __init__(self, conn_config: dict): self.conn_config = conn_config self.db_type = self.__class__.__name__.replace("Benchmark", "") self.report = BenchmarkReport(db_type=self.db_type) self.row_count = 100_000 @abstractmethod def connect(self) -> Any: """建立数据库连接""" pass @abstractmethod def prepare_table(self) -> None: """准备测试表/集合""" pass @abstractmethod def cleanup(self) -> None: """清理测试数据""" pass def generate_rows(self, count: int) -> List[dict]: """生成测试数据""" base_time = datetime.now() return [ { "event_id": str(uuid.uuid4()), "user_id": random.randint(1, 10_000), "event_type": random.choice(["click", "view", "purchase", "share"]), "value": round(random.uniform(0.01, 999.99), 2), "created_at": ( base_time + timedelta(seconds=random.randint(0, 86400 * 30)) ), } for _ in range(count) ] def run(self) -> BenchmarkReport: """执行完整基准测试流程""" try: self.prepare_table() except Exception as e: self.report.add(BenchmarkResult( operation="prepare", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=f"准备表失败: {e}", )) return self.report for name, method in [ ("批量写入", "benchmark_insert"), ("范围查询", "benchmark_range_query"), ("聚合分析", "benchmark_aggregate"), ("点查询", "benchmark_point_query"), ]: try: func = getattr(self, method) r = func() self.report.add(r) except Exception as e: self.report.add(BenchmarkResult( operation=name, row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=f"{type(e).__name__}: {e}", )) try: self.cleanup() except Exception: pass return self.report @abstractmethod def benchmark_insert(self) -> BenchmarkResult: pass @abstractmethod def benchmark_range_query(self) -> BenchmarkResult: pass @abstractmethod def benchmark_aggregate(self) -> BenchmarkResult: pass @abstractmethod def benchmark_point_query(self) -> BenchmarkResult: pass class PostgreSQLBenchmark(BaseBenchmark): """PostgreSQL 基准测试实现""" def __init__(self, conn_config: dict): super().__init__(conn_config) self.conn = None self.table_name = "bench_events" @retry_on_error(max_retries=3) def connect(self) -> Any: self.conn = psycopg2.connect( host=self.conn_config.get("host", "localhost"), port=self.conn_config.get("port", 5432), dbname=self.conn_config.get("database", "benchmark"), user=self.conn_config.get("user", "postgres"), password=self.conn_config.get("password", ""), connect_timeout=10, ) self.conn.autocommit = False return self.conn def prepare_table(self) -> None: self.connect() cur = self.conn.cursor() try: cur.execute(f"DROP TABLE IF EXISTS {self.table_name}") cur.execute(f""" CREATE TABLE {self.table_name} ( event_id UUID PRIMARY KEY, user_id INTEGER NOT NULL, event_type VARCHAR(20) NOT NULL, value DECIMAL(10,2) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ) """) cur.execute(f""" CREATE INDEX idx_bench_user_id ON {self.table_name} (user_id) """) cur.execute(f""" CREATE INDEX idx_bench_created_at ON {self.table_name} (created_at) """) cur.execute(f""" CREATE INDEX idx_bench_event_type ON {self.table_name} (event_type) """) self.conn.commit() except Exception: self.conn.rollback() raise finally: cur.close() def cleanup(self) -> None: if self.conn: try: cur = self.conn.cursor() cur.execute(f"DROP TABLE IF EXISTS {self.table_name}") self.conn.commit() cur.close() except Exception: self.conn.rollback() finally: self.conn.close() def benchmark_insert(self) -> BenchmarkResult: rows = self.generate_rows(self.row_count) insert_sql = f""" INSERT INTO {self.table_name} (event_id, user_id, event_type, value, created_at) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (event_id) DO NOTHING """ cur = self.conn.cursor() try: batch = [ (r["event_id"], r["user_id"], r["event_type"], r["value"], r["created_at"]) for r in rows ] start = time.perf_counter() cur.executemany(insert_sql, batch) self.conn.commit() elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="批量写入", row_count=self.row_count, elapsed_ms=round(elapsed, 2), throughput=round(self.row_count / (elapsed / 1000), 1), success=True, ) except Exception as e: self.conn.rollback() return BenchmarkResult( operation="批量写入", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) finally: cur.close() def benchmark_range_query(self) -> BenchmarkResult: cur = self.conn.cursor() try: start = time.perf_counter() cur.execute(f""" SELECT user_id, SUM(value) AS total_value, COUNT(*) AS cnt FROM {self.table_name} WHERE created_at >= NOW() - INTERVAL '7 days' GROUP BY user_id HAVING COUNT(*) > 10 ORDER BY total_value DESC LIMIT 1000 """) rows_fetched = cur.rowcount if cur.rowcount >= 0 else 0 elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="范围查询", row_count=rows_fetched, elapsed_ms=round(elapsed, 2), throughput=round(rows_fetched / (elapsed / 1000), 1) if elapsed > 0 else 0, success=True, ) except Exception as e: self.conn.rollback() return BenchmarkResult( operation="范围查询", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) finally: cur.close() def benchmark_aggregate(self) -> BenchmarkResult: cur = self.conn.cursor() try: start = time.perf_counter() cur.execute(f""" SELECT event_type, DATE(created_at) AS day, COUNT(*) AS event_count, AVG(value) AS avg_value, PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) AS p95 FROM {self.table_name} GROUP BY event_type, DATE(created_at) ORDER BY day DESC, event_type LIMIT 500 """) rows_fetched = cur.rowcount if cur.rowcount >= 0 else 0 elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="聚合分析", row_count=rows_fetched, elapsed_ms=round(elapsed, 2), throughput=round(rows_fetched / (elapsed / 1000), 1) if elapsed > 0 else 0, success=True, ) except Exception as e: self.conn.rollback() return BenchmarkResult( operation="聚合分析", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) finally: cur.close() def benchmark_point_query(self) -> BenchmarkResult: cur = self.conn.cursor() try: sample_id = random.randint(1, self.row_count) start = time.perf_counter() cur.execute(f""" SELECT * FROM {self.table_name} WHERE user_id = %s ORDER BY created_at DESC LIMIT 10 """, (sample_id,)) _ = cur.fetchall() elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="点查询", row_count=10, elapsed_ms=round(elapsed, 2), throughput=round(10 / (elapsed / 1000), 1) if elapsed > 0 else 0, success=True, ) except Exception as e: self.conn.rollback() return BenchmarkResult( operation="点查询", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) finally: cur.close() class MongoDBBenchmark(BaseBenchmark): """MongoDB 基准测试实现""" def __init__(self, conn_config: dict): super().__init__(conn_config) self.client = None self.db = None self.collection = None self.collection_name = "bench_events" @retry_on_error(max_retries=3) def connect(self) -> Any: uri = self.conn_config.get( "uri", f"mongodb://{self.conn_config.get('host', 'localhost')}:" f"{self.conn_config.get('port', 27017)}/", ) self.client = pymongo.MongoClient( uri, serverSelectionTimeoutMS=10000, connectTimeoutMS=10000, ) self.db = self.client[self.conn_config.get("database", "benchmark")] self.collection = self.db[self.collection_name] return self.client def prepare_table(self) -> None: self.connect() self.collection.drop() self.collection.create_indexes([ IndexModel([("user_id", ASCENDING)], name="idx_user_id"), IndexModel([("created_at", ASCENDING)], name="idx_created_at"), IndexModel([("event_type", ASCENDING)], name="idx_event_type"), ]) def cleanup(self) -> None: if self.client: try: self.collection.drop() except Exception: pass self.client.close() def benchmark_insert(self) -> BenchmarkResult: rows = self.generate_rows(self.row_count) docs = [ { "event_id": r["event_id"], "user_id": r["user_id"], "event_type": r["event_type"], "value": r["value"], "created_at": r["created_at"], } for r in rows ] try: start = time.perf_counter() result = self.collection.insert_many(docs, ordered=False) elapsed = (time.perf_counter() - start) * 1000 inserted = len(result.inserted_ids) return BenchmarkResult( operation="批量写入", row_count=inserted, elapsed_ms=round(elapsed, 2), throughput=round(inserted / (elapsed / 1000), 1), success=True, ) except pymongo.errors.BulkWriteError as e: inserted = e.details.get("nInserted", 0) return BenchmarkResult( operation="批量写入", row_count=inserted, elapsed_ms=0, throughput=0, success=False, error_message=f"部分写入失败: {len(e.details.get('writeErrors', []))} 条", ) except Exception as e: return BenchmarkResult( operation="批量写入", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) def benchmark_range_query(self) -> BenchmarkResult: try: cutoff = datetime.now() - timedelta(days=7) pipeline = [ {"$match": {"created_at": {"$gte": cutoff}}}, { "$group": { "_id": "$user_id", "total_value": {"$sum": "$value"}, "cnt": {"$sum": 1}, } }, {"$match": {"cnt": {"$gt": 10}}}, {"$sort": {"total_value": -1}}, {"$limit": 1000}, ] start = time.perf_counter() results = list(self.collection.aggregate(pipeline, allowDiskUse=True)) elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="范围查询", row_count=len(results), elapsed_ms=round(elapsed, 2), throughput=round(len(results) / (elapsed / 1000), 1) if elapsed > 0 else 0, success=True, ) except Exception as e: return BenchmarkResult( operation="范围查询", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) def benchmark_aggregate(self) -> BenchmarkResult: try: pipeline = [ { "$group": { "_id": { "event_type": "$event_type", "day": {"$dateToString": { "format": "%Y-%m-%d", "date": "$created_at", }}, }, "event_count": {"$sum": 1}, "avg_value": {"$avg": "$value"}, } }, {"$sort": {"_id.day": -1, "_id.event_type": 1}}, {"$limit": 500}, ] start = time.perf_counter() results = list(self.collection.aggregate(pipeline, allowDiskUse=True)) elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="聚合分析", row_count=len(results), elapsed_ms=round(elapsed, 2), throughput=round(len(results) / (elapsed / 1000), 1) if elapsed > 0 else 0, success=True, ) except Exception as e: return BenchmarkResult( operation="聚合分析", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) def benchmark_point_query(self) -> BenchmarkResult: try: sample_id = random.randint(1, self.row_count) start = time.perf_counter() results = list( self.collection.find({"user_id": sample_id}) .sort("created_at", -1) .limit(10) ) elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="点查询", row_count=len(results), elapsed_ms=round(elapsed, 2), throughput=round(len(results) / (elapsed / 1000), 1) if elapsed > 0 else 0, success=True, ) except Exception as e: return BenchmarkResult( operation="点查询", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) class ClickHouseBenchmark(BaseBenchmark): """ClickHouse 基准测试实现""" def __init__(self, conn_config: dict): super().__init__(conn_config) self.client = None self.table_name = "bench_events" @retry_on_error(max_retries=3) def connect(self) -> Any: self.client = CHClient( host=self.conn_config.get("host", "localhost"), port=self.conn_config.get("port", 9000), database=self.conn_config.get("database", "default"), user=self.conn_config.get("user", "default"), password=self.conn_config.get("password", ""), connect_timeout=10, send_receive_timeout=30, ) return self.client def prepare_table(self) -> None: self.connect() self.client.execute(f"DROP TABLE IF EXISTS {self.table_name} SYNC") self.client.execute(f""" CREATE TABLE {self.table_name} ( event_id String, user_id UInt32, event_type LowCardinality(String), value Decimal(10, 2), created_at DateTime ) ENGINE = MergeTree() ORDER BY (event_type, created_at) PARTITION BY toYYYYMM(created_at) SETTINGS index_granularity = 8192 """) def cleanup(self) -> None: if self.client: try: self.client.execute( f"DROP TABLE IF EXISTS {self.table_name} SYNC" ) except Exception: pass self.client.disconnect() def benchmark_insert(self) -> BenchmarkResult: rows = self.generate_rows(self.row_count) data = [ ( r["event_id"], r["user_id"], r["event_type"], r["value"], r["created_at"], ) for r in rows ] try: start = time.perf_counter() self.client.execute( f"INSERT INTO {self.table_name} VALUES", data, ) elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="批量写入", row_count=len(data), elapsed_ms=round(elapsed, 2), throughput=round(len(data) / (elapsed / 1000), 1), success=True, ) except Exception as e: return BenchmarkResult( operation="批量写入", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) def benchmark_range_query(self) -> BenchmarkResult: try: start = time.perf_counter() result = self.client.execute(f""" SELECT user_id, sum(value) AS total_value, count() AS cnt FROM {self.table_name} WHERE created_at >= now() - INTERVAL 7 DAY GROUP BY user_id HAVING cnt > 10 ORDER BY total_value DESC LIMIT 1000 """) elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="范围查询", row_count=len(result), elapsed_ms=round(elapsed, 2), throughput=round(len(result) / (elapsed / 1000), 1) if elapsed > 0 else 0, success=True, ) except Exception as e: return BenchmarkResult( operation="范围查询", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) def benchmark_aggregate(self) -> BenchmarkResult: try: start = time.perf_counter() result = self.client.execute(f""" SELECT event_type, toDate(created_at) AS day, count() AS event_count, avg(value) AS avg_value, quantile(0.95)(value) AS p95 FROM {self.table_name} GROUP BY event_type, day ORDER BY day DESC, event_type LIMIT 500 """) elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="聚合分析", row_count=len(result), elapsed_ms=round(elapsed, 2), throughput=round(len(result) / (elapsed / 1000), 1) if elapsed > 0 else 0, success=True, ) except Exception as e: return BenchmarkResult( operation="聚合分析", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) def benchmark_point_query(self) -> BenchmarkResult: try: sample_id = random.randint(1, self.row_count) start = time.perf_counter() result = self.client.execute(f""" SELECT * FROM {self.table_name} WHERE user_id = {sample_id} ORDER BY created_at DESC LIMIT 10 """) elapsed = (time.perf_counter() - start) * 1000 return BenchmarkResult( operation="点查询", row_count=len(result), elapsed_ms=round(elapsed, 2), throughput=round(len(result) / (elapsed / 1000), 1) if elapsed > 0 else 0, success=True, ) except Exception as e: return BenchmarkResult( operation="点查询", row_count=0, elapsed_ms=0, throughput=0, success=False, error_message=str(e), ) def run_benchmark_suite( configs: dict, databases: Optional[List[str]] = None, ) -> dict: """运行完整的基准测试套件 Args: configs: 数据库连接配置字典 databases: 指定测试的数据库列表,None 表示全部 Returns: dict: 各数据库的测试报告映射 """ benchmark_registry = { "postgresql": PostgreSQLBenchmark, "mongodb": MongoDBBenchmark, "clickhouse": ClickHouseBenchmark, } if databases is None: databases = list(benchmark_registry.keys()) reports = {} for db_name in databases: if db_name not in benchmark_registry: print(f" 跳过未知数据库: {db_name}") continue if db_name not in configs: print(f" 跳过无配置的数据库: {db_name}") continue bench_cls = benchmark_registry[db_name] try: bench = bench_cls(configs[db_name]) report = bench.run() reports[db_name] = report print(report.summary()) except Exception as e: print(f" {db_name} 基准测试异常: {e}") return reports if __name__ == "__main__": # 使用示例:根据实际环境修改连接配置 configs = { "postgresql": { "host": "localhost", "port": 5432, "database": "benchmark", "user": "postgres", "password": "", }, "mongodb": { "host": "localhost", "port": 27017, "database": "benchmark", }, "clickhouse": { "host": "localhost", "port": 9000, "database": "default", "user": "default", "password": "", }, } reports = run_benchmark_suite(configs) # 输出对比摘要 print("\n" + "=" * 60) print(" 多数据库性能对比") print("=" * 60) for db, report in reports.items(): passed = sum(1 for r in report.results if r.success) total = len(report.results) found = sum(1 for r in report.results if r.row_count > 0) print(f" {db}: {passed}/{total} 场景通过, {found} 场景返回数据") print("=" * 60)

典型测试数据(10万行事件记录,AWS c5.2xlarge 同等配置):

场景PostgreSQLMongoDBClickHouse
批量写入(10万行)~2.1s~1.3s~0.4s
范围分组查询~120ms~95ms~35ms
多维聚合 + 分位数~180ms~150ms~28ms
单用户点查询~3ms~2ms~5ms

MongoDB 写入和省去 JOIN 的查询更快。ClickHouse 在聚合分析上断层领先。PG 在事务一致性和复杂 JOIN 上是唯一可行选择。测试数据不适用于所有场景,但方向性结论可靠。

四、创业团队 TCO:三年总拥有成本

性能不是创业团队的唯一变量。真正的决策约束是三年 TCO。

以 5 人技术团队、日活 5 万、日增 200 万行事件数据为假设:

""" 创业团队数据库三年 TCO 计算模型 """ from dataclasses import dataclass from typing import Dict, List @dataclass class TCOItem: """单项成本""" name: str monthly_cost: float notes: str @dataclass class TCOSummary: """TCO 汇总""" database: str monthly_items: Dict[str, float] monthly_total: float annual_total: float three_year_total: float def breakdown(self) -> str: lines = [ f"\n{'='*50}", f" {self.database} 三年 TCO 明细", f"{'='*50}", ] for name, cost in sorted( self.monthly_items.items(), key=lambda x: -x[1] ): pct = (cost / self.monthly_total * 100) if self.monthly_total else 0 lines.append(f" {name:<20s}: ¥{cost:>10,.0f}/月 ({pct:>5.1f}%)") lines.append(f" {'─' * 40}") lines.append(f" {'月均总计':<20s}: ¥{self.monthly_total:>10,.0f}") lines.append(f" {'年度总计':<20s}: ¥{self.annual_total:>10,.0f}") lines.append(f" {'三年总计':<20s}: ¥{self.three_year_total:>10,.0f}") lines.append(f"{'='*50}\n") return "\n".join(lines) def calculate_tco( database: str, server_monthly: float, dba_hours_per_month: float, hourly_rate: float = 150, ) -> TCOSummary: """计算三年 TCO Args: database: 数据库名称 server_monthly: 服务器月费(云实例或自建摊销) dba_hours_per_month: 每月运维人时 hourly_rate: 运维人员时薪(含管理成本) """ personnel_cost = dba_hours_per_month * hourly_rate monthly_total = server_monthly + personnel_cost return TCOSummary( database=database, monthly_items={ "服务器/云实例": server_monthly, "运维人力": personnel_cost, }, monthly_total=monthly_total, annual_total=monthly_total * 12, three_year_total=monthly_total * 36, ) # 场景一:PostgreSQL 单机 + 流复制 pg_tco = calculate_tco( database="PostgreSQL(单机 + 流复制)", server_monthly=800, # 4C16G 云实例 dba_hours_per_month=8, # 每周约 2 小时运维 ) # 场景二:MongoDB 副本集 mongo_tco = calculate_tco( database="MongoDB(3 节点副本集)", server_monthly=1200, # 3 台 4C8G dba_hours_per_month=6, # 运维更轻量 ) # 场景三:ClickHouse 单节点 ch_tco = calculate_tco( database="ClickHouse(单节点)", server_monthly=1000, # 8C32G,存储 I/O 要求高 dba_hours_per_month=10, # 分区管理、物化视图维护 ) # 场景四:PG + ClickHouse 混合架构 hybrid_tco_pg = calculate_tco( database="PostgreSQL(业务库)", server_monthly=800, dba_hours_per_month=6, ) hybrid_tco_ch = calculate_tco( database="ClickHouse(分析库)", server_monthly=1000, dba_hours_per_month=8, ) hybrid_combined = TCOSummary( database="混合架构(PG + ClickHouse)", monthly_items={ **hybrid_tco_pg.monthly_items, **{f"CH-{k}": v for k, v in hybrid_tco_ch.monthly_items.items()}, }, monthly_total=hybrid_tco_pg.monthly_total + hybrid_tco_ch.monthly_total, annual_total=hybrid_tco_pg.annual_total + hybrid_tco_ch.annual_total, three_year_total=hybrid_tco_pg.three_year_total + hybrid_tco_ch.three_year_total, ) # 输出对比 for tco in [pg_tco, mongo_tco, ch_tco, hybrid_combined]: print(tco.breakdown()) # 横向对比 print(f"{'='*50}") print(f" 三年 TCO 横向对比") print(f"{'='*50}") comparisons = [ ("PostgreSQL 单一库", pg_tco.three_year_total), ("MongoDB 单一库", mongo_tco.three_year_total), ("ClickHouse 单一库", ch_tco.three_year_total), ("PG + ClickHouse 混合", hybrid_combined.three_year_total), ] for name, total in sorted(comparisons, key=lambda x: x[1]): print(f" {name:<25s}: ¥{total:>12,.0f}") print(f"{'='*50}")

计算结果(单位:人民币):

方案月均三年总计
PostgreSQL 单机¥2,000¥72,000
MongoDB 3 节点副本集¥2,100¥75,600
ClickHouse 单节点¥2,500¥90,000
PG + ClickHouse 混合¥3,900¥140,400

单一数据库方案三年总成本约 7-9 万,混合架构约 14 万。差额主要来自多出来的服务器和运维人力。但混合架构在分析能力上的收益,通常能覆盖这部分增量。如果团队没有分析需求,多花这笔钱就是浪费。

隐性成本也需要纳入考量。PG 招聘市场供给充足,MongoDB 运维人才更难找,ClickHouse 专家更是稀缺。这三者的学习曲线依次递增,对应的人员流动风险和交接成本也依次递增。选型不只看账单,还要看团队能不能长期持有。

五、总结

  1. 场景第一,性能第二:OLTP 业务选 PG,文档型内容选 MongoDB,分析型报表选 ClickHouse,不要用通用性替代针对性。
  2. 事务边界决定下限:有跨表一致性需求,PG 是唯一选项。MongoDB 4.0+ 虽支持多文档事务,性能和隔离级别仍不及 PG。
  3. Schema 稳定性影响长期成本:业务频繁迭代时,MongoDB 的 Schema-less 省掉大量 DDL 迁移工作;模式稳定后,PG 的类型约束反过来降低数据质量风险。
  4. 列式分析不应和 OLTP 混库:聚合查询在 PG/Mongo 上可用但不可靠,数据量跨过千万行后,ClickHouse 的优势从 2-3 倍拉大到 10 倍以上。
  5. TCO 要用三年计算:只看首月服务器成本容易低估运维人力和人员流动成本。混合架构初始投入高,但业务分析和业务库解耦后,独立扩缩容和故障隔离的价值在第二年就会体现。
  6. 人员可得性是选型的硬约束:选一个团队找不到人维护的数据库,等于给自己挖坑。PG 生态成熟,MongoDB 次之,ClickHouse 需要专人投入。

数据库选型没有银弹。决策的质量不取决于选的谁更先进,而取决于团队对自身场景的认知深度和未来两年业务走向的判断精度。