From 62c6d2a80a0e160db7faed37f54d9ffd13db837c Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Tue, 19 Dec 2023 13:36:59 -0500 Subject: [PATCH 01/29] Working --- .gitignore | 2 + package.json | 4 + pnpm-lock.yaml | 192 ++++++++++++++++-- src/lib/browser-db.ts | 8 + src/lib/server/sync-db/db.ts | 17 ++ .../websockets/features/offline/sync.ts | 190 +++++++++++++++++ src/lib/server/websockets/handler.ts | 15 +- src/lib/server/websockets/redis-client.ts | 2 +- src/lib/sync-db.ts | 40 ++++ src/lib/websockets/ws-store.ts | 2 +- src/routes/app/+layout.svelte | 17 ++ src/routes/app/offline-first/+page.server.ts | 15 ++ src/routes/app/offline-first/+page.svelte | 126 ++++++++++++ src/routes/app/offline-first/README.md | 68 +++++++ src/routes/app/offline-first/sdb.ts | 188 +++++++++++++++++ sync-dbs/.keep | 0 16 files changed, 860 insertions(+), 26 deletions(-) create mode 100644 src/lib/browser-db.ts create mode 100644 src/lib/server/sync-db/db.ts create mode 100644 src/lib/server/websockets/features/offline/sync.ts create mode 100644 src/lib/sync-db.ts create mode 100644 src/routes/app/offline-first/+page.server.ts create mode 100644 src/routes/app/offline-first/+page.svelte create mode 100644 src/routes/app/offline-first/README.md create mode 100644 src/routes/app/offline-first/sdb.ts create mode 100644 sync-dbs/.keep diff --git a/.gitignore b/.gitignore index 3acd371..676cdd7 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ node_modules vite.config.js.timestamp-* vite.config.ts.timestamp-* test-results +sync-dbs/**.* +!sync-dbs/.keep diff --git a/package.json b/package.json index 7721deb..b5acffc 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "@sveltejs/kit": "^2.0.0", "@sveltejs/vite-plugin-svelte": "^3.0.1", "@testcontainers/postgresql": "^10.4.0", + "@types/better-sqlite3": "^7.6.8", "@types/ws": "^8.5.10", "@typescript-eslint/eslint-plugin": "^6.14.0", "@typescript-eslint/parser": "^6.14.0", @@ -62,6 +63,9 @@ "type": "module", "dependencies": { "@lucia-auth/adapter-postgresql": "^2.0.2", + "@vlcn.io/crsqlite": "^0.16.1", + "@vlcn.io/crsqlite-wasm": "^0.16.0", + "better-sqlite3": "^9.2.2", "bits-ui": "^0.11.8", "clsx": "^2.0.0", "drizzle-orm": "^0.29.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 101feaf..6b47bf2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -8,6 +8,15 @@ dependencies: '@lucia-auth/adapter-postgresql': specifier: ^2.0.2 version: 2.0.2(lucia@2.7.5)(postgres@3.4.3) + '@vlcn.io/crsqlite': + specifier: ^0.16.1 + version: 0.16.1 + '@vlcn.io/crsqlite-wasm': + specifier: ^0.16.0 + version: 0.16.0 + better-sqlite3: + specifier: ^9.2.2 + version: 9.2.2 bits-ui: specifier: ^0.11.8 version: 0.11.8(svelte@4.2.8) @@ -16,7 +25,7 @@ dependencies: version: 2.0.0 drizzle-orm: specifier: ^0.29.1 - version: 0.29.1(postgres@3.4.3) + version: 0.29.1(@types/better-sqlite3@7.6.8)(better-sqlite3@9.2.2)(postgres@3.4.3) faktory-worker: specifier: ^4.5.1 version: 4.5.1 @@ -73,6 +82,9 @@ devDependencies: '@testcontainers/postgresql': specifier: ^10.4.0 version: 10.4.0 + '@types/better-sqlite3': + specifier: ^7.6.8 + version: 7.6.8 '@types/ws': specifier: ^8.5.10 version: 8.5.10 @@ -1285,6 +1297,11 @@ packages: - supports-color dev: true + /@types/better-sqlite3@7.6.8: + resolution: {integrity: sha512-ASndM4rdGrzk7iXXqyNC4fbwt4UEjpK0i3j4q4FyeQrLAthfB6s7EF135ZJE0qQxtKIMFwmyT6x0switET7uIw==} + dependencies: + '@types/node': 20.10.4 + /@types/cookie@0.6.0: resolution: {integrity: sha512-4Kh9a6B2bQciAhf7FSuMRRkUWecJgJu9nPnx3yzpsfXX/c50REIqpHY4C82bXP90qrLtXtkDxTZosYO3UpOwlA==} @@ -1325,7 +1342,6 @@ packages: resolution: {integrity: sha512-D08YG6rr8X90YB56tSIuBaddy/UXAA9RKJoFvrsnogAum/0pmjkgi4+2nx96A330FmioegBWmEYQ+syqCFaveg==} dependencies: undici-types: 5.26.5 - dev: true /@types/pug@2.0.10: resolution: {integrity: sha512-Sk/uYFOBAB7mb74XcpizmH0KOR2Pv3D2Hmrh1Dmy5BmK3MpdSa5kqZcg6EKBdklU0bFXX9gCfzvpnyUehrPIuA==} @@ -1537,6 +1553,29 @@ packages: pretty-format: 29.7.0 dev: true + /@vlcn.io/crsqlite-wasm@0.16.0: + resolution: {integrity: sha512-5gf52eyMYvZirxuEUo4QS65JhEsw3fndoO+tCtCEOxuiIEtvaKB2/6wuuKGRlMVkxIp4Bls70D3DCF5v9lcJxA==} + dependencies: + '@vlcn.io/wa-sqlite': 0.22.0 + '@vlcn.io/xplat-api': 0.15.0 + async-mutex: 0.4.0 + dev: false + + /@vlcn.io/crsqlite@0.16.1: + resolution: {integrity: sha512-ju6dONV/xq3haiHUVkY/mUuz14lXqak1GAyCsY8YqEoZUUORIx8nXa8aIOM6A+n31+KreTsJ4qSiG0VCnZLveA==} + requiresBuild: true + dev: false + + /@vlcn.io/wa-sqlite@0.22.0: + resolution: {integrity: sha512-OujKro0mAqP7/efUeCGB6zBiyMoSCFVe7jQKPF0n47U9ZhOaIW3kQUVCwF+CmzvzQfN1Vl4PrFQRNNxlSwTCNQ==} + dev: false + + /@vlcn.io/xplat-api@0.15.0: + resolution: {integrity: sha512-2/aE7VgI3EbIO5EcJGrskAJuCa2pteY1rWNWfhovFKMERe9NhJdlDMIB1I31X0sN/WC2DnF30RqcdTXNfYyzhQ==} + dependencies: + comlink: 4.4.1 + dev: false + /acorn-jsx@5.3.2(acorn@8.11.2): resolution: {integrity: sha512-rq9s+JNhf0IChjtDXxllJ7g41oZk5SlXtp0LHwyA5cejwn7vKmKp4pPri6YEePv2PU65sAsegbXtIinmDFDXgQ==} peerDependencies: @@ -1673,6 +1712,12 @@ packages: resolution: {integrity: sha512-coglx5yIWuetakm3/1dsX9hxCNox22h7+V80RQOu2XUUMidtArxKoZoOtHUPuR84SycKTXzgGzAUR5hJxujyJQ==} dev: true + /async-mutex@0.4.0: + resolution: {integrity: sha512-eJFZ1YhRR8UN8eBLoNzcDPcy/jqjsg6I1AP+KvWQX80BqOSW1oJPJXDylPUEeMr2ZQvHgnQ//Lp6f3RQ1zI7HA==} + dependencies: + tslib: 2.6.2 + dev: false + /async@3.2.5: resolution: {integrity: sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg==} dev: true @@ -1707,7 +1752,6 @@ packages: /base64-js@1.5.1: resolution: {integrity: sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==} - dev: true /bcrypt-pbkdf@1.0.2: resolution: {integrity: sha512-qeFIXtP4MSoi6NLqO12WfqARWWuCKi2Rn/9hJLEmtB5yTNr9DqFWkJRCf2qShWzPeAMRnOgCrq0sg/KLv5ES9w==} @@ -1715,10 +1759,24 @@ packages: tweetnacl: 0.14.5 dev: true + /better-sqlite3@9.2.2: + resolution: {integrity: sha512-qwjWB46il0lsDkeB4rSRI96HyDQr8sxeu1MkBVLMrwusq1KRu4Bpt1TMI+8zIJkDUtZ3umjAkaEjIlokZKWCQw==} + requiresBuild: true + dependencies: + bindings: 1.5.0 + prebuild-install: 7.1.1 + dev: false + /binary-extensions@2.2.0: resolution: {integrity: sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA==} engines: {node: '>=8'} + /bindings@1.5.0: + resolution: {integrity: sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ==} + dependencies: + file-uri-to-path: 1.0.0 + dev: false + /bits-ui@0.11.8(svelte@4.2.8): resolution: {integrity: sha512-T3YaT88OJguBoUU/MSncf41fiIc+5/ka8Au2LUDo0nSECex+LFY40+hKWLJc5tRT56avkyHsI7x9daA2r9eS/g==} peerDependencies: @@ -1736,7 +1794,6 @@ packages: buffer: 5.7.1 inherits: 2.0.4 readable-stream: 3.6.2 - dev: true /blake3-wasm@2.1.5: resolution: {integrity: sha512-F1+K8EbfOZE49dtoPtmxUQrpXaBIl3ICvasLh+nJta0xkz+9kF/7uet9fLnwKqhDrmj6g+6K3Tw9yQPUg2ka5g==} @@ -1784,7 +1841,6 @@ packages: dependencies: base64-js: 1.5.1 ieee754: 1.2.1 - dev: true /buildcheck@0.0.6: resolution: {integrity: sha512-8f9ZJCUXyT1M35Jx7MkBgmBMo3oHTTBIPLiY9xyL0pl3T5RwcPEY8cUHr5LBNfu/fk6c2T4DJZuVM/8ZZT2D2A==} @@ -1883,7 +1939,6 @@ packages: /chownr@1.1.4: resolution: {integrity: sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==} - dev: true /cli-color@2.0.3: resolution: {integrity: sha512-OkoZnxyC4ERN3zLzZaY9Emb7f/MhBOIpePv0Ycok0fJYT+Ouo00UBEIwsVsr0yoow++n5YWlSUgST9GKhNHiRQ==} @@ -1926,6 +1981,10 @@ packages: resolution: {integrity: sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==} dev: true + /comlink@4.4.1: + resolution: {integrity: sha512-+1dlx0aY5Jo1vHy/tSsIGpSkN4tS9rZSW8FIhG0JH/crs9wwweswIo/POr451r7bZww3hFbPAKnTpimzL/mm4Q==} + dev: false + /commander@4.1.1: resolution: {integrity: sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA==} engines: {node: '>= 6'} @@ -2038,6 +2097,13 @@ packages: dependencies: ms: 2.1.2 + /decompress-response@6.0.0: + resolution: {integrity: sha512-aW35yZM6Bb/4oJlZncMH2LCoZtJXTRxES17vE3hoRiowU2kWHaJKFkSBDnDR+cm9J+9QhXmREyIfv0pji9ejCQ==} + engines: {node: '>=10'} + dependencies: + mimic-response: 3.1.0 + dev: false + /deep-eql@4.1.3: resolution: {integrity: sha512-WaEtAOpRA1MQ0eohqZjpGD8zdI0Ovsm8mmFhaDN8dvDZzyoUMcYDnf5Y6iu7HTXxf8JDS23qWa4a+hKCDyOPzw==} engines: {node: '>=6'} @@ -2045,6 +2111,11 @@ packages: type-detect: 4.0.8 dev: true + /deep-extend@0.6.0: + resolution: {integrity: sha512-LOHxIOaPYdHlJRtCQfDIVZtfw/ufM8+rVj649RIHzcm/vGwQRXFt6OPqIFWsm2XEMrNIEtWR64sY1LEKD2vAOA==} + engines: {node: '>=4.0.0'} + dev: false + /deep-is@0.1.4: resolution: {integrity: sha512-oIPzksmTg4/MriiaYGO+okXDT7ztn/w3Eptv/+gSIdMdKsJo0u4CfYNFJPy+4SKMuCqGw2wxnA+URMg3t8a/bQ==} dev: true @@ -2067,6 +2138,11 @@ packages: engines: {node: '>=8'} dev: true + /detect-libc@2.0.2: + resolution: {integrity: sha512-UX6sGumvvqSaXgdKGUsgZWqcUyIXZ/vZTrlRT/iobiKhGL0zL4d3osHj3uqllWJK+i+sixDS/3COVEOFbupFyw==} + engines: {node: '>=8'} + dev: false + /devalue@4.3.2: resolution: {integrity: sha512-KqFl6pOgOW+Y6wJgu80rHpo2/3H07vr8ntR9rkkFIRETewbf5GaYYcakYfiKz89K+sLsuPkQIZaXDMjUObZwWg==} @@ -2167,7 +2243,7 @@ packages: - utf-8-validate dev: true - /drizzle-orm@0.29.1(postgres@3.4.3): + /drizzle-orm@0.29.1(@types/better-sqlite3@7.6.8)(better-sqlite3@9.2.2)(postgres@3.4.3): resolution: {integrity: sha512-yItc4unfHnk8XkDD3/bdC63vdboTY7e7I03lCF1OJYABXSIfQYU9BFTQJXMMovVeb3T1/OJWwfW/70T1XPnuUA==} peerDependencies: '@aws-sdk/client-rds-data': '>=3' @@ -2229,6 +2305,8 @@ packages: sqlite3: optional: true dependencies: + '@types/better-sqlite3': 7.6.8 + better-sqlite3: 9.2.2 postgres: 3.4.3 dev: false @@ -2240,7 +2318,6 @@ packages: resolution: {integrity: sha512-+uw1inIHVPQoaVuHzRyXd21icM+cnt4CzD5rW+NC1wjOUSTOs+Te7FOv7AhN7vS9x/oIyhLP5PR1H+phQAHu5Q==} dependencies: once: 1.4.0 - dev: true /es5-ext@0.10.62: resolution: {integrity: sha512-BHLqn0klhEpnOKSrzn/Xsz2UIW8j+cGmo9JLzr8BiUapV8hPL9+FliFqjwr9ngW7jWdnxv6eO+/LqyhJVqgrjA==} @@ -2783,6 +2860,11 @@ packages: engines: {node: '>=6'} dev: true + /expand-template@2.0.3: + resolution: {integrity: sha512-XYfuKMvj4O35f/pOXLObndIRvyQ+/+6AhODh+OKWj9S9498pHHn/IMszH+gt0fBCRWMNfk1ZSp5x3AifmnI2vg==} + engines: {node: '>=6'} + dev: false + /ext@1.7.0: resolution: {integrity: sha512-6hxeJYaL110a9b5TEJSj0gojyHQAmA2ch5Os+ySCiA1QGdS697XWY1pzsrSjqA9LDEEgdB/KypIlR59RcLuHYw==} dependencies: @@ -2842,6 +2924,10 @@ packages: flat-cache: 3.2.0 dev: true + /file-uri-to-path@1.0.0: + resolution: {integrity: sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw==} + dev: false + /fill-range@7.0.1: resolution: {integrity: sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==} engines: {node: '>=8'} @@ -2893,7 +2979,6 @@ packages: /fs-constants@1.0.0: resolution: {integrity: sha512-y6OAwoSIf7FyjMIv94u+b5rdheZEjzR63GTyZJm5qh4Bi+2YgwLCcI/fPFZkL5PSixOt6ZNKm+w+Hfp/Bciwow==} - dev: true /fs.realpath@1.0.0: resolution: {integrity: sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==} @@ -2948,6 +3033,10 @@ packages: resolve-pkg-maps: 1.0.0 dev: true + /github-from-package@0.0.0: + resolution: {integrity: sha512-SyHy3T1v2NUXn29OsWdxmK6RwHD+vkj3v8en8AOBZ1wBQ/hCAQ5bAQTD02kW4W9tUp/3Qh6J8r9EvntiyCmOOw==} + dev: false + /glob-parent@5.1.2: resolution: {integrity: sha512-AOIgSQCepiJYwP3ARnGx+5VnTu2HBYdzbGP45eLw1vr3zB3vZLeyed1sC9hnbcOc9/SrMyM5RPQrkGz4aS9Zow==} engines: {node: '>= 6'} @@ -3058,7 +3147,6 @@ packages: /ieee754@1.2.1: resolution: {integrity: sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==} - dev: true /ignore@5.3.0: resolution: {integrity: sha512-g7dmpshy+gD7mh88OC9NwSGTKoc3kyLAZQRU1mt53Aw/vnvfXnbC+F/7F7QoYVKbV+KNvJx8wArewKy1vXMtlg==} @@ -3091,6 +3179,10 @@ packages: /inherits@2.0.4: resolution: {integrity: sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==} + /ini@1.3.8: + resolution: {integrity: sha512-JV/yugV2uzW5iMRSiZAyDtQd+nxtUnjeLt0acNdw98kKLrvuRVyB80tsREOE7yvGVgalhZ6RNXCmEHkUKBKxew==} + dev: false + /ioredis@5.3.2: resolution: {integrity: sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==} engines: {node: '>=12.22.0'} @@ -3322,7 +3414,6 @@ packages: engines: {node: '>=10'} dependencies: yallist: 4.0.0 - dev: true /lru-queue@0.1.0: resolution: {integrity: sha512-BpdYkt9EvGl8OfWHDQPISVpcl5xZthb+XPsbELj5AQXxIC8IriDZIQYjBJPEm5rS420sjZ0TLEzRcq5KdBhYrQ==} @@ -3403,6 +3494,11 @@ packages: engines: {node: '>=12'} dev: true + /mimic-response@3.1.0: + resolution: {integrity: sha512-z0yWI+4FDrrweS8Zmt4Ej5HdJmky15+L2e6Wgn3+iK5fWzb6T3fhNFq2+MeTRb064c6Wr4N/wv0DzQTjNzHNGQ==} + engines: {node: '>=10'} + dev: false + /min-indent@1.0.1: resolution: {integrity: sha512-I9jwMn07Sy/IwOj3zVkVik2JTvgpaykDZEigL6Rx6N9LbMywwUSMtxET+7lVoDLLd3O3IXwJwvuuns8UB/HeAg==} engines: {node: '>=4'} @@ -3452,11 +3548,9 @@ packages: /minimist@1.2.8: resolution: {integrity: sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==} - dev: true /mkdirp-classic@0.5.3: resolution: {integrity: sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A==} - dev: true /mkdirp@0.5.6: resolution: {integrity: sha512-FP+p8RB8OWpF3YZBCrP5gtADmtXApB5AMLn+vdyA+PyxCjrCs00mjyUozssO33cwDeT3wNGdLxJ5M//YqtHAJw==} @@ -3526,6 +3620,10 @@ packages: hasBin: true dev: false + /napi-build-utils@1.0.2: + resolution: {integrity: sha512-ONmRUqK7zj7DWX0D9ADe03wbwOBZxNAfF20PlGfCWQcD3+/MakShIHrMqx9YwPTfxDdF1zLeL+RGZiR9kGMLdg==} + dev: false + /natural-compare@1.4.0: resolution: {integrity: sha512-OWND8ei3VtNC9h7V60qff3SVobHr996CTwgxubgyQYEpg290h9J0buyECNNJexkFm5sOajh5G116RYA1c8ZMSw==} dev: true @@ -3534,6 +3632,13 @@ packages: resolution: {integrity: sha512-CXdUiJembsNjuToQvxayPZF9Vqht7hewsvy2sOWafLvi2awflj9mOC6bHIg50orX8IJvWKY9wYQ/zB2kogPslQ==} dev: true + /node-abi@3.52.0: + resolution: {integrity: sha512-JJ98b02z16ILv7859irtXn4oUaFWADtvkzy2c0IAatNVX2Mc9Yoh8z6hZInn3QwvMEYhHuQloYi+TTQy67SIdQ==} + engines: {node: '>=10'} + dependencies: + semver: 7.5.4 + dev: false + /node-fetch@2.7.0: resolution: {integrity: sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==} engines: {node: 4.x || >=6.0.0} @@ -3836,6 +3941,25 @@ packages: engines: {node: '>=12'} dev: false + /prebuild-install@7.1.1: + resolution: {integrity: sha512-jAXscXWMcCK8GgCoHOfIr0ODh5ai8mj63L2nWrjuAgXE6tDyYGnx4/8o/rCgU+B4JSyZBKbeZqzhtwtC3ovxjw==} + engines: {node: '>=10'} + hasBin: true + dependencies: + detect-libc: 2.0.2 + expand-template: 2.0.3 + github-from-package: 0.0.0 + minimist: 1.2.8 + mkdirp-classic: 0.5.3 + napi-build-utils: 1.0.2 + node-abi: 3.52.0 + pump: 3.0.0 + rc: 1.2.8 + simple-get: 4.0.1 + tar-fs: 2.0.1 + tunnel-agent: 0.6.0 + dev: false + /prelude-ls@1.2.1: resolution: {integrity: sha512-vkcDPrRZo1QZLbn5RLGPpg/WmIQ65qoWWhcGKf/b5eplkkarX0m9z8ppCat4mlOqUsWpyNuYgO3VRyrYHSzX5g==} engines: {node: '>= 0.8.0'} @@ -3894,7 +4018,6 @@ packages: dependencies: end-of-stream: 1.4.4 once: 1.4.0 - dev: true /punycode@2.3.1: resolution: {integrity: sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==} @@ -3908,6 +4031,16 @@ packages: resolution: {integrity: sha512-kJt5qhMxoszgU/62PLP1CJytzd2NKetjSRnyuj31fDd3Rlcz3fzlFdFLD1SItunPwyqEOkca6GbV612BWfaBag==} dev: true + /rc@1.2.8: + resolution: {integrity: sha512-y3bGgqKj3QBdxLbLkomlohkvsA8gdAiUQlSBJnBhfn+BPxg4bc62d8TcBW15wavDfgexCgccckhcZvywyQYPOw==} + hasBin: true + dependencies: + deep-extend: 0.6.0 + ini: 1.3.8 + minimist: 1.2.8 + strip-json-comments: 2.0.1 + dev: false + /react-is@18.2.0: resolution: {integrity: sha512-xWGDIW6x921xtzPkhiULtthJHoJvBbF3q26fzloPCK0hsvxtPVelvftw3zjbHWSkR2km9Z+4uxbDDK/6Zw9B8w==} dev: true @@ -3936,7 +4069,6 @@ packages: inherits: 2.0.4 string_decoder: 1.3.0 util-deprecate: 1.0.2 - dev: true /readdir-glob@1.1.3: resolution: {integrity: sha512-v05I2k7xN8zXvPD9N+z/uhXPaj0sUFCe2rcWZIpBsqxfP7xXFQ0tipAd/wjj1YxWyWtUS5IDJpOG82JKt2EAVA==} @@ -4069,7 +4201,6 @@ packages: /safe-buffer@5.2.1: resolution: {integrity: sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==} - dev: true /safer-buffer@2.1.2: resolution: {integrity: sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==} @@ -4098,7 +4229,6 @@ packages: hasBin: true dependencies: lru-cache: 6.0.0 - dev: true /set-cookie-parser@2.6.0: resolution: {integrity: sha512-RVnVQxTXuerk653XfuliOxBP81Sf0+qfQE73LIYKcyMYHG94AuH0kgrQpRDuTZnSmjpysHmzxJXKNfa6PjFhyQ==} @@ -4128,6 +4258,18 @@ packages: engines: {node: '>=14'} dev: true + /simple-concat@1.0.1: + resolution: {integrity: sha512-cSFtAPtRhljv69IK0hTVZQ+OfE9nePi/rtJmw5UjHeVyVroEqJXP1sFztKUy1qU+xvz3u/sfYJLa947b7nAN2Q==} + dev: false + + /simple-get@4.0.1: + resolution: {integrity: sha512-brv7p5WgH0jmQJr1ZDDfKDOSeWWg+OVypG99A/5vYGPqJ6pxiaHLy8nxtFjBA7oMa01ebA9gfh1uMCFqOuXxvA==} + dependencies: + decompress-response: 6.0.0 + once: 1.4.0 + simple-concat: 1.0.1 + dev: false + /sirv@2.0.3: resolution: {integrity: sha512-O9jm9BsID1P+0HOi81VpXPoDxYP374pkOLzACAoyUQ/3OUVndNpsz6wMnY2z+yOxzbllCKZrM+9QrWsv4THnyA==} engines: {node: '>= 10'} @@ -4240,7 +4382,6 @@ packages: resolution: {integrity: sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==} dependencies: safe-buffer: 5.2.1 - dev: true /strip-ansi@6.0.1: resolution: {integrity: sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==} @@ -4261,6 +4402,11 @@ packages: min-indent: 1.0.1 dev: true + /strip-json-comments@2.0.1: + resolution: {integrity: sha512-4gB8na07fecVVkOI6Rs4e7T6NOTki5EmL7TUduTs6bu3EdnSycntVJ4re8kgZA+wx9IueI2Y11bfbgwtzuE0KQ==} + engines: {node: '>=0.10.0'} + dev: false + /strip-json-comments@3.1.1: resolution: {integrity: sha512-6fPc+R4ihwqP6N/aIv2f1gMH8lOVtWQHoqC4yK6oSDVVocumAsfCqjkXnqiYMhmMwS/mEHLp7Vehlt3ql6lEig==} engines: {node: '>=8'} @@ -4531,7 +4677,6 @@ packages: mkdirp-classic: 0.5.3 pump: 3.0.0 tar-stream: 2.2.0 - dev: true /tar-fs@3.0.4: resolution: {integrity: sha512-5AFQU8b9qLfZCX9zp2duONhPmZv0hGYiBPJsyUdqMjzq/mqVpy/rEUSeHk1+YitmxugaptgBh5oDGU3VsAJq4w==} @@ -4550,7 +4695,6 @@ packages: fs-constants: 1.0.0 inherits: 2.0.4 readable-stream: 3.6.2 - dev: true /tar-stream@3.1.6: resolution: {integrity: sha512-B/UyjYwPpMBv+PaFSWAmtYjwdrlEaZQEhMIBFNC5oEG8lpiW8XjcSdmEaClj28ArfKScKHs2nshz3k2le6crsg==} @@ -4669,6 +4813,12 @@ packages: esbuild: 0.15.18 dev: true + /tunnel-agent@0.6.0: + resolution: {integrity: sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w==} + dependencies: + safe-buffer: 5.2.1 + dev: false + /tweetnacl@0.14.5: resolution: {integrity: sha512-KXXFFdAbFXY4geFIwoyNK+f5Z1b7swfXABfL7HXCmoIWMKU3dmS26672A4EeQtDzLKy7SXmfBu51JolvEKwtGA==} dev: true @@ -4710,7 +4860,6 @@ packages: /undici-types@5.26.5: resolution: {integrity: sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==} - dev: true /undici@5.28.2: resolution: {integrity: sha512-wh1pHJHnUeQV5Xa8/kyQhO7WFa8M34l026L5P/+2TYiakvGy5Rdc8jWZVyG7ieht/0WgJLEd3kcU5gKx+6GC8w==} @@ -4958,7 +5107,6 @@ packages: /yallist@4.0.0: resolution: {integrity: sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==} - dev: true /yaml@1.10.2: resolution: {integrity: sha512-r3vXyErRCYJ7wg28yvBY5VSoAF8ZvlcW9/BwUzEtUsjvX/DKs24dIkuwjtuprwJJHsbyUbLApepYTR1BN4uHrg==} diff --git a/src/lib/browser-db.ts b/src/lib/browser-db.ts new file mode 100644 index 0000000..ee9d147 --- /dev/null +++ b/src/lib/browser-db.ts @@ -0,0 +1,8 @@ +import initWasm from '@vlcn.io/crsqlite-wasm'; +import wasmUrl from '@vlcn.io/crsqlite-wasm/crsqlite.wasm?url'; + +export async function load(file = 'default.db', paths: { wasm?: string } = {}) { + const sqlite = await initWasm(() => paths?.wasm || wasmUrl); + const db = await sqlite.open(file); + return { db, browser: true }; +} diff --git a/src/lib/server/sync-db/db.ts b/src/lib/server/sync-db/db.ts new file mode 100644 index 0000000..8294db7 --- /dev/null +++ b/src/lib/server/sync-db/db.ts @@ -0,0 +1,17 @@ +import Database from 'better-sqlite3'; +import { extensionPath } from '@vlcn.io/crsqlite'; + +export async function dbFrom(filename) { + // TODO: should be an env-var so we can use Render's persistent disk as the path + const db = new Database(`./sync-dbs/${filename}`); + db.pragma('journal_mode = WAL'); + db.loadExtension(extensionPath); + + // TODO: import schema from same place as frontend + await db.exec(`CREATE TABLE IF NOT EXISTS todos (id PRIMARY KEY NOT NULL, content, complete); + SELECT crsql_as_crr('todos'); + CREATE TABLE IF NOT EXISTS todonts (id PRIMARY KEY NOT NULL, content, complete); + SELECT crsql_as_crr('todonts'); + `); + return db; +} diff --git a/src/lib/server/websockets/features/offline/sync.ts b/src/lib/server/websockets/features/offline/sync.ts new file mode 100644 index 0000000..e700858 --- /dev/null +++ b/src/lib/server/websockets/features/offline/sync.ts @@ -0,0 +1,190 @@ +import { client, create } from '../../redis-client'; +import type { ExtendedWebSocket } from '../../../../../../vite-plugins/vite-plugin-svelte-kit-integrated-websocket-server'; +import type { Redis } from 'ioredis'; +import { dbFrom } from '$lib/server/sync-db/db'; +import type Database from 'better-sqlite3'; +import { WebSocket } from 'ws'; +import { latestVersions } from '../../../../../routes/app/offline-first/sdb'; + +const INSERT_CHANGES = `INSERT INTO crsql_changes VALUES (?, unhex(?), ?, ?, ?, ?, unhex(?), ?, ?)`; + +// TODO: review https://github.com/vlcn-io/js/blob/main/packages/ws-server/src/DB.ts +// see if any edge cases have been missed + +export class Sync { + private stream: string; + private ws: ExtendedWebSocket; + private userId: string; + private redisClient: Redis = client(); + private db: ReturnType; + private version: number; + private siteId: string; + + /* + This is the preferred way to instantiate this class for 2 reasons: + 1. If we use the class in multiple places, we want to avoid duplicating all of this setup. + 2. Constructors can not be asynchronous so the only way to encapsulate the setup is through + a static method + */ + static async init({ + ws, + stream, + clientSiteId, + clientVersion + }: { + stream: string; + ws: ExtendedWebSocket; + clientSiteId: string; + clientVersion: number; + }) { + const db = await dbFrom(`${ws.session.user.userId}.db`); + const { siteId, version } = await db + .prepare('SELECT hex(crsql_site_id()) as siteId, crsql_db_version() as version;') + .get(); + const sync = new Sync({ ws, stream, db, version, siteId }); + + const subClient = create(); + await subClient.subscribe(sync.stream, (err) => { + if (err) { + console.error('Failed to subscribe: %s', err.message); + } + }); + + const subscription = await subClient.on('messageBuffer', (stream, message) => { + sync.notify(message); + }); + + ws.on('message', async (data) => { + const parsed = JSON.parse(data.toString()); + const changes = parsed.changes; + if (parsed.type === 'update') { + /* + some client is sending an update to the server + which then is forwarded to all clients -> `sync.recieve(data)` + this can be triggered in two ways: + 1. client inserts a new entry and sends an update + 2. client receives a message of `type: 'connected'`, then it sends up all changes + */ + changes.forEach(async (change, i) => { + await db.prepare(INSERT_CHANGES).run(...change); + }); + + const changeSiteVersions = latestVersions(changes); + + changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { + await db + .prepare( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), ?, 0, 0) + ON CONFLICT([site_id], [tag], [event]) + DO UPDATE SET version=excluded.version` + ) + .run(changeSiteId, changeDbVersion); + }); + + await sync.receive(data); + } + }); + + ws.on('close', async () => { + await subscription.unsubscribe(); + }); + + // Make sure this happens AFTER event handlers are declared + await sync.catchUpClient(clientSiteId); + await sync.catchUpServer(clientSiteId); + + return sync; + } + + private constructor({ + ws, + stream, + db, + version, + siteId + }: { + ws: ExtendedWebSocket; + stream: string; + db: ReturnType; + version: number; + siteId: string; + }) { + this.stream = stream; + this.ws = ws; + this.userId = ws.session.user.userId; + this.db = db; + this.version = version; + this.siteId = siteId; + } + + async catchUpServer(clientSiteId) { + // Here we can send down the last seen id or something. + // that way we don't need the entire contents of the db + + const result = await this.db + .prepare(`SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`) + .get(clientSiteId); + const version = result?.version ?? 0; + await this.notify(JSON.stringify({ type: 'connected', siteId: clientSiteId, version })); + } + + async catchUpClient(clientSiteId: string) { + // Maybe we can do something to only send down what's needed. + // just updates after the last update by `${clientSiteId} + + const result = await this.db + .prepare(`SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`) + .get(clientSiteId); + const lastVersion = result?.version ?? 0; + + const changes = await this.db + .prepare( + `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" + FROM crsql_changes WHERE site_id != unhex(:clientSiteId) + AND db_version > :lastVersion` + ) + .all({ + clientSiteId, + lastVersion + }); + + const changeSiteVersions = latestVersions(changes.map((change) => Object.values(change))); + + changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { + await this.db + .prepare( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), ?, 0, 0) + ON CONFLICT([site_id], [tag], [event]) + DO UPDATE SET version=excluded.version` + ) + .run(changeSiteId, changeDbVersion); + }); + + if (changes.length) { + const message = JSON.stringify({ + type: 'pull', + siteId: this.siteId, + version: this.version, + changes: changes.map((change) => Object.values(change)) + }); + this.notify(message); + } + } + + private notify(message) { + if (this.ws.readyState === WebSocket.OPEN) { + this.ws.send(message, { binary: true }); + } + } + + private async publishMessage(message: ArrayBufferLike) { + await this.redisClient.lpush(this.stream, message); + await this.redisClient.publish(this.stream, message); + } + + private async receive(message: Buffer) { + await this.publishMessage(message); + } +} diff --git a/src/lib/server/websockets/handler.ts b/src/lib/server/websockets/handler.ts index 52b3d0e..6d2ac7e 100644 --- a/src/lib/server/websockets/handler.ts +++ b/src/lib/server/websockets/handler.ts @@ -5,6 +5,7 @@ import { Chat as ChatStreams } from './features/redis-streams/chat.js'; import { Chat as ChatPubSub } from './features/redis-pub-sub/chat.js'; import type { ExtendedWebSocket } from '../../../../vite-plugins/vite-plugin-svelte-kit-integrated-websocket-server'; import { auth } from '../lucia'; +import { Sync } from './features/offline/sync'; const FEATURE_STRATEGIES = { chat: { @@ -14,6 +15,9 @@ const FEATURE_STRATEGIES = { presence: { 'redis-streams': PresenceStreams, 'redis-pub-sub': PresencePubSub + }, + offline: { + sync: Sync } }; @@ -21,6 +25,8 @@ type Feature = { type: keyof typeof FEATURE_STRATEGIES; strategy: keyof (typeof FEATURE_STRATEGIES)[keyof typeof FEATURE_STRATEGIES]; stream: string; + clientSiteId?: string; + clientVersion?: number; }; function getFeatureFor( @@ -58,7 +64,12 @@ export async function hooksConnectionHandler(ws: ExtendedWebSocket, request: Inc ws.close(1008, `No stream specified for ${feature.type}`); throw new Error(`Invalid feature ${feature.type} - no stream specified`); } - - await getFeatureFor(feature.type, feature.strategy).init({ ws, stream: feature.stream }); + const f = await getFeatureFor(feature.type, feature.strategy); + await f.init({ + ws, + stream: feature.stream, + clientSiteId: feature.clientSiteId, + clientVersion: feature.clientVersion + }); }); } diff --git a/src/lib/server/websockets/redis-client.ts b/src/lib/server/websockets/redis-client.ts index 6ca7afb..5468fa2 100644 --- a/src/lib/server/websockets/redis-client.ts +++ b/src/lib/server/websockets/redis-client.ts @@ -5,7 +5,7 @@ import 'dotenv/config'; const connectionString = process.env.REDIS_WS_SERVER as string; let cli: Redis | null = null; -export const create = () => new Redis(connectionString); +export const create = (options = {}) => new Redis(connectionString, options); export const client = () => { cli = cli ? cli : create(); return cli; diff --git a/src/lib/sync-db.ts b/src/lib/sync-db.ts new file mode 100644 index 0000000..4f7f55e --- /dev/null +++ b/src/lib/sync-db.ts @@ -0,0 +1,40 @@ +import initWasm, { DB } from '@vlcn.io/crsqlite-wasm'; +import wasmUrl from '@vlcn.io/crsqlite-wasm/crsqlite.wasm?url'; + +const INSERT_CHANGES = `INSERT INTO crsql_changes VALUES (?, unhex(?), ?, ?, ?, ?, unhex(?), ?, ?)`; + +export class Database { + db: DB; + + static async load({ schema, name }: { schema: string[]; name: string }) { + const sqlite = await initWasm(() => wasmUrl); + const db = await sqlite.open(name); + const database = new Database(db); + await database.db.execMany(schema); + return database; + } + + constructor(db: DB) { + this.db = db; + } + + async version() { + const [[version]] = await this.db.exec(`SELECT crsql_db_version();`); + return version; + } + + async siteId() { + const [[siteId]] = await this.db.exec(`SELECT hex(crsql_site_id());`); + return siteId; + } + + async merge(changes) { + // const trackedPeers = await this.db.exec(`SELECT * FROM crsql_tracked_peers`); + // TODO: USE PREPARED STATEMENTS + await this.db.tx(async (tx) => { + changes.forEach(async (change) => { + await tx.exec(INSERT_CHANGES, change); + }); + }); + } +} diff --git a/src/lib/websockets/ws-store.ts b/src/lib/websockets/ws-store.ts index 460435f..0744527 100644 --- a/src/lib/websockets/ws-store.ts +++ b/src/lib/websockets/ws-store.ts @@ -48,7 +48,7 @@ export function wsStore({ url }: { url: string }) { // Generic send, can be customized/extended from custom store // see `send` in `chat-store` for example - send(message: string) { + send(message: string | ArrayBufferLike | Blob | ArrayBufferView) { ws?.send(message); } }; diff --git a/src/routes/app/+layout.svelte b/src/routes/app/+layout.svelte index f7655ce..69b6db2 100644 --- a/src/routes/app/+layout.svelte +++ b/src/routes/app/+layout.svelte @@ -66,6 +66,16 @@ href="/app/websocket-example/using-pub-sub">Websocket examples + + + + + + + + {#each $todos as todo} +
+ todos.toggle(todo.id)} /> + {todo.id} + {todo.content} + +
+ {/each} + + +
+

todonts

+
{ + await todonts.insert(newTodont); + newTodont = ''; + }} + > + + +
+ + {#each $todonts as todont} +
+ todonts.toggle(todont.id)} + /> + {todont.id} + {todont.content} + +
+ {/each} +
+ + +

How this works

+
    +
  1. + Setup (catch Server up, catch the Client A up) +
      +
    1. initialize and/or open Client A DB
    2. +
    3. Client A connect web socket (sending up clientAVersion and clientASiteID)
    4. +
    5. Server queries for all updates >= clientAVersion and != clientASiteID
    6. +
    7. Server `push` message with results to Client A
    8. +
    9. Client A merges results and "catches up" on all changes
    10. +
    11. Client A `push` all changes >= serverVersion and == clientASiteId to Server
    12. +
    13. Server merges and `push` changes to Client B and Client C
    14. +
    15. Client B merges change
    16. +
    17. Client C merges change
    18. +
    +
  2. +
  3. + Normal flow +
      +
    1. Client A makes update
    2. +
    3. `push` update to server
    4. +
    5. server merges change
    6. +
    7. server `push` update to Client B and Client C
    8. +
    9. Client B merges change
    10. +
    11. Client C merges change
    12. +
    +
  4. +
  5. + Database migrations +
      +
    1. ???
    2. +
    +
  6. +
diff --git a/src/routes/app/offline-first/README.md b/src/routes/app/offline-first/README.md new file mode 100644 index 0000000..8030403 --- /dev/null +++ b/src/routes/app/offline-first/README.md @@ -0,0 +1,68 @@ + +Node Connects +1. Server sends request to catch up + +```ts +// SYNC SERVER +// This is the server requesting to CATCH UP +// This can happen when the server receives a new connection + + +// Step 1. CLIENT INITIALIZES FLOW +// client sends up `siteId` as queryParm in ws URL +const [[siteId]] = await db.exec(`SELECT crsql_site_id()`) +new WebSocket(`wss://url:host/?siteId=${siteId}`) + +// Step 2. SERVER RECEIVES CLIENT CONNECT +async function catchUpServer({clientSiteId}) { + const [[versionOfClient]] = await db.exec(`SELECT db_version FROM crsql_tracked_peers WHERE site_id = ?`, [clientSiteId]) + const message = { type: 'catch up', version: versionOfClient }; + ws.send(JSON.stringify(message) +} +// Once server has client's siteId +await catchUpServer({clientSiteId: message.siteId}) + + +// on client +// receive { type: 'catch up', version } +if (message.type === 'catch up') { + const versionFromServer = message.version; + // Pull all changes made by client since version + const changes = + return await db.exec(`SELECT * FROM crsql_changes WHERE db_version > ? AND site_id = crsql_site_id()`, [versionFromServer]); + }) + const message = { type: 'update', changes } + ws.send(JSON.stringify(message)) +} +// on server +// typical upate handler used to merge changes and update tracked peers + + + + +// SYNC CLIENT +// Server opens websocket then triggers this Flow +ws.send({ type:'connected' }) + + +// Client receives message +if (message.type === 'connected') { + const clientTrackedPeers = await db.execO(`SELECT site_id, db_version FROM crsql_tracked_peers`) + const message = { type: 'catch up', peers: clientTrackedPeers } + ws.send(JSON.stringify(message)) +} + + +// on server +// This is the server fulfilling the client's request to CATCH UP +async function catchUpClient(clientTrackedPeers) { + const changes = clientTrackedPeers.flatMap(async ({db_version, site_id}) => { + return await db.exec(`SELECT * FROM crsql_changes WHERE db_version = ? AND site_id = ?`, [db_version, site_id]); + }) + const message = { type: 'update', changes } + ws.send(JSON.stringify(message)) +} +if (message.type === 'catch up') { + await catchUpClient(message.peers) +} +``` diff --git a/src/routes/app/offline-first/sdb.ts b/src/routes/app/offline-first/sdb.ts new file mode 100644 index 0000000..6eea974 --- /dev/null +++ b/src/routes/app/offline-first/sdb.ts @@ -0,0 +1,188 @@ +import { browser } from '$app/environment'; +import { Database } from '$lib/sync-db'; +import { onDestroy } from 'svelte'; +import { readable, writable } from 'svelte/store'; + +const encoder = new TextEncoder(); + +function wsErrorHandler(error: Event) { + console.error(error); +} + +export function latestVersions(changes) { + return Object.entries( + changes.reduce((acc, change) => { + const siteId = change[6]; + const version = change[5]; + if (acc[siteId]) { + acc[siteId] = acc[siteId] > version ? acc[siteId] : version; + } else { + acc[siteId] = version; + } + return acc; + }, {}) + ); +} + +async function pushOfflineChangesToServer(database, ws, version) { + const changes = await database.db.exec( + `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" + FROM crsql_changes WHERE site_id = crsql_site_id() AND db_version > ?`, + [version] + ); + + const changeSiteVersions = latestVersions(changes); + // sending so we're using the local db_version + changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { + await database.db.exec( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), ?, 0, 0) + ON CONFLICT([site_id], [tag], [event]) + DO UPDATE SET version=excluded.version`, + [changeSiteId, changeDbVersion] + ); + }); + + // maybe this should be a POST so we can get a nicer + // user experience. that way we can await until the + // changes are here rather than reacting to a server + // sent websocket message + const message = encoder.encode( + JSON.stringify({ + type: 'update', + siteId: await database.siteId(), + version: await database.version(), + changes + }) + ); + ws.send(message); +} + +function wsMessageHandler({ + database, + update, + identifier +}: { + database: Database; + update: () => Promise; + identifier?: string; +}) { + return async function (event: Event) { + // Are we over subscribing here? every `store` attaches an event listener + // maybe there's some kind of queue or something we can use to only apply + // appropriate udpates + if (typeof event.data !== 'string') { + const clientSiteId = await database.siteId(); + const m = await event.data.text(); + const { type, changes, siteId, version } = JSON.parse(m); + + if ((type === 'update' && siteId !== clientSiteId) || type === 'pull') { + await database.merge(changes); + + const changeSiteVersions = latestVersions(changes); + + changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { + await database.db.exec( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), ?, 0, 0) + ON CONFLICT([site_id], [tag], [event]) + DO UPDATE SET version=excluded.version`, + [changeSiteId, changeDbVersion] + ); + }); + await update(); + } + + if (type === 'connected') { + await pushOfflineChangesToServer(database, this, version); + } + } + }; +} + +// TODO: probably need re-connect/retry logic if WS server closes connection +async function setupWs({ url, database }: { url: string; database: Promise }) { + const db = await database; + const u = new URL(url); + const features = JSON.parse(u.searchParams.get('features') as string); + features[0].clientSiteId = await db.siteId(); + features[0].clientVersion = await db.version(); + u.searchParams.set('features', JSON.stringify(features)); + const ws = new WebSocket(`${decodeURI(u.href)}`); + ws.addEventListener('error', wsErrorHandler); + return ws; +} + +export function db({ schema, name, wsUrl }) { + if (!browser) { + // No SSR + return { store: () => readable([]) }; + } + const databasePromise = Database.load({ schema, name }); + const wsPromise = setupWs({ url: wsUrl, database: databasePromise }); + const store = ({ query, commands, identifier }) => { + const q = writable([]); + databasePromise.then(async (database) => { + const ws = await wsPromise; + const update = async () => q.set(await query(database.db)); + // Maybe this should register the listener in a store, + // we may be over subscribing since we add a listener with + // every `store` + ws.addEventListener('message', wsMessageHandler({ database, update, identifier })); + await update(); + }); + + const cmds = Object.fromEntries( + Object.entries(commands).map(([name, fn]) => [ + name, + async (args) => { + const db = await databasePromise; + const results = await fn(db.db, args); + q.set(await query(db.db)); + const changes = await db.db.exec( + `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" + FROM crsql_changes WHERE site_id = crsql_site_id() + AND db_version >= crsql_db_version()` + ); + + const changeSiteVersions = latestVersions(changes); + + changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { + await db.db.exec( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), ?, 0, 0) + ON CONFLICT([site_id], [tag], [event]) + DO UPDATE SET version=excluded.version`, + [changeSiteId, changeDbVersion] + ); + }); + + const ws = await wsPromise; + ws.send( + encoder.encode( + JSON.stringify({ + type: 'update', + siteId: await db.siteId(), + version: await db.version(), + changes + }) + ) + ); + return results; + } + ]) + ); + + return { + subscribe: q.subscribe, + ...cmds + }; + }; + + onDestroy(async () => { + const ws = await wsPromise; + ws.close(); + }); + + return { store }; +} diff --git a/sync-dbs/.keep b/sync-dbs/.keep new file mode 100644 index 0000000..e69de29 From 65791756b9df7f4c4d5bd60ce27b64d2b7ad2618 Mon Sep 17 00:00:00 2001 From: sammy Date: Sat, 30 Dec 2023 15:46:16 -0500 Subject: [PATCH 02/29] x --- src/lib/server/sync-db/db.ts | 4 +- .../websockets/features/offline/sync.ts | 12 +++-- src/routes/app/offline-first/+page.server.ts | 6 ++- src/routes/app/offline-first/+page.svelte | 7 ++- src/routes/app/offline-first/sdb.ts | 54 ++++++++++--------- 5 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/lib/server/sync-db/db.ts b/src/lib/server/sync-db/db.ts index 8294db7..01a9832 100644 --- a/src/lib/server/sync-db/db.ts +++ b/src/lib/server/sync-db/db.ts @@ -1,14 +1,14 @@ import Database from 'better-sqlite3'; import { extensionPath } from '@vlcn.io/crsqlite'; -export async function dbFrom(filename) { +export function dbFrom(filename) { // TODO: should be an env-var so we can use Render's persistent disk as the path const db = new Database(`./sync-dbs/${filename}`); db.pragma('journal_mode = WAL'); db.loadExtension(extensionPath); // TODO: import schema from same place as frontend - await db.exec(`CREATE TABLE IF NOT EXISTS todos (id PRIMARY KEY NOT NULL, content, complete); + db.exec(`CREATE TABLE IF NOT EXISTS todos (id PRIMARY KEY NOT NULL, content, complete); SELECT crsql_as_crr('todos'); CREATE TABLE IF NOT EXISTS todonts (id PRIMARY KEY NOT NULL, content, complete); SELECT crsql_as_crr('todonts'); diff --git a/src/lib/server/websockets/features/offline/sync.ts b/src/lib/server/websockets/features/offline/sync.ts index e700858..60fbb70 100644 --- a/src/lib/server/websockets/features/offline/sync.ts +++ b/src/lib/server/websockets/features/offline/sync.ts @@ -37,8 +37,8 @@ export class Sync { clientSiteId: string; clientVersion: number; }) { - const db = await dbFrom(`${ws.session.user.userId}.db`); - const { siteId, version } = await db + const db = dbFrom(`${ws.session.user.userId}.db`); + const { siteId, version } = db .prepare('SELECT hex(crsql_site_id()) as siteId, crsql_db_version() as version;') .get(); const sync = new Sync({ ws, stream, db, version, siteId }); @@ -69,7 +69,7 @@ export class Sync { await db.prepare(INSERT_CHANGES).run(...change); }); - const changeSiteVersions = latestVersions(changes); + const changeSiteVersions = latestVersions(changes).filter(([sId]) => sId !== siteId); changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { await db @@ -142,14 +142,16 @@ export class Sync { .prepare( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" FROM crsql_changes WHERE site_id != unhex(:clientSiteId) - AND db_version > :lastVersion` + AND db_version >= :lastVersion` ) .all({ clientSiteId, lastVersion }); - const changeSiteVersions = latestVersions(changes.map((change) => Object.values(change))); + const changeSiteVersions = latestVersions( + changes.map((change) => Object.values(change)) + ).filter(([siteId]) => siteId !== this.siteId); changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { await this.db diff --git a/src/routes/app/offline-first/+page.server.ts b/src/routes/app/offline-first/+page.server.ts index 72f8077..1553dcc 100644 --- a/src/routes/app/offline-first/+page.server.ts +++ b/src/routes/app/offline-first/+page.server.ts @@ -1,15 +1,19 @@ import type { PageServerLoad } from './$types'; import { COMBINED_PATH } from '$lib/websockets/constants'; +import { dbFrom } from '$lib/server/sync-db/db'; export const load: PageServerLoad = async ({ locals, url }) => { const { username, userId } = locals.user; const { protocol, host } = url; const wsProtocol = protocol === 'https:' ? 'wss:' : 'ws:'; const dbName = `${userId}.db`; + const db = dbFrom(dbName); + const { serverSiteId } = db.prepare('SELECT hex(crsql_site_id()) as serverSiteId;').get(); const features = [{ type: 'offline', strategy: 'sync', stream: dbName }]; return { url: `${wsProtocol}//${host}${COMBINED_PATH}?features=${JSON.stringify(features)}`, username, - dbName + dbName, + serverSiteId }; }; diff --git a/src/routes/app/offline-first/+page.svelte b/src/routes/app/offline-first/+page.svelte index 0d28be7..c1078f8 100644 --- a/src/routes/app/offline-first/+page.svelte +++ b/src/routes/app/offline-first/+page.svelte @@ -15,7 +15,12 @@ `SELECT crsql_as_crr('todonts');` ]; - const { store } = db({ schema, name: data.dbName, wsUrl: data.url }); + const { store } = db({ + schema, + name: data.dbName, + wsUrl: data.url, + serverSiteId: data.serverSiteId + }); const todos = store({ query: async (db) => await db.execO('SELECT * FROM todos'), commands: { diff --git a/src/routes/app/offline-first/sdb.ts b/src/routes/app/offline-first/sdb.ts index 6eea974..6eb51ee 100644 --- a/src/routes/app/offline-first/sdb.ts +++ b/src/routes/app/offline-first/sdb.ts @@ -27,7 +27,7 @@ export function latestVersions(changes) { async function pushOfflineChangesToServer(database, ws, version) { const changes = await database.db.exec( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" - FROM crsql_changes WHERE site_id = crsql_site_id() AND db_version > ?`, + FROM crsql_changes WHERE site_id = crsql_site_id() AND db_version >= ?`, [version] ); @@ -61,10 +61,12 @@ async function pushOfflineChangesToServer(database, ws, version) { function wsMessageHandler({ database, update, + serverSiteId, identifier }: { database: Database; update: () => Promise; + serverSiteId: string; identifier?: string; }) { return async function (event: Event) { @@ -79,17 +81,14 @@ function wsMessageHandler({ if ((type === 'update' && siteId !== clientSiteId) || type === 'pull') { await database.merge(changes); - const changeSiteVersions = latestVersions(changes); - - changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { - await database.db.exec( - `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), ?, 0, 0) + await database.db.exec( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), crsql_db_version(), 0, 0) ON CONFLICT([site_id], [tag], [event]) DO UPDATE SET version=excluded.version`, - [changeSiteId, changeDbVersion] - ); - }); + [serverSiteId] + ); + await update(); } @@ -113,14 +112,15 @@ async function setupWs({ url, database }: { url: string; database: Promise readable([]) }; } + console.log({ serverSiteId }); const databasePromise = Database.load({ schema, name }); const wsPromise = setupWs({ url: wsUrl, database: databasePromise }); - const store = ({ query, commands, identifier }) => { + const store = ({ query, commands }) => { const q = writable([]); databasePromise.then(async (database) => { const ws = await wsPromise; @@ -128,7 +128,10 @@ export function db({ schema, name, wsUrl }) { // Maybe this should register the listener in a store, // we may be over subscribing since we add a listener with // every `store` - ws.addEventListener('message', wsMessageHandler({ database, update, identifier })); + ws.addEventListener( + 'message', + wsMessageHandler({ database, update, identifier, serverSiteId }) + ); await update(); }); @@ -139,24 +142,25 @@ export function db({ schema, name, wsUrl }) { const db = await databasePromise; const results = await fn(db.db, args); q.set(await query(db.db)); + const serverSiteVersion = await db.db.execO( + `SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`, + [serverSiteId] + ); + const changes = await db.db.exec( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" - FROM crsql_changes WHERE site_id = crsql_site_id() - AND db_version >= crsql_db_version()` + FROM crsql_changes WHERE db_version >= ?`, + [serverSiteVersion[0].version] ); - const changeSiteVersions = latestVersions(changes); - - changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { - await db.db.exec( - `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), ?, 0, 0) + await db.db.exec( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), crsql_db_version(), 0, 0) ON CONFLICT([site_id], [tag], [event]) DO UPDATE SET version=excluded.version`, - [changeSiteId, changeDbVersion] - ); - }); - + [serverSiteId] + ); + console.log({ changes }); const ws = await wsPromise; ws.send( encoder.encode( From f35d0b87d9d382ca480e73441784dd7f9480426b Mon Sep 17 00:00:00 2001 From: sammy Date: Sat, 30 Dec 2023 23:42:47 -0500 Subject: [PATCH 03/29] always send all changes. need to work on sending just changes since last update --- .../websockets/features/offline/sync.ts | 101 ++++++++++-------- src/routes/app/offline-first/+page.svelte | 70 ++++++++++-- src/routes/app/offline-first/sdb.ts | 43 ++++---- 3 files changed, 141 insertions(+), 73 deletions(-) diff --git a/src/lib/server/websockets/features/offline/sync.ts b/src/lib/server/websockets/features/offline/sync.ts index 60fbb70..475b2a8 100644 --- a/src/lib/server/websockets/features/offline/sync.ts +++ b/src/lib/server/websockets/features/offline/sync.ts @@ -17,7 +17,6 @@ export class Sync { private userId: string; private redisClient: Redis = client(); private db: ReturnType; - private version: number; private siteId: string; /* @@ -38,10 +37,8 @@ export class Sync { clientVersion: number; }) { const db = dbFrom(`${ws.session.user.userId}.db`); - const { siteId, version } = db - .prepare('SELECT hex(crsql_site_id()) as siteId, crsql_db_version() as version;') - .get(); - const sync = new Sync({ ws, stream, db, version, siteId }); + const { siteId } = db.prepare('SELECT hex(crsql_site_id()) as siteId;').get(); + const sync = new Sync({ ws, stream, db, siteId }); const subClient = create(); await subClient.subscribe(sync.stream, (err) => { @@ -51,6 +48,20 @@ export class Sync { }); const subscription = await subClient.on('messageBuffer', (stream, message) => { + const { clientVersion } = db + .prepare( + `SELECT version as clientVersion FROM crsql_tracked_peers WHERE site_id = unhex(?)` + ) + .get(clientSiteId); + const { serverVersion } = db.prepare('SELECT crsql_db_version() as serverVersion;').get(); + + db.prepare( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), crsql_db_version(), 0, 0) + ON CONFLICT([site_id], [tag], [event]) + DO UPDATE SET version=excluded.version` + ).run(clientSiteId); + sync.notify(message); }); @@ -65,22 +76,18 @@ export class Sync { 1. client inserts a new entry and sends an update 2. client receives a message of `type: 'connected'`, then it sends up all changes */ - changes.forEach(async (change, i) => { - await db.prepare(INSERT_CHANGES).run(...change); + changes.forEach((change, i) => { + db.prepare(INSERT_CHANGES).run(...change); }); - const changeSiteVersions = latestVersions(changes).filter(([sId]) => sId !== siteId); + const fromSiteId = parsed.siteId; - changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { - await db - .prepare( - `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), ?, 0, 0) + db.prepare( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), crsql_db_version(), 0, 0) ON CONFLICT([site_id], [tag], [event]) DO UPDATE SET version=excluded.version` - ) - .run(changeSiteId, changeDbVersion); - }); + ).run(fromSiteId); await sync.receive(data); } @@ -91,8 +98,8 @@ export class Sync { }); // Make sure this happens AFTER event handlers are declared - await sync.catchUpClient(clientSiteId); - await sync.catchUpServer(clientSiteId); + sync.catchUpServer(clientSiteId); + sync.catchUpClient(clientSiteId); return sync; } @@ -101,74 +108,78 @@ export class Sync { ws, stream, db, - version, siteId }: { ws: ExtendedWebSocket; stream: string; db: ReturnType; - version: number; siteId: string; }) { this.stream = stream; this.ws = ws; this.userId = ws.session.user.userId; this.db = db; - this.version = version; this.siteId = siteId; } - async catchUpServer(clientSiteId) { + catchUpServer(clientSiteId) { // Here we can send down the last seen id or something. // that way we don't need the entire contents of the db - const result = await this.db + const result = this.db .prepare(`SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`) .get(clientSiteId); const version = result?.version ?? 0; - await this.notify(JSON.stringify({ type: 'connected', siteId: clientSiteId, version })); + this.notify(JSON.stringify({ type: 'connected', siteId: clientSiteId, version })); } - async catchUpClient(clientSiteId: string) { + catchUpClient(clientSiteId: string) { // Maybe we can do something to only send down what's needed. // just updates after the last update by `${clientSiteId} - const result = await this.db + const result = this.db .prepare(`SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`) .get(clientSiteId); - const lastVersion = result?.version ?? 0; - const changes = await this.db + const lastVersion = result?.version ? result.version - 1 : 0; + + // ALL + const changes = this.db .prepare( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" FROM crsql_changes WHERE site_id != unhex(:clientSiteId) - AND db_version >= :lastVersion` + ` ) .all({ - clientSiteId, - lastVersion + clientSiteId }); - - const changeSiteVersions = latestVersions( - changes.map((change) => Object.values(change)) - ).filter(([siteId]) => siteId !== this.siteId); - - changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { - await this.db - .prepare( - `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), ?, 0, 0) + // const changes = this.db + // .prepare( + // `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" + // FROM crsql_changes WHERE site_id != unhex(:clientSiteId) + // AND db_version >= :lastVersion` + // ) + // .all({ + // clientSiteId, + // lastVersion + // }); + + this.db + .prepare( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), crsql_db_version(), 0, 0) ON CONFLICT([site_id], [tag], [event]) DO UPDATE SET version=excluded.version` - ) - .run(changeSiteId, changeDbVersion); - }); + ) + .run(clientSiteId); + + const { version } = this.db.prepare(`SELECT crsql_db_version() as version;`).get(); if (changes.length) { const message = JSON.stringify({ type: 'pull', siteId: this.siteId, - version: this.version, + version: version, changes: changes.map((change) => Object.values(change)) }); this.notify(message); diff --git a/src/routes/app/offline-first/+page.svelte b/src/routes/app/offline-first/+page.svelte index c1078f8..5cd9e37 100644 --- a/src/routes/app/offline-first/+page.svelte +++ b/src/routes/app/offline-first/+page.svelte @@ -21,30 +21,80 @@ wsUrl: data.url, serverSiteId: data.serverSiteId }); + + const me = store({ + query: async (db) => + await db.execO('SELECT hex(crsql_site_id()) as site_id, crsql_db_version() as version'), + commands: { + requery: async (db) => console.log('requerying') + }, + identifier: 'peers' + }); + const peers = store({ + query: async (db) => { + console.log('getting peers'); + return await db.execO('SELECT hex(site_id) as site_id, version FROM crsql_tracked_peers'); + }, + commands: { + requery: async (db) => console.log('requerying') + }, + identifier: 'peers' + }); const todos = store({ query: async (db) => await db.execO('SELECT * FROM todos'), commands: { - insert: async (db, name) => - await db.exec('INSERT INTO todos VALUES (?, ?, ?)', [nanoid(), name, 0]), - toggle: async (db, id) => - await db.exec('UPDATE todos SET complete = not(complete) WHERE id = ?', [id]), - delete: async (db, id) => await db.exec('DELETE FROM todos WHERE id = ?', [id]) + insert: async (db, name) => { + await db.exec('INSERT INTO todos VALUES (?, ?, ?)', [nanoid(), name, 0]); + await peers.requery(); + await me.requery(); + }, + toggle: async (db, id) => { + await db.exec('UPDATE todos SET complete = not(complete) WHERE id = ?', [id]); + + await peers.requery(); + await me.requery(); + }, + + delete: async (db, id) => { + await db.exec('DELETE FROM todos WHERE id = ?', [id]); + await peers.requery(); + await me.requery(); + } }, identifier: 'todos' }); const todonts = store({ query: async (db) => await db.execO('SELECT * FROM todonts'), commands: { - insert: async (db, name) => - await db.exec('INSERT INTO todonts VALUES (?, ?, ?)', [nanoid(), name, 0]), - toggle: async (db, id) => - await db.exec('UPDATE todonts SET complete = not(complete) WHERE id = ?', [id]), - delete: async (db, id) => await db.exec('DELETE FROM todonts WHERE id = ?', [id]) + insert: async (db, name) => { + await db.exec('INSERT INTO todonts VALUES (?, ?, ?)', [nanoid(), name, 0]); + peers.requery(); + me.requery(); + }, + toggle: async (db, id) => { + await db.exec('UPDATE todonts SET complete = not(complete) WHERE id = ?', [id]); + peers.requery(); + me.requery(); + }, + delete: async (db, id) => { + await db.exec('DELETE FROM todonts WHERE id = ?', [id]); + peers.requery(); + me.requery(); + } }, identifier: 'todonts' }); + + $: console.log($peers); +{#each $me as m} +
me: {m.site_id} {m.version}
+{/each} +
server: {data.serverSiteId}
+{#each $peers as peer} +
{peer.site_id} {peer.version}
+{/each}

todos

diff --git a/src/routes/app/offline-first/sdb.ts b/src/routes/app/offline-first/sdb.ts index 6eb51ee..4b48a28 100644 --- a/src/routes/app/offline-first/sdb.ts +++ b/src/routes/app/offline-first/sdb.ts @@ -24,24 +24,27 @@ export function latestVersions(changes) { ); } -async function pushOfflineChangesToServer(database, ws, version) { +async function pushOfflineChangesToServer(database, ws, version, serverSiteId) { + // const changes = await database.db.exec( + // `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" + // FROM crsql_changes WHERE site_id = crsql_site_id() AND db_version >= ?`, + // [version ? version - 1 : 0] + // ); + + // ALL const changes = await database.db.exec( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" - FROM crsql_changes WHERE site_id = crsql_site_id() AND db_version >= ?`, - [version] + FROM crsql_changes` ); - const changeSiteVersions = latestVersions(changes); // sending so we're using the local db_version - changeSiteVersions.forEach(async ([changeSiteId, changeDbVersion]) => { - await database.db.exec( - `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), ?, 0, 0) + await database.db.exec( + `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), crsql_db_version(), 0, 0) ON CONFLICT([site_id], [tag], [event]) DO UPDATE SET version=excluded.version`, - [changeSiteId, changeDbVersion] - ); - }); + [serverSiteId] + ); // maybe this should be a POST so we can get a nicer // user experience. that way we can await until the @@ -80,7 +83,6 @@ function wsMessageHandler({ if ((type === 'update' && siteId !== clientSiteId) || type === 'pull') { await database.merge(changes); - await database.db.exec( `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) VALUES (unhex(?), crsql_db_version(), 0, 0) @@ -93,7 +95,7 @@ function wsMessageHandler({ } if (type === 'connected') { - await pushOfflineChangesToServer(database, this, version); + await pushOfflineChangesToServer(database, this, version, serverSiteId); } } }; @@ -117,7 +119,6 @@ export function db({ schema, name, wsUrl, serverSiteId, identifier }) { // No SSR return { store: () => readable([]) }; } - console.log({ serverSiteId }); const databasePromise = Database.load({ schema, name }); const wsPromise = setupWs({ url: wsUrl, database: databasePromise }); const store = ({ query, commands }) => { @@ -142,15 +143,21 @@ export function db({ schema, name, wsUrl, serverSiteId, identifier }) { const db = await databasePromise; const results = await fn(db.db, args); q.set(await query(db.db)); + const serverSiteVersion = await db.db.execO( `SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`, [serverSiteId] ); - + // const v = serverSiteVersion[0]?.version ? serverSiteVersion[0].version - 1 : 0; + // const changes = await db.db.exec( + // `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" + // FROM crsql_changes WHERE db_version >= ?`, + // [v] + // ); + // ALL const changes = await db.db.exec( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" - FROM crsql_changes WHERE db_version >= ?`, - [serverSiteVersion[0].version] + FROM crsql_changes` ); await db.db.exec( @@ -160,7 +167,7 @@ export function db({ schema, name, wsUrl, serverSiteId, identifier }) { DO UPDATE SET version=excluded.version`, [serverSiteId] ); - console.log({ changes }); + const ws = await wsPromise; ws.send( encoder.encode( From 6c77a3045f59e9d81320d1454891ad35cb903717 Mon Sep 17 00:00:00 2001 From: sammy Date: Mon, 1 Jan 2024 14:52:38 -0500 Subject: [PATCH 04/29] small refactor --- src/lib/sync-db.ts | 12 +++++------- src/routes/app/offline-first/+page.svelte | 7 ++----- src/routes/app/offline-first/sdb.ts | 8 ++++---- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/src/lib/sync-db.ts b/src/lib/sync-db.ts index 4f7f55e..4345ca9 100644 --- a/src/lib/sync-db.ts +++ b/src/lib/sync-db.ts @@ -5,17 +5,20 @@ const INSERT_CHANGES = `INSERT INTO crsql_changes VALUES (?, unhex(?), ?, ?, ?, export class Database { db: DB; + siteId: string; static async load({ schema, name }: { schema: string[]; name: string }) { const sqlite = await initWasm(() => wasmUrl); const db = await sqlite.open(name); - const database = new Database(db); + const [{ siteId }] = await db.execO(`SELECT hex(crsql_site_id()) as siteId;`); + const database = new Database(db, siteId); await database.db.execMany(schema); return database; } - constructor(db: DB) { + constructor(db: DB, siteId: string) { this.db = db; + this.siteId = siteId; } async version() { @@ -23,11 +26,6 @@ export class Database { return version; } - async siteId() { - const [[siteId]] = await this.db.exec(`SELECT hex(crsql_site_id());`); - return siteId; - } - async merge(changes) { // const trackedPeers = await this.db.exec(`SELECT * FROM crsql_tracked_peers`); // TODO: USE PREPARED STATEMENTS diff --git a/src/routes/app/offline-first/+page.svelte b/src/routes/app/offline-first/+page.svelte index 5cd9e37..6e43997 100644 --- a/src/routes/app/offline-first/+page.svelte +++ b/src/routes/app/offline-first/+page.svelte @@ -26,17 +26,16 @@ query: async (db) => await db.execO('SELECT hex(crsql_site_id()) as site_id, crsql_db_version() as version'), commands: { - requery: async (db) => console.log('requerying') + requery: async (db) => undefined }, identifier: 'peers' }); const peers = store({ query: async (db) => { - console.log('getting peers'); return await db.execO('SELECT hex(site_id) as site_id, version FROM crsql_tracked_peers'); }, commands: { - requery: async (db) => console.log('requerying') + requery: async (db) => undefined }, identifier: 'peers' }); @@ -84,8 +83,6 @@ }, identifier: 'todonts' }); - - $: console.log($peers); {#each $me as m} diff --git a/src/routes/app/offline-first/sdb.ts b/src/routes/app/offline-first/sdb.ts index 4b48a28..3474f19 100644 --- a/src/routes/app/offline-first/sdb.ts +++ b/src/routes/app/offline-first/sdb.ts @@ -53,7 +53,7 @@ async function pushOfflineChangesToServer(database, ws, version, serverSiteId) { const message = encoder.encode( JSON.stringify({ type: 'update', - siteId: await database.siteId(), + siteId: database.siteId, version: await database.version(), changes }) @@ -77,7 +77,7 @@ function wsMessageHandler({ // maybe there's some kind of queue or something we can use to only apply // appropriate udpates if (typeof event.data !== 'string') { - const clientSiteId = await database.siteId(); + const clientSiteId = database.siteId; const m = await event.data.text(); const { type, changes, siteId, version } = JSON.parse(m); @@ -106,7 +106,7 @@ async function setupWs({ url, database }: { url: string; database: Promise Date: Mon, 1 Jan 2024 15:05:04 -0500 Subject: [PATCH 05/29] ssa --- src/lib/server/websockets/features/offline/sync.ts | 10 ---------- src/lib/sync-db.ts | 1 - src/routes/app/offline-first/+page.ts | 1 + src/routes/app/offline-first/sdb.ts | 14 +------------- 4 files changed, 2 insertions(+), 24 deletions(-) create mode 100644 src/routes/app/offline-first/+page.ts diff --git a/src/lib/server/websockets/features/offline/sync.ts b/src/lib/server/websockets/features/offline/sync.ts index 475b2a8..a895ab2 100644 --- a/src/lib/server/websockets/features/offline/sync.ts +++ b/src/lib/server/websockets/features/offline/sync.ts @@ -153,16 +153,6 @@ export class Sync { .all({ clientSiteId }); - // const changes = this.db - // .prepare( - // `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" - // FROM crsql_changes WHERE site_id != unhex(:clientSiteId) - // AND db_version >= :lastVersion` - // ) - // .all({ - // clientSiteId, - // lastVersion - // }); this.db .prepare( diff --git a/src/lib/sync-db.ts b/src/lib/sync-db.ts index 4345ca9..64d4f98 100644 --- a/src/lib/sync-db.ts +++ b/src/lib/sync-db.ts @@ -27,7 +27,6 @@ export class Database { } async merge(changes) { - // const trackedPeers = await this.db.exec(`SELECT * FROM crsql_tracked_peers`); // TODO: USE PREPARED STATEMENTS await this.db.tx(async (tx) => { changes.forEach(async (change) => { diff --git a/src/routes/app/offline-first/+page.ts b/src/routes/app/offline-first/+page.ts new file mode 100644 index 0000000..a3d1578 --- /dev/null +++ b/src/routes/app/offline-first/+page.ts @@ -0,0 +1 @@ +export const ssr = false; diff --git a/src/routes/app/offline-first/sdb.ts b/src/routes/app/offline-first/sdb.ts index 3474f19..56a1b50 100644 --- a/src/routes/app/offline-first/sdb.ts +++ b/src/routes/app/offline-first/sdb.ts @@ -25,12 +25,6 @@ export function latestVersions(changes) { } async function pushOfflineChangesToServer(database, ws, version, serverSiteId) { - // const changes = await database.db.exec( - // `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" - // FROM crsql_changes WHERE site_id = crsql_site_id() AND db_version >= ?`, - // [version ? version - 1 : 0] - // ); - // ALL const changes = await database.db.exec( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" @@ -148,13 +142,7 @@ export function db({ schema, name, wsUrl, serverSiteId, identifier }) { `SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`, [serverSiteId] ); - // const v = serverSiteVersion[0]?.version ? serverSiteVersion[0].version - 1 : 0; - // const changes = await db.db.exec( - // `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" - // FROM crsql_changes WHERE db_version >= ?`, - // [v] - // ); - // ALL + const changes = await db.db.exec( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" FROM crsql_changes` From bd55968d84014f80fa42c09d1717f879eeca6370 Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Mon, 1 Jan 2024 15:36:57 -0500 Subject: [PATCH 06/29] service worker --- src/service-worker.js | 81 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 src/service-worker.js diff --git a/src/service-worker.js b/src/service-worker.js new file mode 100644 index 0000000..5f6fb54 --- /dev/null +++ b/src/service-worker.js @@ -0,0 +1,81 @@ +/// +import { build, files, version } from '$service-worker'; + +// Create a unique cache name for this deployment +const CACHE = `cache-${version}`; + +const ASSETS = [ + ...build, // the app itself + ...files // everything in `static` +]; + +self.addEventListener('install', (event) => { + console.log('installing'); + // Create a new cache and add all files to it + async function addFilesToCache() { + const cache = await caches.open(CACHE); + await cache.addAll(ASSETS); + } + + event.waitUntil(addFilesToCache()); +}); + +self.addEventListener('activate', (event) => { + // Remove previous cached data from disk + async function deleteOldCaches() { + for (const key of await caches.keys()) { + if (key !== CACHE) await caches.delete(key); + } + } + + event.waitUntil(deleteOldCaches()); +}); + +self.addEventListener('fetch', (event) => { + // ignore POST requests etc + if (event.request.method !== 'GET') return; + + async function respond() { + const url = new URL(event.request.url); + const cache = await caches.open(CACHE); + + // `build`/`files` can always be served from the cache + if (ASSETS.includes(url.pathname)) { + const response = await cache.match(url.pathname); + + if (response) { + return response; + } + } + + // for everything else, try the network first, but + // fall back to the cache if we're offline + try { + const response = await fetch(event.request); + + // if we're offline, fetch can return a value that is not a Response + // instead of throwing - and we can't pass this non-Response to respondWith + if (!(response instanceof Response)) { + throw new Error('invalid response from fetch'); + } + + if (response.status === 200) { + cache.put(event.request, response.clone()); + } + + return response; + } catch (err) { + const response = await cache.match(event.request); + + if (response) { + return response; + } + + // if there's no cache, then just error out + // as there is nothing we can do to respond to this request + throw err; + } + } + + event.respondWith(respond()); +}); From ff1290cd0e4ca596198ce43128977864a164e1e8 Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Mon, 1 Jan 2024 15:52:20 -0500 Subject: [PATCH 07/29] move schema --- src/lib/sync-db.ts | 10 ++++++++-- src/lib/sync/schema.sql | 5 +++++ src/routes/app/offline-first/+page.svelte | 18 ++++++++---------- 3 files changed, 21 insertions(+), 12 deletions(-) create mode 100644 src/lib/sync/schema.sql diff --git a/src/lib/sync-db.ts b/src/lib/sync-db.ts index 64d4f98..fdb324e 100644 --- a/src/lib/sync-db.ts +++ b/src/lib/sync-db.ts @@ -7,12 +7,18 @@ export class Database { db: DB; siteId: string; - static async load({ schema, name }: { schema: string[]; name: string }) { + static async load({ + schema, + name + }: { + schema: { name: string; schemaContent: string }; + name: string; + }) { const sqlite = await initWasm(() => wasmUrl); const db = await sqlite.open(name); const [{ siteId }] = await db.execO(`SELECT hex(crsql_site_id()) as siteId;`); const database = new Database(db, siteId); - await database.db.execMany(schema); + await db.automigrateTo(schema.name, schema.schemaContent); return database; } diff --git a/src/lib/sync/schema.sql b/src/lib/sync/schema.sql new file mode 100644 index 0000000..7cec62d --- /dev/null +++ b/src/lib/sync/schema.sql @@ -0,0 +1,5 @@ + +CREATE TABLE IF NOT EXISTS todos (id PRIMARY KEY NOT NULL, content, complete); +SELECT crsql_as_crr('todos'); +CREATE TABLE IF NOT EXISTS todonts (id PRIMARY KEY NOT NULL, content, complete); +SELECT crsql_as_crr('todonts'); diff --git a/src/routes/app/offline-first/+page.svelte b/src/routes/app/offline-first/+page.svelte index 6e43997..37945d8 100644 --- a/src/routes/app/offline-first/+page.svelte +++ b/src/routes/app/offline-first/+page.svelte @@ -1,22 +1,15 @@ + +

+ {#if online}Online{:else}Offline{/if} +

+ {#each $me as m}
me: {m.site_id} {m.version}
{/each} From 9fb49dc9a9f9c84da65d7fbcd486c808f666a7a5 Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Mon, 1 Jan 2024 16:23:24 -0500 Subject: [PATCH 08/29] x --- .../websockets/features/offline/sycn.md | 13 +++ .../websockets/features/offline/sync.ts | 81 +++++-------------- src/routes/app/offline-first/+page.svelte | 2 +- .../app/offline-first/server-sync-db.ts} | 0 .../{sdb.ts => sync-db-store.ts} | 2 +- 5 files changed, 36 insertions(+), 62 deletions(-) create mode 100644 src/lib/server/websockets/features/offline/sycn.md rename src/{lib/sync-db.ts => routes/app/offline-first/server-sync-db.ts} (100%) rename src/routes/app/offline-first/{sdb.ts => sync-db-store.ts} (99%) diff --git a/src/lib/server/websockets/features/offline/sycn.md b/src/lib/server/websockets/features/offline/sycn.md new file mode 100644 index 0000000..db724b4 --- /dev/null +++ b/src/lib/server/websockets/features/offline/sycn.md @@ -0,0 +1,13 @@ +# Concerns + +1. websocket client (ws) + a. receive message + b. send message +2. interacting with other websocket clients (redis) + a. broadcasting messages to other clients + b. receiving messages from other clients +3. dealing with database changes + a. merging changes from clients + b. broadcasting changes to clients + c. catching server up with clients (ex: offline changes happened on client) + d. catching clients up with server (ex: new client) diff --git a/src/lib/server/websockets/features/offline/sync.ts b/src/lib/server/websockets/features/offline/sync.ts index a895ab2..259fbdd 100644 --- a/src/lib/server/websockets/features/offline/sync.ts +++ b/src/lib/server/websockets/features/offline/sync.ts @@ -2,9 +2,8 @@ import { client, create } from '../../redis-client'; import type { ExtendedWebSocket } from '../../../../../../vite-plugins/vite-plugin-svelte-kit-integrated-websocket-server'; import type { Redis } from 'ioredis'; import { dbFrom } from '$lib/server/sync-db/db'; -import type Database from 'better-sqlite3'; +import type BetterSqlite3 from 'better-sqlite3'; import { WebSocket } from 'ws'; -import { latestVersions } from '../../../../../routes/app/offline-first/sdb'; const INSERT_CHANGES = `INSERT INTO crsql_changes VALUES (?, unhex(?), ?, ?, ?, ?, unhex(?), ?, ?)`; @@ -16,8 +15,10 @@ export class Sync { private ws: ExtendedWebSocket; private userId: string; private redisClient: Redis = client(); - private db: ReturnType; + private db: ReturnType; private siteId: string; + insertChangesStatement: BetterSqlite3.Statement; + insertTrackedPeersStatement: BetterSqlite3.Statement; /* This is the preferred way to instantiate this class for 2 reasons: @@ -48,46 +49,22 @@ export class Sync { }); const subscription = await subClient.on('messageBuffer', (stream, message) => { - const { clientVersion } = db - .prepare( - `SELECT version as clientVersion FROM crsql_tracked_peers WHERE site_id = unhex(?)` - ) - .get(clientSiteId); - const { serverVersion } = db.prepare('SELECT crsql_db_version() as serverVersion;').get(); - - db.prepare( - `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), crsql_db_version(), 0, 0) - ON CONFLICT([site_id], [tag], [event]) - DO UPDATE SET version=excluded.version` - ).run(clientSiteId); - - sync.notify(message); + sync.insertTrackedPeersStatement.run(clientSiteId); + + sync.send(message); }); ws.on('message', async (data) => { const parsed = JSON.parse(data.toString()); const changes = parsed.changes; if (parsed.type === 'update') { - /* - some client is sending an update to the server - which then is forwarded to all clients -> `sync.recieve(data)` - this can be triggered in two ways: - 1. client inserts a new entry and sends an update - 2. client receives a message of `type: 'connected'`, then it sends up all changes - */ - changes.forEach((change, i) => { - db.prepare(INSERT_CHANGES).run(...change); + changes.forEach((change) => { + sync.insertChangesStatement.run(...change); }); const fromSiteId = parsed.siteId; - db.prepare( - `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), crsql_db_version(), 0, 0) - ON CONFLICT([site_id], [tag], [event]) - DO UPDATE SET version=excluded.version` - ).run(fromSiteId); + sync.insertTrackedPeersStatement.run(fromSiteId); await sync.receive(data); } @@ -120,6 +97,12 @@ export class Sync { this.userId = ws.session.user.userId; this.db = db; this.siteId = siteId; + this.insertChangesStatement = db.prepare(INSERT_CHANGES); + this.insertTrackedPeersStatement = + db.prepare(`INSERT INTO crsql_tracked_peers (site_id, version, tag, event) + VALUES (unhex(?), crsql_db_version(), 0, 0) + ON CONFLICT([site_id], [tag], [event]) + DO UPDATE SET version=excluded.version`); } catchUpServer(clientSiteId) { @@ -130,19 +113,10 @@ export class Sync { .prepare(`SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`) .get(clientSiteId); const version = result?.version ?? 0; - this.notify(JSON.stringify({ type: 'connected', siteId: clientSiteId, version })); + this.send(JSON.stringify({ type: 'connected', siteId: clientSiteId, version })); } catchUpClient(clientSiteId: string) { - // Maybe we can do something to only send down what's needed. - // just updates after the last update by `${clientSiteId} - - const result = this.db - .prepare(`SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`) - .get(clientSiteId); - - const lastVersion = result?.version ? result.version - 1 : 0; - // ALL const changes = this.db .prepare( @@ -153,15 +127,7 @@ export class Sync { .all({ clientSiteId }); - - this.db - .prepare( - `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), crsql_db_version(), 0, 0) - ON CONFLICT([site_id], [tag], [event]) - DO UPDATE SET version=excluded.version` - ) - .run(clientSiteId); + this.insertTrackedPeersStatement.run(clientSiteId); const { version } = this.db.prepare(`SELECT crsql_db_version() as version;`).get(); @@ -172,22 +138,17 @@ export class Sync { version: version, changes: changes.map((change) => Object.values(change)) }); - this.notify(message); + this.send(message); } } - private notify(message) { + private send(message) { if (this.ws.readyState === WebSocket.OPEN) { this.ws.send(message, { binary: true }); } } - private async publishMessage(message: ArrayBufferLike) { - await this.redisClient.lpush(this.stream, message); - await this.redisClient.publish(this.stream, message); - } - private async receive(message: Buffer) { - await this.publishMessage(message); + await this.redisClient.publish(this.stream, message); } } diff --git a/src/routes/app/offline-first/+page.svelte b/src/routes/app/offline-first/+page.svelte index 37945d8..ce4b287 100644 --- a/src/routes/app/offline-first/+page.svelte +++ b/src/routes/app/offline-first/+page.svelte @@ -1,6 +1,6 @@ - -

- {#if online}Online{:else}Offline{/if} -

- -
await todos.loadEmUp()}> - - -
-
todos count: {$count}
-{#each $me as m} -
me: {m.site_id} {m.version}
-{/each} -
server: {data.serverSiteId}
-{#each $peers as peer} -
{peer.site_id} {peer.version}
-{/each} -
-
-

todos

-
{ - await todos.insert(newTodo); - newTodo = ''; - }} - > - - -
- - {#each $todos as todo} -
- todos.toggle(todo.id)} /> - {todo.id} - {todo.content} - -
- {/each} -
- -
-

todonts

-
{ - await todonts.insert(newTodont); - newTodont = ''; - }} - > - - -
+ - {#each $todonts as todont} -
- todonts.toggle(todont.id)} - /> - {todont.id} - {todont.content} - -
- {/each} -
-
+ diff --git a/src/routes/app/offline-first/Todos.svelte b/src/routes/app/offline-first/Todos.svelte new file mode 100644 index 0000000..9f88edf --- /dev/null +++ b/src/routes/app/offline-first/Todos.svelte @@ -0,0 +1,152 @@ + + + +

+ {#if online}Online{:else}Offline{/if} +

+ +
await todos.loadEmUp()}> + + +
+
await todos.deleteEm()}> + +
+
todos count: {$count}
+{#each $me as m} +
me: {m.site_id} {m.version}
+{/each} +
server: {dbConfig.serverSiteId}
+{#each $peers as peer} +
{peer.site_id} {peer.version}
+{/each} +
+
+

todos

+
{ + await todos.insert(newTodo); + newTodo = ''; + }} + > + + +
+ + {#each $todos as todo} +
+ todos.toggle(todo.id)} /> + {todo.id} + {todo.content} + +
+ {/each} +
+ +
+

todonts

+
{ + await todonts.insert(newTodont); + newTodont = ''; + }} + > + + +
+ + {#each $todonts as todont} +
+ todonts.toggle(todont.id)} + /> + {todont.id} + {todont.content} + +
+ {/each} +
+
diff --git a/src/routes/app/offline-first/server-sync-db.ts b/src/routes/app/offline-first/server-sync-db.ts index 984dfa6..578a80b 100644 --- a/src/routes/app/offline-first/server-sync-db.ts +++ b/src/routes/app/offline-first/server-sync-db.ts @@ -35,10 +35,13 @@ export class Database { async merge(changes) { await this.db.tx(async (tx) => { - changes.forEach(async (change, i) => { - console.log(`merging ${i} of ${changes.length}`); - await tx.exec(INSERT_CHANGES, change); - }); + const execPromises = []; + + for (const change of changes) { + execPromises.push(tx.exec(INSERT_CHANGES, change)); + } + + await Promise.all(execPromises); }); } @@ -55,7 +58,7 @@ export class Database { async changesSince(since = 0) { return await this.db.exec( `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" - FROM crsql_changes WHERE db_version >= ?`, + FROM crsql_changes WHERE db_version > ?`, [since] ); } diff --git a/src/routes/app/offline-first/sync-db-store.ts b/src/routes/app/offline-first/sync-db-store.ts index f926943..777a0a7 100644 --- a/src/routes/app/offline-first/sync-db-store.ts +++ b/src/routes/app/offline-first/sync-db-store.ts @@ -1,5 +1,5 @@ +import { nanoid } from 'nanoid'; import { Database } from './server-sync-db'; -import { onDestroy } from 'svelte'; import { writable } from 'svelte/store'; function wsErrorHandler(error: Event) { @@ -27,23 +27,11 @@ async function pushChangesSince({ database, ws, sinceVersion, serverSiteId }) { function wsMessageHandler({ database, - updates, - serverSiteId, - identifier + serverSiteId }: { database: Database; - updates: Set<() => Promise>; serverSiteId: string; - identifier?: string; }) { - // TODO - // TODO - // TODO - // `update` needs to be a subscription type structure. we want to pass in an `update` function for for each - // `store` we register but we don't want to register multiple wsMessageHandlers - // TODO - // TODO - return async function (event: Event) { // Are we over subscribing here? every `store` attaches an event listener // maybe there's some kind of queue or something we can use to only apply @@ -54,17 +42,8 @@ function wsMessageHandler({ const { type, changes, siteId, version } = JSON.parse(m); if ((type === 'update' && siteId !== clientSiteId) || type === 'pull') { - if (type === 'pull') { - console.log('pull'); - } await database.merge(changes); await database.insertTrackedPeers(serverSiteId); - - const updatePromises = []; - for (const update of updates) { - updatePromises.push(update()); - } - await Promise.all(updatePromises); } if (type === 'connected') { @@ -76,7 +55,7 @@ function wsMessageHandler({ } // TODO: probably need re-connect/retry logic if WS server closes connection -async function setupWs({ url, database }: { url: string; database: Promise }) { +export async function setupWs({ url, database }: { url: string; database: Promise }) { const db = await database; const u = new URL(url); const features = JSON.parse(u.searchParams.get('features') as string); @@ -88,28 +67,50 @@ async function setupWs({ url, database }: { url: string; database: Promise { +export function db({ databasePromise, wsPromise, serverSiteId, name }) { + const self = nanoid(); + let wsListenerAdded = false; + let channelListenerAdded = false; + const channelSubscribers = new Set(); + const channel = 'BroadcastChannel' in globalThis ? new globalThis.BroadcastChannel(name) : null; + // TODO - rename `query` to `view` + const store = ({ watch, query, commands = {} }) => { const q = writable([]); databasePromise.then(async (database) => { const ws = await wsPromise; - const update = async () => q.set(await query(database.db)); - // Maybe this should register the listener in a store, - // we may be over subscribing since we add a listener with - // every `store` - updates.add(update); - if (listenerAdded === false) { - ws.addEventListener( - 'message', - wsMessageHandler({ database, identifier, serverSiteId, updates }) - ); - listenerAdded = true; + if (wsListenerAdded === false) { + ws.addEventListener('message', wsMessageHandler({ database, serverSiteId })); + wsListenerAdded = true; + } + + // Update other tabs + channelSubscribers.add(async (event: MessageEvent) => { + if (watch.some((table) => event.data.tables.includes(table))) { + await q.set(await query(database.db)); + } + }); + + if (channelListenerAdded === false) { + channel?.addEventListener('message', async (event) => { + for (const update of channelSubscribers) { + await update(event); + } + }); + channelListenerAdded = true; } - await update(); + + // Could we do some fine-grained updates here with rowid? + // svelte 5 might make this easier/possible. For now, + // just re-calculate view + database.db.onUpdate(async (type, dbName, tblName, rowid) => { + if (watch.includes(tblName)) { + // Force other tabs/windows to refresh their views when + // when the db changes in another window. + channel?.postMessage({ tables: [tblName], sender: self }); + } + }); + + await q.set(await query(database.db)); }); const cmds = Object.fromEntries( @@ -119,9 +120,9 @@ export function db({ schema, name, wsUrl, serverSiteId, identifier }) { const ws = await wsPromise; const db = await databasePromise; const results = await fn(db.db, args); - q.set(await query(db.db)); const [[sinceVersion]] = await db.lastTrackedChangeFor(serverSiteId); + await pushChangesSince({ database: db, ws, @@ -129,6 +130,15 @@ export function db({ schema, name, wsUrl, serverSiteId, identifier }) { serverSiteId }); + /* + TODO: investigate why this is needed. + in some cases the syncing fails + example - server is offline, offline changes made to private browser WindowA, + offline changes made to public WindowA, and offline changes to public WindowB + do not always sync up when the server reconnects. + seems like an off by one issue + */ + channel?.postMessage({ tables: watch, sender: self }); return results; } ]) @@ -140,10 +150,8 @@ export function db({ schema, name, wsUrl, serverSiteId, identifier }) { }; }; - onDestroy(async () => { - const ws = await wsPromise; - ws.close(); - }); - - return { store }; + return { + store, + database: new Promise((r) => databasePromise.then((db) => r(db))) + }; } From ee5c3f3d5cb66f1e3476f3c524a9f529aaad6d2a Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Thu, 4 Jan 2024 22:25:49 -0500 Subject: [PATCH 18/29] looks like this works in every scenario --- .../websockets/features/offline/sycn.md | 13 --- .../websockets/features/offline/sync.ts | 27 +++--- src/routes/app/offline-first/+page.svelte | 18 ++++ src/routes/app/offline-first/README.md | 84 +++++++------------ src/routes/app/offline-first/Todos.svelte | 28 +++---- .../app/offline-first/server-sync-db.ts | 15 ++-- src/routes/app/offline-first/sync-db-store.ts | 30 ++++--- 7 files changed, 101 insertions(+), 114 deletions(-) delete mode 100644 src/lib/server/websockets/features/offline/sycn.md diff --git a/src/lib/server/websockets/features/offline/sycn.md b/src/lib/server/websockets/features/offline/sycn.md deleted file mode 100644 index db724b4..0000000 --- a/src/lib/server/websockets/features/offline/sycn.md +++ /dev/null @@ -1,13 +0,0 @@ -# Concerns - -1. websocket client (ws) - a. receive message - b. send message -2. interacting with other websocket clients (redis) - a. broadcasting messages to other clients - b. receiving messages from other clients -3. dealing with database changes - a. merging changes from clients - b. broadcasting changes to clients - c. catching server up with clients (ex: offline changes happened on client) - d. catching clients up with server (ex: new client) diff --git a/src/lib/server/websockets/features/offline/sync.ts b/src/lib/server/websockets/features/offline/sync.ts index 6a6af06..98b6b17 100644 --- a/src/lib/server/websockets/features/offline/sync.ts +++ b/src/lib/server/websockets/features/offline/sync.ts @@ -7,13 +7,13 @@ import { WebSocket } from 'ws'; const INSERT_CHANGES = `INSERT INTO crsql_changes VALUES (?, unhex(?), ?, ?, ?, ?, unhex(?), ?, ?)`; const INSERT_TRACKED_PEERS = `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) -VALUES (unhex(?), crsql_db_version(), 0, 0) +VALUES (unhex(?), ?, 0, ?) ON CONFLICT([site_id], [tag], [event]) DO UPDATE SET version=excluded.version`; const SELECT_VERSION = `SELECT crsql_db_version() as version;`; const SELECT_NON_CLIENT_CHANGES = `SELECT "table", hex("pk") as pk, "cid", "val", "col_version", "db_version", hex("site_id") as site_id, "cl", "seq" FROM crsql_changes WHERE site_id != unhex(:clientSiteId) AND db_version > :dbVersion`; -const SELECT_VERSION_FROM_TRACKED_PEER = `SELECT version FROM crsql_tracked_peers WHERE site_id = unhex(?)`; +const SELECT_VERSION_FROM_TRACKED_PEER = `SELECT IFNULL(version, 0) version FROM crsql_tracked_peers WHERE site_id = unhex(?) AND event = ?`; // TODO: review https://github.com/vlcn-io/js/blob/main/packages/ws-server/src/DB.ts // see if any edge cases have been missed @@ -57,7 +57,7 @@ export class Sync { } }); const subscription = await subClient.on('messageBuffer', (stream, message) => { - sync.insertTrackedPeersStatement.run(clientSiteId); + sync.insertTrackedPeersStatement.run(clientSiteId, sync.version, 1); sync.send(message); }); @@ -94,7 +94,10 @@ export class Sync { this.nonClientChanges = db.prepare(SELECT_NON_CLIENT_CHANGES); this.versionOfTrackedPeer = db.prepare(SELECT_VERSION_FROM_TRACKED_PEER); } - + get version() { + const { version } = this.versionStatement.get(); + return version; + } async onMessage(data) { const parsed = JSON.parse(data.toString()); const changes = parsed.changes; @@ -102,7 +105,7 @@ export class Sync { this.merge(changes); const fromSiteId = parsed.siteId; - this.insertTrackedPeersStatement.run(fromSiteId); + this.insertTrackedPeersStatement.run(fromSiteId, parsed.version, 0); this.redisClient.publish(this.stream, data); } @@ -118,21 +121,21 @@ export class Sync { } private pull(clientSiteId: string) { - const result = this.versionOfTrackedPeer.get(clientSiteId); - const version = result?.version ?? 0; - this.send(JSON.stringify({ type: 'connected', siteId: clientSiteId, version })); + // const result = this.versionOfTrackedPeer.get(clientSiteId, 0); + // const version = result?.version ?? 0; + this.send(JSON.stringify({ type: 'connected', siteId: clientSiteId })); + // , version })); } private push(clientSiteId: string) { - const result = this.versionOfTrackedPeer.get(clientSiteId); + const result = this.versionOfTrackedPeer.get(clientSiteId, 1); const changes = this.nonClientChanges.all({ clientSiteId, dbVersion: result?.version ?? 0 }); - const { version } = this.versionStatement.get(); if (changes.length) { const message = JSON.stringify({ type: 'pull', siteId: this.siteId, - version, + version: this.version, changes: changes.map((change) => Object.values(change)) }); this.send(message, clientSiteId); @@ -144,7 +147,7 @@ export class Sync { // Only update if we send the message /// might want an ACK from client before we do this if (clientSiteId) { - this.insertTrackedPeersStatement.run(clientSiteId); + this.insertTrackedPeersStatement.run(clientSiteId, this.version, 1); } this.ws.send(message, { binary: true }); } diff --git a/src/routes/app/offline-first/+page.svelte b/src/routes/app/offline-first/+page.svelte index 7afec99..23cc88a 100644 --- a/src/routes/app/offline-first/+page.svelte +++ b/src/routes/app/offline-first/+page.svelte @@ -7,6 +7,7 @@ export let data; + let online = false; const databasePromise = Database.load({ schema: { name: 'schema.sql', schemaContent }, name: data.dbName @@ -23,6 +24,23 @@ }); + + +

BUG

+
    +
  1. 1. open three windows, 1 private, 2 public
  2. +
  3. 2. add two todonts and 4 todos
  4. +
  5. 3. stop server
  6. +
  7. 4. check both todonts in private window
  8. +
  9. 5. check two todos in 1 public window and two todos in the other public window
  10. +
  11. 6. start server_id
  12. +
  13. 7. one of the checked todonts should be out of sync (this may take several tries)
  14. +
+ +

+ {#if online}Online{:else}Offline{/if} +

+ ? AND site_id = crsql_site_id()`, [versionFromServer]); - }) - const message = { type: 'update', changes } - ws.send(JSON.stringify(message)) -} -// on server -// typical upate handler used to merge changes and update tracked peers +# Flows +Init +1. client connects to server +2. client announces presence +3. server sends all changes to client (since last version) +4. server updates tracked version of client +5. client merges changes +6. client updates tracked version of server +7. client sends all changes to server (since last version) +8. client updates tracked version of server +10. server merges changes +11. server updates tracked version of client -// SYNC CLIENT -// Server opens websocket then triggers this Flow -ws.send({ type:'connected' }) +Client connected and caught up +1. user makes change +2. client sends all changes to server (since last version) -// Client receives message -if (message.type === 'connected') { - const clientTrackedPeers = await db.execO(`SELECT site_id, db_version FROM crsql_tracked_peers`) - const message = { type: 'catch up', peers: clientTrackedPeers } - ws.send(JSON.stringify(message)) -} +Offline to Online (TO BE IMPLEMENTED) -// on server -// This is the server fulfilling the client's request to CATCH UP -async function catchUpClient(clientTrackedPeers) { - const changes = clientTrackedPeers.flatMap(async ({db_version, site_id}) => { - return await db.exec(`SELECT * FROM crsql_changes WHERE db_version = ? AND site_id = ?`, [db_version, site_id]); - }) - const message = { type: 'update', changes } - ws.send(JSON.stringify(message)) -} -if (message.type === 'catch up') { - await catchUpClient(message.peers) -} -``` +1. should be similar if not the same as Init flow diff --git a/src/routes/app/offline-first/Todos.svelte b/src/routes/app/offline-first/Todos.svelte index 9f88edf..e129b1d 100644 --- a/src/routes/app/offline-first/Todos.svelte +++ b/src/routes/app/offline-first/Todos.svelte @@ -7,30 +7,29 @@ let newTodo = ''; let newTodont = ''; - let online = false; let num = 100; - const { store, database } = db(dbConfig); - const me = store({ + const { repo } = db(dbConfig); + const me = repo({ watch: ['todos'], - query: async (db) => + view: async (db) => await db.execO('SELECT hex(crsql_site_id()) as site_id, crsql_db_version() as version') }); - const peers = store({ + const peers = repo({ watch: ['todos'], - query: async (db) => { + view: async (db) => { return await db.execO('SELECT hex(site_id) as site_id, version FROM crsql_tracked_peers'); } }); - const count = store({ + const count = repo({ watch: ['todos'], - query: async (db) => { + view: async (db) => { const [{ count }] = await db.execO('SELECT count(*) as count FROM todos'); return count; } }); - const todos = store({ + const todos = repo({ watch: ['todos'], - query: async (db) => { + view: async (db) => { const todos = await db.execO('SELECT * FROM todos'); return todos; }, @@ -64,9 +63,9 @@ } }); - const todonts = store({ + const todonts = repo({ watch: ['todonts'], - query: async (db) => await db.execO('SELECT * FROM todonts'), + view: async (db) => await db.execO('SELECT * FROM todonts'), commands: { insert: async (db, name) => { await db.exec('INSERT INTO todonts VALUES (?, ?, ?)', [nanoid(), name, 0]); @@ -81,11 +80,6 @@ }); - -

- {#if online}Online{:else}Offline{/if} -

-
await todos.loadEmUp()}> diff --git a/src/routes/app/offline-first/server-sync-db.ts b/src/routes/app/offline-first/server-sync-db.ts index 578a80b..4c88b76 100644 --- a/src/routes/app/offline-first/server-sync-db.ts +++ b/src/routes/app/offline-first/server-sync-db.ts @@ -45,13 +45,13 @@ export class Database { }); } - async insertTrackedPeers(serverSiteId) { + async insertTrackedPeers(serverSiteId, version, event) { await this.db.exec( `INSERT INTO crsql_tracked_peers (site_id, version, tag, event) - VALUES (unhex(?), crsql_db_version(), 0, 0) + VALUES (unhex(?), ?, 0, ?) ON CONFLICT([site_id], [tag], [event]) DO UPDATE SET version=excluded.version`, - [serverSiteId] + [serverSiteId, version, event] ); } @@ -63,9 +63,10 @@ export class Database { ); } - async lastTrackedChangeFor(siteId) { - return await this.db.exec(`SELECT version FROM crsql_tracked_peers WHERE hex(site_id) = ?`, [ - siteId - ]); + async lastTrackedChangeFor(siteId, event) { + return await this.db.exec( + `SELECT IFNULL(version, 0) version FROM crsql_tracked_peers WHERE hex(site_id) = ? AND event = ?`, + [siteId, event] + ); } } diff --git a/src/routes/app/offline-first/sync-db-store.ts b/src/routes/app/offline-first/sync-db-store.ts index 777a0a7..547673a 100644 --- a/src/routes/app/offline-first/sync-db-store.ts +++ b/src/routes/app/offline-first/sync-db-store.ts @@ -13,12 +13,13 @@ async function pushChangesSince({ database, ws, sinceVersion, serverSiteId }) { // do not attempt to send to server // if websocket is not open if (ws.readyState === WebSocket.OPEN) { - await database.insertTrackedPeers(serverSiteId); + const version = await database.version(); + await database.insertTrackedPeers(serverSiteId, version, 1); ws.send( JSON.stringify({ type: 'update', siteId: database.siteId, - version: await database.version(), + version, changes }) ); @@ -33,7 +34,7 @@ function wsMessageHandler({ serverSiteId: string; }) { return async function (event: Event) { - // Are we over subscribing here? every `store` attaches an event listener + // Are we over subscribing here? every `repo` attaches an event listener // maybe there's some kind of queue or something we can use to only apply // appropriate updates if (typeof event.data !== 'string') { @@ -43,12 +44,17 @@ function wsMessageHandler({ if ((type === 'update' && siteId !== clientSiteId) || type === 'pull') { await database.merge(changes); - await database.insertTrackedPeers(serverSiteId); + await database.insertTrackedPeers(serverSiteId, version, 0); } if (type === 'connected') { - console.log('connected'); - await pushChangesSince({ database, ws: this, sinceVersion: version, serverSiteId }); + const [[trackedVersion]] = await database.lastTrackedChangeFor(serverSiteId, 1); + await pushChangesSince({ + database, + ws: this, + sinceVersion: trackedVersion, + serverSiteId + }); } } }; @@ -73,8 +79,8 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { let channelListenerAdded = false; const channelSubscribers = new Set(); const channel = 'BroadcastChannel' in globalThis ? new globalThis.BroadcastChannel(name) : null; - // TODO - rename `query` to `view` - const store = ({ watch, query, commands = {} }) => { + // TODO - rename `view` to `view` + const repo = ({ watch, view, commands = {} }) => { const q = writable([]); databasePromise.then(async (database) => { const ws = await wsPromise; @@ -86,7 +92,7 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { // Update other tabs channelSubscribers.add(async (event: MessageEvent) => { if (watch.some((table) => event.data.tables.includes(table))) { - await q.set(await query(database.db)); + await q.set(await view(database.db)); } }); @@ -110,7 +116,7 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { } }); - await q.set(await query(database.db)); + await q.set(await view(database.db)); }); const cmds = Object.fromEntries( @@ -121,7 +127,7 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { const db = await databasePromise; const results = await fn(db.db, args); - const [[sinceVersion]] = await db.lastTrackedChangeFor(serverSiteId); + const [[sinceVersion]] = await db.lastTrackedChangeFor(serverSiteId, 1); await pushChangesSince({ database: db, @@ -151,7 +157,7 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { }; return { - store, + repo, database: new Promise((r) => databasePromise.then((db) => r(db))) }; } From f742ce997df1819086d42f4036c6d73e5acdd783 Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Thu, 4 Jan 2024 22:51:22 -0500 Subject: [PATCH 19/29] safe --- src/routes/app/offline-first/server-sync-db.ts | 3 ++- src/routes/app/offline-first/sync-db-store.ts | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/routes/app/offline-first/server-sync-db.ts b/src/routes/app/offline-first/server-sync-db.ts index 4c88b76..0535e5c 100644 --- a/src/routes/app/offline-first/server-sync-db.ts +++ b/src/routes/app/offline-first/server-sync-db.ts @@ -29,7 +29,8 @@ export class Database { } async version() { - const [[version]] = await this.db.exec(`SELECT crsql_db_version();`); + const result = await this.db.exec(`SELECT crsql_db_version();`); + const version = result?.[0]?.[0] ?? 0; return version; } diff --git a/src/routes/app/offline-first/sync-db-store.ts b/src/routes/app/offline-first/sync-db-store.ts index 547673a..b928b08 100644 --- a/src/routes/app/offline-first/sync-db-store.ts +++ b/src/routes/app/offline-first/sync-db-store.ts @@ -48,7 +48,8 @@ function wsMessageHandler({ } if (type === 'connected') { - const [[trackedVersion]] = await database.lastTrackedChangeFor(serverSiteId, 1); + const result = await database.lastTrackedChangeFor(serverSiteId, 1); + const trackedVersion = result?.[0]?.[0] ?? 0; await pushChangesSince({ database, ws: this, @@ -127,8 +128,8 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { const db = await databasePromise; const results = await fn(db.db, args); - const [[sinceVersion]] = await db.lastTrackedChangeFor(serverSiteId, 1); - + const result = await db.lastTrackedChangeFor(serverSiteId, 1); + const sinceVersion = result?.[0]?.[0] ?? 0; await pushChangesSince({ database: db, ws, From 181960070a2e72682f33269ecfa717dc994cc95e Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Thu, 4 Jan 2024 22:53:31 -0500 Subject: [PATCH 20/29] remove --- src/routes/app/offline-first/README.md | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/src/routes/app/offline-first/README.md b/src/routes/app/offline-first/README.md index fab557f..cd6f1e9 100644 --- a/src/routes/app/offline-first/README.md +++ b/src/routes/app/offline-first/README.md @@ -18,29 +18,3 @@ Within App Within Component 1. subscribe to multiple `view`s that are automatically refreshed when that is being `watched` is updated. - - -# Flows - -Init - -1. client connects to server -2. client announces presence -3. server sends all changes to client (since last version) -4. server updates tracked version of client -5. client merges changes -6. client updates tracked version of server -7. client sends all changes to server (since last version) -8. client updates tracked version of server -10. server merges changes -11. server updates tracked version of client - -Client connected and caught up - -1. user makes change -2. client sends all changes to server (since last version) - - -Offline to Online (TO BE IMPLEMENTED) - -1. should be similar if not the same as Init flow From cd041b9f60f3e7e4d073d04efeaaafe2f3686af9 Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Fri, 5 Jan 2024 09:16:26 -0500 Subject: [PATCH 21/29] small cleanup --- src/routes/app/offline-first/+page.svelte | 14 -- src/routes/app/offline-first/Todos.svelte | 172 ++++++++++-------- src/routes/app/offline-first/sync-db-store.ts | 1 - 3 files changed, 97 insertions(+), 90 deletions(-) diff --git a/src/routes/app/offline-first/+page.svelte b/src/routes/app/offline-first/+page.svelte index 23cc88a..9f37eb2 100644 --- a/src/routes/app/offline-first/+page.svelte +++ b/src/routes/app/offline-first/+page.svelte @@ -26,23 +26,11 @@ -

BUG

-
    -
  1. 1. open three windows, 1 private, 2 public
  2. -
  3. 2. add two todonts and 4 todos
  4. -
  5. 3. stop server
  6. -
  7. 4. check both todonts in private window
  8. -
  9. 5. check two todos in 1 public window and two todos in the other public window
  10. -
  11. 6. start server_id
  12. -
  13. 7. one of the checked todonts should be out of sync (this may take several tries)
  14. -
-

{#if online}Online{:else}Offline{/if}

- - await db.execO('SELECT hex(crsql_site_id()) as site_id, crsql_db_version() as version') - }); - const peers = repo({ - watch: ['todos'], - view: async (db) => { - return await db.execO('SELECT hex(site_id) as site_id, version FROM crsql_tracked_peers'); - } - }); - const count = repo({ - watch: ['todos'], - view: async (db) => { - const [{ count }] = await db.execO('SELECT count(*) as count FROM todos'); - return count; - } - }); const todos = repo({ watch: ['todos'], view: async (db) => { @@ -78,69 +59,110 @@ } } }); + + const me = repo({ + watch: ['todos', 'todonts'], + view: async (db) => + await db.execO('SELECT hex(crsql_site_id()) as site_id, crsql_db_version() as version') + }); + + const peers = repo({ + watch: ['crsql_tracked_peers'], + view: async (db) => + await db.execO('SELECT hex(site_id) as site_id, version, event FROM crsql_tracked_peers') + }); + + const todoCount = repo({ + watch: ['todos'], + view: async (db) => { + const [{ count }] = await db.execO('SELECT count(*) as count FROM todos'); + return count; + } + }); + const todontCount = repo({ + watch: ['todonts'], + view: async (db) => { + const [{ count }] = await db.execO('SELECT count(*) as count FROM todonts'); + return count; + } + }); - await todos.loadEmUp()}> - - - -
await todos.deleteEm()}> - -
-
todos count: {$count}
-{#each $me as m} -
me: {m.site_id} {m.version}
-{/each} -
server: {dbConfig.serverSiteId}
-{#each $peers as peer} -
{peer.site_id} {peer.version}
-{/each} -
+
+
await todos.loadEmUp()}> + + +
+
await todos.deleteEm()}> + +
-

