Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion db/phys_rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -2044,7 +2044,8 @@ static int stop_physrep_watcher_thread() {
return 0;
}

static int is_a_physrep_source_or_dest() {
int is_a_physrep_source_or_dest()
{
if (gbl_physrep_i_am_metadb == 1) { // Is not a physical replication metadb
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions db/phys_rep.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ int physrep_add_alternate_metadb(char *dbname, char *host);
void physrep_alt_metadb_print(void);
void physrep_metadb_cached_connections(void);
int physrep_allowed_source(const char *dbname, const char *hostname);
int is_a_physrep_source_or_dest(void);

#endif /* PHYS_REP_H */
9 changes: 9 additions & 0 deletions docs/pages/programming/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,15 @@ system table.

The ```REPLACE TABLE``` statement can be used to replace a table with another table.

The source table is specified using [foreign table](#foreign-tables) syntax. For a database running
on the same machine, use ```LOCAL_database.tablename```:

```sql
REPLACE TABLE mytable WITH LOCAL_sourcedb.sourcetable;
```

For remote databases, use ```database.tablename``` (requires [comdb2db](clients.html#comdb2db) configuration).

Below is a list of requirements that must be satisfied to use this statement:
- The source and destination databases have the same datastripe and blobstripe settings.
- There is no network firewall preventing the destination database from communicating
Expand Down
6 changes: 6 additions & 0 deletions sqlite/src/comdb2build.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <net_types.h>
#include <views.h>
#include <logmsg.h>
#include "phys_rep.h"
#include <str0.h>
#include <zlib.h>
#include <shard_range.h>
Expand Down Expand Up @@ -1782,6 +1783,11 @@ void comdb2Replace(Parse* pParse, Token *nm, Token *nm2, Token *nm3)
setError(pParse, SQLITE_MISUSE, "bulk import is not enabled");
return;
}

if (is_a_physrep_source_or_dest()) {
setError(pParse, SQLITE_MISUSE, "bulk import into a physical replicant is not allowed");
return;
}

if (!gbl_sc_protobuf) {
setError(pParse, SQLITE_MISUSE,
Expand Down
162 changes: 162 additions & 0 deletions tests/phys_rep_tiered.test/bulkimport.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#!/bin/bash

bash -n "$0" || exit 1
source ${TESTSROOTDIR}/tools/runit_common.sh
set -o pipefail

physrc=$1
physrc_host=$2
physrep=$3
physrep_host=$4

tmpdb_name="tmp${physrep}"
tmpdb_dir="${TESTDIR}/${tmpdb_name}"
echo "physrc: $physrc@$physrc_host, physrep: $physrep@$physrep_host, tmpdb: $tmpdb_name"

function cp_cdb2cfg() {
local dir=$1
echo "comdb2_config:default_type=local" > "$dir/comdb2db.cfg"
echo "comdb2_config:ssl_cert_path=$TESTDIR" >> "$dir/comdb2db.cfg"
echo "comdb2_config:allow_pmux_route:true" >> "$dir/comdb2db.cfg"
if [[ -n "$PMUXPORT" ]]; then
echo "comdb2_config:portmuxport=$PMUXPORT" >> "$dir/comdb2db.cfg"
fi
}

function check_rc() {
local cmd="$1"
eval $cmd
if [[ $? -ne 0 ]]; then
echo "failed running $cmd"
cleanup
exit 1
fi
}

function cleanup() {
local name="$tmpdb_name" dir="$tmpdb_dir"
if [[ -f "$dir/${name}.pid" ]]; then
kill -9 $(cat "$dir/${name}.pid") 2>/dev/null || true
fi
}

function create_db() {
local name=$1 dir=$2
mkdir -p "$dir"
check_rc "$COMDB2_EXE --create $name -dir $dir"
if [[ -n "$PMUXPORT" ]]; then
echo "portmux_port $PMUXPORT" >> "$dir/${name}.lrl"
echo "portmux_bind_path $pmux_socket" >> "$dir/${name}.lrl"
echo "logmsg level debug >> $dir/${name}.lrl"
echo "do semver 8.1.0" >> "$dir/${name}.lrl"
echo "enable_bulk_import 1" >> "$dir/${name}.lrl"
fi
$COMDB2_EXE $name --lrl $dir/${name}.lrl --pidfile $dir/${name}.pid \
>"$TESTDIR/logs/${name}.log" 2>&1 &
cp_cdb2cfg "$dir"
sleep 5
check_rc "${CDB2SQL_EXE} --cdb2cfg $dir/comdb2db.cfg $name --host localhost 'select 1'"
check_rc "${CDB2SQL_EXE} --cdb2cfg $dir/comdb2db.cfg $name --host localhost \
-f ${TESTDIR}/${TESTCASE}.test/1-create-table.src.sql"
check_rc "${CDB2SQL_EXE} --cdb2cfg $dir/comdb2db.cfg $name --host localhost \
\"insert into t1 (id) values (1)\""
}

function test_bulkimport_into_physrep() {
# Given: physrep is a physrep replicant (read-only)
check_rc "${CDB2SQL_EXE} ${CDB2_OPTIONS} $physrep --host $physrep_host \
\"exec procedure sys.cmd.send('enable_bulk_import 1')\""

local tbl="t1"
local output
output=$(${CDB2SQL_EXE} ${CDB2_OPTIONS} $physrep --host $physrep_host \
"replace table $tbl with LOCAL_$tmpdb_name.$tbl" 2>&1)
local rc=$?

if [[ $rc -eq 0 ]] || ! echo "$output" | grep -q "physical replicant"; then
echo "Fail, rc=$rc, output: $output"
cleanup
exit 1
fi

echo "SUCCESS: bulkimport into physrep rejected: $output"
}

function test_bulkimport_into_physrep_src() {

local tbl="t1"
local output
output=$(${CDB2SQL_EXE} ${CDB2_OPTIONS} $physrc --host $physrc_host \
"replace table $tbl with LOCAL_$tmpdb_name.$tbl" 2>&1)
local rc=$?

if [[ $rc -eq 0 ]] || ! echo "$output" | grep -q "physical replicant"; then
echo "Fail, rc=$rc, output: $output"
cleanup
exit 1
fi

echo "SUCCESS: bulkimport into physrep src rejected: $output"
}

function test_bulkimport_from_physrep_and_physrep_src() {
local tbl="t1"

check_rc "${CDB2SQL_EXE} ${CDB2_OPTIONS} $physrc --host $physrc_host \
\"drop table if exists $tbl\""
check_rc "${CDB2SQL_EXE} ${CDB2_OPTIONS} $physrc --host $physrc_host \
-f ${TESTDIR}/${TESTCASE}.test/1-create-table.src.sql"
check_rc "${CDB2SQL_EXE} ${CDB2_OPTIONS} $physrc --host $physrc_host \
\"insert into $tbl (id) values (100), (200), (300)\""

sleep 5

check_rc "${CDB2SQL_EXE} --cdb2cfg $tmpdb_dir/comdb2db.cfg $tmpdb_name --host localhost \
\"delete from $tbl where 1\""

# Import from physrep source
output_src=$(${CDB2SQL_EXE} --cdb2cfg $tmpdb_dir/comdb2db.cfg $tmpdb_name --host localhost \
"replace table $tbl with LOCAL_$physrc.$tbl" 2>&1)
local rc_src=$?
echo "$output_src"

sleep 5

# Import from physrep
output_rep=$(${CDB2SQL_EXE} --cdb2cfg $tmpdb_dir/comdb2db.cfg $tmpdb_name --host localhost \
"replace table $tbl with LOCAL_$physrep.$tbl" 2>&1)
local rc_rep=$?

if [[ $rc_rep -ne 0 ]]; then
echo "FAIL: bulkimport from physrep failed with rc_src=$rc_src, rc_rep=$rc_rep"
echo "Output src: $output_src, rep: $output_rep"
cleanup
exit 1
fi

local cnt
cnt=$(${CDB2SQL_EXE} --tabs --cdb2cfg $tmpdb_dir/comdb2db.cfg $tmpdb_name \
--host localhost "select count(*) from $tbl")
if [[ $cnt -eq 0 ]]; then
echo "FAIL: no records found in $tmpdb_name.$tbl after import"
cleanup
exit 1
fi

echo "SUCCESS: bulk import from physrep succeeded, imported $cnt records"
}

function main() {
create_db "$tmpdb_name" "$tmpdb_dir"

tests=$(compgen -A function | grep -oh "test_\w*")
for testcase in $tests; do
echo "Running test case: $testcase"
$testcase
done

echo "All bulkimport tests passed"
cleanup
}

main
4 changes: 3 additions & 1 deletion tests/phys_rep_tiered.test/lrl.options
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
logmsg level debug
dtastripe 1
debug_drop_nth_rep_message 10000
incoherent_slow_inactive_timeout 0
# test requires fast revconn than
Expand All @@ -25,3 +24,6 @@ wait_for_seqnum_trace 1
online_recovery 0
set_seqnum_trace 1
setattr WAIT_FOR_SEQNUM_TRACE 1

do semver 8.1.0
enable_bulk_import 1
16 changes: 15 additions & 1 deletion tests/phys_rep_tiered.test/runit
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env bash
bash -n "$0" | exit 1

set -x
[[ $DEBUG -eq 1 ]] && set -x

source ${TESTSROOTDIR}/tools/runit_common.sh
source ${TESTSROOTDIR}/tools/cluster_utils.sh
Expand Down Expand Up @@ -2979,6 +2979,15 @@ function testcase_preamble
announce "Running $testcase"
}

function test_bulkimport
{
echo "== Running bulkimport tests against physrep source and replicant =="
./bulkimport.sh $SOURCE_DBNAME $SOURCE_HOST $REPL_CLUS_DBNAME $REPL_CLUS_HOST
if [[ $? -ne 0 ]]; then
cleanFailExit "bulkimport tests failed"
fi
}

function run_tests
{
testcase="verify_generation"
Expand Down Expand Up @@ -3168,6 +3177,11 @@ function run_tests
testcase_preamble $testcase
physrep_no_cached_metadb_connections_test
testcase_finish $testcase

testcase="bulkimport"
testcase_preamble $testcase
test_bulkimport
testcase_finish $testcase
}

function run_one_test
Expand Down
Loading