Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions tests/test_history_jsonl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""测试 history 命令的 jsonl 输出格式。

不依赖真实 WeChat 数据库 — 直接测试 _build_history_record 与 output_jsonl
两个新增函数的契约。
"""

import io
import json
import unittest

from wechat_cli.core.messages import _build_history_record
from wechat_cli.output.formatter import output_jsonl


def _display_name_fn(username, names):
return names.get(username, username)


class BuildHistoryRecordTest(unittest.TestCase):
    """Contract tests for the structured record returned by _build_history_record."""

    @staticmethod
    def _chat(username, display_name, is_group):
        # Chat context carrying the three keys these tests hand to the builder.
        return {
            'username': username,
            'display_name': display_name,
            'is_group': is_group,
        }

    def _row(self, **kw):
        # Column order: (local_id, local_type, create_time, real_sender_id, content, ct)
        defaults = {
            'local_id': 1,
            'local_type': 1,
            'create_time': 1714048200,
            'real_sender_id': 100,
            'content': '今晚发你 deck',
            'ct': 0,
        }
        return tuple(kw.get(column, fallback) for column, fallback in defaults.items())

    def _record(self, row, chat, names, id_to_username):
        # The builder returns a pair; only its second item (the record) is inspected.
        _, record = _build_history_record(row, chat, names, id_to_username, _display_name_fn)
        return record

    def test_one_to_one_self_message(self):
        chat = self._chat('wxid_zhangsicheng', '章师诚', False)
        # In 1:1, real_sender_id=100 maps to "wxid_me" (NOT chat_username) -> is_self=True
        sender_ids = {100: 'wxid_me'}
        known_names = {'wxid_zhangsicheng': '章师诚', 'wxid_me': '我'}
        record = self._record(self._row(), chat, known_names, sender_ids)
        self.assertEqual(record['local_id'], 1)
        self.assertEqual(record['create_time'], 1714048200)
        self.assertEqual(record['msg_type'], '文本')
        self.assertEqual(record['local_type'], 1)
        self.assertTrue(record['is_self'])
        self.assertEqual(record['sender_id'], 'wxid_me')
        self.assertEqual(record['is_group'], False)
        self.assertEqual(record['chat_username'], 'wxid_zhangsicheng')
        self.assertEqual(record['chat_display'], '章师诚')
        self.assertEqual(record['text'], '今晚发你 deck')

    def test_one_to_one_other_party_message(self):
        chat = self._chat('wxid_zhangsicheng', '章师诚', False)
        # real_sender_id maps back to chat_username -> is_self=False, sender is the contact
        sender_ids = {100: 'wxid_zhangsicheng'}
        known_names = {'wxid_zhangsicheng': '章师诚'}
        record = self._record(self._row(), chat, known_names, sender_ids)
        self.assertFalse(record['is_self'])
        self.assertEqual(record['sender_id'], 'wxid_zhangsicheng')
        self.assertEqual(record['sender_display'], '章师诚')

    def test_group_message(self):
        chat = self._chat('12345@chatroom', 'AI交流群', True)
        # Group: content carries a "wxid_alice:\ntext" prefix, and real_sender_id
        # resolves to wxid_alice through the id_to_username mapping.
        message = self._row(real_sender_id=100, content='wxid_alice:\n大家好')
        sender_ids = {100: 'wxid_alice'}
        known_names = {'wxid_alice': 'Alice', '12345@chatroom': 'AI交流群'}
        record = self._record(message, chat, known_names, sender_ids)
        self.assertTrue(record['is_group'])
        self.assertEqual(record['chat_username'], '12345@chatroom')
        self.assertEqual(record['sender_id'], 'wxid_alice')
        self.assertEqual(record['sender_display'], 'Alice')
        self.assertFalse(record['is_self'])
        self.assertEqual(record['text'], '大家好')

    def test_group_self_message(self):
        chat = self._chat('12345@chatroom', 'AI交流群', True)
        # Group + self: real_sender_id is absent from the id mapping and the
        # content has no "sender:\n" prefix, so neither source names a sender.
        message = self._row(real_sender_id=999, content='我说的话')
        record = self._record(message, chat, {}, {})
        self.assertTrue(record['is_self'])
        self.assertEqual(record['sender_id'], '')
        self.assertEqual(record['text'], '我说的话')

    def test_required_fields_present(self):
        chat = self._chat('wxid_x', 'X', False)
        record = self._record(self._row(), chat, {}, {})
        required = (
            'local_id', 'create_time', 'msg_type', 'local_type', 'is_self',
            'sender_id', 'sender_display', 'is_group', 'chat_username',
            'chat_display', 'text',
        )
        for key in required:
            self.assertIn(key, record, f'missing field: {key}')


class OutputJsonlTest(unittest.TestCase):
    """Checks the line-oriented JSON produced by output_jsonl."""

    def test_one_object_per_line_and_valid_json(self):
        records = [
            {'local_id': 1, 'text': 'hello', 'is_self': False},
            {'local_id': 2, 'text': '中文消息', 'is_self': True},
            {'local_id': 3, 'text': 'line\nwith\nbreaks', 'is_self': False},
        ]
        sink = io.StringIO()
        output_jsonl(records, file=sink)
        # Drop trailing newlines first so the split yields no empty last entry.
        emitted = sink.getvalue().rstrip('\n')
        lines = emitted.split('\n')
        self.assertEqual(len(lines), len(records))
        # Line counts match at this point, so indexing by record position is safe.
        for index, expected in enumerate(records):
            self.assertEqual(json.loads(lines[index]), expected)

    def test_unicode_not_escaped(self):
        sink = io.StringIO()
        output_jsonl([{'text': '章师诚'}], file=sink)
        written = sink.getvalue()
        self.assertIn('章师诚', written)


# Run this module's tests via unittest when the file is executed directly.
if __name__ == '__main__':
unittest.main()
7 changes: 6 additions & 1 deletion wechat_cli/commands/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
@click.option("--offset", default=0, help="分页偏移量")
@click.option("--start-time", default="", help="起始时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--end-time", default="", help="结束时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "jsonl", "text"]), help="输出格式(jsonl: 每行一个结构化 JSON 对象,便于流式消费)")
@click.option("--type", "msg_type", default=None, type=click.Choice(MSG_TYPE_NAMES), help="消息类型过滤")
@click.option("--media", is_flag=True, help="解析媒体文件路径(图片/文件/视频/语音)")
@click.pass_context
Expand Down Expand Up @@ -57,6 +57,7 @@ def history(ctx, chat_name, limit, offset, start_time, end_time, fmt, msg_type,
chat_ctx, names, app.display_name_fn,
start_ts=start_ts, end_ts=end_ts, limit=limit, offset=offset,
msg_type_filter=type_filter, resolve_media=media, db_dir=app.db_dir,
as_records=(fmt == 'jsonl'),
)

if fmt == 'json':
Expand All @@ -73,6 +74,10 @@ def history(ctx, chat_name, limit, offset, start_time, end_time, fmt, msg_type,
'messages': lines,
'failures': failures if failures else None,
}, 'json')
elif fmt == 'jsonl':
output(lines, 'jsonl')
for fail in failures:
click.echo(f"failure: {fail}", err=True)
else:
header = f"{chat_ctx['display_name']} 的消息记录(返回 {len(lines)} 条,offset={offset}, limit={limit})"
if chat_ctx['is_group']:
Expand Down
51 changes: 48 additions & 3 deletions wechat_cli/core/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,45 @@ def _build_history_line(row, ctx, names, id_to_username, display_name_fn, resolv
return create_time, f'[{time_str}] {text}'


def _build_history_record(row, ctx, names, id_to_username, display_name_fn, resolve_media=False, db_dir=None):
    """Build the structured record for a single message (used for jsonl output).

    Parallel to _build_history_line: the row is unpacked and formatted the same
    way, but a dict is returned instead of a formatted string.

    Returns:
        A ``(create_time, record)`` tuple, where ``record`` is the dict
        describing the message.
    """
    local_id, local_type, create_time, real_sender_id, raw_content, ct = row

    body = decompress_content(raw_content, ct)
    if body is None:
        body = '(无法解压)'

    is_group = ctx['is_group']
    chat_username = ctx['username']
    chat_display = ctx['display_name']

    sender_from_content, text = _format_message_text(
        local_id, local_type, body, is_group, chat_username, chat_display, names, display_name_fn,
        db_dir=db_dir, create_time_ts=create_time, resolve_media=resolve_media,
    )
    sender_username = id_to_username.get(real_sender_id) or ''
    sender_display = _resolve_sender_label(
        real_sender_id, sender_from_content, is_group, chat_username, chat_display,
        names, id_to_username, display_name_fn,
    )

    # is_self heuristic:
    #   - 1:1 chat: a resolved sender that is not the other party's username is us.
    #   - group chat: when neither the id lookup nor the content prefix names a
    #     sender, the message is usually one we sent ourselves.
    if is_group:
        is_self = not (sender_username or sender_from_content)
    elif sender_username:
        is_self = sender_username != chat_username
    else:
        is_self = False

    record = {
        'local_id': local_id,
        'create_time': create_time,
        'msg_type': format_msg_type(local_type),
        'local_type': local_type,
        'is_self': is_self,
        'sender_id': sender_username,
        'sender_display': sender_display,
        'is_group': is_group,
        'chat_username': chat_username,
        'chat_display': chat_display,
        'text': text,
    }
    return create_time, record


def _build_search_entry(row, ctx, names, id_to_username, display_name_fn, resolve_media=False, db_dir=None):
local_id, local_type, create_time, real_sender_id, content, ct = row
content = decompress_content(content, ct)
Expand All @@ -552,11 +591,17 @@ def _build_search_entry(row, ctx, names, id_to_username, display_name_fn, resolv

# ---- 聊天记录查询 ----

def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None, limit=20, offset=0, msg_type_filter=None, resolve_media=False, db_dir=None):
def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None, limit=20, offset=0, msg_type_filter=None, resolve_media=False, db_dir=None, as_records=False):
"""收集聊天记录。

