Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .claude/commands/test-sync-roundtrip-postrges-local-rls.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Sync Roundtrip Test with RLS
# Sync Roundtrip Test with local Postgres database and RLS policies

Execute a full roundtrip sync test between multiple local SQLite databases and the local Supabase Docker PostgreSQL instance, verifying that Row Level Security (RLS) policies are correctly enforced during sync.

Expand Down Expand Up @@ -255,7 +255,7 @@ SELECT cloudsync_network_send_changes();

-- Check for changes from server (repeat with 2-3 second delays)
SELECT cloudsync_network_check_changes();
-- Repeat check_changes 3-5 times with delays until it returns 0 or stabilizes
-- Repeat check_changes 3-5 times, with 2-3 second delays, until it returns more than 0 received rows or the result stabilizes
```

**Recommended sync order:**
Expand Down
2 changes: 2 additions & 0 deletions plans/TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Call cloudsync_update_schema_hash to refresh the stored schema hash when upgrading the library from a 0.8.* version
- Fix cloudsync_begin_alter and cloudsync_commit_alter for PostgreSQL; consider invoking them automatically via an event trigger on ALTER TABLE
2 changes: 1 addition & 1 deletion src/cloudsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
extern "C" {
#endif

#define CLOUDSYNC_VERSION "0.9.113"
#define CLOUDSYNC_VERSION "0.9.115"
#define CLOUDSYNC_MAX_TABLENAME_LEN 512

#define CLOUDSYNC_VALUE_NOTSET -1
Expand Down
235 changes: 104 additions & 131 deletions src/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ SQLITE_EXTENSION_INIT3
struct network_data {
char site_id[UUID_STR_MAXLEN];
char *authentication; // apikey or token
char *org_id; // organization ID for X-CloudSync-Org header
char *check_endpoint;
char *upload_endpoint;
char *apply_endpoint;
Expand Down Expand Up @@ -85,6 +86,10 @@ char *network_data_get_siteid (network_data *data) {
return data->site_id;
}

char *network_data_get_orgid (network_data *data) {
return data->org_id;
}

bool network_data_set_endpoints (network_data *data, char *auth, char *check, char *upload, char *apply, char *status) {
// sanity check
if (!check || !upload) return false;
Expand Down Expand Up @@ -145,8 +150,9 @@ bool network_data_set_endpoints (network_data *data, char *auth, char *check, ch

void network_data_free (network_data *data) {
if (!data) return;

if (data->authentication) cloudsync_memory_free(data->authentication);
if (data->org_id) cloudsync_memory_free(data->org_id);
if (data->check_endpoint) cloudsync_memory_free(data->check_endpoint);
if (data->upload_endpoint) cloudsync_memory_free(data->upload_endpoint);
if (data->apply_endpoint) cloudsync_memory_free(data->apply_endpoint);
Expand Down Expand Up @@ -219,6 +225,14 @@ NETWORK_RESULT network_receive_buffer (network_data *data, const char *endpoint,
headers = tmp;
}

if (data->org_id) {
char org_header[512];
snprintf(org_header, sizeof(org_header), "%s: %s", CLOUDSYNC_HEADER_ORG, data->org_id);
struct curl_slist *tmp = curl_slist_append(headers, org_header);
if (!tmp) {rc = CURLE_OUT_OF_MEMORY; goto cleanup;}
headers = tmp;
}

if (json_payload) {
struct curl_slist *tmp = curl_slist_append(headers, "Content-Type: application/json");
if (!tmp) {rc = CURLE_OUT_OF_MEMORY; goto cleanup;}
Expand Down Expand Up @@ -331,7 +345,15 @@ bool network_send_buffer (network_data *data, const char *endpoint, const char *
if (!tmp) {rc = CURLE_OUT_OF_MEMORY; goto cleanup;}
headers = tmp;
}


if (data->org_id) {
char org_header[512];
snprintf(org_header, sizeof(org_header), "%s: %s", CLOUDSYNC_HEADER_ORG, data->org_id);
struct curl_slist *tmp = curl_slist_append(headers, org_header);
if (!tmp) {rc = CURLE_OUT_OF_MEMORY; goto cleanup;}
headers = tmp;
}

// Set headers if needed (S3 pre-signed URLs usually do not require additional headers)
tmp = curl_slist_append(headers, "Content-Type: application/octet-stream");
if (!tmp) {rc = CURLE_OUT_OF_MEMORY; goto cleanup;}
Expand Down Expand Up @@ -578,144 +600,95 @@ int network_extract_query_param (const char *query, const char *key, char *outpu
return -3; // Key not found
}

#if !defined(CLOUDSYNC_OMIT_CURL) || defined(SQLITE_WASM_EXTRA_INIT)
bool network_compute_endpoints (sqlite3_context *context, network_data *data, const char *conn_string) {
// compute endpoints
// JSON format: {"address":"https://host:port","database":"db.sqlite","projectID":"abc","organizationID":"org","apikey":"KEY"}
bool result = false;

char *scheme = NULL;
char *host = NULL;
char *port = NULL;
char *database = NULL;
char *query = NULL;

size_t conn_len = strlen(conn_string);

char *address = json_extract_string(conn_string, conn_len, "address");
char *database = json_extract_string(conn_string, conn_len, "database");
char *project_id = json_extract_string(conn_string, conn_len, "projectID");
char *org_id = json_extract_string(conn_string, conn_len, "organizationID");
char *apikey = json_extract_string(conn_string, conn_len, "apikey");
char *token = json_extract_string(conn_string, conn_len, "token");

char *authentication = NULL;
char *check_endpoint = NULL;
char *upload_endpoint = NULL;
char *apply_endpoint = NULL;
char *status_endpoint = NULL;

char *conn_string_https = NULL;

#ifndef SQLITE_WASM_EXTRA_INIT
CURLUcode rc = CURLUE_OUT_OF_MEMORY;
CURLU *url = curl_url();
if (!url) goto finalize;
#endif

conn_string_https = cloudsync_string_replace_prefix(conn_string, "sqlitecloud://", "https://");
if (!conn_string_https) goto finalize;

#ifndef SQLITE_WASM_EXTRA_INIT
// set URL: https://UUID.g5.sqlite.cloud:443/chinook.sqlite?apikey=hWDanFolRT9WDK0p54lufNrIyfgLZgtMw6tb6fbPmpo
rc = curl_url_set(url, CURLUPART_URL, conn_string_https, 0);
if (rc != CURLUE_OK) goto finalize;

// https (MANDATORY)
rc = curl_url_get(url, CURLUPART_SCHEME, &scheme, 0);
if (rc != CURLUE_OK) goto finalize;

// UUID.g5.sqlite.cloud (MANDATORY)
rc = curl_url_get(url, CURLUPART_HOST, &host, 0);
if (rc != CURLUE_OK) goto finalize;

// 443 (OPTIONAL)
rc = curl_url_get(url, CURLUPART_PORT, &port, 0);
if (rc != CURLUE_OK && rc != CURLUE_NO_PORT) goto finalize;
char *port_or_default = port && strcmp(port, "8860") != 0 ? port : CLOUDSYNC_DEFAULT_ENDPOINT_PORT;

// /chinook.sqlite (MANDATORY)
rc = curl_url_get(url, CURLUPART_PATH, &database, 0);
if (rc != CURLUE_OK) goto finalize;

// apikey=hWDanFolRT9WDK0p54lufNrIyfgLZgtMw6tb6fbPmpo (OPTIONAL)
rc = curl_url_get(url, CURLUPART_QUERY, &query, 0);
if (rc != CURLUE_OK && rc != CURLUE_NO_QUERY) goto finalize;
#else
// Parse: scheme://host[:port]/path?query
const char *p = strstr(conn_string_https, "://");
if (!p) goto finalize;
scheme = substr(conn_string_https, p);
p += 3;
const char *host_start = p;
const char *host_end = strpbrk(host_start, ":/?");
if (!host_end) goto finalize;
host = substr(host_start, host_end);
p = host_end;
if (*p == ':') {
++p;
const char *port_end = strpbrk(p, "/?");
if (!port_end) goto finalize;
port = substr(p, port_end);
p = port_end;
}
if (*p == '/') {
const char *path_start = p;
const char *path_end = strchr(path_start, '?');
if (!path_end) path_end = path_start + strlen(path_start);
database = substr(path_start, path_end);
p = path_end;
// validate mandatory fields
if (!address || !database || !project_id || !org_id) {
sqlite3_result_error(context, "JSON must contain address, database, projectID, and organizationID", -1);
sqlite3_result_error_code(context, SQLITE_ERROR);
goto finalize;
}
if (*p == '?') {
query = strdup(p);

// parse address: scheme://host[:port]
const char *scheme_end = strstr(address, "://");
if (!scheme_end) {
sqlite3_result_error(context, "address must include scheme (e.g. https://host:port)", -1);
sqlite3_result_error_code(context, SQLITE_ERROR);
goto finalize;
}
if (!scheme || !host || !database) goto finalize;
char *port_or_default = port && strcmp(port, "8860") != 0 ? port : CLOUDSYNC_DEFAULT_ENDPOINT_PORT;
#endif

if (query != NULL) {
char value[CLOUDSYNC_SESSION_TOKEN_MAXSIZE];
if (!authentication && network_extract_query_param(query, "apikey", value, sizeof(value)) == 0) {
authentication = network_authentication_token("apikey", value);
}
if (!authentication && network_extract_query_param(query, "token", value, sizeof(value)) == 0) {
authentication = network_authentication_token("token", value);
}

size_t scheme_len = scheme_end - address;
const char *host_start = scheme_end + 3;
const char *port_sep = strchr(host_start, ':');
const char *host_end = port_sep ? port_sep : host_start + strlen(host_start);
const char *port_str = port_sep ? port_sep + 1 : CLOUDSYNC_DEFAULT_ENDPOINT_PORT;

// build authentication from apikey or token
if (apikey) {
authentication = network_authentication_token("apikey", apikey);
} else if (token) {
authentication = network_authentication_token("token", token);
}

size_t requested = strlen(scheme) + strlen(host) + strlen(port_or_default) + strlen(CLOUDSYNC_ENDPOINT_PREFIX) + strlen(database) + 64;

// build endpoints: {scheme}://{host}:{port}/v2/cloudsync/{projectID}/{database}/{siteId}/{action}
size_t requested = scheme_len + 3 + (host_end - host_start) + 1 + strlen(port_str) + 1
+ strlen(CLOUDSYNC_ENDPOINT_PREFIX) + 1 + strlen(project_id) + 1
+ strlen(database) + 1 + UUID_STR_MAXLEN + 1 + 16;
check_endpoint = (char *)cloudsync_memory_zeroalloc(requested);
upload_endpoint = (char *)cloudsync_memory_zeroalloc(requested);
apply_endpoint = (char *)cloudsync_memory_zeroalloc(requested);
status_endpoint = (char *)cloudsync_memory_zeroalloc(requested);

if ((!upload_endpoint) || (!check_endpoint) || (!apply_endpoint) || (!status_endpoint)) goto finalize;
if (!check_endpoint || !upload_endpoint || !apply_endpoint || !status_endpoint) {
sqlite3_result_error_code(context, SQLITE_NOMEM);
goto finalize;
}

snprintf(check_endpoint, requested, "%s://%s:%s/%s%s/%s/%s", scheme, host, port_or_default, CLOUDSYNC_ENDPOINT_PREFIX, database, data->site_id, CLOUDSYNC_ENDPOINT_CHECK);
snprintf(upload_endpoint, requested, "%s://%s:%s/%s%s/%s/%s", scheme, host, port_or_default, CLOUDSYNC_ENDPOINT_PREFIX, database, data->site_id, CLOUDSYNC_ENDPOINT_UPLOAD);
snprintf(apply_endpoint, requested, "%s://%s:%s/%s%s/%s/%s", scheme, host, port_or_default, CLOUDSYNC_ENDPOINT_PREFIX, database, data->site_id, CLOUDSYNC_ENDPOINT_APPLY);
snprintf(status_endpoint, requested, "%s://%s:%s/%s%s/%s/%s", scheme, host, port_or_default, CLOUDSYNC_ENDPOINT_PREFIX, database, data->site_id, CLOUDSYNC_ENDPOINT_STATUS);
// format: scheme://host:port/v2/cloudsync/projectID/database/siteId/action
snprintf(check_endpoint, requested, "%.*s://%.*s:%s/%s/%s/%s/%s/%s",
(int)scheme_len, address, (int)(host_end - host_start), host_start, port_str,
CLOUDSYNC_ENDPOINT_PREFIX, project_id, database, data->site_id, CLOUDSYNC_ENDPOINT_CHECK);
snprintf(upload_endpoint, requested, "%.*s://%.*s:%s/%s/%s/%s/%s/%s",
(int)scheme_len, address, (int)(host_end - host_start), host_start, port_str,
CLOUDSYNC_ENDPOINT_PREFIX, project_id, database, data->site_id, CLOUDSYNC_ENDPOINT_UPLOAD);
snprintf(apply_endpoint, requested, "%.*s://%.*s:%s/%s/%s/%s/%s/%s",
(int)scheme_len, address, (int)(host_end - host_start), host_start, port_str,
CLOUDSYNC_ENDPOINT_PREFIX, project_id, database, data->site_id, CLOUDSYNC_ENDPOINT_APPLY);
snprintf(status_endpoint, requested, "%.*s://%.*s:%s/%s/%s/%s/%s/%s",
(int)scheme_len, address, (int)(host_end - host_start), host_start, port_str,
CLOUDSYNC_ENDPOINT_PREFIX, project_id, database, data->site_id, CLOUDSYNC_ENDPOINT_STATUS);

result = true;

finalize:
if (result == false) {
// store proper result code/message
#ifndef SQLITE_WASM_EXTRA_INIT
if (rc != CURLUE_OK) sqlite3_result_error(context, curl_url_strerror(rc), -1);
sqlite3_result_error_code(context, (rc != CURLUE_OK) ? SQLITE_ERROR : SQLITE_NOMEM);
#else
sqlite3_result_error(context, "URL parse error", -1);
sqlite3_result_error_code(context, SQLITE_ERROR);
#endif

// cleanup memory managed by the extension
if (authentication) cloudsync_memory_free(authentication);
if (check_endpoint) cloudsync_memory_free(check_endpoint);
if (upload_endpoint) cloudsync_memory_free(upload_endpoint);
if (apply_endpoint) cloudsync_memory_free(apply_endpoint);
if (status_endpoint) cloudsync_memory_free(status_endpoint);
}

if (result) {
if (authentication) {
if (data->authentication) cloudsync_memory_free(data->authentication);
data->authentication = authentication;
}


if (data->org_id) cloudsync_memory_free(data->org_id);
data->org_id = cloudsync_string_dup(org_id);

if (data->check_endpoint) cloudsync_memory_free(data->check_endpoint);
data->check_endpoint = check_endpoint;

if (data->upload_endpoint) cloudsync_memory_free(data->upload_endpoint);
data->upload_endpoint = upload_endpoint;

Expand All @@ -724,22 +697,24 @@ bool network_compute_endpoints (sqlite3_context *context, network_data *data, co

if (data->status_endpoint) cloudsync_memory_free(data->status_endpoint);
data->status_endpoint = status_endpoint;
} else {
if (authentication) cloudsync_memory_free(authentication);
if (check_endpoint) cloudsync_memory_free(check_endpoint);
if (upload_endpoint) cloudsync_memory_free(upload_endpoint);
if (apply_endpoint) cloudsync_memory_free(apply_endpoint);
if (status_endpoint) cloudsync_memory_free(status_endpoint);
}

// cleanup memory
#ifndef SQLITE_WASM_EXTRA_INIT
if (url) curl_url_cleanup(url);
#endif
if (scheme) curl_free(scheme);
if (host) curl_free(host);
if (port) curl_free(port);
if (database) curl_free(database);
if (query) curl_free(query);
if (conn_string_https && conn_string_https != conn_string) cloudsync_memory_free(conn_string_https);


// cleanup JSON-extracted strings
if (address) cloudsync_memory_free(address);
if (database) cloudsync_memory_free(database);
if (project_id) cloudsync_memory_free(project_id);
if (org_id) cloudsync_memory_free(org_id);
if (apikey) cloudsync_memory_free(apikey);
if (token) cloudsync_memory_free(token);

return result;
}
#endif

void network_result_to_sqlite_error (sqlite3_context *context, NETWORK_RESULT res, const char *default_error_message) {
sqlite3_result_error(context, ((res.code == CLOUDSYNC_NETWORK_ERROR) && (res.buffer)) ? res.buffer : default_error_message, -1);
Expand Down Expand Up @@ -778,10 +753,9 @@ void cloudsync_network_init (sqlite3_context *context, int argc, sqlite3_value *
// save site_id string representation: 01957493c6c07e14803727e969f1d2cc
cloudsync_uuid_v7_stringify(site_id, netdata->site_id, false);

// connection string is something like:
// https://UUID.g5.sqlite.cloud:443/chinook.sqlite?apikey=hWDanFolRT9WDK0p54lufNrIyfgLZgtMw6tb6fbPmpo
// or https://UUID.g5.sqlite.cloud:443/chinook.sqlite
// apikey part is optional and can be replaced by a session token once client is authenticated
// connection string is a JSON object:
// {"address":"https://UUID.sqlite.cloud:443","database":"chinook.sqlite","projectID":"abc123","organizationID":"org456","apikey":"KEY"}
// apikey/token are optional and can be set later via cloudsync_network_set_token/cloudsync_network_set_apikey

const char *connection_param = (const char *)sqlite3_value_text(argv[0]);

Expand Down Expand Up @@ -1080,7 +1054,6 @@ int cloudsync_network_check_internal(sqlite3_context *context, int *pnrows, sync
char json_payload[2024];
snprintf(json_payload, sizeof(json_payload), "{\"dbVersion\":%lld, \"seq\":%d}", (long long)db_version, seq);

// http://uuid.g5.sqlite.cloud/v2/cloudsync/{dbname}/{site_id}/check
NETWORK_RESULT result = network_receive_buffer(netdata, netdata->check_endpoint, netdata->authentication, true, true, json_payload, CLOUDSYNC_HEADER_SQLITECLOUD);
int rc = SQLITE_OK;
if (result.code == CLOUDSYNC_NETWORK_BUFFER) {
Expand Down
Loading