Skip to content
Merged
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
"intl-messageformat": "^10.3.3",
"ip": "^1.1.9",
"ipfs-css": "^1.4.0",
"ipfs-geoip": "^9.2.0",
"ipfs-geoip": "^9.3.0",
"ipfs-provider": "^2.1.0",
"ipld-explorer-components": "^8.1.3",
"is-ipfs": "^8.0.1",
Expand Down
147 changes: 114 additions & 33 deletions src/bundles/peer-locations.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import HLRU from 'hashlru'
import { multiaddr } from '@multiformats/multiaddr'
import ms from 'milliseconds'
import ip from 'ip'
import memoize from 'p-memoize'
import { createContextSelector } from '../helpers/context-bridge'
import pkgJson from '../../package.json'

Expand All @@ -25,7 +24,7 @@ const selectIdentityData = () => {

// After this time interval, we re-check the locations for each peer
// once again through PeerLocationResolver.
const UPDATE_EVERY = ms.seconds(1)
const UPDATE_EVERY = ms.seconds(3)

// We reuse cached geoip lookups as long geoipVersion is the same.
const geoipVersion = dependencies['ipfs-geoip']
Expand All @@ -44,8 +43,68 @@ function createPeersLocations (opts) {
const bundle = createAsyncResourceBundle({
name: 'peerLocations',
actionBaseType: 'PEER_LOCATIONS',
getPromise: ({ store }) => peerLocResolver.findLocations(
store.selectAvailableGatewayUrl(), store.selectPeers()),
getPromise: ({ store }) => {
const peers = store.selectPeers()
if (!peers) {
// Peers bundle hasn't loaded yet. Return empty result but schedule
// a quick retry -- the reactor doesn't depend on selectPeers, so
// without this we'd wait the full staleAfter (3s) before re-fetching.
setTimeout(() => store.doMarkPeerLocationsAsOutdated(), 100)
return Promise.resolve({})
}

const promise = peerLocResolver.findLocations(
store.selectAvailableGatewayUrl(), peers)

// While optimizedPeerSet is still ramping up (pass 0→10, 1→100, 2→200, 3→all),
// chain an immediate re-fetch after the current one resolves instead of
// waiting for staleAfter (3s) between each pass.
if (peerLocResolver.pass <= 3) {
promise.then(() => setTimeout(() => store.doMarkPeerLocationsAsOutdated(), 0))
}

if (!peerLocResolver._completedHandler && peerLocResolver.queue.size > 0) {
let throttleTimer = null
let pendingUpdate = false

const throttledUpdate = () => {
if (throttleTimer) {
pendingUpdate = true
return
}
store.doMarkPeerLocationsAsOutdated()
throttleTimer = setTimeout(() => {
throttleTimer = null
if (pendingUpdate) {
pendingUpdate = false
throttledUpdate()
}
}, 500)
}

peerLocResolver._completedHandler = throttledUpdate
peerLocResolver.queue.on('completed', throttledUpdate)

peerLocResolver.queue.onIdle().then(() => {
peerLocResolver.queue.off('completed', throttledUpdate)
peerLocResolver._completedHandler = null
clearTimeout(throttleTimer)
store.doMarkPeerLocationsAsOutdated()
})
}

// Avoid unnecessary selector recomputation and re-renders: return
// the previous data reference when nothing actually changed. This is
// cheap because memoryCache (HLRU) returns the same value references
// for the same IPs, so a shallow comparison suffices.
return promise.then(newLocations => {
const prev = store.selectPeerLocations()
if (prev && shallowEqualObjects(prev, newLocations)) {
return prev
}
return newLocations
})
},
staleAfter: UPDATE_EVERY,
retryAfter: UPDATE_EVERY,
persist: false,
Expand All @@ -66,9 +125,8 @@ function createPeersLocations (opts) {
bundle.selectPeerLocationsForSwarm = createSelector(
'selectPeers',
'selectPeerLocations',
'selectBootstrapPeers',
selectIdentityData, // ipfs.id info from identity context, used for detecting local peers
(peers, locations = {}, bootstrapPeers, identity) => peers && Promise.all(peers.map(async (peer) => {
(peers, locations = {}, identity) => peers && peers.map((peer) => {
const peerId = peer.peer
const locationObj = locations ? locations[peerId] : null
const location = toLocationString(locationObj)
Expand All @@ -81,7 +139,7 @@ function createPeersLocations (opts) {
const address = peer.addr.toString()
const latency = parseLatency(peer.latency)
const direction = peer.direction
const { isPrivate, isNearby } = await isPrivateAndNearby(peer.addr, identity)
const { isPrivate, isNearby } = isPrivateAndNearby(peer.addr, identity)

const protocols = (Array.isArray(peer.streams)
? Array.from(new Set(peer.streams
Expand All @@ -107,18 +165,17 @@ function createPeersLocations (opts) {
isNearby,
agentVersion
}
}))
})
)

const COORDINATES_RADIUS = 4

bundle.selectPeersCoordinates = createSelector(
'selectPeerLocationsForSwarm',
async (peers) => {
(peers) => {
if (!peers) return []

const fetchedPeers = await peers
return fetchedPeers.reduce((previous, { peerId, coordinates }) => {
return peers.reduce((previous, { peerId, coordinates }) => {
if (!coordinates) return previous

let hasFoundACloseCoordinate = false
Expand Down Expand Up @@ -152,7 +209,15 @@ function createPeersLocations (opts) {
return bundle
}

const isNonHomeIPv4 = t => t[0] === 4 && t[1] !== '127.0.0.1'
const shallowEqualObjects = (a, b) => {
const keysA = Object.keys(a)
const keysB = Object.keys(b)
if (keysA.length !== keysB.length) return false
return keysA.every(key => a[key] === b[key])
}

const isPublicIP = t =>
(t[0] === 4 || t[0] === 41) && !ip.isPrivate(t[1])

const toLocationString = loc => {
if (!loc) return null
Expand All @@ -178,27 +243,32 @@ const parseLatency = (latency) => {
return value
}

const getPublicIP = memoize((identity) => {
let _cachedPublicIP
let _lastIdentityRef

const getPublicIP = (identity) => {
if (!identity) return
if (identity === _lastIdentityRef) return _cachedPublicIP

_lastIdentityRef = identity
_cachedPublicIP = undefined

for (const maddr of identity.addresses) {
try {
const addr = multiaddr(maddr).nodeAddress()

if ((ip.isV4Format(addr.address) || ip.isV6Format(addr.address)) && !ip.isPrivate(addr.address)) {
return addr.address
_cachedPublicIP = addr.address
return _cachedPublicIP
}
} catch (e) {
// TODO: We should provide a way to log these errors when debugging
// if (['development', 'test'].includes(process.env.REACT_APP_ENV)) {
// console.error(e)
// }
// Might fail for non-IP multiaddrs, safe to ignore.
}
}
})
}

const isPrivateAndNearby = async (maddr, identity) => {
const publicIP = await getPublicIP(identity)
const isPrivateAndNearby = (maddr, identity) => {
const publicIP = getPublicIP(identity)
let isPrivate = false
let isNearby = false
let addr
Expand Down Expand Up @@ -246,8 +316,10 @@ class PeerLocationResolver {
})

this.geoipLookupPromises = new Map()
this.memoryCache = HLRU(500)

this.pass = 0
this._completedHandler = null
}

async findLocations (gatewayUrls, peers) {
Expand All @@ -260,37 +332,46 @@ class PeerLocationResolver {
for (const p of this.optimizedPeerSet(peers)) {
const peerId = p.peer

const ipv4Tuple = p.addr.stringTuples().find(isNonHomeIPv4)
if (!ipv4Tuple) {
const ipTuple = p.addr.stringTuples().find(isPublicIP)
if (!ipTuple) {
continue
}

const ipAddr = ipTuple[1]
if (this.failedAddrs.has(ipAddr)) {
continue
}

const ipv4Addr = ipv4Tuple[1]
if (this.failedAddrs.has(ipv4Addr)) {
// check in-memory cache first (avoids IndexedDB reads for known IPs)
const memoryCached = this.memoryCache.get(ipAddr)
if (memoryCached) {
res[peerId] = memoryCached
continue
}

// maybe we have it cached by ipv4 address already, check that.
const location = await this.geoipCache.get(ipv4Addr)
// maybe we have it cached by IP address in IndexedDB
const location = await this.geoipCache.get(ipAddr)
if (location) {
this.memoryCache.set(ipAddr, location)
res[peerId] = location
continue
}

// no ip address cached. are we looking it up already?
if (this.geoipLookupPromises.has(ipv4Addr)) {
if (this.geoipLookupPromises.has(ipAddr)) {
continue
}

this.geoipLookupPromises.set(ipv4Addr, this.queue.add(async () => {
this.geoipLookupPromises.set(ipAddr, this.queue.add(async () => {
try {
const data = await lookup(gatewayUrls, ipv4Addr)
await this.geoipCache.set(ipv4Addr, data)
const data = await lookup(gatewayUrls, ipAddr)
this.memoryCache.set(ipAddr, data)
await this.geoipCache.set(ipAddr, data)
} catch (e) {
// mark this one as failed so we don't retry again
this.failedAddrs.set(ipv4Addr, true)
this.failedAddrs.set(ipAddr, true)
} finally {
this.geoipLookupPromises.delete(ipv4Addr)
this.geoipLookupPromises.delete(ipAddr)
}
}))
}
Expand Down
Loading