as_records=False(默认):返回格式化字符串列表(保持与现有调用方兼容)。
as_records=True:返回结构化 dict 列表(供 jsonl 输出等场景使用)。
"""
collected = []
failures = []
candidate_limit = _candidate_page_size(limit, offset)
batch_size = min(candidate_limit, _HISTORY_QUERY_BATCH_SIZE)
builder = _build_history_record if as_records else _build_history_line

for table_ctx in _iter_table_contexts(ctx):
try:
Expand All @@ -571,7 +616,7 @@ def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None
fetch_offset += len(rows)
for row in rows:
try:
collected.append(_build_history_line(row, table_ctx, names, id_to_username, display_name_fn, resolve_media=resolve_media, db_dir=db_dir))
collected.append(builder(row, table_ctx, names, id_to_username, display_name_fn, resolve_media=resolve_media, db_dir=db_dir))
except Exception as e:
failures.append(f"local_id={row[0]}: {e}")
if len(collected) - before >= candidate_limit:
Expand All @@ -582,7 +627,7 @@ def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None
failures.append(f"{table_ctx['db_path']}: {e}")

paged = _page_ranked_entries(collected, limit, offset)
return [line for _, line in paged], failures
return [item for _, item in paged], failures


# ---- 搜索查询 ----
Expand Down
10 changes: 10 additions & 0 deletions wechat_cli/output/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,19 @@ def output_text(text, file=None):
file.write('\n')


def output_jsonl(records, file=None):
    """Write *records* as JSON Lines: one JSON document per line.

    Non-ASCII characters are emitted as-is (``ensure_ascii=False``).

    Args:
        records: Iterable of JSON-serializable objects.
        file: Writable text stream. ``sys.stdout`` is used when it is None.

    Raises:
        TypeError: If a record is not JSON-serializable. Records before it
            have already been written; the failing record writes nothing.
    """
    # Compare against None explicitly so a falsy file-like object passed by
    # the caller is still honoured instead of being swapped for stdout.
    if file is None:
        file = sys.stdout
    for rec in records:
        # Serialize the whole record before touching the stream: json.dump
        # writes chunk by chunk, so a record that fails halfway would leave a
        # truncated line and corrupt the output for line-based consumers.
        line = json.dumps(rec, ensure_ascii=False)
        file.write(line + '\n')


def output(data, fmt='json', file=None):
if fmt == 'json':
output_json(data, file)
elif fmt == 'jsonl':
output_jsonl(data, file)
else:
if isinstance(data, str):
output_text(data, file)
Expand Down