Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
import com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties;
import com.backbase.stream.investment.InvestmentContentTask;
import com.backbase.stream.investment.model.ContentDocumentEntry;
import com.backbase.stream.investment.service.InvestmentClientService;
import com.backbase.stream.investment.service.InvestmentPortfolioService;
import com.backbase.stream.investment.model.ContentTag;
import com.backbase.stream.investment.model.MarketNewsEntry;
import com.backbase.stream.investment.service.resttemplate.InvestmentRestDocumentContentService;
import com.backbase.stream.investment.service.resttemplate.InvestmentRestNewsContentService;
import com.backbase.stream.worker.StreamTaskExecutor;
import com.backbase.stream.worker.model.StreamTask;
import com.backbase.stream.worker.model.StreamTask.State;
import java.util.List;
import java.util.Objects;
Expand All @@ -17,29 +16,31 @@
import reactor.core.publisher.Mono;

/**
* Saga orchestrating the complete investment client ingestion workflow.
* Saga orchestrating the complete investment content ingestion workflow.
*
* <p>This saga implements a multi-step process for ingesting investment data:
* <p>This saga implements a multi-step process for ingesting investment content data:
* <ol>
* <li>Upsert investment clients - Creates or updates client records</li>
* <li>Upsert investment products - Creates or updates portfolio products</li>
* <li>Upsert investment portfolios - Creates or updates portfolios with client associations</li>
* <li>Upsert news tags - Creates or updates market news tags</li>
* <li>Upsert news content - Creates or updates market news entries</li>
* <li>Upsert document tags - Creates or updates content document tags</li>
* <li>Upsert content documents - Creates or updates content document entries</li>
* </ol>
*
* <p>The saga uses idempotent operations to ensure safe re-execution and writes progress
* to the {@link StreamTask} history for observability. Each step builds upon the previous
* step's results, creating a complete investment setup.
* to the {@link com.backbase.stream.worker.model.StreamTask} history for observability.
* Each step builds upon the previous step's results, creating a complete content setup.
*
* <p>Design notes:
* <ul>
* <li>All operations are idempotent (safe to retry)</li>
* <li>Progress is tracked via StreamTask state and history</li>
* <li>Failures are logged with complete context for debugging</li>
* <li>All reactive operations include proper success and error handlers</li>
* <li>Content ingestion can be disabled via {@link com.backbase.stream.configuration.InvestmentIngestionConfigurationProperties}</li>
* </ul>
*
* @see InvestmentClientService
* @see InvestmentPortfolioService
* @see InvestmentRestNewsContentService
* @see InvestmentRestDocumentContentService
* @see StreamTaskExecutor
*/
@Slf4j
Expand All @@ -48,8 +49,11 @@ public class InvestmentContentSaga implements StreamTaskExecutor<InvestmentConte

public static final String INVESTMENT = "investment-content";
public static final String OP_UPSERT = "upsert";
public static final String RESULT_UPSERTED = "upserted";
public static final String RESULT_FAILED = "failed";

private static final String PROCESSING_PREFIX = "Processing ";

private final InvestmentRestNewsContentService investmentRestNewsContentService;
private final InvestmentRestDocumentContentService investmentRestDocumentContentService;
private final InvestmentIngestionConfigurationProperties coreConfigurationProperties;
Expand All @@ -63,8 +67,6 @@ public Mono<InvestmentContentTask> executeTask(InvestmentContentTask streamTask)
}
log.info("Starting investment content saga execution: taskId={}, taskName={}",
streamTask.getId(), streamTask.getName());
log.info("Starting investment saga execution: taskId={}, taskName={}",
streamTask.getId(), streamTask.getName());
return upsertNewsTags(streamTask)
.flatMap(this::upsertNewsContent)
.flatMap(this::upsertDocumentTags)
Expand All @@ -84,27 +86,87 @@ public Mono<InvestmentContentTask> executeTask(InvestmentContentTask streamTask)
}

