Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,33 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.text.DateFormat;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;

Check warning on line 28 in stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentClientConfig.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused import 'org.springframework.http.client.reactive.ReactorClientHttpConnector'.

See more on https://sonarcloud.io/project/issues?id=com.backbase.stream%3Astream-services&issues=AZ0GSWrY9RflQ3Y3aTWe&open=AZ0GSWrY9RflQ3Y3aTWe&pullRequest=586
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.Builder;
import reactor.netty.http.client.HttpClient;

/**
* Configuration for Investment service REST client (ClientApi).
*
* <p>This configuration creates the Investment API client with proper codec configuration.
*
* <p><strong>Note:</strong> Connection pooling, timeouts, and rate limiting are configured in
* {@link InvestmentWebClientConfiguration} which should be imported alongside this class.
* The WebClient connection pool prevents resource exhaustion and 503 errors by limiting:
* <ul>
* <li>Maximum concurrent connections to 100 (configurable)</li>
* <li>Connection acquisition timeout to 45 seconds</li>
* <li>Read/Write timeouts to 30 seconds each</li>
* </ul>
*/
@Configuration
@ConditionalOnBean(InvestmentServiceConfiguration.class)
Expand All @@ -48,7 +62,9 @@
*/
@Bean
@ConditionalOnMissingBean
public ApiClient investmentApiClient(WebClient interServiceWebClient, ObjectMapper objectMapper,
public ApiClient investmentApiClient(WebClient interServiceWebClient,
@Qualifier("investmentHttpClient") HttpClient investmentHttpClient,
ObjectMapper objectMapper,
DateFormat dateFormat) {
ObjectMapper mapper = objectMapper.copy();
mapper.setSerializationInclusion(Include.NON_EMPTY);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package com.backbase.stream.configuration;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
* Fine-grained configuration properties for {@code InvestmentPortfolioService}.
*
* <p>Properties are grouped by the model they govern so operators can reason about each
* concern independently without touching unrelated settings.
*
* <p>All values can be overridden via {@code application.yml} / {@code application.properties}
* using the prefix {@code backbase.bootstrap.ingestions.investment.service}.
*
* <p>Ingestion-flow feature flags ({@code contentEnabled}, {@code assetUniverseEnabled}, etc.)
* live in the sibling class {@link InvestmentIngestionConfigurationProperties}.
*
* <p>Example:
* <pre>
* backbase:
* bootstrap:
* ingestions:
* investment:
* service:
* portfolio:
* default-currency: USD
* activation-past-months: 3
* allocation:
* model-portfolio-allocation-asset: "model_portfolio.allocation.asset"
* deposit:
* provider: real-bank
* default-amount: 25000.0
* </pre>
*/
@Data
@ConfigurationProperties(prefix = "backbase.bootstrap.ingestions.investment.service")
public class InvestmentIngestProperties {

public static final double DEFAULT_INIT_CASH = 10_000d;

private PortfolioConfig portfolio = new PortfolioConfig();
private AllocationConfig allocation = new AllocationConfig();
private DepositConfig deposit = new DepositConfig();
private AssetConfig asset = new AssetConfig();

// -------------------------------------------------------------------------
// Portfolio
// -------------------------------------------------------------------------

/**
* Settings that govern how individual investment portfolios are created and activated.
*/
@Data
public static class PortfolioConfig {

/**
* ISO 4217 currency code applied to new portfolios when none is specified in the source data.
*/
private String defaultCurrency = "EUR";

/**
* How many months into the past the portfolio's {@code activated} timestamp is set. A value of {@code 1} means
* the portfolio is considered to have been activated 1 month ago.
*/
private int activationPastMonths = 1;
}

// -------------------------------------------------------------------------
// Allocation
// -------------------------------------------------------------------------

/**
* Settings that govern portfolio-product allocation behaviour.
*/
@Data
public static class AllocationConfig {

/**
* The allocation-asset expansion key sent when listing or managing portfolio products. Changing this value
* allows switching between different allocation-asset model definitions without a code change.
*/
private String modelPortfolioAllocationAsset = "model_portfolio.allocation.asset";
private int allocationConcurrency = 5;

private double defaultAmount = DEFAULT_INIT_CASH;
}

// -------------------------------------------------------------------------
// Deposit
// -------------------------------------------------------------------------

/**
* Settings that govern the automatic seed deposit created for new portfolios.
*/
@Data
public static class DepositConfig {

/**
* The payment provider identifier sent with every deposit request. Set this to the real provider name for
* non-mock environments.
*/
private String provider = null;

/**
* The monetary amount used as the initial seed deposit when no previous deposit exists or when the current
* deposited total is below this threshold.
*/
private double defaultAmount = DEFAULT_INIT_CASH;
}

// -------------------------------------------------------------------------
// Deposit
// -------------------------------------------------------------------------

/**
* Settings that govern the automatic seed deposit created for new portfolios.
*/
@Data
public static class AssetConfig {

private boolean ingestImages = true;
private int marketConcurrency = 5;
private int marketSpecialDayConcurrency = 5;
private int assetCategoryConcurrency = 5;
private int assetCategoryTypeConcurrency = 5;

}

}

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

/**
* Configuration properties governing investment client ingestion behavior.
*
* <p>Controls which high-level ingestion flows are enabled. Service-level tuning
* (portfolio currencies, deposit provider, allocation assets, etc.) lives in
* {@link InvestmentIngestProperties}.
*/
@Data
@ConditionalOnBean(InvestmentServiceConfiguration.class)
Expand All @@ -16,7 +20,6 @@ public class InvestmentIngestionConfigurationProperties {
private boolean contentEnabled = true;
private boolean assetUniverseEnabled = true;
private boolean wealthEnabled = true;
private int portfolioActivationPastMonths = 1;


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ public InvestmentRestDocumentContentService investmentRestContentDocumentService
}

@Bean
public InvestmentRestAssetUniverseService investmentRestAssetUniverseService(AssetUniverseApi assetUniverseApi,
com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient) {
return new InvestmentRestAssetUniverseService(assetUniverseApi, restInvestmentApiClient);
public InvestmentRestAssetUniverseService investmentRestAssetUniverseService(
com.backbase.investment.api.service.sync.ApiClient restInvestmentApiClient,
InvestmentIngestProperties portfolioProperties) {
return new InvestmentRestAssetUniverseService(restInvestmentApiClient, portfolioProperties);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
InvestmentClientConfig.class
})
@EnableConfigurationProperties({
InvestmentIngestionConfigurationProperties.class
InvestmentIngestionConfigurationProperties.class,
InvestmentIngestProperties.class
})
@RequiredArgsConstructor
@Configuration
Expand All @@ -55,15 +56,15 @@

@Bean
public CustomIntegrationApiService customIntegrationApiService(ApiClient apiClient) {
return new CustomIntegrationApiService(apiClient);

Check warning on line 59 in stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this call to a deprecated class, it has been marked for removal.

See more on https://sonarcloud.io/project/issues?id=com.backbase.stream%3Astream-services&issues=AZ0KtBtvOOAe5IMiPmTJ&open=AZ0KtBtvOOAe5IMiPmTJ&pullRequest=586
}

@Bean
public InvestmentPortfolioService investmentPortfolioService(PortfolioApi portfolioApi,
InvestmentProductsApi investmentProductsApi, PaymentsApi paymentsApi, PortfolioTradingAccountsApi portfolioTradingAccountsApi,
InvestmentIngestionConfigurationProperties configurationProperties) {
InvestmentIngestProperties portfolioProperties) {
return new InvestmentPortfolioService(investmentProductsApi, portfolioApi, paymentsApi,
portfolioTradingAccountsApi, configurationProperties);
portfolioTradingAccountsApi, portfolioProperties);
}

@Bean
Expand Down Expand Up @@ -96,9 +97,9 @@
@Bean
public InvestmentPortfolioAllocationService investmentPortfolioAllocationService(AllocationsApi allocationsApi,
AssetUniverseApi assetUniverseApi, InvestmentApi investmentApi,
CustomIntegrationApiService customIntegrationApiService) {
CustomIntegrationApiService customIntegrationApiService, InvestmentIngestProperties portfolioProperties) {

Check warning on line 100 in stream-investment/investment-core/src/main/java/com/backbase/stream/configuration/InvestmentServiceConfiguration.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this call to a deprecated class, it has been marked for removal.

See more on https://sonarcloud.io/project/issues?id=com.backbase.stream%3Astream-services&issues=AZ0LPPWIuA2AjTxdwwOY&open=AZ0LPPWIuA2AjTxdwwOY&pullRequest=586
return new InvestmentPortfolioAllocationService(allocationsApi, assetUniverseApi, investmentApi,
customIntegrationApiService);
customIntegrationApiService, portfolioProperties);
}

