feat(go): add go deserialization support via io streams #3374

Open
ayush00git wants to merge 7 commits into apache:main from ayush00git:feat/go-deserialization

Conversation

@ayush00git (Contributor) commented Feb 20, 2026

Why?

To enable stream-based deserialization in Fory's Go library, allowing direct reads from an io.Reader without pre-buffering the entire payload. This improves efficiency for network and file-based transports.

What does this PR do?

1. go/fory/buffer.go

Enhanced ByteBuffer to support io.Reader with an internal sliding window and automatic filling.

  • Added reader io.Reader and minCap int fields.
  • Implemented fill(n int) bool for on-demand data fetching and compaction.
  • Updated all Read* methods (fixed-size, varint, tagged) to fetch data from the reader if not cached.
func (b *ByteBuffer) fill(n int) bool {
    if b.reader == nil { return false }
    // Compaction and stream reading logic
    ...
}

2. go/fory/fory.go

Added the DeserializeFromReader method as the primary public API for stream deserialization.

  • Integrated io package.
  • Implemented DeserializeFromReader to reset the buffer state and initiate deserialization from a stream.
func (f *Fory) DeserializeFromReader(r io.Reader, v any) error {
    defer f.resetReadState()
    f.readCtx.buffer.ResetWithReader(r, 0)
    // Deserialization logic
    ...
}

3. go/fory/reader.go

Ensured ReadContext correctly manages the buffer state when switching between memory-only and stream-backed modes.

  • Updated SetData to reset the reader field.

Related issues

Closes #3302

Does this PR introduce any user-facing change?

  • Does this PR introduce any public API change?
  • Does this PR introduce any binary protocol compatibility change?

Benchmark

N/A

@ayush00git ayush00git changed the title feat(go): add go deserialization support via transport streams feat(go): add go deserialization support via io streams Feb 20, 2026
@ayush00git (Contributor, Author)

Hey @chaokunyang,
Please have a review and let me know what changes are needed.

@Zakir032002

hey @ayush00git, I looked through this and the main issue I see is in DeserializeFromReader:
it calls ResetWithReader at the start of every call:

func (f *Fory) DeserializeFromReader(r io.Reader, v any) error {
    defer f.resetReadState()
    f.readCtx.buffer.ResetWithReader(r, 0) // this wipes the prefetch window every time

so if fill() reads ahead past the first object boundary (which it will), those bytes
are gone on the next call, and sequential decode from one stream is broken:

for {
    var msg Msg
    f.DeserializeFromReader(conn, &msg) // bytes after first object get thrown away
}

if you look at how this is handled for C++/Python, the Buffer is constructed
from the stream once and passed to each deserialize call directly. The buffer holds
state across calls and is never reset between objects. The Python test
test_stream_deserialize_multiple_objects_from_single_stream shows this exactly:
the same reader buffer is passed to multiple fory.deserialize() calls.

the Go version probably needs something similar: a stream reader type that owns the
buffer and gets reused across deserializations, rather than resetting on each call.

Happy to discuss if I'm misreading the flow here

@ayush00git (Contributor, Author)

Hi @Zakir032002,
Thanks for catching this; it is indeed a bug in my implementation. Yes, the call clears any prefetched data from the ByteBuffer, making sequential reads from the stream impossible, and it was clearing the type metadata as well. I'll look at the C++/Python implementations to correct the deserializer.

@Zakir032002

hey @ayush00git, one more thing: ReadBinary and ReadBytes return a direct slice into
b.data:

v := b.data[b.readerIndex : b.readerIndex+length]
return v

the problem is that fill() compacts the buffer in place:

copy(b.data, b.data[b.readerIndex:])

so if someone reads a []byte field and holds onto that slice, and the next
read triggers a fill(), the compaction overwrites the bytes they're
still holding. No error, no panic, just wrong data.

in stream mode you probably want to copy before returning instead of aliasing:

if b.reader != nil {
    result := make([]byte, length)
    copy(result, b.data[b.readerIndex:b.readerIndex+length])
    b.readerIndex += length
    return result
}

The in-memory path stays as is.
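The hazard here is ordinary Go slice aliasing; a minimal standalone demonstration (no Fory code involved):

```go
package main

import "fmt"

func main() {
	data := []byte("hello world")

	// Aliasing: v shares data's backing array.
	v := data[0:5] // "hello"

	// Defensive copy: result owns its own bytes.
	result := make([]byte, 5)
	copy(result, data[0:5])

	// Simulate fill()'s in-place compaction overwriting the window.
	copy(data, data[6:])

	fmt.Printf("aliased: %q\n", v)      // silently corrupted
	fmt.Printf("copied:  %q\n", result) // unaffected
}
```

The aliased slice now reads "world" while the copied one still reads "hello", which is exactly the silent corruption described above.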

@Zakir032002

also noticed: ReadVarUint32Small7 only does fill(1) for the first byte, but if that byte has 0x80 set it falls through to continueReadVarUint32, which isn't touched in this PR. So in stream mode, if a multi-byte varint straddles a chunk boundary, the continuation bytes may not be in the buffer yet; you either get a BufferOutOfBoundError or silently read the wrong bytes, depending on what's sitting at that position in the buffer.

The easiest fix is probably routing the multi-byte case through readVarUint32Slow, since that's already stream-aware after your changes, or adding fill(1) guards inside continueReadVarUint32 directly; either works.

Happy to discuss if I'm misreading the flow here

@ayush00git (Contributor, Author)

Hey @Zakir032002,
Sorry, I'm a bit busy with my exams; as soon as I'm free, I'll review the comments.


Development

Successfully merging this pull request may close these issues:

[Go] Streaming Deserialization Support For Go