private Mono<InvestmentContentTask> upsertNewsContent(InvestmentContentTask investmentContentTask) {
List<MarketNewsEntry> marketNews = Objects.requireNonNullElse(investmentContentTask.getData().getMarketNews(), List.of());
investmentContentTask.info(INVESTMENT, OP_UPSERT, null, investmentContentTask.getName(),
investmentContentTask.getId(),
PROCESSING_PREFIX + marketNews.size() + " investment news content");
investmentContentTask.setState(State.IN_PROGRESS);
return investmentRestNewsContentService
.upsertContent(Objects.requireNonNullElse(investmentContentTask.getData().getMarketNews(), List.of()))
.upsertContent(marketNews)
.doOnSuccess(v -> {
investmentContentTask.info(INVESTMENT, OP_UPSERT, RESULT_UPSERTED, investmentContentTask.getName(),
investmentContentTask.getId(),
RESULT_UPSERTED + " " + marketNews.size() + " Investment News Content");
investmentContentTask.setState(State.COMPLETED);
})
.doOnError(throwable ->
investmentContentTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED, investmentContentTask.getName(),
investmentContentTask.getId(),
"Failed to upsert investment news content: " + throwable.getMessage()))
.thenReturn(investmentContentTask);
}

private Mono<InvestmentContentTask> upsertNewsTags(InvestmentContentTask investmentContentTask) {
List<ContentTag> newsTags = Objects.requireNonNullElse(investmentContentTask.getData().getMarketNewsTags(), List.of());
investmentContentTask.info(INVESTMENT, OP_UPSERT, null, investmentContentTask.getName(),
investmentContentTask.getId(),
PROCESSING_PREFIX + newsTags.size() + " investment news tags");
investmentContentTask.setState(State.IN_PROGRESS);
return investmentRestNewsContentService
.upsertTags(Objects.requireNonNullElse(investmentContentTask.getData().getMarketNewsTags(), List.of()))
.upsertTags(newsTags)
.doOnSuccess(v -> {
investmentContentTask.info(INVESTMENT, OP_UPSERT, RESULT_UPSERTED, investmentContentTask.getName(),
investmentContentTask.getId(),
RESULT_UPSERTED + " " + newsTags.size() + " Investment News Tags");
investmentContentTask.setState(State.COMPLETED);
})
.doOnError(throwable ->
investmentContentTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED, investmentContentTask.getName(),
investmentContentTask.getId(),
"Failed to upsert investment news tags: " + throwable.getMessage()))
.thenReturn(investmentContentTask);
}

private Mono<InvestmentContentTask> upsertDocumentTags(InvestmentContentTask investmentContentTask) {
List<ContentTag> documentTags = Objects.requireNonNullElse(investmentContentTask.getData().getDocumentTags(), List.of());
investmentContentTask.info(INVESTMENT, OP_UPSERT, null, investmentContentTask.getName(),
investmentContentTask.getId(),
PROCESSING_PREFIX + documentTags.size() + " investment document tags");
investmentContentTask.setState(State.IN_PROGRESS);
return investmentRestDocumentContentService
.upsertContentTags(Objects.requireNonNullElse(investmentContentTask.getData().getDocumentTags(), List.of()))
.upsertContentTags(documentTags)
.doOnSuccess(v -> {
investmentContentTask.info(INVESTMENT, OP_UPSERT, RESULT_UPSERTED, investmentContentTask.getName(),
investmentContentTask.getId(),
RESULT_UPSERTED + " " + documentTags.size() + " Investment Document Tags");
investmentContentTask.setState(State.COMPLETED);
})
.doOnError(throwable ->
investmentContentTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED, investmentContentTask.getName(),
investmentContentTask.getId(),
"Failed to upsert investment document tags: " + throwable.getMessage()))
.thenReturn(investmentContentTask);
}

private Mono<InvestmentContentTask> upsertContentDocuments(InvestmentContentTask investmentContentTask) {
List<ContentDocumentEntry> documents = investmentContentTask.getData().getDocuments();
List<ContentDocumentEntry> documents =
Objects.requireNonNullElse(investmentContentTask.getData().getDocuments(), List.of());
investmentContentTask.info(INVESTMENT, OP_UPSERT, null, investmentContentTask.getName(),
investmentContentTask.getId(),
PROCESSING_PREFIX + documents.size() + " investment content documents");
investmentContentTask.setState(State.IN_PROGRESS);
return investmentRestDocumentContentService
.upsertDocuments(Objects.requireNonNullElse(documents, List.of()))
.upsertDocuments(documents)
.doOnSuccess(v -> {
investmentContentTask.info(INVESTMENT, OP_UPSERT, RESULT_UPSERTED, investmentContentTask.getName(),
investmentContentTask.getId(),
RESULT_UPSERTED + " " + documents.size() + " Investment Content Documents");
investmentContentTask.setState(State.COMPLETED);
})
.doOnError(throwable ->
investmentContentTask.error(INVESTMENT, OP_UPSERT, RESULT_FAILED, investmentContentTask.getName(),
investmentContentTask.getId(),
"Failed to upsert investment content documents: " + throwable.getMessage()))
.thenReturn(investmentContentTask);
}

