We’re Sequin — we turn APIs from services like Stripe, Shopify, and GitHub into a unified stream. Extraction is at the very core of our product, and we like to think we've come up with some helpful frameworks for extracting data from REST APIs.
When your integration is small or just getting started, you can sometimes get away with calling API endpoints just in time.
But often, that's not sufficient. When it's not, you need to perform an extraction process to get data out of the API first. With the data extracted, then you can build your integration.
Extractions are tricky, though. Most extractions I’ve seen in the wild have major flaws. If your extraction process has a flaw, it will cause holes in your data, and those can be hard to track down.
The two primary requirements for an extraction are accuracy and efficiency. You need your extraction to be 100% accurate. But you also want it to be efficient. Ideally, you don't need your extraction process to paginate an API from front-to-back every time it runs. (Both expensive and slow.)
This post covers design patterns to hopefully help you build a reliable extraction process on the first shot.
Why extract?
An API's limitations are usually what drive a team to extract. These limitations include:
- rate limit is too low
- throughput is too slow
- limited query params (can't query the data the way you want)
- limited eventing or webhook support
But extracting is also a good way to create a boundary between an API and your system. By extracting API data into systems like Kafka and Postgres, you decouple the rest of your system from the quirks and limitations of the API. You can design schemas that fit your use case and optimize for your access patterns. And you gain the ability to trigger side effects based on the data flowing through the extraction process.
What to extract?
Typically, an extraction process is trying to extract two things from the upstream API:
- A record: information about a single entity, a row in a database.
- An event: a specific thing that happened in the API.
For example, take Stripe. A record would be a JSON representation of a Stripe subscription. An event would be a JSON object that indicated that a subscription was canceled or crossed some threshold of value.
You extract records when you want to cache or store API state locally. Sometimes, you need to cache API objects to avoid rate limit issues or increase throughput. Other times, you are folding API data into your internal tables, like updating org.subscription_status
in your org
table based on the current state in Stripe.
You extract events to trigger side effects. Returning to Stripe, you might trigger a workflow whenever a customer signs up for a new subscription. You want to trigger that workflow only when the subscription is created, not when it's updated or deleted. The "rising edge" of the subscription being created is important.
In an extraction process, you're extracting records, because the current state of records is what most REST APIs return. You can then use changes to records to generate events.
Backfilling
Your extraction process most likely will begin with a backfill. In a backfill, you sweep over every API table, pulling every record into your messaging system or database. You paginate an API endpoint from the beginning to the end.
The stream of records will be changing as you paginate through it. In order to backfill safely, you need to ensure you don't skip over any records.
You need to identify two things:
- A safe strategy for ordering the stream
- A safe strategy for traversing the stream
The best-case scenario is that the API has one of the following properties:
- a "default order" which starts at the beginning of time
- auto-incrementing IDs and allows for
order by ID asc
- another unique, increasing field you can
order by asc
(e.g. an order or invoice number)
And supports either filtering (so you can do where ID > ?
) or has pagination tokens.
Often, APIs don't have one of these properties, so you have to settle for another option. These other options can have some pitfalls.
One common pitfall is using a created_at
timestamp as the cursor for your backfill. It's tempting, and often supported, but has a severe limitation: what if lots of objects can be created at the same time? For example, what if you can bulk import Contacts into your CRM or an Order's Items are created en masse when the order is created?
If you naively paginate the stream with where created_at > ?
, you risk skipping items. If page sizes are 100, and the API returns a page with 100 items that all have the same created_at
– what do you do? You can't increment created_at
, because you don't know if there are 101+ items that have the same created_at
as your current cursor. And you can't re-use the same cursor, as you'll just get back the same page of results.
If the API supports pagination tokens, usually that means you can use created_at
safely. But it's worth checking how it handles the situation of lots of items being created with the same created_at
timestamp.
Another way to navigate this situation is if the API supports compound queries. If it does, you can use a compound query to traverse the stream in these situations.
Detecting changes
After the backfill, your extraction process needs to detect changes (or perform an "incremental" sync).
Ideally, the API supports an events stream you can use to detect changes. But these are rare. Webhooks can be helpful, but have some limitations.
Your best option is almost always to sort and paginate by updated_at asc
. Then, you'll want to take into account two design considerations:
1. Can I paginate safely when lots of records have the same updated_at
timestamp?
The same challenge specified in "Backfilling" applies here. What happens if hundreds of records can have the same updated_at
, but the page size maxes out at less than that?
2. Does updated_at
get updated for the changes I care about?
Sometimes, APIs don't update the updated_at
field for a record when you'd expect them to. For example, in Salesforce, a record's formula field might change, but that won't update the record's updated_at
.
What if you can't order and paginate by updated_at
?
Some APIs make change detection very difficult (looking at you, HubSpot Associations). These APIs provide little in the way of ordering the stream or paginating it. Or, sometimes, they return objects that don't even have a updated_at
property at all.
These APIs force you to continuously re-backfill them.
A cache can help alleviate some load on your system. You can store a map of IDs to record hashes in memory or in Redis. A record hash can be a simple MD5 of each record – just be sure that your method for hashing is consistent. As you pull records, you can filter for changes based on which hashes changed.
Webhooks
If the API you're extracting from supports webhooks, those can be a great option for detecting incremental updates.
There are three things you want to be mindful of when receiving webhooks:
- Webhooks may have reliability issues.
- You don't want your system to crash and "lose" the webhook.
- Webhooks may arrive out-of-order.
For #1, webhooks are hard to audit. If the provider is struggling to send them, it's harder to detect. If the provider fails to deliver some, it might be impossible for you to find out. Polling, in contrast, can be a lot "safer." If the API is strongly consistent, you rest easy at night knowing you're not "missing" anything.
For #2, you need to be mindful of the reliability of your own system. If you crash before processing a webhook, there's no telling if the provider will resend it.
This is why it's usually wise to have a very simple webhook receiver. The only job of the receiver is to put the webhook in a messaging system like Kafka for further processing. That way, it's unlikely that you'll drop a webhook.
For #3, the provider might send webhooks out-of-order. Even if they don't, there's no telling which of your webhook receivers will "finish first" in the instance where you're processing multiple webhooks for the same record at the same time.
To turn an out-of-order webhook stream into a consistently ordered one, you'll need to use a data store. For example, you can keep track of the last updated_at
you saw for a given record ID
in Postgres. When you receive a webhook, you can lock that record's row, check if the updated_at
of the webhook is newer, and then let the webhook through if it is.
Last, it’s worth mentioning that most APIs only support webhooks for a subset of all objects. In many extractions, the team will still set up a foundation of polling for changes (to cover all objects needed). Then use webhooks where supported to augment the speed of the sync.
Generating events
So far, I've outlined how to extract records. After you've built record extraction, you can build event generation on top of that.
To generate events, you'll typically need to keep track of the current state of records somewhere. That's because an event indicates a specific change to a record. And APIs usually just return the latest version of a record.
For example, let's say you want to generate an event whenever a Stripe subscription's value passes $10,000. You'll need some persistent local state that indicates what its last value was. When the value exceeds your threshold, you can update your local state and generate an event. This ensures that you only generate an event on the "rising edge" – when the subscription first passes the threshold.
It's a good idea to keep event generation at the edge of your system, near where you're performing the extraction. That way, downstream consumers don't need to bother with caching records and deducing changes on their own.
Postgres is a great fit for this. If you cache a record or a partial record in Postgres, every time you receive the latest version of a record from your extraction process you can perform a procedure like this:
- Begin a transaction
- Select the record, including the
for update
clause - Upsert the record
For example:
begin;
-- select the stripe subscription record with for update to lock the record
select * from stripe_subscriptions
where id = $1
for update;
-- upsert the stripe subscription record
-- assuming the table has columns id, value, and other necessary fields
insert into stripe_subscriptions (id, created_at, status, [...columns])
values ($1, $2, $3, …)
on conflict (id)
do update set value = excluded.value, created_at = excluded.created_at
status = excluded.status, [...];
At this point, you have the last version of the record in-memory. And Postgres has the latest version of the record, albeit in an uncommitted transaction. Now, you can perform your business logic to determine if you should generate an event. After generating your event and pushing it to your messaging system, you can commit the transaction.
This approach has a high degree of safety. Because we "lock" the record before generating the event, we ensure that your system won't produce two of the same events. And doing this in a transaction means that if we fail to produce the event to our messaging system, we'll rollback the change in Postgres. This means that when the record's update is picked up again, we'll attempt to generate the event again.
Conclusion
As you can see, when building an extraction pipeline, the devil's in the details. Hopefully these design patterns help you build a pipeline that is both accurate and efficient.
A lot of people confuse the idea of an API integration with an extraction process. As I hope this post makes clear, extraction is often just the first step. Once your data is flowing from the API, then you can build your integration: triggering off of events and caching data in your database.