-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathpst_parser.py
More file actions
184 lines (160 loc) · 8.29 KB
/
pst_parser.py
File metadata and controls
184 lines (160 loc) · 8.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pypff
import csv
from datetime import datetime, timezone
from pathlib import Path
import re
from bs4 import BeautifulSoup
def clean_html_content(html_content):
"""清理HTML内容,只保留文本"""
if not html_content:
return ""
try:
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 获取纯文本内容
text = soup.get_text(separator=' ', strip=True)
# 清理多余的空白字符
text = re.sub(r'\s+', ' ', text)
return text
except Exception as e:
print(f"清理HTML内容时出错: {str(e)}")
return html_content
# 添加归一化主题函数,用于邮件线程的归类
def normalize_subject(subject):
"""归一化邮件主题,去除常见前缀,如 're:'、'fw:' 以便识别邮件线程"""
if not subject:
return "无主题"
subject = subject.lower().strip()
# 去除 "re:", "fw:" 等前缀
subject = re.sub(r'^(re:|fw:|fwd:)\s*', '', subject)
return subject
def parse_pst(pst_path, output_csv):
"""解析PST文件并提取关键元数据"""
pst = pypff.file()
pst.open(pst_path)
# 使用列表来累积所有邮件记录,用于后续按时间序列排序和归纳
records = []
message_counts = {}
def process_folder(folder, folder_path=""):
# 获取文件夹名称,如果为None则使用空字符串
folder_name = folder.name if hasattr(folder, 'name') and folder.name else ""
current_path = f"{folder_path}/{folder_name}" if folder_path else folder_name
# 设定需要排除的文件夹关键字,不处理这些文件夹(例如日历、联系人、通讯录、任务等)
current_path_lower = current_path.lower()
exclude_keywords = ['日历', 'calendar', '联系人', '通讯录', '任务']
if any(keyword in current_path_lower for keyword in exclude_keywords):
# 跳过该文件夹及其子文件夹
return
# 判断当前文件夹类型
folder_type = "收件箱" if "收件箱" in current_path_lower else "发件箱" if "发件箱" in current_path_lower else "其他"
for message in folder.sub_messages:
try:
message_id = str(message.identifier)
if message_id in message_counts:
message_counts[message_id] += 1
print(f"发现重复邮件: ID={message_id}, 主题={message.subject}, 路径={current_path}")
continue
message_counts[message_id] = 1
# 尝试获取收件人列表
recipients = []
try:
# 如果message有收件人数属性,则循环提取
if hasattr(message, 'number_of_recipients') and message.number_of_recipients:
for i in range(message.number_of_recipients):
try:
recipient = message.get_recipient(i)
# 尝试获取收件人名称;如果没有则使用"未知收件人"
recipient_name = getattr(recipient, 'name', None)
if not recipient_name:
recipient_name = "未知收件人"
recipients.append(recipient_name)
except Exception as e:
print(f"获取单个收件人时出错: {e}")
except Exception as e:
print(f"获取收件人信息失败: {e}")
recipients_str = '; '.join(recipients) if recipients else "无收件人"
# 处理邮件
sender_name = message.sender_name or "未知"
headers = message.transport_headers or ""
from_header = [line.partition(": ")[2].strip() for line in headers.split("\r\n") if line.lower().startswith("from:")]
sender_address = from_header[0].split("<")[-1].split(">")[0] if from_header else "未知发件人"
timestamp = message.client_submit_time
if timestamp:
dt = timestamp.astimezone(timezone.utc).replace(tzinfo=None)
formatted_time = dt.strftime('%Y-%m-%d %H:%M:%S')
else:
dt = None
formatted_time = "无时间戳"
attachments = []
for attachment in message.attachments:
att_info = {
'name': getattr(attachment, 'filename', '未命名'),
'size': getattr(attachment, 'size', 0),
'type': getattr(attachment, 'content_type', '未知类型')
}
attachments.append(f"{att_info['name']} ({att_info['type']}, {att_info['size']}字节)")
# 提取并清理正文
body = message.plain_text_body
if not body and message.html_body:
# 如果纯文本不存在,清理HTML正文
body = clean_html_content(message.html_body)
if not body:
body = "无正文"
else:
# 处理编码问题
if isinstance(body, bytes):
try:
body = body.decode('utf-8')
except UnicodeDecodeError:
try:
body = body.decode('gbk')
except UnicodeDecodeError:
body = body.decode('utf-8', errors='ignore')
# 清理正文中的特殊字符和多余空白
body = re.sub(r'\r\n|\r|\n', ' ', body)
body = re.sub(r'\s+', ' ', body)
body = body.strip()
# 归一化主题以便识别邮件线程
conversation = normalize_subject(message.subject or "无主题")
# 累积邮件记录,增加"收件人"字段
record = [
f"{sender_name} <{sender_address}>",
formatted_time,
message.subject or "无主题",
conversation,
len(attachments),
'; '.join(attachments) if attachments else '无附件',
current_path,
folder_type,
recipients_str,
body
]
records.append((dt, record))
except Exception as e:
error_msg = f"邮件ID:{message.identifier} 错误类型:{type(e).__name__} 详细信息:{str(e)}"
print(error_msg)
records.append((None, ["错误记录", "", error_msg, "", "", "", current_path, "", ""]))
continue
# 处理子文件夹
for subfolder in folder.sub_folders:
process_folder(subfolder, current_path)
# 从根文件夹开始处理
process_folder(pst.root_folder)
# 按时间排序记录,缺失时间的记录放在最后
records.sort(key=lambda x: (x[0] is None, x[0]))
# 写入CSV文件
with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['发件人', '时间戳', '主题', '邮件线程', '附件数量', '附件信息', '文件夹路径', '文件夹类型', '收件人', '邮件正文'])
for _, row in records:
writer.writerow(row)
pst.close()
print(f"\n解析完成,结果已保存至:{output_csv}")
if __name__ == "__main__":
input_pst = r"D:\Projects\Email_Audit\V4-\input.pst"
output_dir = Path(input_pst).parent / "output"
output_dir.mkdir(exist_ok=True)
output_csv = output_dir / "metadata.csv"
parse_pst(input_pst, output_csv)