From 0175bff81fe65e5f98d8ab81dd9c5ec098f2dfb0 Mon Sep 17 00:00:00 2001 From: xuhuafei <2508020102@qq.com> Date: Thu, 9 Apr 2026 19:01:25 +0800 Subject: [PATCH 1/2] fix issue #1160 --- .../java/io/agentscope/core/ReActAgent.java | 27 +++++- .../core/memory/StaticLongTermMemoryHook.java | 56 +++++++++--- .../memory/StaticLongTermMemoryHookTest.java | 87 +++++++++++++++++++ 3 files changed, 158 insertions(+), 12 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java b/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java index 266c3fd8b..720e25b1b 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java +++ b/agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java @@ -1120,6 +1120,7 @@ public static class Builder { // Long-term memory configuration private LongTermMemory longTermMemory; private LongTermMemoryMode longTermMemoryMode = LongTermMemoryMode.BOTH; + private boolean longTermMemoryAsyncRecord = false; // State persistence configuration private StatePersistence statePersistence; @@ -1418,6 +1419,29 @@ public Builder longTermMemoryMode(LongTermMemoryMode mode) { return this; } + /** + * Sets whether long-term memory recording should be performed asynchronously. + * + *

When enabled, the framework will record memories to long-term storage + * in a fire-and-forget manner, without blocking the agent's main execution flow. + * This improves response latency but means memory persistence is not guaranteed + * before the agent returns its response. + * + *

When disabled (default), the framework waits for the recording operation + * to complete before returning the agent's response. This ensures memory + * persistence is finalized but may increase response latency. + * + *

