Skip to content

Commit d8ef695

Browse files
committed
Add bulk update support to note_handler
Signed-off-by: ziad hany <ziadhany2016@gmail.com>
1 parent 92f7130 commit d8ef695

File tree

3 files changed

+259
-24
lines changed

3 files changed

+259
-24
lines changed

fedcode/activitypub.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#
99
import json
1010
import logging
11+
from collections import defaultdict
1112
from dataclasses import asdict
1213
from dataclasses import dataclass
1314
from dataclasses import field
@@ -196,6 +197,21 @@ def get_actor_permissions(cls, actor, object):
196197
# Return the permissions for the specific actor and object type
197198
return permissions.get(type(actor), {}).get(type(object), lambda: {})
198199

200+
@classmethod
def bulk_federate(cls, activities):
    """
    Federate several activities, sending those that share the same set of
    targets one after another.

    ``activities`` is an iterable of mappings, each with the keys:

    - ``targets``: iterable of delivery targets (callers in this project
      pass a package's ``followers_inboxes``)
    - ``body``: the serialized activity to send
    - ``key_id``: the key id forwarded to ``cls.federate``

    Every activity is forwarded to ``cls.federate`` exactly once. Repeated
    targets inside one activity are collapsed so the same body is never
    delivered to the same target twice.
    """
    grouped = defaultdict(list)

    for activity in activities:
        # A sorted, de-duplicated tuple gives a hashable key that does not
        # depend on the order (or repetition) of the listed targets.
        targets_key = tuple(sorted(set(activity["targets"])))
        grouped[targets_key].append(activity)

    for targets, group in grouped.items():
        for activity in group:
            cls.federate(
                # A fresh list per call, so federate() never shares state
                # between activities of the same group.
                targets=list(targets),
                body=activity["body"],
                key_id=activity["key_id"],
            )
214+
199215

200216
@dataclass
201217
class ApActor:

fedcode/pipelines/sync_vulnerablecode.py

Lines changed: 242 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,29 @@
66
# See https://github.com/nexB/federatedcode for support or download.
77
# See https://aboutcode.org for more information about AboutCode.org OSS projects.
88
#
9-
9+
import json
1010
import logging
1111
from itertools import zip_longest
1212
from pathlib import Path
1313

1414
import saneyaml
15+
from django.db.models import Case
16+
from django.db.models import Q
17+
from django.db.models import TextField
18+
from django.db.models import Value
19+
from django.db.models import When
1520

1621
from aboutcode.hashid import get_core_purl
1722
from aboutcode.pipeline import LoopProgress
23+
from fedcode.activitypub import Activity
24+
from fedcode.activitypub import CreateActivity
25+
from fedcode.activitypub import DeleteActivity
26+
from fedcode.activitypub import UpdateActivity
27+
from fedcode.models import Note
1828
from fedcode.models import Package
1929
from fedcode.models import Repository
2030
from fedcode.models import Vulnerability
2131
from fedcode.pipelines import FederatedCodePipeline
22-
from fedcode.pipes import utils
2332

2433

2534
class SyncVulnerableCode(FederatedCodePipeline):
@@ -98,7 +107,6 @@ def sync_vulnerabilities(repository, logger):
98107
a_name = Path(diff.a_path).name
99108
b_name = Path(diff.b_path).name
100109

101-
# FIXME use bulk updates
102110
if a_name == "vulnerabilities.yml" or b_name == "vulnerabilities.yml":
103111
note_handler(
104112
diff.change_type, repository.admin, yaml_data_a_blob, yaml_data_b_blob, logger
@@ -120,7 +128,6 @@ def sync_vulnerabilities(repository, logger):
120128
f"Processed {purl_files_processed} purls.yml files, flushing bulk changes..."
121129
)
122130
flush_pkg_changes(pkg_changes, logger)
123-
# reset after flush
124131
pkg_changes = {"create": [], "update": [], "delete": set()}
125132

126133
elif a_name.startswith("VCID") or b_name.startswith("VCID"):
@@ -137,7 +144,6 @@ def sync_vulnerabilities(repository, logger):
137144
if vul_files_processed % 10000 == 0:
138145
logger(f"Processed {vul_files_processed} VCID files, flushing bulk changes...")
139146
flush_vul_changes(vul_changes, logger)
140-
# reset after flush
141147
vul_changes = {"create": [], "update": [], "delete": set()}
142148

