Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
824 changes: 824 additions & 0 deletions collectoss/application/db/materialized_views.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion collectoss/application/db/models/augur_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ class ContributorRepo(Base):
class ContributorsAlias(Base):
__tablename__ = "contributors_aliases"
__table_args__ = (
UniqueConstraint("alias_email"),
UniqueConstraint("cntrb_id","alias_email", name="cntrb-email-insert-unique"),
{
"schema": "augur_data",
"comment": "Every open source user may have more than one email used to make contributions over time. CollectOSS selects the first email it encounters for a user as its “canonical_email”. \n\nThe canonical_email is also added to the contributors_aliases table, with the canonical_email and alias_email being identical. Using this strategy, an email search will only need to join the alias table for basic email information, and can then more easily map the canonical email from each alias row to the same, more detailed information in the contributors table for a user. ",
Expand Down
21 changes: 21 additions & 0 deletions collectoss/application/schema/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,27 @@
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata

# NOTE FOR DEVELOPERS: alembic_utils manages materialized view definitions via
# DROP + CREATE (replace). When a view is replaced:
#
# 1. ALL indexes are destroyed. Manually add CREATE UNIQUE INDEX statements
# after the replace op, using MaterializedView.unique_index_columns from
# the registry (collectoss/application/db/materialized_views.py).
#
# 2. The view is recreated WITH NO DATA. You CANNOT run REFRESH CONCURRENTLY
# immediately — it requires both a unique index and pre-existing data.
# After recreating the index, run a non-concurrent refresh first:
# REFRESH MATERIALIZED VIEW augur_data.<view_name> WITH DATA;
# Only after that will the Celery refresh task's CONCURRENTLY succeed.
#
# WARNING: If MATERIALIZED_VIEWS is ever emptied, autogenerate will propose
# dropping all registered views. Keep the list complete.
from alembic_utils.pg_materialized_view import PGMaterializedView
from alembic_utils.replaceable_entity import register_entities
from collectoss.application.db.materialized_views import MATERIALIZED_VIEWS
_materialized_view_entities = [view.to_pg_view() for view in MATERIALIZED_VIEWS]
register_entities(_materialized_view_entities, entity_types=[PGMaterializedView])

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""fix alias email constraints

Revision ID: 41
Revises: 40
Create Date: 2026-05-04 14:23:57.315794

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '41'
down_revision = '40'
branch_labels = None
depends_on = None


def upgrade():
    """Replace the single-column unique constraint on alias_email with a
    composite unique constraint over (cntrb_id, alias_email), allowing the
    same alias email to appear under different contributors.
    """
    schema = 'augur_data'
    table = 'contributors_aliases'
    # Drop the old one-column constraint first, then install the composite one.
    op.drop_constraint(op.f('contributor-alias-unique'), table, schema=schema, type_='unique')
    op.create_unique_constraint('cntrb-email-insert-unique', table, ['cntrb_id', 'alias_email'], schema=schema)


def downgrade():
    """Revert to the original single-column unique constraint on alias_email,
    undoing the composite (cntrb_id, alias_email) constraint added in upgrade().
    """
    schema = 'augur_data'
    table = 'contributors_aliases'
    # Mirror of upgrade(): drop the composite constraint, restore the old one.
    op.drop_constraint('cntrb-email-insert-unique', table, schema=schema, type_='unique')
    op.create_unique_constraint(op.f('contributor-alias-unique'), table, ['alias_email'], schema=schema)
204 changes: 28 additions & 176 deletions collectoss/tasks/db/refresh_materialized_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,197 +3,53 @@
import sqlalchemy as s

from collectoss.tasks.init.celery_app import celery_app as celery
from collectoss.application.db.lib import execute_sql
from collectoss.application.db.materialized_views import MATERIALIZED_VIEWS
from collectoss.tasks.git.util.facade_worker.facade_worker.config import FacadeHelper
from collectoss.tasks.git.util.facade_worker.facade_worker.rebuildcache import invalidate_caches, rebuild_unknown_affiliation_and_web_caches


@celery.task(bind=True)
def refresh_materialized_views(self):

#self.logger = SystemLogger("data_collection_jobs").get_logger()

engine = self.app.engine

logger = logging.getLogger(refresh_materialized_views.__name__)
#self.logger = logging.getLogger(refresh_materialized_views.__name__)

mv1_refresh = s.sql.text("""
REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repo_prs with data;
COMMIT;
""")

mv2_refresh = s.sql.text("""
REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repos_commits with data;
COMMIT;
""")

mv3_refresh = s.sql.text("""
REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repos_issues with data;
COMMIT;
""")

mv4_refresh = s.sql.text("""
REFRESH MATERIALIZED VIEW concurrently augur_data.augur_new_contributors with data;
COMMIT;
""")
mv5_refresh = s.sql.text("""
REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_commits_and_committers_daily_count with data;
COMMIT;
""")

mv6_refresh = s.sql.text("""
REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_new_contributors with data;
COMMIT;
""")

mv7_refresh = s.sql.text("""
REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_entry_list with data;
COMMIT;
""")

mv8_refresh = s.sql.text("""

REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_contributor_actions with data;
COMMIT;
""")

mv9_refresh = s.sql.text("""

REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_user_repos with data;
COMMIT;
""")

mv10_refresh = s.sql.text("""

REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_pr_response_times with data;
COMMIT;
""")

mv11_refresh = s.sql.text("""

REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_pr_assignments with data;
COMMIT;
""")

mv12_refresh = s.sql.text("""

REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_issue_assignments with data;
COMMIT;
""")

mv13_refresh = s.sql.text("""

REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_pr_response with data;
COMMIT;
""")

mv14_refresh = s.sql.text("""

REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_repo_languages with data;
COMMIT;
""")

try:
execute_sql(mv1_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv2_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv3_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv4_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv5_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv6_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv7_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv8_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv9_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv10_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv11_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv12_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv13_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass

try:
execute_sql(mv14_refresh)
except Exception as e:
logger.info(f"error is {e}")
pass
# REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction
# block, so we use an autocommit connection rather than execute_sql().
failed_views = []
with self.app.engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
for view in MATERIALIZED_VIEWS:
logger.info(f"Refreshing materialized view: {view.fqn}")
try:
conn.execute(s.sql.text(view.refresh_sql(concurrently=True)))
except Exception as e:
logger.warning(f"Concurrent refresh failed for {view.fqn}, trying non-concurrent: {e}")
try:
conn.execute(s.sql.text(view.refresh_sql(concurrently=False)))
except Exception as e2:
logger.error(f"Non-concurrent refresh also failed for {view.fqn}: {e2}")
failed_views.append(view.fqn)

if failed_views:
raise RuntimeError(
f"{len(failed_views)} materialized view(s) failed to refresh: {failed_views}"
)

#Now refresh facade tables
#Use this class to get all the settings and
#Use this class to get all the settings and
#utility functions for facade
facade_helper = FacadeHelper(logger)

if facade_helper.nuke_stored_affiliations:
logger.error("Nuke stored affiliations is deprecated!")
# deprecated because the UI component of facade where affiliations would be
# nuked upon change no longer exists, and this information can easily be derived
# deprecated because the UI component of facade where affiliations would be
# nuked upon change no longer exists, and this information can easily be derived
# from queries and materialized views in the current version of CollectOSS.
# This method is also a major performance bottleneck with little value.

if not facade_helper.limited_run or (facade_helper.limited_run and facade_helper.fix_affiliations):
logger.error("Fill empty affiliations is deprecated!")
# deprecated because the UI component of facade where affiliations would need
# to be fixed upon change no longer exists, and this information can easily be derived
# deprecated because the UI component of facade where affiliations would need
# to be fixed upon change no longer exists, and this information can easily be derived
# from queries and materialized views in the current version of CollectOSS.
# This method is also a major performance bottleneck with little value.

Expand All @@ -202,13 +58,9 @@ def refresh_materialized_views(self):
invalidate_caches(facade_helper)
except Exception as e:
logger.info(f"error is {e}")

if not facade_helper.limited_run or (facade_helper.limited_run and facade_helper.rebuild_caches):
try:
rebuild_unknown_affiliation_and_web_caches(facade_helper)
except Exception as e:
logger.info(f"error is {e}")




2 changes: 1 addition & 1 deletion collectoss/tasks/git/facade_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def clone_repos():
setattr(repoStatus,"facade_status", CollectionState.ERROR.value)
session.commit()

clone_repos.si().apply_async(countdown=60*5)
clone_repos.si().apply_async(countdown=60*5)


#@celery.task(bind=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def insert_alias(logger, contributor, email):
alias_clean = clean_dict(alias)

# Insert new alias
bulk_insert_dicts(logger, alias_clean, ContributorsAlias, ['alias_email'])
bulk_insert_dicts(logger, alias_clean, ContributorsAlias, ['cntrb_id','alias_email'])

return

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ classifiers = [
]
dependencies = [
"alembic>=1.17.1",
"alembic-utils==0.8.8",
"Beaker==1.11.0",
"boto3==1.17.57",
"bs4==0.0.1",
Expand Down
Loading
Loading