[question] close real-time stream intentionally  #45

@thangng48

I've checked #30, but it does not seem to work properly.

Consider the following code snippet. It re-subscribes a few symbols to the Deribit exchange every minute. At the end of each one-minute interval it breaks out of the combined stream, which I expect to terminate the underlying websockets to Deribit, so spinning up a new process should not flood the Deribit server.
However, on the following iterations Deribit returns a 429 error code, which indicates a rate limit error.

@thaaddeus is there any way to shut down the stream, or at least terminate the running websockets?

const {
  combine,
  normalizeDerivativeTickers,
  normalizeLiquidations,
  normalizeOptionsSummary,
  normalizeTrades,
  streamNormalized
} = require("tardis-dev");


// Yields a sentinel message once a minute until the consumer sets end.__break__.
async function* breakTick(end) {
  while (!end.__break__) {
    await new Promise((resolve) => setTimeout(resolve, 60000));
    yield {__break__: true}
  }
}

async function deribitStream() {
  const symbols = [
    "BTC-29JUL22-35000-C",
    "BTC-29JUL22-70000-C",
    "BTC-29JUL22-34000-P",
    "BTC-29JUL22-30000-C"
  ]

  const channels = ["trade", "derivative_ticker", "liquidation", "option_summary"]
  const end = {__break__: false}
  const streams = []

  for (const symbol of symbols){
    for (const channel of channels){
      const options = {
        exchange: "deribit",
        symbols: [symbol],
        withDisconnectMessages: true,
        onError: (err) => {
          console.log("Error subscribing to channel", {
            channel,
            exchange: "deribit",
            error: err,
          })
        }
      }
      let stream;
      switch (channel) {
        case "trade": {
          stream = streamNormalized(options, normalizeTrades)
          break
        }

        case "derivative_ticker": {
          stream = streamNormalized(options, normalizeDerivativeTickers)
          break
        }

        case "liquidation": {
          // Liquidations are infrequent, so disable the feed timeout
          options.timeoutIntervalMS = 0

          stream = streamNormalized(options, normalizeLiquidations)
          break
        }

        case "option_summary": {
          stream = streamNormalized(options, normalizeOptionsSummary)
          break
        }
      }
      streams.push(stream)
    }
  }
  streams.push(breakTick(end))
  const stream = combine(...streams)

  for await (const message of stream){
    if (message.__break__){
      end.__break__ = true
      console.log("end stream")
      break
    }
    console.log(message)
  }
}

(async () => {
  for (;;){
    await deribitStream();
  }
})()
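
For reference, here is a minimal sketch of the language-level behavior I am relying on, independent of tardis-dev. `fakeStream`, `consume` and `log` are hypothetical names made up for this illustration. It shows that `break` inside `for await` calls `return()` on the async generator, which runs its `finally` block; that is where I would expect a real stream to close its websocket. Whether `combine` forwards this `return()` to each of the source streams is exactly what I am unsure about.

```javascript
// Hypothetical stand-in for a real-time stream; not part of tardis-dev.
const log = [];

async function* fakeStream(name) {
  try {
    for (let i = 0; ; i++) {
      // Simulate waiting for the next message from a socket.
      await new Promise((resolve) => setTimeout(resolve, 10));
      yield { name, i };
    }
  } finally {
    // A real stream would close its websocket here.
    log.push(`${name} closed`);
  }
}

async function consume(limit) {
  const received = [];
  for await (const message of fakeStream("demo")) {
    received.push(message.i);
    if (received.length === limit) {
      // Leaving the loop calls fakeStream's return(), which runs its finally block.
      break;
    }
  }
  return received;
}
```

With a single generator, `log` contains the "closed" entry as soon as `consume` resolves, so the cleanup happens before the next iteration of an outer loop would start.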
