Skip to content

Commit dc8f8c3

Browse files
committed
dialers/retrier: m refactor short/long read timeouts
1 parent c849d52 commit dc8f8c3

1 file changed

Lines changed: 16 additions & 27 deletions

File tree

intra/dialers/retrier.go

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ func (r *retrier) retryWriteReadLocked(buf []byte) (int, error) {
426426

427427
// all of buf was written to c
428428
// require a response within a short timeout on r.conn (same as newConn)
429-
r.shorterReadDeadlineForRetryLocked()
429+
newConn.SetReadDeadline(time.Now().Add(r.readTimeoutLocked()))
430430
return newConn.Read(buf)
431431
}
432432

@@ -493,8 +493,8 @@ func (r *retrier) Read(buf []byte) (n int, err error) {
493493
_ = c.SetReadDeadline(r.readDeadline)
494494
_ = c.SetWriteDeadline(r.writeDeadline)
495495
}
496-
logeor(err, note)("retrier: read: %s: #%d + (mult? %d / %d) [%s<=%s]; t: %s; b: %d/%d; err? %v",
497-
r.dialerID(), r.retryCount, len(r.dialers), r.nextDialerIdx, laddr(c), r.raddr, core.FmtPeriod(r.timeout), n, len(buf), err)
496+
logeor(err, note)("retrier: read: %s: #%d + (mult? %d / %d) [%s<=%s]; rshortt: %s / rfullt: %s; b: %d/%d; err? %v",
497+
r.dialerID(), r.retryCount, len(r.dialers), r.nextDialerIdx, laddr(c), r.raddr, core.FmtPeriod(r.timeout), core.FmtTimeAsPeriod(r.readDeadline), n, len(buf), err)
498498
r.tee = nil // discard teed data
499499
return
500500
}
@@ -504,7 +504,7 @@ func (r *retrier) Read(buf []byte) (n int, err error) {
504504
return
505505
}
506506