Expand All @@ -125,4 +187,3 @@ public Mono<InvestmentContentTask> rollBack(InvestmentContentTask streamTask) {
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Mono<InvestmentTask> executeTask(InvestmentTask streamTask) {
.flatMap(this::upsertPortfolioTradingAccounts)
.flatMap(this::upsertInvestmentPortfolioDeposits)
.flatMap(this::upsertPortfoliosAllocations)
.doOnSuccess(completedTask -> log.info(
.doOnNext(completedTask -> log.info(
"Successfully completed investment saga: taskId={}, taskName={}, state={}",
completedTask.getId(), completedTask.getName(), completedTask.getState()))
.doOnError(throwable -> {
Expand Down Expand Up @@ -132,22 +132,21 @@ public Mono<InvestmentTask> rollBack(InvestmentTask streamTask) {
private Mono<InvestmentTask> upsertPortfoliosAllocations(InvestmentTask investmentTask) {
InvestmentData data = investmentTask.getData();
return asyncTaskService.checkPriceAsyncTasksFinished(data.getPriceAsyncTasks())
.then(Flux.fromIterable(Objects.requireNonNullElse(data.getPortfolios(), List.of()))
.thenMany(Flux.fromIterable(Objects.requireNonNullElse(data.getPortfolios(), List.of()))
.flatMap(
p -> investmentPortfolioAllocationService.generateAllocations(p,
data.getPortfolioProducts(),
investmentTask.getData().getInvestmentAssetData()))
.collectList()
.doOnError(throwable -> {
log.error("Allocation generation failed for portfolios:{} taskId={}",
data.getPortfolios().stream().map(PortfolioList::getUuid).toList(), investmentTask.getId(),
throwable);
investmentTask.error(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_FAILED,
investmentTask.getName(), investmentTask.getId(),
"Failed to upsert investment portfolio trading accounts: " + throwable.getMessage());
})
.map(o -> investmentTask)
);
investmentTask.getData().getInvestmentAssetData())))
.collectList()
.doOnError(throwable -> {
log.error("Allocation generation failed for portfolios:{} taskId={}",
data.getPortfolios().stream().map(PortfolioList::getUuid).toList(), investmentTask.getId(),
throwable);
investmentTask.error(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_FAILED,
investmentTask.getName(), investmentTask.getId(),
"Failed to upsert investment portfolio trading accounts: " + throwable.getMessage());
})
.map(o -> investmentTask);
}

/**
Expand Down Expand Up @@ -287,15 +286,14 @@ private Mono<InvestmentTask> upsertPortfolioTradingAccounts(InvestmentTask inves
investmentTask.getId(), PROCESSING_PREFIX + accountsCount + " investment portfolio trading accounts");

return investmentPortfolioService.upsertPortfolioTradingAccounts(investmentPortfolioTradingAccounts)
.map(products -> {
.doOnNext(products -> {
investmentTask.info(INVESTMENT_PORTFOLIO_TRADING_ACCOUNTS, OP_UPSERT, RESULT_CREATED,
investmentTask.getName(), investmentTask.getId(),
UPSERTED_PREFIX + products.size() + " investment portfolio trading accounts");
log.info("Successfully upserted all investment portfolio trading accounts: taskId={}, productCount={}",
investmentTask.getId(), products.size());

return investmentTask;
})
.thenReturn(investmentTask)
.doOnError(throwable -> {
log.error("Failed to upsert investment portfolio trading accounts: taskId={}, arrangementCount={}",
investmentTask.getId(), accountsCount, throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ private Mono<List<List<GroupResult>>> generatePrices(List<Asset> assets, Map<Str
null, null, null, null,
asset.getIsin(), daysOfPrices + 1, asset.getMarket(), null, null)
.filter(Objects::nonNull)
.map(PaginatedOASPriceList::getResults)
.filter(Objects::nonNull)
// Using mapNotNull to handle null values directly instead of map(...).filter(Objects::nonNull).
// This avoids NullPointerException since Reactor's map() operator does not allow null values.
// mapNotNull drops elements where getResults() is null, ensuring the pipeline completes normally.
.mapNotNull(PaginatedOASPriceList::getResults)
.flatMap(prices -> {
RandomPriceParam priceParam = findPrice(priceByAsset, asset, getLastPrice(prices));
LocalDate lastDate =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ public Mono<Market> upsertMarket(MarketRequest marketRequest) {
});
})
// If Mono is empty (market not found), create the market
.switchIfEmpty(assetUniverseApi.createMarket(marketRequest)
.switchIfEmpty(Mono.defer(() -> assetUniverseApi.createMarket(marketRequest)
.doOnSuccess(createdMarket -> log.info("Created market: {}", createdMarket))
.doOnError(error -> log.error("Error creating market: {}", error.getMessage(), error))
);
));
}

/**
Expand Down Expand Up @@ -108,7 +108,7 @@ public Mono<com.backbase.stream.investment.Asset> getOrCreateAsset(com.backbase.
})
.map(assetMapper::map)
// If Mono is empty (asset not found), create the asset
.switchIfEmpty(investmentRestAssetUniverseService.createAsset(asset, categoryIdByCode)
.switchIfEmpty(Mono.defer(() -> investmentRestAssetUniverseService.createAsset(asset, categoryIdByCode)
.doOnSuccess(createdAsset -> log.info("Created asset with assetIdentifier: {}", assetIdentifier))
.doOnError(error -> {
if (error instanceof WebClientResponseException w) {
Expand All @@ -119,7 +119,7 @@ public Mono<com.backbase.stream.investment.Asset> getOrCreateAsset(com.backbase.
error.getMessage(), error);
}
})
);
));
}

/**
Expand Down Expand Up @@ -171,7 +171,7 @@ public Mono<MarketSpecialDay> upsertMarketSpecialDay(MarketSpecialDayRequest mar
}
})
// If Mono is empty (market special day not found), create the market special day
.switchIfEmpty(assetUniverseApi.createMarketSpecialDay(marketSpecialDayRequest)
.switchIfEmpty(Mono.defer(() -> assetUniverseApi.createMarketSpecialDay(marketSpecialDayRequest)
.doOnSuccess(
createdMarketSpecialDay -> log.info("Created market special day: {}", createdMarketSpecialDay))
.doOnError(error -> {
Expand All @@ -184,7 +184,7 @@ public Mono<MarketSpecialDay> upsertMarketSpecialDay(MarketSpecialDayRequest mar
}

})
);
));
}

public Flux<com.backbase.stream.investment.Asset> createAssets(List<com.backbase.stream.investment.Asset> assets) {
Expand Down Expand Up @@ -236,8 +236,8 @@ public Mono<AssetCategory> upsertAssetCategory(AssetCategoryEntry assetCategoryE
return Mono.empty();
})
.switchIfEmpty(
investmentRestAssetUniverseService
.createAssetCategory(assetCategoryEntry, assetCategoryEntry.getImageResource())
Mono.defer(() -> investmentRestAssetUniverseService
.createAssetCategory(assetCategoryEntry, assetCategoryEntry.getImageResource()))
)
.doOnSuccess(updatedCategory -> {
assetCategoryEntry.setUuid(updatedCategory.getUuid());
Expand Down Expand Up @@ -304,7 +304,7 @@ public Mono<AssetCategoryType> upsertAssetCategoryType(AssetCategoryTypeRequest
}
})
.switchIfEmpty(
assetUniverseApi.createAssetCategoryType(assetCategoryTypeRequest)
Mono.defer(() -> assetUniverseApi.createAssetCategoryType(assetCategoryTypeRequest)
.doOnSuccess(createdType -> log.info("Created asset category type: {}", createdType))
.doOnError(error -> {
if (error instanceof WebClientResponseException w) {
Expand All @@ -316,7 +316,7 @@ public Mono<AssetCategoryType> upsertAssetCategoryType(AssetCategoryTypeRequest
assetCategoryTypeRequest.getCode(),
error.getMessage(), error);
}
})
}))
);
}
}
Loading
Loading