-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmulti.go
More file actions
119 lines (109 loc) · 2.36 KB
/
multi.go
File metadata and controls
119 lines (109 loc) · 2.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package io
import (
"context"
"errors"
"fmt"
"io"
"go.uber.org/atomic"
)
type multiReader struct {
ctx context.Context
cancel context.CancelFunc
count *atomic.Int32
chanReader chan *chanReader
readers []io.Reader
}
type chanReader struct {
index int
p []byte
n int
err error
}
func (m *multiReader) Read(p []byte) (n int, err error) {
if m.chanReader == nil {
if len(m.readers) == 1 {
if r, ok := m.readers[0].(*multiReader); ok {
m.readers = r.readers
}
}
m.ctx, m.cancel = context.WithCancel(context.TODO())
m.count = atomic.NewInt32(int32(len(m.readers)))
m.chanReader = make(chan *chanReader, m.count.Load())
for i := 0; i < len(m.readers); i++ {
go func(ctx context.Context, cb chan<- *chanReader, index int, reader io.Reader) {
defer m.count.Dec()
for {
r := &chanReader{
index: index,
p: make([]byte, len(p)),
}
r.n, r.err = reader.Read(r.p)
select {
case <-ctx.Done():
return
case cb <- r:
}
if r.err != nil {
return
}
}
}(m.ctx, m.chanReader, i, m.readers[i])
}
}
for {
if m.count.Load() <= 0 {
if m.cancel != nil {
m.cancel()
m.cancel = nil
}
break
}
select {
case <-m.ctx.Done():
if m.chanReader != nil {
close(m.chanReader)
}
break
case r := <-m.chanReader:
if r.err != nil && r.err != io.EOF {
if m.cancel != nil {
m.cancel()
m.cancel = nil
}
return 0, fmt.Errorf("index(%d) was error:%w", r.index, r.err)
} else if r.err == io.EOF {
continue
}
n := copy(p, r.p[:r.n])
return n, nil
default:
}
}
return 0, io.EOF
}
func (m *multiReader) Close() error {
if m.cancel != nil {
m.cancel()
m.cancel = nil
return nil
}
return errors.New("reader is already closed")
}
// MultiReader returns a Reader that's the logical concatenation of
// the provided input readers. They're read sequentially. Once all
// inputs have returned EOF, Read will return EOF. If any of the readers
// return a non-nil, non-EOF error, Read will return that error.
func MultiReader(readers ...io.Reader) io.Reader {
r := make([]io.Reader, len(readers))
copy(r, readers)
return &multiReader{
readers: r,
}
}
func Close(reader io.Reader) error {
v, b := reader.(*multiReader)
if !b {
return errors.New("not a correct multi reader type")
}
return v.Close()
}