507-
func (r *retrier) teedFirstWrite(b []byte) (n int, firstWrite, didAttemptWrite bool, src net.Addr, err error) {
507+
func (r *retrier) teedFirstWrite(b []byte) (n int, firstWrite, didAttemptWrite bool, readWait time.Duration, src net.Addr, err error) {
508508
r.mu.Lock()
509509
defer r.mu.Unlock()
510510

@@ -519,33 +519,24 @@ func (r *retrier) teedFirstWrite(b []byte) (n int, firstWrite, didAttemptWrite b
519519
}
520520

521521
src = laddr(c)
522-
if !r.retryCompleted() { // first write
522+
if !r.retryCompleted() { // may be first write
523523
_ = c.SetWriteDeadline(r.writeDeadline)
524524

525525
n, err = c.Write(b)
526526

527527
// capture first write, aka "hello"
528528
r.tee = append(r.tee, b...)
529529
didAttemptWrite = true
530+
readWait = r.readTimeoutLocked()
530531
// all of b was written to r.tee if not to c
531532
// require a response or another write within a short timeout.
532-
r.shorterReadDeadlineForRetryLocked()
533+
c.SetReadDeadline(time.Now().Add(readWait))
533534
}
534535

535536
return
536537
}
537538

538-
func (r *retrier) shorterReadDeadlineForRetryLocked() {
539-
c := r.conn
540-
if r.timeout > 0 {
541-
_ = c.SetReadDeadline(time.Now().Add(r.timeout))
542-
} else {
543-
// if timeout is set to 0, then use client requested deadline
544-
_ = c.SetReadDeadline(r.readDeadline)
545-
}
546-
}
547-
548-
func (r *retrier) readTimeout() time.Duration {
539+
func (r *retrier) readTimeoutLocked() time.Duration {
549540
if r.timeout > 0 {
550541
return r.timeout
551542
}
@@ -563,7 +554,7 @@ func (r *retrier) Write(b []byte) (int, error) {
563554
// empty at steady-state.
564555
if !r.retryCompleted() {
565556
// todo: what if sentAndCopied is false and err != nil?
566-
n, first, sentAndCopied, src, err := r.teedFirstWrite(b)
557+
n, first, sentAndCopied, until, src, err := r.teedFirstWrite(b)
567558

568559
note := log.D
569560
if sentAndCopied {
@@ -587,33 +578,31 @@ func (r *retrier) Write(b []byte) (int, error) {
587578
// by the retry procedure. Block until we have a final socket (which will
588579
// already have replayed r.tee), and retry.
589580
// i.e., wait until the first write is done on the final socket.
590-
until := r.readTimeout()
591581
maxUntil := max(until, until*maxRetryCount)
592582
if r.multidial {
593583
maxUntil = max(maxUntil, maxUntil*time.Duration(len(r.dialers)))
594584
}
595585
select {
596586
case <-r.retryDoneCh:
597587
case <-time.After(maxUntil): // arb high timeout; it should rarely, if ever, be needed
598-
log.W("retrier: write: %s: 1st write timed-out waiting for %s [calc-rtt: %s] 1st read b/w [%s=>%s], mult: %d, b: %d/%d, err: %v",
588+
rerr := log.EE("retrier: write: %s: 1st write timed-out waiting for %s [calc-rtt: %s] 1st read b/w [%s=>%s], mult: %d, b: %d/%d, err: %v",
599589
r.dialerID(), core.FmtPeriod(maxUntil), core.FmtPeriod(r.timeout), src, r.raddr, len(r.dialers), n, len(b), err)
600-
return n, core.JoinErr(err, errRetryTimeout)
590+
return n, core.JoinErr(err, rerr, errRetryTimeout)
601591
}
602592

603593
r.mu.Lock()
604594
defer r.mu.Unlock()
605595

606-
elapsed := time.Since(start).Milliseconds()
607596
// r.conn may be nil or closed by the time we get here
608597
finalConn := r.conn
609598
noconn := finalConn == nil || core.IsNil(finalConn)
610599
if r.retryWriteErr != nil || noconn { // check if retried writes also failed
611600
if noconn {
612601
err = core.JoinErr(err, errNilConn)
613602
}
614-
log.E("retrier: write: %s: retry failed [%s=>%s] b: %d/%d (tee: %d) in %dms; old => new: %v => %v; noconn? %t",
615-
r.dialerID(), laddr(r.conn), r.raddr, n, len(b), len(r.tee), elapsed, err, r.retryWriteErr, noconn)
616-
return n, core.JoinErr(err, r.retryWriteErr) // pass on the og error, too
603+
werr := log.EE("retrier: write: %s: retry failed [%s=>%s] b: %d/%d (tee: %d) in %s; old => new: %v => %v; noconn? %t",
604+
r.dialerID(), laddr(r.conn), r.raddr, n, len(b), len(r.tee), core.FmtTimeAsPeriod(start), err, r.retryWriteErr, noconn)
605+
return n, core.JoinErr(err, r.retryWriteErr, werr) // pass on the og error, too
617606
}
618607

619608
// if len(leftover) > 0 {
@@ -629,9 +618,9 @@ func (r *retrier) Write(b []byte) (int, error) {
629618

630619
// retryCompleted() is true, so r.conn is final and doesn't need locking
631620
if c := r.conn; c == nil || core.IsNil(c) {
632-
log.E("retrier: write: %s: [] => %s (b: %d, tee: %d), not retrying, but no conn",
621+
cerr := log.EE("retrier: write: %s: [] => %s (b: %d, tee: %d), not retrying, but no conn",
633622
r.dialerID(), r.raddr, len(b), len(r.tee))
634-
return 0, errNilConn
623+
return 0, core.JoinErr(cerr, errNilConn)
635624
} else {
636625
return c.Write(b)
637626
}

0 commit comments

Comments
 (0)