From 35b35dc237258b6ad1b16f4fa1cf7a281caf4511 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Mon, 27 Apr 2026 17:47:27 +0200 Subject: [PATCH 1/4] Minimal changes for backwards compatibility --- src/connectivityserver/connectionflask.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/connectivityserver/connectionflask.py b/src/connectivityserver/connectionflask.py index b164662..b4f9fe2 100755 --- a/src/connectivityserver/connectionflask.py +++ b/src/connectivityserver/connectionflask.py @@ -212,8 +212,8 @@ def publish(): partlock.release() return 'OK' -@app.route("/retract-partition",methods=['POST']) -def retract_partition(): +@app.route("/retract-session",methods=['POST']) +def retract_session(): if len(request.data) == 0: abort(400) @@ -233,6 +233,16 @@ def retract_partition(): partlock.release() abort(404) + +@app.route("/retract-partition",methods=['POST']) +def retract_partition(): + """ + For backwards compatibility. + Delete this when the partition word is completely retired + """ + log.warning("retract-partition depreciated. Please use retract-session.") + retract_session() + @app.route("/retract",methods=['POST']) def retract(): if len(request.data) == 0: From 7dbe6965cd147ba1da167ae86a422dc6171b92bb Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Mon, 27 Apr 2026 18:00:04 +0200 Subject: [PATCH 2/4] cosmetic session changes --- src/connectivityserver/connectionflask.py | 116 +++++++++++----------- 1 file changed, 58 insertions(+), 58 deletions(-) diff --git a/src/connectivityserver/connectionflask.py b/src/connectivityserver/connectionflask.py index b4f9fe2..8d54819 100755 --- a/src/connectivityserver/connectionflask.py +++ b/src/connectivityserver/connectionflask.py @@ -19,8 +19,8 @@ # Some functions exit with an abort(NNN) instead of return so don't complain! #ruff: noqa RET503 -partitions={} -partlock=Lock() +sessions={} +seshlock=Lock() if 'CONNECTION_FLASK_DEBUG' in os.environ: debug_level=int(os.environ['CONNECTION_FLASK_DEBUG']) @@ -50,7 +50,7 @@ def convert_log_level(log_level): nlookups=0 lookup_time=timedelta(0) publish_time=timedelta() -maxpartitions=0 +maxsessions=0 maxentries={} app=Flask(__name__) @@ -60,19 +60,19 @@ def dump(): now=datetime.now() dstream=StringIO() dstream.write('

Dump of configuration dictionary

') - dstream.write("

Active partitions

") - if len(partitions)>0: + dstream.write("

Active sessions

") + if len(sessions)>0: pad=' style="padding-left: 1em;padding-right: 1em"' dstream.write(f'' - f'Partition' + f'Session' f'Entries') - for p in partitions: + for p in sessions: dstream.write(f'{p}' - f'{len(partitions[p])}') + f'{len(sessions[p])}') dstream.write("
") - dstream.write(f'

Partitions

') - for p in partitions: - store=partitions[p] + dstream.write(f'

Sessions

') + for p in sessions: + store=sessions[p] dstream.write(f'

{p}

') dstream.write(f'' f'' @@ -117,9 +117,9 @@ def stats_to_html(dstream): avg_lookup=timedelta() dstream.write(f"

{nlookups} calls to lookup in total time {lookup_time} " f"(average {avg_lookup.microseconds} µs per call)

") - dstream.write(f"

Maximum number of partitions active = {maxpartitions}

") - for part in maxentries: - dstream.write(f"

Maximum entries in partition {part} = {maxentries[part]}

") + dstream.write(f"

Maximum number of sessions active = {maxsessions}

") + for sesh in maxentries: + dstream.write(f"

Maximum entries in session {sesh} = {maxentries[sesh]}

