diff --git a/docs/README.md b/docs/README.md index 8028c69..3438334 100644 --- a/docs/README.md +++ b/docs/README.md @@ -37,7 +37,7 @@ server to serve connection information to DAQ applications. http://connection-flask.connections:5000/publish ``` -### /getconnection/ +### /getconnection/ This uri returns a list of connections matching the 'uid_regex' and 'data_type' specified in the JSON encoded request. @@ -53,9 +53,9 @@ curl -d '{"uid_regex":"DRO.*","data_type":"TPSet"}' \ This uri should be used to remove published connections. The request should be JSON encoded with the keys "partition" and "connections" with the latter being an array of "connection_id" and "data_type" values. -### /retract-partition +### /retract-session This uri should be used to remove all published connections from the -given partition. The request should be JSON encoded with one field "partition" naming the partition to be retracted. +given session. The request should be JSON encoded with one field "session" naming the session to be retracted. ## Running the server locally from the command line The server is intended to be run under the Gunicorn web server. diff --git a/src/connectivityserver/connectionflask.py b/src/connectivityserver/connectionflask.py index b164662..d725c8f 100755 --- a/src/connectivityserver/connectionflask.py +++ b/src/connectivityserver/connectionflask.py @@ -19,14 +19,30 @@ # Some functions exit with an abort(NNN) instead of return so don't complain! #ruff: noqa RET503 -partitions={} -partlock=Lock() +sessions={} +seshlock=Lock() if 'CONNECTION_FLASK_DEBUG' in os.environ: debug_level=int(os.environ['CONNECTION_FLASK_DEBUG']) else: debug_level=1 + +def get_sesh(js): + has_session = 'session' in js + has_partition = 'partition' in js + + if has_session and has_partition: + raise ValueError("Both 'session' and 'partition' were provided") + + if has_session: + return js['session'] + + if has_partition: + return js['partition'] + + raise ValueError("No session (partition) provided") + def convert_log_level(log_level): if log_level == 0: return logging.WARNING @@ -50,7 +66,7 @@ def convert_log_level(log_level): nlookups=0 lookup_time=timedelta(0) publish_time=timedelta() -maxpartitions=0 +maxsessions=0 maxentries={} app=Flask(__name__) @@ -60,19 +76,19 @@ def dump(): now=datetime.now() dstream=StringIO() dstream.write('

Dump of configuration dictionary

') - dstream.write("

Active partitions

") - if len(partitions)>0: + dstream.write("

Active sessions

") + if len(sessions)>0: pad=' style="padding-left: 1em;padding-right: 1em"' dstream.write(f'' - f'Partition' + f'Session' f'Entries') - for p in partitions: + for p in sessions: dstream.write(f'{p}' - f'{len(partitions[p])}') + f'{len(sessions[p])}') dstream.write("
") - dstream.write(f'

Partitions

') - for p in partitions: - store=partitions[p] + dstream.write(f'

Sessions

') + for p in sessions: + store=sessions[p] dstream.write(f'

{p}

') dstream.write(f'' f'' @@ -117,9 +133,9 @@ def stats_to_html(dstream): avg_lookup=timedelta() dstream.write(f"

{nlookups} calls to lookup in total time {lookup_time} " f"(average {avg_lookup.microseconds} µs per call)

") - dstream.write(f"

Maximum number of partitions active = {maxpartitions}

") - for part in maxentries: - dstream.write(f"

Maximum entries in partition {part} = {maxentries[part]}

") + dstream.write(f"

Maximum number of sessions active = {maxsessions}

") + for sesh in maxentries: + dstream.write(f"

Maximum entries in session {sesh} = {maxentries[sesh]}

