Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions api/doc/events/post-req/schema.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import PostSingleReqSchema from '../post-single-req/schema.js'

export default {
$id: 'https://github.com/data-fair/events/events/post-req',
title: 'Post event req',
Expand All @@ -7,9 +9,8 @@ export default {
properties: {
body: {
type: 'array',
items: {
$ref: 'https://github.com/data-fair/lib/event'
}
items: PostSingleReqSchema.properties.body
}
}
},
$defs: PostSingleReqSchema.$defs
}
13 changes: 11 additions & 2 deletions api/doc/events/post-single-req/schema.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import jsonSchema from '@data-fair/lib-utils/json-schema.js'
import EventSchema from '@data-fair/lib-common-types/event/schema.js'

export default {
$id: 'https://github.com/data-fair/events/events/post-single-req',
title: 'Post single event req',
'x-exports': ['validate', 'types'],
type: 'object',
required: ['body'],
properties: {
body: { $ref: 'https://github.com/data-fair/lib/event' }
}
body: jsonSchema(EventSchema)
.removeReadonlyProperties()
.removeFromRequired(['date'])
.removeId()
.appendTitle(' post')
.schema
},
$defs: EventSchema.$defs
}
4 changes: 2 additions & 2 deletions api/doc/notifications/post-req/schema.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import jsonSchema from '@data-fair/lib-utils/json-schema.js'
import NotificationSchema from '#types/notification/schema.js'
import NotificationSchema from '@data-fair/lib-common-types/notification/schema.js'

