diff --git a/bundles/remote_services/README.md b/bundles/remote_services/README.md index bc090a9c7..7bbd83540 100644 --- a/bundles/remote_services/README.md +++ b/bundles/remote_services/README.md @@ -27,9 +27,9 @@ The Remote Service Admin Service subproject contains an adapted implementation o The topology manager decides which services should be imported and exported according to a defined policy. Currently, only one policy is implemented in Celix, the *promiscuous* policy, which simply imports and exports all services. -| **Bundle** | `Celix::rsa_topology_manager` | -|--|-----------------------------------------| -| **Configuration** | *None* | +| **Bundle** | `Celix::rsa_topology_manager` | +|--|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Configuration** | `CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS`: defines the ranking offsets for imported services. The value of this property is a comma-separated list of `=`. The `` is the value of the `service.exported.configs` property of the service registration, and the `` is an integer that is added to the ranking of the imported service. This allows you to influence the ranking of imported services based on their configuration type. For example, if you want to give a higher ranking to remote services with configuration type "shm" than remote services with configuration type "http", you can set this property to `http=-2,shm=-1`. | ### Remote Service Admin diff --git a/bundles/remote_services/topology_manager/CMakeLists.txt b/bundles/remote_services/topology_manager/CMakeLists.txt index ca742341e..6e43433f8 100644 --- a/bundles/remote_services/topology_manager/CMakeLists.txt +++ b/bundles/remote_services/topology_manager/CMakeLists.txt @@ -22,7 +22,7 @@ add_celix_bundle(rsa_topology_manager src/topology_manager.c src/scope.c src/activator.c - VERSION 0.9.0 + VERSION 0.10.0 SYMBOLIC_NAME "apache_celix_rs_topology_manager" GROUP "Celix/RSA" NAME "Apache Celix RS Topology Manager" diff --git a/bundles/remote_services/topology_manager/gtest/CMakeLists.txt b/bundles/remote_services/topology_manager/gtest/CMakeLists.txt index f063854e6..ec77f6abc 100644 --- a/bundles/remote_services/topology_manager/gtest/CMakeLists.txt +++ b/bundles/remote_services/topology_manager/gtest/CMakeLists.txt @@ -49,12 +49,14 @@ if (EI_TESTS) Celix::framework Celix::threads_ei # Celix::bundle_ctx_ei -# Celix::string_hash_map_ei + Celix::string_hash_map_ei Celix::long_hash_map_ei Celix::array_list_ei Celix::properties_ei Celix::utils_ei Celix::malloc_ei + Celix::filter_ei + Celix::asprintf_ei GTest::gtest GTest::gtest_main ) diff --git a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerErrorInjectionTestSuite.cc b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerErrorInjectionTestSuite.cc index 1faddaec2..f1bd8b924 100644 --- a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerErrorInjectionTestSuite.cc +++ b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerErrorInjectionTestSuite.cc @@ -26,6 +26,9 @@ #include "celix_properties_ei.h" #include "celix_utils_ei.h" #include "celix_threads_ei.h" +#include "celix_string_hash_map_ei.h" +#include "celix_filter_ei.h" +#include "asprintf_ei.h" class TopologyManagerCreatingErrorInjectionTestSuite : public ::testing::Test { public: @@ -45,6 +48,10 @@ class TopologyManagerCreatingErrorInjectionTestSuite : public ::testing::Test { celix_ei_expect_calloc(nullptr, 0, nullptr); celix_ei_expect_celixThreadMutex_create(nullptr, 0, 0); celix_ei_expect_celix_longHashMap_create(nullptr, 0, nullptr); + celix_ei_expect_celix_stringHashMap_create(nullptr, 0, nullptr); + celix_ei_expect_celix_stringHashMap_putLong(nullptr, 1, 0); + celix_ei_expect_celix_utils_strdup(nullptr, 0, nullptr); + celix_ei_expect_celix_stringHashMap_createWithOptions(nullptr, 0, nullptr); } std::shared_ptr fw{}; @@ -60,6 +67,42 @@ TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, AllocingMemoryErrorTest) EXPECT_EQ(CELIX_ENOMEM, status); } +TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, CreatingImportedServiceRankingOffsetMapErrorTest) { + celix_ei_expect_celix_stringHashMap_create((void*)topologyManager_create, 0, nullptr); + void *scope{}; + topology_manager_t *tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(CELIX_ENOMEM, status); +} + +TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, ImportedServiceRankingOffsetStringDuplicationErrorTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "configType1=10,configType2=20", 1); + celix_ei_expect_celix_utils_strdup((void*)topologyManager_create, 1, nullptr); + void *scope{}; + topology_manager_t *tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(CELIX_ENOMEM, status); + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, AddingImportedServiceRankingOffsetErrorTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "configType1=10,configType2=20", 1); + celix_ei_expect_celix_stringHashMap_putLong((void*)topologyManager_create, 1, ENOMEM); + void *scope{}; + topology_manager_t *tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(ENOMEM, status); + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, CreatingImportedServiceMapErrorTest) { + celix_ei_expect_celix_stringHashMap_createWithOptions((void*)topologyManager_create, 0, nullptr); + void *scope{}; + topology_manager_t *tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(CELIX_ENOMEM, status); +} + TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, CreatingMutexErrorTest) { celix_ei_expect_celixThreadMutex_create((void*)topologyManager_create, 0, CELIX_ENOMEM); void *scope{}; @@ -121,6 +164,12 @@ class TopologyManagerErrorInjectionTestSuite : public TopologyManagerTestSuiteBa celix_ei_expect_celix_properties_set(nullptr, 0, 0); celix_ei_expect_celix_utils_strdup(nullptr, 0, nullptr); celix_ei_expect_celix_arrayList_add(nullptr, 0, 0); + celix_ei_expect_celix_filter_create(nullptr, 0, nullptr); + celix_ei_expect_celix_properties_getAsStringArrayList(nullptr, 0, 0); + celix_ei_expect_celix_properties_setLong(nullptr, 0, 0); + celix_ei_expect_asprintf(nullptr, 0, 0); + celix_ei_expect_celix_arrayList_createWithOptions(nullptr, 0, nullptr); + celix_ei_expect_celix_stringHashMap_put(nullptr, 0, 0); } void TestExportServiceFailure(void (*errorInject)(void)) { @@ -263,5 +312,118 @@ TEST_F(TopologyManagerErrorInjectionTestSuite, AddDynamicIpEndpointToListErrorTe }); } +TEST_F(TopologyManagerErrorInjectionTestSuite, CreatingFilterErrorWhenImportScopeChangedTest) { + celix_ei_expect_celix_filter_create(CELIX_EI_UNKNOWN_CALLER, 0, nullptr, 2); + auto status = tm_addImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type)"); + EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, status); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, AllocingMemoryForImportedServiceEntryErrorTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_calloc(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_ENOMEM, status); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, GettingServiceImportedConfigsErrorTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_celix_properties_getAsStringArrayList(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM); + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + + celix_properties_unset(importEndpoint->properties, CELIX_RSA_SERVICE_IMPORTED_CONFIGS); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_ENOMEM, status); + }); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, CopyImportedServicePropertiesErrorTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_celix_properties_copy(CELIX_EI_UNKNOWN_CALLER, 0, nullptr);//called in endpointDescription_clone + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + }); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, SettingImportedServiceRankingErrorTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=10", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_celix_properties_setLong(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM);//called in endpointDescription_clone + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + + +TEST_F(TopologyManagerErrorInjectionTestSuite, CreatingImportsMapFailureTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_celix_longHashMap_create(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, AddingImportedServiceToMapFailureTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_celix_stringHashMap_put(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM); + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, AddingImportedRegistrationToMapFailureTest) { + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_ei_expect_celix_longHashMap_put(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} +TEST_F(TopologyManagerErrorInjectionTestSuite, AddingImportedRegistrationToMapFailureWhenAddRsaTest) { + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_ei_expect_celix_longHashMap_put(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM, 2); + status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} \ No newline at end of file diff --git a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuite.cc b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuite.cc index 6f3d028a1..6ab5c5467 100644 --- a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuite.cc +++ b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuite.cc @@ -17,6 +17,7 @@ * under the License. */ +#include #include #include "TopologyManagerTestSuiteBaseClass.h" @@ -469,4 +470,310 @@ TEST_F(TopologyManagerTestSuite, ImportService2Test) { status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); EXPECT_EQ(CELIX_SUCCESS, status); }); +} + +TEST_F(TopologyManagerTestSuite, ImportServiceFailureTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + //first add the rsa and then the import endpoint + remote_service_admin_service_t* rsa = (remote_service_admin_service_t*)rsaSvc; + rsa->importService = [](remote_service_admin_t*, endpoint_description_t*, import_registration_t**) -> celix_status_t { + return CELIX_BUNDLE_EXCEPTION; + }; + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerTestSuite, ImportServiceFailureWhenAddRsaTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + //first add the import endpoint and then the rsa + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + + remote_service_admin_service_t* rsa = (remote_service_admin_service_t*)rsaSvc; + rsa->importService = [](remote_service_admin_t*, endpoint_description_t*, import_registration_t**) -> celix_status_t { + return CELIX_BUNDLE_EXCEPTION; + }; + status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerTestSuite, ImportDifferentRankingEndpointTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=10,tm_test_config_type2=20", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + //first add the rsa and then the import endpoint + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(&(service.imported.configs=tm_test_config_type)(service.ranking=10))"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + celix_autoptr(endpoint_description_t) importEndpoint2 = endpointDescription_clone(importEndpoint); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, "319bddfa-0252-4654-a3bd-298354d30208"); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_SERVICE_IMPORTED_CONFIGS, "tm_test_config_type2"); + importEndpoint2->id = celix_properties_get(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, nullptr); + status = topologyManager_addImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(&(service.imported.configs=tm_test_config_type)(service.ranking=10))"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + opts.filter = (char *)"(&(service.imported.configs=tm_test_config_type2)(service.ranking=20))"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_removeImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type2)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerTestSuite, AddRsaAfterImportServiceWithMultiEndpointsTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=10,tm_test_config_type2=20", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + //first add the rsa and then the import endpoint + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + celix_autoptr(endpoint_description_t) importEndpoint2 = endpointDescription_clone(importEndpoint); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, "319bddfa-0252-4654-a3bd-298354d30208"); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_SERVICE_IMPORTED_CONFIGS, "tm_test_config_type2"); + importEndpoint2->id = celix_properties_get(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, nullptr); + status = topologyManager_addImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type2)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(&(service.imported.configs=tm_test_config_type)(service.ranking=10))"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + opts.filter = (char *)"(&(service.imported.configs=tm_test_config_type2)(service.ranking=20))"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type2)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_removeImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerTestSuite, CloseImportsTest) { + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + status = topologyManager_closeImports(tm); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + //Cannot add new imports after close + celix_autoptr(endpoint_description_t) importEndpoint2 = endpointDescription_clone(importEndpoint); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, "319bddfa-0252-4654-a3bd-298354d30208"); + status = topologyManager_addImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerTestSuite, ImportScopeChangedTest) { + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = tm_addImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type)"); + EXPECT_EQ(CELIX_SUCCESS, status); + status = tm_addImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type2)"); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = tm_removeImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type)"); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerTestSuite, ImportedServiceNotInScopeTest) { + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = tm_addImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type2)"); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = tm_addImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type)"); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + status = tm_removeImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type)"); + EXPECT_EQ(CELIX_SUCCESS, status); + + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = tm_removeImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type2)"); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerTestSuite, ImportedServiceRankingOverflowUpwardTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=1", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_properties_setLong(importEndpoint->properties, CELIX_FRAMEWORK_SERVICE_RANKING, LONG_MAX); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + + celix_service_filter_options_t opts{}; + char filter[256]; + (void)snprintf(filter, 256, "(&(%s=tm_test_config_type)(%s=%ld))", CELIX_RSA_SERVICE_IMPORTED_CONFIGS, CELIX_FRAMEWORK_SERVICE_RANKING, LONG_MAX); + opts.filter = filter; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerTestSuite, ImportedServiceRankingOverflowDownwardTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=-1", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_properties_setLong(importEndpoint->properties, CELIX_FRAMEWORK_SERVICE_RANKING, LONG_MIN); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + + celix_service_filter_options_t opts{}; + char filter[256]; + (void)snprintf(filter, 256, "(&(%s=tm_test_config_type)(%s=%ld))", CELIX_RSA_SERVICE_IMPORTED_CONFIGS, CELIX_FRAMEWORK_SERVICE_RANKING, LONG_MIN); + opts.filter = filter; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); } \ No newline at end of file diff --git a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuiteBaseClass.h b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuiteBaseClass.h index 27540175e..5c40ad29b 100644 --- a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuiteBaseClass.h +++ b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuiteBaseClass.h @@ -24,6 +24,8 @@ extern "C" { #endif #include +#include +#include #include #include "celix_errno.h" #include "celix_constants.h" @@ -38,6 +40,7 @@ extern "C" { struct import_registration { endpoint_description_t *endpoint; + long proxyServiceId; }; struct export_reference { @@ -71,6 +74,7 @@ class TopologyManagerTestSuiteBaseClass : public ::testing::Test { auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); EXPECT_EQ(status, CELIX_SUCCESS); tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + tms = std::unique_ptr{scope, [](void*) {}}; } ~TopologyManagerTestSuiteBaseClass() = default; @@ -217,7 +221,7 @@ class TopologyManagerTestSuiteBaseClass : public ::testing::Test { } } - void TestImportService(void (*testBody)(topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint)) { + void TestImportService(const std::function &testBody) { remote_service_admin_t rsa{}; rsa.ctx = ctx.get(); remote_service_admin_service_t rsaSvc{}; @@ -227,10 +231,18 @@ class TopologyManagerTestSuiteBaseClass : public ::testing::Test { auto importReg = (import_registration_t*)calloc(1, sizeof(import_registration_t)); importReg->endpoint = endpoint; *registration = importReg; + static struct TmTestService { + void* handle; + } tmTestService; + celix_properties_t* props = celix_properties_copy(endpoint->properties); + EXPECT_NE(nullptr, props); + importReg->proxyServiceId = celix_bundleContext_registerService(admin->ctx, &tmTestService, "tmTestImportedService", props); + EXPECT_GE(importReg->proxyServiceId, 0); return CELIX_SUCCESS; }; rsaSvc.importRegistration_close = [](remote_service_admin_t* admin, import_registration_t* registration) -> celix_status_t { (void)admin; + celix_bundleContext_unregisterService(admin->ctx, registration->proxyServiceId); free(registration); return CELIX_SUCCESS; }; @@ -265,6 +277,7 @@ class TopologyManagerTestSuiteBaseClass : public ::testing::Test { std::shared_ptr ctx{}; std::shared_ptr logHelper{}; std::shared_ptr tm{}; + std::unique_ptr tms{nullptr, [](void*) {}}; }; #ifdef __cplusplus diff --git a/bundles/remote_services/topology_manager/src/topology_manager.c b/bundles/remote_services/topology_manager/src/topology_manager.c index 04cd630f7..e98762abc 100644 --- a/bundles/remote_services/topology_manager/src/topology_manager.c +++ b/bundles/remote_services/topology_manager/src/topology_manager.c @@ -28,6 +28,7 @@ #include #include #include +#include #include @@ -47,14 +48,17 @@ #include "service_reference.h" #include "service_registration.h" #include "celix_log_helper.h" -#include "topology_manager.h" #include "scope.h" #include "hash_map.h" #include "celix_array_list.h" +#include "celix_string_hash_map.h" +#include "celix_convert_utils.h" //The prefix of the config property which is used to store the interfaces of a port. e.g. CELIX_RSA_INTERFACES_OF_PORT_8080. The value is a comma-separated list of interface names. #define CELIX_RSA_INTERFACES_OF_PORT_PREFIX "CELIX_RSA_INTERFACES_OF_PORT_" +//The config property for the ranking offset of imported services. The value is a comma-separated list of =. e.g. CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS="celix.remote.admin.shm=-1,org.amdatu.remote.admin.http=-2". The ranking offset will be added to the original service ranking of the imported service. This allows the user to configure the ranking of imported services based on different configuration types. +#define CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS "CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS" typedef struct celix_rsa_service_entry { remote_service_admin_service_t* rsa; @@ -66,6 +70,11 @@ typedef struct celix_exported_service_entry { celix_long_hash_map_t* registrations; //key:rsa service id, val:celix_array_list_t } celix_exported_service_entry_t; +typedef struct celix_imported_service_entry { + endpoint_description_t* endpoint; + celix_long_hash_map_t* imports; //key:rsa service id, val:import_registration_t* +} celix_imported_service_entry_t; + struct topology_manager { celix_bundle_context_t *context; @@ -77,7 +86,7 @@ struct topology_manager { celix_long_hash_map_t* exportedServices;//key:service id, val:celix_exported_service_entry_t* - hash_map_pt importedServices; + celix_string_hash_map_t* importedServices;//key:the string of endpoint id, val:celix_imported_service_entry_t* bool closed; @@ -87,10 +96,11 @@ struct topology_manager { scope_pt scope; celix_log_helper_t *loghelper; + celix_string_hash_map_t *importedServiceRankingOffsets; //key:config type, val:ranking offset(Type is long) }; -celix_status_t topologyManager_exportScopeChanged(void *handle, char *service_name); -celix_status_t topologyManager_importScopeChanged(void *handle, char *service_name); +celix_status_t topologyManager_exportScopeChanged(void *handle, char *filterStr); +celix_status_t topologyManager_importScopeChanged(void *handle, char *filterStr); static celix_status_t topologyManager_notifyListenersEndpointAdded(topology_manager_pt manager, remote_service_admin_service_t *rsa, celix_array_list_t *registrations); static celix_status_t topologyManager_notifyListenersEndpointRemoved(topology_manager_pt manager, remote_service_admin_service_t *rsa, export_registration_t *export); @@ -99,6 +109,38 @@ static celix_status_t topologyManager_addImportedService_nolock(void *handle, en static celix_status_t topologyManager_removeImportedService_nolock(void *handle, endpoint_description_t *endpoint, char *matchedFilter); static celix_status_t topologyManager_addExportedService_nolock(void * handle, service_reference_pt reference); static void topologyManager_removeExportedService_nolock(void * handle, service_reference_pt reference); +static void topologyManager_closeImportedService(topology_manager_t* manager, celix_imported_service_entry_t* importedEntry); +static void topologyManager_importedServiceEntryDestroy(void* entryPtr); + +static celix_status_t parseRankingOffsetsString(const char* offsetsStr, celix_string_hash_map_t* rankingOffsets) { + celix_autofree char* offsetsStrDup = celix_utils_strdup(offsetsStr); + if (offsetsStrDup == NULL) { + return CELIX_ENOMEM; + } + + char* save= NULL; + char* token = strtok_r(offsetsStrDup, ",", &save); + while (token != NULL) { + celix_status_t status = CELIX_ILLEGAL_ARGUMENT; + char *equalSign = strchr(token, '='); + if (equalSign != NULL) { + *equalSign = '\0'; + const char *configType = token; + const char *offsetStr = equalSign + 1; + bool converted = false; + long offset = celix_utils_convertStringToLong(offsetStr, 0, &converted); + if (converted) { + status = celix_stringHashMap_putLong(rankingOffsets, configType, offset); + } + } + if (status != CELIX_SUCCESS) { + return status; + } + token = strtok_r(NULL, ",", &save); + } + + return CELIX_SUCCESS; +} celix_status_t topologyManager_create(celix_bundle_context_t *context, celix_log_helper_t *logHelper, topology_manager_pt *manager, void **scope) { celix_status_t status = CELIX_SUCCESS; @@ -112,6 +154,22 @@ celix_status_t topologyManager_create(celix_bundle_context_t *context, celix_log tm->loghelper = logHelper; tm->closed = false; + celix_autoptr(celix_string_hash_map_t) importedServiceRankingOffsets = tm->importedServiceRankingOffsets = celix_stringHashMap_create(); + if (importedServiceRankingOffsets == NULL) { + celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(logHelper, "TOPOLOGY_MANAGER: Error creating string hash map for imported service ranking offsets."); + return CELIX_ENOMEM; + } + const char *offsetsStr = celix_bundleContext_getProperty(context, CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS, NULL); + if (offsetsStr != NULL) { + status = parseRankingOffsetsString(offsetsStr, importedServiceRankingOffsets); + if (status != CELIX_SUCCESS) { + celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(logHelper, "TOPOLOGY_MANAGER: Error parsing imported service ranking offsets string '%s'.", offsetsStr); + return status; + } + } + status = celixThreadMutex_create(&tm->lock, NULL); if (status != CELIX_SUCCESS) { celix_logHelper_error(logHelper, "TOPOLOGY_MANAGER: Error creating mutex."); @@ -145,12 +203,20 @@ celix_status_t topologyManager_create(celix_bundle_context_t *context, celix_log //TODO remove deprecated hashMap (*manager)->listenerList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); - (*manager)->importedServices = hashMap_create(NULL, NULL, NULL, NULL); + celix_string_hash_map_create_options_t opts = CELIX_EMPTY_STRING_HASH_MAP_CREATE_OPTIONS; + opts.simpleRemovedCallback = topologyManager_importedServiceEntryDestroy; + opts.storeKeysWeakly = true; + celix_autoptr(celix_string_hash_map_t) importedServices = tm->importedServices = celix_stringHashMap_createWithOptions(&opts); + if (importedServices == NULL) { + celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(logHelper, "TOPOLOGY_MANAGER: Error creating string hash map for imported services."); + hashMap_destroy(tm->listenerList, false, false); + return CELIX_ENOMEM; + } status = scope_scopeCreate(tm, &tm->scope); if (status != CELIX_SUCCESS) { celix_logHelper_error(logHelper, "TOPOLOGY_MANAGER: Error creating scope."); - hashMap_destroy(tm->importedServices, false, false); hashMap_destroy(tm->listenerList, false, false); return status; } @@ -158,11 +224,13 @@ celix_status_t topologyManager_create(celix_bundle_context_t *context, celix_log scope_setImportScopeChangedCallback(tm->scope, topologyManager_importScopeChanged); *scope = tm->scope; + celix_steal_ptr(importedServices); celix_steal_ptr(exportedServices); celix_steal_ptr(networkIfNames); celix_steal_ptr(dynamicIpEndpoints); celix_steal_ptr(rsaMap); celix_steal_ptr(lock); + celix_steal_ptr(importedServiceRankingOffsets); celix_steal_ptr(tm); return status; @@ -175,7 +243,7 @@ celix_status_t topologyManager_destroy(topology_manager_pt manager) { celixThreadMutex_lock(&manager->lock); - hashMap_destroy(manager->importedServices, false, false); + celix_stringHashMap_destroy(manager->importedServices); hashMap_destroy(manager->listenerList, false, false); assert(celix_longHashMap_size(manager->exportedServices) == 0); @@ -196,52 +264,25 @@ celix_status_t topologyManager_destroy(topology_manager_pt manager) { celixThreadMutex_unlock(&manager->lock); celixThreadMutex_destroy(&manager->lock); + celix_stringHashMap_destroy(manager->importedServiceRankingOffsets); + free(manager); return status; } celix_status_t topologyManager_closeImports(topology_manager_pt manager) { - celix_status_t status; - - status = celixThreadMutex_lock(&manager->lock); - - manager->closed = true; - - hash_map_iterator_pt iter = hashMapIterator_create(manager->importedServices); - - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - endpoint_description_t *ep = hashMapEntry_getKey(entry); - hash_map_pt imports = hashMapEntry_getValue(entry); - - if (imports != NULL) { - celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_INFO, "TOPOLOGY_MANAGER: Remove imported service (%s; %s).", ep->serviceName, ep->id); - hash_map_iterator_pt importsIter = hashMapIterator_create(imports); - - while (hashMapIterator_hasNext(importsIter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(importsIter); - - remote_service_admin_service_t *rsa = hashMapEntry_getKey(entry); - import_registration_t *import = hashMapEntry_getValue(entry); - - status = rsa->importRegistration_close(rsa->admin, import); - if (status == CELIX_SUCCESS) { - hashMapIterator_remove(importsIter); - } - } - hashMapIterator_destroy(importsIter); - - hashMapIterator_remove(iter); + celix_auto(celix_mutex_lock_guard_t) lockGuard = celixMutexLockGuard_init(&manager->lock); - hashMap_destroy(imports, false, false); - } - } - hashMapIterator_destroy(iter); + manager->closed = true; - status = celixThreadMutex_unlock(&manager->lock); + CELIX_STRING_HASH_MAP_ITERATE(manager->importedServices, iter) { + celix_imported_service_entry_t* importedEntry = iter.value.ptrValue; + topologyManager_closeImportedService(manager, importedEntry); + } + celix_stringHashMap_clear(manager->importedServices); - return status; + return CELIX_SUCCESS; } celix_status_t topologyManager_rsaAdding(void * handle, service_reference_pt reference, void **service) { @@ -567,29 +608,26 @@ celix_status_t topologyManager_rsaAdded(void * handle, service_reference_pt rsaS celix_steal_ptr(rsaSvcEntry); // add already imported services to new rsa - hash_map_iterator_pt importedServicesIterator = hashMapIterator_create(manager->importedServices); - - while (hashMapIterator_hasNext(importedServicesIterator)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(importedServicesIterator); - endpoint_description_t *endpoint = hashMapEntry_getKey(entry); - if (scope_allowImport(manager->scope, endpoint)) { - import_registration_t *import = NULL; - celix_status_t status = rsa->importService(rsa->admin, endpoint, &import); - if (status == CELIX_SUCCESS) { - hash_map_pt imports = hashMapEntry_getValue(entry); - - if (imports == NULL) { - imports = hashMap_create(NULL, NULL, NULL, NULL); - hashMap_put(manager->importedServices, endpoint, imports); - } - - hashMap_put(imports, service, import); + CELIX_STRING_HASH_MAP_ITERATE(manager->importedServices, iter) { + celix_imported_service_entry_t* importedEntry = iter.value.ptrValue; + endpoint_description_t* endpoint = importedEntry->endpoint; + if(!scope_allowImport(manager->scope, importedEntry->endpoint)) { + celix_logHelper_info(manager->loghelper, "TOPOLOGY_MANAGER: Endpoint (%s; %s) is not allowed by scope, will not be imported to new RSA.", endpoint->serviceName, endpoint->id); + continue; + } + import_registration_t* import = NULL; + celix_status_t status = rsa->importService(rsa->admin, endpoint, &import); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error importing service (%s; %s), error:%d.", endpoint->serviceName, endpoint->id, status); + } else if (import != NULL) { + status = celix_longHashMap_put(importedEntry->imports, rsaSvcId, import); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error adding import registration to map for service (%s; %s).", endpoint->serviceName, endpoint->id); + rsa->importRegistration_close(rsa->admin, import); } } } - hashMapIterator_destroy(importedServicesIterator); - // add already exported services to new rsa CELIX_LONG_HASH_MAP_ITERATE(manager->exportedServices, iter) { celix_exported_service_entry_t* svcEntry = iter.value.ptrValue; @@ -634,7 +672,6 @@ celix_status_t topologyManager_rsaModified(void * handle, service_reference_pt r } celix_status_t topologyManager_rsaRemoved(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; topology_manager_pt manager = (topology_manager_pt) handle; remote_service_admin_service_t *rsa = (remote_service_admin_service_t *) service; long rsaSvcId = serviceReference_getServiceId(reference); @@ -662,22 +699,14 @@ celix_status_t topologyManager_rsaRemoved(void * handle, service_reference_pt re } } - hash_map_iterator_pt importedSvcIter = hashMapIterator_create(manager->importedServices); - - while (hashMapIterator_hasNext(importedSvcIter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(importedSvcIter); - hash_map_pt imports = hashMapEntry_getValue(entry); - - import_registration_t *import = (import_registration_t *)hashMap_remove(imports, rsa); - - if (import != NULL) { - celix_status_t subStatus = rsa->importRegistration_close(rsa->admin, import); - if (subStatus != CELIX_SUCCESS) { - celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Failed to close imported endpoint."); - } - } - } - hashMapIterator_destroy(importedSvcIter); + CELIX_STRING_HASH_MAP_ITERATE(manager->importedServices, iter) { + celix_imported_service_entry_t* importedEntry = iter.value.ptrValue; + import_registration_t* import = celix_longHashMap_get(importedEntry->imports, rsaSvcId); + if (import != NULL) { + rsa->importRegistration_close(rsa->admin, import); + celix_longHashMap_remove(importedEntry->imports, rsaSvcId); + } + } free(celix_longHashMap_get(manager->rsaMap, rsaSvcId)); celix_longHashMap_remove(manager->rsaMap, rsaSvcId); @@ -686,7 +715,7 @@ celix_status_t topologyManager_rsaRemoved(void * handle, service_reference_pt re celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_INFO, "TOPOLOGY_MANAGER: Removed RSA"); - return status; + return CELIX_SUCCESS; } celix_status_t topologyManager_exportScopeChanged(void *handle, char *filterStr) { @@ -748,44 +777,169 @@ celix_status_t topologyManager_exportScopeChanged(void *handle, char *filterStr) return status; } -celix_status_t topologyManager_importScopeChanged(void *handle, char *service_name) { - celix_status_t status = CELIX_SUCCESS; - endpoint_description_t *endpoint; - topology_manager_pt manager = (topology_manager_pt) handle; - bool found = false; +//XXX: call in locked section +static void topologyManager_closeImportedService(topology_manager_t* manager, celix_imported_service_entry_t* importedEntry) { + CELIX_LONG_HASH_MAP_ITERATE(importedEntry->imports, iter) { + import_registration_t* import = iter.value.ptrValue; + celix_rsa_service_entry_t* rsaSvcEntry = celix_longHashMap_get(manager->rsaMap, iter.key); + if (rsaSvcEntry != NULL) { + remote_service_admin_service_t *rsa = rsaSvcEntry->rsa; + rsa->importRegistration_close(rsa->admin, import); + } + } + celix_longHashMap_clear(importedEntry->imports); + return; +} - // add already imported services to new rsa - celixThreadMutex_lock(&manager->lock); +//XXX: call in locked section +static void topologyManager_importService(topology_manager_t* manager, celix_imported_service_entry_t* importedEntry) { + endpoint_description_t* endpoint = importedEntry->endpoint; + if(!scope_allowImport(manager->scope, endpoint)) { + celix_logHelper_info(manager->loghelper, "TOPOLOGY_MANAGER: Endpoint (%s; %s) is not allowed by scope, will not be imported.", endpoint->serviceName, endpoint->id); + return; + } + + CELIX_LONG_HASH_MAP_ITERATE(manager->rsaMap, iter) { + celix_rsa_service_entry_t* rsaSvcEntry = iter.value.ptrValue; + import_registration_t* import = NULL; + remote_service_admin_service_t *rsa = rsaSvcEntry->rsa; + celix_status_t status = rsa->importService(rsa->admin, endpoint, &import); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error importing service (%s; %s), error:%d.", endpoint->serviceName, endpoint->id, status); + } else if (import != NULL) { + status = celix_longHashMap_put(importedEntry->imports, iter.key, import); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error adding import registration to map for service (%s; %s).", endpoint->serviceName, endpoint->id); + rsa->importRegistration_close(rsa->admin, import); + } + } + } +} - hash_map_iterator_pt importedServicesIterator = hashMapIterator_create(manager->importedServices); - while (!found && hashMapIterator_hasNext(importedServicesIterator)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(importedServicesIterator); - endpoint = hashMapEntry_getKey(entry); +celix_status_t topologyManager_importScopeChanged(void *handle, char *filterStr) { + topology_manager_pt manager = (topology_manager_pt) handle; + celix_autoptr(celix_filter_t) filter = celix_filter_create(filterStr); + if (filter == NULL) { + celix_logHelper_logTssErrors(manager->loghelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(manager->loghelper,"filter creating failed\n"); + return CELIX_ILLEGAL_ARGUMENT; + } - const char* name = celix_properties_get(endpoint->properties, CELIX_FRAMEWORK_SERVICE_NAME, ""); - // Test if a service with the same name is imported - if (strcmp(name, service_name) == 0) { - found = true; - } - } - hashMapIterator_destroy(importedServicesIterator); + celix_auto(celix_mutex_lock_guard_t) lockGuard = celixMutexLockGuard_init(&manager->lock); - if (found) { - status = topologyManager_removeImportedService_nolock(manager, endpoint, NULL); + CELIX_STRING_HASH_MAP_ITERATE(manager->importedServices, iter) { + celix_imported_service_entry_t* importedEntry = iter.value.ptrValue; + if (celix_filter_match(filter, importedEntry->endpoint->properties)) { + topologyManager_closeImportedService(manager, importedEntry); + topologyManager_importService(manager, importedEntry); + } + } - if (status != CELIX_SUCCESS) { - celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_ERROR, "TOPOLOGY_MANAGER: Removal of imported service (%s; %s) failed.", endpoint->serviceName, endpoint->id); - } else { - status = topologyManager_addImportedService_nolock(manager, endpoint, NULL); - } - } + return CELIX_SUCCESS; +} - //should unlock until here ?, avoid endpoint is released during topologyManager_removeImportedService - celixThreadMutex_unlock(&manager->lock); +static celix_status_t topologyManager_getRankingOffsetForEndpoint(topology_manager_t* tm, const endpoint_description_t* endpoint, long* offset) { + *offset = 0; + celix_autoptr(celix_array_list_t) serviceImportedConfigs = NULL; + celix_status_t status = celix_properties_getAsStringArrayList(endpoint->properties, CELIX_RSA_SERVICE_IMPORTED_CONFIGS, NULL, &serviceImportedConfigs); + if (status != CELIX_SUCCESS) { + celix_logHelper_logTssErrors(tm->loghelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error getting property %s.", CELIX_RSA_SERVICE_IMPORTED_CONFIGS); + return status; + } else if (serviceImportedConfigs == NULL) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Endpoint description is missing property %s.", CELIX_RSA_SERVICE_IMPORTED_CONFIGS); + return CELIX_ILLEGAL_ARGUMENT; + } + int size = celix_arrayList_size(serviceImportedConfigs); + for (int i = 0; i < size; ++i) { + const char* config = celix_arrayList_getString(serviceImportedConfigs, i); + if (celix_stringHashMap_hasKey(tm->importedServiceRankingOffsets, config)) { + //According to the OSGi specification, the service.imported.configs property lists configuration types that must refer to the same endpoint. + //So we take the first match. + *offset = celix_stringHashMap_getLong(tm->importedServiceRankingOffsets, config, 0); + break; + } + } + return CELIX_SUCCESS; +} - return status; +static int willOverflowOnLongAdd(long a, long b) { + if (b > 0 && a > LONG_MAX - b) { + return 1; + } + if (b < 0 && a < LONG_MIN - b) { + return -1; + } + return 0; +} + +static endpoint_description_t* topologyManager_createAdjustedRankingEndpoint(topology_manager_t* tm, const endpoint_description_t* endpoint) { + long rankingOffset = 0; + celix_status_t status = topologyManager_getRankingOffsetForEndpoint(tm, endpoint, &rankingOffset); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error getting ranking offset for imported service."); + return NULL; + } + celix_autoptr(endpoint_description_t) endpointCopy = endpointDescription_clone(endpoint); + if (endpointCopy == NULL) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error cloning endpoint description."); + return NULL; + } + if (rankingOffset != 0) { + //According to OSGi specification, If no service.ranking service property is specified or its type is not Integer, + // then a ranking of 0 must be used. + long originalRanking = celix_properties_getAsLong(endpointCopy->properties, CELIX_FRAMEWORK_SERVICE_RANKING, 0); + long newRanking; + int overflow = willOverflowOnLongAdd(originalRanking, rankingOffset); + if (overflow < 0) { + newRanking = LONG_MIN; + } else if (overflow > 0) { + newRanking = LONG_MAX; + } else { + newRanking = originalRanking + rankingOffset; + } + status = celix_properties_setLong(endpointCopy->properties, CELIX_FRAMEWORK_SERVICE_RANKING, newRanking); + if (status != CELIX_SUCCESS) { + celix_logHelper_logTssErrors(tm->loghelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error setting property %s.", CELIX_FRAMEWORK_SERVICE_RANKING); + return NULL; + } + } + return celix_steal_ptr(endpointCopy); } +static celix_imported_service_entry_t* topologyManager_createImportedServiceEntry(topology_manager_t* tm, const endpoint_description_t* endpoint) { + celix_autofree celix_imported_service_entry_t* entry = (celix_imported_service_entry_t*)calloc(1, sizeof(*entry)); + if (entry == NULL) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error allocating import registration entry."); + return NULL; + } + celix_autoptr(celix_long_hash_map_t) imports = entry->imports = celix_longHashMap_create(); + if (imports == NULL) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error creating long hash map for import registrations."); + return NULL; + } + entry->endpoint = topologyManager_createAdjustedRankingEndpoint(tm, endpoint); + if (entry->endpoint == NULL) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error creating adjusted ranking endpoint description."); + return NULL; + } + celix_steal_ptr(imports); + return celix_steal_ptr(entry); +} + +static void topologyManager_importedServiceEntryDestroy(void* entryPtr) { + celix_imported_service_entry_t* entry = entryPtr; + if (entry != NULL) { + endpointDescription_destroy(entry->endpoint); + assert(celix_longHashMap_size(entry->imports) == 0);//All import registrations should have been closed and removed before the entry is destroyed. + celix_longHashMap_destroy(entry->imports); + free(entry); + } +} + +CELIX_DEFINE_AUTOPTR_CLEANUP_FUNC(celix_imported_service_entry_t, topologyManager_importedServiceEntryDestroy) + static celix_status_t topologyManager_addImportedService_nolock(void *handle, endpoint_description_t *endpoint, char *matchedFilter) { celix_status_t status = CELIX_SUCCESS; topology_manager_pt manager = handle; @@ -798,24 +952,21 @@ static celix_status_t topologyManager_addImportedService_nolock(void *handle, en return CELIX_SUCCESS; } - hash_map_pt imports = hashMap_create(NULL, NULL, NULL, NULL); - hashMap_put(manager->importedServices, endpoint, imports); - - if (scope_allowImport(manager->scope, endpoint)) { - CELIX_LONG_HASH_MAP_ITERATE(manager->rsaMap, iter) { - celix_rsa_service_entry_t* rsaSvcEntry = iter.value.ptrValue; - import_registration_t *import = NULL; - remote_service_admin_service_t *rsa = rsaSvcEntry->rsa; - celix_status_t substatus = rsa->importService(rsa->admin, endpoint, &import); - if (substatus == CELIX_SUCCESS) { - hashMap_put(imports, rsa, import); - } else { - status = substatus; - } - } - } + celix_autoptr(celix_imported_service_entry_t) importedEntry = topologyManager_createImportedServiceEntry(manager, endpoint); + if (importedEntry == NULL) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error creating import registration entry."); + return ENOMEM; + } + status = celix_stringHashMap_put(manager->importedServices, importedEntry->endpoint->id, importedEntry); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error adding imported service entry to map."); + return status; + } + topologyManager_importService(manager, importedEntry); - return status; + celix_steal_ptr(importedEntry); + + return CELIX_SUCCESS; } celix_status_t topologyManager_addImportedService(void *handle, endpoint_description_t *endpoint, char *matchedFilter) { @@ -834,40 +985,19 @@ celix_status_t topologyManager_addImportedService(void *handle, endpoint_descrip } static celix_status_t topologyManager_removeImportedService_nolock(void *handle, endpoint_description_t *endpoint, char *matchedFilter) { - celix_status_t status = CELIX_SUCCESS; - topology_manager_pt manager = handle; - - celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG, "TOPOLOGY_MANAGER: Remove imported service (%s; %s).", endpoint->serviceName, endpoint->id); + topology_manager_pt manager = handle; - hash_map_iterator_pt iter = hashMapIterator_create(manager->importedServices); - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - endpoint_description_t *ep = hashMapEntry_getKey(entry); - hash_map_pt imports = hashMapEntry_getValue(entry); - - if (imports != NULL && strcmp(endpoint->id, ep->id) == 0) { - hash_map_iterator_pt importsIter = hashMapIterator_create(imports); - - while (hashMapIterator_hasNext(importsIter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(importsIter); - remote_service_admin_service_t *rsa = hashMapEntry_getKey(entry); - import_registration_t *import = hashMapEntry_getValue(entry); - celix_status_t substatus = rsa->importRegistration_close(rsa->admin, import); - if (substatus == CELIX_SUCCESS) { - hashMapIterator_remove(importsIter); - } else { - status = substatus; - } - } - hashMapIterator_destroy(importsIter); - hashMapIterator_remove(iter); + celix_logHelper_debug(manager->loghelper, "TOPOLOGY_MANAGER: Remove imported service (%s; %s).", endpoint->serviceName, endpoint->id); - hashMap_destroy(imports, false, false); - } - } - hashMapIterator_destroy(iter); + celix_imported_service_entry_t* importedEntry = celix_stringHashMap_get(manager->importedServices, endpoint->id); + if (importedEntry == NULL) { + celix_logHelper_debug(manager->loghelper, "TOPOLOGY_MANAGER: No imported service entries found for service (%s; %s).", endpoint->serviceName, endpoint->id); + return CELIX_SUCCESS; + } + topologyManager_closeImportedService(manager, importedEntry); + celix_stringHashMap_remove(manager->importedServices, endpoint->id); - return status; + return CELIX_SUCCESS; } celix_status_t topologyManager_removeImportedService(void *handle, endpoint_description_t *endpoint, char *matchedFilter) {