Skip to content
This repository was archived by the owner on Sep 9, 2025. It is now read-only.

Commit 9af4e5e

Browse files
committed
feat: add concurrency and drop support
1 parent f846cfe commit 9af4e5e

2 files changed

Lines changed: 104 additions & 49 deletions

File tree

README.md

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,19 @@ This script is used to sync data from ClickHouse to Postgres based on a YAML con
55

66
**Key features:**
77

8-
- Replicates data from ClickHouse to Postgres.
9-
- Create and ensure indexes are kept in sync.
10-
- Batch processing to improve performance and memory usage.
11-
- Cursor-based processing for time-based data replication.
8+
- Replicates data from ClickHouse to PostgreSQL.
9+
- Manage primary keys, indexes and destination column types.
10+
- Time-series data can be synced via a cursor to avoid full table scans.
11+
- Batch processing coupled with temporary tables, each batch handled in its own thread and connection.
1212

1313
**About performance:**
1414

1515
Measured from table creation to last upsert, with batch size of 50k rows:
1616

17-
- 800k rows with 5 columns: around 12s, 67k rows/s
18-
- 170k rows with 18 columns: around 6s, 28k rows/s
17+
- 800k rows with 5 columns: around 10s, 80k rows/s
18+
- 170k rows with 18 columns: around 5s, 34k rows/s
1919

20-
> Note:
21-
>
22-
> - There is no concurrency yet, everything is single-threaded.
23-
> - This tool was not designed for high volume of data, this solution might not be the best fit for 10M+ rows. However, this might change in the future!
20+
> Note: This tool might not be the best fit for high volumes of data. We tested it only with fewer than 10 million rows.
2421
2522
## Configuration
2623

@@ -29,10 +26,11 @@ Configuration is done via a YAML file. See `config.example.yml` for reference.
2926
## Running
3027

3128
```bash
32-
go run . [-only=<table_name>] [-config=<path>]
29+
go run . [-only=<table_name>] [-drop=<table_name>] [-config=<path>]
3330
```
3431

3532
- `-only=<table_name>`: Avoid running all tables and only process the one specified.
33+
- `-drop=<table_name>`: Drop the destination table and reset its cursor (if any) before the table is re-synced.
3634
- `-config=<path>`: Path to the configuration file. Defaults to `config.yml`.
3735

3836
## Docker

main.go

