嘿,朋友。如果你正盯着屏幕上的数据库迁移任务发愁,或者担心把关系型数据搬到文档型数据库里会搞得一团糟,那这篇东西就是为你准备的。别被“无损”这两个字吓到了,虽然听起来像是要把每一比特数据都原封不动地搬过去,但在实际工程中,我们追求的是业务逻辑上的无损和数据一致性上的高保真。
MySQL是表格世界的王者,讲究严谨、秩序和关联;而MongoDB则是文档宇宙的霸主,崇尚灵活、速度和嵌套。把前者搬进后者,不仅仅是换个地方存数据,更像是一场“翻译”加“重构”的过程。今天,我不给你念枯燥的理论,咱们直接上手,聊聊怎么把这个活儿干漂亮,顺便避开那些让人头秃的坑。
第一步:心态转变——从“行与列”到“文档与嵌套”
在写第一行迁移代码之前,你得先换个脑子。在MySQL里,你习惯看到 Users 表,里面有一堆字段:id, name, email, address_city, address_street。你觉得这很清晰。
但在MongoDB里,如果你还这么存,你就输了。你会得到一个扁平的JSON对象,查询起来累死人。真正的“MongoDB思维”是把地址作为一个子文档嵌套进去:
{
"_id": "507f1f77bcf86cd799439011",
"name": "张三",
"email": "zhangsan@example.com",
"address": {
"city": "北京",
"street": "中关村大街1号",
"zip": "100080"
}
}
这种结构的变化,直接决定了你迁移脚本怎么写。如果你的MySQL表之间有着复杂的 JOIN 关系,那么在迁移时,你需要决定是保留这种关系(通过引用 _id),还是把数据合并到一个文档里(Embedding)。
核心原则:
- 高频访问且数据量小的数据,考虑嵌入(Embedding)。
- 数据量大或独立性强的关系,考虑引用(Referencing)。
- 不要试图完全映射表结构,要根据查询模式重新设计Schema。
第二步:前期准备——盘点家底,识别风险
别急着跑脚本。先打开你的MySQL数据库,看看里面到底有什么。
1. 数据类型映射表
这是最容易出错的地方。MySQL里的 DECIMAL、DATETIME、ENUM 在MongoDB里没有直接对应的原生类型,或者处理方式不同。
| MySQL 类型 | MongoDB 推荐类型 | 注意事项 |
|---|---|---|
INT, BIGINT |
Int32, Int64 |
注意溢出问题,JS默认整数安全范围是 Number.MAX_SAFE_INTEGER (\(2^{53}-1\))。 |
VARCHAR |
String |
确保编码统一为 UTF-8。 |
TEXT |
String |
MongoDB字符串最大长度为16MB,小心大文本。 |
DATETIME |
Date |
时区问题!MySQL可能存的是UTC,读取时需注意本地化。 |
DECIMAL |
Double 或 String |
大坑预警:Double 有精度丢失风险。涉及金额务必用 String 或专门的 Decimal128。 |
BOOLEAN |
Boolean |
基本没问题,但要注意MySQL中 TINYINT(1) 可能被误读。 |
JSON (MySQL 5.7+) |
Object |
完美映射,但需验证JSON合法性。 |
UUID |
ObjectId 或 String |
如果MySQL用了UUID做主键,MongoDB可以用 ObjectId 替代,或者保持 String。 |
2. 主键与索引策略
MySQL的主键通常是自增ID或UUID。MongoDB默认使用 ObjectId。
- 方案A(推荐):保留MySQL的主键值作为MongoDB文档的
_id。这样方便追踪和调试。 - 方案B:生成新的
ObjectId,并在另一个字段保留MySQL的id。这需要额外的索引支持。
切记:MongoDB的 _id 字段是只读的,一旦写入不能修改。所以迁移前必须确定好这个ID策略。
第三步:实战迁移——Python脚本详解
光说不练假把式。我推荐使用 Python,因为它的库生态丰富,且处理JSON非常方便。我们将使用 pymysql 连接MySQL,pymongo 连接MongoDB。
为了演示“无损”和“高性能”,我不会给你看那种简单的 for 循环插入,那是生产环境的杀手。我们要用批量插入和事务控制(如果版本支持)。
环境准备
pip install pymysql pymongo
核心迁移脚本
假设我们要迁移一张名为 orders 的表,它关联着 users 表。为了简化,我们先只迁移 orders 表,并将用户信息展开(因为订单查询通常不需要频繁查用户详情,或者我们可以用 $lookup 在查询时动态关联,这里为了演示文档结构,我们选择嵌入用户基本信息,假设用户信息变化不频繁)。
import pymysql
import pymongo
from datetime import datetime
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ================= 配置区域 =================
MYSQL_CONFIG = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': 'your_password',
'database': 'ecommerce_db',
'charset': 'utf8mb4'
}
MONGO_CONFIG = {
'host': 'localhost',
'port': 27017,
'username': 'admin', # 如果有认证
'password': 'admin',
'database': 'mongo_ecommerce'
}
BATCH_SIZE = 1000 # 每次批量插入的数量,根据内存调整
# ============================================
def get_mysql_connection():
return pymysql.connect(**MYSQL_CONFIG)
def get_mongo_client():
# 注意:如果有认证,URI格式应为 mongodb://user:pass@host:port/db
return pymongo.MongoClient(f"mongodb://{MONGO_CONFIG['host']}:{MONGO_CONFIG['port']}/")
def migrate_orders(mysql_conn, mongo_db):
cursor = mysql_conn.cursor(pymysql.cursors.DictCursor)
# 获取总行数,用于进度显示
cursor.execute("SELECT COUNT(*) as count FROM orders")
total_count = cursor.fetchone()['count']
logger.info(f"Total orders to migrate: {total_count}")
offset = 0
migrated_count = 0
try:
while offset < total_count:
# 分页查询,避免一次性加载过多数据到内存
query = """
SELECT
o.id,
o.user_id,
o.total_amount,
o.created_at,
o.status,
u.name as user_name, -- 简单示例:联表查询获取用户名
u.email as user_email
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
ORDER BY o.id ASC
LIMIT %s OFFSET %s
"""
cursor.execute(query, (BATCH_SIZE, offset))
rows = cursor.fetchall()
if not rows:
break
# 转换数据格式
documents = []
for row in rows:
doc = {
"_id": row['id'], # 使用MySQL ID作为MongoDB _id
"userId": row['user_id'],
"userInfo": { # 嵌入用户信息,形成文档结构
"name": row['user_name'],
"email": row['user_email']
},
"totalAmount": str(row['total_amount']), # 关键:金额转为字符串,防止精度丢失
"createdAt": row['created_at'],
"status": row['status']
}
# 处理空值情况,MongoDB中None会被存为null,有时需要显式处理
if doc['totalAmount'] is None:
doc['totalAmount'] = "0.00"
documents.append(doc)
# 批量插入到MongoDB
# use_collection.insert_many 是原子性的吗?在单文档操作中是,但在多文档批量中,
# 默认情况下是尽可能多的插入,失败则停止。
# 如果需要严格的事务,请使用 mongosql 或 pymongo 4.0+ 的事务功能。
try:
result = mongo_db.orders.insert_many(documents, ordered=False)
migrated_count += len(result.inserted_ids)
logger.info(f"Migrated batch at offset {offset}. Success: {len(result.inserted_ids)}")
except pymongo.errors.BulkWriteError as e:
logger.error(f"Bulk write error: {e.details}")
# 可以选择记录失败的ID,稍后重试或人工介入
for err in e.details.get('writeErrors', []):
logger.warning(f"Failed document ID: {err.get('document_index')}")
offset += BATCH_SIZE
finally:
cursor.close()
logger.info(f"Migration completed. Total migrated: {migrated_count}")
if __name__ == "__main__":
mysql_conn = get_mysql_connection()
mongo_client = get_mongo_client()
mongo_db = mongo_client[MONGO_CONFIG['database']]
# 确保集合存在(可选,pymongo会自动创建)
migrate_orders(mysql_conn, mongo_db)
mysql_conn.close()
mongo_client.close()
代码中的关键点解析
ordered=False:在insert_many中设置这个参数非常重要。如果其中一条数据有问题(比如重复的_id),它不会停止整个批次,而是继续尝试插入其他数据,并返回错误列表。这对于大批量迁移容错性更好。- 金额处理:注意
str(row['total_amount'])。MySQL的DECIMAL(10,2)在Python中可能是Decimal类型,直接转JSON可能会出错,或者在JS前端变成浮点数导致精度丢失。存为字符串是最安全的做法,前端展示时再格式化。 - 分页查询:使用
LIMIT和OFFSET。虽然对于超大表,OFFSET性能会下降,但在迁移场景下,这是最简单可控的方式。如果数据量达到千万级,建议基于主键ID的范围查询(如WHERE id > last_max_id AND id <= last_max_id + 1000)。
第四步:高级坑点规避指南
即使脚本跑通了,也不代表万事大吉。以下是资深工程师才会告诉你的“血泪经验”。
坑点一:时区地狱
MySQL默认存储的是服务器设置的时区时间(除非你明确用了 DATETIME 而非 TIMESTAMP,或者配置了 default_time_zone)。MongoDB的 Date 类型本质上是UTC时间戳。
现象:你在MySQL里看到的是
2023-10-01 10:00:00(北京时间),迁移到MongoDB后,查出来变成了2023-10-01 02:00:00UTC,前端展示时如果不处理时区,时间就全乱了。解决方案:
- 迁移时标准化:在Python脚本中,获取MySQL的时间后,强制转换为UTC时间再存入MongoDB。
import pytz from datetime import datetime # 假设 row['created_at'] 是 datetime 对象 if row['created_at'].tzinfo is None: # 假设MySQL是无时区的,默认它是北京时间 CST local_tz = pytz.timezone('Asia/Shanghai') row['created_at'] = local_tz.localize(row['created_at']) # 转换为UTC utc_time = row['created_at'].astimezone(pytz.utc) doc['createdAt'] = utc_time- 应用层处理:确保你的后端代码在读取MongoDB日期时,知道它是UTC,并在展示给用户前转换回当地时区。
坑点二:数组与重复数据
MySQL中,一个订单可能对应多个商品,通常存储在 order_items 表中。在MongoDB中,你可以有两种选择:
- 嵌入数组:在
orders文档里放一个items: [{...}, {...}]。 - 单独集合:建立
order_items集合,通过orderId关联。
- 决策依据:如果商品数量少(<100),且经常一起查询,选嵌入。如果商品数量巨大,或者经常单独更新某个商品的状态,选单独集合。
- 迁移注意:如果选择嵌入,你需要在迁移脚本中先查出所有
order_items,然后在内存中按orderId分组,组装成数组,再插入orders集合。这个过程容易出错,务必测试边界情况(如某个订单没有商品)。
坑点三:唯一约束失效
MySQL有 UNIQUE INDEX,保证 email 不重复。MongoDB也有唯一索引,但迁移过程中,如果你没有预先创建好唯一索引,可能会出现数据重复。
- 最佳实践:
- 先在MongoDB中创建好目标集合的结构和索引(包括唯一索引)。
- 迁移脚本中使用
upsert模式或者先检查是否存在。 - 或者,在迁移完成后,运行一个校验脚本,对比MySQL和MongoDB的记录数、关键字段哈希值。
坑点四:大对象与网格文件
如果MySQL里有 BLOB 类型的大字段(比如用户上传的图片Base64,或者PDF内容),MongoDB默认文档大小限制是16MB。
- 解决方案:如果文件超过16MB,不要直接存入文档。使用 GridFS。
- GridFS是MongoDB的一个规范,用于存储和恢复超出该大小的文件。
- 在迁移脚本中,检测字段大小,如果过大,调用
gridfsAPI写入,否则直接作为字符串或二进制数据嵌入。
第五步:验证与回滚——确保“无损”的最后一步
迁移完成不是结束,验证才是。
1. 数量核对
-- MySQL
SELECT COUNT(*) FROM orders;
-- MongoDB (使用mongo shell或驱动)
db.orders.countDocuments()
数字必须一致。
2. 抽样数据比对
随机抽取100个ID,对比两边的关键字段。特别是金额、日期、长文本。 你可以写一个简单的校验脚本,遍历这些ID,计算两边的MD5哈希值是否一致。
3. 业务逻辑回归测试
找几个典型的用户场景:
- 查询某个用户的所有订单。
- 查询某个时间段内的销售额。
- 更新一个订单状态。
确保在MongoDB中,这些操作的性能和结果符合预期。
4. 灰度发布与回滚计划
永远不要一次性切断MySQL的写入。
- 双写阶段:应用同时写MySQL和MongoDB。
- 数据同步阶段:运行迁移脚本,将历史数据导入MongoDB。
- 校验阶段:确认数据一致。
- 切换读取:应用开始从MongoDB读取数据,但仍写入MySQL(或者双写)。
- 最终切换:停止写入MySQL,只写MongoDB,然后逐步下线MySQL。
如果发现问题,立即切回MySQL读取。MongoDB的优势在于Schema灵活,即使迁移过程有误,也可以随时修正Schema并重新导入部分数据,这比关系型数据库的回滚要容易得多。
结语:这不是终点,而是起点
从MySQL到MongoDB的迁移,表面上是技术的升级,实质上是架构思维的转型。你不再是被表结构束缚的开发人员,而是数据模型的设计师。
在这个过程中,你会遇到各种奇怪的问题:有的数据在MySQL里是正常的,但在MongoDB里解析报错;有的查询在MySQL里毫秒级响应,在MongoDB里却慢得像蜗牛。别灰心,这些都是成长的代价。
记住,没有完美的迁移,只有不断优化的过程。先用脚本把数据搬过去,验证无误,然后根据实际的业务查询模式,逐步调整你的索引和文档结构。MongoDB的魅力在于它的灵活性,你可以边跑边改,这在传统关系型数据库中是不可想象的奢侈。
现在,打开你的终端,启动你的迁移脚本吧。祝你顺利,愿你的数据永远一致,愿你的查询永远飞快。如果有具体的报错信息,欢迎随时回来讨论,我们一起解决。
