NOTE(review): this patch was recovered from a copy whose whitespace had been
collapsed. Line breaks were re-derived from the hunk line counts and Python
syntax; the indentation of context lines and the position of blank context
lines are inferred, not read from the base files -- confirm against the target
tree before applying. Docstrings/comments on added lines were translated to
English, so the `index` blob hashes below are presumably stale.

diff --git a/tests/test_history_jsonl.py b/tests/test_history_jsonl.py
new file mode 100644
index 0000000..972bc5d
--- /dev/null
+++ b/tests/test_history_jsonl.py
@@ -0,0 +1,132 @@
+"""Tests for the jsonl output format of the history command.
+
+Does not depend on a real WeChat database — directly tests the contract of
+the two newly added functions, _build_history_record and output_jsonl.
+"""
+
+import io
+import json
+import unittest
+
+from wechat_cli.core.messages import _build_history_record
+from wechat_cli.output.formatter import output_jsonl
+
+
+def _display_name_fn(username, names):
+    return names.get(username, username)
+
+
+class BuildHistoryRecordTest(unittest.TestCase):
+
+    def _row(self, **kw):
+        # (local_id, local_type, create_time, real_sender_id, content, ct)
+        return (
+            kw.get('local_id', 1),
+            kw.get('local_type', 1),
+            kw.get('create_time', 1714048200),
+            kw.get('real_sender_id', 100),
+            kw.get('content', '今晚发你 deck'),
+            kw.get('ct', 0),
+        )
+
+    def test_one_to_one_self_message(self):
+        ctx = {
+            'username': 'wxid_zhangsicheng', 'display_name': '章师诚',
+            'is_group': False,
+        }
+        # In 1:1, real_sender_id=100 maps to "wxid_me" (NOT chat_username) → is_self=True
+        id_to_username = {100: 'wxid_me'}
+        names = {'wxid_zhangsicheng': '章师诚', 'wxid_me': '我'}
+        _, rec = _build_history_record(self._row(), ctx, names, id_to_username, _display_name_fn)
+        self.assertEqual(rec['local_id'], 1)
+        self.assertEqual(rec['create_time'], 1714048200)
+        self.assertEqual(rec['msg_type'], '文本')
+        self.assertEqual(rec['local_type'], 1)
+        self.assertTrue(rec['is_self'])
+        self.assertEqual(rec['sender_id'], 'wxid_me')
+        self.assertEqual(rec['is_group'], False)
+        self.assertEqual(rec['chat_username'], 'wxid_zhangsicheng')
+        self.assertEqual(rec['chat_display'], '章师诚')
+        self.assertEqual(rec['text'], '今晚发你 deck')
+
+    def test_one_to_one_other_party_message(self):
+        ctx = {
+            'username': 'wxid_zhangsicheng', 'display_name': '章师诚',
+            'is_group': False,
+        }
+        # real_sender_id maps back to chat_username → is_self=False, sender is the contact
+        id_to_username = {100: 'wxid_zhangsicheng'}
+        names = {'wxid_zhangsicheng': '章师诚'}
+        _, rec = _build_history_record(self._row(), ctx, names, id_to_username, _display_name_fn)
+        self.assertFalse(rec['is_self'])
+        self.assertEqual(rec['sender_id'], 'wxid_zhangsicheng')
+        self.assertEqual(rec['sender_display'], '章师诚')
+
+    def test_group_message(self):
+        ctx = {
+            'username': '12345@chatroom', 'display_name': 'AI交流群',
+            'is_group': True,
+        }
+        # group: content has "wxid_alice:\ntext" prefix; real_sender_id resolves
+        # to wxid_alice via Name2Id (id_to_username).
+        row = self._row(real_sender_id=100, content='wxid_alice:\n大家好')
+        id_to_username = {100: 'wxid_alice'}
+        names = {'wxid_alice': 'Alice', '12345@chatroom': 'AI交流群'}
+        _, rec = _build_history_record(row, ctx, names, id_to_username, _display_name_fn)
+        self.assertTrue(rec['is_group'])
+        self.assertEqual(rec['chat_username'], '12345@chatroom')
+        self.assertEqual(rec['sender_id'], 'wxid_alice')
+        self.assertEqual(rec['sender_display'], 'Alice')
+        self.assertFalse(rec['is_self'])
+        self.assertEqual(rec['text'], '大家好')
+
+    def test_group_self_message(self):
+        ctx = {
+            'username': '12345@chatroom', 'display_name': 'AI交流群',
+            'is_group': True,
+        }
+        # group + self: real_sender_id is unknown to Name2Id and content has no
+        # "sender:\n" prefix → both sender_username and sender_from_content empty.
+        row = self._row(real_sender_id=999, content='我说的话')
+        _, rec = _build_history_record(row, ctx, {}, {}, _display_name_fn)
+        self.assertTrue(rec['is_self'])
+        self.assertEqual(rec['sender_id'], '')
+        self.assertEqual(rec['text'], '我说的话')
+
+    def test_required_fields_present(self):
+        ctx = {'username': 'wxid_x', 'display_name': 'X', 'is_group': False}
+        _, rec = _build_history_record(
+            self._row(), ctx, {}, {}, _display_name_fn,
+        )
+        for key in (
+            'local_id', 'create_time', 'msg_type', 'local_type', 'is_self',
+            'sender_id', 'sender_display', 'is_group', 'chat_username',
+            'chat_display', 'text',
+        ):
+            self.assertIn(key, rec, f'missing field: {key}')
+
+
+class OutputJsonlTest(unittest.TestCase):
+
+    def test_one_object_per_line_and_valid_json(self):
+        records = [
+            {'local_id': 1, 'text': 'hello', 'is_self': False},
+            {'local_id': 2, 'text': '中文消息', 'is_self': True},
+            {'local_id': 3, 'text': 'line\nwith\nbreaks', 'is_self': False},
+        ]
+        buf = io.StringIO()
+        output_jsonl(records, file=buf)
+        lines = buf.getvalue().rstrip('\n').split('\n')
+        self.assertEqual(len(lines), len(records))
+        for line, expected in zip(lines, records):
+            parsed = json.loads(line)
+            self.assertEqual(parsed, expected)
+
+    def test_unicode_not_escaped(self):
+        buf = io.StringIO()
+        output_jsonl([{'text': '章师诚'}], file=buf)
+        self.assertIn('章师诚', buf.getvalue())
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/wechat_cli/commands/history.py b/wechat_cli/commands/history.py
index 3e88452..463f041 100644
--- a/wechat_cli/commands/history.py
+++ b/wechat_cli/commands/history.py
@@ -20,7 +20,7 @@
 @click.option("--offset", default=0, help="分页偏移量")
 @click.option("--start-time", default="", help="起始时间 YYYY-MM-DD [HH:MM[:SS]]")
 @click.option("--end-time", default="", help="结束时间 YYYY-MM-DD [HH:MM[:SS]]")