@Bean
Expand All @@ -124,10 +125,11 @@
InvestmentIntradayAssetPriceService investmentIntradayAssetPriceService,
InvestmentCurrencyService investmentCurrencyService,
AsyncTaskService asyncTaskService,
InvestmentIngestionConfigurationProperties coreConfigurationProperties) {
InvestmentIngestionConfigurationProperties coreConfigurationProperties,
InvestmentIngestProperties portfolioProperties) {
return new InvestmentAssetUniverseSaga(investmentAssetUniverseService, investmentAssetPriceService,
investmentIntradayAssetPriceService, investmentCurrencyService, asyncTaskService,
coreConfigurationProperties);
coreConfigurationProperties, portfolioProperties);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.backbase.stream.configuration;

import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

/**
* WebClient configuration for Investment service communication.
*
* <p>Provides optimized connection pooling, timeouts, and rate limiting to prevent:
* <ul>
* <li>Resource exhaustion from too many concurrent connections</li>
* <li>503 Service Unavailable errors due to overwhelming the service</li>
* <li>Indefinite hangs from missing timeouts</li>
* </ul>
*
* <p>All tunable values are externalized via {@link InvestmentWebClientProperties} and can be
* overridden through {@code application.yml} without recompiling.
*
* <p>All beans are explicitly named so they are unambiguous and never accidentally
* replaced by Spring Boot auto-configuration or another generic bean of the same type.
*/
@Slf4j
@Configuration
@EnableConfigurationProperties(InvestmentWebClientProperties.class)
public class InvestmentWebClientConfiguration {

/**
* Dedicated {@link ConnectionProvider} for the Investment service client pool.
*
* <p>Using an explicit named bean (rather than {@code @ConditionalOnMissingBean ReactorResourceFactory})
* avoids silently falling back to Spring Boot's auto-configured shared pool when one already exists
* in the application context.
*
* @param props investment HTTP-client configuration properties
* @return investment-specific ConnectionProvider
*/
@Bean("investmentConnectionProvider")
public ConnectionProvider investmentConnectionProvider(InvestmentWebClientProperties props) {
ConnectionProvider provider = ConnectionProvider.builder("investment-client-pool")
.maxConnections(props.getMaxConnections())
.maxIdleTime(Duration.ofMinutes(props.getMaxIdleTimeMinutes()))
.maxLifeTime(Duration.ofMinutes(props.getMaxLifeTimeMinutes()))
.pendingAcquireMaxCount(props.getMaxPendingAcquires())
.pendingAcquireTimeout(Duration.ofMillis(props.getPendingAcquireTimeoutMillis()))
.evictInBackground(Duration.ofSeconds(props.getEvictInBackgroundSeconds()))
.build();

log.info("Configured investment ConnectionProvider: maxConnections={}, maxIdleTime={}min,"
+ " pendingAcquireMaxCount={}",
props.getMaxConnections(), props.getMaxIdleTimeMinutes(), props.getMaxPendingAcquires());

return provider;
}

/**
* Creates an {@link HttpClient} backed by the investment-specific connection pool and
* with explicit connect / read / write timeouts.
*
* <p>This client ensures:
* <ul>
* <li>Connection timeout prevents hanging on unresponsive servers</li>
* <li>Read/Write timeouts prevent indefinite hangs</li>
* <li>TCP_NODELAY enables immediate sending of small packets</li>
* <li>SO_KEEPALIVE maintains connection health for idle connections</li>
* </ul>
*
* @param connectionProvider the investment-specific connection provider
* @param props investment HTTP-client configuration properties
* @return configured HttpClient
*/
@Bean("investmentHttpClient")
public HttpClient investmentHttpClient(
@Qualifier("investmentConnectionProvider") ConnectionProvider connectionProvider,
InvestmentWebClientProperties props) {
return HttpClient.create(connectionProvider)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, props.getConnectTimeoutSeconds() * 1000)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.responseTimeout(Duration.ofSeconds(props.getReadTimeoutSeconds()))
.doOnConnected(connection -> connection
.addHandlerLast(new ReadTimeoutHandler(props.getReadTimeoutSeconds(), TimeUnit.SECONDS))
.addHandlerLast(new WriteTimeoutHandler(props.getWriteTimeoutSeconds(), TimeUnit.SECONDS)));
}

}
Loading
Loading