From f97d74890281786a347f9e73928557b673efb876 Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Thu, 19 Mar 2026 14:25:12 +0200 Subject: [PATCH 1/9] WLNP-12474: BJS: implement the possibility to run Investment setup in standalone mode retrieve user data arrangements inside the InvestmentDataHandler --- .../stream/investment/ClientUser.java | 2 +- .../investment/InvestmentArrangement.java | 2 +- .../stream/investment/InvestmentData.java | 13 +----- .../service/InvestmentClientService.java | 14 +++--- .../service/InvestmentPortfolioService.java | 43 ++++++------------- 5 files changed, 24 insertions(+), 50 deletions(-) diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/ClientUser.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/ClientUser.java index f136d66ae..2ae2b1add 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/ClientUser.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/ClientUser.java @@ -13,7 +13,7 @@ public class ClientUser { private UUID investmentClientId; private String internalUserId; private String externalUserId; - private String legalEntityExternalId; + private String legalEntityId; } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java index f334d1063..4c21d9966 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java @@ -18,6 +18,6 @@ public class InvestmentArrangement { private String currency; private UUID investmentProductId; - private List legalEntityExternalIds; + private List legalEntityIds; } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java index 5f5f59c7c..7376cb161 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java @@ -1,10 +1,8 @@ package com.backbase.stream.investment; import com.backbase.investment.api.service.v1.model.GroupResult; -import com.backbase.investment.api.service.v1.model.InvestorModelPortfolio; import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.investment.api.service.v1.model.PortfolioProduct; -import com.backbase.investment.api.service.v1.model.ProductTypeEnum; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import java.util.ArrayList; import java.util.HashMap; @@ -34,7 +32,7 @@ public class InvestmentData { public Map> getClientsByLeExternalId() { Map> clientsByLeExternalId = new HashMap<>(); clientUsers.forEach( - c -> clientsByLeExternalId.computeIfAbsent(c.getLegalEntityExternalId(), l -> new ArrayList<>()) + c -> clientsByLeExternalId.computeIfAbsent(c.getLegalEntityId(), l -> new ArrayList<>()) .add(c.getInvestmentClientId())); return clientsByLeExternalId; } @@ -52,15 +50,6 @@ public void addPortfolioProducts(PortfolioProduct portfolioProduct) { } } - public Optional findPortfolioProduct(ProductTypeEnum productType, Integer riskLevel) { - return Optional.ofNullable(portfolioProducts) - .flatMap(ps -> ps.stream() - .filter(p -> p.getProductType().equals(productType) - && Optional.ofNullable(p.getModelPortfolio()).map(InvestorModelPortfolio::getRiskLevel) - .map(risk -> risk <= riskLevel).orElse(false)) - .findAny()); - } - public List getPriceAsyncTasks() { return Optional.ofNullable(investmentAssetData).map(InvestmentAssetData::getPriceAsyncTasks).orElse(List.of()); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java index 0e6bc8e07..4a0d79089 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java @@ -46,7 +46,7 @@ public Mono> upsertClients(List clientUsers) { .flatMap(clientUser -> { log.debug("Upserting investment client: internalUserId={}, externalUserId={}, legalEntityExternalId={}", clientUser.getInternalUserId(), clientUser.getExternalUserId(), - clientUser.getLegalEntityExternalId()); + clientUser.getLegalEntityId()); ClientCreateRequest request = new ClientCreateRequest() .internalUserId(clientUser.getInternalUserId()) @@ -54,14 +54,14 @@ public Mono> upsertClients(List clientUsers) { .putExtraDataItem("user_external_id", clientUser.getExternalUserId()) .putExtraDataItem("keycloak_username", clientUser.getExternalUserId()); - return upsertClient(request, clientUser.getLegalEntityExternalId()) + return upsertClient(request, clientUser.getLegalEntityId()) .doOnSuccess(upsertedClient -> log.debug( "Successfully upserted client: investmentClientId={}, internalUserId={}", upsertedClient.getInvestmentClientId(), upsertedClient.getInternalUserId())) .doOnError(throwable -> log.error( "Failed to upsert client: internalUserId={}, externalUserId={}, legalEntityExternalId={}", clientUser.getInternalUserId(), clientUser.getExternalUserId(), - clientUser.getLegalEntityExternalId(), throwable)); + clientUser.getLegalEntityId(), throwable)); }) .collectList(); } @@ -101,7 +101,7 @@ private Mono upsertClient(@NotNull ClientCreateRequest request, Stri "Successfully upserted investment client: investmentClientId={}, internalUserId={}, " + "externalUserId={}, legalEntityExternalId={}", clientUser.getInvestmentClientId(), clientUser.getInternalUserId(), - clientUser.getExternalUserId(), clientUser.getLegalEntityExternalId())) + clientUser.getExternalUserId(), clientUser.getLegalEntityId())) .doOnError(throwable -> log.error( "Failed to upsert investment client: internalUserId={}, userExternalId={}, legalEntityExternalId={}", internalUserId, userExternalId, legalEntityExternalId, throwable)); @@ -220,16 +220,16 @@ private Mono createNewClient(ClientCreateRequest request, String leg * @param investmentClientId the investment client UUID * @param internalUserId the internal user ID * @param externalUserId the external user ID - * @param legalEntityExternalId the legal entity external ID + * @param legalEntityId the legal entity external ID * @return constructed ClientUser instance */ private ClientUser buildClientUser(UUID investmentClientId, String internalUserId, - String externalUserId, String legalEntityExternalId) { + String externalUserId, String legalEntityId) { return ClientUser.builder() .investmentClientId(investmentClientId) .internalUserId(internalUserId) .externalUserId(externalUserId) - .legalEntityExternalId(legalEntityExternalId) + .legalEntityId(legalEntityId) .build(); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java index 64b66f46a..0a3a50d37 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java @@ -105,7 +105,7 @@ public Mono> upsertPortfolios(List in *
  • Updates the arrangement with the product UUID
  • * * - * @param investmentArrangement the investment arrangement to associate with the product (must not be null) + * @param investmentArrangements the investment arrangement to associate with the product (must not be null) * @return Mono emitting the created or updated portfolio product * @throws NullPointerException if investmentArrangement is null */ @@ -206,10 +206,6 @@ private Collection distinctProducts(List pro private static List findModelUuid(InvestmentData investmentData, String externalId, ProductTypeEnum productType) { -// Map> modelsByArrangementExternalId = investmentData.getModelsByArrangementExternalId(); -// Optional> modelUuid = modelsByArrangementExternalId.keySet().stream() -// .filter(a -> a.endsWith(externalId)).findAny() -// .map(modelsByArrangementExternalId::get); return investmentData.getModelPortfolios().stream() .filter(p -> p.getProductTypeEnum() == productType) .toList(); @@ -230,12 +226,12 @@ private static List findModelUuid(InvestmentData investmentData, * from the arrangement to client UUIDs via the provided lookup map. * * @param investmentArrangement the investment arrangement containing portfolio details (must not be null) - * @param clientsByLeExternalId map of legal entity external ID to client UUIDs for associations + * @param clientsByLeId map of legal entity ID to client UUIDs for associations * @return Mono emitting the created or existing portfolio * @throws NullPointerException if investmentArrangement is null */ public Mono upsertInvestmentPortfolios(InvestmentArrangement investmentArrangement, - Map> clientsByLeExternalId) { + Map> clientsByLeId) { if (investmentArrangement == null) { return Mono.error(new NullPointerException("InvestmentArrangement must not be null")); } @@ -246,8 +242,8 @@ public Mono upsertInvestmentPortfolios(InvestmentArrangement inve log.info("Upserting investment portfolio: externalId={}, name={}", externalId, arrangementName); return listExistingPortfolios(externalId) - .flatMap(p -> patchPortfolio(p, investmentArrangement, clientsByLeExternalId)) - .switchIfEmpty(Mono.defer(() -> createNewPortfolio(investmentArrangement, clientsByLeExternalId))) + .flatMap(p -> patchPortfolio(p, investmentArrangement, clientsByLeId)) + .switchIfEmpty(Mono.defer(() -> createNewPortfolio(investmentArrangement, clientsByLeId))) .doOnSuccess(portfolio -> log.info( "Successfully upserted investment portfolio: externalId={}, name={}, portfolioUuid={}", externalId, arrangementName, @@ -259,10 +255,10 @@ public Mono upsertInvestmentPortfolios(InvestmentArrangement inve private Mono patchPortfolio( PortfolioList existingProduct, InvestmentArrangement investmentArrangement, - Map> clientsByLeExternalId) { + Map> clientsByLeId) { String uuid = existingProduct.getUuid().toString(); - List associatedClients = getClients(investmentArrangement, clientsByLeExternalId); + List associatedClients = getClients(investmentArrangement, clientsByLeId); PatchedPortfolioUpdateRequest patchedPortfolioUpdateRequest = new PatchedPortfolioUpdateRequest() .product(investmentArrangement.getInvestmentProductId()) @@ -341,12 +337,12 @@ private Mono listExistingPortfolios(String externalId) { * Creates a new investment portfolio with associated clients. * * @param investmentArrangement the arrangement containing portfolio details - * @param clientsByLeExternalId map to resolve client UUIDs from legal entity external IDs + * @param clientsByLeId map to resolve client UUIDs from legal entity IDs * @return Mono emitting the newly created portfolio */ private Mono createNewPortfolio(InvestmentArrangement investmentArrangement, - Map> clientsByLeExternalId) { - List associatedClients = getClients(investmentArrangement, clientsByLeExternalId); + Map> clientsByLeId) { + List associatedClients = getClients(investmentArrangement, clientsByLeId); log.info("Creating new investment portfolio: externalId={}, name={}, clientCount={}", investmentArrangement.getExternalId(), investmentArrangement.getName(), associatedClients.size()); @@ -376,30 +372,19 @@ private Mono createNewPortfolio(InvestmentArrangement investmentA * using the provided lookup map. It filters out null values and ensures distinct results. * * @param investmentArrangement the arrangement containing legal entity external IDs - * @param clientsByLeExternalId map of legal entity external ID to client UUIDs + * @param clientsByLeId map of legal entity ID to client UUIDs * @return distinct list of client UUIDs associated with the arrangement's legal entities */ private static List getClients(InvestmentArrangement investmentArrangement, - Map> clientsByLeExternalId) { - return investmentArrangement.getLegalEntityExternalIds().stream() - .map(clientsByLeExternalId::get) + Map> clientsByLeId) { + return investmentArrangement.getLegalEntityIds().stream() + .map(clientsByLeId::get) .filter(Objects::nonNull) .flatMap(Collection::stream) .distinct() .toList(); } - private Mono listExistingPortfolioProducts(PortfolioProduct portfolioProduct, - InvestmentData investmentData) { - Integer modelPortfolioRiskLower = null; - ProductTypeEnum productType = portfolioProduct.getProductType(); - if (ProductTypeEnum.SELF_TRADING != productType) { - modelPortfolioRiskLower = portfolioProduct.getModelPortfolio().getRiskLevel(); - } - return Mono.justOrEmpty(investmentData.findPortfolioProduct(productType, modelPortfolioRiskLower)) - .switchIfEmpty(listExistingPortfolioProducts(productType, modelPortfolioRiskLower)); - } - private Mono listExistingPortfolioProducts(PortfolioProduct portfolioProduct) { Integer modelPortfolioRiskLower = null; ProductTypeEnum productType = portfolioProduct.getProductType(); From d73c9bac49bc47e04c081482adcdcd61ff1c2d53 Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Thu, 19 Mar 2026 14:29:56 +0200 Subject: [PATCH 2/9] WLNP-12474: BJS: implement the possibility to run Investment setup in standalone mode fix --- .../backbase/stream/investment/saga/InvestmentSagaTest.java | 4 ++-- .../investment/service/InvestmentPortfolioServiceTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java index 73a4ca55b..664cce01c 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java @@ -47,7 +47,7 @@ class InvestmentSagaTest { private static final String PORTFOLIO_EXTERNAL_ID = "some-portfolio-external-id"; private static final String ACCOUNT_ID = "some-account-id"; private static final String ACCOUNT_EXTERNAL_ID = "some-account-external-id"; - private static final String LE_EXTERNAL_ID = "some-le-external-id"; + private static final String LE_INTERNAL_ID = "some-le-internal-id"; @Mock private InvestmentClientService clientService; @@ -467,7 +467,7 @@ private InvestmentTask createFullTask() { .saExternalId(SA_EXTERNAL_ID) .clientUsers(List.of(ClientUser.builder() .investmentClientId(UUID.randomUUID()) - .legalEntityExternalId(LE_EXTERNAL_ID) + .legalEntityId(LE_INTERNAL_ID) .build())) .investmentArrangements(List.of(InvestmentArrangement.builder() .externalId(ARRANGEMENT_EXTERNAL_ID) diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java index 820538460..563e1b9c6 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java @@ -1167,7 +1167,7 @@ private InvestmentArrangement buildArrangement(String externalId, String name, when(arrangement.getExternalId()).thenReturn(externalId); when(arrangement.getName()).thenReturn(name); when(arrangement.getInvestmentProductId()).thenReturn(productId); - when(arrangement.getLegalEntityExternalIds()).thenReturn(List.of(legalEntityExternalId)); + when(arrangement.getLegalEntityIds()).thenReturn(List.of(legalEntityExternalId)); when(arrangement.getCurrency()).thenReturn("EUR"); when(arrangement.getInternalId()).thenReturn(null); return arrangement; @@ -1183,7 +1183,7 @@ private InvestmentArrangement buildArrangementWithProductType(String externalId, when(arrangement.getExternalId()).thenReturn(externalId); when(arrangement.getName()).thenReturn(name); when(arrangement.getProductTypeExternalId()).thenReturn(productTypeValue); - when(arrangement.getLegalEntityExternalIds()).thenReturn(List.of()); + when(arrangement.getLegalEntityIds()).thenReturn(List.of()); when(arrangement.getCurrency()).thenReturn("EUR"); when(arrangement.getInternalId()).thenReturn(null); return arrangement; From 90d16e4e229f60ee7ab78743f7a666df88df8679 Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Thu, 19 Mar 2026 15:13:03 +0200 Subject: [PATCH 3/9] WLNP-12474: BJS: implement the possibility to run Investment setup in standalone mode limit the amount of concurrent calls to investment --- .../configuration/InvestmentClientConfig.java | 18 ++- .../InvestmentWebClientConfiguration.java | 103 ++++++++++++++++++ .../saga/InvestmentAssetUniverseSaga.java | 10 +- .../InvestmentAssetUniverseService.java | 90 +++++++++++++-- .../service/InvestmentClientService.java | 79 +++++++++++++- 5 files changed, 276 insertions(+), 24 deletions(-) create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java index 09e913129..9036f373e 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java @@ -18,19 +18,33 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import java.text.DateFormat; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.Builder; +import reactor.netty.http.client.HttpClient; /** * Configuration for Investment service REST client (ClientApi). + * + *

    This configuration creates the Investment API client with proper codec configuration. + * + *

    Note: Connection pooling, timeouts, and rate limiting are configured in + * {@link InvestmentWebClientConfiguration} which should be imported alongside this class. + * The WebClient connection pool prevents resource exhaustion and 503 errors by limiting: + *

      + *
    • Maximum concurrent connections to 100 (configurable)
    • + *
    • Connection acquisition timeout to 45 seconds
    • + *
    • Read/Write timeouts to 30 seconds each
    • + *
    */ @Configuration @ConditionalOnBean(InvestmentServiceConfiguration.class) @@ -48,7 +62,9 @@ public InvestmentClientConfig() { */ @Bean @ConditionalOnMissingBean - public ApiClient investmentApiClient(WebClient interServiceWebClient, ObjectMapper objectMapper, + public ApiClient investmentApiClient(WebClient interServiceWebClient, + @Qualifier("investmentHttpClient") HttpClient investmentHttpClient, + ObjectMapper objectMapper, DateFormat dateFormat) { ObjectMapper mapper = objectMapper.copy(); mapper.setSerializationInclusion(Include.NON_EMPTY); diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java new file mode 100644 index 000000000..a72bf3dac --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java @@ -0,0 +1,103 @@ +package com.backbase.stream.configuration; + +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.netty.http.client.HttpClient; +import reactor.netty.resources.ConnectionProvider; + +/** + * WebClient configuration for Investment service communication. + * + *

    Provides optimized connection pooling, timeouts, and rate limiting to prevent: + *

      + *
    • Resource exhaustion from too many concurrent connections
    • + *
    • 503 Service Unavailable errors due to overwhelming the service
    • + *
    • Indefinite hangs from missing timeouts
    • + *
    + * + *

    All beans are explicitly named so they are unambiguous and never accidentally + * replaced by Spring Boot auto-configuration or another generic bean of the same type. + */ +@Slf4j +@Configuration +public class InvestmentWebClientConfiguration { + + // Connection pool configuration constants. + // MAX_CONNECTIONS=20 limits the number of open TCP connections to the Investment service, + // preventing it from being overwhelmed (which causes 503 responses). + // MAX_PENDING_ACQUIRES=100 bounds the in-memory queue so that callers receive a fast + // failure rather than accumulating an unbounded backlog. + private static final int MAX_CONNECTIONS = 20; + private static final long MAX_IDLE_TIME_MINUTES = 5; + private static final int MAX_PENDING_ACQUIRES = 100; + private static final long PENDING_ACQUIRE_TIMEOUT_MILLIS = 45_000; + + // Timeout configuration constants (in seconds) + private static final int CONNECT_TIMEOUT_SECONDS = 10; + private static final int READ_TIMEOUT_SECONDS = 30; + private static final int WRITE_TIMEOUT_SECONDS = 30; + + /** + * Dedicated {@link ConnectionProvider} for the Investment service client pool. + * + *

    Using an explicit named bean (rather than {@code @ConditionalOnMissingBean ReactorResourceFactory}) + * avoids silently falling back to Spring Boot's auto-configured shared pool when one already exists + * in the application context. + * + * @return investment-specific ConnectionProvider + */ + @Bean("investmentConnectionProvider") + public ConnectionProvider investmentConnectionProvider() { + ConnectionProvider provider = ConnectionProvider.builder("investment-client-pool") + .maxConnections(MAX_CONNECTIONS) + .maxIdleTime(Duration.ofMinutes(MAX_IDLE_TIME_MINUTES)) + .maxLifeTime(Duration.ofMinutes(30)) + .pendingAcquireMaxCount(MAX_PENDING_ACQUIRES) + .pendingAcquireTimeout(Duration.ofMillis(PENDING_ACQUIRE_TIMEOUT_MILLIS)) + .evictInBackground(Duration.ofSeconds(120)) + .build(); + + log.info("Configured investment ConnectionProvider: maxConnections={}, maxIdleTime={}min," + + " pendingAcquireMaxCount={}", + MAX_CONNECTIONS, MAX_IDLE_TIME_MINUTES, MAX_PENDING_ACQUIRES); + + return provider; + } + + /** + * Creates an {@link HttpClient} backed by the investment-specific connection pool and + * with explicit connect / read / write timeouts. + * + *

    This client ensures: + *

      + *
    • Connection timeout prevents hanging on unresponsive servers
    • + *
    • Read/Write timeouts prevent indefinite hangs
    • + *
    • TCP_NODELAY enables immediate sending of small packets
    • + *
    • SO_KEEPALIVE maintains connection health for idle connections
    • + *
    + * + * @param connectionProvider the investment-specific connection provider + * @return configured HttpClient + */ + @Bean("investmentHttpClient") + public HttpClient investmentHttpClient( + @Qualifier("investmentConnectionProvider") ConnectionProvider connectionProvider) { + return HttpClient.create(connectionProvider) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_SECONDS * 1000) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.TCP_NODELAY, true) + .responseTimeout(Duration.ofSeconds(READ_TIMEOUT_SECONDS)) + .doOnConnected(connection -> connection + .addHandlerLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .addHandlerLast(new WriteTimeoutHandler(WRITE_TIMEOUT_SECONDS, TimeUnit.SECONDS))); + } + +} + diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java index 06537f940..b76ae204b 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java @@ -9,7 +9,6 @@ import com.backbase.stream.investment.service.AsyncTaskService; import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; -import com.backbase.stream.investment.service.InvestmentClientService; import com.backbase.stream.investment.service.InvestmentCurrencyService; import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import com.backbase.stream.investment.service.InvestmentPortfolioService; @@ -43,7 +42,6 @@ *
  • All reactive operations include proper success and error handlers
  • * * - * @see InvestmentClientService * @see InvestmentPortfolioService * @see StreamTaskExecutor */ @@ -186,7 +184,7 @@ public Mono upsertMarkets(InvestmentAssetsTask investmentT .sessionStart(market.getSessionStart()) .sessionEnd(market.getSessionEnd()) .timeZone(market.getTimeZone()) - )) + ), 5) // Limit concurrency to prevent 503 errors .collectList() // Collect all created/retrieved markets into a list .map(markets -> { // Update the task with the created markets @@ -244,7 +242,7 @@ public Mono upsertMarketSpecialDays(InvestmentAssetsTask i .sessionStart(marketSpecialDay.getSessionStart()) .sessionEnd(marketSpecialDay.getSessionEnd()) .description(marketSpecialDay.getDescription()) - )) + ), 5) // Limit concurrency to prevent 503 errors .collectList() // Collect all created/retrieved market special days into a list .map(marketSpecialDays -> { // Update the task with the created market special days @@ -294,7 +292,7 @@ public Mono upsertAssetCategories(InvestmentAssetsTask inv } return Flux.fromIterable(investmentData.getAssetCategories()) - .flatMap(assetUniverseService::upsertAssetCategory) + .flatMap(assetUniverseService::upsertAssetCategory, 5) // Limit concurrency to prevent 503 errors .collectList() .map(assetCategories -> { investmentTask.info(INVESTMENT, OP_CREATE, RESULT_CREATED, investmentTask.getName(), @@ -337,7 +335,7 @@ public Mono upsertAssetCategoryTypes(InvestmentAssetsTask .name(assetCategoryType.getName()) .code(assetCategoryType.getCode()); return assetUniverseService.upsertAssetCategoryType(request); - }) + }, 5) // Limit concurrency to prevent 503 errors .collectList() .map(assetCategoryTypes -> { investmentTask.setAssetCategoryTypes(assetCategoryTypes); diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java index 4acaedec8..581d29180 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java @@ -11,6 +11,7 @@ import com.backbase.investment.api.service.v1.model.PaginatedAssetCategoryList; import com.backbase.stream.investment.model.AssetCategoryEntry; import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService; +import java.time.Duration; import java.time.LocalDate; import java.util.List; import java.util.Map; @@ -26,6 +27,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; @Slf4j @RequiredArgsConstructor @@ -45,20 +47,21 @@ public class InvestmentAssetUniverseService { public Mono upsertMarket(MarketRequest marketRequest) { log.debug("Creating market: {}", marketRequest); return assetUniverseApi.getMarket(marketRequest.getCode()) - // If getMarket returns 404 NOT_FOUND, treat as "not found" and return Mono.empty() .onErrorResume(error -> { if (error instanceof org.springframework.web.reactive.function.client.WebClientResponseException.NotFound) { log.info("Market not found for code: {}", marketRequest.getCode()); return Mono.empty(); } - // Propagate other errors return Mono.error(error); }) - // If market exists, return it .flatMap(existingMarket -> { log.info("Market already exists: {}", existingMarket.getCode()); log.debug("Market already exists: {}", existingMarket); return assetUniverseApi.updateMarket(existingMarket.getCode(), marketRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn("Retrying market update: code={}, attempt={}", + marketRequest.getCode(), signal.totalRetries() + 1))) .doOnSuccess(updatedMarket -> log.info("Updated market: {}", updatedMarket)) .doOnError(error -> { if (error instanceof WebClientResponseException w) { @@ -70,11 +73,14 @@ public Mono upsertMarket(MarketRequest marketRequest) { } }); }) - // If Mono is empty (market not found), create the market - .switchIfEmpty(Mono.defer(() -> assetUniverseApi.createMarket(marketRequest) + .switchIfEmpty(assetUniverseApi.createMarket(marketRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn("Retrying market create: code={}, attempt={}", + marketRequest.getCode(), signal.totalRetries() + 1))) .doOnSuccess(createdMarket -> log.info("Created market: {}", createdMarket)) .doOnError(error -> log.error("Error creating market: {}", error.getMessage(), error)) - )); + ); } /** @@ -151,6 +157,11 @@ public Mono upsertMarketSpecialDay(MarketSpecialDayRequest mar log.info("Market special day already exists for day: {}", marketSpecialDayRequest); return assetUniverseApi.updateMarketSpecialDay(matchingSpecialDay.get().getUuid().toString(), marketSpecialDayRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn( + "Retrying market special day update: date={}, attempt={}", + date, signal.totalRetries() + 1))) .doOnSuccess(updatedMarketSpecialDay -> log.info("Updated market special day: {}", updatedMarketSpecialDay)) .doOnError(error -> { @@ -172,6 +183,11 @@ public Mono upsertMarketSpecialDay(MarketSpecialDayRequest mar }) // If Mono is empty (market special day not found), create the market special day .switchIfEmpty(Mono.defer(() -> assetUniverseApi.createMarketSpecialDay(marketSpecialDayRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn( + "Retrying market special day create: date={}, attempt={}", + marketSpecialDayRequest.getDate(), signal.totalRetries() + 1))) .doOnSuccess( createdMarketSpecialDay -> log.info("Created market special day: {}", createdMarketSpecialDay)) .doOnError(error -> { @@ -200,11 +216,54 @@ public Flux createAssets(List categoryIdByCode = categories.stream() .collect(Collectors.toMap(com.backbase.investment.api.service.v1.model.AssetCategory::getCode, com.backbase.investment.api.service.v1.model.AssetCategory::getUuid)); - return Flux.fromIterable(assets) - .flatMap(asset -> this.getOrCreateAsset(asset, categoryIdByCode)); + // Deduplicate assets by key to prevent concurrent creation of the same asset + Map uniqueAssets = assets.stream() + .collect(Collectors.toMap( + com.backbase.stream.investment.Asset::getKeyString, + a -> a, + (existing, replacement) -> { + log.warn("Duplicate asset key found: {}. Using first occurrence.", + existing.getKeyString()); + return existing; + } + )); + // Limit concurrency to 5 to prevent overwhelming the service and triggering 503 errors + return Flux.fromIterable(uniqueAssets.values()) + .flatMap(asset -> this.getOrCreateAsset(asset, categoryIdByCode) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn( + "Retrying asset upsert: key={}, attempt={}", + asset.getKeyString(), signal.totalRetries() + 1))), + 5); }); } + /** + * Determines whether an error is retryable based on HTTP status code. + * + *

    Retryable errors: + *

      + *
    • 409 Conflict – race condition during concurrent creation
    • + *
    • 503 Service Unavailable – temporary service overload or maintenance
    • + *
    + * + * @param throwable the error to evaluate + * @return {@code true} if the error should trigger a retry + */ + private boolean isRetryableError(Throwable throwable) { + if (throwable instanceof WebClientResponseException ex) { + int statusCode = ex.getStatusCode().value(); + boolean retryable = statusCode == 409 || statusCode == 503; + if (retryable) { + log.debug("Identified retryable error: status={}, reason={}", + statusCode, statusCode == 409 ? "CONFLICT" : "SERVICE_UNAVAILABLE"); + } + return retryable; + } + return false; + } + /** * Gets an existing asset category by its code, or creates it if not found. Handles empty results by creating the * asset category. @@ -285,6 +344,11 @@ public Mono upsertAssetCategoryType(AssetCategoryTypeRequest assetCategoryTypeRequest.getCode()); return assetUniverseApi.updateAssetCategoryType(matchingType.get().getUuid().toString(), assetCategoryTypeRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn( + "Retrying asset category type update: code={}, attempt={}", + assetCategoryTypeRequest.getCode(), signal.totalRetries() + 1))) .doOnSuccess(updatedType -> log.info("Updated asset category type: {}", updatedType)) .doOnError(error -> { if (error instanceof WebClientResponseException w) { @@ -305,6 +369,11 @@ public Mono upsertAssetCategoryType(AssetCategoryTypeRequest }) .switchIfEmpty( Mono.defer(() -> assetUniverseApi.createAssetCategoryType(assetCategoryTypeRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn( + "Retrying asset category type create: code={}, attempt={}", + assetCategoryTypeRequest.getCode(), signal.totalRetries() + 1))) .doOnSuccess(createdType -> log.info("Created asset category type: {}", createdType)) .doOnError(error -> { if (error instanceof WebClientResponseException w) { @@ -313,10 +382,9 @@ public Mono upsertAssetCategoryType(AssetCategoryTypeRequest w.getStatusCode(), w.getResponseBodyAsString()); } else { log.error("Error creating asset category type: {} : {}", - assetCategoryTypeRequest.getCode(), - error.getMessage(), error); + assetCategoryTypeRequest.getCode(), error.getMessage(), error); } })) ); } -} \ No newline at end of file +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java index 4a0d79089..4890af55e 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java @@ -8,17 +8,21 @@ import com.backbase.investment.api.service.v1.model.Status836Enum; import com.backbase.stream.investment.ClientUser; import jakarta.validation.constraints.NotNull; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; /** * Service wrapper around generated {@link ClientApi} providing guarded create / patch operations with logging, minimal @@ -41,8 +45,34 @@ public class InvestmentClientService { private final ClientApi clientApi; + /** + * Upserts a list of clients with controlled concurrency to prevent race conditions. + * + *

    Design considerations: + *

      + *
    • Deduplicates clients by internalUserId before processing
    • + *
    • Limits concurrent requests to avoid overwhelming the service and triggering 503 errors
    • + *
    • Implements exponential backoff retry with 503 and 409 conflict handling
    • + *
    • Processes clients sequentially through the same ExecutorService to maintain order and prevent race conditions
    • + *
    + * + * @param clientUsers the list of clients to upsert + * @return Mono emitting the list of successfully upserted clients + */ public Mono> upsertClients(List clientUsers) { - return Flux.fromIterable(clientUsers) + // Deduplicate by internalUserId to prevent duplicate processing + Map uniqueClients = clientUsers.stream() + .collect(Collectors.toMap(ClientUser::getInternalUserId, Function.identity(), + (existing, replacement) -> { + log.warn("Duplicate internalUserId found: {}. Using first occurrence.", + existing.getInternalUserId()); + return existing; + } + )); + + // Process with controlled concurrency (default: 5 concurrent requests) + // This prevents overwhelming the Investment Service and triggering 503 responses + return Flux.fromIterable(uniqueClients.values()) .flatMap(clientUser -> { log.debug("Upserting investment client: internalUserId={}, externalUserId={}, legalEntityExternalId={}", clientUser.getInternalUserId(), clientUser.getExternalUserId(), @@ -55,6 +85,17 @@ public Mono> upsertClients(List clientUsers) { .putExtraDataItem("keycloak_username", clientUser.getExternalUserId()); return upsertClient(request, clientUser.getLegalEntityId()) + .retryWhen(Retry.backoff(3, Duration.ofMillis(500)) + .maxBackoff(Duration.ofSeconds(5)) + .jitter(0.5) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> { + int statusCode = signal.failure() instanceof WebClientResponseException ex + ? ex.getStatusCode().value() : 0; + log.warn( + "Retrying upsert for client: internalUserId={}, attempt={}, statusCode={}", + clientUser.getInternalUserId(), signal.totalRetries() + 1, statusCode); + })) .doOnSuccess(upsertedClient -> log.debug( "Successfully upserted client: investmentClientId={}, internalUserId={}", upsertedClient.getInvestmentClientId(), upsertedClient.getInternalUserId())) @@ -62,7 +103,8 @@ public Mono> upsertClients(List clientUsers) { "Failed to upsert client: internalUserId={}, externalUserId={}, legalEntityExternalId={}", clientUser.getInternalUserId(), clientUser.getExternalUserId(), clientUser.getLegalEntityId(), throwable)); - }) + + }, 5) // Max 5 concurrent requests to avoid overwhelming the service .collectList(); } @@ -217,10 +259,10 @@ private Mono createNewClient(ClientCreateRequest request, String leg /** * Builds a {@link ClientUser} instance from the provided data. * - * @param investmentClientId the investment client UUID - * @param internalUserId the internal user ID - * @param externalUserId the external user ID - * @param legalEntityId the legal entity external ID + * @param investmentClientId the investment client UUID + * @param internalUserId the internal user ID + * @param externalUserId the external user ID + * @param legalEntityId the legal entity external ID * @return constructed ClientUser instance */ private ClientUser buildClientUser(UUID investmentClientId, String internalUserId, @@ -374,4 +416,29 @@ private void logUpdateError(UUID uuid, Throwable throwable) { } } + /** + * Determines if an error is retryable based on HTTP status code. + * + *

    Retryable errors include: + *

      + *
    • 409 Conflict: Race condition during concurrent client creation/update
    • + *
    • 503 Service Unavailable: Temporary service overload or maintenance
    • + *
    + * + * @param throwable the exception to evaluate + * @return true if the error is retryable, false otherwise + */ + private boolean isRetryableError(Throwable throwable) { + if (throwable instanceof WebClientResponseException ex) { + int statusCode = ex.getStatusCode().value(); + boolean isRetryable = statusCode == 409 || statusCode == 503; + if (isRetryable) { + log.debug("Identified retryable error: status={}, reason={}", + statusCode, statusCode == 409 ? "CONFLICT" : "SERVICE_UNAVAILABLE"); + } + return isRetryable; + } + return false; + } + } From 2464e50faf9a008142ad22c1ef0d9269ace7e8fb Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Fri, 20 Mar 2026 10:30:25 +0200 Subject: [PATCH 4/9] WLNP-12474: BJS: implement the possibility to run Investment setup in standalone mode fix a limitation of concurrent execution --- .../InvestmentWebClientConfiguration.java | 50 +++++------- .../InvestmentWebClientProperties.java | 79 +++++++++++++++++++ .../InvestmentPortfolioAllocationService.java | 2 +- 3 files changed, 101 insertions(+), 30 deletions(-) create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientProperties.java diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java index a72bf3dac..4ade2ad4b 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java @@ -7,6 +7,7 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import reactor.netty.http.client.HttpClient; @@ -22,28 +23,17 @@ *
  • Indefinite hangs from missing timeouts
  • * * + *

    All tunable values are externalized via {@link InvestmentWebClientProperties} and can be + * overridden through {@code application.yml} without recompiling. + * *

    All beans are explicitly named so they are unambiguous and never accidentally * replaced by Spring Boot auto-configuration or another generic bean of the same type. */ @Slf4j @Configuration +@EnableConfigurationProperties(InvestmentWebClientProperties.class) public class InvestmentWebClientConfiguration { - // Connection pool configuration constants. - // MAX_CONNECTIONS=20 limits the number of open TCP connections to the Investment service, - // preventing it from being overwhelmed (which causes 503 responses). - // MAX_PENDING_ACQUIRES=100 bounds the in-memory queue so that callers receive a fast - // failure rather than accumulating an unbounded backlog. - private static final int MAX_CONNECTIONS = 20; - private static final long MAX_IDLE_TIME_MINUTES = 5; - private static final int MAX_PENDING_ACQUIRES = 100; - private static final long PENDING_ACQUIRE_TIMEOUT_MILLIS = 45_000; - - // Timeout configuration constants (in seconds) - private static final int CONNECT_TIMEOUT_SECONDS = 10; - private static final int READ_TIMEOUT_SECONDS = 30; - private static final int WRITE_TIMEOUT_SECONDS = 30; - /** * Dedicated {@link ConnectionProvider} for the Investment service client pool. * @@ -51,22 +41,23 @@ public class InvestmentWebClientConfiguration { * avoids silently falling back to Spring Boot's auto-configured shared pool when one already exists * in the application context. * + * @param props investment HTTP-client configuration properties * @return investment-specific ConnectionProvider */ @Bean("investmentConnectionProvider") - public ConnectionProvider investmentConnectionProvider() { + public ConnectionProvider investmentConnectionProvider(InvestmentWebClientProperties props) { ConnectionProvider provider = ConnectionProvider.builder("investment-client-pool") - .maxConnections(MAX_CONNECTIONS) - .maxIdleTime(Duration.ofMinutes(MAX_IDLE_TIME_MINUTES)) - .maxLifeTime(Duration.ofMinutes(30)) - .pendingAcquireMaxCount(MAX_PENDING_ACQUIRES) - .pendingAcquireTimeout(Duration.ofMillis(PENDING_ACQUIRE_TIMEOUT_MILLIS)) - .evictInBackground(Duration.ofSeconds(120)) + .maxConnections(props.getMaxConnections()) + .maxIdleTime(Duration.ofMinutes(props.getMaxIdleTimeMinutes())) + .maxLifeTime(Duration.ofMinutes(props.getMaxLifeTimeMinutes())) + .pendingAcquireMaxCount(props.getMaxPendingAcquires()) + .pendingAcquireTimeout(Duration.ofMillis(props.getPendingAcquireTimeoutMillis())) + .evictInBackground(Duration.ofSeconds(props.getEvictInBackgroundSeconds())) .build(); log.info("Configured investment ConnectionProvider: maxConnections={}, maxIdleTime={}min," + " pendingAcquireMaxCount={}", - MAX_CONNECTIONS, MAX_IDLE_TIME_MINUTES, MAX_PENDING_ACQUIRES); + props.getMaxConnections(), props.getMaxIdleTimeMinutes(), props.getMaxPendingAcquires()); return provider; } @@ -84,20 +75,21 @@ public ConnectionProvider investmentConnectionProvider() { * * * @param connectionProvider the investment-specific connection provider + * @param props investment HTTP-client configuration properties * @return configured HttpClient */ @Bean("investmentHttpClient") public HttpClient investmentHttpClient( - @Qualifier("investmentConnectionProvider") ConnectionProvider connectionProvider) { + @Qualifier("investmentConnectionProvider") ConnectionProvider connectionProvider, + InvestmentWebClientProperties props) { return HttpClient.create(connectionProvider) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_SECONDS * 1000) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, props.getConnectTimeoutSeconds() * 1000) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) - .responseTimeout(Duration.ofSeconds(READ_TIMEOUT_SECONDS)) + .responseTimeout(Duration.ofSeconds(props.getReadTimeoutSeconds())) .doOnConnected(connection -> connection - .addHandlerLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS)) - .addHandlerLast(new WriteTimeoutHandler(WRITE_TIMEOUT_SECONDS, TimeUnit.SECONDS))); + .addHandlerLast(new ReadTimeoutHandler(props.getReadTimeoutSeconds(), TimeUnit.SECONDS)) + .addHandlerLast(new WriteTimeoutHandler(props.getWriteTimeoutSeconds(), TimeUnit.SECONDS))); } } - diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientProperties.java new file mode 100644 index 000000000..af6c23775 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientProperties.java @@ -0,0 +1,79 @@ +package com.backbase.stream.configuration; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Configuration properties for the Investment service HTTP client connection pool and timeouts. + * + *

    All values can be overridden via {@code application.yml} / {@code application.properties} + * using the prefix {@code backbase.communication.services.investment.http-client}. + * + *

    Example: + *

    + * backbase:
    + *   communication:
    + *     services:
    + *       investment:
    + *         http-client:
    + *           max-connections: 20
    + *           max-idle-time-minutes: 5
    + *           max-pending-acquires: 100
    + *           pending-acquire-timeout-millis: 45000
    + *           connect-timeout-seconds: 10
    + *           read-timeout-seconds: 30
    + *           write-timeout-seconds: 30
    + * 
    + */ +@Data +@ConfigurationProperties(prefix = "backbase.communication.services.investment.http-client") +public class InvestmentWebClientProperties { + + /** + * Maximum number of open TCP connections to the Investment service. + * Limiting this prevents the service from being overwhelmed (which causes 503 responses). + */ + private int maxConnections = 20; + + /** + * Maximum time (in minutes) that a connection can remain idle in the pool before being evicted. + */ + private long maxIdleTimeMinutes = 5; + + /** + * Maximum lifetime (in minutes) of a pooled connection regardless of activity. + */ + private long maxLifeTimeMinutes = 30; + + /** + * Maximum number of requests that can be queued waiting for a connection. + * Bounds the in-memory queue so callers receive a fast failure rather than an unbounded backlog. + */ + private int maxPendingAcquires = 100; + + /** + * Maximum time (in milliseconds) a request will wait to acquire a connection from the pool. + */ + private long pendingAcquireTimeoutMillis = 45_000; + + /** + * Background eviction interval (in seconds) for idle/expired connections in the pool. + */ + private long evictInBackgroundSeconds = 120; + + /** + * TCP connection timeout in seconds. Prevents hanging on unresponsive servers. + */ + private int connectTimeoutSeconds = 10; + + /** + * Read timeout in seconds. Prevents indefinite hangs waiting for a response. + */ + private int readTimeoutSeconds = 30; + + /** + * Write timeout in seconds. Prevents indefinite hangs while sending a request. + */ + private int writeTimeoutSeconds = 30; +} + diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java index 7aa13305b..310556616 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java @@ -150,7 +150,7 @@ private Mono> generateAllocations( private Mono> upsertAllocations(String portfolioId, List allocations) { return Flux.fromIterable(allocations) - .flatMap(a -> customIntegrationApiService.createPortfolioAllocation(portfolioId, a, null, null, null)) + .flatMap(a -> customIntegrationApiService.createPortfolioAllocation(portfolioId, a, null, null, null), 5) .collectList().doOnSuccess( created -> log.info("Successfully upserted investment portfolio allocation: count {}", created.size())) .doOnError(throwable -> { From e650a4fa24faeb3c330b328d93462cd108da031e Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Fri, 20 Mar 2026 11:48:55 +0200 Subject: [PATCH 5/9] WLNP-12474: BJS: implement the possibility to run Investment setup in standalone mode add configs for different models ingest setup updates --- .../InvestmentIngestProperties.java | 106 ++++++++++++++++++ ...tmentIngestionConfigurationProperties.java | 5 +- .../InvestmentServiceConfiguration.java | 7 +- .../investment/InvestmentAssetData.java | 14 ++- .../investment/InvestmentAssetsTask.java | 8 +- .../investment/InvestmentClientTask.java | 43 ------- .../investment/InvestmentContentData.java | 10 +- .../investment/InvestmentContentTask.java | 7 +- .../stream/investment/InvestmentData.java | 16 ++- .../investment/InvestmentDataValue.java | 13 +++ .../stream/investment/InvestmentTask.java | 6 +- .../service/InvestmentPortfolioService.java | 35 +++--- .../investment/saga/InvestmentSagaTest.java | 8 -- .../InvestmentPortfolioServiceTest.java | 8 +- 14 files changed, 199 insertions(+), 87 deletions(-) create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java delete mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentClientTask.java create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentDataValue.java diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java new file mode 100644 index 000000000..e24c1bfcb --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java @@ -0,0 +1,106 @@ +package com.backbase.stream.configuration; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Fine-grained configuration properties for {@code InvestmentPortfolioService}. + * + *

    Properties are grouped by the model they govern so operators can reason about each + * concern independently without touching unrelated settings. + * + *

    All values can be overridden via {@code application.yml} / {@code application.properties} + * using the prefix {@code backbase.bootstrap.ingestions.investment.service}. + * + *

    Ingestion-flow feature flags ({@code contentEnabled}, {@code assetUniverseEnabled}, etc.) + * live in the sibling class {@link InvestmentIngestionConfigurationProperties}. + * + *

    Example: + *

    + * backbase:
    + *   bootstrap:
    + *     ingestions:
    + *       investment:
    + *         service:
    + *           portfolio:
    + *             default-currency: USD
    + *             activation-past-months: 3
    + *           allocation:
    + *             model-portfolio-allocation-asset: "model_portfolio.allocation.asset"
    + *           deposit:
    + *             provider: real-bank
    + *             default-amount: 25000.0
    + * 
    + */ +@Data +@ConfigurationProperties(prefix = "backbase.bootstrap.ingestions.investment.service") +public class InvestmentIngestProperties { + + private PortfolioConfig portfolio = new PortfolioConfig(); + private AllocationConfig allocation = new AllocationConfig(); + private DepositConfig deposit = new DepositConfig(); + + // ------------------------------------------------------------------------- + // Portfolio + // ------------------------------------------------------------------------- + + /** + * Settings that govern how individual investment portfolios are created and activated. + */ + @Data + public static class PortfolioConfig { + + /** + * ISO 4217 currency code applied to new portfolios when none is specified in the source data. + */ + private String defaultCurrency = "EUR"; + + /** + * How many months into the past the portfolio's {@code activated} timestamp is set. + * A value of {@code 1} means the portfolio is considered to have been activated 1 month ago. + */ + private int activationPastMonths = 1; + } + + // ------------------------------------------------------------------------- + // Allocation + // ------------------------------------------------------------------------- + + /** + * Settings that govern portfolio-product allocation behaviour. + */ + @Data + public static class AllocationConfig { + + /** + * The allocation-asset expansion key sent when listing or managing portfolio products. + * Changing this value allows switching between different allocation-asset model definitions + * without a code change. + */ + private String modelPortfolioAllocationAsset = "model_portfolio.allocation.asset"; + } + + // ------------------------------------------------------------------------- + // Deposit + // ------------------------------------------------------------------------- + + /** + * Settings that govern the automatic seed deposit created for new portfolios. + */ + @Data + public static class DepositConfig { + + /** + * The payment provider identifier sent with every deposit request. + * Set this to the real provider name for non-mock environments. + */ + private String provider = "mock"; + + /** + * The monetary amount used as the initial seed deposit when no previous deposit exists + * or when the current deposited total is below this threshold. + */ + private double defaultAmount = 10_000d; + } +} + diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java index 53bb897db..8d4d3e1ed 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java @@ -7,6 +7,10 @@ /** * Configuration properties governing investment client ingestion behavior. + * + *

    Controls which high-level ingestion flows are enabled. Service-level tuning + * (portfolio currencies, deposit provider, allocation assets, etc.) lives in + * {@link InvestmentIngestProperties}. */ @Data @ConditionalOnBean(InvestmentServiceConfiguration.class) @@ -16,7 +20,6 @@ public class InvestmentIngestionConfigurationProperties { private boolean contentEnabled = true; private boolean assetUniverseEnabled = true; private boolean wealthEnabled = true; - private int portfolioActivationPastMonths = 1; } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java index dc3561f7d..de2d2cd5f 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java @@ -41,7 +41,8 @@ InvestmentClientConfig.class }) @EnableConfigurationProperties({ - InvestmentIngestionConfigurationProperties.class + InvestmentIngestionConfigurationProperties.class, + InvestmentIngestProperties.class }) @RequiredArgsConstructor @Configuration @@ -61,9 +62,9 @@ public CustomIntegrationApiService customIntegrationApiService(ApiClient apiClie @Bean public InvestmentPortfolioService investmentPortfolioService(PortfolioApi portfolioApi, InvestmentProductsApi investmentProductsApi, PaymentsApi paymentsApi, PortfolioTradingAccountsApi portfolioTradingAccountsApi, - InvestmentIngestionConfigurationProperties configurationProperties) { + InvestmentIngestProperties portfolioProperties) { return new InvestmentPortfolioService(investmentProductsApi, portfolioApi, paymentsApi, - portfolioTradingAccountsApi, configurationProperties); + portfolioTradingAccountsApi, portfolioProperties); } @Bean diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java index 0409debb3..8311955bc 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java @@ -15,11 +15,13 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; @EqualsAndHashCode @Data @Builder -public class InvestmentAssetData { +@Slf4j +public class InvestmentAssetData implements InvestmentDataValue { private List currencies; private List markets; @@ -42,4 +44,14 @@ public Map getAssetByUuid() { } + @Override + public long getTotalProcessedValues() { + log.debug( + "Calculating total processed values: currencies={}, markets={}, marketSpecialDays={}, assetCategoryTypes={}, assetCategories={}, assets={}, assetPrices={}", + getSize(currencies), getSize(markets), getSize(marketSpecialDays), getSize(assetCategoryTypes), + getSize(assetCategories), getSize(assets), getSize(assetPrices)); + return getSize(currencies) + getSize(markets) + getSize(marketSpecialDays) + getSize(assetCategoryTypes) + + getSize(assetCategories) + getSize(assets) + getSize(assetPrices); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java index 2c3151676..7ba2ad9af 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java @@ -11,7 +11,7 @@ @EqualsAndHashCode(callSuper = true) @Data -public class InvestmentAssetsTask extends StreamTask { +public class InvestmentAssetsTask extends StreamTask implements InvestmentDataValue { private final InvestmentAssetData data; @@ -50,4 +50,10 @@ public InvestmentAssetsTask setIntradayPriceTasks(List tasks) { data.setIntradayPriceAsyncTasks(tasks); return this; } + + @Override + public long getTotalProcessedValues() { + return data.getTotalProcessedValues(); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentClientTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentClientTask.java deleted file mode 100644 index 163294a98..000000000 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentClientTask.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.backbase.stream.investment; - -import com.backbase.investment.api.service.v1.model.ClientCreate; -import com.backbase.investment.api.service.v1.model.ClientCreateRequest; -import com.backbase.investment.api.service.v1.model.OASClient; -import com.backbase.investment.api.service.v1.model.OASClientUpdateRequest; -import com.backbase.investment.api.service.v1.model.PatchedOASClientUpdateRequest; -import com.backbase.stream.worker.model.StreamTask; -import java.util.UUID; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; - -/** - * Stream task representing ingestion or update of an Investment Client. - */ -@EqualsAndHashCode(callSuper = true) -@Data -@NoArgsConstructor -public class InvestmentClientTask extends StreamTask { - - public enum Operation { CREATE, PATCH, UPDATE } - - private Operation operation; - private ClientCreateRequest createRequest; - private PatchedOASClientUpdateRequest patchRequest; - private OASClientUpdateRequest updateRequest; - private UUID clientUuid; // Required for PATCH / UPDATE; populated after CREATE - - private ClientCreate createdClient; // response after CREATE - private OASClient updatedClient; // response after PATCH / UPDATE - - public InvestmentClientTask(String id, Operation operation) { - super(id); - this.operation = operation; - } - - @Override - public String getName() { - return getId(); - } -} - diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java index 8a30a2581..ff1347124 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java @@ -7,15 +7,23 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; @EqualsAndHashCode @Data @Builder -public class InvestmentContentData { +@Slf4j +public class InvestmentContentData implements InvestmentDataValue { private List marketNewsTags; private List marketNews; private List documentTags; private List documents; + public long getTotalProcessedValues() { + log.debug( + "Calculating total processed values: marketNewsTags={}, marketNews={}, documentTags={}, documents={}", + getSize(marketNewsTags), getSize(marketNews), getSize(documentTags), getSize(documents)); + return getSize(marketNewsTags) + getSize(marketNews) + getSize(documentTags) + getSize(documents); + } } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java index f6356ceae..a7a4d6d04 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java @@ -6,7 +6,7 @@ @EqualsAndHashCode(callSuper = true) @Data -public class InvestmentContentTask extends StreamTask { +public class InvestmentContentTask extends StreamTask implements InvestmentDataValue { private final InvestmentContentData data; @@ -20,4 +20,9 @@ public String getName() { return "investment-content"; } + @Override + public long getTotalProcessedValues() { + return data.getTotalProcessedValues(); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java index 7376cb161..5de517853 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java @@ -13,14 +13,14 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; @EqualsAndHashCode @Data @Builder -public class InvestmentData { +@Slf4j +public class InvestmentData implements InvestmentDataValue { - private String saName; - private String saExternalId; private List clientUsers; private List investmentArrangements; private List modelPortfolios; @@ -53,4 +53,14 @@ public void addPortfolioProducts(PortfolioProduct portfolioProduct) { public List getPriceAsyncTasks() { return Optional.ofNullable(investmentAssetData).map(InvestmentAssetData::getPriceAsyncTasks).orElse(List.of()); } + + public long getTotalProcessedValues() { + log.debug( + "Calculating total processed values: portfolios={}, portfolioProducts={}, modelPortfolios={}, clientUsers={}, investmentPortfolioTradingAccounts={}", + getSize(portfolios), getSize(portfolioProducts), getSize(modelPortfolios), getSize(clientUsers), + getSize(investmentPortfolioTradingAccounts)); + return getSize(portfolios) + getSize(portfolioProducts) + getSize(modelPortfolios) + getSize(clientUsers) + + getSize(investmentPortfolioTradingAccounts); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentDataValue.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentDataValue.java new file mode 100644 index 000000000..784cd10de --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentDataValue.java @@ -0,0 +1,13 @@ +package com.backbase.stream.investment; + +import java.util.List; + +public interface InvestmentDataValue { + + long getTotalProcessedValues(); + + default int getSize(List list) { + return list != null ? list.size() : 0; + } + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java index c39e5648d..3267f8f90 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java @@ -8,7 +8,7 @@ @EqualsAndHashCode(callSuper = true) @Data -public class InvestmentTask extends StreamTask { +public class InvestmentTask extends StreamTask implements InvestmentDataValue { private final InvestmentData data; @@ -30,4 +30,8 @@ public void setPortfolios(List portfolios) { data.setPortfolios(portfolios); } + public long getTotalProcessedValues() { + return data.getTotalProcessedValues(); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java index 0a3a50d37..dc9821e5f 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java @@ -21,7 +21,7 @@ import com.backbase.investment.api.service.v1.model.ProductTypeEnum; import com.backbase.investment.api.service.v1.model.Status08fEnum; import com.backbase.investment.api.service.v1.model.StatusA3dEnum; -import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.InvestmentArrangement; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.ModelPortfolio; @@ -66,14 +66,11 @@ @RequiredArgsConstructor public class InvestmentPortfolioService { - private static final String DEFAULT_CURRENCY = "EUR"; - private static final String MODEL_PORTFOLIO_ALLOCATION_ASSET = "model_portfolio.allocation.asset"; - private final InvestmentProductsApi productsApi; private final PortfolioApi portfolioApi; private final PaymentsApi paymentsApi; private final PortfolioTradingAccountsApi portfolioTradingAccountsApi; - private final InvestmentIngestionConfigurationProperties config; + private final InvestmentIngestProperties config; public Mono> upsertPortfolios(List investmentArrangements, Map> clientsByLeExternalId) { @@ -266,7 +263,7 @@ private Mono patchPortfolio( .name(investmentArrangement.getName()) .clients(associatedClients) .status(StatusA3dEnum.ACTIVE) - .activated(OffsetDateTime.now().minusMonths(config.getPortfolioActivationPastMonths())); + .activated(OffsetDateTime.now().minusMonths(config.getPortfolio().getActivationPastMonths())); log.debug("Attempting to patch existing portfolio: uuid={}, externalId={}", uuid, investmentArrangement.getExternalId()); @@ -353,9 +350,9 @@ private Mono createNewPortfolio(InvestmentArrangement investmentA .externalId(investmentArrangement.getExternalId()) .name(investmentArrangement.getName()) .clients(associatedClients) - .currency(Optional.ofNullable(investmentArrangement.getCurrency()).orElse(DEFAULT_CURRENCY)) + .currency(Optional.ofNullable(investmentArrangement.getCurrency()).orElse(config.getPortfolio().getDefaultCurrency())) .status(StatusA3dEnum.ACTIVE) - .activated(OffsetDateTime.now().minusMonths(config.getPortfolioActivationPastMonths())); + .activated(OffsetDateTime.now().minusMonths(config.getPortfolio().getActivationPastMonths())); return portfolioApi.createPortfolio(request, null, null, null) .doOnSuccess(created -> log.info( @@ -395,7 +392,7 @@ private Mono listExistingPortfolioProducts(PortfolioProduct po } private Mono listExistingPortfolioProducts(ProductTypeEnum productType, Integer riskLevel) { - return productsApi.listPortfolioProducts(List.of(MODEL_PORTFOLIO_ALLOCATION_ASSET), null, null, + return productsApi.listPortfolioProducts(List.of(config.getAllocation().getModelPortfolioAllocationAsset()), null, null, 1, null, null, riskLevel, null, null, "-model_portfolio__risk_level", List.of(productType.getValue())) .doOnSuccess(products -> log.debug( @@ -432,10 +429,9 @@ private Mono listExistingPortfolioProducts(ProductTypeEnum pro * Updates an existing portfolio product by patching it. Falls back to the original product if the patch operation * fails. * - * @param investmentArrangement the arrangement to update with product UUID - * @param existingProduct the existing product to update - * @param portfolioProduct the template product containing desired values - * @param investmentData + * @param existingProduct the existing product to update + * @param portfolioProduct the template product containing desired values + * @param investmentData the investment data context used to register the updated product * @return Mono emitting the updated product */ private Mono updateExistingPortfolioProduct(PortfolioProduct existingProduct, @@ -453,7 +449,7 @@ private Mono updateExistingPortfolioProduct(PortfolioProduct e log.debug("Attempting to patch existing portfolio product: uuid={}, productType={}", productUuid, portfolioProduct.getProductType()); - return productsApi.patchPortfolioProduct(productUuid.toString(), List.of(MODEL_PORTFOLIO_ALLOCATION_ASSET), + return productsApi.patchPortfolioProduct(productUuid.toString(), List.of(config.getAllocation().getModelPortfolioAllocationAsset()), null, null, patch) .doOnSuccess(updated -> { log.info("Successfully patched existing investment product: uuid={}", updated.getUuid()); @@ -478,9 +474,8 @@ private Mono updateExistingPortfolioProduct(PortfolioProduct e /** * Creates a portfolio product with an optional model portfolio UUID. * - * @param investmentArrangement the arrangement to update with product UUID - * @param portfolioProduct the portfolio product template - * @param investmentData + * @param portfolioProduct the portfolio product template + * @param investmentData the investment data context used to register the created product * @return Mono emitting the newly created portfolio product */ private Mono createPortfolioProductWithModel(PortfolioProduct portfolioProduct, @@ -497,7 +492,7 @@ private Mono createPortfolioProductWithModel(PortfolioProduct .modelPortfolio(modelPortfolioUuid) .productType(portfolioProduct.getProductType()); - return productsApi.createPortfolioProduct(request, List.of(MODEL_PORTFOLIO_ALLOCATION_ASSET), null, null) + return productsApi.createPortfolioProduct(request, List.of(config.getAllocation().getModelPortfolioAllocationAsset()), null, null) .retry(2) .retryWhen(reactor.util.retry.Retry.fixedDelay(1, java.time.Duration.ofSeconds(1))) .doOnSuccess(created -> { @@ -510,7 +505,7 @@ private Mono createPortfolioProductWithModel(PortfolioProduct } public Mono upsertDeposits(PortfolioList portfolio) { - double defaultAmount = 10_000d; + double defaultAmount = config.getDeposit().getDefaultAmount(); return paymentsApi.listDeposits(null, null, null, null, null, null, portfolio.getUuid(), null, null, null) .filter(Objects::nonNull) @@ -539,7 +534,7 @@ public Mono upsertDeposits(PortfolioList portfolio) { private Mono createDeposit(PortfolioList portfolio, double defaultAmount) { return paymentsApi.createDeposit(new DepositRequest() .portfolio(portfolio.getUuid()) - .provider("mock") + .provider(config.getDeposit().getProvider()) .reason(UUID.randomUUID().toString()) .status(Status08fEnum.COMPLETED) .transactedAt(portfolio.getActivated().plusDays(1)) diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java index 664cce01c..38280186d 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java @@ -41,8 +41,6 @@ */ class InvestmentSagaTest { - private static final String SA_NAME = "some-sa-name"; - private static final String SA_EXTERNAL_ID = "some-sa-external-id"; private static final String ARRANGEMENT_EXTERNAL_ID = "some-arrangement-id"; private static final String PORTFOLIO_EXTERNAL_ID = "some-portfolio-external-id"; private static final String ACCOUNT_ID = "some-account-id"; @@ -436,8 +434,6 @@ void upsertDepositsAndAllocations_error_marksTaskFailed() { private InvestmentTask createMinimalTask() { return new InvestmentTask("minimal-task", InvestmentData.builder() - .saName(SA_NAME) - .saExternalId(SA_EXTERNAL_ID) .clientUsers(Collections.emptyList()) .investmentArrangements(Collections.emptyList()) .modelPortfolios(Collections.emptyList()) @@ -448,8 +444,6 @@ private InvestmentTask createMinimalTask() { private InvestmentTask createTaskWithModelPortfolios() { return new InvestmentTask("model-portfolio-task", InvestmentData.builder() - .saName(SA_NAME) - .saExternalId(SA_EXTERNAL_ID) .clientUsers(Collections.emptyList()) .investmentArrangements(Collections.emptyList()) .modelPortfolios(List.of(ModelPortfolio.builder() @@ -463,8 +457,6 @@ private InvestmentTask createTaskWithModelPortfolios() { private InvestmentTask createFullTask() { return new InvestmentTask("full-task", InvestmentData.builder() - .saName(SA_NAME) - .saExternalId(SA_EXTERNAL_ID) .clientUsers(List.of(ClientUser.builder() .investmentClientId(UUID.randomUUID()) .legalEntityId(LE_INTERNAL_ID) diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java index 563e1b9c6..66d6bec62 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java @@ -26,7 +26,7 @@ import com.backbase.investment.api.service.v1.model.PortfolioTradingAccountRequest; import com.backbase.investment.api.service.v1.model.ProductTypeEnum; import com.backbase.investment.api.service.v1.model.StatusA3dEnum; -import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.InvestmentArrangement; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.ModelPortfolio; @@ -69,7 +69,7 @@ class InvestmentPortfolioServiceTest { private PortfolioApi portfolioApi; private PaymentsApi paymentsApi; private PortfolioTradingAccountsApi portfolioTradingAccountsApi; - private InvestmentIngestionConfigurationProperties config; + private InvestmentIngestProperties config; private InvestmentPortfolioService service; @BeforeEach @@ -78,8 +78,8 @@ void setUp() { portfolioApi = Mockito.mock(PortfolioApi.class); paymentsApi = Mockito.mock(PaymentsApi.class); portfolioTradingAccountsApi = Mockito.mock(PortfolioTradingAccountsApi.class); - config = Mockito.mock(InvestmentIngestionConfigurationProperties.class); - when(config.getPortfolioActivationPastMonths()).thenReturn(6); + config = Mockito.mock(InvestmentIngestProperties.class); + when(config.getPortfolio().getActivationPastMonths()).thenReturn(6); service = new InvestmentPortfolioService( productsApi, portfolioApi, paymentsApi, portfolioTradingAccountsApi, config); } From 33e8e49fe8af91be8a72e823fdbfb5a1593d586e Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Fri, 20 Mar 2026 11:49:56 +0200 Subject: [PATCH 6/9] WLNP-12474: BJS: implement the possibility to run Investment setup in standalone mode fix --- .../stream/configuration/InvestmentIngestProperties.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java index e24c1bfcb..0bbb493a9 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java @@ -94,7 +94,7 @@ public static class DepositConfig { * The payment provider identifier sent with every deposit request. * Set this to the real provider name for non-mock environments. */ - private String provider = "mock"; + private String provider = null; /** * The monetary amount used as the initial seed deposit when no previous deposit exists From ec51dc0bd323e51a66ac2381130deb291a9696d7 Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Fri, 20 Mar 2026 13:35:24 +0200 Subject: [PATCH 7/9] WLNP-12474: BJS: implement the possibility to run Investment setup in standalone mode add configs for concurrency changes --- .../InvestmentIngestProperties.java | 44 ++++++++++++++----- ...InvestmentRestServiceApiConfiguration.java | 7 +-- .../InvestmentServiceConfiguration.java | 9 ++-- .../saga/InvestmentAssetUniverseSaga.java | 14 ++++-- .../service/InvestmentClientService.java | 3 +- .../InvestmentPortfolioAllocationService.java | 22 +++++++--- .../InvestmentRestAssetUniverseService.java | 21 ++++++--- .../saga/InvestmentAssetUniverseSagaTest.java | 7 ++- ...estmentPortfolioAllocationServiceTest.java | 4 +- ...nvestmentRestAssetUniverseServiceTest.java | 5 ++- 10 files changed, 96 insertions(+), 40 deletions(-) diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java index 0bbb493a9..93be02d57 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java @@ -36,9 +36,12 @@ @ConfigurationProperties(prefix = "backbase.bootstrap.ingestions.investment.service") public class InvestmentIngestProperties { + public static final double DEFAULT_INIT_CASH = 10_000d; + private PortfolioConfig portfolio = new PortfolioConfig(); private AllocationConfig allocation = new AllocationConfig(); private DepositConfig deposit = new DepositConfig(); + private AssetConfig asset = new AssetConfig(); // ------------------------------------------------------------------------- // Portfolio @@ -56,8 +59,8 @@ public static class PortfolioConfig { private String defaultCurrency = "EUR"; /** - * How many months into the past the portfolio's {@code activated} timestamp is set. - * A value of {@code 1} means the portfolio is considered to have been activated 1 month ago. + * How many months into the past the portfolio's {@code activated} timestamp is set. A value of {@code 1} means + * the portfolio is considered to have been activated 1 month ago. */ private int activationPastMonths = 1; } @@ -73,11 +76,13 @@ public static class PortfolioConfig { public static class AllocationConfig { /** - * The allocation-asset expansion key sent when listing or managing portfolio products. - * Changing this value allows switching between different allocation-asset model definitions - * without a code change. + * The allocation-asset expansion key sent when listing or managing portfolio products. Changing this value + * allows switching between different allocation-asset model definitions without a code change. */ private String modelPortfolioAllocationAsset = "model_portfolio.allocation.asset"; + private int allocationConcurrency = 5; + + private double defaultAmount = DEFAULT_INIT_CASH; } // ------------------------------------------------------------------------- @@ -91,16 +96,35 @@ public static class AllocationConfig { public static class DepositConfig { /** - * The payment provider identifier sent with every deposit request. - * Set this to the real provider name for non-mock environments. + * The payment provider identifier sent with every deposit request. Set this to the real provider name for + * non-mock environments. */ private String provider = null; /** - * The monetary amount used as the initial seed deposit when no previous deposit exists - * or when the current deposited total is below this threshold. + * The monetary amount used as the initial seed deposit when no previous deposit exists or when the current + * deposited total is below this threshold. */ - private double defaultAmount = 10_000d; + private double defaultAmount = DEFAULT_INIT_CASH; } + + // ------------------------------------------------------------------------- + // Deposit + // ------------------------------------------------------------------------- + + /** + * Settings that govern the automatic seed deposit created for new portfolios. + */ + @Data + public static class AssetConfig { + + private boolean ingestImages = true; + private int marketConcurrency = 5; + private int marketSpecialDayConcurrency = 5; + private int assetCategoryConcurrency = 5; + private int assetCategoryTypeConcurrency = 5; + + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java index e4de101a5..cd4875e99 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java @@ -90,9 +90,10 @@ public InvestmentRestDocumentContentService investmentRestContentDocumentService } @Bean - public InvestmentRestAssetUniverseService investmentRestAssetUniverseService(AssetUniverseApi assetUniverseApi, - com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient) { - return new InvestmentRestAssetUniverseService(assetUniverseApi, restInvestmentApiClient); + public InvestmentRestAssetUniverseService investmentRestAssetUniverseService( + com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient, + InvestmentIngestProperties portfolioProperties) { + return new InvestmentRestAssetUniverseService(restInvestmentApiClient, portfolioProperties); } } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java index de2d2cd5f..cbdbbdc73 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java @@ -97,9 +97,9 @@ public InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService(A @Bean public InvestmentPortfolioAllocationService investmentPortfolioAllocationService(AllocationsApi allocationsApi, AssetUniverseApi assetUniverseApi, InvestmentApi investmentApi, - CustomIntegrationApiService customIntegrationApiService) { + CustomIntegrationApiService customIntegrationApiService, InvestmentIngestProperties portfolioProperties) { return new InvestmentPortfolioAllocationService(allocationsApi, assetUniverseApi, investmentApi, - customIntegrationApiService); + customIntegrationApiService, portfolioProperties); } @Bean @@ -125,10 +125,11 @@ public InvestmentAssetUniverseSaga investmentStaticDataSaga( InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService, InvestmentCurrencyService investmentCurrencyService, AsyncTaskService asyncTaskService, - InvestmentIngestionConfigurationProperties coreConfigurationProperties) { + InvestmentIngestionConfigurationProperties coreConfigurationProperties, + InvestmentIngestProperties portfolioProperties) { return new InvestmentAssetUniverseSaga(investmentAssetUniverseService, investmentAssetPriceService, investmentIntradayAssetPriceService, investmentCurrencyService, asyncTaskService, - coreConfigurationProperties); + coreConfigurationProperties, portfolioProperties); } @Bean diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java index b76ae204b..a4270d75e 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java @@ -3,6 +3,7 @@ import com.backbase.investment.api.service.v1.model.AssetCategoryTypeRequest; import com.backbase.investment.api.service.v1.model.MarketRequest; import com.backbase.investment.api.service.v1.model.MarketSpecialDayRequest; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; import com.backbase.stream.investment.InvestmentAssetData; import com.backbase.stream.investment.InvestmentAssetsTask; @@ -63,6 +64,7 @@ public class InvestmentAssetUniverseSaga implements StreamTaskExecutor executeTask(InvestmentAssetsTask streamTask) { @@ -184,7 +186,7 @@ public Mono upsertMarkets(InvestmentAssetsTask investmentT .sessionStart(market.getSessionStart()) .sessionEnd(market.getSessionEnd()) .timeZone(market.getTimeZone()) - ), 5) // Limit concurrency to prevent 503 errors + ), ingestProperties.getAsset().getMarketConcurrency()) // Limit concurrency to prevent 503 errors .collectList() // Collect all created/retrieved markets into a list .map(markets -> { // Update the task with the created markets @@ -242,7 +244,8 @@ public Mono upsertMarketSpecialDays(InvestmentAssetsTask i .sessionStart(marketSpecialDay.getSessionStart()) .sessionEnd(marketSpecialDay.getSessionEnd()) .description(marketSpecialDay.getDescription()) - ), 5) // Limit concurrency to prevent 503 errors + ), ingestProperties.getAsset().getMarketSpecialDayConcurrency()) + // Limit concurrency to prevent 503 errors .collectList() // Collect all created/retrieved market special days into a list .map(marketSpecialDays -> { // Update the task with the created market special days @@ -292,7 +295,9 @@ public Mono upsertAssetCategories(InvestmentAssetsTask inv } return Flux.fromIterable(investmentData.getAssetCategories()) - .flatMap(assetUniverseService::upsertAssetCategory, 5) // Limit concurrency to prevent 503 errors + .flatMap(assetUniverseService::upsertAssetCategory, + ingestProperties.getAsset().getAssetCategoryConcurrency()) + // Limit concurrency to prevent 503 errors .collectList() .map(assetCategories -> { investmentTask.info(INVESTMENT, OP_CREATE, RESULT_CREATED, investmentTask.getName(), @@ -335,7 +340,8 @@ public Mono upsertAssetCategoryTypes(InvestmentAssetsTask .name(assetCategoryType.getName()) .code(assetCategoryType.getCode()); return assetUniverseService.upsertAssetCategoryType(request); - }, 5) // Limit concurrency to prevent 503 errors + }, ingestProperties.getAsset().getAssetCategoryTypeConcurrency()) + // Limit concurrency to prevent 503 errors .collectList() .map(assetCategoryTypes -> { investmentTask.setAssetCategoryTypes(assetCategoryTypes); diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java index 4890af55e..1c2e0b56a 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java @@ -104,7 +104,8 @@ public Mono> upsertClients(List clientUsers) { clientUser.getInternalUserId(), clientUser.getExternalUserId(), clientUser.getLegalEntityId(), throwable)); - }, 5) // Max 5 concurrent requests to avoid overwhelming the service + }, 5) + // Max 5 concurrent requests to avoid overwhelming the service .collectList(); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java index 310556616..106d3326e 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java @@ -22,6 +22,7 @@ import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.investment.api.service.v1.model.PortfolioProduct; import com.backbase.investment.api.service.v1.model.StatusA7dEnum; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.Allocation; import com.backbase.stream.investment.Asset; import com.backbase.stream.investment.InvestmentAssetData; @@ -75,6 +76,7 @@ public class InvestmentPortfolioAllocationService { private final AssetUniverseApi assetUniverseApi; private final InvestmentApi investmentApi; private final CustomIntegrationApiService customIntegrationApiService; + private final InvestmentIngestProperties ingestProperties; public Mono removeAllocations(PortfolioList portfolio) { String portfolioUuid = portfolio.getUuid().toString(); @@ -114,7 +116,7 @@ public Mono> generateAllocations(PortfolioList port lastValuation.getCashActive(), lastValuation.getTradeTotal()); }).switchIfEmpty( orderPositions(portfolio.getUuid(), workDays(startDay, endDay), m, priceDayByAssetKey, - 10_000d) + ingestProperties.getAllocation().getDefaultAmount()) .flatMap(dp -> generateAllocations(priceDayByAssetKey, dp))) .flatMap(allocations -> this.upsertAllocations(portfolio.getUuid().toString(), allocations))); }) @@ -142,7 +144,8 @@ private Mono> generateAllocations( List portfolioPositions = List.copyOf(dayPositions.getSecond()); double totalTrade = calculateTrades(portfolioPositions); - double initialCash = 10_000d; + + double initialCash = ingestProperties.getAllocation().getDefaultAmount(); double cash = initialCash - totalTrade; return Mono.just(generateAllocations(priceDayByAssetKey, dayPositions, initialCash, cash, totalTrade)); } @@ -150,7 +153,9 @@ private Mono> generateAllocations( private Mono> upsertAllocations(String portfolioId, List allocations) { return Flux.fromIterable(allocations) - .flatMap(a -> customIntegrationApiService.createPortfolioAllocation(portfolioId, a, null, null, null), 5) + .flatMap(a -> customIntegrationApiService.createPortfolioAllocation(portfolioId, a, null, null, null), + ingestProperties.getAllocation().getAllocationConcurrency()) + .collectList().doOnSuccess( created -> log.info("Successfully upserted investment portfolio allocation: count {}", created.size())) .doOnError(throwable -> { @@ -213,7 +218,8 @@ private Mono, List>> or .method(Method570Enum.MARKET) .completed(startDay.atTime(LocalTime.MIDNIGHT).atOffset(ZoneOffset.UTC)) .currency(currency(p.getAsset())).shares(roundPrice(p.getShares())) - .priceAvg(roundPrice(p.getPrice())), null, null, null).doOnSuccess( + .priceAvg(roundPrice(p.getPrice())), null, null, null) + .doOnSuccess( created -> log.info("Successfully upserted investment portfolio allocation")) .doOnError(throwable -> { if (throwable instanceof WebClientResponseException ex) { @@ -241,8 +247,9 @@ private Mono>> getPriceDayByAssetKey(ModelPor ModelAsset asset = a.asset(); return Objects.requireNonNull( assetUniverseApi.listAssetClosePrices(asset.getCurrency(), startDat, endDay, null, null, null, null, - asset.getIsin(), null, asset.getMarket(), null, null).map(PaginatedOASPriceList::getResults).map( - l -> Map.of(asset.getKeyString(), l.stream() + asset.getIsin(), null, asset.getMarket(), null, null) + .map(PaginatedOASPriceList::getResults) + .map(l -> Map.of(asset.getKeyString(), l.stream() .collect(Collectors.toMap(dp -> dp.getDatetime().toLocalDate(), OASPrice::getAmount))))); }).collectList().map(maps -> { Map> dayPriceByAssetKey = new HashMap<>(); @@ -260,7 +267,8 @@ private Mono getPortfolioModel(PortfolioList portfolio, List new Allocation(new ModelAsset(a.getIsin(), a.getMarket(), a.getCurrency()), 0.2)).toList()) + .map(a -> new Allocation(new ModelAsset(a.getIsin(), a.getMarket(), a.getCurrency()), 0.2)) + .toList()) .build())); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java index b0f332c5f..ddc6e4e03 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java @@ -1,12 +1,12 @@ package com.backbase.stream.investment.service.resttemplate; import com.backbase.investment.api.service.sync.ApiClient; -import com.backbase.investment.api.service.sync.v1.AssetUniverseApi; import com.backbase.investment.api.service.sync.v1.model.AssetCategory; import com.backbase.investment.api.service.sync.v1.model.AssetCategoryRequest; import com.backbase.investment.api.service.sync.v1.model.OASAssetRequestDataRequest; import com.backbase.investment.api.service.sync.v1.model.PatchedAssetCategoryRequest; import com.backbase.investment.api.service.v1.model.Asset; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.model.AssetCategoryEntry; import java.util.Collections; import java.util.HashMap; @@ -33,8 +33,8 @@ @RequiredArgsConstructor public class InvestmentRestAssetUniverseService { - private final AssetUniverseApi assetUniverseApi; private final ApiClient apiClient; + private final InvestmentIngestProperties ingestProperties; private final RestTemplateAssetMapper assetMapper = Mappers.getMapper(RestTemplateAssetMapper.class); public Mono createAsset(com.backbase.stream.investment.Asset asset, @@ -99,7 +99,7 @@ private com.backbase.investment.api.service.sync.v1.model.Asset createAsset(OASA if (data != null) { localVarFormParams.add("data", data); } - if (logo != null) { + if (ingestProperties.getAsset().isIngestImages() && logo != null) { localVarFormParams.add("logo", logo); } @@ -145,7 +145,7 @@ public com.backbase.investment.api.service.sync.v1.model.Asset patchAsset(String if (data != null) { localVarFormParams.add("data", data); } - if (logo != null) { + if (ingestProperties.getAsset().isIngestImages() && logo != null) { localVarFormParams.add("logo", logo); } @@ -197,7 +197,9 @@ public Mono setAssetCategoryLogo(UUID assetCategoryId, Resource logo) { final MultiValueMap localVarCookieParams = new LinkedMultiValueMap(); final MultiValueMap localVarFormParams = new LinkedMultiValueMap(); - localVarFormParams.add("image", logo); + if (ingestProperties.getAsset().isIngestImages()) { + localVarFormParams.add("image", logo); + } final String[] localVarAccepts = {"application/json"}; final List localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); @@ -268,7 +270,9 @@ public Mono patchAssetCategory(UUID assetCategoryId, .ifPresent(v -> localVarFormParams.add("excerpt", v)); Optional.ofNullable(assetCategoryPatch.getDescription()) .ifPresent(v -> localVarFormParams.add("description", v)); - Optional.ofNullable(image).ifPresent(v -> localVarFormParams.add("image", v)); + if (ingestProperties.getAsset().isIngestImages()) { + Optional.ofNullable(image).ifPresent(v -> localVarFormParams.add("image", v)); + } final String[] localVarAccepts = { "application/json" @@ -331,7 +335,9 @@ public Mono createAssetCategory(AssetCategoryEntry assetCategoryE .ifPresent(v -> localVarFormParams.add("excerpt", v)); Optional.ofNullable(assetCategoryRequest.getDescription()) .ifPresent(v -> localVarFormParams.add("description", v)); - Optional.ofNullable(image).ifPresent(v -> localVarFormParams.add("image", v)); + if (ingestProperties.getAsset().isIngestImages()) { + Optional.ofNullable(image).ifPresent(v -> localVarFormParams.add("image", v)); + } final String[] localVarAccepts = { "application/json" @@ -352,6 +358,7 @@ public Mono createAssetCategory(AssetCategoryEntry assetCategoryE localVarAccept, localVarContentType, localVarAuthNames, localReturnType) .getBody()); }); + } } \ No newline at end of file diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java index 9e79eb14d..52dc4fdf6 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java @@ -10,6 +10,7 @@ import com.backbase.investment.api.service.v1.model.AssetTypeEnum; import com.backbase.investment.api.service.v1.model.GroupResult; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; import com.backbase.stream.investment.Asset; import com.backbase.stream.investment.AssetPrice; @@ -102,6 +103,9 @@ class InvestmentAssetUniverseSagaTest { @Mock private InvestmentIngestionConfigurationProperties configurationProperties; + @Mock + private InvestmentIngestProperties ingestProperties; + private InvestmentAssetUniverseSaga saga; /** @@ -119,7 +123,8 @@ void setUp() { investmentIntradayAssetPriceService, investmentCurrencyService, asyncTaskService, - configurationProperties + configurationProperties, + ingestProperties ); } diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java index fc5b90e95..d1258f023 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java @@ -27,6 +27,7 @@ import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.investment.api.service.v1.model.PortfolioProduct; import com.backbase.investment.api.service.v1.model.RelatedAssetSerializerWithAssetCategories; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.Allocation; import com.backbase.stream.investment.Asset; import com.backbase.stream.investment.InvestmentAssetData; @@ -67,6 +68,7 @@ class InvestmentPortfolioAllocationServiceTest { private InvestmentApi investmentApi; private CustomIntegrationApiService customIntegrationApiService; private InvestmentPortfolioAllocationService service; + private InvestmentIngestProperties ingestProperties; @BeforeEach void setUp() { @@ -75,7 +77,7 @@ void setUp() { investmentApi = mock(InvestmentApi.class); customIntegrationApiService = mock(CustomIntegrationApiService.class); service = new InvestmentPortfolioAllocationService( - allocationsApi, assetUniverseApi, investmentApi, customIntegrationApiService); + allocationsApi, assetUniverseApi, investmentApi, customIntegrationApiService, ingestProperties); } // ========================================================================= diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseServiceTest.java index 57301c021..d38d4578e 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseServiceTest.java @@ -11,6 +11,7 @@ import com.backbase.investment.api.service.sync.ApiClient; import com.backbase.investment.api.service.sync.v1.AssetUniverseApi; import com.backbase.investment.api.service.sync.v1.model.AssetCategory; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.Asset; import com.backbase.stream.investment.model.AssetCategoryEntry; import java.util.Map; @@ -32,7 +33,7 @@ class InvestmentRestAssetUniverseServiceTest { @Mock - private AssetUniverseApi assetUniverseApi; + private InvestmentIngestProperties ingestProperties; @Mock private ApiClient apiClient; @@ -41,7 +42,7 @@ class InvestmentRestAssetUniverseServiceTest { @BeforeEach void setUp() { - service = new InvestmentRestAssetUniverseService(assetUniverseApi, apiClient); + service = new InvestmentRestAssetUniverseService(apiClient, ingestProperties); } // ----------------------------------------------------------------------- From 080a35c7fc1dd3c83e27ad7ea8d441b551049498 Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Fri, 20 Mar 2026 14:03:01 +0200 Subject: [PATCH 8/9] WLNP-12474: BJS: implement the possibility to run Investment setup in standalone mode consider Arrangement available cash --- .../investment/InvestmentArrangement.java | 2 + .../stream/investment/InvestmentData.java | 4 +- .../stream/investment/InvestmentTask.java | 3 +- .../investment/model/InvestmentPortfolio.java | 25 ++++++ .../investment/saga/InvestmentSaga.java | 4 +- .../InvestmentPortfolioAllocationService.java | 13 ++-- .../service/InvestmentPortfolioService.java | 76 +++++++++++-------- 7 files changed, 86 insertions(+), 41 deletions(-) create mode 100644 stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolio.java diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java index 4c21d9966..a3f026ce5 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java @@ -1,5 +1,6 @@ package com.backbase.stream.investment; +import java.math.BigDecimal; import java.util.List; import java.util.UUID; import lombok.Builder; @@ -16,6 +17,7 @@ public class InvestmentArrangement { private String externalUserId; private String productTypeExternalId; private String currency; + private BigDecimal initialCash; private UUID investmentProductId; private List legalEntityIds; diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java index 5de517853..f1df463a9 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java @@ -1,8 +1,8 @@ package com.backbase.stream.investment; import com.backbase.investment.api.service.v1.model.GroupResult; -import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.investment.api.service.v1.model.PortfolioProduct; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import java.util.ArrayList; import java.util.HashMap; @@ -26,7 +26,7 @@ public class InvestmentData implements InvestmentDataValue { private List modelPortfolios; private List portfolioProducts; private InvestmentAssetData investmentAssetData; - private List portfolios; + private List portfolios; private List investmentPortfolioTradingAccounts; public Map> getClientsByLeExternalId() { diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java index 3267f8f90..a198784f8 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java @@ -1,6 +1,7 @@ package com.backbase.stream.investment; import com.backbase.investment.api.service.v1.model.PortfolioList; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.worker.model.StreamTask; import java.util.List; import lombok.Data; @@ -26,7 +27,7 @@ public void data(List clients) { data.setClientUsers(clients); } - public void setPortfolios(List portfolios) { + public void setPortfolios(List portfolios) { data.setPortfolios(portfolios); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolio.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolio.java new file mode 100644 index 000000000..ea85ed093 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolio.java @@ -0,0 +1,25 @@ +package com.backbase.stream.investment.model; + +import com.backbase.investment.api.service.v1.model.PortfolioList; +import java.math.BigDecimal; +import java.util.Optional; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +@AllArgsConstructor +@Builder +@Getter +@Setter +public class InvestmentPortfolio { + + private PortfolioList portfolio; + private BigDecimal initialCash; + + public double getInitialCashOrDefault(double defaultAmount) { + return Optional.ofNullable(initialCash).map(BigDecimal::doubleValue) + .orElse(defaultAmount); + } + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java index b174bbfcd..8afed3d1a 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java @@ -4,6 +4,7 @@ import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.InvestmentTask; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import com.backbase.stream.investment.service.AsyncTaskService; import com.backbase.stream.investment.service.InvestmentClientService; @@ -140,7 +141,8 @@ private Mono upsertPortfoliosAllocations(InvestmentTask investme .collectList() .doOnError(throwable -> { log.error("Allocation generation failed for portfolios:{} taskId={}", - data.getPortfolios().stream().map(PortfolioList::getUuid).toList(), investmentTask.getId(), + data.getPortfolios().stream().map(InvestmentPortfolio::getPortfolio).map(PortfolioList::getUuid) + .toList(), investmentTask.getId(), throwable); investmentTask.error(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_FAILED, investmentTask.getName(), investmentTask.getId(), diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java index 106d3326e..b3a844288 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java @@ -28,6 +28,7 @@ import com.backbase.stream.investment.InvestmentAssetData; import com.backbase.stream.investment.ModelAsset; import com.backbase.stream.investment.ModelPortfolio; +import com.backbase.stream.investment.model.InvestmentPortfolio; import java.time.LocalDate; import java.time.LocalTime; import java.time.OffsetDateTime; @@ -86,8 +87,11 @@ public Mono removeAllocations(PortfolioList portfolio) { .flatMap(v -> Mono.empty()); } - public Mono> generateAllocations(PortfolioList portfolio, + public Mono> generateAllocations(InvestmentPortfolio investmentPortfolio, List portfolioProducts, InvestmentAssetData investmentAssetData) { + PortfolioList portfolio = investmentPortfolio.getPortfolio(); + double initAmount = investmentPortfolio.getInitialCashOrDefault( + ingestProperties.getAllocation().getDefaultAmount()); return getPortfolioModel(portfolio, portfolioProducts, investmentAssetData.getAssetByUuid()) .flatMap(m -> { LocalDate endDay = LocalDate.now(); @@ -116,8 +120,8 @@ public Mono> generateAllocations(PortfolioList port lastValuation.getCashActive(), lastValuation.getTradeTotal()); }).switchIfEmpty( orderPositions(portfolio.getUuid(), workDays(startDay, endDay), m, priceDayByAssetKey, - ingestProperties.getAllocation().getDefaultAmount()) - .flatMap(dp -> generateAllocations(priceDayByAssetKey, dp))) + initAmount) + .flatMap(dp -> generateAllocations(priceDayByAssetKey, dp, initAmount))) .flatMap(allocations -> this.upsertAllocations(portfolio.getUuid().toString(), allocations))); }) .onErrorResume(ex -> Mono.empty()); @@ -139,13 +143,12 @@ private Mono> getAllocations(String portfolioId, private Mono> generateAllocations( Map> priceDayByAssetKey, - Pair, List> dayPositions) { + Pair, List> dayPositions, double initialCash) { List portfolioPositions = List.copyOf(dayPositions.getSecond()); double totalTrade = calculateTrades(portfolioPositions); - double initialCash = ingestProperties.getAllocation().getDefaultAmount(); double cash = initialCash - totalTrade; return Mono.just(generateAllocations(priceDayByAssetKey, dayPositions, initialCash, cash, totalTrade)); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java index dc9821e5f..1eb6bec60 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java @@ -25,7 +25,9 @@ import com.backbase.stream.investment.InvestmentArrangement; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.ModelPortfolio; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; +import java.math.BigDecimal; import java.time.OffsetDateTime; import java.util.Arrays; import java.util.Collection; @@ -72,7 +74,7 @@ public class InvestmentPortfolioService { private final PortfolioTradingAccountsApi portfolioTradingAccountsApi; private final InvestmentIngestProperties config; - public Mono> upsertPortfolios(List investmentArrangements, + public Mono> upsertPortfolios(List investmentArrangements, Map> clientsByLeExternalId) { return Flux.fromIterable(investmentArrangements) .flatMap(arrangement -> { @@ -80,9 +82,14 @@ public Mono> upsertPortfolios(List in arrangement.getExternalId(), arrangement.getName(), arrangement.getInvestmentProductId()); return upsertInvestmentPortfolios(arrangement, clientsByLeExternalId) - .doOnSuccess(portfolio -> log.debug( + .map(p -> InvestmentPortfolio + .builder() + .portfolio(p) + .initialCash(arrangement.getInitialCash()) + .build()) + .doOnSuccess(ip -> log.debug( "Successfully upserted investment portfolio: portfolioUuid={}, externalId={}, name={}", - portfolio.getUuid(), portfolio.getExternalId(), portfolio.getName())) + ip.getPortfolio().getUuid(), ip.getPortfolio().getExternalId(), ip.getPortfolio().getName())) .doOnError(throwable -> log.error( "Failed to upsert investment portfolio: arrangementExternalId={}, arrangementName={}", arrangement.getExternalId(), arrangement.getName(), throwable)); @@ -152,7 +159,8 @@ public Mono> upsertInvestmentProducts(InvestmentData inve .flatMap( existingProduct -> updateExistingPortfolioProduct(existingProduct, p, investmentData) .onErrorResume(WebClientResponseException.class, ex -> { - log.info("Using existing product data due to patch failure: uuid={}", existingProduct.getUuid()); + log.info("Using existing product data due to patch failure: uuid={}", + existingProduct.getUuid()); return Mono.just(existingProduct); }) ) @@ -223,7 +231,7 @@ private static List findModelUuid(InvestmentData investmentData, * from the arrangement to client UUIDs via the provided lookup map. * * @param investmentArrangement the investment arrangement containing portfolio details (must not be null) - * @param clientsByLeId map of legal entity ID to client UUIDs for associations + * @param clientsByLeId map of legal entity ID to client UUIDs for associations * @return Mono emitting the created or existing portfolio * @throws NullPointerException if investmentArrangement is null */ @@ -334,7 +342,7 @@ private Mono listExistingPortfolios(String externalId) { * Creates a new investment portfolio with associated clients. * * @param investmentArrangement the arrangement containing portfolio details - * @param clientsByLeId map to resolve client UUIDs from legal entity IDs + * @param clientsByLeId map to resolve client UUIDs from legal entity IDs * @return Mono emitting the newly created portfolio */ private Mono createNewPortfolio(InvestmentArrangement investmentArrangement, @@ -350,7 +358,8 @@ private Mono createNewPortfolio(InvestmentArrangement investmentA .externalId(investmentArrangement.getExternalId()) .name(investmentArrangement.getName()) .clients(associatedClients) - .currency(Optional.ofNullable(investmentArrangement.getCurrency()).orElse(config.getPortfolio().getDefaultCurrency())) + .currency(Optional.ofNullable(investmentArrangement.getCurrency()) + .orElse(config.getPortfolio().getDefaultCurrency())) .status(StatusA3dEnum.ACTIVE) .activated(OffsetDateTime.now().minusMonths(config.getPortfolio().getActivationPastMonths())); @@ -369,7 +378,7 @@ private Mono createNewPortfolio(InvestmentArrangement investmentA * using the provided lookup map. It filters out null values and ensures distinct results. * * @param investmentArrangement the arrangement containing legal entity external IDs - * @param clientsByLeId map of legal entity ID to client UUIDs + * @param clientsByLeId map of legal entity ID to client UUIDs * @return distinct list of client UUIDs associated with the arrangement's legal entities */ private static List getClients(InvestmentArrangement investmentArrangement, @@ -392,7 +401,8 @@ private Mono listExistingPortfolioProducts(PortfolioProduct po } private Mono listExistingPortfolioProducts(ProductTypeEnum productType, Integer riskLevel) { - return productsApi.listPortfolioProducts(List.of(config.getAllocation().getModelPortfolioAllocationAsset()), null, null, + return productsApi.listPortfolioProducts(List.of(config.getAllocation().getModelPortfolioAllocationAsset()), + null, null, 1, null, null, riskLevel, null, null, "-model_portfolio__risk_level", List.of(productType.getValue())) .doOnSuccess(products -> log.debug( @@ -449,7 +459,8 @@ private Mono updateExistingPortfolioProduct(PortfolioProduct e log.debug("Attempting to patch existing portfolio product: uuid={}, productType={}", productUuid, portfolioProduct.getProductType()); - return productsApi.patchPortfolioProduct(productUuid.toString(), List.of(config.getAllocation().getModelPortfolioAllocationAsset()), + return productsApi.patchPortfolioProduct(productUuid.toString(), + List.of(config.getAllocation().getModelPortfolioAllocationAsset()), null, null, patch) .doOnSuccess(updated -> { log.info("Successfully patched existing investment product: uuid={}", updated.getUuid()); @@ -492,7 +503,8 @@ private Mono createPortfolioProductWithModel(PortfolioProduct .modelPortfolio(modelPortfolioUuid) .productType(portfolioProduct.getProductType()); - return productsApi.createPortfolioProduct(request, List.of(config.getAllocation().getModelPortfolioAllocationAsset()), null, null) + return productsApi.createPortfolioProduct(request, + List.of(config.getAllocation().getModelPortfolioAllocationAsset()), null, null) .retry(2) .retryWhen(reactor.util.retry.Retry.fixedDelay(1, java.time.Duration.ofSeconds(1))) .doOnSuccess(created -> { @@ -504,8 +516,9 @@ private Mono createPortfolioProductWithModel(PortfolioProduct .doOnError(throwable -> logPortfolioProductCreationError(productType, throwable)); } - public Mono upsertDeposits(PortfolioList portfolio) { - double defaultAmount = config.getDeposit().getDefaultAmount(); + public Mono upsertDeposits(InvestmentPortfolio investmentPortfolio) { + PortfolioList portfolio = investmentPortfolio.getPortfolio(); + double initAmount = investmentPortfolio.getInitialCashOrDefault(config.getDeposit().getDefaultAmount()); return paymentsApi.listDeposits(null, null, null, null, null, null, portfolio.getUuid(), null, null, null) .filter(Objects::nonNull) @@ -516,15 +529,15 @@ public Mono upsertDeposits(PortfolioList portfolio) { .filter(list -> !list.isEmpty())) .flatMap(deposits -> { double deposited = deposits.stream().mapToDouble(Deposit::getAmount).sum(); - double remaining = defaultAmount - deposited; + double remaining = initAmount - deposited; return remaining > 0 ? createDeposit(portfolio, remaining) : Mono.just(deposits.getLast()); }) - .switchIfEmpty(Mono.defer(() -> createDeposit(portfolio, defaultAmount))) + .switchIfEmpty(Mono.defer(() -> createDeposit(portfolio, initAmount))) .onErrorResume(ex -> Mono.just(new Deposit() .portfolio(portfolio.getUuid()) - .amount(defaultAmount) + .amount(initAmount) .completedAt(portfolio.getActivated().plusDays(2)) ) ); @@ -561,8 +574,8 @@ private Mono createDeposit(PortfolioList portfolio, double defaultAmoun * Upserts portfolio trading accounts derived from the provided investment portfolio accounts. * *

    This method constructs {@link PortfolioTradingAccount} instances directly from - * {@link InvestmentPortfolioTradingAccount} data, resolving each account's portfolio UUID - * via the portfolio service before upserting. + * {@link InvestmentPortfolioTradingAccount} data, resolving each account's portfolio UUID via the portfolio service + * before upserting. * *

    Processing flow: *

      @@ -653,8 +666,8 @@ private Mono fetchPortfolioInternalId(String portfolioExternalId) { } /** - * Builds a {@link PortfolioTradingAccountRequest} directly from an {@link InvestmentPortfolioTradingAccount} - * and a resolved portfolio UUID, skipping the intermediate {@link PortfolioTradingAccount} domain object. + * Builds a {@link PortfolioTradingAccountRequest} directly from an {@link InvestmentPortfolioTradingAccount} and a + * resolved portfolio UUID, skipping the intermediate {@link PortfolioTradingAccount} domain object. * *

      Maps the following fields: *

        @@ -666,7 +679,7 @@ private Mono fetchPortfolioInternalId(String portfolioExternalId) { *
      * * @param investmentPortfolioTradingAccount the source account containing field data - * @param portfolioUuid the resolved internal portfolio UUID + * @param portfolioUuid the resolved internal portfolio UUID * @return the constructed {@link PortfolioTradingAccountRequest} */ private PortfolioTradingAccountRequest buildTradingAccountRequest( @@ -710,11 +723,10 @@ public Mono upsertPortfolioTradingAccount(PortfolioTrad * Patches an existing portfolio trading account with updated values. * *

      If the patch operation fails (e.g., due to validation errors or conflicts), - * falls back to returning the existing account to preserve data integrity - * and prevent batch failures. + * falls back to returning the existing account to preserve data integrity and prevent batch failures. * * @param existing the existing trading account to update - * @param request the request containing updated values + * @param request the request containing updated values * @return Mono emitting the updated trading account, or the existing account if patch fails */ private Mono patchExistingPortfolioTradingAccount( @@ -793,7 +805,7 @@ private Mono listExistingPortfolioTradingAccounts( *

    1. Returns an error for multiple results — indicates a data setup issue
    2. * * - * @param accounts the paginated list of trading accounts returned by the API + * @param accounts the paginated list of trading accounts returned by the API * @param externalAccountId the external account ID used in the search, for logging purposes * @return Mono emitting the single matching trading account, or empty if no results found * @throws IllegalStateException if multiple trading accounts are found with the same external account ID @@ -828,15 +840,15 @@ private Mono validateAndExtractPortfolioTradingAccount( * Logs errors occurring during portfolio trading account operations. * *

      Provides enhanced error context for {@link WebClientResponseException}, - * including HTTP status code and response body. For other exceptions, logs - * basic error information. + * including HTTP status code and response body. For other exceptions, logs basic error information. * - * @param operation a short description of the operation that failed (e.g., "PATCH", "CREATE") - * @param idLabel the label for the identifier (e.g., "uuid", "externalAccountId") - * @param idValue the value of the identifier - * @param throwable the exception that occurred + * @param operation a short description of the operation that failed (e.g., "PATCH", "CREATE") + * @param idLabel the label for the identifier (e.g., "uuid", "externalAccountId") + * @param idValue the value of the identifier + * @param throwable the exception that occurred */ - private void logPortfolioTradingAccountError(String operation, String idLabel, String idValue, Throwable throwable) { + private void logPortfolioTradingAccountError(String operation, String idLabel, String idValue, + Throwable throwable) { if (throwable instanceof WebClientResponseException ex) { log.warn("Portfolio trading account {} failed: {}={}, status={}, body={}", operation, idLabel, idValue, ex.getStatusCode(), ex.getResponseBodyAsString()); From 7cbefbb94bba79537aabff80e7b6465023ba4a24 Mon Sep 17 00:00:00 2001 From: Roman Kniazevych Date: Fri, 20 Mar 2026 14:18:19 +0200 Subject: [PATCH 9/9] WLNP-12474: BJS: implement the possibility to run Investment setup in standalone mode fix tests --- .../investment/saga/InvestmentSagaTest.java | 9 ++++---- ...estmentPortfolioAllocationServiceTest.java | 16 +++++++++----- .../InvestmentPortfolioServiceTest.java | 21 ++++++++++++------- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java index 38280186d..065c579de 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java @@ -15,6 +15,7 @@ import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.InvestmentTask; import com.backbase.stream.investment.ModelPortfolio; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import com.backbase.stream.investment.service.AsyncTaskService; import com.backbase.stream.investment.service.InvestmentClientService; @@ -371,7 +372,7 @@ void upsertPortfolioTradingAccounts_error_marksTaskFailed() { when(investmentPortfolioService.upsertInvestmentProducts(any(), any())) .thenReturn(Mono.just(List.of(new PortfolioProduct()))); when(investmentPortfolioService.upsertPortfolios(any(), any())) - .thenReturn(Mono.just(List.of(new PortfolioList()))); + .thenReturn(Mono.just(List.of(InvestmentPortfolio.builder().build()))); when(investmentPortfolioService.upsertPortfolioTradingAccounts(any())) .thenReturn(Mono.error(new RuntimeException("Trading account upsert failure"))); @@ -412,7 +413,7 @@ void upsertDepositsAndAllocations_error_marksTaskFailed() { when(investmentPortfolioService.upsertInvestmentProducts(any(), any())) .thenReturn(Mono.just(List.of(new PortfolioProduct()))); when(investmentPortfolioService.upsertPortfolios(any(), any())) - .thenReturn(Mono.just(List.of(new PortfolioList()))); + .thenReturn(Mono.just(List.of(InvestmentPortfolio.builder().build()))); when(investmentPortfolioService.upsertPortfolioTradingAccounts(any())) .thenReturn(Mono.empty()); when(investmentPortfolioService.upsertDeposits(any())) @@ -475,7 +476,7 @@ private InvestmentTask createFullTask() { .isDefault(true) .isInternal(false) .build())) - .portfolios(List.of(new PortfolioList())) + .portfolios(List.of(InvestmentPortfolio.builder().portfolio(new PortfolioList()).build())) .build()); } @@ -487,7 +488,7 @@ private void stubAllServicesSuccess() { when(investmentPortfolioService.upsertInvestmentProducts(any(), any())) .thenReturn(Mono.just(List.of(new PortfolioProduct()))); when(investmentPortfolioService.upsertPortfolios(any(), any())) - .thenReturn(Mono.just(List.of(new PortfolioList()))); + .thenReturn(Mono.just(List.of(InvestmentPortfolio.builder().build()))); when(investmentPortfolioService.upsertPortfolioTradingAccounts(any())) .thenReturn(Mono.empty()); when(investmentPortfolioService.upsertDeposits(any())) diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java index d1258f023..65c26aa1b 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java @@ -32,6 +32,7 @@ import com.backbase.stream.investment.Asset; import com.backbase.stream.investment.InvestmentAssetData; import com.backbase.stream.investment.ModelAsset; +import com.backbase.stream.investment.model.InvestmentPortfolio; import java.time.LocalDate; import java.time.OffsetDateTime; import java.time.ZoneOffset; @@ -76,6 +77,7 @@ void setUp() { assetUniverseApi = mock(AssetUniverseApi.class); investmentApi = mock(InvestmentApi.class); customIntegrationApiService = mock(CustomIntegrationApiService.class); + ingestProperties = new InvestmentIngestProperties(); service = new InvestmentPortfolioAllocationService( allocationsApi, assetUniverseApi, investmentApi, customIntegrationApiService, ingestProperties); } @@ -373,7 +375,7 @@ void createDepositAllocation_nullCompletedAt_usesTodayAsValuationDate() { /** * Tests for - * {@link InvestmentPortfolioAllocationService#generateAllocations(PortfolioList, List, InvestmentAssetData)}. + * {@link InvestmentPortfolioAllocationService#generateAllocations(InvestmentPortfolio, List, InvestmentAssetData)}. * *

      Covers: *

        @@ -393,6 +395,7 @@ void generateAllocations_pipelineError_returnsEmptyMono() { // Arrange UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(portfolio.getProduct()).thenReturn(UUID.randomUUID()); when(portfolio.getActivated()).thenReturn(OffsetDateTime.now().minusMonths(6)); @@ -408,7 +411,7 @@ void generateAllocations_pipelineError_returnsEmptyMono() { .build(); // Act & Assert - StepVerifier.create(service.generateAllocations(portfolio, List.of(nonMatchingProduct), assetData)) + StepVerifier.create(service.generateAllocations(investmentPortfolio, List.of(nonMatchingProduct), assetData)) .verifyComplete(); } @@ -421,6 +424,7 @@ void generateAllocations_noExistingAllocations_createsNewAllocations() { UUID modelUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(portfolio.getProduct()).thenReturn(productUuid); when(portfolio.getActivated()).thenReturn(OffsetDateTime.now().minusMonths(2)); @@ -468,7 +472,7 @@ void generateAllocations_noExistingAllocations_createsNewAllocations() { .build(); // Act & Assert — one allocation created per work day from priceDay to today - StepVerifier.create(service.generateAllocations(portfolio, List.of(portfolioProduct), assetData)) + StepVerifier.create(service.generateAllocations(investmentPortfolio, List.of(portfolioProduct), assetData)) .expectNextMatches(result -> !result.isEmpty()) .verifyComplete(); @@ -485,6 +489,7 @@ void generateAllocations_lastValuationIsToday_noPendingDays_returnsEmptyList() { UUID modelUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(portfolio.getProduct()).thenReturn(productUuid); when(portfolio.getActivated()).thenReturn(OffsetDateTime.now().minusMonths(2)); @@ -525,7 +530,7 @@ void generateAllocations_lastValuationIsToday_noPendingDays_returnsEmptyList() { .build(); // Act & Assert - StepVerifier.create(service.generateAllocations(portfolio, List.of(portfolioProduct), assetData)) + StepVerifier.create(service.generateAllocations(investmentPortfolio, List.of(portfolioProduct), assetData)) .expectNextMatches(List::isEmpty) .verifyComplete(); @@ -540,6 +545,7 @@ void generateAllocations_noMatchingPortfolioProduct_fallsBackToDefaultModel() { UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(portfolio.getProduct()).thenReturn(UUID.randomUUID()); when(portfolio.getActivated()).thenReturn(OffsetDateTime.now().minusMonths(2)); @@ -599,7 +605,7 @@ void generateAllocations_noMatchingPortfolioProduct_fallsBackToDefaultModel() { .build(); // Act & Assert - StepVerifier.create(service.generateAllocations(portfolio, List.of(nonMatchingProduct), assetData)) + StepVerifier.create(service.generateAllocations(investmentPortfolio, List.of(nonMatchingProduct), assetData)) .expectNextMatches(result -> !result.isEmpty()) .verifyComplete(); diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java index 66d6bec62..dada24cd2 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java @@ -30,6 +30,7 @@ import com.backbase.stream.investment.InvestmentArrangement; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.ModelPortfolio; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; @@ -78,8 +79,7 @@ void setUp() { portfolioApi = Mockito.mock(PortfolioApi.class); paymentsApi = Mockito.mock(PaymentsApi.class); portfolioTradingAccountsApi = Mockito.mock(PortfolioTradingAccountsApi.class); - config = Mockito.mock(InvestmentIngestProperties.class); - when(config.getPortfolio().getActivationPastMonths()).thenReturn(6); + config = new InvestmentIngestProperties(); service = new InvestmentPortfolioService( productsApi, portfolioApi, paymentsApi, portfolioTradingAccountsApi, config); } @@ -746,7 +746,7 @@ void upsertPortfolios_emptyArrangements_returnsEmptyList() { // ========================================================================= /** - * Tests for {@link InvestmentPortfolioService#upsertDeposits(PortfolioList)}. + * Tests for {@link InvestmentPortfolioService#upsertDeposits(InvestmentPortfolio)}. * *

        Covers: *

          @@ -768,6 +768,7 @@ void upsertDeposits_noExistingDeposits_createsDefaultDeposit() { UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid, "EXT-DEP-001", OffsetDateTime.now().minusMonths(6)); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(paymentsApi.listDeposits(isNull(), isNull(), isNull(), isNull(), isNull(), isNull(), eq(portfolioUuid), isNull(), isNull(), isNull())) @@ -779,7 +780,7 @@ void upsertDeposits_noExistingDeposits_createsDefaultDeposit() { .thenReturn(Mono.just(created)); // Act & Assert - StepVerifier.create(service.upsertDeposits(portfolio)) + StepVerifier.create(service.upsertDeposits(investmentPortfolio)) .expectNextMatches(d -> Double.valueOf(10_000d).equals(d.getAmount())) .verifyComplete(); @@ -793,6 +794,7 @@ void upsertDeposits_existingDepositsLessThanDefault_topsUpRemainingAmount() { UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid, "EXT-DEP-002", OffsetDateTime.now().minusMonths(6)); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); Deposit existingDeposit = Mockito.mock(Deposit.class); when(existingDeposit.getAmount()).thenReturn(4_000d); @@ -807,7 +809,7 @@ void upsertDeposits_existingDepositsLessThanDefault_topsUpRemainingAmount() { .thenReturn(Mono.just(topUpDeposit)); // Act & Assert - StepVerifier.create(service.upsertDeposits(portfolio)) + StepVerifier.create(service.upsertDeposits(investmentPortfolio)) .expectNextMatches(d -> Double.valueOf(6_000d).equals(d.getAmount())) .verifyComplete(); @@ -821,6 +823,7 @@ void upsertDeposits_existingDepositsEqualToDefault_doesNotCreateNewDeposit() { UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid, "EXT-DEP-003", OffsetDateTime.now().minusMonths(6)); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); Deposit existingDeposit = Mockito.mock(Deposit.class); when(existingDeposit.getAmount()).thenReturn(10_000d); @@ -832,7 +835,7 @@ void upsertDeposits_existingDepositsEqualToDefault_doesNotCreateNewDeposit() { Deposit fallbackDeposit = Mockito.mock(Deposit.class); when(paymentsApi.createDeposit(any())).thenReturn(Mono.just(fallbackDeposit)); // Act & Assert - StepVerifier.create(service.upsertDeposits(portfolio)) + StepVerifier.create(service.upsertDeposits(investmentPortfolio)) .expectNextMatches(d -> Double.valueOf(10_000d).equals(d.getAmount())) .verifyComplete(); @@ -846,6 +849,7 @@ void upsertDeposits_nullDepositResultList_createsDefaultDeposit() { UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid, "EXT-DEP-NULL", OffsetDateTime.now().minusMonths(6)); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(paymentsApi.listDeposits(isNull(), isNull(), isNull(), isNull(), isNull(), isNull(), eq(portfolioUuid), isNull(), isNull(), isNull())) @@ -857,7 +861,7 @@ void upsertDeposits_nullDepositResultList_createsDefaultDeposit() { .thenReturn(Mono.just(created)); // Act & Assert - StepVerifier.create(service.upsertDeposits(portfolio)) + StepVerifier.create(service.upsertDeposits(investmentPortfolio)) .expectNextMatches(d -> Double.valueOf(10_000d).equals(d.getAmount())) .verifyComplete(); @@ -871,13 +875,14 @@ void upsertDeposits_apiError_returnsFallbackDeposit() { UUID portfolioUuid = UUID.randomUUID(); OffsetDateTime activated = OffsetDateTime.now().minusMonths(6); PortfolioList portfolio = buildPortfolioList(portfolioUuid, "EXT-DEP-ERR", activated); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(paymentsApi.listDeposits(isNull(), isNull(), isNull(), isNull(), isNull(), isNull(), eq(portfolioUuid), isNull(), isNull(), isNull())) .thenReturn(Mono.error(new RuntimeException("API unavailable"))); // Act & Assert - StepVerifier.create(service.upsertDeposits(portfolio)) + StepVerifier.create(service.upsertDeposits(investmentPortfolio)) .expectNextMatches(d -> portfolioUuid.equals(d.getPortfolio()) && d.getAmount() == 10_000d) .verifyComplete();