export default {
$id: 'https://github.com/data-fair/events/notifications/post-req',
Expand All @@ -11,7 +11,7 @@ export default {
body:
jsonSchema(NotificationSchema)
.removeReadonlyProperties()
.removeFromRequired(['visibility'])
.removeFromRequired(['date'])
.removeId()
.appendTitle(' post')
.schema
Expand Down
1 change: 1 addition & 0 deletions api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
},
"devDependencies": {
"@types/express": "^5.0.6",
"@types/ws": "^8.18.1",
"@types/fs-extra": "^11.0.4",
"@types/i18n": "^0.13.12",
"@types/node-pushnotifications": "^3.1.1",
Expand Down
8 changes: 6 additions & 2 deletions api/src/events/router.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Filter, Sort } from 'mongodb'
import type { FullEvent } from '#types'
import type { Event, FullEvent } from '#types'

import { Router } from 'express'
import mongo from '#mongo'
Expand Down Expand Up @@ -55,8 +55,12 @@ router.get('', async (req, res, next) => {
router.post('', async (req, res, next) => {
assertReqInternalSecret(req, config.secretKeys.events)
const { body } = postReq.returnValid(req, { name: 'req' })
const events: Event[] = body.map(e => ({
date: new Date().toISOString(),
...e,
}))

await postEvents(body)
await postEvents(events)

res.status(201).json(body)
})
46 changes: 39 additions & 7 deletions api/src/events/service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Filter } from 'mongodb'
import { MongoBulkWriteError, type Filter } from 'mongodb'
import type { Event, FullEvent, SearchableEvent, LocalizedEvent, Subscription, WebhookSubscription, Notification } from '#types'
import config from '#config'
import mongo from '#mongo'
Expand Down Expand Up @@ -61,10 +61,10 @@ export const postEvents = async (events: Event[]) => {
// this logic should work much better on a mongodb version that would support multi-language indexing
// https://www.mongodb.com/docs/manual/core/indexes/index-types/index-text/specify-language-text-index/create-text-index-multiple-languages/
const event: SearchableEvent = {
...rawEvent,
_id: nanoid(),
_search: [],
visibility: rawEvent.visibility ?? 'private'
visibility: 'private',
...rawEvent,
_search: []
}
for (const locale of config.i18n.locales) {
const localizedEvent = localizeEvent(event, locale)
Expand Down Expand Up @@ -98,11 +98,43 @@ export const postEvents = async (events: Event[]) => {
await createWebhook(localizeEvent(event), webhookSubscription)
}
}
if (eventsBulkOp.length) await eventsBulkOp.execute()
if (notifsBulkOp.length) await notifsBulkOp.execute()

if (eventsBulkOp.length) {
try {
await eventsBulkOp.execute({})
} catch (err) {
// we ignore conflict error, meaning that the same event id can be sent twice without triggering a failure
// but without being inserted twice either
if (err instanceof MongoBulkWriteError) {
const nonConflictError = err.result.getWriteErrors().find(e => e.code !== 11000)
if (nonConflictError) throw err
} else {
throw err
}
}
}

const insertedNotifications: Notification[] = []
if (notifsBulkOp.length) {
try {
const res = await notifsBulkOp.execute({})
for (const id of Object.values(res.insertedIds)) {
insertedNotifications.push(notifications.find(n => n._id === id)!)
}
} catch (err) {
// we ignore conflict error, meaning that the same event id can be sent twice without triggering a failure
// but without being inserted twice either
if (err instanceof MongoBulkWriteError) {
const nonConflictError = err.result.getWriteErrors().find(e => e.code !== 11000)
if (nonConflictError) throw err
} else {
throw err
}
}
}

// insertion must be performed before, so that data is available on receiving a WS event
for (const notification of notifications) {
for (const notification of insertedNotifications) {
await sendNotification(notification, true)
}
}
Expand Down
9 changes: 8 additions & 1 deletion api/src/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ export class EventsMongo {
]
},
notifications: {
'main-keys': { 'recipient.id': 1, date: -1 }
'main-keys': { 'recipient.id': 1, date: -1 },
'unique-event': [
{ 'recipient.id': 1, eventId: 1 },
{
unique: true,
partialFilterExpression: { eventId: { $exists: true } }
}
]
},
'webhook-subscriptions': {
'main-keys': { 'sender.type': 1, 'sender.id': 1, 'owner.type': 1, 'owner.id': 1, 'topic.key': 1 }
Expand Down
9 changes: 5 additions & 4 deletions api/src/notifications/router.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { SortDirection } from 'mongodb'
import type { Pointer } from '../types.ts'
import type { Notification } from '#types'
import { nanoid } from 'nanoid'
import { Router } from 'express'
import debugModule from 'debug'
import mongo from '#mongo'
Expand All @@ -12,6 +11,7 @@ import * as eventsPostSingleReq from '#doc/events/post-single-req/index.ts'
import * as notificationsPostReq from '#doc/notifications/post-req/index.ts'
import { postEvents } from '../events/service.ts'
import { sendNotification } from './service.ts'
import { nanoid } from 'nanoid'

const debug = debugModule('events')

Expand Down Expand Up @@ -66,7 +66,7 @@ router.post('', async (req, res, next) => {
delete req.body.recipient
}
const { body } = eventsPostSingleReq.returnValid(req, { name: 'req' })
await postEvents([body])
await postEvents([{ date: new Date().toISOString(), ...body }])
res.status(201).json(body)
} else {
debug('pushing a notification with a recipient', req.body)
Expand All @@ -81,10 +81,11 @@ router.post('', async (req, res, next) => {

const { body } = notificationsPostReq.returnValid(req, { name: 'req' })
const notification: Notification = {
...body,
_id: nanoid(),
date: new Date().toISOString()
date: new Date().toISOString(),
...body,
}

await sendNotification(notification)
res.status(200).json(notification)
}
Expand Down
15 changes: 14 additions & 1 deletion api/src/notifications/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import config from '#config'
import * as metrics from './metrics.js'
import { localizeEvent } from '../events/service.ts'
import * as pushService from '../push/service.ts'
import { MongoError } from 'mongodb'

const debug = Debug('notifications')

Expand All @@ -24,6 +25,7 @@ export const prepareSubscriptionNotification = (event: FullEvent, subscription:
delete localizedEvent.originator
delete localizedEvent.urlParams
const notification: Notification = {
eventId: event._id,
icon: subscription.icon || config.theme.notificationIcon || (subscription.origin + '/events/logo-192x192.png'),
locale: subscription.locale,
...localizedEvent,
Expand Down Expand Up @@ -54,7 +56,18 @@ export const prepareSubscriptionNotification = (event: FullEvent, subscription:

export const sendNotification = async (notification: Notification, skipInsert = false) => {
// global.events.emit('saveNotification', notification)
if (!skipInsert) await mongo.notifications.insertOne(notification)
if (!skipInsert) {
try {
await mongo.notifications.insertOne(notification)
} catch (err) {
if (err instanceof MongoError && err.code === 11000) {
// conflict error, simply ignore this duplicate notification
return
} else {
throw err
}
}
}
debug('Send WS notif', notification.recipient, notification)
await wsEmitter.emit(`user:${notification.recipient.id}:notifications`, notification)
if (notification.outputs && notification.outputs.includes('devices')) {
Expand Down
6 changes: 3 additions & 3 deletions api/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import type { Event } from '@data-fair/lib-common-types/event/index.js'
import type { Notification } from '@data-fair/lib-common-types/notification/index.js'

export type { Event, Notification }
export type { Subscription } from './subscription/index.js'
export type { Notification } from './notification/index.js'
export type { Webhook } from './webhook/index.js'
export type { WebhookSubscription } from './webhook-subscription/index.js'
export type { Event }
export type { DevicesPushSubscription } from './push-subscription/index.js'
export type { DeviceRegistration } from './device-registration/index.js'

export type FullEvent = Event & Required<Pick<Event, 'visibility'>> & { _id: string }
export type FullEvent = Event & Required<Pick<Event, 'visibility'>>
export type LocalizedEvent = Omit<FullEvent, 'title' | 'body' | 'htmlBody'> & { title: string, body?: string, htmlBody?: string }
export type SearchableEvent = FullEvent & { _search: { language: string, text: string }[] }
1 change: 0 additions & 1 deletion api/types/notification/index.js

This file was deleted.

78 changes: 0 additions & 78 deletions api/types/notification/schema.js

This file was deleted.

Loading