todos

-
{ - await todos.insert(newTodo); - newTodo = ''; - }} - > - - -
- - {#each $todos as todo} -
- todos.toggle(todo.id)} /> - {todo.id} - {todo.content} - +
server id: {dbConfig.serverSiteId}
+
+
+ {#each $me as m} +
+
me: {m.site_id}
+
version: {m.version}
{/each}
- -
-

todonts

-
{ - await todonts.insert(newTodont); - newTodont = ''; - }} - > - - -
- - {#each $todonts as todont} + {#each $peers as peer} +
+
peer: {peer.site_id}
- todonts.toggle(todont.id)} - /> - {todont.id} - {todont.content} - + tracked version: {peer.version} + {#if peer.event === 0}received{:else}sent{/if}
- {/each} +
+ {/each} +
+
+

todos: {$todoCount}

+
{ + await todos.insert(newTodo); + newTodo = ''; + }} + > + + +
+ + {#each $todos as todo} +
+ todos.toggle(todo.id)} /> + {todo.id} + {todo.content} + +
+ {/each} +
+ +
+

todonts {$todontCount}

+
{ + await todonts.insert(newTodont); + newTodont = ''; + }} + > + + +
+ + {#each $todonts as todont} +
+ todonts.toggle(todont.id)} + /> + {todont.id} + {todont.content} + +
+ {/each} +
diff --git a/src/routes/app/offline-first/sync-db-store.ts b/src/routes/app/offline-first/sync-db-store.ts index b928b08..0bef093 100644 --- a/src/routes/app/offline-first/sync-db-store.ts +++ b/src/routes/app/offline-first/sync-db-store.ts @@ -80,7 +80,6 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { let channelListenerAdded = false; const channelSubscribers = new Set(); const channel = 'BroadcastChannel' in globalThis ? new globalThis.BroadcastChannel(name) : null; - // TODO - rename `view` to `view` const repo = ({ watch, view, commands = {} }) => { const q = writable([]); databasePromise.then(async (database) => { From de3797936d00ffbf9bc850af416bf59867ff5975 Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Fri, 5 Jan 2024 09:31:58 -0500 Subject: [PATCH 22/29] comment --- src/routes/app/offline-first/Todos.svelte | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/routes/app/offline-first/Todos.svelte b/src/routes/app/offline-first/Todos.svelte index 4c6d38b..78a9f06 100644 --- a/src/routes/app/offline-first/Todos.svelte +++ b/src/routes/app/offline-first/Todos.svelte @@ -11,6 +11,9 @@ const todos = repo({ watch: ['todos'], view: async (db) => { + // TODO - this gets triggered a lot + // Maybe we can batch updates in the onUpdate trigger + console.log('refreshing todos view'); const todos = await db.execO('SELECT * FROM todos'); return todos; }, @@ -39,7 +42,9 @@ }, deleteEm: async (db) => { - await db.exec('DELETE FROM todos'); + await db.tx(async (tx) => { + await tx.exec('DELETE FROM todos'); + }); } } }); From d13401ef44f771b7a9427813e6d3cc8219c7b8f6 Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Fri, 5 Jan 2024 10:06:52 -0500 Subject: [PATCH 23/29] superfluous channel message --- src/routes/app/offline-first/sync-db-store.ts | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/routes/app/offline-first/sync-db-store.ts b/src/routes/app/offline-first/sync-db-store.ts index 0bef093..b8d66c4 100644 --- a/src/routes/app/offline-first/sync-db-store.ts +++ b/src/routes/app/offline-first/sync-db-store.ts @@ -111,7 +111,7 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { database.db.onUpdate(async (type, dbName, tblName, rowid) => { if (watch.includes(tblName)) { // Force other tabs/windows to refresh their views when - // when the db changes in another window. + // // when the db changes in another window. channel?.postMessage({ tables: [tblName], sender: self }); } }); @@ -136,15 +136,6 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { serverSiteId }); - /* - TODO: investigate why this is needed. - in some cases the syncing fails - example - server is offline, offline changes made to private browser WindowA, - offline changes made to public WindowA, and offline changes to public WindowB - do not always sync up when the server reconnects. - seems like an off by one issue - */ - channel?.postMessage({ tables: watch, sender: self }); return results; } ]) From e67c5ad69eeed7070dc81b12db7f23017dec3615 Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Fri, 5 Jan 2024 13:24:21 -0500 Subject: [PATCH 24/29] rename --- src/routes/app/offline-first/Todos.svelte | 2 +- src/routes/app/offline-first/sync-db-store.ts | 31 +++++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/routes/app/offline-first/Todos.svelte b/src/routes/app/offline-first/Todos.svelte index 78a9f06..c788e42 100644 --- a/src/routes/app/offline-first/Todos.svelte +++ b/src/routes/app/offline-first/Todos.svelte @@ -13,7 +13,7 @@ view: async (db) => { // TODO - this gets triggered a lot // Maybe we can batch updates in the onUpdate trigger - console.log('refreshing todos view'); + // console.log('refreshing todos view'); const todos = await db.execO('SELECT * FROM todos'); return todos; }, diff --git a/src/routes/app/offline-first/sync-db-store.ts b/src/routes/app/offline-first/sync-db-store.ts index b8d66c4..a2094c1 100644 --- a/src/routes/app/offline-first/sync-db-store.ts +++ b/src/routes/app/offline-first/sync-db-store.ts @@ -2,6 +2,19 @@ import { nanoid } from 'nanoid'; import { Database } from './server-sync-db'; import { writable } from 'svelte/store'; +const raf = globalThis.requestAnimationFrame; +function throttle(fn) { + // could be setTimeout or raf id + let id = null; + + return (...args) => { + if (id === null) { + fn(...args); + id = typeof raf === 'undefined' ? setTimeout(() => (id = null), 60) : raf(() => (id = null)); + } + }; +} + function wsErrorHandler(error: Event) { console.error(error); } @@ -80,6 +93,19 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { let channelListenerAdded = false; const channelSubscribers = new Set(); const channel = 'BroadcastChannel' in globalThis ? new globalThis.BroadcastChannel(name) : null; + const tablesToRefresh = new Set(); + const updateTabs = throttle(() => { + const tables = []; + for (const table of tablesToRefresh) { + tables.push(table); + tablesToRefresh.delete(table); + } + + if (tables.length) { + channel?.postMessage({ tables, sender: self }); + } + }); + const repo = ({ watch, view, commands = {} }) => { const q = writable([]); databasePromise.then(async (database) => { @@ -110,9 +136,8 @@ export function db({ databasePromise, wsPromise, serverSiteId, name }) { // just re-calculate view database.db.onUpdate(async (type, dbName, tblName, rowid) => { if (watch.includes(tblName)) { - // Force other tabs/windows to refresh their views when - // // when the db changes in another window. - channel?.postMessage({ tables: [tblName], sender: self }); + tablesToRefresh.add(tblName); + updateTabs(); } }); From 2d8a2482db9fd60238a88809bd6949bcd17a9c7a Mon Sep 17 00:00:00 2001 From: Sammy Nave Date: Fri, 5 Jan 2024 14:12:33 -0500 Subject: [PATCH 25/29] input --- src/routes/app/offline-first/Todos.svelte | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/routes/app/offline-first/Todos.svelte b/src/routes/app/offline-first/Todos.svelte index c788e42..19d72f5 100644 --- a/src/routes/app/offline-first/Todos.svelte +++ b/src/routes/app/offline-first/Todos.svelte @@ -1,6 +1,7 @@