diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java index 09e913129..9036f373e 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java @@ -18,19 +18,33 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import java.text.DateFormat; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.Builder; +import reactor.netty.http.client.HttpClient; /** * Configuration for Investment service REST client (ClientApi). + * + *

This configuration creates the Investment API client with proper codec configuration. + * + *

Note: Connection pooling, timeouts, and rate limiting are configured in + * {@link InvestmentWebClientConfiguration} which should be imported alongside this class. + * The WebClient connection pool prevents resource exhaustion and 503 errors by limiting: + *

*/ @Configuration @ConditionalOnBean(InvestmentServiceConfiguration.class) @@ -48,7 +62,9 @@ public InvestmentClientConfig() { */ @Bean @ConditionalOnMissingBean - public ApiClient investmentApiClient(WebClient interServiceWebClient, ObjectMapper objectMapper, + public ApiClient investmentApiClient(WebClient interServiceWebClient, + @Qualifier("investmentHttpClient") HttpClient investmentHttpClient, + ObjectMapper objectMapper, DateFormat dateFormat) { ObjectMapper mapper = objectMapper.copy(); mapper.setSerializationInclusion(Include.NON_EMPTY); diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java new file mode 100644 index 000000000..93be02d57 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestProperties.java @@ -0,0 +1,130 @@ +package com.backbase.stream.configuration; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Fine-grained configuration properties for {@code InvestmentPortfolioService}. + * + *

Properties are grouped by the model they govern so operators can reason about each + * concern independently without touching unrelated settings. + * + *

All values can be overridden via {@code application.yml} / {@code application.properties} + * using the prefix {@code backbase.bootstrap.ingestions.investment.service}. + * + *

Ingestion-flow feature flags ({@code contentEnabled}, {@code assetUniverseEnabled}, etc.) + * live in the sibling class {@link InvestmentIngestionConfigurationProperties}. + * + *

Example: + *

+ * backbase:
+ *   bootstrap:
+ *     ingestions:
+ *       investment:
+ *         service:
+ *           portfolio:
+ *             default-currency: USD
+ *             activation-past-months: 3
+ *           allocation:
+ *             model-portfolio-allocation-asset: "model_portfolio.allocation.asset"
+ *           deposit:
+ *             provider: real-bank
+ *             default-amount: 25000.0
+ * 
+ */ +@Data +@ConfigurationProperties(prefix = "backbase.bootstrap.ingestions.investment.service") +public class InvestmentIngestProperties { + + public static final double DEFAULT_INIT_CASH = 10_000d; + + private PortfolioConfig portfolio = new PortfolioConfig(); + private AllocationConfig allocation = new AllocationConfig(); + private DepositConfig deposit = new DepositConfig(); + private AssetConfig asset = new AssetConfig(); + + // ------------------------------------------------------------------------- + // Portfolio + // ------------------------------------------------------------------------- + + /** + * Settings that govern how individual investment portfolios are created and activated. + */ + @Data + public static class PortfolioConfig { + + /** + * ISO 4217 currency code applied to new portfolios when none is specified in the source data. + */ + private String defaultCurrency = "EUR"; + + /** + * How many months into the past the portfolio's {@code activated} timestamp is set. A value of {@code 1} means + * the portfolio is considered to have been activated 1 month ago. + */ + private int activationPastMonths = 1; + } + + // ------------------------------------------------------------------------- + // Allocation + // ------------------------------------------------------------------------- + + /** + * Settings that govern portfolio-product allocation behaviour. + */ + @Data + public static class AllocationConfig { + + /** + * The allocation-asset expansion key sent when listing or managing portfolio products. Changing this value + * allows switching between different allocation-asset model definitions without a code change. + */ + private String modelPortfolioAllocationAsset = "model_portfolio.allocation.asset"; + private int allocationConcurrency = 5; + + private double defaultAmount = DEFAULT_INIT_CASH; + } + + // ------------------------------------------------------------------------- + // Deposit + // ------------------------------------------------------------------------- + + /** + * Settings that govern the automatic seed deposit created for new portfolios. + */ + @Data + public static class DepositConfig { + + /** + * The payment provider identifier sent with every deposit request. Set this to the real provider name for + * non-mock environments. + */ + private String provider = null; + + /** + * The monetary amount used as the initial seed deposit when no previous deposit exists or when the current + * deposited total is below this threshold. + */ + private double defaultAmount = DEFAULT_INIT_CASH; + } + + // ------------------------------------------------------------------------- + // Deposit + // ------------------------------------------------------------------------- + + /** + * Settings that govern the automatic seed deposit created for new portfolios. + */ + @Data + public static class AssetConfig { + + private boolean ingestImages = true; + private int marketConcurrency = 5; + private int marketSpecialDayConcurrency = 5; + private int assetCategoryConcurrency = 5; + private int assetCategoryTypeConcurrency = 5; + + } + +} + diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java index 53bb897db..8d4d3e1ed 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentIngestionConfigurationProperties.java @@ -7,6 +7,10 @@ /** * Configuration properties governing investment client ingestion behavior. + * + *

Controls which high-level ingestion flows are enabled. Service-level tuning + * (portfolio currencies, deposit provider, allocation assets, etc.) lives in + * {@link InvestmentIngestProperties}. */ @Data @ConditionalOnBean(InvestmentServiceConfiguration.class) @@ -16,7 +20,6 @@ public class InvestmentIngestionConfigurationProperties { private boolean contentEnabled = true; private boolean assetUniverseEnabled = true; private boolean wealthEnabled = true; - private int portfolioActivationPastMonths = 1; } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java index e4de101a5..cd4875e99 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentRestServiceApiConfiguration.java @@ -90,9 +90,10 @@ public InvestmentRestDocumentContentService investmentRestContentDocumentService } @Bean - public InvestmentRestAssetUniverseService investmentRestAssetUniverseService(AssetUniverseApi assetUniverseApi, - com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient) { - return new InvestmentRestAssetUniverseService(assetUniverseApi, restInvestmentApiClient); + public InvestmentRestAssetUniverseService investmentRestAssetUniverseService( + com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient, + InvestmentIngestProperties portfolioProperties) { + return new InvestmentRestAssetUniverseService(restInvestmentApiClient, portfolioProperties); } } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java index dc3561f7d..cbdbbdc73 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java @@ -41,7 +41,8 @@ InvestmentClientConfig.class }) @EnableConfigurationProperties({ - InvestmentIngestionConfigurationProperties.class + InvestmentIngestionConfigurationProperties.class, + InvestmentIngestProperties.class }) @RequiredArgsConstructor @Configuration @@ -61,9 +62,9 @@ public CustomIntegrationApiService customIntegrationApiService(ApiClient apiClie @Bean public InvestmentPortfolioService investmentPortfolioService(PortfolioApi portfolioApi, InvestmentProductsApi investmentProductsApi, PaymentsApi paymentsApi, PortfolioTradingAccountsApi portfolioTradingAccountsApi, - InvestmentIngestionConfigurationProperties configurationProperties) { + InvestmentIngestProperties portfolioProperties) { return new InvestmentPortfolioService(investmentProductsApi, portfolioApi, paymentsApi, - portfolioTradingAccountsApi, configurationProperties); + portfolioTradingAccountsApi, portfolioProperties); } @Bean @@ -96,9 +97,9 @@ public InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService(A @Bean public InvestmentPortfolioAllocationService investmentPortfolioAllocationService(AllocationsApi allocationsApi, AssetUniverseApi assetUniverseApi, InvestmentApi investmentApi, - CustomIntegrationApiService customIntegrationApiService) { + CustomIntegrationApiService customIntegrationApiService, InvestmentIngestProperties portfolioProperties) { return new InvestmentPortfolioAllocationService(allocationsApi, assetUniverseApi, investmentApi, - customIntegrationApiService); + customIntegrationApiService, portfolioProperties); } @Bean @@ -124,10 +125,11 @@ public InvestmentAssetUniverseSaga investmentStaticDataSaga( InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService, InvestmentCurrencyService investmentCurrencyService, AsyncTaskService asyncTaskService, - InvestmentIngestionConfigurationProperties coreConfigurationProperties) { + InvestmentIngestionConfigurationProperties coreConfigurationProperties, + InvestmentIngestProperties portfolioProperties) { return new InvestmentAssetUniverseSaga(investmentAssetUniverseService, investmentAssetPriceService, investmentIntradayAssetPriceService, investmentCurrencyService, asyncTaskService, - coreConfigurationProperties); + coreConfigurationProperties, portfolioProperties); } @Bean diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java new file mode 100644 index 000000000..4ade2ad4b --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientConfiguration.java @@ -0,0 +1,95 @@ +package com.backbase.stream.configuration; + +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.netty.http.client.HttpClient; +import reactor.netty.resources.ConnectionProvider; + +/** + * WebClient configuration for Investment service communication. + * + *

Provides optimized connection pooling, timeouts, and rate limiting to prevent: + *

+ * + *

All tunable values are externalized via {@link InvestmentWebClientProperties} and can be + * overridden through {@code application.yml} without recompiling. + * + *

All beans are explicitly named so they are unambiguous and never accidentally + * replaced by Spring Boot auto-configuration or another generic bean of the same type. + */ +@Slf4j +@Configuration +@EnableConfigurationProperties(InvestmentWebClientProperties.class) +public class InvestmentWebClientConfiguration { + + /** + * Dedicated {@link ConnectionProvider} for the Investment service client pool. + * + *

Using an explicit named bean (rather than {@code @ConditionalOnMissingBean ReactorResourceFactory}) + * avoids silently falling back to Spring Boot's auto-configured shared pool when one already exists + * in the application context. + * + * @param props investment HTTP-client configuration properties + * @return investment-specific ConnectionProvider + */ + @Bean("investmentConnectionProvider") + public ConnectionProvider investmentConnectionProvider(InvestmentWebClientProperties props) { + ConnectionProvider provider = ConnectionProvider.builder("investment-client-pool") + .maxConnections(props.getMaxConnections()) + .maxIdleTime(Duration.ofMinutes(props.getMaxIdleTimeMinutes())) + .maxLifeTime(Duration.ofMinutes(props.getMaxLifeTimeMinutes())) + .pendingAcquireMaxCount(props.getMaxPendingAcquires()) + .pendingAcquireTimeout(Duration.ofMillis(props.getPendingAcquireTimeoutMillis())) + .evictInBackground(Duration.ofSeconds(props.getEvictInBackgroundSeconds())) + .build(); + + log.info("Configured investment ConnectionProvider: maxConnections={}, maxIdleTime={}min," + + " pendingAcquireMaxCount={}", + props.getMaxConnections(), props.getMaxIdleTimeMinutes(), props.getMaxPendingAcquires()); + + return provider; + } + + /** + * Creates an {@link HttpClient} backed by the investment-specific connection pool and + * with explicit connect / read / write timeouts. + * + *

This client ensures: + *

+ * + * @param connectionProvider the investment-specific connection provider + * @param props investment HTTP-client configuration properties + * @return configured HttpClient + */ + @Bean("investmentHttpClient") + public HttpClient investmentHttpClient( + @Qualifier("investmentConnectionProvider") ConnectionProvider connectionProvider, + InvestmentWebClientProperties props) { + return HttpClient.create(connectionProvider) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, props.getConnectTimeoutSeconds() * 1000) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.TCP_NODELAY, true) + .responseTimeout(Duration.ofSeconds(props.getReadTimeoutSeconds())) + .doOnConnected(connection -> connection + .addHandlerLast(new ReadTimeoutHandler(props.getReadTimeoutSeconds(), TimeUnit.SECONDS)) + .addHandlerLast(new WriteTimeoutHandler(props.getWriteTimeoutSeconds(), TimeUnit.SECONDS))); + } + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientProperties.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientProperties.java new file mode 100644 index 000000000..af6c23775 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentWebClientProperties.java @@ -0,0 +1,79 @@ +package com.backbase.stream.configuration; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Configuration properties for the Investment service HTTP client connection pool and timeouts. + * + *

