Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions geonode/people/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,23 +280,40 @@ def handle_user_invalid_email(user):


def _update_user_groups_from_social(sociallogin, user):

# Retrieve the strategy from settings, defaulting FULL_SYNC
sync_strategy = getattr(settings, "SOCIALACCOUNT_SYNC_USER_GROUPS_ON_LOGIN", "FULL_SYNC")

if sync_strategy == "NO_SYNC":
return user

extractor = get_data_extractor(sociallogin.account.provider)
group_role_mapper = get_group_role_mapper(sociallogin.account.provider)
try:
groups = extractor.extract_groups(sociallogin.account.extra_data) or extractor.extract_roles(
sociallogin.account.extra_data
)
# Try groups first
groups = extractor.extract_groups(sociallogin.account.extra_data)

# If groups are missing, try roles
if groups == "":
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe if not groups is better, so if the code change and returns False / None or [] here is still compatible

groups = extractor.extract_roles(sociallogin.account.extra_data)

# check here if user is member already of other groups and remove it form the ones that are not declared here...
# If groups is STILL "", it means BOTH were missing.
# If groups is [], this check will be FALSE, and the wipe will happen.
if sync_strategy == "SAFE_SYNC" and groups == "":
return user

# Perform the "Wipe" for FULL_SYNC or SAFE_SYNC (if we have data)
for groupprofile in user.group_list_all():
groupprofile.leave(user)
for group_role_name in groups:
group_name, role_name = group_role_mapper.parse_group_and_role(group_role_name)
groupprofile = GroupProfile.objects.filter(slug=group_name).first()
if groupprofile:
groupprofile.join(user)
if group_role_mapper.is_manager(role_name):
groupprofile.promote(user)

if groups and not isinstance(groups, str):
for group_role_name in groups:
group_name, role_name = group_role_mapper.parse_group_and_role(group_role_name)
groupprofile = GroupProfile.objects.filter(slug=group_name).first()
if groupprofile:
groupprofile.join(user)
if group_role_mapper.is_manager(role_name):
groupprofile.promote(user)
except (AttributeError, NotImplementedError):
pass # extractor doesn't define a method for extracting field
return user
Expand Down
110 changes: 110 additions & 0 deletions geonode/people/test_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

from unittest import TestCase

from django.test import override_settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group

from geonode.groups.models import GroupProfile
from geonode.tests.base import GeoNodeBaseTestSupport
from geonode.people import adapters

try:
Expand Down Expand Up @@ -382,3 +388,107 @@ def test_respond_user_inactive(self, mock_func):
result = self.extractor.respond_user_inactive(self.django_request, dummy_user)
mock_func.assert_called_with(dummy_user)
self.assertEqual(result, dummy_return)


class GenericOpenIDConnectAdapterTestCase(GeoNodeBaseTestSupport):
fixtures = ["initial_data.json", "group_test_data.json", "default_oauth_apps.json"]

def setUp(self):
super().setUp()
self.user = get_user_model().objects.get(username="test_user")
self.group_bar = GroupProfile.objects.get(slug="bar")

# Ensure a clean state: put user in 'bar' to start
if not self.user.groups.filter(pk=self.group_bar.pk).exists():
self.group_bar.join(self.user)

# Mock the SocialLogin
self.mock_sociallogin = mock.MagicMock()
self.mock_sociallogin.account.provider = "openid_connect"
self.mock_sociallogin.user = self.user

@mock.patch("geonode.people.adapters.get_data_extractor")
@override_settings(SOCIALACCOUNT_SYNC_USER_GROUPS_ON_LOGIN="SAFE_SYNC")
def test_safe_sync_skips_on_missing_key(self, mock_get_extractor):
"""Verify SAFE_SYNC preserves 'bar' if group data is missing."""
# Setup mock extractor to return "" (missing key signal)
mock_ext = mock.MagicMock()
mock_ext.extract_groups.return_value = ""
mock_ext.extract_roles.return_value = ""
mock_get_extractor.return_value = mock_ext

# Run logic with no groups in extra_data
self.mock_sociallogin.account.extra_data = {"id": "123"}
adapters._update_user_groups_from_social(self.mock_sociallogin, self.user)

# Assert: User is still in 'bar'
self.assertTrue(self.user.groups.filter(groupprofile__slug="bar").exists())

