《腾讯企业邮箱迁移到 Google Workspace 一站式教程》:支持多账号同步迁移、服务账号授权策略、IMAP 拉取、Gmail API 导入
准备总览
项目 | 说明 |
---|---|
Google Workspace 管理员权限 | ✅ 已具备 |
腾讯企业邮箱开启 IMAP 功能 | ✅ 已启用 |
Python 3 + gcloud CLI 环境 | ✅ 本地已安装 |
服务账号访问 Gmail API 权限 | ✅ 已授权 |
1. 安装 gcloud CLI 工具
访问:https://cloud.google.com/sdk/docs/install
Windows 用户安装完成后运行:
gcloud --version
gcloud init
2. 解决服务账号密钥创建受限问题
组织策略限制时会提示:
iam.disableServiceAccountKeyCreation = enforced
2.1 查询当前策略状态:
gcloud org-policies describe iam.disableServiceAccountKeyCreation --organization=75******2
2.2 删除组织策略:
gcloud org-policies delete iam.disableServiceAccountKeyCreation --organization=75******2
3. 配置 Google 服务账号与授权
- 在 Google Cloud Console 中启用 Gmail API
- 创建服务账号,生成 JSON 密钥文件
- 在 Google Admin 后台为该服务账号配置全网域授权(Domain-wide Delegation),并授权以下 OAuth Scope:
https://mail.google.com/
4. 设置 Gmail 免登录导入
通过服务账号 impersonate 用户,无需登录授权:
from google.oauth2 import service_account
from googleapiclient.discovery import build
# Gmail scope requested for the service account; the same scope is the one
# authorized in the Google Admin console in step 3.
SCOPES = ["https://mail.google.com/"]
# Load the service-account key file with the scope above.
creds = service_account.Credentials.from_service_account_file("credentials.json", scopes=SCOPES)
# Derive credentials that act on behalf of the given user, so no interactive
# login is required.
delegated = creds.with_subject("user@example.com")
# Gmail API v1 client bound to the delegated credentials.
service = build("gmail", "v1", credentials=delegated)
5. 配置腾讯邮箱 IMAP 拉取
参数 | 值 |
---|---|
IMAP 服务器 | imap.exmail.qq.com |
端口 | 993 |
SSL | 是 |
登录 | 完整邮箱地址 + 授权码 |
6. Python 脚本迁移实现
基本流程:
1. 连接腾讯邮箱 IMAP 拉取邮件
2. 构造 MIME 邮件结构
3. 使用 Gmail API 插入邮件
4. 保存 JSON 日志供校验
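完整版 Python 脚本(先安装依赖 google-auth、google-api-python-client、google-auth-httplib2、python-dateutil、filelock,并在脚本同目录准备 service_account.json 与 accounts.csv(表头:old_email,new_email,password),之后可直接运行)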
import csv
import imaplib
import base64
import time
import os
import json
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from google.oauth2 import service_account
from googleapiclient.discovery import build
from google_auth_httplib2 import AuthorizedHttp
from google.auth.transport.requests import Request
import httplib2
import email as email_parser
from email.utils import parsedate_to_datetime, parseaddr
from email.header import decode_header
import re
import hashlib
from dateutil.parser import parse
import filelock
from datetime import datetime, timezone
from collections import defaultdict
# Configuration constants
SERVICE_ACCOUNT_FILE = 'service_account.json'  # service-account JSON key file
SCOPES = ['https://mail.google.com/']
IMAP_SERVER = 'imap.exmail.qq.com'
MAX_PER_FOLDER = None  # a falsy value means no per-folder cap is applied
MAX_RETRY = 5  # NOTE(review): not referenced by the code visible in this file
MAX_THREADS = 10  # worker count of the Gmail import thread pool
IMPORT_RECORDS_FILE = "import_records.json"  # persisted record of imported emails

# IMAP folder name -> display name used in reports and for label lookup
FOLDER_ALIASES = {
    'INBOX': '收件箱',
    'Sent Messages': '已发送',
    'Deleted Messages': '已删除',
    'Trash': '已删除',
    'Drafts': '草稿箱',
}

# Display name -> Gmail label ID
LABEL_MAPPING = {
    '收件箱': 'INBOX',
    '已发送': 'SENT',
    '已删除': 'TRASH',
    '草稿箱': 'DRAFT',
    '垃圾邮件': 'SPAM',
    '星标': 'STARRED',
    '重要': 'IMPORTANT',
}
class ImportManager:
    """Track which source emails have already been imported into Gmail.

    Records are kept in memory in ``self.records`` and persisted as JSON to
    ``IMPORT_RECORDS_FILE`` so that a later run can skip emails that were
    imported earlier.
    """

    # One in-process mutex shared by all instances: every instance reads and
    # writes the same records file, and ``mark_email_imported`` is invoked
    # from thread-pool workers. Without it, one thread can add a key while
    # another is serializing the dict, which makes ``json.dump`` fail.
    _mutex = threading.RLock()

    def __init__(self):
        """Create the lock-file guard and load any existing records."""
        self.lock = filelock.FileLock(f"{IMPORT_RECORDS_FILE}.lock")
        self.records = self._load_records()

    def _load_records(self):
        """Return the records stored on disk, or an empty structure.

        An unreadable or malformed file is reported with a warning and then
        treated like a missing file, i.e. as "nothing imported yet".
        """
        try:
            with self.lock:
                if os.path.exists(IMPORT_RECORDS_FILE):
                    with open(IMPORT_RECORDS_FILE, 'r', encoding='utf-8') as f:
                        loaded = json.load(f)
                    # The other methods call ``.get`` / item assignment on the
                    # records, so anything but a JSON object is unusable.
                    if not isinstance(loaded, dict):
                        raise ValueError("records file does not contain a JSON object")
                    return loaded
        except Exception as e:
            print(f"⚠️ 加载记录文件失败: {e}")
        return {"imported_emails": {}, "folder_states": {}}

    def save_records(self):
        """Persist the in-memory records to ``IMPORT_RECORDS_FILE``.

        The JSON is written to a temporary file which is then moved over the
        real one, so an interrupted write cannot leave a truncated records
        file behind.
        """
        tmp_path = f"{IMPORT_RECORDS_FILE}.tmp"
        with self._mutex, self.lock:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.records, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, IMPORT_RECORDS_FILE)

    def is_email_imported(self, email_id):
        """Return True if ``email_id`` is present in the imported-email records."""
        return email_id in self.records.get("imported_emails", {})

    def mark_email_imported(self, email_id, folder, gmail_id=None):
        """Record ``email_id`` as imported and save the records immediately.

        Args:
            email_id: Unique ID of the source email.
            folder: Folder name stored with the record.
            gmail_id: ID of the message created in Gmail, if known.
        """
        # Update and save under the same mutex so the dict cannot change
        # while it is being serialized by another worker thread.
        with self._mutex:
            imported = self.records.setdefault("imported_emails", {})
            imported[email_id] = {
                "folder": folder,
                "gmail_id": gmail_id,
                "timestamp": datetime.now().isoformat()
            }
            self.save_records()
def save_result(email_address, result_data):
    """Write one account's migration report to a JSON file.

    The file name is the address with ``@`` replaced by ``_at_`` plus
    ``.json``. The write happens while holding a lock file next to the
    report; a failure is printed instead of raised.
    """
    report_path = email_address.replace('@', '_at_') + ".json"
    guard = filelock.FileLock(report_path + ".lock")
    try:
        with guard:
            with open(report_path, 'w', encoding='utf-8') as out:
                json.dump(result_data, out, ensure_ascii=False, indent=2)
            print(f"📄 同步日志已保存: {report_path}")
    except Exception as e:
        print(f"❌ 保存结果文件失败: {report_path} | 错误: {e}")
def parse_email_date(date_str):
    """Convert an email ``Date`` header value to epoch milliseconds.

    Parenthesized comments such as ``(CST)`` are removed first. The value is
    parsed as an RFC 2822 date and, if that fails, with ``dateutil``. A date
    without time-zone information is taken to be UTC. A year outside
    1990-2100 is replaced by 2024.

    Args:
        date_str: Raw ``Date`` header text.

    Returns:
        int: Milliseconds since the epoch. If the value cannot be parsed at
        all, a warning is printed and the current time is returned.
    """
    try:
        date_str = re.sub(r'\(.*?\)', '', date_str).strip()
        try:
            dt = parsedate_to_datetime(date_str)
        except Exception:
            # Not a well-formed RFC 2822 date: let dateutil try looser
            # formats. ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed here.
            dt = parse(date_str)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        else:
            dt = dt.astimezone(timezone.utc)
        if dt.year < 1990 or dt.year > 2100:
            dt = dt.replace(year=2024)
        return int(dt.timestamp() * 1000)
    except Exception as e:
        print(f"⚠️ 日期解析失败: {date_str} | 错误: {e}")
        return int(datetime.now(timezone.utc).timestamp() * 1000)
def decode_email_header(header):
    """Decode an RFC 2047 encoded header value into plain text.

    Args:
        header: Raw header value, or None.

    Returns:
        str: ``""`` for None; otherwise the decoded parts joined with a
        single space. If decoding fails as a whole, a warning is printed and
        the first 500 characters of ``str(header)`` are returned.
    """
    if header is None:
        return ""
    try:
        decoded = []
        for part, encoding in decode_header(header):
            if isinstance(part, bytes):
                try:
                    decoded.append(part.decode(encoding or 'utf-8', errors='replace'))
                except (LookupError, ValueError):
                    # Charset name unknown to Python: fall back to UTF-8.
                    decoded.append(part.decode('utf-8', errors='replace'))
            else:
                decoded.append(str(part))
        # NOTE: the single-space join is kept as is on purpose. This text is
        # hashed by get_email_unique_id and the resulting IDs are persisted,
        # so changing the separator would make earlier imports look new.
        return ' '.join(decoded)
    except Exception as e:
        print(f"⚠️ 头信息解码失败: {e}")
        return str(header)[:500]
def get_email_unique_id(msg, imap_uid):
    """Build the ID under which an email is remembered as imported.

    The ID is the SHA-256 hex digest of the decoded From, To, Subject, Date
    and Message-ID headers (serialized as JSON with sorted keys), followed by
    ``_`` and the IMAP UID. If anything fails, a warning is printed and
    ``fallback_<md5 of the UID>`` is returned instead.
    """
    try:
        header_names = ('From', 'To', 'Subject', 'Date', 'Message-ID')
        decoded_headers = {
            name: decode_email_header(msg.get(name, ''))
            for name in header_names
        }
        canonical = json.dumps(decoded_headers, sort_keys=True, ensure_ascii=False)
        digest = hashlib.sha256(canonical.encode('utf-8')).hexdigest()
        return f"{digest}_{imap_uid}"
    except Exception as e:
        print(f"⚠️ 生成唯一ID失败: {e}")
        uid_digest = hashlib.md5(str(imap_uid).encode()).hexdigest()
        return f"fallback_{uid_digest}"
def ensure_labels_exist(service):
    """Create any label from ``LABEL_MAPPING`` that the mailbox lacks.

    A label is created only when its display name is not among the existing
    label names and its mapped ID is not one of the built-in system IDs
    listed below. Failures are printed and never raised.
    """
    builtin_ids = ['INBOX', 'SENT', 'TRASH', 'DRAFT', 'SPAM', 'STARRED', 'IMPORTANT']
    try:
        listing = service.users().labels().list(userId='me').execute()
        known_names = {}
        for entry in listing.get('labels', []):
            known_names[entry['name']] = entry['id']

        for display_name, gmail_id in LABEL_MAPPING.items():
            if display_name in known_names or gmail_id in builtin_ids:
                continue
            new_label = {
                'name': display_name,
                'labelListVisibility': 'labelShow',
                'messageListVisibility': 'show'
            }
            try:
                service.users().labels().create(userId='me', body=new_label).execute()
                print(f"🏷️ 创建标签: {display_name}")
            except Exception as e:
                print(f"⚠️ 创建标签失败 {display_name}: {e}")
    except Exception as e:
        print(f"⚠️ 标签检查失败: {e}")
def get_gmail_labels(folder_alias):
    """Map a folder display name to a list holding one Gmail label ID.

    Lookup order: exact key of ``LABEL_MAPPING``; first key of
    ``LABEL_MAPPING`` contained in the name; then the English keywords
    sent / trash / deleted / draft, compared case-insensitively. Returns an
    empty list when nothing matches.
    """
    # 1) Exact display-name match.
    if folder_alias in LABEL_MAPPING:
        return [LABEL_MAPPING[folder_alias]]

    # 2) A known display name appearing anywhere inside the folder name.
    for display_name, gmail_id in LABEL_MAPPING.items():
        if display_name in folder_alias:
            return [gmail_id]

    # 3) English folder names.
    lowered = folder_alias.lower()
    keyword_rules = (
        (('sent',), 'SENT'),
        (('trash', 'deleted'), 'TRASH'),
        (('draft',), 'DRAFT'),
    )
    for keywords, gmail_id in keyword_rules:
        if any(word in lowered for word in keywords):
            return [gmail_id]
    return []
def import_email_to_gmail(service, raw_email, imap_uid, folder_alias, import_manager, internal_date):
    """Insert one raw email into Gmail unless it was imported before.

    Args:
        service: Gmail API client.
        raw_email: Complete message as bytes.
        imap_uid: UID of the message in the source folder (used in the
            dedup ID and in log output).
        folder_alias: Folder display name, used to pick the Gmail label.
        import_manager: ``ImportManager`` used for the dedup check and for
            recording a successful import.
        internal_date: Message date in epoch milliseconds.

    Returns:
        dict: ``{'status': 'skipped' | 'success' | 'failed', 'imap_uid': ...}``
        plus ``'gmail_id'`` on success or ``'error'`` on failure. Exceptions
        are caught and reported through the ``'failed'`` result.
    """
    try:
        msg = email_parser.message_from_bytes(raw_email)
        email_id = get_email_unique_id(msg, imap_uid)
        if import_manager.is_email_imported(email_id):
            print(f"⏩ 跳过已导入邮件 UID: {imap_uid}")
            return {'status': 'skipped', 'imap_uid': imap_uid}
        label_ids = get_gmail_labels(folder_alias)
        message = {
            'raw': base64.urlsafe_b64encode(raw_email).decode('utf-8'),
            'internalDate': internal_date,
            'labelIds': label_ids
        }
        # The Gmail API reference gives ``receivedTime`` as the default
        # internalDateSource for messages.insert, i.e. the time of this API
        # call. ``dateHeader`` asks Gmail to date the message by its own Date
        # header so migrated mail keeps its original position in the mailbox.
        result = service.users().messages().insert(
            userId='me',
            body=message,
            internalDateSource='dateHeader'
        ).execute()
        # Add INBOX explicitly if the insert response does not list it.
        if 'INBOX' in label_ids and 'INBOX' not in result.get('labelIds', []):
            service.users().messages().modify(
                userId='me',
                id=result['id'],
                body={'addLabelIds': ['INBOX']}
            ).execute()
        import_manager.mark_email_imported(email_id, folder_alias, result['id'])
        print(f"✅ 导入成功 | UID: {imap_uid} | 日期: {datetime.fromtimestamp(internal_date/1000)} | Gmail ID: {result['id']}")
        return {
            'status': 'success',
            'gmail_id': result['id'],
            'imap_uid': imap_uid
        }
    except Exception as e:
        print(f"❌ 导入失败 UID: {imap_uid} | 错误: {str(e)}")
        return {
            'status': 'failed',
            'error': str(e),
            'imap_uid': imap_uid
        }
def get_email_date(imap, uid):
    """Return a message's ``Date`` header as epoch milliseconds.

    Only the Date header is fetched (with PEEK). When the fetch yields
    nothing or raises, the current time is returned instead; an exception is
    additionally printed as a warning.
    """
    try:
        _, response = imap.uid('fetch', uid, '(BODY.PEEK[HEADER.FIELDS (DATE)])')
        first_item = response[0] if response else None
        if first_item:
            header_only = email_parser.message_from_bytes(first_item[1])
            return parse_email_date(header_only.get('Date', ''))
    except Exception as e:
        print(f"⚠️ 获取邮件日期失败 UID: {uid} | 错误: {e}")
    now_utc = datetime.now(timezone.utc)
    return int(now_utc.timestamp() * 1000)
def _record_failure(folder_result, uid_str, error):
    """Count one failed email and append its entry to the folder report."""
    folder_result['failed'] += 1
    folder_result['emails'].append({
        'uid': uid_str,
        'status': 'failed',
        'error': error
    })


def sync_folder(imap, folder_name, service, import_manager, result_data):
    """Copy the messages of one IMAP folder into Gmail and report the outcome.

    Args:
        imap: Logged-in IMAP connection to the source mailbox.
        folder_name: Folder name as listed by the server.
        service: Gmail API client used for the inserts.
        import_manager: ``ImportManager`` used to skip emails imported before.
        result_data: Account-level report; this folder's report is appended
            to ``result_data['folders']``.

    Returns:
        dict: The folder report with the keys 'folder', 'total', 'success',
        'failed', 'skipped', 'emails' and, when the folder as a whole
        failed, 'error'.
    """
    folder_alias = FOLDER_ALIASES.get(folder_name, folder_name)
    folder_result = {
        'folder': folder_alias,
        'total': 0,
        'success': 0,
        'failed': 0,
        'skipped': 0,
        'emails': []
    }
    try:
        if ' ' in folder_name:
            folder_name = f'"{folder_name}"'
        # Open the folder read-only: fetching RFC822 from a read-write
        # mailbox sets the \Seen flag, which would mark every unread message
        # in the source account as read.
        status, _ = imap.select(folder_name, readonly=True)
        if status != 'OK':
            raise Exception(f"无法选择文件夹: {folder_name}")

        # UIDs of all messages in the folder.
        _, data = imap.uid('search', None, 'ALL')
        msg_uids = data[0].split()
        folder_result['total'] = len(msg_uids)
        if MAX_PER_FOLDER:
            msg_uids = msg_uids[:MAX_PER_FOLDER]

        # Look up each message's date so the import can run oldest first.
        print(f"⏳ 正在获取邮件日期并排序...")
        emails_to_import = []
        for uid in msg_uids:
            uid_str = uid.decode('utf-8')
            try:
                internal_date = get_email_date(imap, uid)
                emails_to_import.append((uid_str, internal_date))
            except Exception as e:
                print(f"⚠️ 准备邮件失败 UID: {uid_str} | 错误: {e}")
                _record_failure(folder_result, uid_str, str(e))

        # Ascending by date (oldest to newest).
        emails_to_import.sort(key=lambda x: x[1])
        print(f"🔄 将按时间顺序导入 {len(emails_to_import)} 封邮件(从旧到新)")

        # Fetch on this thread (one IMAP connection), insert on the pool.
        with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
            futures = []
            for uid_str, internal_date in emails_to_import:
                try:
                    # Full message content.
                    _, msg_data = imap.uid('fetch', uid_str.encode(), '(RFC822)')
                    if not msg_data or not msg_data[0]:
                        print(f"⚠️ 获取邮件内容失败 UID: {uid_str}")
                        _record_failure(folder_result, uid_str, '无法获取邮件内容')
                        continue
                    raw_email = msg_data[0][1]
                    futures.append((
                        uid_str,
                        executor.submit(
                            import_email_to_gmail,
                            service,
                            raw_email,
                            uid_str,
                            folder_alias,
                            import_manager,
                            internal_date
                        )
                    ))
                except Exception as e:
                    print(f"❌ 准备导入任务失败 UID: {uid_str} | 错误: {e}")
                    _record_failure(folder_result, uid_str, str(e))

            # Collect results in submission order.
            for uid_str, future in futures:
                res = future.result()
                folder_result['emails'].append({
                    'uid': uid_str,
                    'status': res['status'],
                    'gmail_id': res.get('gmail_id'),
                    'error': res.get('error')
                })
                if res['status'] == 'success':
                    folder_result['success'] += 1
                elif res['status'] == 'skipped':
                    folder_result['skipped'] += 1
                else:
                    folder_result['failed'] += 1
    except Exception as e:
        folder_result['error'] = str(e)
        print(f"❌ 文件夹同步失败 {folder_name}: {e}")
    result_data['folders'].append(folder_result)
    return folder_result
# One IMAP LIST response line: (flags) "delimiter"|NIL mailbox-name
_LIST_LINE_RE = re.compile(
    r'^\((?P<flags>[^)]*)\)\s+'
    r'(?P<delim>"(?:\\.|[^"\\])*"|NIL)\s+'
    r'(?P<name>.+?)\s*$',
    re.IGNORECASE,
)


def _parse_list_folder_name(list_line):
    """Extract the mailbox name from one raw IMAP LIST response line."""
    folder_info = list_line.decode('utf-8')
    match = _LIST_LINE_RE.match(folder_info)
    if match:
        name = match.group('name')
        if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
            # Quoted string: drop the quotes and undo backslash escapes.
            return re.sub(r'\\(.)', r'\1', name[1:-1])
        return name
    # Unrecognized layout: keep the previous best-effort extraction.
    if '"' in folder_info:
        return folder_info.split('"')[-2]
    return folder_info.split()[-1]


def process_account(old_email, new_email, password):
    """Migrate one mailbox from the IMAP server into a Google Workspace user.

    Logs in to ``IMAP_SERVER`` as ``old_email``, builds a Gmail client that
    acts as ``new_email`` through the service account, syncs every folder
    whose name contains a key of ``FOLDER_ALIASES`` (case-insensitive), and
    finally prints a summary and saves the report with ``save_result``.
    Errors are recorded in the report and printed, not raised.

    Args:
        old_email: Login name of the source mailbox.
        new_email: Target Google Workspace user to impersonate.
        password: Secret passed to the IMAP login.
    """
    # Local import: the file only imports ``build`` from googleapiclient.
    from googleapiclient.http import HttpRequest

    print(f"\n🔁 开始同步 {old_email} → {new_email}")
    import_manager = ImportManager()
    imap = None
    result_data = {
        'account': {
            'from': old_email,
            'to': new_email
        },
        'start_time': datetime.now().isoformat(),
        'folders': [],
        'summary': {
            'total': 0,
            'success': 0,
            'failed': 0,
            'skipped': 0
        }
    }
    try:
        imap = imaplib.IMAP4_SSL(IMAP_SERVER)
        imap.login(old_email, password)
        creds = service_account.Credentials.from_service_account_file(
            SERVICE_ACCOUNT_FILE, scopes=SCOPES, subject=new_email)
        creds.refresh(Request())

        def build_request(http, *args, **kwargs):
            # httplib2 is not thread-safe and sync_folder issues Gmail
            # requests from a thread pool, so every request gets its own
            # transport instead of sharing the one passed to build().
            fresh_http = AuthorizedHttp(creds, http=httplib2.Http())
            return HttpRequest(fresh_http, *args, **kwargs)

        service = build('gmail', 'v1', http=AuthorizedHttp(creds),
                        requestBuilder=build_request)
        ensure_labels_exist(service)
        _, folders = imap.list()
        valid_folders = []
        for folder in folders:
            try:
                name = _parse_list_folder_name(folder)
                if any(k.lower() in name.lower() for k in FOLDER_ALIASES.keys()):
                    valid_folders.append(name)
            except Exception as e:
                print(f"⚠️ 解析文件夹失败: {folder} | 错误: {e}")
        print(f"📂 发现 {len(valid_folders)} 个需同步的文件夹: {valid_folders}")
        for folder in valid_folders:
            print(f"\n🔄 正在同步文件夹: {folder}")
            folder_result = sync_folder(imap, folder, service, import_manager, result_data)
            print(f"✅ 完成: {folder_result['success']} 成功, {folder_result['skipped']} 跳过, {folder_result['failed']} 失败")
            # Add this folder's counts to the account summary.
            result_data['summary']['total'] += folder_result['total']
            result_data['summary']['success'] += folder_result['success']
            result_data['summary']['failed'] += folder_result['failed']
            result_data['summary']['skipped'] += folder_result['skipped']
    except Exception as e:
        result_data['error'] = str(e)
        print(f"❌ 同步失败: {str(e)}")
    finally:
        if imap:
            try:
                imap.logout()
            except Exception:
                # Best effort: a failed logout must not hide the result.
                pass
        result_data['end_time'] = datetime.now().isoformat()
        duration = datetime.fromisoformat(result_data['end_time']) - datetime.fromisoformat(result_data['start_time'])
        result_data['duration'] = str(duration)
        print(f"\n📊 同步汇总:")
        print(f"总邮件数: {result_data['summary']['total']}")
        print(f"成功: {result_data['summary']['success']}")
        print(f"跳过: {result_data['summary']['skipped']}")
        print(f"失败: {result_data['summary']['failed']}")
        print(f"🏁 {old_email} 同步完成")
    # Always persist the report, whether the sync succeeded or not.
    save_result(new_email, result_data)
def main():
    """Migrate every account listed in ``accounts.csv``, one after another.

    Stops with a message when the service-account key file is missing. Each
    CSV row must provide the columns old_email, new_email and password;
    there is a two-second pause after each account.
    """
    if not os.path.exists(SERVICE_ACCOUNT_FILE):
        print(f"❌ 服务账号文件 {SERVICE_ACCOUNT_FILE} 不存在")
        return

    with open('accounts.csv', newline='', encoding='utf-8') as accounts_file:
        reader = csv.DictReader(accounts_file)
        for account in reader:
            source = account['old_email']
            target = account['new_email']
            secret = account['password']
            process_account(source, target, secret)
            time.sleep(2)


if __name__ == '__main__':
    main()
相关导入成功截图、以及避免重复导入的功能实现
7. 迁移结果校验与日志
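建议日志格式(示意;上方脚本实际写入的 JSON 字段为 account.from / account.to、start_time、end_time、duration、folders 列表以及 summary 汇总):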
{
"account": "user@example.com",
"total_migrated": 5120,
"failed": 2,
"folders": { "INBOX": 4300, "SENT": 800 },
"start_time": "2025-07-14T10:00:00",
"end_time": "2025-07-14T11:30:10"
}
8. 附录:常见问题与参考链接
- 服务账号密钥失败: 需解除组织策略限制
- Gmail 无法插入邮件: 检查是否已授权 OAuth Scope
参考链接:
THE END
二维码