Skip to content

Commit c4a8874

Browse files
feat(client)!: extract auto pagination to shared classes
refactor(client)!: refactor async auto-pagination refactor(client)!: rename `getNextPage{,Params}` to `nextPage{,Params}` refactor(client)!: swap `nextPage{,Params}` to return non-optional # Migration - If you were referencing the `AutoPager` class on a specific `*Page` or `*PageAsync` type, then you should instead reference the shared `AutoPager` and `AutoPagerAsync` types, under the `core` package - `AutoPagerAsync` now has different usage. You can call `.subscribe(...)` on the returned object instead to get called back each page item. You can also call `onCompleteFuture()` to get a future that completes when all items have been processed. Finally, you can call `.close()` on the returned object to stop auto-paginating early - If you were referencing `getNextPage` or `getNextPageParams`: - Swap to `nextPage()` and `nextPageParams()` - Note that these both now return non-optional types (use `hasNextPage()` before calling these, since they will throw if it's impossible to get another page) There are examples and further information about pagination in the readme.
1 parent ec6fa40 commit c4a8874

52 files changed

Lines changed: 1491 additions & 975 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -219,53 +219,101 @@ The SDK throws custom unchecked exception types:
219219

220220
## Pagination
221221

222-
For methods that return a paginated list of results, this library provides convenient ways access the results either one page at a time, or item-by-item across all pages.
222+
The SDK defines methods that return a paginated lists of results. It provides convenient ways to access the results either one page at a time or item-by-item across all pages.
223223

224224
### Auto-pagination
225225

226-
To iterate through all results across all pages, you can use `autoPager`, which automatically handles fetching more pages for you:
226+
To iterate through all results across all pages, use the `autoPager()` method, which automatically fetches more pages as needed.
227227

228-
### Synchronous
228+
When using the synchronous client, the method returns an [`Iterable`](https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html)
229229

230230
```java
231231
import com.tryfinch.api.models.HrisDirectoryListPage;
232232
import com.tryfinch.api.models.IndividualInDirectory;
233233

234-
// As an Iterable:
235-
HrisDirectoryListPage page = client.hris().directory().list(params);
234+
HrisDirectoryListPage page = client.hris().directory().list();
235+
236+
// Process as an Iterable
236237
for (IndividualInDirectory directory : page.autoPager()) {
237238
System.out.println(directory);
238-
};
239+
}
239240

240-
// As a Stream:
241-
client.hris().directory().list(params).autoPager().stream()
241+
// Process as a Stream
242+
page.autoPager()
243+
.stream()
242244
.limit(50)
243245
.forEach(directory -> System.out.println(directory));
244246
```
245247

246-
### Asynchronous
248+
When using the asynchronous client, the method returns an [`AsyncStreamResponse`](finch-java-core/src/main/kotlin/com/tryfinch/api/core/http/AsyncStreamResponse.kt):
247249

248250
```java
249-
// Using forEach, which returns CompletableFuture<Void>:
250-
asyncClient.hris().directory().list(params).autoPager()
251-
.forEach(directory -> System.out.println(directory), executor);
251+
import com.tryfinch.api.core.http.AsyncStreamResponse;
252+
import com.tryfinch.api.models.HrisDirectoryListPageAsync;
253+
import com.tryfinch.api.models.IndividualInDirectory;
254+
import java.util.Optional;
255+
import java.util.concurrent.CompletableFuture;
256+
257+
CompletableFuture<HrisDirectoryListPageAsync> pageFuture = client.async().hris().directory().list();
258+
259+
pageFuture.thenRun(page -> page.autoPager().subscribe(directory -> {
260+
System.out.println(directory);
261+
}));
262+
263+
// If you need to handle errors or completion of the stream
264+
pageFuture.thenRun(page -> page.autoPager().subscribe(new AsyncStreamResponse.Handler<>() {
265+
@Override
266+
public void onNext(IndividualInDirectory directory) {
267+
System.out.println(directory);
268+
}
269+
270+
@Override
271+
public void onComplete(Optional<Throwable> error) {
272+
if (error.isPresent()) {
273+
System.out.println("Something went wrong!");
274+
throw new RuntimeException(error.get());
275+
} else {
276+
System.out.println("No more!");
277+
}
278+
}
279+
}));
280+
281+
// Or use futures
282+
pageFuture.thenRun(page -> page.autoPager()
283+
.subscribe(directory -> {
284+
System.out.println(directory);
285+
})
286+
.onCompleteFuture()
287+
.whenComplete((unused, error) -> {
288+
if (error != null) {
289+
System.out.println("Something went wrong!");
290+
throw new RuntimeException(error);
291+
} else {
292+
System.out.println("No more!");
293+
}
294+
}));
252295
```
253296

