Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 75 additions & 13 deletions lib/diameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3234,6 +3234,54 @@ def Answer_16777216_301(self, packet_vars, avps):
response = self.generate_diameter_packet("01", "40", 301, 16777216, packet_vars['hop-by-hop-identifier'], packet_vars['end-to-end-identifier'], avp) #Generate Diameter packet
return response

def _ifc_has_unregistered_services(self, ims_subscriber_details) -> bool:
"""Check whether the subscriber's iFC template contains any rule with
SessionCase=2 (TERMINATING_UNREGISTERED), which indicates services for
unregistered users (e.g. voicemail, CFNR).

Per 3GPP TS 29.228 Section 6.1.2 the HSS must distinguish between
DIAMETER_UNREGISTERED_SERVICE (5005) and
DIAMETER_ERROR_IDENTITY_NOT_REGISTERED (5003) based on this.
"""
try:
details = dict(ims_subscriber_details)
details['mnc'] = self.MNC.zfill(3)
details['mcc'] = self.MCC.zfill(3)

template = None
if self.ifcCacheEnabled:
template = self.ifcTemplateCache.get_template(details, config, self.database)
else:
if self.ifcUseDatabase and details.get('ifc_template_id'):
template_data = self.database.GetObj(IFC_TEMPLATE, details['ifc_template_id'])
if template_data and 'template_content' in template_data:
template = jinja2.Template(template_data['template_content'])
if template is None:
ifc_path = details.get('ifc_path') or self.ifcDefaultTemplatePath
templateLoader = jinja2.FileSystemLoader(searchpath="../")
templateEnv = jinja2.Environment(loader=templateLoader)
template = templateEnv.get_template(ifc_path)

if template is None:
return False

xmlbody = template.render(iFC_vars=details)
return '<SessionCase>2</SessionCase>' in xmlbody
except Exception as e:
self.logTool.log(service='HSS', level='warning',
message=f"Failed to check iFC for unregistered services: {e}",
redisClient=self.redisMessaging)
return False

def _get_scscf_from_pool(self) -> str:
"""Pick an S-CSCF URI from the configured pool or generate a default."""
if 'scscf_pool' in config['hss']:
try:
return random.choice(config['hss']['scscf_pool'])
except Exception:
pass
return "sip:scscf.ims.mnc" + str(self.MNC).zfill(3) + ".mcc" + str(self.MCC).zfill(3) + ".3gppnetwork.org"

#3GPP Cx Location Information Answer
def Answer_16777216_302(self, packet_vars, avps):
avp = '' #Initiate empty var AVP #Session-ID
Expand All @@ -3250,21 +3298,35 @@ def Answer_16777216_302(self, packet_vars, avps):
ims_subscriber_details = self.Get_IMS_Subscriber_Details_from_AVP(username)
if ims_subscriber_details['scscf'] != None:
self.logTool.log(service='HSS', level='debug', message="Got SCSCF on record for Sub", redisClient=self.redisMessaging)
#Strip double sip prefix
avp += self.generate_vendor_avp(602, "c0", 10415, str(binascii.hexlify(str.encode(str(ims_subscriber_details['scscf']))),'ascii'))
else:
self.logTool.log(service='HSS', level='debug', message="No SCSF assigned - Using SCSCF Pool", redisClient=self.redisMessaging)
if 'scscf_pool' in config['hss']:
try:
scscf = random.choice(config['hss']['scscf_pool'])
self.logTool.log(service='HSS', level='debug', message="Randomly picked SCSCF address " + str(scscf) + " from pool", redisClient=self.redisMessaging)
avp += self.generate_vendor_avp(602, "c0", 10415, str(binascii.hexlify(str.encode(scscf)),'ascii'))
except Exception as E:
avp += self.generate_vendor_avp(602, "c0", 10415, str(binascii.hexlify(str.encode("sip:scscf.ims.mnc" + str(self.MNC).zfill(3) + ".mcc" + str(self.MCC).zfill(3) + ".3gppnetwork.org")),'ascii'))
self.logTool.log(service='HSS', level='debug', message="Using generated iFC as failed to source from list due to " + str(E), redisClient=self.redisMessaging)
else:
avp += self.generate_vendor_avp(602, "c0", 10415, str(binascii.hexlify(str.encode("sip:scscf.ims.mnc" + str(self.MNC).zfill(3) + ".mcc" + str(self.MCC).zfill(3) + ".3gppnetwork.org")),'ascii'))
self.logTool.log(service='HSS', level='debug', message="Using generated iFC", redisClient=self.redisMessaging)
# Subscriber exists but is not registered (no S-CSCF assigned).
# Per 3GPP TS 29.228 Section 6.1.2:
# - If the user has services for unregistered state (iFC with
# SessionCase=2), return DIAMETER_UNREGISTERED_SERVICE (5005)
# with a Server-Name from the pool so the I-CSCF can route
# to an S-CSCF for unregistered service execution.
# - Otherwise return DIAMETER_ERROR_IDENTITY_NOT_REGISTERED (5003)
# without Server-Name (no S-CSCF needed).
if self._ifc_has_unregistered_services(ims_subscriber_details):
scscf_uri = self._get_scscf_from_pool()
avp += self.generate_vendor_avp(602, "c0", 10415, str(binascii.hexlify(str.encode(scscf_uri)),'ascii'))
self.logTool.log(service='HSS', level='info',
message="Subscriber not registered but has unregistered services - returning DIAMETER_UNREGISTERED_SERVICE (5005)",
redisClient=self.redisMessaging)
result_code = 5005
else:
self.logTool.log(service='HSS', level='info',
message="Subscriber not registered and no unregistered services - returning DIAMETER_ERROR_IDENTITY_NOT_REGISTERED (5003)",
redisClient=self.redisMessaging)
result_code = 5003

avp_experimental_result = ''
avp_experimental_result += self.generate_vendor_avp(266, 40, 10415, '')
avp_experimental_result += self.generate_avp(298, 40, self.int_to_hex(result_code, 4))
avp += self.generate_avp(297, 40, avp_experimental_result)
response = self.generate_diameter_packet("01", "40", 302, 16777216, packet_vars['hop-by-hop-identifier'], packet_vars['end-to-end-identifier'], avp)
return response
except Exception as E:
self.logTool.log(service='HSS', level='debug', message="Threw Exception: " + str(E), redisClient=self.redisMessaging)
self.logTool.log(service='HSS', level='debug', message=f"No known MSISDN or IMSI in Answer_16777216_302()", redisClient=self.redisMessaging)
Expand Down
Loading