-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata.go
More file actions
169 lines (134 loc) · 3.51 KB
/
data.go
File metadata and controls
169 lines (134 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package qube
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"
"strings"
"github.com/valyala/fastjson"
"github.com/winebarrel/qube/util"
)
// ErrEOD ("end of data") is the sentinel error returned by Data.Next when the
// test data has been read to the end and looping is disabled.
var ErrEOD = errors.New("EOD")
// DataOptions holds the command-line options that control how test data is
// read. The struct tags are kong flag definitions.
type DataOptions struct {
// DataFiles lists the JSON Lines files containing queries. NewData picks one
// file per agent using the agent number modulo the number of files.
DataFiles []string `kong:"short='f',required,help='JSON Lines file list of queries to execute.'"`
// Key is the JSON key whose string value is used as the query for each line.
Key string `kong:"default='q',help='Key name of the query field in the test data. e.g. {\"q\":\"SELECT ...\"}'"`
// Loop makes Next rewind to the beginning of the file on end of file instead
// of returning ErrEOD.
Loop bool `kong:"negatable,default='true',help='Return to the beginning after reading the test data. (default: enabled)'"`
// Random makes NewData seek to a random position (and skip one line) before
// the first query is read.
Random bool `kong:"negatable,default='false',help='Randomize the starting position of the test data. (default: disabled)'"`
// CommitRate, when greater than zero, makes Next wrap queries in "begin" /
// "commit" statements; "commit" is emitted on every (CommitRate+2)-th call.
CommitRate uint `kong:"help='Number of queries to execute \"COMMIT\".'"`
}
// Data is a cursor over a single test data file that hands out one statement
// per call to Next. It must be released with Close.
type Data struct {
// Embedded options (Key, Loop, CommitRate, ...) consulted by Next.
*DataOptions
// file is the seekable source being read; Next seeks it back to the start
// when looping.
file io.ReadSeekCloser
// reader buffers reads from file and is reset after every rewind.
reader *bufio.Reader
// count is the number of Next calls made so far; it decides when "commit"
// is emitted.
count uint
// inTxn is true after "begin" has been returned and before the matching
// "commit" has been returned.
inTxn bool
}
// NewData opens the test data file assigned to the given agent and returns a
// Data ready to serve queries through Next.
//
// The file is chosen as options.DataFiles[agentNum % len(options.DataFiles)].
// A file whose name ends in ".zst" is read through NewZstdFile; any other file
// is read directly. When options.Random is set, the source is first moved to a
// random position with util.RandSeek and one line is skipped (presumably
// because the seek may land in the middle of a line); if that skip hits the
// end of the file, reading restarts from the beginning.
//
// An error is returned when no data files are configured, when the file cannot
// be opened or inspected, when it is empty, or when seeking or reading fails.
// The opened file or reader is closed before an error is returned.
func NewData(options *Options, agentNum uint64) (*Data, error) {
	// Guard the modulo below: with an empty list it would panic with an
	// integer divide by zero instead of reporting a usable error.
	if len(options.DataFiles) == 0 {
		return nil, errors.New("test data files are not specified")
	}

	dataFile := options.DataFiles[agentNum%uint64(len(options.DataFiles))]

	file, err := os.OpenFile(dataFile, os.O_RDONLY, 0)
	if err != nil {
		return nil, fmt.Errorf("failed to open test data - %s (%w)", dataFile, err)
	}

	fileInfo, err := file.Stat()
	if err != nil {
		file.Close()
		return nil, fmt.Errorf("failed to get test data file info - %s (%w)", dataFile, err)
	}

	if fileInfo.Size() == 0 {
		file.Close()
		return nil, fmt.Errorf("test data is empty - %s", dataFile)
	}

	// Default to reading the plain file; switch to the zstd wrapper below
	// when the file name asks for it.
	var (
		seekReader io.ReadSeekCloser = file
		fileSize                     = fileInfo.Size()
	)

	if strings.HasSuffix(file.Name(), ".zst") {
		zf, err := NewZstdFile(file)
		if err != nil {
			file.Close()
			return nil, fmt.Errorf("failed to open zstd file - %s (%w)", dataFile, err)
		}

		seekReader = zf
		// Use the size reported by the zstd wrapper rather than the size of
		// the file on disk as the range for the random seek.
		fileSize = zf.Size()
	}

	// bufio.NewReader does not read anything yet, so creating it before the
	// random seek is equivalent to creating it afterwards.
	reader := bufio.NewReader(seekReader)

	if options.Random {
		if err := util.RandSeek(seekReader, fileSize); err != nil {
			seekReader.Close()
			return nil, fmt.Errorf("failed to seek test data (%w)", err)
		}

		// If it is random, skip one line.
		_, err := util.ReadLine(reader)

		switch {
		case errors.Is(err, io.EOF):
			// The skip ran off the end of the file: start over from the top.
			if _, err := seekReader.Seek(0, io.SeekStart); err != nil {
				seekReader.Close()
				return nil, fmt.Errorf("failed to rewind test data (%w)", err)
			}
			// Drop any buffered state so the reader matches the new offset,
			// exactly as Next does after a rewind.
			reader.Reset(seekReader)
		case err != nil:
			seekReader.Close()
			return nil, fmt.Errorf("failed to read test data (%w)", err)
		}
	}

	data := &Data{
		DataOptions: &options.DataOptions,
		file:        seekReader,
		reader:      reader,
	}

	return data, nil
}
// Next returns the next statement to execute.
//
// Every call increments an internal call counter. When CommitRate is greater
// than zero, the statements are wrapped in transactions: "begin" is returned
// whenever no transaction is open, and "commit" is returned on every call
// whose counter is a multiple of CommitRate+2. All other calls return a query
// read from the test data.
//
// Queries are read line by line. Empty lines and lines starting with "//" are
// skipped. Each remaining line must be a JSON object whose Key field holds a
// non-empty string, which is returned as the query.
//
// At the end of the file, ErrEOD is returned when Loop is disabled; otherwise
// the file is rewound and reading continues from the beginning. If a complete
// pass from the beginning yields no query at all (the file holds only blank
// lines and comments), an error is returned instead of rewinding forever.
// Other errors are returned when rewinding or reading fails or when a line
// does not provide the query field.
func (data *Data) Next() (string, error) {
	data.count++

	if data.CommitRate > 0 {
		if !data.inTxn {
			data.inTxn = true
			return "begin", nil
		}

		if data.count%(data.CommitRate+2) == 0 {
			data.inTxn = false
			return "commit", nil
		}
	}

	// rewound records that this call already restarted from the top of the
	// file. Reaching the end again without having returned a query means the
	// file contains no query, so looping would never terminate.
	rewound := false

	for {
		line, err := util.ReadLine(data.reader)

		if errors.Is(err, io.EOF) {
			if !data.Loop {
				return "", ErrEOD
			}

			if rewound {
				return "", errors.New("test data contains no queries")
			}

			if _, err := data.file.Seek(0, io.SeekStart); err != nil {
				return "", fmt.Errorf("failed to rewind test data (%w)", err)
			}

			// The buffered reader must forget what it read before the seek.
			data.reader.Reset(data.file)
			rewound = true

			continue
		}

		if err != nil {
			return "", fmt.Errorf("failed to read test data (%w)", err)
		}

		// Skip blank lines and "//" comment lines.
		if len(line) == 0 || bytes.HasPrefix(line, []byte("//")) {
			continue
		}

		query := fastjson.GetString(line, data.Key)

		// An empty result is treated as a malformed line: the key is missing,
		// its value is empty, or the line yielded no string for it.
		if query == "" {
			return "", fmt.Errorf(`failed to get query field "%s" from '%s'`, data.Key, line)
		}

		return query, nil
	}
}
// Close closes the underlying test data source and returns the error, if any,
// reported by that close.
func (data *Data) Close() error {
	src := data.file
	return src.Close()
}