254297
### Manual pagination
255298

256-
If none of the above helpers meet your needs, you can also manually request pages one-by-one. A page of results has a `data()` method to fetch the list of objects, as well as top-level `response` and other methods to fetch top-level data about the page. It also has methods `hasNextPage`, `getNextPage`, and `getNextPageParams` methods to help with pagination.
299+
To access individual page items and manually request the next page, use the `items()`,
300+
`hasNextPage()`, and `nextPage()` methods:
257301

258302
```java
259303
import com.tryfinch.api.models.HrisDirectoryListPage;
260304
import com.tryfinch.api.models.IndividualInDirectory;
261305

262-
HrisDirectoryListPage page = client.hris().directory().list(params);
263-
while (page != null) {
264-
for (IndividualInDirectory directory : page.individuals()) {
306+
HrisDirectoryListPage page = client.hris().directory().list();
307+
while (true) {
308+
for (IndividualInDirectory directory : page.items()) {
265309
System.out.println(directory);
266310
}
267311

268-
page = page.getNextPage().orElse(null);
312+
if (!page.hasNextPage()) {
313+
break;
314+
}
315+
316+
page = page.nextPage();
269317
}
270318
```
271319

@@ -343,7 +391,6 @@ To set a custom timeout, configure the method call using the `timeout` method:
343391

344392
```java
345393
import com.tryfinch.api.models.HrisDirectoryListPage;
346-
import com.tryfinch.api.models.HrisDirectoryListParams;
347394

348395
HrisDirectoryListPage page = client.hris().directory().list(RequestOptions.builder().timeout(Duration.ofSeconds(30)).build());
349396
```
@@ -573,7 +620,6 @@ Or configure the method call to validate the response using the `responseValidat
573620

574621
```java
575622
import com.tryfinch.api.models.HrisDirectoryListPage;
576-
import com.tryfinch.api.models.HrisDirectoryListParams;
577623

