Skip to content

Latest commit

 

History

History
178 lines (143 loc) · 4.62 KB

File metadata and controls

178 lines (143 loc) · 4.62 KB

atingester: Bluesky Syncing Tools

This library for syncing data from the bluesky network.

Forked from @atproto/sync

NPM Github CI Status

This tool can subscribes...

  • Firehose is an authenticated stream of events used to efficiently sync user updates (posts, likes, follows, handle changes, etc).
  • Jetstream is a streaming service that consumes Firehose and converts it into lightweight, friendly JSON.
  • Turbostream is a real-time, hydrated repeater service built on top of Jetstream.

Links

Usage

It's basically the same as here.

Make sure to execute await initIngester() exactly once before calling Ingester, Jetstream, or similar classes!

Ingester ( Firehose / Jetstream / Turbostream )

import { Ingester, initIngester } from 'atingester'
import { IdResolver } from '@atproto/identity'

const run = async () => {
  await initIngester()

  const ingester = new Ingester('Firehose', {
    idResolver: new IdResolver(),
    handleEvent: (evt) => {
      if (evt.event === 'create') {
        console.log(evt.record.text)
      }
    },
    onInfo: (info) => {
      console.info(info)
    },
    onError: (err: Error) => {
      console.error(err)
    },
    /*
    getCursor?: () => Awaited<number | undefined>
    runner?: EventRunner // should only set getCursor *or* runner
    */
    service: 'wss://bsky.network',
    subscriptionReconnectDelay: 3000,
    unauthenticatedCommits: false,
    unauthenticatedHandles: false,
    compress: true,
    filterCollections: ['app.bsky.feed.post'],
    filterDids: ['did:plc:abcde....'],
    excludeIdentity: true,
    excludeAccount: true,
    excludeCommit: false,
    excludeSync: true,
  })

  ingester.start()

  await ingester.destroy()
}

run()

Firehose

import { Firehose, initIngester } from 'atingester'
import { IdResolver } from '@atproto/identity'

const run = async () => {
  await initIngester()

  const firehose = new Firehose({
    idResolver: new IdResolver(),
    handleEvent: (evt) => {
      if (evt.event === 'create') {
        console.log(evt.record.text)
      }
    },
    onInfo: (info) => {
      console.info(info)
    },
    onError: (err: Error) => {
      console.error(err)
    },
    service: 'wss://bsky.network',
    filterCollections: ['app.bsky.feed.post'],
    filterDids: ['did:plc:abcde....'],
    excludeIdentity: true,
    excludeAccount: true,
    excludeSync: true,
  })

  firehose.start()

  await firehose.destroy()
}

run()

Jetstream

import { Jetstream, initIngester } from 'atingester'
import { IdResolver } from '@atproto/identity'

const run = async () => {
  await initIngester()

  const jetstream = new Jetstream({
    idResolver: new IdResolver(),
    handleEvent: (evt) => {
      if (evt.event === 'create') {
        console.log(evt.record.text)
      }
    },
    onInfo: (info) => {
      console.info(info)
    },
    onError: (err: Error) => {
      console.error(err)
    },
    service: 'wss://jetstream1.us-east.bsky.network',
    compress: true,
    filterCollections: ['app.bsky.feed.post'],
    filterDids: ['did:plc:abcde....'],
    excludeIdentity: true,
    excludeAccount: true,
  })

  jetstream.start()

  await jetstream.destroy()
}

run()

Turbostream

import { initIngester, Turbostream } from 'atingester'

const run = async () => {
  await initIngester()

  const turbostream = new Turbostream({
    handleEvent: (evt) => {
      if (evt.event === 'create') {
        console.log(evt.record.text)
      }
    },
    onInfo: (info) => {
      console.info(info)
    },
    onError: (err: Error) => {
      console.error(err)
    },
    service: 'wss://api.graze.social',
    filterDids: ['did:plc:abcde....'],
  })

  turbostream.start()

  await turbostream.destroy()
}

run()