") @app.route("/stats") def dumpStats(): @@ -133,14 +133,14 @@ def dumpStats(): def resetStats(): stats = dumpStats() - global last_stats,npublishes,nlookups,lookup_time,publish_time,maxpartitions,maxentries + global last_stats,npublishes,nlookups,lookup_time,publish_time,maxsessions,maxentries last_stats=datetime.now() npublishes=0 nlookups=0 lookup_time=timedelta(0) publish_time=timedelta() - maxpartitions=0 + maxsessions=0 maxentries={} return stats @@ -148,36 +148,36 @@ def resetStats(): @app.route("/resetService") def reset(): - global partitions - partitions={} + global sessions + sessions={} return resetStats() @app.route("/publish",methods=['POST']) def publish(): # Store multiple connection ids and corresponding uris in a - # dictionary associated with the appropriate partition. + # dictionary associated with the appropriate session. timestamp=datetime.now() js=json.loads(request.data) log.debug(f"{js=}") - part=js['partition'] + sesh=js['partition'] log.info( - f"{len(js['connections'])} connections in partition {part} from {request.remote_addr} uri={js['connections'][0]['uri']}..." + f"{len(js['connections'])} connections in session {sesh} from {request.remote_addr} uri={js['connections'][0]['uri']}..." ) - partlock.acquire() - if part in partitions: - store=partitions[part] + seshlock.acquire() + if sesh in sessions: + store=sessions[sesh] else: store={} - partitions[part]=store - global maxpartitions - if len(partitions)>maxpartitions: - maxpartitions=len(partitions) - if part not in maxentries: - maxentries[part]=0 + sessions[sesh]=store + global maxsessions + if len(sessions)>maxsessions: + maxsessions=len(sessions) + if sesh not in maxentries: + maxentries[sesh]=0 Connection=namedtuple( 'Connection',['uri','data_type','capacity','connection_type','time']) @@ -206,10 +206,10 @@ def publish(): publish_time+=elapsed npublishes+=1 - if len(store)>maxentries[part]: - maxentries[part]=len(store) + if len(store)>maxentries[sesh]: + maxentries[sesh]=len(store) - partlock.release() + seshlock.release() return 'OK' @app.route("/retract-session",methods=['POST']) @@ -223,14 +223,14 @@ def retract_session(): if 'partition' not in js: abort(400) - part=js['partition'] - partlock.acquire() + sesh=js['partition'] + seshlock.acquire() - if part in partitions: - partitions.pop(part) - partlock.release() + if sesh in sessions: + sessions.pop(sesh) + seshlock.release() return 'OK' - partlock.release() + seshlock.release() abort(404) @@ -250,13 +250,13 @@ def retract(): js=json.loads(request.data) good=True - part=js['partition'] - partlock.acquire() - if part not in partitions: - partlock.release() - return make_response(f"Partition {part} not found", 404) + sesh=js['partition'] + seshlock.acquire() + if sesh not in sessions: + seshlock.release() + return make_response(f"Session {sesh} not found", 404) - store=partitions[part] + store=sessions[sesh] if 'connections' not in js: abort(400) for con in js['connections']: @@ -267,16 +267,16 @@ def retract(): log.info(f"Could not find connection_id <{id}>") good=False if len(store)==0: - # We've deleted the last entry in this partition so delete the - # partition as well - partitions.pop(part) - partlock.release() + # We've deleted the last entry in this session so delete the + # session as well + sessions.pop(sesh) + seshlock.release() if good: return 'OK' abort(404) -@app.route("/getconnection/",methods=['POST','GET']) -def get_connection(part): +@app.route("/getconnection/",methods=['POST','GET']) +def get_connection(sesh): if len(request.data) == 0: abort(400) @@ -295,17 +295,17 @@ def get_connection(part): result=[] regex=re.compile(js['uid_regex']) dt=js['data_type'] - partlock.acquire() + seshlock.acquire() - if part in partitions: - store=partitions[part] + if sesh in sessions: + store=sessions[sesh] matched=[] for uid,con in store.items(): if regex.fullmatch(uid) and con.data_type==dt and now-con.time Date: Mon, 27 Apr 2026 18:07:15 +0200 Subject: [PATCH 3/4] Fix partition in other locations --- docs/README.md | 6 +++--- tests/test_connectivityservice.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/README.md b/docs/README.md index 8028c69..3438334 100644 --- a/docs/README.md +++ b/docs/README.md @@ -37,7 +37,7 @@ server to serve connection information to DAQ applications. http://connection-flask.connections:5000/publish ``` -### /getconnection/ +### /getconnection/ This uri returns a list of connections matching the 'uid_regex' and 'data_type' specified in the JSON encoded request. @@ -53,9 +53,9 @@ curl -d '{"uid_regex":"DRO.*","data_type":"TPSet"}' \ This uri should be used to remove published connections. The request should be JSON encoded with the keys "partition" and "connections" with the latter being an array of "connection_id" and "data_type" values. -### /retract-partition +### /retract-session This uri should be used to remove all published connections from the -given partition. The request should be JSON encoded with one field "partition" naming the partition to be retracted. +given session. The request should be JSON encoded with one field "session" naming the session to be retracted. ## Running the server locally from the command line The server is intended to be run under the Gunicorn web server. diff --git a/tests/test_connectivityservice.py b/tests/test_connectivityservice.py index ec31065..3e27e4e 100644 --- a/tests/test_connectivityservice.py +++ b/tests/test_connectivityservice.py @@ -88,15 +88,15 @@ def test_retract_partition(client): resp = client.post("/publish", json=con) assert resp.status_code == 200 - resp = client.post("/retract-partition") + resp = client.post("/retract-session") assert resp.status_code == 400 retraction = json.loads("""{"partition":"ccTest"}""") - resp = client.post("/retract-partition", json=retraction) + resp = client.post("/retract-session", json=retraction) assert resp.status_code == 200 # Second time should fail - resp = client.post("/retract-partition", json=retraction) + resp = client.post("/retract-session", json=retraction) assert resp.status_code == 404 From c201a19b7e1933688d6e74d96c3e4e99234a095e Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Tue, 28 Apr 2026 12:41:19 +0200 Subject: [PATCH 4/4] change payload partition to session --- src/connectivityserver/connectionflask.py | 32 ++++++++++++++++++++--- tests/test_connectivityservice.py | 25 +++++++++++++++--- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/src/connectivityserver/connectionflask.py b/src/connectivityserver/connectionflask.py index 8d54819..d725c8f 100755 --- a/src/connectivityserver/connectionflask.py +++ b/src/connectivityserver/connectionflask.py @@ -27,6 +27,22 @@ else: debug_level=1 + +def get_sesh(js): + has_session = 'session' in js + has_partition = 'partition' in js + + if has_session and has_partition: + raise ValueError("Both 'session' and 'partition' were provided") + + if has_session: + return js['session'] + + if has_partition: + return js['partition'] + + raise ValueError("No session (partition) provided") + def convert_log_level(log_level): if log_level == 0: return logging.WARNING @@ -161,7 +177,10 @@ def publish(): js=json.loads(request.data) log.debug(f"{js=}") - sesh=js['partition'] + try: + sesh = get_sesh(js) + except ValueError: + abort(400) log.info( f"{len(js['connections'])} connections in session {sesh} from {request.remote_addr} uri={js['connections'][0]['uri']}..." @@ -220,10 +239,11 @@ def retract_session(): js=json.loads(request.data) log.debug(f"request=[{js}]") - if 'partition' not in js: + try: + sesh = get_sesh(js) + except ValueError: abort(400) - sesh=js['partition'] seshlock.acquire() if sesh in sessions: @@ -250,7 +270,11 @@ def retract(): js=json.loads(request.data) good=True - sesh=js['partition'] + try: + sesh = get_sesh(js) + except ValueError: + abort(400) + seshlock.acquire() if sesh not in sessions: seshlock.release() diff --git a/tests/test_connectivityservice.py b/tests/test_connectivityservice.py index 3e27e4e..6927c04 100644 --- a/tests/test_connectivityservice.py +++ b/tests/test_connectivityservice.py @@ -34,7 +34,7 @@ def runner(app): "uri":"tcp://192.168.1.100:1235" } ], - "partition":"ccTest" + "session":"ccTest" }""") @@ -47,6 +47,23 @@ def test_publish(client): resp = client.post("/publish", json=con) assert resp.status_code == 200 + +def test_publish_with_partition_still_works(client): + partition_con = { + "connections": con["connections"], + "partition": "ccPartitionCompat", + } + + resp = client.post("/publish", json=partition_con) + assert resp.status_code == 200 + + query = {"uid_regex": "DRO.*", "data_type": "TPSet"} + resp = client.post("/getconnection/ccPartitionCompat", json=query) + assert resp.status_code == 200 + + rjson = json.loads(resp.data) + assert len(rjson) == 2 + def test_lookup(client): query = json.loads("""{"uid_regex":"DRO.*", "data_type":"TPSet"}""") resp = client.post("/getconnection/ccTest", json=query) @@ -68,7 +85,7 @@ def test_retract(client): resp = client.post("/retract") assert resp.status_code == 400 - retraction = json.loads("""{"partition":"ccTest", + retraction = json.loads("""{"session":"ccTest", "connections":[{"connection_id":"DRO-000-tp_to_trigger"}, {"connection_id":"DRO-001-tp_to_trigger"}] }""") @@ -84,14 +101,14 @@ def test_retract(client): assert resp.status_code == 404 -def test_retract_partition(client): +def test_retract_session(client): resp = client.post("/publish", json=con) assert resp.status_code == 200 resp = client.post("/retract-session") assert resp.status_code == 400 - retraction = json.loads("""{"partition":"ccTest"}""") + retraction = json.loads("""{"session":"ccTest"}""") resp = client.post("/retract-session", json=retraction) assert resp.status_code == 200