@mock.patch("geonode.people.adapters.get_data_extractor")
@override_settings(SOCIALACCOUNT_SYNC_USER_GROUPS_ON_LOGIN="SAFE_SYNC")
def test_safe_sync_wipes_on_empty_list(self, mock_get_extractor):
"""Verify SAFE_SYNC wipes 'bar' if provider explicitly sends empty list."""
mock_ext = mock.MagicMock()
mock_ext.extract_groups.return_value = []
mock_get_extractor.return_value = mock_ext

self.mock_sociallogin.account.extra_data = {"groups": []}
adapters._update_user_groups_from_social(self.mock_sociallogin, self.user)

# Assert: User is removed from everything
self.assertEqual(self.user.groups.count(), 0)

@mock.patch("geonode.people.adapters.get_data_extractor")
@override_settings(SOCIALACCOUNT_SYNC_USER_GROUPS_ON_LOGIN="SAFE_SYNC")
def test_safe_sync_performs_correct_update(self, mock_get_extractor):
"""Verify SAFE_SYNC acts like FULL_SYNC when valid data is present."""
azure_uuid = "4b7e2db3-2bd2-4c8e-aa71-7d6d2714e603"

# Create the base Django Group first
dj_group, _ = Group.objects.get_or_create(name="Azure Group")

GroupProfile.objects.get_or_create(slug=azure_uuid, defaults={"group": dj_group, "title": "Azure Group"})

mock_ext = mock.MagicMock()
mock_ext.extract_groups.return_value = [azure_uuid]
mock_get_extractor.return_value = mock_ext

self.mock_sociallogin.account.extra_data = {"groups": [azure_uuid]}
adapters._update_user_groups_from_social(self.mock_sociallogin, self.user)

# User should have left 'bar' and joined the Azure group
self.assertFalse(self.user.groups.filter(groupprofile__slug="bar").exists())
self.assertTrue(self.user.groups.filter(groupprofile__slug=azure_uuid).exists())

@mock.patch("geonode.people.adapters.get_data_extractor")
@override_settings(SOCIALACCOUNT_SYNC_USER_GROUPS_ON_LOGIN="FULL_SYNC")
def test_full_sync_wipes_on_missing_key(self, mock_get_extractor):
"""Verify FULL_SYNC (default) wipes groups even if the key is missing."""
# Mock extractor to return "" (missing key signal)
mock_ext = mock.MagicMock()
mock_ext.extract_groups.return_value = ""
mock_ext.extract_roles.return_value = ""
mock_get_extractor.return_value = mock_ext

# Simulate Azure response with NO groups key
self.mock_sociallogin.account.extra_data = {"id": "123"}
adapters._update_user_groups_from_social(self.mock_sociallogin, self.user)

# In FULL_SYNC, the user should be removed from 'bar'
self.assertFalse(self.user.groups.filter(groupprofile__slug="bar").exists())

@mock.patch("geonode.people.adapters.get_data_extractor")
@override_settings(SOCIALACCOUNT_SYNC_USER_GROUPS_ON_LOGIN="NO_SYNC")
def test_no_sync_ignores_provider_data(self, mock_get_extractor):
"""Verify NO_SYNC does not touch local groups regardless of provider data."""
mock_ext = mock.MagicMock()
# Even if provider says the user belongs to 'bar'
mock_ext.extract_groups.return_value = ["bar"]
mock_get_extractor.return_value = mock_ext

# and we simulate an empty list in the token
self.mock_sociallogin.account.extra_data = {"groups": []}
adapters._update_user_groups_from_social(self.mock_sociallogin, self.user)

# User should STILL be in 'bar' because we are not syncing
self.assertTrue(self.user.groups.filter(groupprofile__slug="bar").exists())
3 changes: 3 additions & 0 deletions geonode/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2012,6 +2012,9 @@ def get_geonode_catalogue_service():
SOCIALACCOUNT_OIDC_PROVIDER: SOCIALACCOUNT_PROVIDERS_DEFS.get(_SOCIALACCOUNT_PROVIDER),
}

# Default to FULL_SYNC to maintain backward compatibility
SOCIALACCOUNT_SYNC_USER_GROUPS_ON_LOGIN = os.getenv("SOCIALACCOUNT_SYNC_USER_GROUPS_ON_LOGIN", "FULL_SYNC")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would add here as a comment the three possibility and what they actually do


DISPLAY_RATINGS = ast.literal_eval(os.getenv("DISPLAY_RATINGS", "True"))
DISPLAY_WMS_LINKS = ast.literal_eval(os.getenv("DISPLAY_WMS_LINKS", "True"))

Expand Down
Loading