Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 144 additions & 25 deletions lib/query/load_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/mithrandie/csvq/lib/option"
"github.com/mithrandie/csvq/lib/parser"
"github.com/mithrandie/csvq/lib/value"

"github.com/mithrandie/go-text"
"github.com/mithrandie/go-text/csv"
"github.com/mithrandie/go-text/fixedlen"
Expand All @@ -26,8 +25,10 @@ import (
"github.com/mithrandie/go-text/ltsv"
)

// NOTE(review): this span comes from a rendered diff. The two single-line
// consts directly below appear to be the pre-change form of the grouped const
// block that follows them; only one of the two forms belongs in the real file.
//
// fileLoadingPreparedRecordSetCap is the record count at which the loaders
// visible in this excerpt (readRecordSet, loadViewFromJsonLinesFile) allocate a
// replacement slice whose capacity is estimated as
// (fileSize / pos) * fileLoadingPreparedRecordSetCap * 1.2.
//
// fileLoadingBuffer is not referenced in the visible excerpt; presumably it
// sizes a buffer or channel used while loading. TODO confirm against the full file.
const fileLoadingPreparedRecordSetCap = 300
const fileLoadingBuffer = 300
const (
fileLoadingPreparedRecordSetCap = 300
fileLoadingBuffer = 300
)

// inlineTablePrefix is concatenated with file.RandomString(12) to form the name
// passed to NewInlineFileInfo in loadObjectFromString.
const inlineTablePrefix = "@__io__"

Expand All @@ -49,7 +50,13 @@ func isTableObjectAsURL(tablePath parser.QueryExpression) bool {
return strings.HasPrefix(i.Literal, "http://") || strings.HasPrefix(i.Literal, "https://")
}