Lines changed: 95 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import (
66
"fmt"
77
"reflect"
88
"strings"
9+
"sync"
910
"time"
1011

1112
"github.com/ClickHouse/clickhouse-go/v2"
1213
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
14+
"github.com/google/uuid"
1315
"github.com/jackc/pgx/v5"
1416
"github.com/jackc/pgx/v5/pgxpool"
1517
log "github.com/sirupsen/logrus"
@@ -20,6 +22,7 @@ var ctx = context.Background()
2022
func main() {
2123
only := flag.String("only", "", "Only replicate one table by name")
2224
configPath := flag.String("config", "config.yml", "Path to the configuration file")
25+
drop := flag.String("drop", "", "Drop a table by name")
2326
flag.Parse()
2427

2528
var config Config
@@ -57,9 +60,9 @@ func main() {
5760
start := time.Now()
5861

5962
if table.Cursor.Column != "" {
60-
if table.Cursor.LastSync.IsZero() {
63+
if table.Cursor.LastSync.IsZero() || *drop == table.Source {
6164
log.Warn("No last sync date found, resetting cursor")
62-
config.Tables[idx].Cursor.LastSync = time.Time{}
65+
table.Cursor.LastSync = time.Time{}
6366
}
6467

6568
log.WithFields(log.Fields{
@@ -68,6 +71,14 @@ func main() {
6871
}).Info("Resuming from cursor")
6972
}
7073

74+
if *drop != "" && *drop == table.Source {
75+
log.WithField("table", table.Source).Info("Dropping table")
76+
77+
if _, err := db.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", table.Destination)); err != nil {
78+
log.WithError(err).Errorln("Failed to drop table")
79+
}
80+
}
81+
7182
if err := SynchronizeTable(config, table, conn, db); err != nil {
7283
log.WithError(err).Errorln("Failed to synchronize table")
7384
continue
@@ -79,7 +90,7 @@ func main() {
7990

8091
log.WithFields(log.Fields{
8192
"column": table.Cursor.Column,
82-
"lastSync": table.Cursor.LastSync,
93+
"lastSync": config.Tables[idx].Cursor.LastSync,
8394
}).Info("Updated cursor")
8495
}
8596

@@ -98,54 +109,98 @@ func main() {
98109

99110
// SynchronizeTable synchronizes a table from ClickHouse to Postgres
100111
func SynchronizeTable(config Config, table Table, conn driver.Conn, db *pgxpool.Pool) error {
101-
if err := CreatePostgresTable(table, db, true); err != nil {
112+
if err := CreatePostgresTable(table, db); err != nil {
102113
return err
103114
}
104115

105116
columns := table.GetDestinationColumns()
117+
batches := make(chan [][]interface{})
106118

107-
total, err := Batching(table, conn, config.BatchSize, func(batch [][]interface{}) error {
108-
log.WithField("batch", len(batch)).Info("Inserting batch")
119+
go func() {
120+
defer close(batches)
121+
total, err := Batching(table, conn, config.BatchSize, func(batch [][]interface{}) error {
122+
batches <- batch
123+
return nil
124+
})
109125

110-
_, err := db.CopyFrom(
111-
ctx,
112-
pgx.Identifier{table.Destination + "_tmp"},
113-
columns,
114-
pgx.CopyFromRows(batch),
115-
)
126+
if err != nil {
127+
log.WithError(err).Errorln("Failed to batch")
128+
}
116129

117-
return err
118-
})
119-
if err != nil {
120-
return err
121-
}
130+
log.WithField("total", total).Infoln("Selecting data completed")
131+
}()
122132

123-
log.WithField("total", total).Infoln("Batching completed")
133+
wg := sync.WaitGroup{}
134+
for batch := range batches {
135+
wg.Add(1)
124136

125-
if err := MoveTemporaryTable(table, db); err != nil {
126-
return err
137+
go func(batch [][]interface{}) {
138+
defer wg.Done()
139+
log.WithField("batch", len(batch)).Info("Inserting batch")
140+
141+
conn, err := db.Acquire(ctx)
142+
if err != nil {
143+
log.WithError(err).Errorln("Failed to acquire connection")
144+
return
145+
}
146+
defer conn.Release()
147+
148+
tableName, err := MakeTemporaryTable(table, conn)
149+
if err != nil {
150+
log.WithError(err).Errorln("Failed to make temporary table")
151+
return
152+
}
153+
154+
_, err = conn.CopyFrom(
155+
ctx,
156+
pgx.Identifier{tableName},
157+
columns,
158+
pgx.CopyFromRows(batch),
159+
)
160+
if err != nil {
161+
log.WithError(err).Errorln("Failed to insert batch")
162+
}
163+
164+
if err := MoveTemporaryTable(table, conn, tableName); err != nil {
165+
log.WithError(err).Errorln("Failed to move temporary table")
166+
}
167+
}(batch)
127168
}
128169

129-
log.Infoln("Moved temporary table to main table")
170+
wg.Wait()
171+
172+
log.Infoln("Data inserted")
130173

131174
return nil
132175
}
133176

134177
// MoveTemporaryTable moves the temporary table to the main table
135-
func MoveTemporaryTable(table Table, db *pgxpool.Pool) error {
178+
func MoveTemporaryTable(table Table, conn *pgxpool.Conn, tableName string) error {
136179
updateQuery := []string{}
137180
for _, column := range table.GetDestinationColumns() {
138181
updateQuery = append(updateQuery, fmt.Sprintf("%s = EXCLUDED.%s", column, column))
139182
}
140183

141-
_, err := db.Exec(ctx, fmt.Sprintf(`
184+
log.WithField("source", tableName).Info("Moving temporary table")
185+
_, err := conn.Exec(ctx, fmt.Sprintf(`
142186
INSERT INTO %s
143-
SELECT * FROM %s_tmp
187+
SELECT DISTINCT ON (%s) * FROM %s
144188
ON CONFLICT (%s) DO UPDATE SET
145189
%s;
146-
`, table.Destination, table.Destination, strings.Join(table.GetPrimaryKey(), ", "), strings.Join(updateQuery, ", ")))
190+
`, table.Destination,
191+
strings.Join(table.GetPrimaryKey(), ", "),
192+
tableName,
193+
strings.Join(table.GetPrimaryKey(), ", "),
194+
strings.Join(updateQuery, ", "),
195+
))
196+
197+
if err != nil {
198+
log.WithError(err).Errorln("Failed to move temporary table")
199+
}
147200

148-
return err
201+
log.WithField("table", tableName).Infoln("Moved temporary table")
202+
203+
return nil
149204
}
150205

151206
// GetScannerValues guesses the scanner values from the column types
@@ -174,7 +229,7 @@ func GetScannerValues(columnTypes []driver.ColumnType) []interface{} {
174229
}
175230

176231
// CreatePostgresTable creates a table in Postgres
177-
func CreatePostgresTable(table Table, db *pgxpool.Pool, temporary bool) error {
232+
func CreatePostgresTable(table Table, db *pgxpool.Pool) error {
178233
columns := []string{}
179234

180235
for _, column := range table.Columns {
@@ -216,17 +271,19 @@ func CreatePostgresTable(table Table, db *pgxpool.Pool, temporary bool) error {
216271
}
217272
}
218273

219-
if temporary {
220-
_, err = db.Exec(ctx, fmt.Sprintf(
221-
`CREATE TEMPORARY TABLE %s_tmp (LIKE %s INCLUDING DEFAULTS)`,
222-
table.Destination,
223-
table.Destination,
224-
))
274+
return nil
275+
}
225276

226-
if err != nil {
227-
return err
228-
}
229-
}
277+
// MakeTemporaryTable creates a temporary table
278+
func MakeTemporaryTable(table Table, conn *pgxpool.Conn) (string, error) {
279+
rnd := uuid.New().String()[:8]
280+
tableName := fmt.Sprintf("%s_%s_tmp", table.Destination, rnd)
230281

231-
return nil
282+
_, err := conn.Exec(ctx, fmt.Sprintf(
283+
`CREATE TEMPORARY TABLE %s (LIKE %s INCLUDING DEFAULTS)`,
284+
tableName,
285+
table.Destination,
286+
))
287+
288+
return tableName, err
232289
}

0 commit comments

Comments
 (0)