143149
flush_pkg_changes(pkg_changes, logger)
@@ -253,45 +259,259 @@ def note_handler(change_type, default_service, yaml_data_a_blob, yaml_data_b_blo
253259
Handle notes from vulnerabilities.yml changes.
254260
Uses zip_longest so both old (A) and new (B) entries are processed together.
255261
"""
262+
notes_to_create = []
263+
notes_to_update = []
264+
notes_to_delete = []
256265

266+
purls_to_fetch = set()
257267
for pkg_status_a, pkg_status_b in zip_longest(yaml_data_a_blob or [], yaml_data_b_blob or []):
258-
pkg_a = pkg_b = None
259-
260-
# Resolve old package
261268
if pkg_status_a:
262269
purl_a = pkg_status_a.get("purl")
263-
if not purl_a:
264-
logger("Invalid Vulnerability File: missing purl in old entry", level=logging.ERROR)
270+
if purl_a:
271+
purls_to_fetch.add(get_core_purl(purl_a))
265272
else:
266-
core_purl_a = get_core_purl(purl_a)
267-
pkg_a, _ = Package.objects.get_or_create(purl=core_purl_a, service=default_service)
273+
logger("Invalid Vulnerability File: missing purl in old entry", level=logging.ERROR)
268274

269-
# Resolve new package
270275
if pkg_status_b:
271276
purl_b = pkg_status_b.get("purl")
272-
if not purl_b:
273-
logger("Invalid Vulnerability File: missing purl in new entry", level=logging.ERROR)
277+
if purl_b:
278+
purls_to_fetch.add(get_core_purl(purl_b))
274279
else:
275-
core_purl_b = get_core_purl(purl_b)
276-
pkg_b, _ = Package.objects.get_or_create(purl=core_purl_b, service=default_service)
280+
logger("Invalid Vulnerability File: missing purl in new entry", level=logging.ERROR)
281+
282+
packages_map = {}
283+
if purls_to_fetch:
284+
existing_packages = Package.objects.filter(purl__in=purls_to_fetch, service=default_service)
285+
packages_map = {pkg.purl: pkg for pkg in existing_packages}
286+
287+
missing_purls = purls_to_fetch - set(packages_map.keys())
288+
if missing_purls:
289+
new_packages = [Package(purl=purl, service=default_service) for purl in missing_purls]
290+
Package.objects.bulk_create(new_packages, ignore_conflicts=True)
291+
refreshed = Package.objects.filter(purl__in=missing_purls, service=default_service)
292+
packages_map.update({pkg.purl: pkg for pkg in refreshed})
293+
294+
for pkg_status_a, pkg_status_b in zip_longest(yaml_data_a_blob or [], yaml_data_b_blob or []):
295+
pkg_a = pkg_b = None
296+
297+
if pkg_status_a and pkg_status_a.get("purl"):
298+
core_purl_a = get_core_purl(pkg_status_a["purl"])
299+
pkg_a = packages_map.get(str(core_purl_a))
300+
301+
if pkg_status_b and pkg_status_b.get("purl"):
302+
core_purl_b = get_core_purl(pkg_status_b["purl"])
303+
pkg_b = packages_map.get(str(core_purl_b))
277304

278305
if change_type == "A":
279306
if pkg_status_b and pkg_b:
280-
utils.create_note(pkg_b, saneyaml.dump(pkg_status_b))
307+
notes_to_create.append((pkg_b, pkg_status_b))
281308

282309
elif change_type == "M":
283310
if pkg_status_a and not pkg_status_b and pkg_a:
284-
utils.delete_note(pkg_a, saneyaml.dump(pkg_status_a))
311+
notes_to_delete.append((pkg_a, pkg_status_a))
285312

286313
elif pkg_status_b and not pkg_status_a and pkg_b:
287-
utils.create_note(pkg_b, saneyaml.dump(pkg_status_b))
314+
notes_to_create.append((pkg_b, pkg_status_b))
288315

289316
elif pkg_status_a and pkg_status_b and pkg_b:
290-
utils.update_note(pkg_b, saneyaml.dump(pkg_status_a), saneyaml.dump(pkg_status_b))
317+
notes_to_update.append((pkg_b, pkg_status_a, pkg_status_b))
291318

292319
elif change_type == "D":
293320
if pkg_status_a and pkg_a:
294-
utils.delete_note(pkg_a, saneyaml.dump(pkg_status_a))
321+
notes_to_delete.append((pkg_a, pkg_status_a))
295322

296323
else:
297324
logger(f"Unknown change_type: {change_type}", level=logging.ERROR)
325+
326+
if notes_to_create:
327+
bulk_create_notes(notes_to_create)
328+
if notes_to_update:
329+
bulk_update_notes(notes_to_update)
330+
if notes_to_delete:
331+
bulk_delete_notes(notes_to_delete)
332+
333+
334+
def bulk_create_notes(notes_to_create):
    """
    Create the notes described by ``notes_to_create``, attach them to their
    packages and federate one Create activity per distinct note.

    ``notes_to_create`` is a list of ``(package, note_dict)`` tuples. Each
    ``note_dict`` is serialized with ``saneyaml.dump`` and identified by the
    pair ``(package.acct, content)``:

    - a pair that is already stored as a ``Note`` is reused and only linked
      to the package;
    - a pair that is not stored yet is inserted with a single
      ``bulk_create``;
    - a pair repeated in the input is handled once.

    Returns ``None``.
    """
    if not notes_to_create:
        return

    # Serialize each entry once. Keying on (acct, content) collapses repeated
    # entries so the same note is neither inserted nor federated twice.
    pending = {}
    for pkg, note_dict in notes_to_create:
        content = saneyaml.dump(note_dict)
        pending.setdefault((pkg.acct, content), (pkg, content))

    contents_by_acct = {}
    for acct, content in pending:
        contents_by_acct.setdefault(acct, []).append(content)

    # Existing notes are tracked per (acct, content): identical content stored
    # for another acct must not make this acct's note look like it exists.
    existing_notes = {}
    for acct, contents in contents_by_acct.items():
        for note in Note.objects.filter(acct=acct, content__in=contents):
            # Keep the first row if the table already holds duplicates.
            existing_notes.setdefault((note.acct, note.content), note)

    new_links = []  # (package, unsaved Note)
    existing_links = []  # (package, stored Note)
    for key, (pkg, content) in pending.items():
        stored = existing_notes.get(key)
        if stored is None:
            new_links.append((pkg, Note(acct=pkg.acct, content=content)))
        else:
            existing_links.append((pkg, stored))

    linked = []
    if new_links:
        # NOTE(review): the through rows below need note.id, so this relies on
        # the database backend returning primary keys from bulk_create.
        created_notes = Note.objects.bulk_create([note for _, note in new_links])
        linked.extend((pkg, note) for (pkg, _), note in zip(new_links, created_notes))

        through_model = Package.notes.through
        through_model.objects.bulk_create(
            [through_model(package_id=pkg.id, note_id=note.id) for pkg, note in linked],
            ignore_conflicts=True,
        )

    for pkg, note in existing_links:
        pkg.notes.add(note)
        linked.append((pkg, note))

    # Newly created notes are federated first, then the reused ones.
    activities_to_federate = []
    for pkg, note in linked:
        if not pkg.followers_inboxes:
            continue
        create_activity = CreateActivity(actor=pkg.to_ap, object=note.to_ap)
        activities_to_federate.append(
            {
                "targets": pkg.followers_inboxes,
                "body": json.dumps(create_activity.to_ap()),
                "key_id": pkg.key_id,
            }
        )

    if activities_to_federate:
        Activity.bulk_federate(activities_to_federate)
406+
407+
408+
def bulk_update_notes(notes_to_update):
    """
    Rewrite the content of existing notes in one UPDATE and federate an
    Update activity for each rewritten note.

    ``notes_to_update`` is a list of ``(package, old_note_dict,
    new_note_dict)`` tuples. Entries whose old and new dicts are equal are
    ignored. A note is looked up by ``(package.acct, dumped old dict)``; when
    the same lookup key occurs more than once, the last entry wins for both
    the stored content and the federated activity.

    Notes that are not found are skipped silently. Returns ``None``.
    """
    if not notes_to_update:
        return

    # (acct, old content) -> (package, new content). A single mapping keeps
    # the value written to the database and the value federated in sync.
    updates = {}
    for pkg, old_note_dict, new_note_dict in notes_to_update:
        if old_note_dict == new_note_dict:
            continue
        old_content = saneyaml.dump(old_note_dict)
        updates[(pkg.acct, old_content)] = (pkg, saneyaml.dump(new_note_dict))

    if not updates:
        return

    query_conditions = Q()
    for acct, old_content in updates:
        query_conditions |= Q(acct=acct, content=old_content)

    when_clauses = []
    matched = []  # (note, package, new content)
    for note in Note.objects.filter(query_conditions):
        update = updates.get((note.acct, note.content))
        if update is None:
            continue
        pkg, new_content = update
        when_clauses.append(When(id=note.id, then=Value(new_content)))
        matched.append((note, pkg, new_content))

    if not matched:
        return

    # Only rows that have a When clause are updated: the Case has no default,
    # so any other row in the UPDATE would have its content set to NULL.
    Note.objects.filter(id__in=[note.id for note, _, _ in matched]).update(
        content=Case(*when_clauses, output_field=TextField())
    )

    activities_to_federate = []
    for note, pkg, new_content in matched:
        if not pkg.followers_inboxes:
            continue
        # The in-memory instance still holds the old content; refresh it so
        # the federated object carries the new one.
        note.content = new_content
        update_activity = UpdateActivity(actor=pkg.to_ap, object=note.to_ap)
        activities_to_federate.append(
            {
                "targets": pkg.followers_inboxes,
                "body": json.dumps(update_activity.to_ap()),
                "key_id": pkg.key_id,
            }
        )

    if activities_to_federate:
        Activity.bulk_federate(activities_to_federate)
472+
473+
474+
def bulk_delete_notes(notes_to_delete):
    """
    Delete existing notes in one query and federate a Delete activity for
    each of them.

    ``notes_to_delete`` is a list of ``(package, note_dict)`` tuples. A note
    is looked up by ``(package.acct, saneyaml.dump(note_dict))``; notes that
    are not found are skipped silently. Returns ``None``.

    NOTE(review): the rows are removed with ``QuerySet.delete()``. This was
    previously described as a soft delete; that only holds if the ``Note``
    manager overrides ``delete()`` - confirm against the model.
    """
    if not notes_to_delete:
        return

    query_conditions = Q()
    pkg_by_key = {}

    for pkg, note_dict in notes_to_delete:
        content = saneyaml.dump(note_dict)
        query_conditions |= Q(acct=pkg.acct, content=content)
        pkg_by_key[(pkg.acct, content)] = pkg

    # Only the note's own columns are read below, so no related objects are
    # joined in.
    existing_notes = list(Note.objects.filter(query_conditions))

    if not existing_notes:
        return

    activities_to_federate = []
    note_ids = []

    for note in existing_notes:
        pkg = pkg_by_key.get((note.acct, note.content))
        if pkg is None:
            continue

        note_ids.append(note.id)

        if pkg.followers_inboxes:
            # The activity is built before the rows are deleted, while the
            # note instance still describes a stored row.
            deleted_activity = DeleteActivity(actor=pkg.to_ap, object=note.to_ap)
            activities_to_federate.append(
                {
                    "targets": pkg.followers_inboxes,
                    "body": json.dumps(deleted_activity.to_ap()),
                    "key_id": pkg.key_id,
                }
            )

    if note_ids:
        Note.objects.filter(id__in=note_ids).delete()

    if activities_to_federate:
        Activity.bulk_federate(activities_to_federate)

federatedcode/settings.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,7 @@
124124
}
125125
}
126126

127-
DEFAULT_AUTO_FIELD = "django.db.models.AutoField"
128-
127+
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
129128
# Templates
130129

131130
TEMPLATES = [

0 commit comments

Comments
 (0)