All values can be overridden via {@code application.yml} / {@code application.properties} + * using the prefix {@code backbase.communication.services.investment.http-client}. + * + *

Example: + *

+ * backbase:
+ *   communication:
+ *     services:
+ *       investment:
+ *         http-client:
+ *           max-connections: 20
+ *           max-idle-time-minutes: 5
+ *           max-pending-acquires: 100
+ *           pending-acquire-timeout-millis: 45000
+ *           connect-timeout-seconds: 10
+ *           read-timeout-seconds: 30
+ *           write-timeout-seconds: 30
+ * 
+ */ +@Data +@ConfigurationProperties(prefix = "backbase.communication.services.investment.http-client") +public class InvestmentWebClientProperties { + + /** + * Maximum number of open TCP connections to the Investment service. + * Limiting this prevents the service from being overwhelmed (which causes 503 responses). + */ + private int maxConnections = 20; + + /** + * Maximum time (in minutes) that a connection can remain idle in the pool before being evicted. + */ + private long maxIdleTimeMinutes = 5; + + /** + * Maximum lifetime (in minutes) of a pooled connection regardless of activity. + */ + private long maxLifeTimeMinutes = 30; + + /** + * Maximum number of requests that can be queued waiting for a connection. + * Bounds the in-memory queue so callers receive a fast failure rather than an unbounded backlog. + */ + private int maxPendingAcquires = 100; + + /** + * Maximum time (in milliseconds) a request will wait to acquire a connection from the pool. + */ + private long pendingAcquireTimeoutMillis = 45_000; + + /** + * Background eviction interval (in seconds) for idle/expired connections in the pool. + */ + private long evictInBackgroundSeconds = 120; + + /** + * TCP connection timeout in seconds. Prevents hanging on unresponsive servers. + */ + private int connectTimeoutSeconds = 10; + + /** + * Read timeout in seconds. Prevents indefinite hangs waiting for a response. + */ + private int readTimeoutSeconds = 30; + + /** + * Write timeout in seconds. Prevents indefinite hangs while sending a request. + */ + private int writeTimeoutSeconds = 30; +} + diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/ClientUser.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/ClientUser.java index f136d66ae..2ae2b1add 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/ClientUser.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/ClientUser.java @@ -13,7 +13,7 @@ public class ClientUser { private UUID investmentClientId; private String internalUserId; private String externalUserId; - private String legalEntityExternalId; + private String legalEntityId; } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java index f334d1063..a3f026ce5 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentArrangement.java @@ -1,5 +1,6 @@ package com.backbase.stream.investment; +import java.math.BigDecimal; import java.util.List; import java.util.UUID; import lombok.Builder; @@ -16,8 +17,9 @@ public class InvestmentArrangement { private String externalUserId; private String productTypeExternalId; private String currency; + private BigDecimal initialCash; private UUID investmentProductId; - private List legalEntityExternalIds; + private List legalEntityIds; } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java index 0409debb3..8311955bc 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetData.java @@ -15,11 +15,13 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; @EqualsAndHashCode @Data @Builder -public class InvestmentAssetData { +@Slf4j +public class InvestmentAssetData implements InvestmentDataValue { private List currencies; private List markets; @@ -42,4 +44,14 @@ public Map getAssetByUuid() { } + @Override + public long getTotalProcessedValues() { + log.debug( + "Calculating total processed values: currencies={}, markets={}, marketSpecialDays={}, assetCategoryTypes={}, assetCategories={}, assets={}, assetPrices={}", + getSize(currencies), getSize(markets), getSize(marketSpecialDays), getSize(assetCategoryTypes), + getSize(assetCategories), getSize(assets), getSize(assetPrices)); + return getSize(currencies) + getSize(markets) + getSize(marketSpecialDays) + getSize(assetCategoryTypes) + + getSize(assetCategories) + getSize(assets) + getSize(assetPrices); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java index 2c3151676..7ba2ad9af 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentAssetsTask.java @@ -11,7 +11,7 @@ @EqualsAndHashCode(callSuper = true) @Data -public class InvestmentAssetsTask extends StreamTask { +public class InvestmentAssetsTask extends StreamTask implements InvestmentDataValue { private final InvestmentAssetData data; @@ -50,4 +50,10 @@ public InvestmentAssetsTask setIntradayPriceTasks(List tasks) { data.setIntradayPriceAsyncTasks(tasks); return this; } + + @Override + public long getTotalProcessedValues() { + return data.getTotalProcessedValues(); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentClientTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentClientTask.java deleted file mode 100644 index 163294a98..000000000 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentClientTask.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.backbase.stream.investment; - -import com.backbase.investment.api.service.v1.model.ClientCreate; -import com.backbase.investment.api.service.v1.model.ClientCreateRequest; -import com.backbase.investment.api.service.v1.model.OASClient; -import com.backbase.investment.api.service.v1.model.OASClientUpdateRequest; -import com.backbase.investment.api.service.v1.model.PatchedOASClientUpdateRequest; -import com.backbase.stream.worker.model.StreamTask; -import java.util.UUID; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; - -/** - * Stream task representing ingestion or update of an Investment Client. - */ -@EqualsAndHashCode(callSuper = true) -@Data -@NoArgsConstructor -public class InvestmentClientTask extends StreamTask { - - public enum Operation { CREATE, PATCH, UPDATE } - - private Operation operation; - private ClientCreateRequest createRequest; - private PatchedOASClientUpdateRequest patchRequest; - private OASClientUpdateRequest updateRequest; - private UUID clientUuid; // Required for PATCH / UPDATE; populated after CREATE - - private ClientCreate createdClient; // response after CREATE - private OASClient updatedClient; // response after PATCH / UPDATE - - public InvestmentClientTask(String id, Operation operation) { - super(id); - this.operation = operation; - } - - @Override - public String getName() { - return getId(); - } -} - diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java index 8a30a2581..ff1347124 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentData.java @@ -7,15 +7,23 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; @EqualsAndHashCode @Data @Builder -public class InvestmentContentData { +@Slf4j +public class InvestmentContentData implements InvestmentDataValue { private List marketNewsTags; private List marketNews; private List documentTags; private List documents; + public long getTotalProcessedValues() { + log.debug( + "Calculating total processed values: marketNewsTags={}, marketNews={}, documentTags={}, documents={}", + getSize(marketNewsTags), getSize(marketNews), getSize(documentTags), getSize(documents)); + return getSize(marketNewsTags) + getSize(marketNews) + getSize(documentTags) + getSize(documents); + } } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java index f6356ceae..a7a4d6d04 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentContentTask.java @@ -6,7 +6,7 @@ @EqualsAndHashCode(callSuper = true) @Data -public class InvestmentContentTask extends StreamTask { +public class InvestmentContentTask extends StreamTask implements InvestmentDataValue { private final InvestmentContentData data; @@ -20,4 +20,9 @@ public String getName() { return "investment-content"; } + @Override + public long getTotalProcessedValues() { + return data.getTotalProcessedValues(); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java index 5f5f59c7c..f1df463a9 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentData.java @@ -1,10 +1,8 @@ package com.backbase.stream.investment; import com.backbase.investment.api.service.v1.model.GroupResult; -import com.backbase.investment.api.service.v1.model.InvestorModelPortfolio; -import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.investment.api.service.v1.model.PortfolioProduct; -import com.backbase.investment.api.service.v1.model.ProductTypeEnum; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import java.util.ArrayList; import java.util.HashMap; @@ -15,26 +13,26 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; @EqualsAndHashCode @Data @Builder -public class InvestmentData { +@Slf4j +public class InvestmentData implements InvestmentDataValue { - private String saName; - private String saExternalId; private List clientUsers; private List investmentArrangements; private List modelPortfolios; private List portfolioProducts; private InvestmentAssetData investmentAssetData; - private List portfolios; + private List portfolios; private List investmentPortfolioTradingAccounts; public Map> getClientsByLeExternalId() { Map> clientsByLeExternalId = new HashMap<>(); clientUsers.forEach( - c -> clientsByLeExternalId.computeIfAbsent(c.getLegalEntityExternalId(), l -> new ArrayList<>()) + c -> clientsByLeExternalId.computeIfAbsent(c.getLegalEntityId(), l -> new ArrayList<>()) .add(c.getInvestmentClientId())); return clientsByLeExternalId; } @@ -52,16 +50,17 @@ public void addPortfolioProducts(PortfolioProduct portfolioProduct) { } } - public Optional findPortfolioProduct(ProductTypeEnum productType, Integer riskLevel) { - return Optional.ofNullable(portfolioProducts) - .flatMap(ps -> ps.stream() - .filter(p -> p.getProductType().equals(productType) - && Optional.ofNullable(p.getModelPortfolio()).map(InvestorModelPortfolio::getRiskLevel) - .map(risk -> risk <= riskLevel).orElse(false)) - .findAny()); - } - public List getPriceAsyncTasks() { return Optional.ofNullable(investmentAssetData).map(InvestmentAssetData::getPriceAsyncTasks).orElse(List.of()); } + + public long getTotalProcessedValues() { + log.debug( + "Calculating total processed values: portfolios={}, portfolioProducts={}, modelPortfolios={}, clientUsers={}, investmentPortfolioTradingAccounts={}", + getSize(portfolios), getSize(portfolioProducts), getSize(modelPortfolios), getSize(clientUsers), + getSize(investmentPortfolioTradingAccounts)); + return getSize(portfolios) + getSize(portfolioProducts) + getSize(modelPortfolios) + getSize(clientUsers) + + getSize(investmentPortfolioTradingAccounts); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentDataValue.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentDataValue.java new file mode 100644 index 000000000..784cd10de --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentDataValue.java @@ -0,0 +1,13 @@ +package com.backbase.stream.investment; + +import java.util.List; + +public interface InvestmentDataValue { + + long getTotalProcessedValues(); + + default int getSize(List list) { + return list != null ? list.size() : 0; + } + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java index c39e5648d..a198784f8 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/InvestmentTask.java @@ -1,6 +1,7 @@ package com.backbase.stream.investment; import com.backbase.investment.api.service.v1.model.PortfolioList; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.worker.model.StreamTask; import java.util.List; import lombok.Data; @@ -8,7 +9,7 @@ @EqualsAndHashCode(callSuper = true) @Data -public class InvestmentTask extends StreamTask { +public class InvestmentTask extends StreamTask implements InvestmentDataValue { private final InvestmentData data; @@ -26,8 +27,12 @@ public void data(List clients) { data.setClientUsers(clients); } - public void setPortfolios(List portfolios) { + public void setPortfolios(List portfolios) { data.setPortfolios(portfolios); } + public long getTotalProcessedValues() { + return data.getTotalProcessedValues(); + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolio.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolio.java new file mode 100644 index 000000000..ea85ed093 --- /dev/null +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/model/InvestmentPortfolio.java @@ -0,0 +1,25 @@ +package com.backbase.stream.investment.model; + +import com.backbase.investment.api.service.v1.model.PortfolioList; +import java.math.BigDecimal; +import java.util.Optional; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +@AllArgsConstructor +@Builder +@Getter +@Setter +public class InvestmentPortfolio { + + private PortfolioList portfolio; + private BigDecimal initialCash; + + public double getInitialCashOrDefault(double defaultAmount) { + return Optional.ofNullable(initialCash).map(BigDecimal::doubleValue) + .orElse(defaultAmount); + } + +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java index 06537f940..a4270d75e 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSaga.java @@ -3,13 +3,13 @@ import com.backbase.investment.api.service.v1.model.AssetCategoryTypeRequest; import com.backbase.investment.api.service.v1.model.MarketRequest; import com.backbase.investment.api.service.v1.model.MarketSpecialDayRequest; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; import com.backbase.stream.investment.InvestmentAssetData; import com.backbase.stream.investment.InvestmentAssetsTask; import com.backbase.stream.investment.service.AsyncTaskService; import com.backbase.stream.investment.service.InvestmentAssetPriceService; import com.backbase.stream.investment.service.InvestmentAssetUniverseService; -import com.backbase.stream.investment.service.InvestmentClientService; import com.backbase.stream.investment.service.InvestmentCurrencyService; import com.backbase.stream.investment.service.InvestmentIntradayAssetPriceService; import com.backbase.stream.investment.service.InvestmentPortfolioService; @@ -43,7 +43,6 @@ *
  • All reactive operations include proper success and error handlers
  • * * - * @see InvestmentClientService * @see InvestmentPortfolioService * @see StreamTaskExecutor */ @@ -65,6 +64,7 @@ public class InvestmentAssetUniverseSaga implements StreamTaskExecutor executeTask(InvestmentAssetsTask streamTask) { @@ -186,7 +186,7 @@ public Mono upsertMarkets(InvestmentAssetsTask investmentT .sessionStart(market.getSessionStart()) .sessionEnd(market.getSessionEnd()) .timeZone(market.getTimeZone()) - )) + ), ingestProperties.getAsset().getMarketConcurrency()) // Limit concurrency to prevent 503 errors .collectList() // Collect all created/retrieved markets into a list .map(markets -> { // Update the task with the created markets @@ -244,7 +244,8 @@ public Mono upsertMarketSpecialDays(InvestmentAssetsTask i .sessionStart(marketSpecialDay.getSessionStart()) .sessionEnd(marketSpecialDay.getSessionEnd()) .description(marketSpecialDay.getDescription()) - )) + ), ingestProperties.getAsset().getMarketSpecialDayConcurrency()) + // Limit concurrency to prevent 503 errors .collectList() // Collect all created/retrieved market special days into a list .map(marketSpecialDays -> { // Update the task with the created market special days @@ -294,7 +295,9 @@ public Mono upsertAssetCategories(InvestmentAssetsTask inv } return Flux.fromIterable(investmentData.getAssetCategories()) - .flatMap(assetUniverseService::upsertAssetCategory) + .flatMap(assetUniverseService::upsertAssetCategory, + ingestProperties.getAsset().getAssetCategoryConcurrency()) + // Limit concurrency to prevent 503 errors .collectList() .map(assetCategories -> { investmentTask.info(INVESTMENT, OP_CREATE, RESULT_CREATED, investmentTask.getName(), @@ -337,7 +340,8 @@ public Mono upsertAssetCategoryTypes(InvestmentAssetsTask .name(assetCategoryType.getName()) .code(assetCategoryType.getCode()); return assetUniverseService.upsertAssetCategoryType(request); - }) + }, ingestProperties.getAsset().getAssetCategoryTypeConcurrency()) + // Limit concurrency to prevent 503 errors .collectList() .map(assetCategoryTypes -> { investmentTask.setAssetCategoryTypes(assetCategoryTypes); diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java index b174bbfcd..8afed3d1a 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/saga/InvestmentSaga.java @@ -4,6 +4,7 @@ import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.InvestmentTask; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import com.backbase.stream.investment.service.AsyncTaskService; import com.backbase.stream.investment.service.InvestmentClientService; @@ -140,7 +141,8 @@ private Mono upsertPortfoliosAllocations(InvestmentTask investme .collectList() .doOnError(throwable -> { log.error("Allocation generation failed for portfolios:{} taskId={}", - data.getPortfolios().stream().map(PortfolioList::getUuid).toList(), investmentTask.getId(), + data.getPortfolios().stream().map(InvestmentPortfolio::getPortfolio).map(PortfolioList::getUuid) + .toList(), investmentTask.getId(), throwable); investmentTask.error(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_FAILED, investmentTask.getName(), investmentTask.getId(), diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java index 4acaedec8..581d29180 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentAssetUniverseService.java @@ -11,6 +11,7 @@ import com.backbase.investment.api.service.v1.model.PaginatedAssetCategoryList; import com.backbase.stream.investment.model.AssetCategoryEntry; import com.backbase.stream.investment.service.resttemplate.InvestmentRestAssetUniverseService; +import java.time.Duration; import java.time.LocalDate; import java.util.List; import java.util.Map; @@ -26,6 +27,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; @Slf4j @RequiredArgsConstructor @@ -45,20 +47,21 @@ public class InvestmentAssetUniverseService { public Mono upsertMarket(MarketRequest marketRequest) { log.debug("Creating market: {}", marketRequest); return assetUniverseApi.getMarket(marketRequest.getCode()) - // If getMarket returns 404 NOT_FOUND, treat as "not found" and return Mono.empty() .onErrorResume(error -> { if (error instanceof org.springframework.web.reactive.function.client.WebClientResponseException.NotFound) { log.info("Market not found for code: {}", marketRequest.getCode()); return Mono.empty(); } - // Propagate other errors return Mono.error(error); }) - // If market exists, return it .flatMap(existingMarket -> { log.info("Market already exists: {}", existingMarket.getCode()); log.debug("Market already exists: {}", existingMarket); return assetUniverseApi.updateMarket(existingMarket.getCode(), marketRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn("Retrying market update: code={}, attempt={}", + marketRequest.getCode(), signal.totalRetries() + 1))) .doOnSuccess(updatedMarket -> log.info("Updated market: {}", updatedMarket)) .doOnError(error -> { if (error instanceof WebClientResponseException w) { @@ -70,11 +73,14 @@ public Mono upsertMarket(MarketRequest marketRequest) { } }); }) - // If Mono is empty (market not found), create the market - .switchIfEmpty(Mono.defer(() -> assetUniverseApi.createMarket(marketRequest) + .switchIfEmpty(assetUniverseApi.createMarket(marketRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn("Retrying market create: code={}, attempt={}", + marketRequest.getCode(), signal.totalRetries() + 1))) .doOnSuccess(createdMarket -> log.info("Created market: {}", createdMarket)) .doOnError(error -> log.error("Error creating market: {}", error.getMessage(), error)) - )); + ); } /** @@ -151,6 +157,11 @@ public Mono upsertMarketSpecialDay(MarketSpecialDayRequest mar log.info("Market special day already exists for day: {}", marketSpecialDayRequest); return assetUniverseApi.updateMarketSpecialDay(matchingSpecialDay.get().getUuid().toString(), marketSpecialDayRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn( + "Retrying market special day update: date={}, attempt={}", + date, signal.totalRetries() + 1))) .doOnSuccess(updatedMarketSpecialDay -> log.info("Updated market special day: {}", updatedMarketSpecialDay)) .doOnError(error -> { @@ -172,6 +183,11 @@ public Mono upsertMarketSpecialDay(MarketSpecialDayRequest mar }) // If Mono is empty (market special day not found), create the market special day .switchIfEmpty(Mono.defer(() -> assetUniverseApi.createMarketSpecialDay(marketSpecialDayRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn( + "Retrying market special day create: date={}, attempt={}", + marketSpecialDayRequest.getDate(), signal.totalRetries() + 1))) .doOnSuccess( createdMarketSpecialDay -> log.info("Created market special day: {}", createdMarketSpecialDay)) .doOnError(error -> { @@ -200,11 +216,54 @@ public Flux createAssets(List categoryIdByCode = categories.stream() .collect(Collectors.toMap(com.backbase.investment.api.service.v1.model.AssetCategory::getCode, com.backbase.investment.api.service.v1.model.AssetCategory::getUuid)); - return Flux.fromIterable(assets) - .flatMap(asset -> this.getOrCreateAsset(asset, categoryIdByCode)); + // Deduplicate assets by key to prevent concurrent creation of the same asset + Map uniqueAssets = assets.stream() + .collect(Collectors.toMap( + com.backbase.stream.investment.Asset::getKeyString, + a -> a, + (existing, replacement) -> { + log.warn("Duplicate asset key found: {}. Using first occurrence.", + existing.getKeyString()); + return existing; + } + )); + // Limit concurrency to 5 to prevent overwhelming the service and triggering 503 errors + return Flux.fromIterable(uniqueAssets.values()) + .flatMap(asset -> this.getOrCreateAsset(asset, categoryIdByCode) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn( + "Retrying asset upsert: key={}, attempt={}", + asset.getKeyString(), signal.totalRetries() + 1))), + 5); }); } + /** + * Determines whether an error is retryable based on HTTP status code. + * + *

    Retryable errors: + *

      + *
    • 409 Conflict – race condition during concurrent creation
    • + *
    • 503 Service Unavailable – temporary service overload or maintenance
    • + *
    + * + * @param throwable the error to evaluate + * @return {@code true} if the error should trigger a retry + */ + private boolean isRetryableError(Throwable throwable) { + if (throwable instanceof WebClientResponseException ex) { + int statusCode = ex.getStatusCode().value(); + boolean retryable = statusCode == 409 || statusCode == 503; + if (retryable) { + log.debug("Identified retryable error: status={}, reason={}", + statusCode, statusCode == 409 ? "CONFLICT" : "SERVICE_UNAVAILABLE"); + } + return retryable; + } + return false; + } + /** * Gets an existing asset category by its code, or creates it if not found. Handles empty results by creating the * asset category. @@ -285,6 +344,11 @@ public Mono upsertAssetCategoryType(AssetCategoryTypeRequest assetCategoryTypeRequest.getCode()); return assetUniverseApi.updateAssetCategoryType(matchingType.get().getUuid().toString(), assetCategoryTypeRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn( + "Retrying asset category type update: code={}, attempt={}", + assetCategoryTypeRequest.getCode(), signal.totalRetries() + 1))) .doOnSuccess(updatedType -> log.info("Updated asset category type: {}", updatedType)) .doOnError(error -> { if (error instanceof WebClientResponseException w) { @@ -305,6 +369,11 @@ public Mono upsertAssetCategoryType(AssetCategoryTypeRequest }) .switchIfEmpty( Mono.defer(() -> assetUniverseApi.createAssetCategoryType(assetCategoryTypeRequest) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> log.warn( + "Retrying asset category type create: code={}, attempt={}", + assetCategoryTypeRequest.getCode(), signal.totalRetries() + 1))) .doOnSuccess(createdType -> log.info("Created asset category type: {}", createdType)) .doOnError(error -> { if (error instanceof WebClientResponseException w) { @@ -313,10 +382,9 @@ public Mono upsertAssetCategoryType(AssetCategoryTypeRequest w.getStatusCode(), w.getResponseBodyAsString()); } else { log.error("Error creating asset category type: {} : {}", - assetCategoryTypeRequest.getCode(), - error.getMessage(), error); + assetCategoryTypeRequest.getCode(), error.getMessage(), error); } })) ); } -} \ No newline at end of file +} diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java index 0e6bc8e07..1c2e0b56a 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentClientService.java @@ -8,17 +8,21 @@ import com.backbase.investment.api.service.v1.model.Status836Enum; import com.backbase.stream.investment.ClientUser; import jakarta.validation.constraints.NotNull; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; /** * Service wrapper around generated {@link ClientApi} providing guarded create / patch operations with logging, minimal @@ -41,12 +45,38 @@ public class InvestmentClientService { private final ClientApi clientApi; + /** + * Upserts a list of clients with controlled concurrency to prevent race conditions. + * + *

    Design considerations: + *

      + *
    • Deduplicates clients by internalUserId before processing
    • + *
    • Limits concurrent requests to avoid overwhelming the service and triggering 503 errors
    • + *
    • Implements exponential backoff retry with 503 and 409 conflict handling
    • + *
    • Processes clients sequentially through the same ExecutorService to maintain order and prevent race conditions
    • + *
    + * + * @param clientUsers the list of clients to upsert + * @return Mono emitting the list of successfully upserted clients + */ public Mono> upsertClients(List clientUsers) { - return Flux.fromIterable(clientUsers) + // Deduplicate by internalUserId to prevent duplicate processing + Map uniqueClients = clientUsers.stream() + .collect(Collectors.toMap(ClientUser::getInternalUserId, Function.identity(), + (existing, replacement) -> { + log.warn("Duplicate internalUserId found: {}. Using first occurrence.", + existing.getInternalUserId()); + return existing; + } + )); + + // Process with controlled concurrency (default: 5 concurrent requests) + // This prevents overwhelming the Investment Service and triggering 503 responses + return Flux.fromIterable(uniqueClients.values()) .flatMap(clientUser -> { log.debug("Upserting investment client: internalUserId={}, externalUserId={}, legalEntityExternalId={}", clientUser.getInternalUserId(), clientUser.getExternalUserId(), - clientUser.getLegalEntityExternalId()); + clientUser.getLegalEntityId()); ClientCreateRequest request = new ClientCreateRequest() .internalUserId(clientUser.getInternalUserId()) @@ -54,15 +84,28 @@ public Mono> upsertClients(List clientUsers) { .putExtraDataItem("user_external_id", clientUser.getExternalUserId()) .putExtraDataItem("keycloak_username", clientUser.getExternalUserId()); - return upsertClient(request, clientUser.getLegalEntityExternalId()) + return upsertClient(request, clientUser.getLegalEntityId()) + .retryWhen(Retry.backoff(3, Duration.ofMillis(500)) + .maxBackoff(Duration.ofSeconds(5)) + .jitter(0.5) + .filter(this::isRetryableError) + .doBeforeRetry(signal -> { + int statusCode = signal.failure() instanceof WebClientResponseException ex + ? ex.getStatusCode().value() : 0; + log.warn( + "Retrying upsert for client: internalUserId={}, attempt={}, statusCode={}", + clientUser.getInternalUserId(), signal.totalRetries() + 1, statusCode); + })) .doOnSuccess(upsertedClient -> log.debug( "Successfully upserted client: investmentClientId={}, internalUserId={}", upsertedClient.getInvestmentClientId(), upsertedClient.getInternalUserId())) .doOnError(throwable -> log.error( "Failed to upsert client: internalUserId={}, externalUserId={}, legalEntityExternalId={}", clientUser.getInternalUserId(), clientUser.getExternalUserId(), - clientUser.getLegalEntityExternalId(), throwable)); - }) + clientUser.getLegalEntityId(), throwable)); + + }, 5) + // Max 5 concurrent requests to avoid overwhelming the service .collectList(); } @@ -101,7 +144,7 @@ private Mono upsertClient(@NotNull ClientCreateRequest request, Stri "Successfully upserted investment client: investmentClientId={}, internalUserId={}, " + "externalUserId={}, legalEntityExternalId={}", clientUser.getInvestmentClientId(), clientUser.getInternalUserId(), - clientUser.getExternalUserId(), clientUser.getLegalEntityExternalId())) + clientUser.getExternalUserId(), clientUser.getLegalEntityId())) .doOnError(throwable -> log.error( "Failed to upsert investment client: internalUserId={}, userExternalId={}, legalEntityExternalId={}", internalUserId, userExternalId, legalEntityExternalId, throwable)); @@ -217,19 +260,19 @@ private Mono createNewClient(ClientCreateRequest request, String leg /** * Builds a {@link ClientUser} instance from the provided data. * - * @param investmentClientId the investment client UUID - * @param internalUserId the internal user ID - * @param externalUserId the external user ID - * @param legalEntityExternalId the legal entity external ID + * @param investmentClientId the investment client UUID + * @param internalUserId the internal user ID + * @param externalUserId the external user ID + * @param legalEntityId the legal entity external ID * @return constructed ClientUser instance */ private ClientUser buildClientUser(UUID investmentClientId, String internalUserId, - String externalUserId, String legalEntityExternalId) { + String externalUserId, String legalEntityId) { return ClientUser.builder() .investmentClientId(investmentClientId) .internalUserId(internalUserId) .externalUserId(externalUserId) - .legalEntityExternalId(legalEntityExternalId) + .legalEntityId(legalEntityId) .build(); } @@ -374,4 +417,29 @@ private void logUpdateError(UUID uuid, Throwable throwable) { } } + /** + * Determines if an error is retryable based on HTTP status code. + * + *

    Retryable errors include: + *

      + *
    • 409 Conflict: Race condition during concurrent client creation/update
    • + *
    • 503 Service Unavailable: Temporary service overload or maintenance
    • + *
    + * + * @param throwable the exception to evaluate + * @return true if the error is retryable, false otherwise + */ + private boolean isRetryableError(Throwable throwable) { + if (throwable instanceof WebClientResponseException ex) { + int statusCode = ex.getStatusCode().value(); + boolean isRetryable = statusCode == 409 || statusCode == 503; + if (isRetryable) { + log.debug("Identified retryable error: status={}, reason={}", + statusCode, statusCode == 409 ? "CONFLICT" : "SERVICE_UNAVAILABLE"); + } + return isRetryable; + } + return false; + } + } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java index 7aa13305b..b3a844288 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationService.java @@ -22,11 +22,13 @@ import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.investment.api.service.v1.model.PortfolioProduct; import com.backbase.investment.api.service.v1.model.StatusA7dEnum; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.Allocation; import com.backbase.stream.investment.Asset; import com.backbase.stream.investment.InvestmentAssetData; import com.backbase.stream.investment.ModelAsset; import com.backbase.stream.investment.ModelPortfolio; +import com.backbase.stream.investment.model.InvestmentPortfolio; import java.time.LocalDate; import java.time.LocalTime; import java.time.OffsetDateTime; @@ -75,6 +77,7 @@ public class InvestmentPortfolioAllocationService { private final AssetUniverseApi assetUniverseApi; private final InvestmentApi investmentApi; private final CustomIntegrationApiService customIntegrationApiService; + private final InvestmentIngestProperties ingestProperties; public Mono removeAllocations(PortfolioList portfolio) { String portfolioUuid = portfolio.getUuid().toString(); @@ -84,8 +87,11 @@ public Mono removeAllocations(PortfolioList portfolio) { .flatMap(v -> Mono.empty()); } - public Mono> generateAllocations(PortfolioList portfolio, + public Mono> generateAllocations(InvestmentPortfolio investmentPortfolio, List portfolioProducts, InvestmentAssetData investmentAssetData) { + PortfolioList portfolio = investmentPortfolio.getPortfolio(); + double initAmount = investmentPortfolio.getInitialCashOrDefault( + ingestProperties.getAllocation().getDefaultAmount()); return getPortfolioModel(portfolio, portfolioProducts, investmentAssetData.getAssetByUuid()) .flatMap(m -> { LocalDate endDay = LocalDate.now(); @@ -114,8 +120,8 @@ public Mono> generateAllocations(PortfolioList port lastValuation.getCashActive(), lastValuation.getTradeTotal()); }).switchIfEmpty( orderPositions(portfolio.getUuid(), workDays(startDay, endDay), m, priceDayByAssetKey, - 10_000d) - .flatMap(dp -> generateAllocations(priceDayByAssetKey, dp))) + initAmount) + .flatMap(dp -> generateAllocations(priceDayByAssetKey, dp, initAmount))) .flatMap(allocations -> this.upsertAllocations(portfolio.getUuid().toString(), allocations))); }) .onErrorResume(ex -> Mono.empty()); @@ -137,12 +143,12 @@ private Mono> getAllocations(String portfolioId, private Mono> generateAllocations( Map> priceDayByAssetKey, - Pair, List> dayPositions) { + Pair, List> dayPositions, double initialCash) { List portfolioPositions = List.copyOf(dayPositions.getSecond()); double totalTrade = calculateTrades(portfolioPositions); - double initialCash = 10_000d; + double cash = initialCash - totalTrade; return Mono.just(generateAllocations(priceDayByAssetKey, dayPositions, initialCash, cash, totalTrade)); } @@ -150,7 +156,9 @@ private Mono> generateAllocations( private Mono> upsertAllocations(String portfolioId, List allocations) { return Flux.fromIterable(allocations) - .flatMap(a -> customIntegrationApiService.createPortfolioAllocation(portfolioId, a, null, null, null)) + .flatMap(a -> customIntegrationApiService.createPortfolioAllocation(portfolioId, a, null, null, null), + ingestProperties.getAllocation().getAllocationConcurrency()) + .collectList().doOnSuccess( created -> log.info("Successfully upserted investment portfolio allocation: count {}", created.size())) .doOnError(throwable -> { @@ -213,7 +221,8 @@ private Mono, List>> or .method(Method570Enum.MARKET) .completed(startDay.atTime(LocalTime.MIDNIGHT).atOffset(ZoneOffset.UTC)) .currency(currency(p.getAsset())).shares(roundPrice(p.getShares())) - .priceAvg(roundPrice(p.getPrice())), null, null, null).doOnSuccess( + .priceAvg(roundPrice(p.getPrice())), null, null, null) + .doOnSuccess( created -> log.info("Successfully upserted investment portfolio allocation")) .doOnError(throwable -> { if (throwable instanceof WebClientResponseException ex) { @@ -241,8 +250,9 @@ private Mono>> getPriceDayByAssetKey(ModelPor ModelAsset asset = a.asset(); return Objects.requireNonNull( assetUniverseApi.listAssetClosePrices(asset.getCurrency(), startDat, endDay, null, null, null, null, - asset.getIsin(), null, asset.getMarket(), null, null).map(PaginatedOASPriceList::getResults).map( - l -> Map.of(asset.getKeyString(), l.stream() + asset.getIsin(), null, asset.getMarket(), null, null) + .map(PaginatedOASPriceList::getResults) + .map(l -> Map.of(asset.getKeyString(), l.stream() .collect(Collectors.toMap(dp -> dp.getDatetime().toLocalDate(), OASPrice::getAmount))))); }).collectList().map(maps -> { Map> dayPriceByAssetKey = new HashMap<>(); @@ -260,7 +270,8 @@ private Mono getPortfolioModel(PortfolioList portfolio, List new Allocation(new ModelAsset(a.getIsin(), a.getMarket(), a.getCurrency()), 0.2)).toList()) + .map(a -> new Allocation(new ModelAsset(a.getIsin(), a.getMarket(), a.getCurrency()), 0.2)) + .toList()) .build())); } diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java index 64b66f46a..1eb6bec60 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/InvestmentPortfolioService.java @@ -21,11 +21,13 @@ import com.backbase.investment.api.service.v1.model.ProductTypeEnum; import com.backbase.investment.api.service.v1.model.Status08fEnum; import com.backbase.investment.api.service.v1.model.StatusA3dEnum; -import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.InvestmentArrangement; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.ModelPortfolio; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; +import java.math.BigDecimal; import java.time.OffsetDateTime; import java.util.Arrays; import java.util.Collection; @@ -66,16 +68,13 @@ @RequiredArgsConstructor public class InvestmentPortfolioService { - private static final String DEFAULT_CURRENCY = "EUR"; - private static final String MODEL_PORTFOLIO_ALLOCATION_ASSET = "model_portfolio.allocation.asset"; - private final InvestmentProductsApi productsApi; private final PortfolioApi portfolioApi; private final PaymentsApi paymentsApi; private final PortfolioTradingAccountsApi portfolioTradingAccountsApi; - private final InvestmentIngestionConfigurationProperties config; + private final InvestmentIngestProperties config; - public Mono> upsertPortfolios(List investmentArrangements, + public Mono> upsertPortfolios(List investmentArrangements, Map> clientsByLeExternalId) { return Flux.fromIterable(investmentArrangements) .flatMap(arrangement -> { @@ -83,9 +82,14 @@ public Mono> upsertPortfolios(List in arrangement.getExternalId(), arrangement.getName(), arrangement.getInvestmentProductId()); return upsertInvestmentPortfolios(arrangement, clientsByLeExternalId) - .doOnSuccess(portfolio -> log.debug( + .map(p -> InvestmentPortfolio + .builder() + .portfolio(p) + .initialCash(arrangement.getInitialCash()) + .build()) + .doOnSuccess(ip -> log.debug( "Successfully upserted investment portfolio: portfolioUuid={}, externalId={}, name={}", - portfolio.getUuid(), portfolio.getExternalId(), portfolio.getName())) + ip.getPortfolio().getUuid(), ip.getPortfolio().getExternalId(), ip.getPortfolio().getName())) .doOnError(throwable -> log.error( "Failed to upsert investment portfolio: arrangementExternalId={}, arrangementName={}", arrangement.getExternalId(), arrangement.getName(), throwable)); @@ -105,7 +109,7 @@ public Mono> upsertPortfolios(List in *
  • Updates the arrangement with the product UUID
  • * * - * @param investmentArrangement the investment arrangement to associate with the product (must not be null) + * @param investmentArrangements the investment arrangement to associate with the product (must not be null) * @return Mono emitting the created or updated portfolio product * @throws NullPointerException if investmentArrangement is null */ @@ -155,7 +159,8 @@ public Mono> upsertInvestmentProducts(InvestmentData inve .flatMap( existingProduct -> updateExistingPortfolioProduct(existingProduct, p, investmentData) .onErrorResume(WebClientResponseException.class, ex -> { - log.info("Using existing product data due to patch failure: uuid={}", existingProduct.getUuid()); + log.info("Using existing product data due to patch failure: uuid={}", + existingProduct.getUuid()); return Mono.just(existingProduct); }) ) @@ -206,10 +211,6 @@ private Collection distinctProducts(List pro private static List findModelUuid(InvestmentData investmentData, String externalId, ProductTypeEnum productType) { -// Map> modelsByArrangementExternalId = investmentData.getModelsByArrangementExternalId(); -// Optional> modelUuid = modelsByArrangementExternalId.keySet().stream() -// .filter(a -> a.endsWith(externalId)).findAny() -// .map(modelsByArrangementExternalId::get); return investmentData.getModelPortfolios().stream() .filter(p -> p.getProductTypeEnum() == productType) .toList(); @@ -230,12 +231,12 @@ private static List findModelUuid(InvestmentData investmentData, * from the arrangement to client UUIDs via the provided lookup map. * * @param investmentArrangement the investment arrangement containing portfolio details (must not be null) - * @param clientsByLeExternalId map of legal entity external ID to client UUIDs for associations + * @param clientsByLeId map of legal entity ID to client UUIDs for associations * @return Mono emitting the created or existing portfolio * @throws NullPointerException if investmentArrangement is null */ public Mono upsertInvestmentPortfolios(InvestmentArrangement investmentArrangement, - Map> clientsByLeExternalId) { + Map> clientsByLeId) { if (investmentArrangement == null) { return Mono.error(new NullPointerException("InvestmentArrangement must not be null")); } @@ -246,8 +247,8 @@ public Mono upsertInvestmentPortfolios(InvestmentArrangement inve log.info("Upserting investment portfolio: externalId={}, name={}", externalId, arrangementName); return listExistingPortfolios(externalId) - .flatMap(p -> patchPortfolio(p, investmentArrangement, clientsByLeExternalId)) - .switchIfEmpty(Mono.defer(() -> createNewPortfolio(investmentArrangement, clientsByLeExternalId))) + .flatMap(p -> patchPortfolio(p, investmentArrangement, clientsByLeId)) + .switchIfEmpty(Mono.defer(() -> createNewPortfolio(investmentArrangement, clientsByLeId))) .doOnSuccess(portfolio -> log.info( "Successfully upserted investment portfolio: externalId={}, name={}, portfolioUuid={}", externalId, arrangementName, @@ -259,10 +260,10 @@ public Mono upsertInvestmentPortfolios(InvestmentArrangement inve private Mono patchPortfolio( PortfolioList existingProduct, InvestmentArrangement investmentArrangement, - Map> clientsByLeExternalId) { + Map> clientsByLeId) { String uuid = existingProduct.getUuid().toString(); - List associatedClients = getClients(investmentArrangement, clientsByLeExternalId); + List associatedClients = getClients(investmentArrangement, clientsByLeId); PatchedPortfolioUpdateRequest patchedPortfolioUpdateRequest = new PatchedPortfolioUpdateRequest() .product(investmentArrangement.getInvestmentProductId()) @@ -270,7 +271,7 @@ private Mono patchPortfolio( .name(investmentArrangement.getName()) .clients(associatedClients) .status(StatusA3dEnum.ACTIVE) - .activated(OffsetDateTime.now().minusMonths(config.getPortfolioActivationPastMonths())); + .activated(OffsetDateTime.now().minusMonths(config.getPortfolio().getActivationPastMonths())); log.debug("Attempting to patch existing portfolio: uuid={}, externalId={}", uuid, investmentArrangement.getExternalId()); @@ -341,12 +342,12 @@ private Mono listExistingPortfolios(String externalId) { * Creates a new investment portfolio with associated clients. * * @param investmentArrangement the arrangement containing portfolio details - * @param clientsByLeExternalId map to resolve client UUIDs from legal entity external IDs + * @param clientsByLeId map to resolve client UUIDs from legal entity IDs * @return Mono emitting the newly created portfolio */ private Mono createNewPortfolio(InvestmentArrangement investmentArrangement, - Map> clientsByLeExternalId) { - List associatedClients = getClients(investmentArrangement, clientsByLeExternalId); + Map> clientsByLeId) { + List associatedClients = getClients(investmentArrangement, clientsByLeId); log.info("Creating new investment portfolio: externalId={}, name={}, clientCount={}", investmentArrangement.getExternalId(), investmentArrangement.getName(), associatedClients.size()); @@ -357,9 +358,10 @@ private Mono createNewPortfolio(InvestmentArrangement investmentA .externalId(investmentArrangement.getExternalId()) .name(investmentArrangement.getName()) .clients(associatedClients) - .currency(Optional.ofNullable(investmentArrangement.getCurrency()).orElse(DEFAULT_CURRENCY)) + .currency(Optional.ofNullable(investmentArrangement.getCurrency()) + .orElse(config.getPortfolio().getDefaultCurrency())) .status(StatusA3dEnum.ACTIVE) - .activated(OffsetDateTime.now().minusMonths(config.getPortfolioActivationPastMonths())); + .activated(OffsetDateTime.now().minusMonths(config.getPortfolio().getActivationPastMonths())); return portfolioApi.createPortfolio(request, null, null, null) .doOnSuccess(created -> log.info( @@ -376,30 +378,19 @@ private Mono createNewPortfolio(InvestmentArrangement investmentA * using the provided lookup map. It filters out null values and ensures distinct results. * * @param investmentArrangement the arrangement containing legal entity external IDs - * @param clientsByLeExternalId map of legal entity external ID to client UUIDs + * @param clientsByLeId map of legal entity ID to client UUIDs * @return distinct list of client UUIDs associated with the arrangement's legal entities */ private static List getClients(InvestmentArrangement investmentArrangement, - Map> clientsByLeExternalId) { - return investmentArrangement.getLegalEntityExternalIds().stream() - .map(clientsByLeExternalId::get) + Map> clientsByLeId) { + return investmentArrangement.getLegalEntityIds().stream() + .map(clientsByLeId::get) .filter(Objects::nonNull) .flatMap(Collection::stream) .distinct() .toList(); } - private Mono listExistingPortfolioProducts(PortfolioProduct portfolioProduct, - InvestmentData investmentData) { - Integer modelPortfolioRiskLower = null; - ProductTypeEnum productType = portfolioProduct.getProductType(); - if (ProductTypeEnum.SELF_TRADING != productType) { - modelPortfolioRiskLower = portfolioProduct.getModelPortfolio().getRiskLevel(); - } - return Mono.justOrEmpty(investmentData.findPortfolioProduct(productType, modelPortfolioRiskLower)) - .switchIfEmpty(listExistingPortfolioProducts(productType, modelPortfolioRiskLower)); - } - private Mono listExistingPortfolioProducts(PortfolioProduct portfolioProduct) { Integer modelPortfolioRiskLower = null; ProductTypeEnum productType = portfolioProduct.getProductType(); @@ -410,7 +401,8 @@ private Mono listExistingPortfolioProducts(PortfolioProduct po } private Mono listExistingPortfolioProducts(ProductTypeEnum productType, Integer riskLevel) { - return productsApi.listPortfolioProducts(List.of(MODEL_PORTFOLIO_ALLOCATION_ASSET), null, null, + return productsApi.listPortfolioProducts(List.of(config.getAllocation().getModelPortfolioAllocationAsset()), + null, null, 1, null, null, riskLevel, null, null, "-model_portfolio__risk_level", List.of(productType.getValue())) .doOnSuccess(products -> log.debug( @@ -447,10 +439,9 @@ private Mono listExistingPortfolioProducts(ProductTypeEnum pro * Updates an existing portfolio product by patching it. Falls back to the original product if the patch operation * fails. * - * @param investmentArrangement the arrangement to update with product UUID - * @param existingProduct the existing product to update - * @param portfolioProduct the template product containing desired values - * @param investmentData + * @param existingProduct the existing product to update + * @param portfolioProduct the template product containing desired values + * @param investmentData the investment data context used to register the updated product * @return Mono emitting the updated product */ private Mono updateExistingPortfolioProduct(PortfolioProduct existingProduct, @@ -468,7 +459,8 @@ private Mono updateExistingPortfolioProduct(PortfolioProduct e log.debug("Attempting to patch existing portfolio product: uuid={}, productType={}", productUuid, portfolioProduct.getProductType()); - return productsApi.patchPortfolioProduct(productUuid.toString(), List.of(MODEL_PORTFOLIO_ALLOCATION_ASSET), + return productsApi.patchPortfolioProduct(productUuid.toString(), + List.of(config.getAllocation().getModelPortfolioAllocationAsset()), null, null, patch) .doOnSuccess(updated -> { log.info("Successfully patched existing investment product: uuid={}", updated.getUuid()); @@ -493,9 +485,8 @@ private Mono updateExistingPortfolioProduct(PortfolioProduct e /** * Creates a portfolio product with an optional model portfolio UUID. * - * @param investmentArrangement the arrangement to update with product UUID - * @param portfolioProduct the portfolio product template - * @param investmentData + * @param portfolioProduct the portfolio product template + * @param investmentData the investment data context used to register the created product * @return Mono emitting the newly created portfolio product */ private Mono createPortfolioProductWithModel(PortfolioProduct portfolioProduct, @@ -512,7 +503,8 @@ private Mono createPortfolioProductWithModel(PortfolioProduct .modelPortfolio(modelPortfolioUuid) .productType(portfolioProduct.getProductType()); - return productsApi.createPortfolioProduct(request, List.of(MODEL_PORTFOLIO_ALLOCATION_ASSET), null, null) + return productsApi.createPortfolioProduct(request, + List.of(config.getAllocation().getModelPortfolioAllocationAsset()), null, null) .retry(2) .retryWhen(reactor.util.retry.Retry.fixedDelay(1, java.time.Duration.ofSeconds(1))) .doOnSuccess(created -> { @@ -524,8 +516,9 @@ private Mono createPortfolioProductWithModel(PortfolioProduct .doOnError(throwable -> logPortfolioProductCreationError(productType, throwable)); } - public Mono upsertDeposits(PortfolioList portfolio) { - double defaultAmount = 10_000d; + public Mono upsertDeposits(InvestmentPortfolio investmentPortfolio) { + PortfolioList portfolio = investmentPortfolio.getPortfolio(); + double initAmount = investmentPortfolio.getInitialCashOrDefault(config.getDeposit().getDefaultAmount()); return paymentsApi.listDeposits(null, null, null, null, null, null, portfolio.getUuid(), null, null, null) .filter(Objects::nonNull) @@ -536,15 +529,15 @@ public Mono upsertDeposits(PortfolioList portfolio) { .filter(list -> !list.isEmpty())) .flatMap(deposits -> { double deposited = deposits.stream().mapToDouble(Deposit::getAmount).sum(); - double remaining = defaultAmount - deposited; + double remaining = initAmount - deposited; return remaining > 0 ? createDeposit(portfolio, remaining) : Mono.just(deposits.getLast()); }) - .switchIfEmpty(Mono.defer(() -> createDeposit(portfolio, defaultAmount))) + .switchIfEmpty(Mono.defer(() -> createDeposit(portfolio, initAmount))) .onErrorResume(ex -> Mono.just(new Deposit() .portfolio(portfolio.getUuid()) - .amount(defaultAmount) + .amount(initAmount) .completedAt(portfolio.getActivated().plusDays(2)) ) ); @@ -554,7 +547,7 @@ public Mono upsertDeposits(PortfolioList portfolio) { private Mono createDeposit(PortfolioList portfolio, double defaultAmount) { return paymentsApi.createDeposit(new DepositRequest() .portfolio(portfolio.getUuid()) - .provider("mock") + .provider(config.getDeposit().getProvider()) .reason(UUID.randomUUID().toString()) .status(Status08fEnum.COMPLETED) .transactedAt(portfolio.getActivated().plusDays(1)) @@ -581,8 +574,8 @@ private Mono createDeposit(PortfolioList portfolio, double defaultAmoun * Upserts portfolio trading accounts derived from the provided investment portfolio accounts. * *

    This method constructs {@link PortfolioTradingAccount} instances directly from - * {@link InvestmentPortfolioTradingAccount} data, resolving each account's portfolio UUID - * via the portfolio service before upserting. + * {@link InvestmentPortfolioTradingAccount} data, resolving each account's portfolio UUID via the portfolio service + * before upserting. * *

    Processing flow: *

      @@ -673,8 +666,8 @@ private Mono fetchPortfolioInternalId(String portfolioExternalId) { } /** - * Builds a {@link PortfolioTradingAccountRequest} directly from an {@link InvestmentPortfolioTradingAccount} - * and a resolved portfolio UUID, skipping the intermediate {@link PortfolioTradingAccount} domain object. + * Builds a {@link PortfolioTradingAccountRequest} directly from an {@link InvestmentPortfolioTradingAccount} and a + * resolved portfolio UUID, skipping the intermediate {@link PortfolioTradingAccount} domain object. * *

      Maps the following fields: *

        @@ -686,7 +679,7 @@ private Mono fetchPortfolioInternalId(String portfolioExternalId) { *
      * * @param investmentPortfolioTradingAccount the source account containing field data - * @param portfolioUuid the resolved internal portfolio UUID + * @param portfolioUuid the resolved internal portfolio UUID * @return the constructed {@link PortfolioTradingAccountRequest} */ private PortfolioTradingAccountRequest buildTradingAccountRequest( @@ -730,11 +723,10 @@ public Mono upsertPortfolioTradingAccount(PortfolioTrad * Patches an existing portfolio trading account with updated values. * *

      If the patch operation fails (e.g., due to validation errors or conflicts), - * falls back to returning the existing account to preserve data integrity - * and prevent batch failures. + * falls back to returning the existing account to preserve data integrity and prevent batch failures. * * @param existing the existing trading account to update - * @param request the request containing updated values + * @param request the request containing updated values * @return Mono emitting the updated trading account, or the existing account if patch fails */ private Mono patchExistingPortfolioTradingAccount( @@ -813,7 +805,7 @@ private Mono listExistingPortfolioTradingAccounts( *

    1. Returns an error for multiple results — indicates a data setup issue
    2. * * - * @param accounts the paginated list of trading accounts returned by the API + * @param accounts the paginated list of trading accounts returned by the API * @param externalAccountId the external account ID used in the search, for logging purposes * @return Mono emitting the single matching trading account, or empty if no results found * @throws IllegalStateException if multiple trading accounts are found with the same external account ID @@ -848,15 +840,15 @@ private Mono validateAndExtractPortfolioTradingAccount( * Logs errors occurring during portfolio trading account operations. * *

      Provides enhanced error context for {@link WebClientResponseException}, - * including HTTP status code and response body. For other exceptions, logs - * basic error information. + * including HTTP status code and response body. For other exceptions, logs basic error information. * - * @param operation a short description of the operation that failed (e.g., "PATCH", "CREATE") - * @param idLabel the label for the identifier (e.g., "uuid", "externalAccountId") - * @param idValue the value of the identifier - * @param throwable the exception that occurred + * @param operation a short description of the operation that failed (e.g., "PATCH", "CREATE") + * @param idLabel the label for the identifier (e.g., "uuid", "externalAccountId") + * @param idValue the value of the identifier + * @param throwable the exception that occurred */ - private void logPortfolioTradingAccountError(String operation, String idLabel, String idValue, Throwable throwable) { + private void logPortfolioTradingAccountError(String operation, String idLabel, String idValue, + Throwable throwable) { if (throwable instanceof WebClientResponseException ex) { log.warn("Portfolio trading account {} failed: {}={}, status={}, body={}", operation, idLabel, idValue, ex.getStatusCode(), ex.getResponseBodyAsString()); diff --git a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java index b0f332c5f..ddc6e4e03 100644 --- a/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java +++ b/stream-investment/investment-core/src/main/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseService.java @@ -1,12 +1,12 @@ package com.backbase.stream.investment.service.resttemplate; import com.backbase.investment.api.service.sync.ApiClient; -import com.backbase.investment.api.service.sync.v1.AssetUniverseApi; import com.backbase.investment.api.service.sync.v1.model.AssetCategory; import com.backbase.investment.api.service.sync.v1.model.AssetCategoryRequest; import com.backbase.investment.api.service.sync.v1.model.OASAssetRequestDataRequest; import com.backbase.investment.api.service.sync.v1.model.PatchedAssetCategoryRequest; import com.backbase.investment.api.service.v1.model.Asset; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.model.AssetCategoryEntry; import java.util.Collections; import java.util.HashMap; @@ -33,8 +33,8 @@ @RequiredArgsConstructor public class InvestmentRestAssetUniverseService { - private final AssetUniverseApi assetUniverseApi; private final ApiClient apiClient; + private final InvestmentIngestProperties ingestProperties; private final RestTemplateAssetMapper assetMapper = Mappers.getMapper(RestTemplateAssetMapper.class); public Mono createAsset(com.backbase.stream.investment.Asset asset, @@ -99,7 +99,7 @@ private com.backbase.investment.api.service.sync.v1.model.Asset createAsset(OASA if (data != null) { localVarFormParams.add("data", data); } - if (logo != null) { + if (ingestProperties.getAsset().isIngestImages() && logo != null) { localVarFormParams.add("logo", logo); } @@ -145,7 +145,7 @@ public com.backbase.investment.api.service.sync.v1.model.Asset patchAsset(String if (data != null) { localVarFormParams.add("data", data); } - if (logo != null) { + if (ingestProperties.getAsset().isIngestImages() && logo != null) { localVarFormParams.add("logo", logo); } @@ -197,7 +197,9 @@ public Mono setAssetCategoryLogo(UUID assetCategoryId, Resource logo) { final MultiValueMap localVarCookieParams = new LinkedMultiValueMap(); final MultiValueMap localVarFormParams = new LinkedMultiValueMap(); - localVarFormParams.add("image", logo); + if (ingestProperties.getAsset().isIngestImages()) { + localVarFormParams.add("image", logo); + } final String[] localVarAccepts = {"application/json"}; final List localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); @@ -268,7 +270,9 @@ public Mono patchAssetCategory(UUID assetCategoryId, .ifPresent(v -> localVarFormParams.add("excerpt", v)); Optional.ofNullable(assetCategoryPatch.getDescription()) .ifPresent(v -> localVarFormParams.add("description", v)); - Optional.ofNullable(image).ifPresent(v -> localVarFormParams.add("image", v)); + if (ingestProperties.getAsset().isIngestImages()) { + Optional.ofNullable(image).ifPresent(v -> localVarFormParams.add("image", v)); + } final String[] localVarAccepts = { "application/json" @@ -331,7 +335,9 @@ public Mono createAssetCategory(AssetCategoryEntry assetCategoryE .ifPresent(v -> localVarFormParams.add("excerpt", v)); Optional.ofNullable(assetCategoryRequest.getDescription()) .ifPresent(v -> localVarFormParams.add("description", v)); - Optional.ofNullable(image).ifPresent(v -> localVarFormParams.add("image", v)); + if (ingestProperties.getAsset().isIngestImages()) { + Optional.ofNullable(image).ifPresent(v -> localVarFormParams.add("image", v)); + } final String[] localVarAccepts = { "application/json" @@ -352,6 +358,7 @@ public Mono createAssetCategory(AssetCategoryEntry assetCategoryE localVarAccept, localVarContentType, localVarAuthNames, localReturnType) .getBody()); }); + } } \ No newline at end of file diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java index 9e79eb14d..52dc4fdf6 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentAssetUniverseSagaTest.java @@ -10,6 +10,7 @@ import com.backbase.investment.api.service.v1.model.AssetTypeEnum; import com.backbase.investment.api.service.v1.model.GroupResult; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; import com.backbase.stream.investment.Asset; import com.backbase.stream.investment.AssetPrice; @@ -102,6 +103,9 @@ class InvestmentAssetUniverseSagaTest { @Mock private InvestmentIngestionConfigurationProperties configurationProperties; + @Mock + private InvestmentIngestProperties ingestProperties; + private InvestmentAssetUniverseSaga saga; /** @@ -119,7 +123,8 @@ void setUp() { investmentIntradayAssetPriceService, investmentCurrencyService, asyncTaskService, - configurationProperties + configurationProperties, + ingestProperties ); } diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java index 73a4ca55b..065c579de 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/saga/InvestmentSagaTest.java @@ -15,6 +15,7 @@ import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.InvestmentTask; import com.backbase.stream.investment.ModelPortfolio; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import com.backbase.stream.investment.service.AsyncTaskService; import com.backbase.stream.investment.service.InvestmentClientService; @@ -41,13 +42,11 @@ */ class InvestmentSagaTest { - private static final String SA_NAME = "some-sa-name"; - private static final String SA_EXTERNAL_ID = "some-sa-external-id"; private static final String ARRANGEMENT_EXTERNAL_ID = "some-arrangement-id"; private static final String PORTFOLIO_EXTERNAL_ID = "some-portfolio-external-id"; private static final String ACCOUNT_ID = "some-account-id"; private static final String ACCOUNT_EXTERNAL_ID = "some-account-external-id"; - private static final String LE_EXTERNAL_ID = "some-le-external-id"; + private static final String LE_INTERNAL_ID = "some-le-internal-id"; @Mock private InvestmentClientService clientService; @@ -373,7 +372,7 @@ void upsertPortfolioTradingAccounts_error_marksTaskFailed() { when(investmentPortfolioService.upsertInvestmentProducts(any(), any())) .thenReturn(Mono.just(List.of(new PortfolioProduct()))); when(investmentPortfolioService.upsertPortfolios(any(), any())) - .thenReturn(Mono.just(List.of(new PortfolioList()))); + .thenReturn(Mono.just(List.of(InvestmentPortfolio.builder().build()))); when(investmentPortfolioService.upsertPortfolioTradingAccounts(any())) .thenReturn(Mono.error(new RuntimeException("Trading account upsert failure"))); @@ -414,7 +413,7 @@ void upsertDepositsAndAllocations_error_marksTaskFailed() { when(investmentPortfolioService.upsertInvestmentProducts(any(), any())) .thenReturn(Mono.just(List.of(new PortfolioProduct()))); when(investmentPortfolioService.upsertPortfolios(any(), any())) - .thenReturn(Mono.just(List.of(new PortfolioList()))); + .thenReturn(Mono.just(List.of(InvestmentPortfolio.builder().build()))); when(investmentPortfolioService.upsertPortfolioTradingAccounts(any())) .thenReturn(Mono.empty()); when(investmentPortfolioService.upsertDeposits(any())) @@ -436,8 +435,6 @@ void upsertDepositsAndAllocations_error_marksTaskFailed() { private InvestmentTask createMinimalTask() { return new InvestmentTask("minimal-task", InvestmentData.builder() - .saName(SA_NAME) - .saExternalId(SA_EXTERNAL_ID) .clientUsers(Collections.emptyList()) .investmentArrangements(Collections.emptyList()) .modelPortfolios(Collections.emptyList()) @@ -448,8 +445,6 @@ private InvestmentTask createMinimalTask() { private InvestmentTask createTaskWithModelPortfolios() { return new InvestmentTask("model-portfolio-task", InvestmentData.builder() - .saName(SA_NAME) - .saExternalId(SA_EXTERNAL_ID) .clientUsers(Collections.emptyList()) .investmentArrangements(Collections.emptyList()) .modelPortfolios(List.of(ModelPortfolio.builder() @@ -463,11 +458,9 @@ private InvestmentTask createTaskWithModelPortfolios() { private InvestmentTask createFullTask() { return new InvestmentTask("full-task", InvestmentData.builder() - .saName(SA_NAME) - .saExternalId(SA_EXTERNAL_ID) .clientUsers(List.of(ClientUser.builder() .investmentClientId(UUID.randomUUID()) - .legalEntityExternalId(LE_EXTERNAL_ID) + .legalEntityId(LE_INTERNAL_ID) .build())) .investmentArrangements(List.of(InvestmentArrangement.builder() .externalId(ARRANGEMENT_EXTERNAL_ID) @@ -483,7 +476,7 @@ private InvestmentTask createFullTask() { .isDefault(true) .isInternal(false) .build())) - .portfolios(List.of(new PortfolioList())) + .portfolios(List.of(InvestmentPortfolio.builder().portfolio(new PortfolioList()).build())) .build()); } @@ -495,7 +488,7 @@ private void stubAllServicesSuccess() { when(investmentPortfolioService.upsertInvestmentProducts(any(), any())) .thenReturn(Mono.just(List.of(new PortfolioProduct()))); when(investmentPortfolioService.upsertPortfolios(any(), any())) - .thenReturn(Mono.just(List.of(new PortfolioList()))); + .thenReturn(Mono.just(List.of(InvestmentPortfolio.builder().build()))); when(investmentPortfolioService.upsertPortfolioTradingAccounts(any())) .thenReturn(Mono.empty()); when(investmentPortfolioService.upsertDeposits(any())) diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java index fc5b90e95..65c26aa1b 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioAllocationServiceTest.java @@ -27,10 +27,12 @@ import com.backbase.investment.api.service.v1.model.PortfolioList; import com.backbase.investment.api.service.v1.model.PortfolioProduct; import com.backbase.investment.api.service.v1.model.RelatedAssetSerializerWithAssetCategories; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.Allocation; import com.backbase.stream.investment.Asset; import com.backbase.stream.investment.InvestmentAssetData; import com.backbase.stream.investment.ModelAsset; +import com.backbase.stream.investment.model.InvestmentPortfolio; import java.time.LocalDate; import java.time.OffsetDateTime; import java.time.ZoneOffset; @@ -67,6 +69,7 @@ class InvestmentPortfolioAllocationServiceTest { private InvestmentApi investmentApi; private CustomIntegrationApiService customIntegrationApiService; private InvestmentPortfolioAllocationService service; + private InvestmentIngestProperties ingestProperties; @BeforeEach void setUp() { @@ -74,8 +77,9 @@ void setUp() { assetUniverseApi = mock(AssetUniverseApi.class); investmentApi = mock(InvestmentApi.class); customIntegrationApiService = mock(CustomIntegrationApiService.class); + ingestProperties = new InvestmentIngestProperties(); service = new InvestmentPortfolioAllocationService( - allocationsApi, assetUniverseApi, investmentApi, customIntegrationApiService); + allocationsApi, assetUniverseApi, investmentApi, customIntegrationApiService, ingestProperties); } // ========================================================================= @@ -371,7 +375,7 @@ void createDepositAllocation_nullCompletedAt_usesTodayAsValuationDate() { /** * Tests for - * {@link InvestmentPortfolioAllocationService#generateAllocations(PortfolioList, List, InvestmentAssetData)}. + * {@link InvestmentPortfolioAllocationService#generateAllocations(InvestmentPortfolio, List, InvestmentAssetData)}. * *

      Covers: *

        @@ -391,6 +395,7 @@ void generateAllocations_pipelineError_returnsEmptyMono() { // Arrange UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(portfolio.getProduct()).thenReturn(UUID.randomUUID()); when(portfolio.getActivated()).thenReturn(OffsetDateTime.now().minusMonths(6)); @@ -406,7 +411,7 @@ void generateAllocations_pipelineError_returnsEmptyMono() { .build(); // Act & Assert - StepVerifier.create(service.generateAllocations(portfolio, List.of(nonMatchingProduct), assetData)) + StepVerifier.create(service.generateAllocations(investmentPortfolio, List.of(nonMatchingProduct), assetData)) .verifyComplete(); } @@ -419,6 +424,7 @@ void generateAllocations_noExistingAllocations_createsNewAllocations() { UUID modelUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(portfolio.getProduct()).thenReturn(productUuid); when(portfolio.getActivated()).thenReturn(OffsetDateTime.now().minusMonths(2)); @@ -466,7 +472,7 @@ void generateAllocations_noExistingAllocations_createsNewAllocations() { .build(); // Act & Assert — one allocation created per work day from priceDay to today - StepVerifier.create(service.generateAllocations(portfolio, List.of(portfolioProduct), assetData)) + StepVerifier.create(service.generateAllocations(investmentPortfolio, List.of(portfolioProduct), assetData)) .expectNextMatches(result -> !result.isEmpty()) .verifyComplete(); @@ -483,6 +489,7 @@ void generateAllocations_lastValuationIsToday_noPendingDays_returnsEmptyList() { UUID modelUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(portfolio.getProduct()).thenReturn(productUuid); when(portfolio.getActivated()).thenReturn(OffsetDateTime.now().minusMonths(2)); @@ -523,7 +530,7 @@ void generateAllocations_lastValuationIsToday_noPendingDays_returnsEmptyList() { .build(); // Act & Assert - StepVerifier.create(service.generateAllocations(portfolio, List.of(portfolioProduct), assetData)) + StepVerifier.create(service.generateAllocations(investmentPortfolio, List.of(portfolioProduct), assetData)) .expectNextMatches(List::isEmpty) .verifyComplete(); @@ -538,6 +545,7 @@ void generateAllocations_noMatchingPortfolioProduct_fallsBackToDefaultModel() { UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(portfolio.getProduct()).thenReturn(UUID.randomUUID()); when(portfolio.getActivated()).thenReturn(OffsetDateTime.now().minusMonths(2)); @@ -597,7 +605,7 @@ void generateAllocations_noMatchingPortfolioProduct_fallsBackToDefaultModel() { .build(); // Act & Assert - StepVerifier.create(service.generateAllocations(portfolio, List.of(nonMatchingProduct), assetData)) + StepVerifier.create(service.generateAllocations(investmentPortfolio, List.of(nonMatchingProduct), assetData)) .expectNextMatches(result -> !result.isEmpty()) .verifyComplete(); diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java index 820538460..dada24cd2 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/InvestmentPortfolioServiceTest.java @@ -26,10 +26,11 @@ import com.backbase.investment.api.service.v1.model.PortfolioTradingAccountRequest; import com.backbase.investment.api.service.v1.model.ProductTypeEnum; import com.backbase.investment.api.service.v1.model.StatusA3dEnum; -import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.InvestmentArrangement; import com.backbase.stream.investment.InvestmentData; import com.backbase.stream.investment.ModelPortfolio; +import com.backbase.stream.investment.model.InvestmentPortfolio; import com.backbase.stream.investment.model.InvestmentPortfolioTradingAccount; import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; @@ -69,7 +70,7 @@ class InvestmentPortfolioServiceTest { private PortfolioApi portfolioApi; private PaymentsApi paymentsApi; private PortfolioTradingAccountsApi portfolioTradingAccountsApi; - private InvestmentIngestionConfigurationProperties config; + private InvestmentIngestProperties config; private InvestmentPortfolioService service; @BeforeEach @@ -78,8 +79,7 @@ void setUp() { portfolioApi = Mockito.mock(PortfolioApi.class); paymentsApi = Mockito.mock(PaymentsApi.class); portfolioTradingAccountsApi = Mockito.mock(PortfolioTradingAccountsApi.class); - config = Mockito.mock(InvestmentIngestionConfigurationProperties.class); - when(config.getPortfolioActivationPastMonths()).thenReturn(6); + config = new InvestmentIngestProperties(); service = new InvestmentPortfolioService( productsApi, portfolioApi, paymentsApi, portfolioTradingAccountsApi, config); } @@ -746,7 +746,7 @@ void upsertPortfolios_emptyArrangements_returnsEmptyList() { // ========================================================================= /** - * Tests for {@link InvestmentPortfolioService#upsertDeposits(PortfolioList)}. + * Tests for {@link InvestmentPortfolioService#upsertDeposits(InvestmentPortfolio)}. * *

        Covers: *

          @@ -768,6 +768,7 @@ void upsertDeposits_noExistingDeposits_createsDefaultDeposit() { UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid, "EXT-DEP-001", OffsetDateTime.now().minusMonths(6)); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(paymentsApi.listDeposits(isNull(), isNull(), isNull(), isNull(), isNull(), isNull(), eq(portfolioUuid), isNull(), isNull(), isNull())) @@ -779,7 +780,7 @@ void upsertDeposits_noExistingDeposits_createsDefaultDeposit() { .thenReturn(Mono.just(created)); // Act & Assert - StepVerifier.create(service.upsertDeposits(portfolio)) + StepVerifier.create(service.upsertDeposits(investmentPortfolio)) .expectNextMatches(d -> Double.valueOf(10_000d).equals(d.getAmount())) .verifyComplete(); @@ -793,6 +794,7 @@ void upsertDeposits_existingDepositsLessThanDefault_topsUpRemainingAmount() { UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid, "EXT-DEP-002", OffsetDateTime.now().minusMonths(6)); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); Deposit existingDeposit = Mockito.mock(Deposit.class); when(existingDeposit.getAmount()).thenReturn(4_000d); @@ -807,7 +809,7 @@ void upsertDeposits_existingDepositsLessThanDefault_topsUpRemainingAmount() { .thenReturn(Mono.just(topUpDeposit)); // Act & Assert - StepVerifier.create(service.upsertDeposits(portfolio)) + StepVerifier.create(service.upsertDeposits(investmentPortfolio)) .expectNextMatches(d -> Double.valueOf(6_000d).equals(d.getAmount())) .verifyComplete(); @@ -821,6 +823,7 @@ void upsertDeposits_existingDepositsEqualToDefault_doesNotCreateNewDeposit() { UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid, "EXT-DEP-003", OffsetDateTime.now().minusMonths(6)); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); Deposit existingDeposit = Mockito.mock(Deposit.class); when(existingDeposit.getAmount()).thenReturn(10_000d); @@ -832,7 +835,7 @@ void upsertDeposits_existingDepositsEqualToDefault_doesNotCreateNewDeposit() { Deposit fallbackDeposit = Mockito.mock(Deposit.class); when(paymentsApi.createDeposit(any())).thenReturn(Mono.just(fallbackDeposit)); // Act & Assert - StepVerifier.create(service.upsertDeposits(portfolio)) + StepVerifier.create(service.upsertDeposits(investmentPortfolio)) .expectNextMatches(d -> Double.valueOf(10_000d).equals(d.getAmount())) .verifyComplete(); @@ -846,6 +849,7 @@ void upsertDeposits_nullDepositResultList_createsDefaultDeposit() { UUID portfolioUuid = UUID.randomUUID(); PortfolioList portfolio = buildPortfolioList(portfolioUuid, "EXT-DEP-NULL", OffsetDateTime.now().minusMonths(6)); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(paymentsApi.listDeposits(isNull(), isNull(), isNull(), isNull(), isNull(), isNull(), eq(portfolioUuid), isNull(), isNull(), isNull())) @@ -857,7 +861,7 @@ void upsertDeposits_nullDepositResultList_createsDefaultDeposit() { .thenReturn(Mono.just(created)); // Act & Assert - StepVerifier.create(service.upsertDeposits(portfolio)) + StepVerifier.create(service.upsertDeposits(investmentPortfolio)) .expectNextMatches(d -> Double.valueOf(10_000d).equals(d.getAmount())) .verifyComplete(); @@ -871,13 +875,14 @@ void upsertDeposits_apiError_returnsFallbackDeposit() { UUID portfolioUuid = UUID.randomUUID(); OffsetDateTime activated = OffsetDateTime.now().minusMonths(6); PortfolioList portfolio = buildPortfolioList(portfolioUuid, "EXT-DEP-ERR", activated); + InvestmentPortfolio investmentPortfolio = InvestmentPortfolio.builder().portfolio(portfolio).build(); when(paymentsApi.listDeposits(isNull(), isNull(), isNull(), isNull(), isNull(), isNull(), eq(portfolioUuid), isNull(), isNull(), isNull())) .thenReturn(Mono.error(new RuntimeException("API unavailable"))); // Act & Assert - StepVerifier.create(service.upsertDeposits(portfolio)) + StepVerifier.create(service.upsertDeposits(investmentPortfolio)) .expectNextMatches(d -> portfolioUuid.equals(d.getPortfolio()) && d.getAmount() == 10_000d) .verifyComplete(); @@ -1167,7 +1172,7 @@ private InvestmentArrangement buildArrangement(String externalId, String name, when(arrangement.getExternalId()).thenReturn(externalId); when(arrangement.getName()).thenReturn(name); when(arrangement.getInvestmentProductId()).thenReturn(productId); - when(arrangement.getLegalEntityExternalIds()).thenReturn(List.of(legalEntityExternalId)); + when(arrangement.getLegalEntityIds()).thenReturn(List.of(legalEntityExternalId)); when(arrangement.getCurrency()).thenReturn("EUR"); when(arrangement.getInternalId()).thenReturn(null); return arrangement; @@ -1183,7 +1188,7 @@ private InvestmentArrangement buildArrangementWithProductType(String externalId, when(arrangement.getExternalId()).thenReturn(externalId); when(arrangement.getName()).thenReturn(name); when(arrangement.getProductTypeExternalId()).thenReturn(productTypeValue); - when(arrangement.getLegalEntityExternalIds()).thenReturn(List.of()); + when(arrangement.getLegalEntityIds()).thenReturn(List.of()); when(arrangement.getCurrency()).thenReturn("EUR"); when(arrangement.getInternalId()).thenReturn(null); return arrangement; diff --git a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseServiceTest.java b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseServiceTest.java index 57301c021..d38d4578e 100644 --- a/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseServiceTest.java +++ b/stream-investment/investment-core/src/test/java/com/backbase/stream/investment/service/resttemplate/InvestmentRestAssetUniverseServiceTest.java @@ -11,6 +11,7 @@ import com.backbase.investment.api.service.sync.ApiClient; import com.backbase.investment.api.service.sync.v1.AssetUniverseApi; import com.backbase.investment.api.service.sync.v1.model.AssetCategory; +import com.backbase.stream.configuration.InvestmentIngestProperties; import com.backbase.stream.investment.Asset; import com.backbase.stream.investment.model.AssetCategoryEntry; import java.util.Map; @@ -32,7 +33,7 @@ class InvestmentRestAssetUniverseServiceTest { @Mock - private AssetUniverseApi assetUniverseApi; + private InvestmentIngestProperties ingestProperties; @Mock private ApiClient apiClient; @@ -41,7 +42,7 @@ class InvestmentRestAssetUniverseServiceTest { @BeforeEach void setUp() { - service = new InvestmentRestAssetUniverseService(assetUniverseApi, apiClient); + service = new InvestmentRestAssetUniverseService(apiClient, ingestProperties); } // -----------------------------------------------------------------------