From 537a585255c0b10d1bfe0e9b0705abc5f322d9a0 Mon Sep 17 00:00:00 2001 From: Logan Rasmussen Date: Wed, 1 Apr 2026 12:07:58 -0600 Subject: [PATCH 1/7] feat(members): implement WebSocket message subscriptions --- README.md | 2 + src/ConnectWidget.tsx | 16 +- src/__tests__/ConnectWidget-test.tsx | 43 ++++++ src/context/WebSocketContext.tsx | 19 +++ src/hooks/__tests__/usePollMember-test.tsx | 144 +++++++++++++++--- src/hooks/usePollMember.tsx | 6 +- .../experimentalFeaturesSlice-test.ts | 28 ++++ .../reducers/experimentalFeaturesSlice.ts | 3 + .../transport/MemberUpdateTransport.ts | 48 +++++- .../__tests__/MemberUpdateTransport-test.ts | 121 ++++++++++++++- typings/connectProps.d.ts | 3 + 11 files changed, 398 insertions(+), 35 deletions(-) create mode 100644 src/__tests__/ConnectWidget-test.tsx create mode 100644 src/context/WebSocketContext.tsx create mode 100644 src/redux/reducers/__tests__/experimentalFeaturesSlice-test.ts diff --git a/README.md b/README.md index dbd64f87cd..028f18fcab 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,8 @@ const App = () => { | `profiles` | [`ProfilesTypes`](./typings/connectProps.d.ts) | The connect widget uses the profiles to set the initial state of the widget. [More details](./docs/PROFILES.md) | See more details | | `userFeatures` | [`UserFeaturesType`](./typings/connectProps.d.ts) | The connect widget uses user features to determine the behavior of the widget. [More details](./docs/USER_FEATURES.md) | See more details | | `showTooSmallDialog` | `boolean` | The connect widget can show a warning when the widget size is below the supported 320px. | `true` | +| `webSocketConnection` | `object` | An object containing `isConnected()` function and `webSocketMessages$` observable for real-time updates. | `null` | +| `experimentalFeatures` | `object` | An object to enable or disable experimental features like `useWebSockets: true`. | `null` | ## ApiProvider diff --git a/src/ConnectWidget.tsx b/src/ConnectWidget.tsx index 389a40e163..97acf4bf03 100644 --- a/src/ConnectWidget.tsx +++ b/src/ConnectWidget.tsx @@ -9,6 +9,7 @@ import { initGettextLocaleData } from 'src/utilities/Personalization' import { ConnectedTokenProvider } from 'src/ConnectedTokenProvider' import { TooSmallDialog } from 'src/components/app/TooSmallDialog' import { setLocalizedContent } from 'src/redux/reducers/localizedContentSlice' +import { WebSocketProvider } from 'src/context/WebSocketContext' import './sharedVariables.css' interface PostMessageContextType { @@ -27,6 +28,7 @@ export const ConnectWidget = ({ onAnalyticPageview = () => {}, postMessageEventOverrides, showTooSmallDialog = true, + webSocketConnection, ...props }: any) => { initGettextLocaleData(props.language) @@ -38,12 +40,14 @@ export const ConnectWidget = ({ return ( - - - {showTooSmallDialog && } - - - + + + + {showTooSmallDialog && } + + + + ) diff --git a/src/__tests__/ConnectWidget-test.tsx b/src/__tests__/ConnectWidget-test.tsx new file mode 100644 index 0000000000..b93c0d2a33 --- /dev/null +++ b/src/__tests__/ConnectWidget-test.tsx @@ -0,0 +1,43 @@ +import React from 'react' +import { render } from '@testing-library/react' +import { describe, it, expect, vi } from 'vitest' +import { of } from 'rxjs' + +import { ConnectWidget } from '../ConnectWidget' +import { useWebSocket } from 'src/context/WebSocketContext' + +// Mock Connect component to verify context value +const MockConnect = () => { + const ws = useWebSocket() + return
{ws ? 'has-ws' : 'no-ws'}
+} + +vi.mock('src/Connect', () => ({ + default: MockConnect, +})) + +describe('ConnectWidget', () => { + const defaultProps = { + clientConfig: {}, + profiles: {}, + userFeatures: {}, + language: { locale: 'en', localizedContent: {} }, + } + + it('provides webSocketConnection to children when passed as a prop', () => { + const mockWS = { + isConnected: vi.fn().mockReturnValue(true), + webSocketMessages$: of({}), + } + + const { getByTestId } = render() + + expect(getByTestId('mock-connect')).toHaveTextContent('has-ws') + }) + + it('does not provide webSocketConnection when not passed', () => { + const { getByTestId } = render() + + expect(getByTestId('mock-connect')).toHaveTextContent('no-ws') + }) +}) diff --git a/src/context/WebSocketContext.tsx b/src/context/WebSocketContext.tsx new file mode 100644 index 0000000000..04f24ec9f3 --- /dev/null +++ b/src/context/WebSocketContext.tsx @@ -0,0 +1,19 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import React, { createContext, useContext } from 'react' +import { Observable } from 'rxjs' + +export interface WebSocketConnection { + isConnected: () => boolean + webSocketMessages$: Observable +} + +const WebSocketContext = createContext(undefined) + +export const WebSocketProvider: React.FC<{ + value?: WebSocketConnection + children: React.ReactNode +}> = ({ value, children }) => ( + {children} +) + +export const useWebSocket = () => useContext(WebSocketContext) diff --git a/src/hooks/__tests__/usePollMember-test.tsx b/src/hooks/__tests__/usePollMember-test.tsx index 41869a1fda..bd153cac85 100644 --- a/src/hooks/__tests__/usePollMember-test.tsx +++ b/src/hooks/__tests__/usePollMember-test.tsx @@ -1,21 +1,29 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ import React from 'react' import { renderHook, waitFor } from '@testing-library/react' import { vi } from 'vitest' import { usePollMember, PollingState } from 'src/hooks/usePollMember' import { ApiProvider, ApiContextTypes } from 'src/context/ApiContext' +import { WebSocketProvider, WebSocketConnection } from 'src/context/WebSocketContext' import { Provider } from 'react-redux' import { createReduxStore, RootState } from 'src/redux/Store' import { member, JOB_DATA } from 'src/services/mockedData' import { ReadableStatuses } from 'src/const/Statuses' import { CONNECTING_MESSAGES } from 'src/utilities/pollers' import { take } from 'rxjs/operators' +import { Subject } from 'rxjs' -const createWrapper = (apiValue: Partial, preloadedState?: Partial) => { +const createWrapper = ( + apiValue: Partial, + preloadedState?: Partial, + webSocketValue?: WebSocketConnection, +) => { const store = createReduxStore(preloadedState) const Wrapper = ({ children }: { children: React.ReactNode }) => ( - {/* eslint-disable-next-line @typescript-eslint/no-explicit-any */} - {children} + + {children} + ) Wrapper.displayName = 'TestWrapper' @@ -27,6 +35,10 @@ describe('usePollMember', () => { document.documentElement.setAttribute('lang', 'en') }) + afterEach(() => { + vi.restoreAllMocks() + }) + it('should return a pollMember function', () => { const apiValue = { loadMemberByGuid: vi.fn().mockResolvedValue(member.member), @@ -303,9 +315,12 @@ describe('usePollMember', () => { }) it('should increment pollingCount on each poll', async () => { + const member1 = { ...member.member, guid: 'MBR-1', most_recent_job_guid: 'JOB-1' } + const member2 = { ...member.member, guid: 'MBR-2', most_recent_job_guid: 'JOB-2' } + const apiValue = { - loadMemberByGuid: vi.fn().mockResolvedValue(member.member), - loadJob: vi.fn().mockResolvedValue(JOB_DATA), + loadMemberByGuid: vi.fn().mockResolvedValueOnce(member1).mockResolvedValue(member2), + loadJob: vi.fn().mockImplementation((guid) => Promise.resolve({ ...JOB_DATA, guid })), } const preloadedState = { @@ -446,15 +461,23 @@ describe('usePollMember', () => { async_account_data_ready: true, } - const memberWithJob = { + const member1 = { ...member.member, + guid: 'MBR-1', + most_recent_job_guid: 'JOB-1', is_being_aggregated: false, connection_status: ReadableStatuses.CONNECTED, } + const member2 = { ...member1, guid: 'MBR-2', most_recent_job_guid: 'JOB-2' } + const member3 = { ...member1, guid: 'MBR-3', most_recent_job_guid: 'JOB-3' } const apiValue = { - loadMemberByGuid: vi.fn().mockResolvedValue(memberWithJob), - loadJob: vi.fn().mockResolvedValue(jobWithAsyncData), + loadMemberByGuid: vi + .fn() + .mockResolvedValueOnce(member1) + .mockResolvedValueOnce(member2) + .mockResolvedValue(member3), + loadJob: vi.fn().mockImplementation((guid) => Promise.resolve({ ...jobWithAsyncData, guid })), } const preloadedState = { @@ -622,12 +645,12 @@ describe('usePollMember', () => { }, 10000) it('should correctly update previousResponse and currentResponse over multiple polls', async () => { - const member1 = { ...member.member, guid: 'MBR-1' } - const member2 = { ...member.member, guid: 'MBR-2' } + const member1 = { ...member.member, guid: 'MBR-1', most_recent_job_guid: 'JOB-1' } + const member2 = { ...member.member, guid: 'MBR-2', most_recent_job_guid: 'JOB-2' } const apiValue = { loadMemberByGuid: vi.fn().mockResolvedValueOnce(member1).mockResolvedValue(member2), - loadJob: vi.fn().mockResolvedValue(JOB_DATA), + loadJob: vi.fn().mockImplementation((guid) => Promise.resolve({ ...JOB_DATA, guid })), } const preloadedState = { @@ -658,25 +681,34 @@ describe('usePollMember', () => { // First poll expect(states[0].previousResponse).toEqual({}) - expect(states[0].currentResponse).toEqual({ member: member1, job: JOB_DATA }) + expect(states[0].currentResponse).toEqual({ + member: member1, + job: { ...JOB_DATA, guid: 'JOB-1' }, + }) // Second poll - expect(states[1].previousResponse).toEqual({ member: member1, job: JOB_DATA }) - expect(states[1].currentResponse).toEqual({ member: member2, job: JOB_DATA }) + expect(states[1].previousResponse).toEqual({ + member: member1, + job: { ...JOB_DATA, guid: 'JOB-1' }, + }) + expect(states[1].currentResponse).toEqual({ + member: member2, + job: { ...JOB_DATA, guid: 'JOB-2' }, + }) subscription.unsubscribe() }, 10000) it('should preserve previousResponse and currentResponse when an intermediate poll fails', async () => { - const member1 = { ...member.member, guid: 'MBR-1' } + const member1 = { ...member.member, guid: 'MBR-1', most_recent_job_guid: 'JOB-1' } const apiValue = { loadMemberByGuid: vi .fn() .mockResolvedValueOnce(member1) .mockRejectedValueOnce(new Error('Intermediate Error')) - .mockResolvedValue(member1), - loadJob: vi.fn().mockResolvedValue(JOB_DATA), + .mockResolvedValue({ ...member1, guid: 'MBR-1-new', most_recent_job_guid: 'JOB-1-new' }), + loadJob: vi.fn().mockImplementation((guid) => Promise.resolve({ ...JOB_DATA, guid })), } const preloadedState = { @@ -707,18 +739,88 @@ describe('usePollMember', () => { // First poll: Success expect(states[0].isError).toBe(false) - expect(states[0].currentResponse).toEqual({ member: member1, job: JOB_DATA }) + expect(states[0].currentResponse).toEqual({ + member: member1, + job: { ...JOB_DATA, guid: 'JOB-1' }, + }) // Second poll: Error expect(states[1].isError).toBe(true) expect(states[1].previousResponse).toEqual({}) // Should be preserved from acc - expect(states[1].currentResponse).toEqual({ member: member1, job: JOB_DATA }) // Should be preserved from acc + expect(states[1].currentResponse).toEqual({ + member: member1, + job: { ...JOB_DATA, guid: 'JOB-1' }, + }) // Should be preserved from acc // Third poll: Success again expect(states[2].isError).toBe(false) - expect(states[2].previousResponse).toEqual({ member: member1, job: JOB_DATA }) // acc.currentResponse was preserved - expect(states[2].currentResponse).toEqual({ member: member1, job: JOB_DATA }) + expect(states[2].previousResponse).toEqual({ + member: member1, + job: { ...JOB_DATA, guid: 'JOB-1' }, + }) // acc.currentResponse was preserved + expect(states[2].currentResponse).toEqual({ + member: { ...member1, guid: 'MBR-1-new', most_recent_job_guid: 'JOB-1-new' }, + job: { ...JOB_DATA, guid: 'JOB-1-new' }, + }) subscription.unsubscribe() }, 10000) + + it('should receive updates from WebSockets when enabled', async () => { + const wsMessages$ = new Subject() + const mockWS = { + isConnected: vi.fn().mockReturnValue(true), + webSocketMessages$: wsMessages$.asObservable(), + } + + const apiValue = { + loadMemberByGuid: vi.fn().mockResolvedValue(member.member), + loadJob: vi.fn().mockResolvedValue(JOB_DATA), + } + + const preloadedState = { + experimentalFeatures: { + useWebSockets: true, + memberPollingMilliseconds: 10000, // Long interval to avoid poll interference + }, + } + + const { result } = renderHook(() => usePollMember(), { + wrapper: createWrapper(apiValue, preloadedState, mockWS), + }) + + const pollMember = result.current + const states: PollingState[] = [] + + const subscription = pollMember('MBR-123').subscribe((state: PollingState) => { + states.push(state) + }) + + // Emit from WebSocket + const wsMember = { guid: 'MBR-123', connection_status: 1 } + wsMessages$.next({ topic: 'members/updated', data: wsMember }) + + await waitFor( + () => { + expect(states.length).toBeGreaterThan(0) + }, + { timeout: 4000 }, + ) + + expect(states[0].currentResponse?.member).toEqual(wsMember) + + // Emit priority data ready + wsMessages$.next({ topic: 'members/priority_data_ready', data: wsMember }) + + await waitFor( + () => { + expect(states.length).toBeGreaterThan(1) + }, + { timeout: 4000 }, + ) + + expect(states[1].initialDataReady).toBe(true) + + subscription.unsubscribe() + }) }) diff --git a/src/hooks/usePollMember.tsx b/src/hooks/usePollMember.tsx index 9c571569da..d23516aaaa 100644 --- a/src/hooks/usePollMember.tsx +++ b/src/hooks/usePollMember.tsx @@ -1,6 +1,7 @@ import { useMemo } from 'react' import { DEFAULT_POLLING_STATE, handlePollingResponse } from 'src/utilities/pollers' import { useApi } from 'src/context/ApiContext' +import { useWebSocket } from 'src/context/WebSocketContext' import { useSelector } from 'react-redux' import { getExperimentalFeatures } from 'src/redux/reducers/experimentalFeaturesSlice' @@ -22,12 +23,13 @@ export interface PollingState { export function usePollMember() { const { api } = useApi() + const webSocket = useWebSocket() const clientLocale = useMemo(() => { return document.querySelector('html')?.getAttribute('lang') || 'en' }, [document.querySelector('html')?.getAttribute('lang')]) - const { optOutOfEarlyUserRelease, memberPollingMilliseconds } = + const { optOutOfEarlyUserRelease, memberPollingMilliseconds, useWebSockets } = useSelector(getExperimentalFeatures) const pollingInterval = memberPollingMilliseconds || 3000 @@ -46,7 +48,9 @@ export function usePollMember() { { pollingInterval, clientLocale, + useWebSockets, }, + webSocket, ) return updateStream$.pipe( diff --git a/src/redux/reducers/__tests__/experimentalFeaturesSlice-test.ts b/src/redux/reducers/__tests__/experimentalFeaturesSlice-test.ts new file mode 100644 index 0000000000..a5eb020f39 --- /dev/null +++ b/src/redux/reducers/__tests__/experimentalFeaturesSlice-test.ts @@ -0,0 +1,28 @@ +import { describe, it, expect } from 'vitest' +import reducer, { initialState, loadExperimentalFeatures } from '../experimentalFeaturesSlice' + +describe('experimentalFeaturesSlice', () => { + it('should return the initial state', () => { + expect(reducer(undefined, { type: 'unknown' })).toEqual(initialState) + }) + + it('should handle loadExperimentalFeatures', () => { + const payload = { + unavailableInstitutions: [{ guid: '123', name: 'Test' }], + optOutOfEarlyUserRelease: true, + memberPollingMilliseconds: 5000, + useWebSockets: true, + } + const nextState = reducer(initialState, loadExperimentalFeatures(payload)) + expect(nextState.unavailableInstitutions).toEqual(payload.unavailableInstitutions) + expect(nextState.optOutOfEarlyUserRelease).toBe(true) + expect(nextState.memberPollingMilliseconds).toBe(5000) + expect(nextState.useWebSockets).toBe(true) + }) + + it('should default useWebSockets to false if not provided', () => { + const payload = {} + const nextState = reducer(initialState, loadExperimentalFeatures(payload)) + expect(nextState.useWebSockets).toBe(false) + }) +}) diff --git a/src/redux/reducers/experimentalFeaturesSlice.ts b/src/redux/reducers/experimentalFeaturesSlice.ts index 5b957ff583..a1c9a063e4 100644 --- a/src/redux/reducers/experimentalFeaturesSlice.ts +++ b/src/redux/reducers/experimentalFeaturesSlice.ts @@ -5,12 +5,14 @@ type ExperimentalFeaturesSlice = { optOutOfEarlyUserRelease?: boolean unavailableInstitutions?: { guid: string; name: string }[] memberPollingMilliseconds?: number + useWebSockets?: boolean } export const initialState: ExperimentalFeaturesSlice = { optOutOfEarlyUserRelease: false, unavailableInstitutions: [], memberPollingMilliseconds: undefined, + useWebSockets: false, } const experimentalFeaturesSlice = createSlice({ @@ -21,6 +23,7 @@ const experimentalFeaturesSlice = createSlice({ state.unavailableInstitutions = action.payload?.unavailableInstitutions || [] state.optOutOfEarlyUserRelease = action.payload?.optOutOfEarlyUserRelease || false state.memberPollingMilliseconds = action.payload?.memberPollingMilliseconds || undefined + state.useWebSockets = action.payload?.useWebSockets || false }, }, }) diff --git a/src/utilities/transport/MemberUpdateTransport.ts b/src/utilities/transport/MemberUpdateTransport.ts index e81d7fc63b..bcda71a09e 100644 --- a/src/utilities/transport/MemberUpdateTransport.ts +++ b/src/utilities/transport/MemberUpdateTransport.ts @@ -1,6 +1,8 @@ -import { Observable, defer, interval, of } from 'rxjs' -import { catchError, map, mergeMap, exhaustMap } from 'rxjs/operators' +import { Observable, defer, interval, of, merge } from 'rxjs' +import { catchError, map, mergeMap, exhaustMap, filter, distinctUntilChanged } from 'rxjs/operators' +import _isEqual from 'lodash/isEqual' import type { ApiContextTypes } from 'src/context/ApiContext' +import { WebSocketConnection } from 'src/context/WebSocketContext' type MemberUpdateApi = Required> @@ -12,17 +14,20 @@ export interface MemberUpdate { export interface MemberUpdateTransportOptions { pollingInterval?: number clientLocale?: string + useWebSockets?: boolean } export function createMemberUpdateTransport( api: MemberUpdateApi, memberGuid: string, options: MemberUpdateTransportOptions = {}, + webSocket?: WebSocketConnection, ): Observable { const pollingInterval = options.pollingInterval || 3000 const clientLocale = options.clientLocale || 'en' + const useWebSockets = options.useWebSockets || false - return interval(pollingInterval).pipe( + const polling$ = interval(pollingInterval).pipe( exhaustMap(() => defer(() => api.loadMemberByGuid(memberGuid, clientLocale)).pipe( mergeMap((member: MemberResponseType) => @@ -34,4 +39,41 @@ export function createMemberUpdateTransport( ), ), ) + + let transport$: Observable = polling$ + + if (useWebSockets && webSocket?.webSocketMessages$) { + const socket$ = webSocket.webSocketMessages$.pipe( + filter( + (msg) => + (msg.topic === 'members/updated' || msg.topic === 'members/priority_data_ready') && + msg.data?.guid === memberGuid, + ), + map((msg) => ({ + member: msg.data, + job: + msg.topic === 'members/priority_data_ready' + ? ({ async_account_data_ready: true } as JobResponseType) + : undefined, + })), + ) + transport$ = merge(polling$, socket$) + } + + return transport$.pipe( + distinctUntilChanged((prev, curr) => { + // Don't deduplicate errors + if (prev instanceof Error || curr instanceof Error) return false + + const prevMember = prev.member + const currMember = curr.member + + // Compare status, MFA, and async data ready flag to determine if we should emit + return ( + prevMember?.connection_status === currMember?.connection_status && + _isEqual(prevMember?.mfa, currMember?.mfa) && + prev.job?.async_account_data_ready === curr.job?.async_account_data_ready + ) + }), + ) } diff --git a/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts b/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts index 32e9e59584..0279ad0f6c 100644 --- a/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts +++ b/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts @@ -1,5 +1,7 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ import { vi } from 'vitest' import { take } from 'rxjs/operators' +import { Subject } from 'rxjs' import { createMemberUpdateTransport, MemberUpdate } from '../MemberUpdateTransport' describe('MemberUpdateTransport', () => { @@ -47,7 +49,7 @@ describe('MemberUpdateTransport', () => { subscription.unsubscribe() }) - it('should continue emitting updates on each interval', async () => { + it('should continue emitting updates on each interval when data changes', async () => { const transport$ = createMemberUpdateTransport(mockApi, mockMemberGuid, { pollingInterval: 1000, clientLocale: mockClientLocale, @@ -59,12 +61,22 @@ describe('MemberUpdateTransport', () => { results.push(val) }) - // Fast-forward 3 intervals - await vi.advanceTimersByTimeAsync(3000) + // Fast-forward 1 interval + await vi.advanceTimersByTimeAsync(1000) + expect(results).toHaveLength(1) + + // Change the mock to return a different status + mockApi.loadMemberByGuid.mockResolvedValue({ ...mockMember, connection_status: 1 }) + await vi.advanceTimersByTimeAsync(1000) + expect(results).toHaveLength(2) + + // Change it back + mockApi.loadMemberByGuid.mockResolvedValue(mockMember) + await vi.advanceTimersByTimeAsync(1000) + expect(results).toHaveLength(3) expect(mockApi.loadMemberByGuid).toHaveBeenCalledTimes(3) expect(mockApi.loadJob).toHaveBeenCalledTimes(3) - expect(results).toHaveLength(3) subscription.unsubscribe() }) @@ -134,4 +146,105 @@ describe('MemberUpdateTransport', () => { subscription.unsubscribe() }) + + it('should emit WebSocket updates immediately when enabled', async () => { + const wsMessages$ = new Subject() + const mockWS = { + isConnected: vi.fn().mockReturnValue(true), + webSocketMessages$: wsMessages$.asObservable(), + } + + const transport$ = createMemberUpdateTransport( + mockApi, + mockMemberGuid, + { useWebSockets: true }, + mockWS, + ) + + const results: (MemberUpdate | Error)[] = [] + const subscription = transport$.subscribe((val) => { + results.push(val) + }) + + const wsMember = { guid: mockMemberGuid, connection_status: 1 } + wsMessages$.next({ topic: 'members/updated', data: wsMember }) + + expect(results).toHaveLength(1) + expect(results[0]).toEqual({ member: wsMember, job: undefined }) + + subscription.unsubscribe() + }) + + it('should signal async_account_data_ready when members/priority_data_ready is received', async () => { + const wsMessages$ = new Subject() + const mockWS = { + isConnected: vi.fn().mockReturnValue(true), + webSocketMessages$: wsMessages$.asObservable(), + } + + const transport$ = createMemberUpdateTransport( + mockApi, + mockMemberGuid, + { useWebSockets: true }, + mockWS, + ) + + const results: (MemberUpdate | Error)[] = [] + const subscription = transport$.subscribe((val) => { + results.push(val) + }) + + const wsMember = { guid: mockMemberGuid, connection_status: 1 } + wsMessages$.next({ topic: 'members/priority_data_ready', data: wsMember }) + + expect(results).toHaveLength(1) + expect(results[0]).toEqual({ + member: wsMember, + job: { async_account_data_ready: true }, + }) + + subscription.unsubscribe() + }) + + it('should deduplicate identical updates from polling and WebSockets', async () => { + const wsMessages$ = new Subject() + const mockWS = { + isConnected: vi.fn().mockReturnValue(true), + webSocketMessages$: wsMessages$.asObservable(), + } + + // Configure polling to return same data + mockApi.loadMemberByGuid.mockResolvedValue(mockMember) + mockApi.loadJob.mockResolvedValue(mockJob) + + const transport$ = createMemberUpdateTransport( + mockApi, + mockMemberGuid, + { useWebSockets: true, pollingInterval: 1000 }, + mockWS, + ) + + const results: (MemberUpdate | Error)[] = [] + const subscription = transport$.subscribe((val) => { + results.push(val) + }) + + // 1. Emit from WebSocket + wsMessages$.next({ topic: 'members/updated', data: mockMember }) + + // 2. Trigger poll (which returns identical data) + await vi.advanceTimersByTimeAsync(1000) + + // Should only have 1 result because they are identical + expect(results).toHaveLength(1) + + // 3. Emit a DIFFERENT update from WebSocket + const updatedMember = { ...mockMember, connection_status: 3 } + wsMessages$.next({ topic: 'members/updated', data: updatedMember }) + + expect(results).toHaveLength(2) + expect((results[1] as MemberUpdate).member?.connection_status).toBe(3) + + subscription.unsubscribe() + }) }) diff --git a/typings/connectProps.d.ts b/typings/connectProps.d.ts index 65a9b06c7e..86356e91f9 100644 --- a/typings/connectProps.d.ts +++ b/typings/connectProps.d.ts @@ -4,6 +4,7 @@ interface ConnectWidgetPropTypes extends ConnectProps { language?: LanguageType onPostMessage: (event: string, data?: object) => void showTooSmallDialog: boolean + webSocketConnection?: any } interface PostMessageEventOverrides { @@ -36,10 +37,12 @@ interface ConnectProps { postMessageEventOverrides?: PostMessageEventOverrides profiles: ProfilesTypes userFeatures?: object + webSocketConnection?: any experimentalFeatures?: null | { unavailableInstitutions?: { guid: string; name: string }[] optOutOfEarlyUserRelease?: boolean memberPollingMilliseconds?: number + useWebSockets?: boolean } } interface ClientConfigType { From 97aab96b601828850ebcd3df2dd47c2d4a2833f4 Mon Sep 17 00:00:00 2001 From: Logan Rasmussen Date: Wed, 1 Apr 2026 12:19:45 -0600 Subject: [PATCH 2/7] test: add missing test changes --- src/__tests__/ConnectWidget-test.tsx | 24 ++++++++++++----- .../transport/MemberUpdateTransport.ts | 19 ++++++++------ .../__tests__/MemberUpdateTransport-test.ts | 26 +++++++++++++------ 3 files changed, 46 insertions(+), 23 deletions(-) diff --git a/src/__tests__/ConnectWidget-test.tsx b/src/__tests__/ConnectWidget-test.tsx index b93c0d2a33..1ab6d38be6 100644 --- a/src/__tests__/ConnectWidget-test.tsx +++ b/src/__tests__/ConnectWidget-test.tsx @@ -4,16 +4,26 @@ import { describe, it, expect, vi } from 'vitest' import { of } from 'rxjs' import { ConnectWidget } from '../ConnectWidget' -import { useWebSocket } from 'src/context/WebSocketContext' +import { useWebSocket } from '../context/WebSocketContext' -// Mock Connect component to verify context value -const MockConnect = () => { +vi.mock('src/Connect', () => ({ + default: vi.fn(() => { + // In actual implementation, it uses Context + // But for the test we just want to see if it renders without crashing + // and correctly provides the context which we can check via useWebSocket in a child if we want + return
mock-connect
+ }), +})) + +// A simple component to verify context +const ContextChecker = () => { const ws = useWebSocket() - return
{ws ? 'has-ws' : 'no-ws'}
+ return
{ws ? 'has-ws' : 'no-ws'}
} +// We need to mock Connect to render the ContextChecker instead vi.mock('src/Connect', () => ({ - default: MockConnect, + default: () => , })) describe('ConnectWidget', () => { @@ -32,12 +42,12 @@ describe('ConnectWidget', () => { const { getByTestId } = render() - expect(getByTestId('mock-connect')).toHaveTextContent('has-ws') + expect(getByTestId('context-checker')).toHaveTextContent('has-ws') }) it('does not provide webSocketConnection when not passed', () => { const { getByTestId } = render() - expect(getByTestId('mock-connect')).toHaveTextContent('no-ws') + expect(getByTestId('context-checker')).toHaveTextContent('no-ws') }) }) diff --git a/src/utilities/transport/MemberUpdateTransport.ts b/src/utilities/transport/MemberUpdateTransport.ts index bcda71a09e..3e7bbfc1fd 100644 --- a/src/utilities/transport/MemberUpdateTransport.ts +++ b/src/utilities/transport/MemberUpdateTransport.ts @@ -49,13 +49,15 @@ export function createMemberUpdateTransport( (msg.topic === 'members/updated' || msg.topic === 'members/priority_data_ready') && msg.data?.guid === memberGuid, ), - map((msg) => ({ - member: msg.data, - job: - msg.topic === 'members/priority_data_ready' - ? ({ async_account_data_ready: true } as JobResponseType) - : undefined, - })), + map((msg) => { + const member = msg.data + const job = { + guid: member?.most_recent_job_guid, + async_account_data_ready: msg.topic === 'members/priority_data_ready' || undefined, + } as JobResponseType + + return { member, job } + }), ) transport$ = merge(polling$, socket$) } @@ -68,10 +70,11 @@ export function createMemberUpdateTransport( const prevMember = prev.member const currMember = curr.member - // Compare status, MFA, and async data ready flag to determine if we should emit + // Compare status, MFA, job GUID, and async data ready flag to determine if we should emit return ( prevMember?.connection_status === currMember?.connection_status && _isEqual(prevMember?.mfa, currMember?.mfa) && + prev.job?.guid === curr.job?.guid && prev.job?.async_account_data_ready === curr.job?.async_account_data_ready ) }), diff --git a/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts b/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts index 0279ad0f6c..c0d396af9c 100644 --- a/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts +++ b/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts @@ -2,7 +2,10 @@ import { vi } from 'vitest' import { take } from 'rxjs/operators' import { Subject } from 'rxjs' -import { createMemberUpdateTransport, MemberUpdate } from '../MemberUpdateTransport' +import { + createMemberUpdateTransport, + MemberUpdate, +} from 'src/utilities/transport/MemberUpdateTransport' describe('MemberUpdateTransport', () => { const mockMemberGuid = 'MBR-123' @@ -170,7 +173,10 @@ describe('MemberUpdateTransport', () => { wsMessages$.next({ topic: 'members/updated', data: wsMember }) expect(results).toHaveLength(1) - expect(results[0]).toEqual({ member: wsMember, job: undefined }) + expect(results[0]).toEqual({ + member: wsMember, + job: { async_account_data_ready: undefined, guid: undefined }, + }) subscription.unsubscribe() }) @@ -214,8 +220,9 @@ describe('MemberUpdateTransport', () => { } // Configure polling to return same data + const jobWithGuid = { ...mockJob, guid: 'JOB-123' } mockApi.loadMemberByGuid.mockResolvedValue(mockMember) - mockApi.loadJob.mockResolvedValue(mockJob) + mockApi.loadJob.mockResolvedValue(jobWithGuid) const transport$ = createMemberUpdateTransport( mockApi, @@ -229,16 +236,19 @@ describe('MemberUpdateTransport', () => { results.push(val) }) - // 1. Emit from WebSocket + // 1. Trigger first poll + await vi.advanceTimersByTimeAsync(1000) + expect(results).toHaveLength(1) + + // 2. Emit identical data from WebSocket wsMessages$.next({ topic: 'members/updated', data: mockMember }) + expect(results).toHaveLength(1) // Still 1 - // 2. Trigger poll (which returns identical data) + // 3. Trigger second poll await vi.advanceTimersByTimeAsync(1000) - - // Should only have 1 result because they are identical expect(results).toHaveLength(1) - // 3. Emit a DIFFERENT update from WebSocket + // 4. Emit a DIFFERENT update from WebSocket const updatedMember = { ...mockMember, connection_status: 3 } wsMessages$.next({ topic: 'members/updated', data: updatedMember }) From db8858db82c1264952f4c65b67e7e17492acc3c5 Mon Sep 17 00:00:00 2001 From: Logan Rasmussen Date: Wed, 1 Apr 2026 16:03:47 -0600 Subject: [PATCH 3/7] fix: update structure of the websocket events to use event-payload instead of topic-data --- src/hooks/__tests__/usePollMember-test.tsx | 4 ++-- src/hooks/usePollMember.tsx | 1 - src/utilities/transport/MemberUpdateTransport.ts | 15 +++++++++------ .../__tests__/MemberUpdateTransport-test.ts | 8 ++++---- typings/apiTypes.d.ts | 1 + typings/mxTypes.d.ts | 1 + 6 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/hooks/__tests__/usePollMember-test.tsx b/src/hooks/__tests__/usePollMember-test.tsx index bd153cac85..9571e2328a 100644 --- a/src/hooks/__tests__/usePollMember-test.tsx +++ b/src/hooks/__tests__/usePollMember-test.tsx @@ -798,7 +798,7 @@ describe('usePollMember', () => { // Emit from WebSocket const wsMember = { guid: 'MBR-123', connection_status: 1 } - wsMessages$.next({ topic: 'members/updated', data: wsMember }) + wsMessages$.next({ event: 'members/updated', payload: wsMember }) await waitFor( () => { @@ -810,7 +810,7 @@ describe('usePollMember', () => { expect(states[0].currentResponse?.member).toEqual(wsMember) // Emit priority data ready - wsMessages$.next({ topic: 'members/priority_data_ready', data: wsMember }) + wsMessages$.next({ event: 'members/priority_data_ready', payload: wsMember }) await waitFor( () => { diff --git a/src/hooks/usePollMember.tsx b/src/hooks/usePollMember.tsx index d23516aaaa..d889905eba 100644 --- a/src/hooks/usePollMember.tsx +++ b/src/hooks/usePollMember.tsx @@ -76,7 +76,6 @@ export function usePollMember() { if ( !isError && !acc.initialDataReady && - // @ts-expect-error response might be undefined or an error response?.job?.async_account_data_ready && !optOutOfEarlyUserRelease ) { diff --git a/src/utilities/transport/MemberUpdateTransport.ts b/src/utilities/transport/MemberUpdateTransport.ts index 3e7bbfc1fd..b4fe528794 100644 --- a/src/utilities/transport/MemberUpdateTransport.ts +++ b/src/utilities/transport/MemberUpdateTransport.ts @@ -46,14 +46,14 @@ export function createMemberUpdateTransport( const socket$ = webSocket.webSocketMessages$.pipe( filter( (msg) => - (msg.topic === 'members/updated' || msg.topic === 'members/priority_data_ready') && - msg.data?.guid === memberGuid, + (msg.event === 'members/updated' || msg.event === 'members/priority_data_ready') && + msg.payload?.guid === memberGuid, ), map((msg) => { - const member = msg.data + const member = msg.payload const job = { guid: member?.most_recent_job_guid, - async_account_data_ready: msg.topic === 'members/priority_data_ready' || undefined, + async_account_data_ready: msg.event === 'members/priority_data_ready' || undefined, } as JobResponseType return { member, job } @@ -70,12 +70,15 @@ export function createMemberUpdateTransport( const prevMember = prev.member const currMember = curr.member - // Compare status, MFA, job GUID, and async data ready flag to determine if we should emit + // Compare the relevant fields to determine if we should emit an update + // Return true to *prevent* emitting the event + // Return false to emit the event return ( prevMember?.connection_status === currMember?.connection_status && _isEqual(prevMember?.mfa, currMember?.mfa) && prev.job?.guid === curr.job?.guid && - prev.job?.async_account_data_ready === curr.job?.async_account_data_ready + prev.job?.async_account_data_ready === curr.job?.async_account_data_ready && + prevMember?.is_being_aggregated === currMember?.is_being_aggregated ) }), ) diff --git a/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts b/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts index c0d396af9c..371f27768c 100644 --- a/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts +++ b/src/utilities/transport/__tests__/MemberUpdateTransport-test.ts @@ -170,7 +170,7 @@ describe('MemberUpdateTransport', () => { }) const wsMember = { guid: mockMemberGuid, connection_status: 1 } - wsMessages$.next({ topic: 'members/updated', data: wsMember }) + wsMessages$.next({ event: 'members/updated', payload: wsMember }) expect(results).toHaveLength(1) expect(results[0]).toEqual({ @@ -201,7 +201,7 @@ describe('MemberUpdateTransport', () => { }) const wsMember = { guid: mockMemberGuid, connection_status: 1 } - wsMessages$.next({ topic: 'members/priority_data_ready', data: wsMember }) + wsMessages$.next({ event: 'members/priority_data_ready', payload: wsMember }) expect(results).toHaveLength(1) expect(results[0]).toEqual({ @@ -241,7 +241,7 @@ describe('MemberUpdateTransport', () => { expect(results).toHaveLength(1) // 2. Emit identical data from WebSocket - wsMessages$.next({ topic: 'members/updated', data: mockMember }) + wsMessages$.next({ event: 'members/updated', payload: mockMember }) expect(results).toHaveLength(1) // Still 1 // 3. Trigger second poll @@ -250,7 +250,7 @@ describe('MemberUpdateTransport', () => { // 4. Emit a DIFFERENT update from WebSocket const updatedMember = { ...mockMember, connection_status: 3 } - wsMessages$.next({ topic: 'members/updated', data: updatedMember }) + wsMessages$.next({ event: 'members/updated', payload: updatedMember }) expect(results).toHaveLength(2) expect((results[1] as MemberUpdate).member?.connection_status).toBe(3) diff --git a/typings/apiTypes.d.ts b/typings/apiTypes.d.ts index 30c1e4fcc3..564e8dd67b 100644 --- a/typings/apiTypes.d.ts +++ b/typings/apiTypes.d.ts @@ -287,6 +287,7 @@ type JobResponseType = { finished_at: number started_at: number updated_at: number + async_account_data_ready?: boolean } // user types diff --git a/typings/mxTypes.d.ts b/typings/mxTypes.d.ts index f3c34fe12c..ce1965eb26 100644 --- a/typings/mxTypes.d.ts +++ b/typings/mxTypes.d.ts @@ -172,6 +172,7 @@ type JobResponseType = { job_type: number status: number finished_at: number + async_account_data_ready?: boolean } // user types From e5dd2116df1870675227cdb62ac1a1cc0471fd05 Mon Sep 17 00:00:00 2001 From: Logan Rasmussen Date: Thu, 2 Apr 2026 16:44:44 -0600 Subject: [PATCH 4/7] fix: add failure handling for a socket failure during connecting --- src/utilities/transport/MemberUpdateTransport.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/utilities/transport/MemberUpdateTransport.ts b/src/utilities/transport/MemberUpdateTransport.ts index b4fe528794..af13c1dd20 100644 --- a/src/utilities/transport/MemberUpdateTransport.ts +++ b/src/utilities/transport/MemberUpdateTransport.ts @@ -42,7 +42,7 @@ export function createMemberUpdateTransport( let transport$: Observable = polling$ - if (useWebSockets && webSocket?.webSocketMessages$) { + if (useWebSockets && webSocket?.webSocketMessages$ && webSocket?.isConnected()) { const socket$ = webSocket.webSocketMessages$.pipe( filter( (msg) => @@ -58,6 +58,9 @@ export function createMemberUpdateTransport( return { member, job } }), + // If the websocket errors out, we don't want to kill the polling stream. + // We just want to stop receiving messages from the socket and let polling continue. + catchError(() => of()), ) transport$ = merge(polling$, socket$) } From 2812894b2d0e64d573a2af1e3219431dcd3dec49 Mon Sep 17 00:00:00 2001 From: Logan Rasmussen Date: Mon, 6 Apr 2026 14:01:56 -0600 Subject: [PATCH 5/7] fix: suppress events based on most_recent_job_detail_code not changing --- src/utilities/transport/MemberUpdateTransport.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/utilities/transport/MemberUpdateTransport.ts b/src/utilities/transport/MemberUpdateTransport.ts index af13c1dd20..4593a542fd 100644 --- a/src/utilities/transport/MemberUpdateTransport.ts +++ b/src/utilities/transport/MemberUpdateTransport.ts @@ -81,7 +81,8 @@ export function createMemberUpdateTransport( _isEqual(prevMember?.mfa, currMember?.mfa) && prev.job?.guid === curr.job?.guid && prev.job?.async_account_data_ready === curr.job?.async_account_data_ready && - prevMember?.is_being_aggregated === currMember?.is_being_aggregated + prevMember?.is_being_aggregated === currMember?.is_being_aggregated && + prevMember?.most_recent_job_detail_code === currMember?.most_recent_job_detail_code ) }), ) From 5ed57e86cbdf25495e317a4a5ecc7287caf404ca Mon Sep 17 00:00:00 2001 From: Logan Rasmussen Date: Tue, 7 Apr 2026 10:00:19 -0600 Subject: [PATCH 6/7] fix: add error code to MemberUpdateTransport --- src/utilities/transport/MemberUpdateTransport.ts | 3 ++- typings/apiTypes.d.ts | 4 ++-- typings/mxTypes.d.ts | 8 ++++++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/utilities/transport/MemberUpdateTransport.ts b/src/utilities/transport/MemberUpdateTransport.ts index 4593a542fd..bbd793976a 100644 --- a/src/utilities/transport/MemberUpdateTransport.ts +++ b/src/utilities/transport/MemberUpdateTransport.ts @@ -82,7 +82,8 @@ export function createMemberUpdateTransport( prev.job?.guid === curr.job?.guid && prev.job?.async_account_data_ready === curr.job?.async_account_data_ready && prevMember?.is_being_aggregated === currMember?.is_being_aggregated && - prevMember?.most_recent_job_detail_code === currMember?.most_recent_job_detail_code + prevMember?.most_recent_job_detail_code === currMember?.most_recent_job_detail_code && + prevMember?.error?.error_code === currMember?.error?.error_code ) }), ) diff --git a/typings/apiTypes.d.ts b/typings/apiTypes.d.ts index 564e8dd67b..0a2f4abeb8 100644 --- a/typings/apiTypes.d.ts +++ b/typings/apiTypes.d.ts @@ -112,10 +112,10 @@ type MemberResponseType = { name?: string process_status?: number revision?: number + use_cases?: [string] | null user_guid: string - verification_is_enabled: boolean - oauth_window_uri?: string | null verification_is_enabled?: boolean + oauth_window_uri?: string | null tax_statement_is_enabled?: boolean successfully_aggreagted_at?: number } diff --git a/typings/mxTypes.d.ts b/typings/mxTypes.d.ts index ce1965eb26..d01fd7b6c6 100644 --- a/typings/mxTypes.d.ts +++ b/typings/mxTypes.d.ts @@ -46,7 +46,8 @@ type MemberResponseType = { last_job_status?: number last_update_time?: string metadata?: { [key: string]: unknown } - most_recent_job_detail_code?: number + mfa?: MfaCredentialType | object + most_recent_job_detail_code?: number | null most_recent_job_guid?: string needs_updated_credentials?: boolean name?: string @@ -54,7 +55,10 @@ type MemberResponseType = { revision?: number use_cases?: [string] | null user_guid: string - verification_is_enabled: boolean + verification_is_enabled?: boolean + oauth_window_uri?: string | null + tax_statement_is_enabled?: boolean + successfully_aggreagted_at?: number } // Institution types From 67c2773aa76d73a41e2e7240f3850f92f4d1a279 Mon Sep 17 00:00:00 2001 From: Logan Rasmussen Date: Wed, 8 Apr 2026 08:58:22 -0600 Subject: [PATCH 7/7] fix: correct a few typos in the types --- typings/apiTypes.d.ts | 2 +- typings/mxTypes.d.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/typings/apiTypes.d.ts b/typings/apiTypes.d.ts index 0a2f4abeb8..6919ce3f75 100644 --- a/typings/apiTypes.d.ts +++ b/typings/apiTypes.d.ts @@ -117,7 +117,7 @@ type MemberResponseType = { verification_is_enabled?: boolean oauth_window_uri?: string | null tax_statement_is_enabled?: boolean - successfully_aggreagted_at?: number + successfully_aggregated_at?: number } // Institution types diff --git a/typings/mxTypes.d.ts b/typings/mxTypes.d.ts index d01fd7b6c6..eb30575362 100644 --- a/typings/mxTypes.d.ts +++ b/typings/mxTypes.d.ts @@ -58,7 +58,7 @@ type MemberResponseType = { verification_is_enabled?: boolean oauth_window_uri?: string | null tax_statement_is_enabled?: boolean - successfully_aggreagted_at?: number + successfully_aggregated_at?: number } // Institution types