11#include < arpa/inet.h>
2+ #include < condition_variable>
23#include < iostream>
34#include < jsoncons/json.hpp>
5+ #include < mutex>
46#include < netinet/in.h>
7+ #include < signal.h>
58#include < sys/socket.h>
69#include < unistd.h>
710
811#include " capiocl/api.h"
912#include " capiocl/engine.h"
1013#include " capiocl/printer.h"
1114
15+ std::mutex _setupMtx;
16+ std::condition_variable _setupCv;
17+ bool thread_ready = false ;
18+
1219// / @brief Main WebServer thread function
13- void server (const std::string &address, const int port, capiocl::engine::Engine *engine) {
14- pthread_setcancelstate (PTHREAD_CANCEL_ASYNCHRONOUS, nullptr );
20+ void server (const std::string &address, const int port, capiocl::engine::Engine *engine,
21+ std::atomic<bool > *terminate) {
22+
23+ constexpr int RECV_BUF_SIZE = 65535 ;
1524
16- capiocl::printer::print (capiocl::printer::CLI_LEVEL_INFO,
17- " Starting API server @ " + address + " :" + std::to_string (port));
25+ const auto &wf_name = engine->getWorkflowName ();
1826
1927 int fd = socket (AF_INET, SOCK_DGRAM, 0 );
20- if (fd < 0 ) {
21- perror (" Socket creation failed" );
22- return ;
23- }
2428
2529 int reuse = 1 ;
26- if (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof (reuse)) < 0 ) {
27- perror (" Setting SO_REUSEADDR failed" );
28- close (fd);
29- return ;
30- }
30+ setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof (reuse));
3131
3232 sockaddr_in localAddr{};
3333 localAddr.sin_family = AF_INET;
3434 localAddr.sin_port = htons (port);
3535 localAddr.sin_addr .s_addr = INADDR_ANY;
3636
37- if (bind (fd, reinterpret_cast <sockaddr *>(&localAddr), sizeof (localAddr)) < 0 ) {
38- perror (" Bind failed" );
39- close (fd);
40- return ;
41- }
37+ bind (fd, reinterpret_cast <sockaddr *>(&localAddr), sizeof (localAddr));
4238
4339 ip_mreq group{};
4440 group.imr_multiaddr .s_addr = inet_addr (address.c_str ());
4541 group.imr_interface .s_addr = INADDR_ANY;
46- if (setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof (group)) < 0 ) {
47- perror (" Adding multicast group failed" );
48- close (fd);
49- return ;
50- }
42+ setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof (group));
5143
52- char buffer[65535 ] = {0 };
44+ char buffer[RECV_BUF_SIZE ] = {0 };
5345 sockaddr_in srcAddr{};
5446 socklen_t addrlen = sizeof (srcAddr);
5547
5648 // timeout of 1 second for termination
5749 timeval tv{};
58- tv.tv_sec = 1 ;
59- tv.tv_usec = 0 ;
50+ tv.tv_sec = 0 ;
51+ tv.tv_usec = 500 ;
6052 setsockopt (fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof (tv));
6153
62- while (true ) {
63- ssize_t nbytes = recvfrom (fd, buffer, sizeof (buffer) - 1 , 0 ,
64- reinterpret_cast <struct sockaddr *>(&srcAddr), &addrlen);
65- if (nbytes < 0 ) {
54+ thread_ready = true ;
55+ _setupCv.notify_all ();
56+
57+ while (!*terminate) {
58+ // GCOVR_EXCL_START
59+ ssize_t n = recvfrom (fd, buffer, RECV_BUF_SIZE - 1 , 0 ,
60+ reinterpret_cast <sockaddr *>(&srcAddr), &addrlen);
61+ // GCOVR_EXCL_STOP
62+
63+ if (n <= 0 ) {
6664 continue ;
6765 }
6866
69- buffer[nbytes ] = ' \0 ' ;
67+ buffer[n ] = ' \0 ' ;
7068
7169 try {
72- auto data = jsoncons::json::parse (buffer);
73- auto path = data[" path" ].as_string ();
74- auto workflow_name = data[" workflow_name" ].as_string ();
75- auto rule = capiocl::engine::CapioCLEntry::fromJson (data[" CapioClEntry" ].as_string ());
70+ auto data = jsoncons::json::parse (buffer); // GCOVR_EXCL_LINE
71+ const auto path =
72+ data.get_value_or <std::string, std::string>(" path" , " " ); // GCOVR_EXCL_LINE
73+ if (path.empty ()) {
74+ continue ;
75+ }
76+ const auto workflow_name =
77+ data.get_value_or <std::string, std::string>(" workflow_name" , " " ); // GCOVR_EXCL_LINE
78+ if (workflow_name.empty ()) {
79+ continue ;
80+ }
81+ const auto jsonEntry =
82+ data.get_value_or <std::string, std::string>(" CapioClEntry" , " " ); // GCOVR_EXCL_LINE
83+ if (jsonEntry.empty ()) {
84+ continue ;
85+ }
86+ auto rule = capiocl::engine::CapioCLEntry::fromJson (jsonEntry);
7687
77- if (workflow_name == engine-> getWorkflowName () ) {
88+ if (workflow_name == wf_name ) {
7889 engine->add (path, rule);
90+ } else {
91+ continue ;
7992 }
8093
8194 } catch (const jsoncons::json_exception &e) {
8295 capiocl::printer::print (capiocl::printer::CLI_LEVEL_ERROR,
83- " JSON Parse Error : " + std::string (e.what ()));
96+ " APIServer: Received invalid json : " + std::string (e.what ()));
8497 }
8598 }
8699
@@ -92,15 +105,28 @@ capiocl::api::CapioClApiServer::CapioClApiServer(engine::Engine *engine,
92105 : capiocl_configuration(config) {
93106
94107 std::string address;
95- config.getParameter (" dynamic_api.ip" , &address);
96-
97108 int port;
98- config.getParameter (" dynamic_api.port" , &port);
109+ try {
110+ config.getParameter (" dynamic_api.ip" , &address); // GCOVR_EXCL_LINE
111+ } catch (...) {
112+ address = configuration::defaults::DEFAULT_API_MULTICAST_IP.v ;
113+ }
114+
115+ try {
116+ config.getParameter (" dynamic_api.port" , &port); // GCOVR_EXCL_LINE
117+ } catch (...) {
118+ port = std::stoi (configuration::defaults::DEFAULT_API_MULTICAST_PORT.v );
119+ }
120+
121+ _webApiThread = std::thread (server, address, port, engine, &_terminate);
122+
123+ std::unique_lock lock (_setupMtx);
124+ _setupCv.wait (lock, [] { return thread_ready; });
99125
100- _webApiThread = std::thread (server, address, port, engine );
126+ printer::print (printer::CLI_LEVEL_INFO, " API server @ " + address + " : " + std::to_string (port) );
101127}
102128
103129capiocl::api::CapioClApiServer::~CapioClApiServer () {
104- pthread_cancel (_webApiThread. native_handle ()) ;
130+ _terminate = true ;
105131 _webApiThread.join ();
106132}
0 commit comments