func LoadView(ctx context.Context, scope *ReferenceScope, tables []parser.QueryExpression, forUpdate bool, useInternalId bool) (*View, error) {
func LoadView(
ctx context.Context,
scope *ReferenceScope,
tables []parser.QueryExpression,
forUpdate bool,
useInternalId bool,
) (*View, error) {
if tables == nil {
var obj parser.QueryExpression
if scope.Tx.Session.CanReadStdin {
Expand All @@ -76,15 +83,27 @@ func LoadView(ctx context.Context, scope *ReferenceScope, tables []parser.QueryE
return view, err
}

// LoadViewFromTableIdentifier loads a view for a single table expression.
// It wraps table in a parser.Table, puts that in a one-element slice, and
// delegates to LoadView with the same ctx, scope, forUpdate and useInternalId,
// returning whatever LoadView returns.
//
// NOTE(review): this span comes from a rendered diff. The one-line signature
// directly below appears to be the pre-change form of the multi-line signature
// that follows it; only one of the two belongs in the real file.
func LoadViewFromTableIdentifier(ctx context.Context, scope *ReferenceScope, table parser.QueryExpression, forUpdate bool, useInternalId bool) (*View, error) {
func LoadViewFromTableIdentifier(
ctx context.Context,
scope *ReferenceScope,
table parser.QueryExpression,
forUpdate bool,
useInternalId bool,
) (*View, error) {
// LoadView takes a slice of table expressions, so wrap the single
// identifier as a parser.Table inside a one-element slice.
tables := []parser.QueryExpression{
parser.Table{Object: table},
}

return LoadView(ctx, scope, tables, forUpdate, useInternalId)
}

func loadView(ctx context.Context, scope *ReferenceScope, tableExpr parser.QueryExpression, forUpdate bool, useInternalId bool) (view *View, err error) {
func loadView(
ctx context.Context,
scope *ReferenceScope,
tableExpr parser.QueryExpression,
forUpdate bool,
useInternalId bool,
) (view *View, err error) {
if parentheses, ok := tableExpr.(parser.Parentheses); ok {
return loadView(ctx, scope, parentheses.Expr, forUpdate, useInternalId)
}
Expand Down Expand Up @@ -414,7 +433,13 @@ func loadView(ctx context.Context, scope *ReferenceScope, tableExpr parser.Query
return view, err
}

func joinViews(ctx context.Context, scope *ReferenceScope, view *View, joinView *View, join parser.Join) error {
func joinViews(
ctx context.Context,
scope *ReferenceScope,
view *View,
joinView *View,
join parser.Join,
) error {
condition, includeFields, excludeFields, err := ParseJoinCondition(join, view, joinView)
if err != nil {
return err
Expand Down Expand Up @@ -573,7 +598,11 @@ func loadObjectFromString(
tableName parser.Identifier,
options option.ImportOptions,
) (*View, error) {
fileInfo := NewInlineFileInfo(inlineTablePrefix+file.RandomString(12), options, scope.Tx.Flags.ExportOptions)
fileInfo := NewInlineFileInfo(
inlineTablePrefix+file.RandomString(12),
options,
scope.Tx.Flags.ExportOptions,
)

r := strings.NewReader(data)
view, err := loadViewFromFile(ctx, scope.Tx.Flags, r, fileInfo, options, tablePath)
Expand Down Expand Up @@ -624,6 +653,7 @@ func loadHttpObject(
options option.ImportOptions,
) (*View, error) {
scope.Tx.viewLoadingMutex.Lock()
defer scope.Tx.viewLoadingMutex.Unlock()

urlResource, ok := scope.Tx.UrlCache[httpObject.URL]
if !ok {
Expand All @@ -634,7 +664,11 @@ func loadHttpObject(
}
if 400 <= res.StatusCode {
scope.Tx.viewLoadingMutex.Unlock()
return nil, NewHttpRequestError(tablePath, httpObject.URL, fmt.Sprintf("code %d, status %q", res.StatusCode, res.Status))
return nil, NewHttpRequestError(
tablePath,
httpObject.URL,
fmt.Sprintf("code %d, status %q", res.StatusCode, res.Status),
)
}

urlResource, err = NewUrlResource(res)
Expand Down Expand Up @@ -677,7 +711,12 @@ func loadInlineObjectFromFile(
scope.Tx.viewLoadingMutex.Lock()
defer scope.Tx.viewLoadingMutex.Unlock()

fileInfo, err := NewFileInfo(tableIdentifier, scope.Tx.Flags.Repository, options, scope.Tx.Flags.ImportOptions.Format)
fileInfo, err := NewFileInfo(
tableIdentifier,
scope.Tx.Flags.Repository,
options,
scope.Tx.Flags.ImportOptions.Format,
)
if err != nil {
return
}
Expand Down Expand Up @@ -754,7 +793,9 @@ func loadObjectFromFile(
if useInternalId {
if view, err = scope.Tx.CachedViews.GetWithInternalId(ctx, strings.ToUpper(filePath), scope.Tx.Flags); err != nil {
if err == errTableNotLoaded {
err = NewTableNotLoadedError(parser.Identifier{BaseExpr: fileIdentifier.GetBaseExpr(), Literal: filePath})
err = NewTableNotLoadedError(
parser.Identifier{BaseExpr: fileIdentifier.GetBaseExpr(), Literal: filePath},
)
}
return
}
Expand Down Expand Up @@ -795,7 +836,8 @@ func loadObject(
}

if !isInlineObject {
if tableFunction, ok := tablePath.(parser.TableFunction); ok && strings.ToUpper(tableFunction.Name) == "INLINE" {
if tableFunction, ok := tablePath.(parser.TableFunction); ok &&
strings.ToUpper(tableFunction.Name) == "INLINE" {
isInlineObject = true
}
}
Expand All @@ -820,7 +862,9 @@ func loadObject(
return loadInlineObjectFromFile(ctx, scope, fileIdentifier, tableName, options)
}

if scope.RecursiveTable != nil && strings.EqualFold(fileIdentifier.Literal, scope.RecursiveTable.Name.Literal) && scope.RecursiveTmpView != nil {
if scope.RecursiveTable != nil &&
strings.EqualFold(fileIdentifier.Literal, scope.RecursiveTable.Name.Literal) &&
scope.RecursiveTmpView != nil {
view := scope.RecursiveTmpView.Copy()
if !strings.EqualFold(scope.RecursiveTable.Name.Literal, tableName.Literal) {
if err := view.Header.Update(tableName.Literal, nil); err != nil {
Expand Down Expand Up @@ -870,7 +914,15 @@ func loadObject(
return view, nil
}

return loadObjectFromFile(ctx, scope, fileIdentifier, tableName, forUpdate, useInternalId, options)
return loadObjectFromFile(
ctx,
scope,
fileIdentifier,
tableName,
forUpdate,
useInternalId,
options,
)
}

func cacheViewFromFile(
Expand Down Expand Up @@ -900,7 +952,12 @@ func cacheViewFromFile(
return p, v, true, nil
}

p, _, e = SearchFilePath(fileIdentifier, scope.Tx.Flags.Repository, options, scope.Tx.Flags.ImportOptions.Format)
p, _, e = SearchFilePath(
fileIdentifier,
scope.Tx.Flags.Repository,
options,
scope.Tx.Flags.ImportOptions.Format,
)
if e != nil {
return "", nil, false, e
}
Expand Down Expand Up @@ -930,7 +987,12 @@ func cacheViewFromFile(
var fp *os.File

if forUpdate {
h, e := scope.Tx.FileContainer.CreateHandlerForUpdate(ctx, fileInfo.Path, scope.Tx.WaitTimeout, scope.Tx.RetryDelay)
h, e := scope.Tx.FileContainer.CreateHandlerForUpdate(
ctx,
fileInfo.Path,
scope.Tx.WaitTimeout,
scope.Tx.RetryDelay,
)
if e != nil {
fileIdentifier.Literal = fileInfo.Path
err = ConvertFileHandlerError(e, fileIdentifier)
Expand Down Expand Up @@ -976,7 +1038,14 @@ func cacheViewFromFile(
return
}

func loadViewFromFile(ctx context.Context, flags *option.Flags, fp io.Reader, fileInfo *FileInfo, options option.ImportOptions, expr parser.QueryExpression) (*View, error) {
func loadViewFromFile(
ctx context.Context,
flags *option.Flags,
fp io.Reader,
fileInfo *FileInfo,
options option.ImportOptions,
expr parser.QueryExpression,
) (*View, error) {
fileReader, err := file.NewReader(fp, 2048)
if err != nil {
return nil, NewIOError(expr, err.Error())
Expand All @@ -992,10 +1061,23 @@ func loadViewFromFile(ctx context.Context, flags *option.Flags, fp io.Reader, fi
case option.JSONL:
return loadViewFromJsonLinesFile(ctx, flags, fileReader, fileInfo, expr)
}
return loadViewFromCSVFile(ctx, fileReader, fileInfo, options.AllowUnevenFields, options.WithoutNull, expr)
return loadViewFromCSVFile(
ctx,
fileReader,
fileInfo,
options.AllowUnevenFields,
options.WithoutNull,
expr,
)
}

func loadViewFromFixedLengthTextFile(ctx context.Context, fp *file.Reader, fileInfo *FileInfo, withoutNull bool, expr parser.QueryExpression) (*View, error) {
func loadViewFromFixedLengthTextFile(
ctx context.Context,
fp *file.Reader,
fileInfo *FileInfo,
withoutNull bool,
expr parser.QueryExpression,
) (*View, error) {
fileHead, err := fp.HeadBytes()
if err != nil {
return nil, NewIOError(expr, err.Error())
Expand Down Expand Up @@ -1074,7 +1156,14 @@ func loadViewFromFixedLengthTextFile(ctx context.Context, fp *file.Reader, fileI
return view, nil
}

func loadViewFromCSVFile(ctx context.Context, fp *file.Reader, fileInfo *FileInfo, allowUnevenFields bool, withoutNull bool, expr parser.QueryExpression) (*View, error) {
func loadViewFromCSVFile(
ctx context.Context,
fp *file.Reader,
fileInfo *FileInfo,
allowUnevenFields bool,
withoutNull bool,
expr parser.QueryExpression,
) (*View, error) {
if fileInfo.Format == option.TSV {
fileInfo.Delimiter = '\t'
}
Expand Down Expand Up @@ -1156,7 +1245,14 @@ func loadViewFromCSVFile(ctx context.Context, fp *file.Reader, fileInfo *FileInf
return view, nil
}

func loadViewFromLTSVFile(ctx context.Context, flags *option.Flags, fp *file.Reader, fileInfo *FileInfo, withoutNull bool, expr parser.QueryExpression) (*View, error) {
func loadViewFromLTSVFile(
ctx context.Context,
flags *option.Flags,
fp *file.Reader,
fileInfo *FileInfo,
withoutNull bool,
expr parser.QueryExpression,
) (*View, error) {
fileHead, err := fp.HeadBytes()
if err != nil {
return nil, NewIOError(expr, err.Error())
Expand Down Expand Up @@ -1213,6 +1309,8 @@ func readRecordSet(ctx context.Context, reader RecordReader, fileSize int64) (Re

wg := sync.WaitGroup{}

var mu sync.Mutex

wg.Add(1)
go func() {
defer func() {
Expand Down Expand Up @@ -1240,13 +1338,18 @@ func readRecordSet(ctx context.Context, reader RecordReader, fileSize int64) (Re
}
}

if 0 < fileSize && 0 < pos && len(recordSet) == fileLoadingPreparedRecordSetCap && int64(pos) < fileSize {
mu.Lock()

if 0 < fileSize && 0 < pos && len(recordSet) == fileLoadingPreparedRecordSetCap &&
int64(pos) < fileSize {
l := int((float64(fileSize) / float64(pos)) * fileLoadingPreparedRecordSetCap * 1.2)
newSet := make(RecordSet, fileLoadingPreparedRecordSetCap, l)
copy(newSet, recordSet)
recordSet = newSet
}

mu.Unlock()

recordSet = append(recordSet, record)
}
}()
Expand Down Expand Up @@ -1283,9 +1386,14 @@ func readRecordSet(ctx context.Context, reader RecordReader, fileSize int64) (Re
}

if 0 < fileSize && i < fileLoadingPreparedRecordSetCap {

mu.Lock()

for j := range row {
pos += len(row[j])
}

mu.Unlock()
}

select {
Expand All @@ -1308,7 +1416,11 @@ func readRecordSet(ctx context.Context, reader RecordReader, fileSize int64) (Re
return recordSet, err
}

func loadViewFromJsonFile(fp *file.Reader, fileInfo *FileInfo, expr parser.QueryExpression) (*View, error) {
func loadViewFromJsonFile(
fp *file.Reader,
fileInfo *FileInfo,
expr parser.QueryExpression,
) (*View, error) {
jsonText, err := io.ReadAll(fp)
if err != nil {
return nil, NewIOError(expr, err.Error())
Expand All @@ -1334,7 +1446,13 @@ func loadViewFromJsonFile(fp *file.Reader, fileInfo *FileInfo, expr parser.Query
return view, nil
}

func loadViewFromJsonLinesFile(ctx context.Context, flags *option.Flags, fp *file.Reader, fileInfo *FileInfo, expr parser.QueryExpression) (*View, error) {
func loadViewFromJsonLinesFile(
ctx context.Context,
flags *option.Flags,
fp *file.Reader,
fileInfo *FileInfo,
expr parser.QueryExpression,
) (*View, error) {
var err error
headerList := make([]string, 0, 32)
headerMap := make(map[string]bool, 32)
Expand Down Expand Up @@ -1381,7 +1499,8 @@ func loadViewFromJsonLinesFile(ctx context.Context, flags *option.Flags, fp *fil
}
}

if 0 < fileSize && 0 < pos && len(objectList) == fileLoadingPreparedRecordSetCap && int64(pos) < fileSize {
if 0 < fileSize && 0 < pos && len(objectList) == fileLoadingPreparedRecordSetCap &&
int64(pos) < fileSize {
l := int((float64(fileSize) / float64(pos)) * fileLoadingPreparedRecordSetCap * 1.2)
newSet := make([]txjson.Object, fileLoadingPreparedRecordSetCap, l)
copy(newSet, objectList)
Expand Down