I've checked #30, but it does not seem to work properly.
Consider the following code snippet: it re-subscribes some symbols to the deribit exchange every minute. At the end of each one-minute interval it breaks out of the combined stream, which is expected to terminate the underlying websockets to deribit, so that starting a new iteration does not flood the deribit server.
But on the next iterations, deribit returns a 429 error code, which indicates a rate limit error.
@thaaddeus do we have any way to shut down the stream, or at least terminate the running websockets?
const {
combine,
normalizeDerivativeTickers,
normalizeLiquidations, normalizeOptionsSummary,
normalizeTrades,
streamNormalized
} = require("tardis-dev");
/**
 * Async generator that emits a sentinel message after every interval, so a
 * consumer merging it with other streams can tell when it is time to stop.
 *
 * The `end` flag is only checked before each wait; a wait that has already
 * started always runs to completion and yields one more sentinel.
 *
 * @param {{__break__: boolean}} end - Shared flag object. The generator
 *   finishes as soon as it observes `end.__break__` to be truthy.
 * @param {number} [intervalMs=60000] - Delay in milliseconds before each
 *   sentinel is emitted. Defaults to one minute.
 * @yields {{__break__: true}} The sentinel message.
 */
async function* breakTick(end, intervalMs = 60000) {
  while (!end.__break__) {
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
    yield { __break__: true };
  }
}
/**
 * Opens one normalized real-time stream per (symbol, channel) pair on
 * deribit, merges them with a `breakTick` sentinel stream, and logs every
 * message until the sentinel arrives.
 *
 * On exit (sentinel received, combined stream ended, or an error thrown) the
 * stop flag is raised and every source stream is explicitly asked to finish
 * via `return()`, instead of relying only on `combine` to propagate the
 * `break` to its inputs.
 *
 * NOTE(review): whether finishing a `streamNormalized` generator closes its
 * websocket depends on the tardis-dev version in use - confirm against the
 * library before relying on this to avoid 429 responses.
 *
 * @returns {Promise<void>} Resolves once the consuming loop has exited.
 */
async function deribitStream() {
  const symbols = [
    "BTC-29JUL22-35000-C",
    "BTC-29JUL22-70000-C",
    "BTC-29JUL22-34000-P",
    "BTC-29JUL22-30000-C",
  ];
  // Channel name -> normalizer. Iteration follows insertion order, so the
  // streams are created in the same order as the channel list.
  const normalizersByChannel = {
    trade: normalizeTrades,
    derivative_ticker: normalizeDerivativeTickers,
    liquidation: normalizeLiquidations,
    option_summary: normalizeOptionsSummary,
  };
  const end = { __break__: false };
  const streams = [];

  for (const symbol of symbols) {
    for (const [channel, normalizer] of Object.entries(normalizersByChannel)) {
      const options = {
        exchange: "deribit",
        symbols: [symbol],
        withDisconnectMessages: true,
        onError: (err) => {
          console.log("Error subscribing to channel", {
            channel,
            exchange: "deribit",
            error: err,
          });
        },
      };
      if (channel === "liquidation") {
        // Liquidations happen infrequently, so disable the feed timeout.
        options.timeoutIntervalMS = 0;
      }
      streams.push(streamNormalized(options, normalizer));
    }
  }
  streams.push(breakTick(end));

  try {
    for await (const message of combine(...streams)) {
      if (message.__break__) {
        console.log("end stream");
        break;
      }
      console.log(message);
    }
  } finally {
    // Stop the sentinel generator from scheduling further ticks.
    end.__break__ = true;
    for (const stream of streams) {
      if (stream && typeof stream.return === "function") {
        // Deliberately not awaited: a generator that is still waiting for
        // its next message only settles `return()` after that wait ends,
        // which may never happen on a quiet channel.
        Promise.resolve(stream.return()).catch((err) => {
          console.log("Error closing stream", { exchange: "deribit", error: err });
        });
      }
    }
  }
}
// Entry point: run deribitStream() back to back, forever. Each call returns
// once its consuming loop exits, after which a fresh set of subscriptions is
// opened. A failure is logged and ends the process with a non-zero exit code
// rather than surfacing as an unhandled promise rejection.
(async () => {
  for (;;) {
    await deribitStream();
  }
})().catch((err) => {
  console.error("deribitStream failed", err);
  process.exit(1);
});
I've checked #30, but it does not seem to work properly.
Consider the following code snippet: it re-subscribes some symbols to the
`deribit` exchange every minute. At the end of each one-minute interval it breaks out of the combined stream, which is expected to terminate the underlying websockets to deribit, so that starting a new iteration does not flood the `deribit` server. But on the next iterations,
`deribit` returns a `429` error code, which indicates a rate limit error. @thaaddeus do we have any way to shut down the stream, or at least terminate the running websockets?