578624
HrisDirectoryListPage page = client.hris().directory().list(RequestOptions.builder().responseValidation(true).build());
579625
```

finch-java-client-okhttp/src/main/kotlin/com/tryfinch/api/client/okhttp/FinchOkHttpClient.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import java.net.Proxy
1313
import java.time.Clock
1414
import java.time.Duration
1515
import java.util.Optional
16+
import java.util.concurrent.Executor
1617
import kotlin.jvm.optionals.getOrNull
1718

1819
class FinchOkHttpClient private constructor() {
@@ -47,6 +48,10 @@ class FinchOkHttpClient private constructor() {
4748

4849
fun jsonMapper(jsonMapper: JsonMapper) = apply { clientOptions.jsonMapper(jsonMapper) }
4950

51+
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
52+
clientOptions.streamHandlerExecutor(streamHandlerExecutor)
53+
}
54+
5055
fun clock(clock: Clock) = apply { clientOptions.clock(clock) }
5156

5257
fun headers(headers: Headers) = apply { clientOptions.headers(headers) }

finch-java-client-okhttp/src/main/kotlin/com/tryfinch/api/client/okhttp/FinchOkHttpClientAsync.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import java.net.Proxy
1313
import java.time.Clock
1414
import java.time.Duration
1515
import java.util.Optional
16+
import java.util.concurrent.Executor
1617
import kotlin.jvm.optionals.getOrNull
1718

1819
class FinchOkHttpClientAsync private constructor() {
@@ -47,6 +48,10 @@ class FinchOkHttpClientAsync private constructor() {
4748

4849
fun jsonMapper(jsonMapper: JsonMapper) = apply { clientOptions.jsonMapper(jsonMapper) }
4950

51+
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
52+
clientOptions.streamHandlerExecutor(streamHandlerExecutor)
53+
}
54+
5055
fun clock(clock: Clock) = apply { clientOptions.clock(clock) }
5156

5257
fun headers(headers: Headers) = apply { clientOptions.headers(headers) }
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.tryfinch.api.core
4+
5+
import java.util.stream.Stream
6+
import java.util.stream.StreamSupport
7+
8+
class AutoPager<T> private constructor(private val firstPage: Page<T>) : Iterable<T> {
9+
10+
companion object {
11+
12+
fun <T> from(firstPage: Page<T>): AutoPager<T> = AutoPager(firstPage)
13+
}
14+
15+
override fun iterator(): Iterator<T> =
16+
generateSequence(firstPage) { if (it.hasNextPage()) it.nextPage() else null }
17+
.flatMap { it.items() }
18+
.iterator()
19+
20+
fun stream(): Stream<T> = StreamSupport.stream(spliterator(), false)
21+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.tryfinch.api.core
4+
5+
import com.tryfinch.api.core.http.AsyncStreamResponse
6+
import java.util.Optional
7+
import java.util.concurrent.CompletableFuture
8+
import java.util.concurrent.CompletionException
9+
import java.util.concurrent.Executor
10+
import java.util.concurrent.atomic.AtomicReference
11+
12+
class AutoPagerAsync<T>
13+
private constructor(private val firstPage: PageAsync<T>, private val defaultExecutor: Executor) :
14+
AsyncStreamResponse<T> {
15+
16+
companion object {
17+
18+
fun <T> from(firstPage: PageAsync<T>, defaultExecutor: Executor): AutoPagerAsync<T> =
19+
AutoPagerAsync(firstPage, defaultExecutor)
20+
}
21+
22+
private val onCompleteFuture = CompletableFuture<Void?>()
23+
private val state = AtomicReference(State.NEW)
24+
25+
override fun subscribe(handler: AsyncStreamResponse.Handler<T>): AsyncStreamResponse<T> =
26+
subscribe(handler, defaultExecutor)
27+
28+
override fun subscribe(
29+
handler: AsyncStreamResponse.Handler<T>,
30+
executor: Executor,
31+
): AsyncStreamResponse<T> = apply {
32+
// TODO(JDK): Use `compareAndExchange` once targeting JDK 9.
33+
check(state.compareAndSet(State.NEW, State.SUBSCRIBED)) {
34+
if (state.get() == State.SUBSCRIBED) "Cannot subscribe more than once"
35+
else "Cannot subscribe after the response is closed"
36+
}
37+
38+
fun PageAsync<T>.handle(): CompletableFuture<Void?> {
39+
if (state.get() == State.CLOSED) {
40+
return CompletableFuture.completedFuture(null)
41+
}
42+
43+
items().forEach { handler.onNext(it) }
44+
return if (hasNextPage()) nextPage().thenCompose { it.handle() }
45+
else CompletableFuture.completedFuture(null)
46+
}
47+
48+
executor.execute {
49+
firstPage.handle().whenComplete { _, error ->
50+
val actualError =
51+
if (error is CompletionException && error.cause != null) error.cause else error
52+
try {
53+
handler.onComplete(Optional.ofNullable(actualError))
54+
} finally {
55+
try {
56+
if (actualError == null) {
57+
onCompleteFuture.complete(null)
58+
} else {
59+
onCompleteFuture.completeExceptionally(actualError)
60+
}
61+
} finally {
62+
close()
63+
}
64+
}
65+
}
66+
}
67+
}
68+
69+
override fun onCompleteFuture(): CompletableFuture<Void?> = onCompleteFuture
70+
71+
override fun close() {
72+
val previousState = state.getAndSet(State.CLOSED)
73+
if (previousState == State.CLOSED) {
74+
return
75+
}
76+
77+
// When the stream is closed, we should always consider it closed. If it closed due
78+
// to an error, then we will have already completed the future earlier, and this
79+
// will be a no-op.
80+
onCompleteFuture.complete(null)
81+
}
82+
}
83+
84+
private enum class State {
85+
NEW,
86+
SUBSCRIBED,
87+
CLOSED,
88+
}

finch-java-core/src/main/kotlin/com/tryfinch/api/core/ClientOptions.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ import com.tryfinch.api.core.http.RetryingHttpClient
1111
import java.time.Clock
1212
import java.util.Base64
1313
import java.util.Optional
14+
import java.util.concurrent.Executor
15+
import java.util.concurrent.Executors
16+
import java.util.concurrent.ThreadFactory
17+
import java.util.concurrent.atomic.AtomicLong
1418
import kotlin.jvm.optionals.getOrNull
1519

1620
class ClientOptions
@@ -19,6 +23,7 @@ private constructor(
1923
@get:JvmName("httpClient") val httpClient: HttpClient,
2024
@get:JvmName("checkJacksonVersionCompatibility") val checkJacksonVersionCompatibility: Boolean,
2125
@get:JvmName("jsonMapper") val jsonMapper: JsonMapper,
26+
@get:JvmName("streamHandlerExecutor") val streamHandlerExecutor: Executor,
2227
@get:JvmName("clock") val clock: Clock,
2328
@get:JvmName("baseUrl") val baseUrl: String,
2429
@get:JvmName("headers") val headers: Headers,
@@ -71,6 +76,7 @@ private constructor(
7176
private var httpClient: HttpClient? = null
7277
private var checkJacksonVersionCompatibility: Boolean = true
7378
private var jsonMapper: JsonMapper = jsonMapper()
79+
private var streamHandlerExecutor: Executor? = null
7480
private var clock: Clock = Clock.systemUTC()
7581
private var baseUrl: String = PRODUCTION_URL
7682
private var headers: Headers.Builder = Headers.builder()
@@ -88,6 +94,7 @@ private constructor(
8894
httpClient = clientOptions.originalHttpClient
8995
checkJacksonVersionCompatibility = clientOptions.checkJacksonVersionCompatibility
9096
jsonMapper = clientOptions.jsonMapper
97+
streamHandlerExecutor = clientOptions.streamHandlerExecutor
9198
clock = clientOptions.clock
9299
baseUrl = clientOptions.baseUrl
93100
headers = clientOptions.headers.toBuilder()
@@ -109,6 +116,10 @@ private constructor(
109116

110117
fun jsonMapper(jsonMapper: JsonMapper) = apply { this.jsonMapper = jsonMapper }
111118

119+
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
120+
this.streamHandlerExecutor = streamHandlerExecutor
121+
}
122+
112123
fun clock(clock: Clock) = apply { this.clock = clock }
113124

114125
fun baseUrl(baseUrl: String) = apply { this.baseUrl = baseUrl }
@@ -285,6 +296,21 @@ private constructor(
285296
),
286297
checkJacksonVersionCompatibility,
287298
jsonMapper,
299+
streamHandlerExecutor
300+
?: Executors.newCachedThreadPool(
301+
object : ThreadFactory {
302+
303+
private val threadFactory: ThreadFactory =
304+
Executors.defaultThreadFactory()
305+
private val count = AtomicLong(0)
306+
307+
override fun newThread(runnable: Runnable): Thread =
308+
threadFactory.newThread(runnable).also {
309+
it.name =
310+
"finch-stream-handler-thread-${count.getAndIncrement()}"
311+
}
312+
}
313+
),
288314
clock,
289315
baseUrl,
290316
headers.build(),
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.tryfinch.api.core
4+
5+
/**
6+
* An interface representing a single page, with items of type [T], from a paginated endpoint
7+
* response.
8+
*
9+
* Implementations of this interface are expected to request additional pages synchronously. For
10+
* asynchronous pagination, see the [PageAsync] interface.
11+
*/
12+
interface Page<T> {
13+
14+
/**
15+
* Returns whether there's another page after this one.
16+
*
17+
* The method generally doesn't make requests so the result depends entirely on the data in this
18+
* page. If a significant amount of time has passed between requesting this page and calling
19+
* this method, then the result could be stale.
20+
*/
21+
fun hasNextPage(): Boolean
22+
23+
/**
24+
* Returns the page after this one by making another request.
25+
*
26+
* @throws IllegalStateException if it's impossible to get the next page. This exception is
27+
* avoidable by calling [hasNextPage] first.
28+
*/
29+
fun nextPage(): Page<T>
30+
31+
/** Returns the items in this page. */
32+
fun items(): List<T>
33+
}

0 commit comments

Comments
 (0)