-@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
+@click.option("--format", "fmt", default="json", type=click.Choice(["json", "jsonl", "text"]), help="输出格式(jsonl: 每行一个结构化 JSON 对象,便于流式消费)")
 @click.option("--type", "msg_type", default=None, type=click.Choice(MSG_TYPE_NAMES), help="消息类型过滤")
 @click.option("--media", is_flag=True, help="解析媒体文件路径(图片/文件/视频/语音)")
 @click.pass_context
@@ -57,6 +57,7 @@ def history(ctx, chat_name, limit, offset, start_time, end_time, fmt, msg_type,
         chat_ctx, names, app.display_name_fn,
         start_ts=start_ts, end_ts=end_ts, limit=limit, offset=offset,
         msg_type_filter=type_filter, resolve_media=media, db_dir=app.db_dir,
+        as_records=(fmt == 'jsonl'),
     )
 
     if fmt == 'json':
@@ -73,6 +74,10 @@ def history(ctx, chat_name, limit, offset, start_time, end_time, fmt, msg_type,
             'messages': lines,
             'failures': failures if failures else None,
         }, 'json')
+    elif fmt == 'jsonl':
+        output(lines, 'jsonl')
+        for fail in failures:
+            click.echo(f"failure: {fail}", err=True)
     else:
         header = f"{chat_ctx['display_name']} 的消息记录(返回 {len(lines)} 条,offset={offset}, limit={limit})"
         if chat_ctx['is_group']:
diff --git a/wechat_cli/core/messages.py b/wechat_cli/core/messages.py
index d62ef33..7c4c16a 100644
--- a/wechat_cli/core/messages.py
+++ b/wechat_cli/core/messages.py
@@ -528,6 +528,45 @@ def _build_history_line(row, ctx, names, id_to_username, display_name_fn, resolv
     return create_time, f'[{time_str}] {text}'
 
 
+def _build_history_record(row, ctx, names, id_to_username, display_name_fn, resolve_media=False, db_dir=None):
+    """Build the structured record for a single message (used for jsonl output).
+
+    Parallel to _build_history_line — same field unpacking and formatting, but the returned (create_time, record) pair carries a dict instead of a formatted string.
+    """
+    local_id, local_type, create_time, real_sender_id, content, ct = row
+    content = decompress_content(content, ct)
+    if content is None:
+        content = '(无法解压)'
+    sender_from_content, text = _format_message_text(
+        local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names, display_name_fn,
+        db_dir=db_dir, create_time_ts=create_time, resolve_media=resolve_media,
+    )
+    sender_username = id_to_username.get(real_sender_id, '') or ''
+    sender_display = _resolve_sender_label(
+        real_sender_id, sender_from_content, ctx['is_group'], ctx['username'], ctx['display_name'],
+        names, id_to_username, display_name_fn,
+    )
+    # is_self: in a 1:1 chat, a non-empty sender_username that differs from the peer's username means self;
+    # in a group chat, sender_username and sender_from_content both being empty usually means the message was sent by self.
+    if ctx['is_group']:
+        is_self = not sender_username and not sender_from_content
+    else:
+        is_self = bool(sender_username) and sender_username != ctx['username']
+    return create_time, {
+        'local_id': local_id,
+        'create_time': create_time,
+        'msg_type': format_msg_type(local_type),
+        'local_type': local_type,
+        'is_self': is_self,
+        'sender_id': sender_username,
+        'sender_display': sender_display,
+        'is_group': ctx['is_group'],
+        'chat_username': ctx['username'],
+        'chat_display': ctx['display_name'],
+        'text': text,
+    }
+
+
 def _build_search_entry(row, ctx, names, id_to_username, display_name_fn, resolve_media=False, db_dir=None):
     local_id, local_type, create_time, real_sender_id, content, ct = row
     content = decompress_content(content, ct)
@@ -552,11 +591,17 @@ def _build_search_entry(row, ctx, names, id_to_username, display_name_fn, resolv
 # ---- 聊天记录查询 ----
 
 
-def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None, limit=20, offset=0, msg_type_filter=None, resolve_media=False, db_dir=None):
+def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None, limit=20, offset=0, msg_type_filter=None, resolve_media=False, db_dir=None, as_records=False):
+    """Collect chat history.
+
+    as_records=False (default): the returned items are formatted strings (keeps existing callers compatible).
+    as_records=True: the returned items are structured dicts (for jsonl output and similar uses).
+    """
     collected = []
     failures = []
     candidate_limit = _candidate_page_size(limit, offset)
     batch_size = min(candidate_limit, _HISTORY_QUERY_BATCH_SIZE)
+    builder = _build_history_record if as_records else _build_history_line
 
     for table_ctx in _iter_table_contexts(ctx):
         try:
@@ -571,7 +616,7 @@ def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None
                     fetch_offset += len(rows)
                     for row in rows:
                         try:
-                            collected.append(_build_history_line(row, table_ctx, names, id_to_username, display_name_fn, resolve_media=resolve_media, db_dir=db_dir))
+                            collected.append(builder(row, table_ctx, names, id_to_username, display_name_fn, resolve_media=resolve_media, db_dir=db_dir))
                         except Exception as e:
                             failures.append(f"local_id={row[0]}: {e}")
                     if len(collected) - before >= candidate_limit:
@@ -582,7 +627,7 @@ def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None
             failures.append(f"{table_ctx['db_path']}: {e}")
 
     paged = _page_ranked_entries(collected, limit, offset)
-    return [line for _, line in paged], failures
+    return [item for _, item in paged], failures
 
 
 # ---- 搜索查询 ----
diff --git a/wechat_cli/output/formatter.py b/wechat_cli/output/formatter.py
index 0b1891b..3be0f62 100644
--- a/wechat_cli/output/formatter.py
+++ b/wechat_cli/output/formatter.py
@@ -17,9 +17,19 @@ def output_text(text, file=None):
     file.write('\n')
 
 
+def output_jsonl(records, file=None):
+    """Write JSON objects one per line, convenient for streaming consumption."""
+    file = file or sys.stdout
+    for rec in records:
+        json.dump(rec, file, ensure_ascii=False)
+        file.write('\n')
+
+
 def output(data, fmt='json', file=None):
     if fmt == 'json':
         output_json(data, file)
+    elif fmt == 'jsonl':
+        output_jsonl(data, file)
     else:
         if isinstance(data, str):
             output_text(data, file)