diff --git a/decode.go b/decode.go index b67fe04..a252fc4 100644 --- a/decode.go +++ b/decode.go @@ -13,6 +13,8 @@ import ( "github.com/nu7hatch/gouuid" ) +var UnsupportedType = errors.New("KDB-Decoder: type is unsupported") + var typeSize = map[int8]int{ 1: 1, 4: 1, 10: 1, 2: 16, @@ -137,30 +139,59 @@ func Uncompress(b []byte) (dst []byte) { // Decodes data from src in q ipc format. func Decode(src *bufio.Reader) (data *K, msgtype int, e error) { - var header ipcHeader - e = binary.Read(src, binary.LittleEndian, &header) + header, e := DecodeHeader(src) if e != nil { return nil, -1, errors.New("Failed to read message header:" + e.Error()) } + // try to buffer entire message in one go + src.Peek(int(header.MsgSize - 8)) + return DecodeData(src, header) +} + +func DecodeHeader(src *bufio.Reader) (*ipcHeader, error) { + var header ipcHeader + e := binary.Read(src, binary.LittleEndian, &header) + if e != nil { + return nil, e + } if !header.ok() { - return nil, -1, errors.New("header is invalid") + return nil, errors.New("header is invalid") + } + return &header, nil +} + +func DecodeRaw(src *bufio.Reader) ([]byte, *ipcHeader, error) { + headerRaw := make([]byte, 8) + binary.Read(src, binary.LittleEndian, &headerRaw) + header, e := DecodeHeader(bufio.NewReader(bytes.NewReader(headerRaw))) + if e != nil { + return nil, header, errors.New("Failed to read message header:" + e.Error()) } // try to buffer entire message in one go src.Peek(int(header.MsgSize - 8)) + dataRaw := make([]byte, header.MsgSize-8) + _, e = io.ReadFull(src, dataRaw) + if e != nil { + return nil, header, errors.New("Decode:read error - " + e.Error()) + } + return append(headerRaw, dataRaw...), header, nil +} +func DecodeData(src *bufio.Reader, header *ipcHeader) (data *K, msgtype int, e error) { var order = header.getByteOrder() + if header.Compressed == 0x01 { - compressed := make([]byte, header.MsgSize-8) - _, e = io.ReadFull(src, compressed) + buf := make([]byte, header.MsgSize-8) + _, e = io.ReadFull(src, buf) if e != nil { - return nil, int(header.RequestType), errors.New("Decode:readcompressed error - " + e.Error()) + return nil, int(header.RequestType), errors.New("Decode:read error - " + e.Error()) } - var uncompressed = Uncompress(compressed) - var buf = bufio.NewReader(bytes.NewReader(uncompressed[8:])) - data, e = readData(buf, order) - return data, int(header.RequestType), e + var uncompressed = Uncompress(buf) + data, e = readData(bufio.NewReader(bytes.NewReader(uncompressed[8:])), order) + } else { + data, e = readData(src, order) } - data, e = readData(src, order) + return data, int(header.RequestType), e } @@ -190,7 +221,7 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) { binary.Read(r, order, &sh) return &K{msgtype, NONE, sh}, nil - case -KI, -KD, -KU, -KV: + case -KI, -KU, -KV, -KT: var i int32 binary.Read(r, order, &i) return &K{msgtype, NONE, i}, nil @@ -202,7 +233,7 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) { var e float32 binary.Read(r, order, &e) return &K{msgtype, NONE, e}, nil - case -KF, -KZ: + case -KF: var f float64 binary.Read(r, order, &f) return &K{msgtype, NONE, f}, nil @@ -222,6 +253,15 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) { var ts time.Duration binary.Read(r, order, &ts) return &K{msgtype, NONE, qEpoch.Add(ts)}, nil + case -KZ: + var ts float64 + binary.Read(r, order, &ts) + d := time.Duration(86400000*ts) * time.Millisecond + return &K{msgtype, NONE, qEpoch.Add(d)}, nil + case -KD: + var d int32 + binary.Read(r, order, &d) + return &K{msgtype, NONE, qEpoch.Add(time.Duration(d) * 24 * time.Hour)}, nil case -KM: var m Month binary.Read(r, order, &m) @@ -262,6 +302,7 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) { if msgtype == KC { return &K{msgtype, vecattr, string(arr.([]byte))}, nil } + if msgtype == KP { arr := arr.([]time.Duration) var timearr = make([]time.Time, veclen) @@ -270,15 +311,6 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) { } return &K{msgtype, vecattr, timearr}, nil } - if msgtype == KD { - arr := arr.([]int32) - var timearr = make([]time.Time, veclen) - for i := 0; i < int(veclen); i++ { - d := time.Duration(arr[i]) * 24 * time.Hour - timearr[i] = qEpoch.Add(d) - } - return &K{msgtype, vecattr, timearr}, nil - } if msgtype == KZ { arr := arr.([]float64) var timearr = make([]time.Time, veclen) @@ -288,32 +320,42 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) { } return &K{msgtype, vecattr, timearr}, nil } - if msgtype == KU { - arr := arr.([]int32) - var timearr = make([]Minute, veclen) - for i := 0; i < int(veclen); i++ { - d := time.Duration(arr[i]) * time.Minute - timearr[i] = Minute(time.Time{}.Add(d)) - } - return &K{msgtype, vecattr, timearr}, nil - } - if msgtype == KV { - arr := arr.([]int32) - var timearr = make([]Second, veclen) - for i := 0; i < int(veclen); i++ { - d := time.Duration(arr[i]) * time.Second - timearr[i] = Second(time.Time{}.Add(d)) - } - return &K{msgtype, vecattr, timearr}, nil - } - if msgtype == KT { + if msgtype == KD { arr := arr.([]int32) - var timearr = make([]Time, veclen) + var timearr = make([]time.Time, veclen) for i := 0; i < int(veclen); i++ { - timearr[i] = Time(qEpoch.Add(time.Duration(arr[i]) * time.Millisecond)) + d := time.Duration(arr[i]) * 24 * time.Hour + timearr[i] = qEpoch.Add(d) } return &K{msgtype, vecattr, timearr}, nil - } + } /* + // These were removed due to incorrectly handling negative values + if msgtype == KU { + arr := arr.([]int32) + var timearr = make([]Minute, veclen) + for i := 0; i < int(veclen); i++ { + d := time.Duration(arr[i]) * time.Minute + timearr[i] = Minute(time.Time{}.Add(d)) + } + return &K{msgtype, vecattr, timearr}, nil + } + if msgtype == KV { + arr := arr.([]int32) + var timearr = make([]Second, veclen) + for i := 0; i < int(veclen); i++ { + d := time.Duration(arr[i]) * time.Second + timearr[i] = Second(time.Time{}.Add(d)) + } + return &K{msgtype, vecattr, timearr}, nil + } + if msgtype == KT { + arr := arr.([]int32) + var timearr = make([]Time, veclen) + for i := 0; i < int(veclen); i++ { + timearr[i] = Time(qEpoch.Add(time.Duration(arr[i]) * time.Millisecond)) + } + return &K{msgtype, vecattr, timearr}, nil + }*/ return &K{msgtype, vecattr, arr}, nil case K0: var vecattr Attr @@ -420,11 +462,14 @@ func readData(r *bufio.Reader, order binary.ByteOrder) (kobj *K, err error) { } } return &K{msgtype, NONE, res}, nil + //These were not being decoded correctly case KEACH, KOVER, KSCAN, KPRIOR, KEACHRIGHT, KEACHLEFT: - return readData(r, order) + _, _ = readData(r, order) + return nil, UnsupportedType case KDYNLOAD: // 112 - dynamic load - return nil, errors.New("type is unsupported") + _, _ = readData(r, order) + return nil, UnsupportedType case KERR: line, err := r.ReadSlice(0) if err != nil { diff --git a/decode_test.go b/decode_test.go index 28c0bfc..8ea89d6 100644 --- a/decode_test.go +++ b/decode_test.go @@ -73,6 +73,8 @@ func TestCharArray(t *testing.T) { var TimestampAsBytes = []byte{0x01, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0xf4, 0x28, 0xbf, 0xce, 0x27, 0x35, 0xec, 0xe9, 0x07} var TimestampAsTime = time.Date(2018, 1, 26, 1, 49, 0, 884361000, time.UTC) var TimestampAsInt64 = int64(570246540884361000) +var DatetimeAsTime = time.Date(2013, 6, 10, 22, 03, 49, 713000000, time.UTC) +var DateAsTime = time.Date(2013, 6, 10, 0, 0, 0, 00000000, time.UTC) func TestTimestampEpoch(t *testing.T) { d := TimestampAsTime diff --git a/encode.go b/encode.go index 39d0d56..178259c 100644 --- a/encode.go +++ b/encode.go @@ -45,23 +45,22 @@ func writeData(dbuf io.Writer, order binary.ByteOrder, data *K) (err error) { binary.Write(dbuf, order, []byte(tosend[i])) binary.Write(dbuf, order, byte(0)) } - case -KB: - tosend := data.Data.(bool) - binary.Write(dbuf, order, int8(data.Type)) - var val byte - if tosend { - val = 0x01 - } else { - val = 0x00 - } - binary.Write(dbuf, order, val) - case -KI, -KJ, -KE, -KF, -UU: + case -KB, -KG, -KI, -KJ, -KE, -KF, -KU, -KV, -KT, -KM, -KN, -KH, -UU, -KC, + KFUNCUP, KFUNCBP, KFUNCTR: binary.Write(dbuf, order, int8(data.Type)) binary.Write(dbuf, order, data.Data) case -KP: tosend := data.Data.(time.Time) binary.Write(dbuf, order, int8(data.Type)) binary.Write(dbuf, order, tosend.Sub(qEpoch)) + case -KZ: + tosend := data.Data.(time.Time) + binary.Write(dbuf, order, int8(data.Type)) + binary.Write(dbuf, order, -1*(float64(qEpoch.Sub(tosend)/time.Millisecond)/86400000)) + case -KD: + tosend := data.Data.(time.Time) + binary.Write(dbuf, order, int8(data.Type)) + binary.Write(dbuf, order, -1*(int32(qEpoch.Sub(tosend)/time.Hour)/24)) case KP: binary.Write(dbuf, order, int8(data.Type)) binary.Write(dbuf, order, data.Attr) // attributes @@ -70,6 +69,22 @@ func writeData(dbuf io.Writer, order binary.ByteOrder, data *K) (err error) { for _, ts := range tosend { binary.Write(dbuf, order, ts.Sub(qEpoch)) } + case KZ: + binary.Write(dbuf, order, int8(data.Type)) + binary.Write(dbuf, order, data.Attr) // attributes + binary.Write(dbuf, order, int32(reflect.ValueOf(data.Data).Len())) + tosend := data.Data.([]time.Time) + for _, ts := range tosend { + binary.Write(dbuf, order, -1*(float64(qEpoch.Sub(ts)/time.Millisecond)/86400000)) + } + case KD: + binary.Write(dbuf, order, int8(data.Type)) + binary.Write(dbuf, order, data.Attr) // attributes + binary.Write(dbuf, order, int32(reflect.ValueOf(data.Data).Len())) + tosend := data.Data.([]time.Time) + for _, ts := range tosend { + binary.Write(dbuf, order, -1*(int32(qEpoch.Sub(ts)/time.Hour)/24)) + } case KB: binary.Write(dbuf, order, int8(data.Type)) binary.Write(dbuf, order, data.Attr) // attributes @@ -79,7 +94,7 @@ func writeData(dbuf io.Writer, order binary.ByteOrder, data *K) (err error) { for _, b := range tosend { binary.Write(dbuf, order, boolmap[b]) } - case KG, KI, KJ, KE, KF, KZ, KT, KD, KV, KU, KM, KN, UU: + case KG, KI, KJ, KE, KF, KU, KV, KT, KM, KN, KH, UU: binary.Write(dbuf, order, int8(data.Type)) binary.Write(dbuf, order, data.Attr) // attributes binary.Write(dbuf, order, int32(reflect.ValueOf(data.Data).Len())) @@ -108,7 +123,7 @@ func writeData(dbuf io.Writer, order binary.ByteOrder, data *K) (err error) { binary.Write(dbuf, order, int8(data.Type)) binary.Write(dbuf, order, []byte(tosend.Error())) binary.Write(dbuf, order, byte(0)) - case KFUNC: + case KFUNC, KOVER, KSCAN, KEACH, KPRIOR, KEACHRIGHT, KEACHLEFT: tosend := data.Data.(Function) binary.Write(dbuf, order, int8(data.Type)) binary.Write(dbuf, order, []byte(tosend.Namespace)) @@ -117,6 +132,16 @@ func writeData(dbuf io.Writer, order binary.ByteOrder, data *K) (err error) { if err != nil { return err } + case KPROJ, KCOMP: + tosend := data.Data.([]interface{}) + binary.Write(dbuf, order, int8(data.Type)) + binary.Write(dbuf, order, int32(len(tosend))) + for i := 0; i < len(tosend); i++ { + err = writeData(dbuf, order, &K{tosend[i].(*K).Type, NONE, tosend[i].(*K).Data}) + if err != nil { + return err + } + } default: return errors.New("unknown type " + strconv.Itoa(int(data.Type))) } @@ -205,19 +230,28 @@ func Compress(b []byte) (dst []byte) { return dst[:d:d] } -// Encode data to ipc format as msgtype(sync/async/response) to specified writer -func Encode(w io.Writer, msgtype int, data *K) (err error) { +func EncodeRaw(msgtype int, data *K) ([]byte, error) { var order = binary.LittleEndian dbuf := new(bytes.Buffer) - err = writeData(dbuf, order, data) + err := writeData(dbuf, order, data) if err != nil { - return err + return nil, err } msglen := uint32(8 + dbuf.Len()) var header = ipcHeader{1, byte(msgtype), 0, 0, msglen} buf := new(bytes.Buffer) err = binary.Write(buf, order, header) err = binary.Write(buf, order, dbuf.Bytes()) - _, err = w.Write(Compress(buf.Bytes())) + return buf.Bytes(), nil +} + +// Encode data to ipc format as msgtype(sync/async/response) to specified writer +func Encode(w io.Writer, msgtype int, data *K) (err error) { + buf, err := EncodeRaw(msgtype, data) + if err != nil { + return nil + } + _, err = w.Write(Compress(buf)) + //_, err = w.Write(buf.Bytes()) return err } diff --git a/encode_test.go b/encode_test.go index fb7de48..7579911 100644 --- a/encode_test.go +++ b/encode_test.go @@ -34,8 +34,8 @@ var encodingTests = []struct { {"`a`b!enlist each 2 3", NewDict(SymbolV([]string{"a", "b"}), &K{K0, NONE, []*K{{KI, NONE, []int32{2}}, {KI, NONE, []int32{3}}}}), DictWithVectorsBytes}, - {"1#2013.06.10T22:03:49.713", &K{KZ, NONE, []float64{4909.9193253819449}}, DateTimeVecBytes}, - {"1#2013.06.10", &K{KD, NONE, []int32{4909}}, DateVecBytes}, + {"1#2013.06.10T22:03:49.713", &K{KZ, NONE, []time.Time{DatetimeAsTime}}, DateTimeVecBytes}, + {"1#2013.06.10", &K{KD, NONE, []time.Time{DateAsTime}}, DateVecBytes}, {"1#21:53:37.963", &K{KT, NONE, []int32{78817963}}, TimeVecBytes}, {"21:22:01 + 1 2", &K{KV, NONE, []int32{76922, 76923}}, SecondVecBytes}, {"21:22*til 2", &K{KU, NONE, []int32{0, 1282}}, MinuteVecBytes}, diff --git a/kdb.go b/kdb.go index 949ba39..e0973e2 100644 --- a/kdb.go +++ b/kdb.go @@ -6,7 +6,6 @@ import ( "crypto/tls" "errors" "fmt" - "io" "net" "time" ) @@ -37,8 +36,12 @@ func (c *KDBConn) ok() bool { return c.con != nil } -// process clients requests -func HandleClientConnection(conn net.Conn) { +func (c *KDBConn) Conn() net.Conn { + return c.con +} + +// Previous listen handler, kept for posterity +/*func HandleClientConnection(conn net.Conn) { c := conn.(*net.TCPConn) c.SetKeepAlive(true) c.SetNoDelay(true) @@ -64,7 +67,7 @@ func HandleClientConnection(conn net.Conn) { // don't respond i++ } -} +}*/ // Make synchronous call to kdb+ similar to h(func;arg1;arg2;...) func (c *KDBConn) Call(cmd string, args ...*K) (data *K, err error) { diff --git a/server.go b/server.go new file mode 100644 index 0000000..c1a32c4 --- /dev/null +++ b/server.go @@ -0,0 +1,67 @@ +package kdb + +import ( + "bufio" + "net" + "strings" +) + +// Listen and serve client requests +func ListenAndServe(addr string, handler func(*K, KDBConn) error) error { + ln, err := net.Listen("tcp", addr) + if err != nil { + return err + } + defer ln.Close() + for { + conn, err := ln.Accept() + // TODO: Need to handle possible errors explicitly + if err != nil { + continue + } + go serve(conn, handler) + } +} + +// Serve a single client connecton +func serve(conn net.Conn, handler func(*K, KDBConn) error) { + c := conn.(*net.TCPConn) + c.SetKeepAlive(true) + c.SetNoDelay(true) + var cred = make([]byte, 100) + n, err := c.Read(cred) + if err != nil { + conn.Close() + return + } + auth := string(cred[:n-2]) + rbuf := bufio.NewReader(conn) + addr := strings.Split(conn.RemoteAddr().String(), ":") + kdbconn := KDBConn{conn, rbuf, addr[0], addr[1], auth} + + // TODO: Keep mode in kdb conn struct, properly determine mode + c.Write([]byte{3}) + + id := 0 + for { + id++ + data, msgtype, err := Decode(kdbconn.rbuf) + if err == UnsupportedType { + if msgtype == SYNC { + Encode(kdbconn.con, RESPONSE, Error(err)) + continue + } + } else if err != nil { + conn.Close() + return + } + if msgtype == SYNC { + handler(data, kdbconn) + } + } +} + +// Example echo handler +func EchoHandler(data *K, conn KDBConn) error { + return Encode(conn.Conn(), RESPONSE, data) +} diff --git a/struct.go b/struct.go index 4417381..effc012 100644 --- a/struct.go +++ b/struct.go @@ -119,18 +119,34 @@ func Int(x int32) *K { return &K{-KI, NONE, x} } +func IntV(x []int32) *K { + return &K{KI, NONE, x} +} + func Long(x int64) *K { return &K{-KJ, NONE, x} } +func LongV(x []int64) *K { + return &K{KJ, NONE, x} +} + func Real(x float32) *K { return &K{-KE, NONE, x} } +func RealV(x []float32) *K { + return &K{KE, NONE, x} +} + func Float(x float64) *K { return &K{-KF, NONE, x} } +func FloatV(x []float64) *K { + return &K{KF, NONE, x} +} + func Error(x error) *K { return &K{KERR, NONE, x} } @@ -142,6 +158,13 @@ func SymbolV(x []string) *K { return &K{KS, NONE, x} } +func Date(x time.Time) *K { + return &K{-KD, NONE, x} +} +func DateV(x []time.Time) *K { + return &K{KD, NONE, x} +} + func Atom(t int8, x interface{}) *K { return &K{t, NONE, x} }