8.1. Best Practices for Third-Party Syncing
In this chapter, you'll learn about best practices for syncing data between Medusa and third-party systems.
Common Issues with Third-Party Syncing Implementation#
Syncing data between Medusa and external systems is a common use case for commerce applications. For example, if your commerce ecosystem includes an external CMS or inventory management system, you may need to sync product data between Medusa and these systems.
However, how you implement third-party syncing can significantly impact performance and memory usage. Syncing large amounts of data without proper handling can lead to issues like:
- Out-of-memory (OOM) errors.
- Slow syncs that block the event loop and degrade application performance.
- Application crashes in production.
This chapter covers best practices to avoid these issues when syncing data between Medusa and third-party systems. These best practices are general programming patterns that aren't Medusa-specific, but they're essential for building robust third-party syncs.
How to Sync Data Between Systems#
Before diving into best practices, it's important to understand the general approach for syncing data between Medusa and third-party systems. Third-party syncing typically involves two main steps:
- Define the syncing logic in a workflow.
- Execute the workflow from either a scheduled job or a subscriber.
Define Syncing Logic in a Workflow#
Workflows are special functions designed for long-running, asynchronous tasks. They provide features like compensation, retries, and async execution that are essential for reliable data syncing.
When defining your syncing logic, such as pushing product data to a third-party service or pulling inventory data into Medusa, you should define a workflow that encapsulates this logic.
Medusa also exposes built-in workflows for common commerce operations, like creating or updating products, that you can leverage in your syncing logic.
For example, you can use Medusa's built-in batchProductsWorkflow to create or update products in batches:
1import { MedusaContainer } from "@medusajs/framework/types"2import { batchProductsWorkflow } from "@medusajs/medusa/core-flows"3 4export default async function syncProductsJob(container: MedusaContainer) {5 // ...6 await batchProductsWorkflow(container).run({7 input: {8 create: productsToCreate,9 update: productsToUpdate,10 },11 })12}
Execute Workflows from Scheduled Jobs or Subscribers#
After defining your syncing logic in a workflow, or choosing a Medusa workflow to use, you need to execute it based on your syncing requirements:
- Scheduled Jobs: Use scheduled jobs for periodic syncs, such as syncing products daily or inventory hourly. Scheduled jobs run at specified intervals and can trigger your workflow to perform the sync.
- Subscribers: Use subscribers for event-driven syncs, such as syncing data when a product is updated in Medusa or when an order is placed. Subscribers listen for specific events and can trigger your workflow in response.
If you've set up server and worker instances, the worker will handle the execution. So, the syncing execution won't block the main server process.
In the scheduled job or subscriber, you retrieve the data to be synced from the third-party service or from Medusa itself. Then, you execute the workflow, passing it the data to be synced.
For example, the following scheduled job fetches products from a third-party service and syncs them to Medusa using a workflow:
1import { MedusaContainer } from "@medusajs/framework/types"2import { batchProductsWorkflow } from "@medusajs/medusa/core-flows"3 4export default async function syncProductsJob(container: MedusaContainer) {5 const productStream = streamProductsFromApi()6 const batchedProducts = batchProducts(productStream, 50)7 8 for await (const batch of batchedProducts) {9 const { 10 productsToCreate, 11 productsToUpdate,12 } = await prepareProducts(batch)13 14 await batchProductsWorkflow(container).run({15 input: {16 create: productsToCreate,17 update: productsToUpdate,18 },19 })20 }21}
You'll learn about best practices for implementing the data fetching, batching, and preparation logic in the next sections.
Syncing Best Practices#
The following sections cover best practices for implementing third-party syncing in a way that minimizes memory usage, maximizes performance, and ensures reliability.
Full Scheduled Job Code
Stream Data from External APIs#
When retrieving data from external APIs using fetch, the common approach is to fetch the data and load it entirely into memory using response.json(). For example:
However, this can lead to high memory usage and performance issues, especially with large datasets.
Instead, process data in streams or batches. This approach allows you to handle large datasets without loading everything into memory at once. You can use libraries like stream-json to parse JSON data incrementally as it's received.

First, install the stream-json library in your Medusa project:
Then, use it in your scheduled job or subscriber to stream and parse JSON data from the third-party service:
1import { Readable } from "stream"2import { parser } from "stream-json"3import { pick } from "stream-json/filters/Pick"4import { streamArray } from "stream-json/streamers/StreamArray"5import { chain } from "stream-chain"6 7const API_FETCH_SIZE = 2008 9async function* streamProductsFromApi() {10 let offset = 011 let hasMore = true12 13 while (hasMore) {14 const url = `https://third-party-api.com/products?limit=${API_FETCH_SIZE}&offset=${offset}`15 // TODO: Add retry with exponential backoff16 const response = await fetch(url)17 18 // Convert web ReadableStream to Node.js Readable19 const nodeStream = Readable.fromWeb(response.body as any)20 21 // Create a streaming JSON parser pipeline that:22 // 1. Parses JSON incrementally23 // 2. Picks only the "products" array24 // 3. Streams each array item individually25 const pipeline = chain([26 nodeStream,27 parser(),28 pick({ filter: "products" }),29 streamArray(),30 ])31 32 let productCount = 033 34 try {35 // Yield each product one at a time36 for await (const { value } of pipeline) {37 yield value38 productCount++39 40 // TODO: Yield to event loop periodically to prevent blocking41 }42 } catch (streamError: any) {43 // TODO: Handle stream errors44 }45 46 // If the products are less than expected, there are no more products47 if (productCount < API_FETCH_SIZE) {48 hasMore = false49 } else {50 offset += productCount51 }52 }53}54 55export default async function syncProductsJob(container: MedusaContainer) {56 const productStream = streamProductsFromApi()57 // ...58}
In the above snippet, you set up a streaming JSON parser that processes the API response incrementally. Instead of loading the entire response into memory, the parser yields each product one at a time.
This approach significantly reduces memory usage, as you only hold JSON tokens and the current product being parsed, rather than the entire dataset.
Handle Stream Errors
When working with streams, it's important to handle potential errors that may occur during streaming, such as network interruptions. Catch these errors and implement retry logic or error reporting as needed.
For example:
1async function* streamProductsFromApi() {2 // Initial setup...3 while (hasMore) {4 // Setup pipeline...5 try {6 for await (const { value } of pipeline) {7 yield value8 // ...9 }10 } catch (streamError: any) {11 // Handle stream errors (socket closed mid-stream)12 if (13 streamError.code === "UND_ERR_SOCKET" || 14 streamError.code === "ECONNRESET"15 ) {16 throw new MedusaError(17 MedusaError.Types.UNEXPECTED_STATE,18 `Stream interrupted after ${productCount} products: ${streamError.message}`19 )20 }21 throw streamError22 }23 // Update pagination...24 }25}26 27export default async function syncProductsJob(container: MedusaContainer) {28 // Initial setup...29 30 const productStream = streamProductsFromApi()31 // sync products...32}
In the above snippet, you catch stream errors and check for specific error codes that indicate transient network issues. You can then decide how to handle these errors, such as retrying the fetch or logging the error.
Retrieve Only Necessary Fields#
A common performance pitfall when syncing data is retrieving more fields than necessary from third-party services or Medusa's Query. This leads to increased data size, slower performance, and higher memory usage.
When retrieving data from third-party services or with Medusa's Query, only request the necessary fields. Then, to efficiently group existing data for updates, use a Map for quick lookups.
For example, don't retrieve all product fields like this:
Instead, only request the fields you need:
1export default async function syncProductsJob(container: MedusaContainer) {2 const query = container.resolve(ContainerRegistrationKeys.QUERY)3 4 // Initial setup...5 6 const productStream = streamProductsFromApi()7 const batchedProducts = batchProducts(productStream, PROCESS_BATCH_SIZE)8 9 for await (const batch of batchedProducts) {10 // Increment data...11 12 // Extract external IDs from this batch to look up in Medusa13 const externalIds = batch.map((p) => p.id)14 15 // Query Medusa for products matching these external IDs16 const { data: existingProducts } = await query.graph(17 {18 entity: "product",19 fields: ["id", "updated_at", "external_id"],20 filters: {21 external_id: externalIds,22 },23 }24 )25 26 // Build a map for quick lookup27 const existingByExternalId = new Map(28 existingProducts.map((p) => [p.external_id, { 29 id: p.id, 30 updatedAt: p.updated_at,31 }])32 )33 34 // Process batch and sync to Medusa...35 }36}
In the above snippet, after retrieving a batch of products from the external API, you query Medusa's products to find existing products that match the external IDs. You only request the necessary fields for this operation.
Then, you build a map existingByExternalId that enables efficient lookups when determining whether to create or update products.
This approach minimizes the amount of data transferred and processed, leading to better performance and lower memory usage.
Use Async Generators#
Async generators are a powerful feature in JavaScript that allow you to define asynchronous iterators. They're particularly useful for processing large datasets incrementally, as they enable you to yield data items one at a time without loading everything into memory.
When retrieving large datasets from third-party services, use async generators to yield data items one at a time. This allows you to process data incrementally without loading everything into memory.

For example:
1async function* streamProductsFromApi() {2 // Initial setup...3 4 while (hasMore) {5 // Setup pipeline...6 7 try {8 for await (const { value } of pipeline) {9 yield value // Yield one product at a time10 11 // TODO: Yield to event loop periodically to prevent blocking...12 }13 } catch (streamError: any) {14 // Handle stream errors...15 }16 17 // Update pagination...18 }19}20 21async function* batchProducts(22 products: AsyncGenerator,23 batchSize: number24): AsyncGenerator<any[]> {25 let batch: any[] = []26 27 for await (const product of products) {28 batch.push(product)29 30 if (batch.length >= batchSize) {31 yield batch32 // Release reference for GC33 batch = []34 }35 }36 37 // Yield remaining products38 if (batch.length > 0) {39 yield batch40 }41}42 43export default async function syncProductsJob(container: MedusaContainer) {44 // Initial setup...45 46 const productStream = streamProductsFromApi()47 const batchedProducts = batchProducts(productStream, PROCESS_BATCH_SIZE)48 49 for await (const batch of batchedProducts) {50 // Process batch and sync to Medusa...51 }52}
In the above snippet, you define two async generators:
streamProductsFromApi: Yields individual products from the third-party service one at a time.batchProducts: Takes an async generator of products and yields them in batches of a specified size.
Then, in your scheduled job, you consume these generators using for await...of loops to process product batches incrementally.
This approach keeps memory usage low, as you only hold the current product or batch in memory at any given time.
Release References for Garbage Collection
When using async generators, it's important to release references to processed data to allow for garbage collection. This keeps memory usage low, especially when processing large datasets.
For example, the batchProducts generator demonstrates this:
1async function* batchProducts(2 products: AsyncGenerator,3 batchSize: number4): AsyncGenerator<any[]> {5 let batch: any[] = []6 7 for await (const product of products) {8 batch.push(product)9 10 if (batch.length >= batchSize) {11 yield batch12 // Release reference for GC13 batch = []14 }15 }16 17 // Yield remaining products18 if (batch.length > 0) {19 yield batch20 }21}
Handle Backpressure#
Backpressure is a mechanism that manages the flow of data between producers (API fetches) and consumers (data processing workflows). Without proper backpressure handling, fast API fetches can overwhelm the processing workflow, leading to high memory usage and potential crashes.
Handle backpressure by controlling the pace at which data is processed. One effective way to do this in JavaScript is using for await...of loops, which naturally provide backpressure by waiting for each iteration to complete before fetching the next item.

For example, you can implement backpressure handling in your scheduled job:
1export default async function syncProductsJob(container: MedusaContainer) {2 // Initial setup...3 4 const productStream = streamProductsFromApi()5 const batchedProducts = batchProducts(productStream, PROCESS_BATCH_SIZE)6 7 for await (const batch of batchedProducts) {8 // Process batch and sync to Medusa...9 // The next batch won't be fetched until processing of the current batch is complete10 }11}
In the above snippet, the for await...of loop processes each batch of products. This ensures that the next batch isn't fetched until the current batch has been fully processed, effectively implementing backpressure.
This approach keeps memory usage controlled and prevents the system from being overwhelmed by incoming data, ensuring stability during large data syncs.
Retry Errors with Exponential Backoff#
Errors can occur during data syncing due to transient network issues, rate limiting, or temporary unavailability of third-party services. To improve reliability, implement retry logic with exponential backoff for transient errors.
For example, implement a custom function that fetches data with retry logic, then use it to fetch data from the third-party service:
1const MAX_RETRIES = 32const RETRY_DELAY_MS = 10003 4async function fetchWithRetry(5 url: string,6 retries = MAX_RETRIES7): Promise<Response> {8 let lastError: Error | null = null9 10 for (let attempt = 1; attempt <= retries; attempt++) {11 try {12 // TODO: Add request timeout...13 const response = await fetch(url)14 15 if (!response.ok) {16 throw new Error(`HTTP ${response.status}: ${response.statusText}`)17 }18 19 return response20 } catch (error: any) {21 lastError = error22 const isRetryable =23 error.code === "UND_ERR_SOCKET" ||24 error.code === "ECONNREFUSED" ||25 error.code === "ECONNRESET" ||26 error.code === "ETIMEDOUT" ||27 error.name === "AbortError"28 29 if (isRetryable && attempt < retries) {30 // Exponential backoff: 1s → 2s → 4s31 const delay = RETRY_DELAY_MS * Math.pow(2, attempt - 1)32 await new Promise((resolve) => setTimeout(resolve, delay))33 } else {34 break35 }36 }37 }38 39 throw lastError40}41 42async function* streamProductsFromApi() {43 // Initial setup...44 45 while (hasMore) {46 const url = `https://third-party-api.com/products?limit=${API_FETCH_SIZE}&offset=${offset}`47 48 const response = await fetchWithRetry(url)49 // TODO Setup pipeline...50 }51}52 53export default async function syncProductsJob(container: MedusaContainer) {54 // Initial setup...55 56 const productStream = streamProductsFromApi()57 // Process and sync products...58}
In the above snippet, the fetchWithRetry function attempts to fetch a URL multiple times if a retryable error occurs. It uses exponential backoff to increase the delay between retries, reducing the load on the third-party service.
This approach improves the reliability of your data syncing process by handling transient errors gracefully.
Set Request Timeouts#
When making API calls to third-party services, always set request timeouts. This prevents the event loop from being blocked indefinitely if the third-party service is unresponsive.
For example, set a request timeout on a fetch call using the AbortController:
1const FETCH_TIMEOUT_MS = 300002 3async function fetchWithRetry(4 url: string,5 retries = MAX_RETRIES6): Promise<Response> {7 const lastError: Error | null = null8 9 for (let attempt = 1; attempt <= retries; attempt++) {10 try {11 const controller = new AbortController()12 const timeoutId = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS)13 14 const response = await fetch(url, {15 signal: controller.signal,16 // Keep connection alive for efficiency with multiple requests17 headers: {18 "Connection": "keep-alive",19 },20 })21 22 clearTimeout(timeoutId)23 24 if (!response.ok) {25 throw new Error(`HTTP ${response.status}: ${response.statusText}`)26 }27 28 return response29 } catch (error: any) {30 // Retry logic with exponential backoff...31 }32 }33 34 throw lastError35}36 37async function* streamProductsFromApi() {38 // initial setup...39 40 while (hasMore) {41 const url = `https://third-party-api.com/products?limit=${API_FETCH_SIZE}&offset=${offset}`42 43 const response = await fetchWithRetry(url)44 // Setup streaming pipeline...45 }46}47 48export default async function syncProductsJob(container: MedusaContainer) {49 // Initial setup...50 51 const productStream = streamProductsFromApi()52 // Process and sync products...53}
In the above snippet, an AbortController sets a timeout for the fetch request. If the request takes longer than the specified timeout, it's aborted, preventing indefinite blocking of the event loop.
This approach ensures that your application remains responsive even when third-party services are slow or unresponsive.
Yield to the Event Loop#
When processing large amounts of data in loops, periodically yield control back to the event loop. This prevents blocking the event loop for extended periods, which can lead to unresponsiveness in your application. For example, it may prevent other scheduled jobs or subscribers from executing.
Yield to the event loop using setImmediate. For example:
1async function* streamProductsFromApi() {2 // Initial setup...3 4 while (hasMore) {5 // Setup pipeline...6 7 try {8 // Yield each product one at a time9 for await (const { value } of pipeline) {10 yield value11 productCount++12 13 // Yield to event loop periodically to prevent blocking14 if (productCount % 100 === 0) {15 await new Promise((resolve) => setImmediate(resolve))16 }17 }18 } catch (streamError: any) {19 // Handle stream errors...20 }21 22 // If the products are less than expected, there are no more products23 if (productCount < API_FETCH_SIZE) {24 hasMore = false25 } else {26 offset += productCount27 }28 }29}30 31export default async function syncProductsJob(container: MedusaContainer) {32 const productStream = streamProductsFromApi()33 // Process and sync products...34}
In the above snippet, the code yields to the event loop every 100 products processed. This allows other tasks in the event loop to execute, improving overall responsiveness.