diff --git a/lib/query/load_view.go b/lib/query/load_view.go index 5ff92cc..9c543e5 100644 --- a/lib/query/load_view.go +++ b/lib/query/load_view.go @@ -17,7 +17,6 @@ import ( "github.com/mithrandie/csvq/lib/option" "github.com/mithrandie/csvq/lib/parser" "github.com/mithrandie/csvq/lib/value" - "github.com/mithrandie/go-text" "github.com/mithrandie/go-text/csv" "github.com/mithrandie/go-text/fixedlen" @@ -26,8 +25,10 @@ import ( "github.com/mithrandie/go-text/ltsv" ) -const fileLoadingPreparedRecordSetCap = 300 -const fileLoadingBuffer = 300 +const ( + fileLoadingPreparedRecordSetCap = 300 + fileLoadingBuffer = 300 +) const inlineTablePrefix = "@__io__" @@ -49,7 +50,13 @@ func isTableObjectAsURL(tablePath parser.QueryExpression) bool { return strings.HasPrefix(i.Literal, "http://") || strings.HasPrefix(i.Literal, "https://") } -func LoadView(ctx context.Context, scope *ReferenceScope, tables []parser.QueryExpression, forUpdate bool, useInternalId bool) (*View, error) { +func LoadView( + ctx context.Context, + scope *ReferenceScope, + tables []parser.QueryExpression, + forUpdate bool, + useInternalId bool, +) (*View, error) { if tables == nil { var obj parser.QueryExpression if scope.Tx.Session.CanReadStdin { @@ -76,7 +83,13 @@ func LoadView(ctx context.Context, scope *ReferenceScope, tables []parser.QueryE return view, err } -func LoadViewFromTableIdentifier(ctx context.Context, scope *ReferenceScope, table parser.QueryExpression, forUpdate bool, useInternalId bool) (*View, error) { +func LoadViewFromTableIdentifier( + ctx context.Context, + scope *ReferenceScope, + table parser.QueryExpression, + forUpdate bool, + useInternalId bool, +) (*View, error) { tables := []parser.QueryExpression{ parser.Table{Object: table}, } @@ -84,7 +97,13 @@ func LoadViewFromTableIdentifier(ctx context.Context, scope *ReferenceScope, tab return LoadView(ctx, scope, tables, forUpdate, useInternalId) } -func loadView(ctx context.Context, scope *ReferenceScope, tableExpr parser.QueryExpression, forUpdate bool, useInternalId bool) (view *View, err error) { +func loadView( + ctx context.Context, + scope *ReferenceScope, + tableExpr parser.QueryExpression, + forUpdate bool, + useInternalId bool, +) (view *View, err error) { if parentheses, ok := tableExpr.(parser.Parentheses); ok { return loadView(ctx, scope, parentheses.Expr, forUpdate, useInternalId) } @@ -414,7 +433,13 @@ func loadView(ctx context.Context, scope *ReferenceScope, tableExpr parser.Query return view, err } -func joinViews(ctx context.Context, scope *ReferenceScope, view *View, joinView *View, join parser.Join) error { +func joinViews( + ctx context.Context, + scope *ReferenceScope, + view *View, + joinView *View, + join parser.Join, +) error { condition, includeFields, excludeFields, err := ParseJoinCondition(join, view, joinView) if err != nil { return err @@ -573,7 +598,11 @@ func loadObjectFromString( tableName parser.Identifier, options option.ImportOptions, ) (*View, error) { - fileInfo := NewInlineFileInfo(inlineTablePrefix+file.RandomString(12), options, scope.Tx.Flags.ExportOptions) + fileInfo := NewInlineFileInfo( + inlineTablePrefix+file.RandomString(12), + options, + scope.Tx.Flags.ExportOptions, + ) r := strings.NewReader(data) view, err := loadViewFromFile(ctx, scope.Tx.Flags, r, fileInfo, options, tablePath) @@ -624,6 +653,7 @@ func loadHttpObject( options option.ImportOptions, ) (*View, error) { scope.Tx.viewLoadingMutex.Lock() + defer scope.Tx.viewLoadingMutex.Unlock() urlResource, ok := scope.Tx.UrlCache[httpObject.URL] if !ok { @@ -634,7 +664,11 @@ func loadHttpObject( } if 400 <= res.StatusCode { scope.Tx.viewLoadingMutex.Unlock() - return nil, NewHttpRequestError(tablePath, httpObject.URL, fmt.Sprintf("code %d, status %q", res.StatusCode, res.Status)) + return nil, NewHttpRequestError( + tablePath, + httpObject.URL, + fmt.Sprintf("code %d, status %q", res.StatusCode, res.Status), + ) } urlResource, err = NewUrlResource(res) @@ -677,7 +711,12 @@ func loadInlineObjectFromFile( scope.Tx.viewLoadingMutex.Lock() defer scope.Tx.viewLoadingMutex.Unlock() - fileInfo, err := NewFileInfo(tableIdentifier, scope.Tx.Flags.Repository, options, scope.Tx.Flags.ImportOptions.Format) + fileInfo, err := NewFileInfo( + tableIdentifier, + scope.Tx.Flags.Repository, + options, + scope.Tx.Flags.ImportOptions.Format, + ) if err != nil { return } @@ -754,7 +793,9 @@ func loadObjectFromFile( if useInternalId { if view, err = scope.Tx.CachedViews.GetWithInternalId(ctx, strings.ToUpper(filePath), scope.Tx.Flags); err != nil { if err == errTableNotLoaded { - err = NewTableNotLoadedError(parser.Identifier{BaseExpr: fileIdentifier.GetBaseExpr(), Literal: filePath}) + err = NewTableNotLoadedError( + parser.Identifier{BaseExpr: fileIdentifier.GetBaseExpr(), Literal: filePath}, + ) } return } @@ -795,7 +836,8 @@ func loadObject( } if !isInlineObject { - if tableFunction, ok := tablePath.(parser.TableFunction); ok && strings.ToUpper(tableFunction.Name) == "INLINE" { + if tableFunction, ok := tablePath.(parser.TableFunction); ok && + strings.ToUpper(tableFunction.Name) == "INLINE" { isInlineObject = true } } @@ -820,7 +862,9 @@ func loadObject( return loadInlineObjectFromFile(ctx, scope, fileIdentifier, tableName, options) } - if scope.RecursiveTable != nil && strings.EqualFold(fileIdentifier.Literal, scope.RecursiveTable.Name.Literal) && scope.RecursiveTmpView != nil { + if scope.RecursiveTable != nil && + strings.EqualFold(fileIdentifier.Literal, scope.RecursiveTable.Name.Literal) && + scope.RecursiveTmpView != nil { view := scope.RecursiveTmpView.Copy() if !strings.EqualFold(scope.RecursiveTable.Name.Literal, tableName.Literal) { if err := view.Header.Update(tableName.Literal, nil); err != nil { @@ -870,7 +914,15 @@ func loadObject( return view, nil } - return loadObjectFromFile(ctx, scope, fileIdentifier, tableName, forUpdate, useInternalId, options) + return loadObjectFromFile( + ctx, + scope, + fileIdentifier, + tableName, + forUpdate, + useInternalId, + options, + ) } func cacheViewFromFile( @@ -900,7 +952,12 @@ func cacheViewFromFile( return p, v, true, nil } - p, _, e = SearchFilePath(fileIdentifier, scope.Tx.Flags.Repository, options, scope.Tx.Flags.ImportOptions.Format) + p, _, e = SearchFilePath( + fileIdentifier, + scope.Tx.Flags.Repository, + options, + scope.Tx.Flags.ImportOptions.Format, + ) if e != nil { return "", nil, false, e } @@ -930,7 +987,12 @@ func cacheViewFromFile( var fp *os.File if forUpdate { - h, e := scope.Tx.FileContainer.CreateHandlerForUpdate(ctx, fileInfo.Path, scope.Tx.WaitTimeout, scope.Tx.RetryDelay) + h, e := scope.Tx.FileContainer.CreateHandlerForUpdate( + ctx, + fileInfo.Path, + scope.Tx.WaitTimeout, + scope.Tx.RetryDelay, + ) if e != nil { fileIdentifier.Literal = fileInfo.Path err = ConvertFileHandlerError(e, fileIdentifier) @@ -976,7 +1038,14 @@ func cacheViewFromFile( return } -func loadViewFromFile(ctx context.Context, flags *option.Flags, fp io.Reader, fileInfo *FileInfo, options option.ImportOptions, expr parser.QueryExpression) (*View, error) { +func loadViewFromFile( + ctx context.Context, + flags *option.Flags, + fp io.Reader, + fileInfo *FileInfo, + options option.ImportOptions, + expr parser.QueryExpression, +) (*View, error) { fileReader, err := file.NewReader(fp, 2048) if err != nil { return nil, NewIOError(expr, err.Error()) @@ -992,10 +1061,23 @@ func loadViewFromFile(ctx context.Context, flags *option.Flags, fp io.Reader, fi case option.JSONL: return loadViewFromJsonLinesFile(ctx, flags, fileReader, fileInfo, expr) } - return loadViewFromCSVFile(ctx, fileReader, fileInfo, options.AllowUnevenFields, options.WithoutNull, expr) + return loadViewFromCSVFile( + ctx, + fileReader, + fileInfo, + options.AllowUnevenFields, + options.WithoutNull, + expr, + ) } -func loadViewFromFixedLengthTextFile(ctx context.Context, fp *file.Reader, fileInfo *FileInfo, withoutNull bool, expr parser.QueryExpression) (*View, error) { +func loadViewFromFixedLengthTextFile( + ctx context.Context, + fp *file.Reader, + fileInfo *FileInfo, + withoutNull bool, + expr parser.QueryExpression, +) (*View, error) { fileHead, err := fp.HeadBytes() if err != nil { return nil, NewIOError(expr, err.Error()) @@ -1074,7 +1156,14 @@ func loadViewFromFixedLengthTextFile(ctx context.Context, fp *file.Reader, fileI return view, nil } -func loadViewFromCSVFile(ctx context.Context, fp *file.Reader, fileInfo *FileInfo, allowUnevenFields bool, withoutNull bool, expr parser.QueryExpression) (*View, error) { +func loadViewFromCSVFile( + ctx context.Context, + fp *file.Reader, + fileInfo *FileInfo, + allowUnevenFields bool, + withoutNull bool, + expr parser.QueryExpression, +) (*View, error) { if fileInfo.Format == option.TSV { fileInfo.Delimiter = '\t' } @@ -1156,7 +1245,14 @@ func loadViewFromCSVFile(ctx context.Context, fp *file.Reader, fileInfo *FileInf return view, nil } -func loadViewFromLTSVFile(ctx context.Context, flags *option.Flags, fp *file.Reader, fileInfo *FileInfo, withoutNull bool, expr parser.QueryExpression) (*View, error) { +func loadViewFromLTSVFile( + ctx context.Context, + flags *option.Flags, + fp *file.Reader, + fileInfo *FileInfo, + withoutNull bool, + expr parser.QueryExpression, +) (*View, error) { fileHead, err := fp.HeadBytes() if err != nil { return nil, NewIOError(expr, err.Error()) @@ -1213,6 +1309,8 @@ func readRecordSet(ctx context.Context, reader RecordReader, fileSize int64) (Re wg := sync.WaitGroup{} + var mu sync.Mutex + wg.Add(1) go func() { defer func() { @@ -1240,13 +1338,18 @@ func readRecordSet(ctx context.Context, reader RecordReader, fileSize int64) (Re } } - if 0 < fileSize && 0 < pos && len(recordSet) == fileLoadingPreparedRecordSetCap && int64(pos) < fileSize { + mu.Lock() + + if 0 < fileSize && 0 < pos && len(recordSet) == fileLoadingPreparedRecordSetCap && + int64(pos) < fileSize { l := int((float64(fileSize) / float64(pos)) * fileLoadingPreparedRecordSetCap * 1.2) newSet := make(RecordSet, fileLoadingPreparedRecordSetCap, l) copy(newSet, recordSet) recordSet = newSet } + mu.Unlock() + recordSet = append(recordSet, record) } }() @@ -1283,9 +1386,14 @@ func readRecordSet(ctx context.Context, reader RecordReader, fileSize int64) (Re } if 0 < fileSize && i < fileLoadingPreparedRecordSetCap { + + mu.Lock() + for j := range row { pos += len(row[j]) } + + mu.Unlock() } select { @@ -1308,7 +1416,11 @@ func readRecordSet(ctx context.Context, reader RecordReader, fileSize int64) (Re return recordSet, err } -func loadViewFromJsonFile(fp *file.Reader, fileInfo *FileInfo, expr parser.QueryExpression) (*View, error) { +func loadViewFromJsonFile( + fp *file.Reader, + fileInfo *FileInfo, + expr parser.QueryExpression, +) (*View, error) { jsonText, err := io.ReadAll(fp) if err != nil { return nil, NewIOError(expr, err.Error()) @@ -1334,7 +1446,13 @@ func loadViewFromJsonFile(fp *file.Reader, fileInfo *FileInfo, expr parser.Query return view, nil } -func loadViewFromJsonLinesFile(ctx context.Context, flags *option.Flags, fp *file.Reader, fileInfo *FileInfo, expr parser.QueryExpression) (*View, error) { +func loadViewFromJsonLinesFile( + ctx context.Context, + flags *option.Flags, + fp *file.Reader, + fileInfo *FileInfo, + expr parser.QueryExpression, +) (*View, error) { var err error headerList := make([]string, 0, 32) headerMap := make(map[string]bool, 32) @@ -1381,7 +1499,8 @@ func loadViewFromJsonLinesFile(ctx context.Context, flags *option.Flags, fp *fil } } - if 0 < fileSize && 0 < pos && len(objectList) == fileLoadingPreparedRecordSetCap && int64(pos) < fileSize { + if 0 < fileSize && 0 < pos && len(objectList) == fileLoadingPreparedRecordSetCap && + int64(pos) < fileSize { l := int((float64(fileSize) / float64(pos)) * fileLoadingPreparedRecordSetCap * 1.2) newSet := make([]txjson.Object, fileLoadingPreparedRecordSetCap, l) copy(newSet, objectList)