Note: This setting only affects the static control mode (STATIC_CONTROL, BOTH). + * Agent-controlled recording through tools is always synchronous. + * + * @param asyncRecord Whether to record memories asynchronously + * @return This builder instance for method chaining + */ + public Builder longTermMemoryAsyncRecord(boolean asyncRecord) { + this.longTermMemoryAsyncRecord = asyncRecord; + return this; + } + /** * Sets the state persistence configuration. * @@ -1591,7 +1615,8 @@ private void configureLongTermMemory(Toolkit agentToolkit) { if (longTermMemoryMode == LongTermMemoryMode.STATIC_CONTROL || longTermMemoryMode == LongTermMemoryMode.BOTH) { StaticLongTermMemoryHook hook = - new StaticLongTermMemoryHook(longTermMemory, memory); + new StaticLongTermMemoryHook( + longTermMemory, memory, longTermMemoryAsyncRecord); hooks.add(hook); } } diff --git a/agentscope-core/src/main/java/io/agentscope/core/memory/StaticLongTermMemoryHook.java b/agentscope-core/src/main/java/io/agentscope/core/memory/StaticLongTermMemoryHook.java index b57bb5433..0c582ef30 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/memory/StaticLongTermMemoryHook.java +++ b/agentscope-core/src/main/java/io/agentscope/core/memory/StaticLongTermMemoryHook.java @@ -76,15 +76,29 @@ public class StaticLongTermMemoryHook implements Hook { private final LongTermMemory longTermMemory; private final Memory memory; + private final boolean asyncRecord; /** - * Creates a new StaticLongTermMemoryHook. + * Creates a new StaticLongTermMemoryHook with synchronous recording. * * @param longTermMemory The long-term memory instance for persistent storage * @param memory The agent's memory for accessing conversation history * @throws IllegalArgumentException if longTermMemory or memory is null */ public StaticLongTermMemoryHook(LongTermMemory longTermMemory, Memory memory) { + this(longTermMemory, memory, false); + } + + /** + * Creates a new StaticLongTermMemoryHook. + * + * @param longTermMemory The long-term memory instance for persistent storage + * @param memory The agent's memory for accessing conversation history + * @param asyncRecord Whether to record memories asynchronously (fire-and-forget) + * @throws IllegalArgumentException if longTermMemory or memory is null + */ + public StaticLongTermMemoryHook( + LongTermMemory longTermMemory, Memory memory, boolean asyncRecord) { if (longTermMemory == null) { throw new IllegalArgumentException("Long-term memory cannot be null"); } @@ -93,6 +107,7 @@ public StaticLongTermMemoryHook(LongTermMemory longTermMemory, Memory memory) { } this.longTermMemory = longTermMemory; this.memory = memory; + this.asyncRecord = asyncRecord; } @Override @@ -180,6 +195,10 @@ private Mono handlePreCall(PreCallEvent event) { * the long-term memory backend (e.g., Mem0) to extract memorable information from * the entire conversation context. * + *

When {@code asyncRecord} is enabled, the recording is performed in a + * fire-and-forget manner that does not block the agent's response. Otherwise, + * the recording completes before returning the event. + * * @param event the PostCallEvent * @return Mono containing the unmodified event */ @@ -191,16 +210,31 @@ private Mono handlePostCall(PostCallEvent event) { } // Record to long-term memory - return longTermMemory - .record(allMessages) - .thenReturn(event) - .onErrorResume( - error -> { - // Log error but don't interrupt the flow - log.warn( - "Failed to record to long-term memory: {}", error.getMessage()); - return Mono.just(event); - }); + if (asyncRecord) { + // Fire-and-forget: do not block the agent's response + longTermMemory + .record(allMessages) + .subscribe( + unused -> {}, + error -> + log.warn( + "Failed to asynchronously record to long-term memory:" + + " {}", + error.getMessage())); + return Mono.just(event); + } else { + return longTermMemory + .record(allMessages) + .thenReturn(event) + .onErrorResume( + error -> { + // Log error but don't interrupt the flow + log.warn( + "Failed to record to long-term memory: {}", + error.getMessage()); + return Mono.just(event); + }); + } } /** diff --git a/agentscope-core/src/test/java/io/agentscope/core/memory/StaticLongTermMemoryHookTest.java b/agentscope-core/src/test/java/io/agentscope/core/memory/StaticLongTermMemoryHookTest.java index 37417ee26..f4eebaa37 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/memory/StaticLongTermMemoryHookTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/memory/StaticLongTermMemoryHookTest.java @@ -38,6 +38,8 @@ import io.agentscope.core.message.TextBlock; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -251,4 +253,89 @@ void testOnEventWithPostCallEventRecordError() { StepVerifier.create(hook.onEvent(event)).expectNext(event).verifyComplete(); } + + @Test + void testOnEventWithPostCallEventAsyncRecord() throws InterruptedException { + List allMessages = new ArrayList<>(); + allMessages.add( + Msg.builder() + .role(MsgRole.USER) + .content(TextBlock.builder().text("User message").build()) + .build()); + allMessages.add( + Msg.builder() + .role(MsgRole.ASSISTANT) + .content(TextBlock.builder().text("Assistant reply").build()) + .build()); + + CountDownLatch latch = new CountDownLatch(1); + + when(mockMemory.getMessages()).thenReturn(allMessages); + when(mockLongTermMemory.record(anyList())) + .thenAnswer( + invocation -> { + latch.countDown(); + return Mono.empty(); + }); + + StaticLongTermMemoryHook asyncHook = + new StaticLongTermMemoryHook(mockLongTermMemory, mockMemory, true); + + Msg replyMsg = + Msg.builder() + .role(MsgRole.ASSISTANT) + .content(TextBlock.builder().text("Reply").build()) + .build(); + PostCallEvent event = new PostCallEvent(mockAgent, replyMsg); + + // Should return immediately without waiting for record + StepVerifier.create(asyncHook.onEvent(event)).expectNext(event).verifyComplete(); + + // Verify record was still called (async) + assertTrue(latch.await(1, TimeUnit.SECONDS), "Async record should have been called"); + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(mockLongTermMemory, times(1)).record(captor.capture()); + assertEquals(2, captor.getValue().size()); + } + + @Test + void testOnEventWithPostCallEventAsyncRecordError() throws InterruptedException { + List messages = List.of(Msg.builder().role(MsgRole.USER).build()); + CountDownLatch latch = new CountDownLatch(1); + + when(mockMemory.getMessages()).thenReturn(messages); + when(mockLongTermMemory.record(anyList())) + .thenAnswer( + invocation -> { + latch.countDown(); + return Mono.error(new RuntimeException("Async record error")); + }); + + StaticLongTermMemoryHook asyncHook = + new StaticLongTermMemoryHook(mockLongTermMemory, mockMemory, true); + + Msg replyMsg = Msg.builder().role(MsgRole.ASSISTANT).build(); + PostCallEvent event = new PostCallEvent(mockAgent, replyMsg); + + // Should still return the event (error is logged, not thrown) + StepVerifier.create(asyncHook.onEvent(event)).expectNext(event).verifyComplete(); + + // Verify record was attempted + assertTrue(latch.await(1, TimeUnit.SECONDS), "Async record should have been called"); + } + + @Test + void testOnEventWithPostCallEventAsyncRecordEmptyMemory() { + when(mockMemory.getMessages()).thenReturn(new ArrayList<>()); + + StaticLongTermMemoryHook asyncHook = + new StaticLongTermMemoryHook(mockLongTermMemory, mockMemory, true); + + Msg replyMsg = Msg.builder().role(MsgRole.ASSISTANT).build(); + PostCallEvent event = new PostCallEvent(mockAgent, replyMsg); + + StepVerifier.create(asyncHook.onEvent(event)).expectNext(event).verifyComplete(); + + verify(mockLongTermMemory, never()).record(anyList()); + } } From 4b3ba4f793c86549da4f9940fec2ca1c55710b12 Mon Sep 17 00:00:00 2001 From: xuhuafei <2508020102@qq.com> Date: Thu, 9 Apr 2026 19:44:52 +0800 Subject: [PATCH 2/2] optimize test case --- .../core/memory/StaticLongTermMemoryHook.java | 21 +- .../ReActAgentLongTermMemoryConfigTest.java | 258 ++++++++++++++++++ 2 files changed, 271 insertions(+), 8 deletions(-) create mode 100644 agentscope-core/src/test/java/io/agentscope/core/agent/ReActAgentLongTermMemoryConfigTest.java diff --git a/agentscope-core/src/main/java/io/agentscope/core/memory/StaticLongTermMemoryHook.java b/agentscope-core/src/main/java/io/agentscope/core/memory/StaticLongTermMemoryHook.java index 0c582ef30..382b781fc 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/memory/StaticLongTermMemoryHook.java +++ b/agentscope-core/src/main/java/io/agentscope/core/memory/StaticLongTermMemoryHook.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; /** * Static Long-Term Memory Hook for automatic memory management. @@ -211,16 +212,20 @@ private Mono handlePostCall(PostCallEvent event) { // Record to long-term memory if (asyncRecord) { - // Fire-and-forget: do not block the agent's response + // Fire-and-forget: schedule on boundedElastic so the agent's + // response is not blocked. subscribe() is intentional here — + // the record pipeline runs independently of the event chain. longTermMemory .record(allMessages) - .subscribe( - unused -> {}, - error -> - log.warn( - "Failed to asynchronously record to long-term memory:" - + " {}", - error.getMessage())); + .subscribeOn(Schedulers.boundedElastic()) + .onErrorResume( + error -> { + log.warn( + "Failed to asynchronously record to long-term memory: {}", + error.getMessage()); + return Mono.empty(); + }) + .subscribe(); return Mono.just(event); } else { return longTermMemory diff --git a/agentscope-core/src/test/java/io/agentscope/core/agent/ReActAgentLongTermMemoryConfigTest.java b/agentscope-core/src/test/java/io/agentscope/core/agent/ReActAgentLongTermMemoryConfigTest.java new file mode 100644 index 000000000..355c18b20 --- /dev/null +++ b/agentscope-core/src/test/java/io/agentscope/core/agent/ReActAgentLongTermMemoryConfigTest.java @@ -0,0 +1,258 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.agentscope.core.agent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.agentscope.core.ReActAgent; +import io.agentscope.core.memory.InMemoryMemory; +import io.agentscope.core.memory.LongTermMemory; +import io.agentscope.core.memory.LongTermMemoryMode; +import io.agentscope.core.memory.StaticLongTermMemoryHook; +import io.agentscope.core.message.Msg; +import io.agentscope.core.message.MsgRole; +import io.agentscope.core.message.TextBlock; +import io.agentscope.core.model.ChatResponse; +import io.agentscope.core.model.GenerateOptions; +import io.agentscope.core.model.Model; +import io.agentscope.core.model.ToolSchema; +import io.agentscope.core.tool.Toolkit; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Unit tests for ReActAgent's long-term memory configuration, + * including the {@code longTermMemoryAsyncRecord} builder option + * and its integration with {@link StaticLongTermMemoryHook}. + */ +@Tag("unit") +@DisplayName("ReActAgent Long-Term Memory Configuration Tests") +class ReActAgentLongTermMemoryConfigTest { + + private Model mockModel; + + @org.junit.jupiter.api.BeforeEach + void setUp() { + mockModel = + new Model() { + @Override + public Flux stream( + List messages, List tools, GenerateOptions options) { + ChatResponse response = + ChatResponse.builder() + .content( + List.of( + TextBlock.builder() + .text("Test response") + .build())) + .build(); + return Flux.just(response); + } + + @Override + public String getModelName() { + return "mock-model"; + } + }; + } + + @Test + @DisplayName("Should build agent with longTermMemoryAsyncRecord(true)") + void testAsyncRecordTrue() { + CountingLongTermMemory countingMemory = new CountingLongTermMemory(); + + ReActAgent agent = + ReActAgent.builder() + .name("TestAgent") + .model(mockModel) + .toolkit(new Toolkit()) + .memory(new InMemoryMemory()) + .longTermMemory(countingMemory) + .longTermMemoryMode(LongTermMemoryMode.STATIC_CONTROL) + .longTermMemoryAsyncRecord(true) + .build(); + + assertNotNull(agent); + assertEquals("TestAgent", agent.getName()); + } + + @Test + @DisplayName("Should build agent with longTermMemoryAsyncRecord(false) - default") + void testAsyncRecordFalse() { + CountingLongTermMemory countingMemory = new CountingLongTermMemory(); + + ReActAgent agent = + ReActAgent.builder() + .name("TestAgent") + .model(mockModel) + .toolkit(new Toolkit()) + .memory(new InMemoryMemory()) + .longTermMemory(countingMemory) + .longTermMemoryMode(LongTermMemoryMode.STATIC_CONTROL) + .longTermMemoryAsyncRecord(false) + .build(); + + assertNotNull(agent); + } + + @Test + @DisplayName("Should work with BOTH mode and asyncRecord enabled") + void testBothModeWithAsyncRecord() { + CountingLongTermMemory countingMemory = new CountingLongTermMemory(); + + ReActAgent agent = + ReActAgent.builder() + .name("TestAgent") + .model(mockModel) + .toolkit(new Toolkit()) + .memory(new InMemoryMemory()) + .longTermMemory(countingMemory) + .longTermMemoryMode(LongTermMemoryMode.BOTH) + .longTermMemoryAsyncRecord(true) + .build(); + + assertNotNull(agent); + // In BOTH mode, memory tools should be registered + // Tool names use underscore convention from method names + boolean hasRecordTool = + agent.getToolkit().getToolNames().stream() + .anyMatch(name -> name.contains("record")); + boolean hasRetrieveTool = + agent.getToolkit().getToolNames().stream() + .anyMatch(name -> name.contains("retrieve")); + assertTrue(hasRecordTool, "record tool should be registered in BOTH mode"); + assertTrue(hasRetrieveTool, "retrieve tool should be registered in BOTH mode"); + } + + @Test + @DisplayName("Async record should complete without blocking and call onSuccess") + void testAsyncRecordDoesNotBlock() throws InterruptedException { + CountDownLatch recordLatch = new CountDownLatch(1); + AtomicBoolean onSuccessCalled = new AtomicBoolean(false); + + LongTermMemory asyncMemory = + new LongTermMemory() { + @Override + public Mono record(List msgs) { + return Mono.fromRunnable( + () -> { + onSuccessCalled.set(true); + recordLatch.countDown(); + }) + .then(); + } + + @Override + public Mono retrieve(Msg msg) { + return Mono.just(""); + } + }; + + StaticLongTermMemoryHook asyncHook = + new StaticLongTermMemoryHook(asyncMemory, new InMemoryMemory(), true); + + // Simulate PostCallEvent scenario + InMemoryMemory agentMemory = new InMemoryMemory(); + agentMemory.addMessage( + Msg.builder() + .role(MsgRole.USER) + .content(TextBlock.builder().text("Hello").build()) + .build()); + agentMemory.addMessage( + Msg.builder() + .role(MsgRole.ASSISTANT) + .content(TextBlock.builder().text("Hi there").build()) + .build()); + + StaticLongTermMemoryHook hookWithAccess = + new StaticLongTermMemoryHook(asyncMemory, agentMemory, true); + + io.agentscope.core.hook.PostCallEvent event = + new io.agentscope.core.hook.PostCallEvent( + createMockAgent(), + Msg.builder() + .role(MsgRole.ASSISTANT) + .content(TextBlock.builder().text("Reply").build()) + .build()); + + // Execute hook - should return immediately + Mono resultMono = hookWithAccess.onEvent(event); + io.agentscope.core.hook.PostCallEvent result = resultMono.block(); + + assertNotNull(result); + + // Wait for async record to complete + assertTrue( + recordLatch.await(1, TimeUnit.SECONDS), "Async record should have been scheduled"); + assertTrue( + onSuccessCalled.get(), + "The onSuccess lambda (unused -> {}) should have been invoked"); + } + + private Agent createMockAgent() { + return new AgentBase("MockAgent") { + @Override + protected Mono doCall(List msgs) { + return Mono.just(msgs.get(0)); + } + + @Override + protected Mono doObserve(Msg msg) { + return Mono.empty(); + } + + @Override + protected Mono handleInterrupt( + io.agentscope.core.interruption.InterruptContext context, Msg... originalArgs) { + return Mono.just( + Msg.builder() + .name(getName()) + .role(MsgRole.ASSISTANT) + .content(TextBlock.builder().text("Interrupted").build()) + .build()); + } + }; + } + + /** Simple LongTermMemory implementation that counts record calls. */ + private static class CountingLongTermMemory implements LongTermMemory { + private int recordCount = 0; + + @Override + public Mono record(List msgs) { + recordCount++; + return Mono.empty(); + } + + @Override + public Mono retrieve(Msg msg) { + return Mono.just(""); + } + + public int getRecordCount() { + return recordCount; + } + } +}