"accent_color": data.accent_color
}
}
if guild_id and (guild := await models.Guild.get_or_none(id=guild_id)):
if member := await guild.get_member(self.id):
data["guild_member_profile"] = {"guild_id": str(guild_id)}
data["guild_member"] = await member.ds_json()
if mutual_friends_count:
data["mutual_friends_count"] = 0 # TODO: add mutual friends count
if with_mutual_guilds:
query = Q(user=self)
if self != other_user:
query &= Q(guild__id__in=Subquery(
models.GuildMember
.filter(user__id__in=[self.id, other_user.id])
.group_by("guild_id")
.annotate(user_count=Count("user__id", distinct=True))
.filter(user_count=2)
.values_list("guild_id", flat=True)
))
data["mutual_guilds"] = [
{"id": str(guild_id), "nick": nick}
for nick, guild_id in await models.GuildMember.filter(query).values_list("nick", "guild_id")
]
if self.is_bot:
data["user"]["bot"] = True
return data
# noinspection PyMethodMayBeStatic
async def get_another_user(self, user_id: int) -> User:
# TODO: check for relationship, mutual guilds or mutual friends
if (user := await User.y.get(user_id, False)) is None: # TODO: add test for nonexistent user
raise UnknownUser
return user
def check_password(self, password: str) -> bool:
return checkpw(self.y.prepare_password(password, self.id), self.password.encode("utf8"))
def hash_new_password(self, password: str) -> str:
return self.y.hash_password(password, self.id)
async def change_password(self, new_password: str) -> None:
self.password = self.hash_new_password(new_password)
await self.save(update_fields=["password"])
async def change_username(self, username: str) -> None:
data = await self.data
discriminator = data.discriminator
if await User.y.getByUsername(username, discriminator):
discriminator = await self.y.get_free_discriminator(username)
if discriminator is None:
raise InvalidDataErr(400, Errors.make(50035, {"username": {
"code": "USERNAME_TOO_MANY_USERS",
"message": "This name is used by too many users. Please enter something else or try again."
}}))
data.username = username
data.discriminator = discriminator
await data.save(update_fields=["username", "discriminator"])
async def change_discriminator(self, new_discriminator: int, username_changed: bool = False) -> bool:
data = await self.data
username = data.username
if await self.y.getByUsername(username, new_discriminator):
if username_changed:
return False
raise InvalidDataErr(400, Errors.make(50035, {"username": {
"code": "USERNAME_TOO_MANY_USERS",
"message": "This discriminator already used by someone. Please enter something else."
}}))
data.discriminator = new_discriminator
await data.save(update_fields=["discriminator"])
return True
async def change_email(self, new_email: str) -> None:
new_email = new_email.lower()
if self.email == new_email:
return
if await User.exists(email=new_email):
raise InvalidDataErr(400, Errors.make(50035, {"email": {"code": "EMAIL_ALREADY_REGISTERED",
"message": "Email address already registered."}}))
self.email = new_email
self.verified = False
await self.save()
async def create_backup_codes(self) -> list[str]:
codes = ["".join([choice('abcdefghijklmnopqrstuvwxyz0123456789') for _ in range(8)]) for _ in range(10)]
await self.clear_backup_codes()
await models.MfaCode.bulk_create([
models.MfaCode(user=self, code=code) for code in codes
])
return codes
async def clear_backup_codes(self) -> None:
await models.MfaCode.filter(user=self).delete()
async def get_backup_codes(self) -> list[str]:
return [code.code for code in await models.MfaCode.filter(user=self).limit(10)]
async def use_backup_code(self, code: str) -> bool:
if (code := await models.MfaCode.get_or_none(user=self, code=code, used=False)) is None:
return False
code.used = True
await code.save(update_fields=["used"])
return True
async def y_delete(self) -> None:
await self.update(deleted=True, email=f"deleted_{self.id}@yepcord.ml", password="")
data = await self.data
await data.update(discriminator=0, username=f"Deleted User {hex(self.id)[2:]}", avatar=None, public_flags=0,
avatar_decoration=None)
await models.Session.filter(user=self).delete()
await models.Relationship.filter(Q(from_user=self) | Q(to_user=self)).delete()
await models.MfaCode.filter(user=self).delete()
await models.GuildMember.filter(user=self).delete()
await models.UserSettings.filter(user=self).delete()
await models.FrecencySettings.filter(user=self).delete()
await models.Invite.filter(inviter=self).delete()
await models.ReadState.filter(user=self).delete()
async def get_guilds(self) -> list[models.Guild]:
return [
member.guild for member in
await models.GuildMember.filter(user=self).select_related("guild", "guild__owner")
]
async def get_private_channels(self) -> list[models.Channel]:
return [
channel
for channel in await models.Channel.filter(recipients__id=self.id).select_related("owner")
if not await channel.dm_is_hidden(self)
]
async def get_relationships(self) -> list[models.Relationship]:
return [
relationship
for relationship in
await models.Relationship.filter(Q(from_user=self) | Q(to_user=self)).select_related("from_user", "to_user")
if not (relationship.type == RelationshipType.BLOCK and relationship.from_user.id != self.id)
]
async def get_related_users(self) -> list[models.User]:
users = {
relationship.other_user(self).id: relationship.other_user(self)
for relationship in await self.get_relationships()
}
for channel in await models.Channel.filter(recipients__id=self.id):
for recipient in await channel.recipients.all():
if recipient.id in users or recipient == self:
continue
users[recipient.id] = recipient
return list(users.values())
async def get_mfa_key(self) -> str | None:
return cast(str, await models.UserSettings.get(user=self).values_list("mfa", flat=True))
async def generate_mfa_nonce(self) -> tuple[str, str]:
key = b64decode(Config.KEY)
exp = time() + 600
code = b64encode(urandom(16))
nonce = JWT.encode({"t": MfaNonceType.NORMAL, "c": code, "u": self.id}, key, exp)
rnonce = JWT.encode({"t": MfaNonceType.REGENERATE, "c": code, "u": self.id}, key, exp)
return nonce, rnonce
async def verify_mfa_nonce(self, nonce: str, nonce_type: MfaNonceType) -> None:
key = b64decode(Config.KEY)
assert_(payload := JWT.decode(nonce, key), InvalidKey)
assert_(payload["u"] == self.id, InvalidKey)
assert_(nonce_type == payload["t"], InvalidKey)
async def update_read_state(self, channel: models.Channel, count: int, last: int) -> None:
await models.ReadState.update_or_create(user=self, channel=channel, defaults={
"last_read_id": last,
"count": count,
})
async def send_verification_email(self) -> None:
token = JWT.encode({"id": self.id, "email": self.email}, b64decode(Config.KEY), expires_after=600)
await EmailMsg.send_verification(self.email, token)
async def get_read_states(self) -> list[models.ReadState]:
if self.is_bot:
return []
return await models.ReadState.filter(user=self).select_related("channel")
server/yepcord/yepcord/models/user.py
Line 213 in 7dfe87e