Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ server to serve connection information to DAQ applications.
http://connection-flask.connections:5000/publish
```

### /getconnection/<partition>
### /getconnection/<session>
This uri returns a list of connections matching the 'uid_regex' and
'data_type' specified in the JSON encoded request.

Expand All @@ -53,9 +53,9 @@ curl -d '{"uid_regex":"DRO.*","data_type":"TPSet"}' \
This uri should be used to remove published connections. The request should be JSON encoded with the keys "partition" and "connections" with the latter being an array of "connection_id" and "data_type" values.


### /retract-partition
### /retract-session
This uri should be used to remove all published connections from the
given partition. The request should be JSON encoded with one field "partition" naming the partition to be retracted.
given session. The request should be JSON encoded with one field "session" naming the session to be retracted.

## Running the server locally from the command line
The server is intended to be run under the Gunicorn web server.
Expand Down
156 changes: 95 additions & 61 deletions src/connectivityserver/connectionflask.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,30 @@
# Some functions exit with an abort(NNN) instead of return so don't complain!
#ruff: noqa RET503

partitions={}
partlock=Lock()
sessions={}
seshlock=Lock()

if 'CONNECTION_FLASK_DEBUG' in os.environ:
debug_level=int(os.environ['CONNECTION_FLASK_DEBUG'])
else:
debug_level=1


def get_sesh(js):
has_session = 'session' in js
has_partition = 'partition' in js

if has_session and has_partition:
raise ValueError("Both 'session' and 'partition' were provided")

if has_session:
return js['session']

if has_partition:
return js['partition']

raise ValueError("No session (partition) provided")

def convert_log_level(log_level):
if log_level == 0:
return logging.WARNING
Expand All @@ -50,7 +66,7 @@ def convert_log_level(log_level):
nlookups=0
lookup_time=timedelta(0)
publish_time=timedelta()
maxpartitions=0
maxsessions=0
maxentries={}

app=Flask(__name__)
Expand All @@ -60,19 +76,19 @@ def dump():
now=datetime.now()
dstream=StringIO()
dstream.write('<h1>Dump of configuration dictionary</h1>')
dstream.write("<h2>Active partitions</h2><p>")
if len(partitions)>0:
dstream.write("<h2>Active sessions</h2><p>")
if len(sessions)>0:
pad=' style="padding-left: 1em;padding-right: 1em"'
dstream.write(f'<table style="border: 1px solid black">'
f'<tr style="background: #e0e0e0"><th{pad}>Partition</th>'
f'<tr style="background: #e0e0e0"><th{pad}>Session</th>'
f'<th{pad}>Entries</th></tr>')
for p in partitions:
for p in sessions:
dstream.write(f'<tr><td{pad}>{p}'
f'</td><td{pad}>{len(partitions[p])}</td></tr>')
f'</td><td{pad}>{len(sessions[p])}</td></tr>')
dstream.write("</table>")
dstream.write(f'<h2>Partitions</h2>')
for p in partitions:
store=partitions[p]
dstream.write(f'<h2>Sessions</h2>')
for p in sessions:
store=sessions[p]
dstream.write(f'<h3>{p}</h3>')
dstream.write(f'<table style="border: 1px solid black">'
f'<tr style="background: #e0e0e0">'
Expand Down Expand Up @@ -117,9 +133,9 @@ def stats_to_html(dstream):
avg_lookup=timedelta()
dstream.write(f"<p>{nlookups} calls to lookup in total time {lookup_time} "
f"(average {avg_lookup.microseconds} &micro;s per call)</p>")
dstream.write(f"<p>Maximum number of partitions active = {maxpartitions}</p>")
for part in maxentries:
dstream.write(f"<p>Maximum entries in partition {part} = {maxentries[part]}</p>")
dstream.write(f"<p>Maximum number of sessions active = {maxsessions}</p>")
for sesh in maxentries:
dstream.write(f"<p>Maximum entries in session {sesh} = {maxentries[sesh]}</p>")

@app.route("/stats")
def dumpStats():
Expand All @@ -133,51 +149,54 @@ def dumpStats():
def resetStats():
stats = dumpStats()

global last_stats,npublishes,nlookups,lookup_time,publish_time,maxpartitions,maxentries
global last_stats,npublishes,nlookups,lookup_time,publish_time,maxsessions,maxentries

last_stats=datetime.now()
npublishes=0
nlookups=0
lookup_time=timedelta(0)
publish_time=timedelta()
maxpartitions=0
maxsessions=0
maxentries={}

return stats

@app.route("/resetService")
def reset():

global partitions
partitions={}
global sessions
sessions={}
return resetStats()


@app.route("/publish",methods=['POST'])
def publish():
# Store multiple connection ids and corresponding uris in a
# dictionary associated with the appropriate partition.
# dictionary associated with the appropriate session.
timestamp=datetime.now()
js=json.loads(request.data)

log.debug(f"{js=}")
part=js['partition']
try:
sesh = get_sesh(js)
except ValueError:
abort(400)

log.info(
f"{len(js['connections'])} connections in partition {part} from {request.remote_addr} uri={js['connections'][0]['uri']}..."
f"{len(js['connections'])} connections in session {sesh} from {request.remote_addr} uri={js['connections'][0]['uri']}..."
)

partlock.acquire()
if part in partitions:
store=partitions[part]
seshlock.acquire()
if sesh in sessions:
store=sessions[sesh]
else:
store={}
partitions[part]=store
global maxpartitions
if len(partitions)>maxpartitions:
maxpartitions=len(partitions)
if part not in maxentries:
maxentries[part]=0
sessions[sesh]=store
global maxsessions
if len(sessions)>maxsessions:
maxsessions=len(sessions)
if sesh not in maxentries:
maxentries[sesh]=0

Connection=namedtuple(
'Connection',['uri','data_type','capacity','connection_type','time'])
Expand Down Expand Up @@ -206,47 +225,62 @@ def publish():
publish_time+=elapsed
npublishes+=1

if len(store)>maxentries[part]:
maxentries[part]=len(store)
if len(store)>maxentries[sesh]:
maxentries[sesh]=len(store)

partlock.release()
seshlock.release()
return 'OK'

@app.route("/retract-partition",methods=['POST'])
def retract_partition():
@app.route("/retract-session",methods=['POST'])
def retract_session():
if len(request.data) == 0:
abort(400)

js=json.loads(request.data)
log.debug(f"request=[{js}]")

if 'partition' not in js:
try:
sesh = get_sesh(js)
except ValueError:
abort(400)

part=js['partition']
partlock.acquire()
seshlock.acquire()

if part in partitions:
partitions.pop(part)
partlock.release()
if sesh in sessions:
sessions.pop(sesh)
seshlock.release()
return 'OK'
partlock.release()
seshlock.release()
abort(404)


@app.route("/retract-partition",methods=['POST'])
def retract_partition():
"""
For backwards compatibility.
Delete this when the partition word is completely retired
"""
log.warning("retract-partition depreciated. Please use retract-session.")
retract_session()

@app.route("/retract",methods=['POST'])
def retract():
if len(request.data) == 0:
abort(400)

js=json.loads(request.data)
good=True
part=js['partition']
partlock.acquire()
if part not in partitions:
partlock.release()
return make_response(f"Partition {part} not found", 404)
try:
sesh = get_sesh(js)
except ValueError:
abort(400)

seshlock.acquire()
if sesh not in sessions:
seshlock.release()
return make_response(f"Session {sesh} not found", 404)

store=partitions[part]
store=sessions[sesh]
if 'connections' not in js:
abort(400)
for con in js['connections']:
Expand All @@ -257,16 +291,16 @@ def retract():
log.info(f"Could not find connection_id <{id}>")
good=False
if len(store)==0:
# We've deleted the last entry in this partition so delete the
# partition as well
partitions.pop(part)
partlock.release()
# We've deleted the last entry in this session so delete the
# session as well
sessions.pop(sesh)
seshlock.release()
if good:
return 'OK'
abort(404)

@app.route("/getconnection/<part>",methods=['POST','GET'])
def get_connection(part):
@app.route("/getconnection/<sesh>",methods=['POST','GET'])
def get_connection(sesh):
if len(request.data) == 0:
abort(400)

Expand All @@ -285,17 +319,17 @@ def get_connection(part):
result=[]
regex=re.compile(js['uid_regex'])
dt=js['data_type']
partlock.acquire()
seshlock.acquire()

if part in partitions:
store=partitions[part]
if sesh in sessions:
store=sessions[sesh]
matched=[]
for uid,con in store.items():
if regex.fullmatch(uid) and con.data_type==dt and now-con.time<entry_ttl:
matched.append((uid,con))
partlock.release()
seshlock.release()
# We should now be able to construct JSON string while other threads
# have access to the partition dict
# have access to the session dict
for uid,con in matched:
result.append('{'
f'"uid":"{uid}",'
Expand All @@ -317,8 +351,8 @@ def get_connection(part):

return "["+",".join(result)+"]"

partlock.release()
log.info(f"Partition {part} not found")
seshlock.release()
log.info(f"Session {sesh} not found")
abort(404)
else:
abort(400)
Expand Down
31 changes: 24 additions & 7 deletions tests/test_connectivityservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def runner(app):
"uri":"tcp://192.168.1.100:1235"
}
],
"partition":"ccTest"
"session":"ccTest"
}""")


Expand All @@ -47,6 +47,23 @@ def test_publish(client):
resp = client.post("/publish", json=con)
assert resp.status_code == 200


def test_publish_with_partition_still_works(client):
partition_con = {
"connections": con["connections"],
"partition": "ccPartitionCompat",
}

resp = client.post("/publish", json=partition_con)
assert resp.status_code == 200

query = {"uid_regex": "DRO.*", "data_type": "TPSet"}
resp = client.post("/getconnection/ccPartitionCompat", json=query)
assert resp.status_code == 200

rjson = json.loads(resp.data)
assert len(rjson) == 2

def test_lookup(client):
query = json.loads("""{"uid_regex":"DRO.*", "data_type":"TPSet"}""")
resp = client.post("/getconnection/ccTest", json=query)
Expand All @@ -68,7 +85,7 @@ def test_retract(client):
resp = client.post("/retract")
assert resp.status_code == 400

retraction = json.loads("""{"partition":"ccTest",
retraction = json.loads("""{"session":"ccTest",
"connections":[{"connection_id":"DRO-000-tp_to_trigger"},
{"connection_id":"DRO-001-tp_to_trigger"}]
}""")
Expand All @@ -84,19 +101,19 @@ def test_retract(client):
assert resp.status_code == 404


def test_retract_partition(client):
def test_retract_session(client):
resp = client.post("/publish", json=con)
assert resp.status_code == 200

resp = client.post("/retract-partition")
resp = client.post("/retract-session")
assert resp.status_code == 400

retraction = json.loads("""{"partition":"ccTest"}""")
resp = client.post("/retract-partition", json=retraction)
retraction = json.loads("""{"session":"ccTest"}""")
resp = client.post("/retract-session", json=retraction)
assert resp.status_code == 200

# Second time should fail
resp = client.post("/retract-partition", json=retraction)
resp = client.post("/retract-session", json=retraction)
assert resp.status_code == 404


Expand Down