") @app.route("/stats") def dumpStats(): @@ -133,14 +149,14 @@ def dumpStats(): def resetStats(): stats = dumpStats() - global last_stats,npublishes,nlookups,lookup_time,publish_time,maxpartitions,maxentries + global last_stats,npublishes,nlookups,lookup_time,publish_time,maxsessions,maxentries last_stats=datetime.now() npublishes=0 nlookups=0 lookup_time=timedelta(0) publish_time=timedelta() - maxpartitions=0 + maxsessions=0 maxentries={} return stats @@ -148,36 +164,39 @@ def resetStats(): @app.route("/resetService") def reset(): - global partitions - partitions={} + global sessions + sessions={} return resetStats() @app.route("/publish",methods=['POST']) def publish(): # Store multiple connection ids and corresponding uris in a - # dictionary associated with the appropriate partition. + # dictionary associated with the appropriate session. timestamp=datetime.now() js=json.loads(request.data) log.debug(f"{js=}") - part=js['partition'] + try: + sesh = get_sesh(js) + except ValueError: + abort(400) log.info( - f"{len(js['connections'])} connections in partition {part} from {request.remote_addr} uri={js['connections'][0]['uri']}..." + f"{len(js['connections'])} connections in session {sesh} from {request.remote_addr} uri={js['connections'][0]['uri']}..." ) - partlock.acquire() - if part in partitions: - store=partitions[part] + seshlock.acquire() + if sesh in sessions: + store=sessions[sesh] else: store={} - partitions[part]=store - global maxpartitions - if len(partitions)>maxpartitions: - maxpartitions=len(partitions) - if part not in maxentries: - maxentries[part]=0 + sessions[sesh]=store + global maxsessions + if len(sessions)>maxsessions: + maxsessions=len(sessions) + if sesh not in maxentries: + maxentries[sesh]=0 Connection=namedtuple( 'Connection',['uri','data_type','capacity','connection_type','time']) @@ -206,33 +225,44 @@ def publish(): publish_time+=elapsed npublishes+=1 - if len(store)>maxentries[part]: - maxentries[part]=len(store) + if len(store)>maxentries[sesh]: + maxentries[sesh]=len(store) - partlock.release() + seshlock.release() return 'OK' -@app.route("/retract-partition",methods=['POST']) -def retract_partition(): +@app.route("/retract-session",methods=['POST']) +def retract_session(): if len(request.data) == 0: abort(400) js=json.loads(request.data) log.debug(f"request=[{js}]") - if 'partition' not in js: + try: + sesh = get_sesh(js) + except ValueError: abort(400) - part=js['partition'] - partlock.acquire() + seshlock.acquire() - if part in partitions: - partitions.pop(part) - partlock.release() + if sesh in sessions: + sessions.pop(sesh) + seshlock.release() return 'OK' - partlock.release() + seshlock.release() abort(404) + +@app.route("/retract-partition",methods=['POST']) +def retract_partition(): + """ + For backwards compatibility. + Delete this when the partition word is completely retired + """ + log.warning("retract-partition depreciated. Please use retract-session.") + retract_session() + @app.route("/retract",methods=['POST']) def retract(): if len(request.data) == 0: @@ -240,13 +270,17 @@ def retract(): js=json.loads(request.data) good=True - part=js['partition'] - partlock.acquire() - if part not in partitions: - partlock.release() - return make_response(f"Partition {part} not found", 404) + try: + sesh = get_sesh(js) + except ValueError: + abort(400) + + seshlock.acquire() + if sesh not in sessions: + seshlock.release() + return make_response(f"Session {sesh} not found", 404) - store=partitions[part] + store=sessions[sesh] if 'connections' not in js: abort(400) for con in js['connections']: @@ -257,16 +291,16 @@ def retract(): log.info(f"Could not find connection_id <{id}>") good=False if len(store)==0: - # We've deleted the last entry in this partition so delete the - # partition as well - partitions.pop(part) - partlock.release() + # We've deleted the last entry in this session so delete the + # session as well + sessions.pop(sesh) + seshlock.release() if good: return 'OK' abort(404) -@app.route("/getconnection/",methods=['POST','GET']) -def get_connection(part): +@app.route("/getconnection/",methods=['POST','GET']) +def get_connection(sesh): if len(request.data) == 0: abort(400) @@ -285,17 +319,17 @@ def get_connection(part): result=[] regex=re.compile(js['uid_regex']) dt=js['data_type'] - partlock.acquire() + seshlock.acquire() - if part in partitions: - store=partitions[part] + if sesh in sessions: + store=sessions[sesh] matched=[] for uid,con in store.items(): if regex.fullmatch(uid) and con.data_type==dt and now-con.time