8.1. Best Practices for Third-Party Syncing

In this chapter, you'll learn about best practices for syncing data between Medusa and third-party systems.

Common Issues with Third-Party Syncing Implementation#

Syncing data between Medusa and external systems is a common use case for commerce applications. For example, if your commerce ecosystem includes an external CMS or inventory management system, you may need to sync product data between Medusa and these systems.

However, how you implement third-party syncing can significantly impact performance and memory usage. Syncing large amounts of data without proper handling can lead to issues like:

  • Out-of-memory (OOM) errors.
  • Slow syncs that block the event loop and degrade application performance.
  • Application crashes in production.

This chapter covers best practices to avoid these issues when syncing data between Medusa and third-party systems. These best practices are general programming patterns that aren't Medusa-specific, but they're essential for building robust third-party syncs.


How to Sync Data Between Systems#

Before diving into best practices, it's important to understand the general approach for syncing data between Medusa and third-party systems. Third-party syncing typically involves two main steps:

  1. Define the syncing logic in a workflow.
  2. Execute the workflow from either a scheduled job or a subscriber.

Define Syncing Logic in a Workflow#

Workflows are special functions designed for long-running, asynchronous tasks. They provide features like compensation, retries, and async execution that are essential for reliable data syncing.

When defining your syncing logic, such as pushing product data to a third-party service or pulling inventory data into Medusa, you should define a workflow that encapsulates this logic.
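For example, a minimal custom sync workflow might look like the following sketch. The step name, file path, and the CMS call inside the step are illustrative assumptions, not part of Medusa's API:

src/workflows/sync-product-to-cms.ts
```ts
import {
  createStep,
  createWorkflow,
  StepResponse,
  WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"

// Hypothetical step that pushes a product to an external CMS
const updateProductInCmsStep = createStep(
  "update-product-in-cms",
  async (input: { id: string }, { container }) => {
    // Resolve what you need from the container and call the
    // third-party API here (the CMS client is an assumption)
    // await cmsClient.upsertProduct(input.id)
    return new StepResponse(input)
  }
)

// Workflow that encapsulates the syncing logic, so it can be
// executed from a scheduled job or subscriber
export const syncProductToCmsWorkflow = createWorkflow(
  "sync-product-to-cms",
  (input: { id: string }) => {
    const result = updateProductInCmsStep(input)

    return new WorkflowResponse(result)
  }
)
```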

Medusa also exposes built-in workflows for common commerce operations, like creating or updating products, that you can leverage in your syncing logic.

For example, you can use Medusa's built-in batchProductsWorkflow to create or update products in batches:

src/jobs/sync-products.ts
```ts
import { MedusaContainer } from "@medusajs/framework/types"
import { batchProductsWorkflow } from "@medusajs/medusa/core-flows"

export default async function syncProductsJob(container: MedusaContainer) {
  // ...
  await batchProductsWorkflow(container).run({
    input: {
      create: productsToCreate,
      update: productsToUpdate,
    },
  })
}
```

Execute Workflows from Scheduled Jobs or Subscribers#

After defining your syncing logic in a workflow, or choosing a Medusa workflow to use, you need to execute it based on your syncing requirements:

  • Scheduled Jobs: Use scheduled jobs for periodic syncs, such as syncing products daily or inventory hourly. Scheduled jobs run at specified intervals and can trigger your workflow to perform the sync.
  • Subscribers: Use subscribers for event-driven syncs, such as syncing data when a product is updated in Medusa or when an order is placed. Subscribers listen for specific events and can trigger your workflow in response, as shown in the sketch after this list.
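For instance, here's a sketch of a subscriber that runs a sync workflow whenever a product is updated. The imported syncProductToCmsWorkflow is the hypothetical workflow from the earlier sketch; swap in whichever workflow holds your syncing logic:

src/subscribers/product-updated.ts
```ts
import type {
  SubscriberArgs,
  SubscriberConfig,
} from "@medusajs/framework"
// Hypothetical custom workflow from the earlier sketch
import { syncProductToCmsWorkflow } from "../workflows/sync-product-to-cms"

export default async function productUpdatedHandler({
  event: { data },
  container,
}: SubscriberArgs<{ id: string }>) {
  // Run the syncing workflow for the updated product
  await syncProductToCmsWorkflow(container).run({
    input: { id: data.id },
  })
}

export const config: SubscriberConfig = {
  event: "product.updated",
}
```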

If you've set up server and worker instances, the worker handles the execution, so syncing doesn't block the main server process.

Tip: Cloud creates server and worker instances for your project automatically, so you don't need to set this up manually.
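If you're self-hosting instead, a minimal sketch of the relevant medusa-config.ts settings might look like this, assuming the conventional MEDUSA_WORKER_MODE environment variable and a shared Redis instance:

medusa-config.ts
```ts
import { defineConfig } from "@medusajs/framework/utils"

module.exports = defineConfig({
  projectConfig: {
    // Set per deployed instance: "server", "worker", or "shared"
    workerMode: process.env.MEDUSA_WORKER_MODE as
      | "shared"
      | "worker"
      | "server",
    // A shared Redis instance lets the server and worker coordinate events and jobs
    redisUrl: process.env.REDIS_URL,
    // ...
  },
})
```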

In the scheduled job or subscriber, you retrieve the data to be synced from the third-party service or from Medusa itself. Then, you execute the workflow, passing it the data to be synced.

For example, the following scheduled job fetches products from a third-party service and syncs them to Medusa using a workflow:

src/jobs/sync-products.ts
```ts
import { MedusaContainer } from "@medusajs/framework/types"
import { batchProductsWorkflow } from "@medusajs/medusa/core-flows"

export default async function syncProductsJob(container: MedusaContainer) {
  const productStream = streamProductsFromApi()
  const batchedProducts = batchProducts(productStream, 50)

  for await (const batch of batchedProducts) {
    const {
      productsToCreate,
      productsToUpdate,
    } = await prepareProducts(batch)

    await batchProductsWorkflow(container).run({
      input: {
        create: productsToCreate,
        update: productsToUpdate,
      },
    })
  }
}
```

You'll learn about best practices for implementing the data fetching, batching, and preparation logic in the next sections.


Syncing Best Practices#

The following sections cover best practices for implementing third-party syncing in a way that minimizes memory usage, maximizes performance, and ensures reliability.


Stream Data from External APIs#

When retrieving data from external APIs using fetch, the common approach is to fetch the data and load it entirely into memory using response.json(). For example:

Code
```ts
const response = await fetch("https://third-party-api.com/products")
// Load entire response into memory
const data = await response.json()
```

However, this can lead to high memory usage and performance issues, especially with large datasets.

Instead, process data in streams or batches. This approach allows you to handle large datasets without loading everything into memory at once. You can use libraries like stream-json to parse JSON data incrementally as it's received.

Diagram showcasing object in memory when loading entire JSON response vs streaming JSON parsing

First, install the stream-json library in your Medusa project:
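For example, with npm (stream-chain provides the chain helper used below):

```bash
npm install stream-json stream-chain
```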

Then, use it in your scheduled job or subscriber to stream and parse JSON data from the third-party service:

src/jobs/sync-products.ts
```ts
import { Readable } from "stream"
import { parser } from "stream-json"
import { pick } from "stream-json/filters/Pick"
import { streamArray } from "stream-json/streamers/StreamArray"
import { chain } from "stream-chain"

const API_FETCH_SIZE = 200

async function* streamProductsFromApi() {
  let offset = 0
  let hasMore = true

  while (hasMore) {
    const url = `https://third-party-api.com/products?limit=${API_FETCH_SIZE}&offset=${offset}`
    // TODO: Add retry with exponential backoff
    const response = await fetch(url)

    // Convert web ReadableStream to Node.js Readable
    const nodeStream = Readable.fromWeb(response.body as any)

    // Create a streaming JSON parser pipeline that:
    // 1. Parses JSON incrementally
    // 2. Picks only the "products" array
    // 3. Streams each array item individually
    const pipeline = chain([
      nodeStream,
      parser(),
      pick({ filter: "products" }),
      streamArray(),
    ])

    let productCount = 0

    try {
      // Yield each product one at a time
      for await (const { value } of pipeline) {
        yield value
        productCount++

        // TODO: Yield to event loop periodically to prevent blocking
      }
    } catch (streamError: any) {
      // TODO: Handle stream errors
    }

    // If the products are less than expected, there are no more products
    if (productCount < API_FETCH_SIZE) {
      hasMore = false
    } else {
      offset += productCount
    }
  }
}

export default async function syncProductsJob(container: MedusaContainer) {
  const productStream = streamProductsFromApi()
  // ...
}
```

In the above snippet, you set up a streaming JSON parser that processes the API response incrementally. Instead of loading the entire response into memory, the parser yields each product one at a time.

This approach significantly reduces memory usage, as you only hold JSON tokens and the current product being parsed, rather than the entire dataset.

Handle Stream Errors

When working with streams, it's important to handle potential errors that may occur during streaming, such as network interruptions. Catch these errors and implement retry logic or error reporting as needed.

For example:

src/jobs/sync-products.ts
```ts
import { MedusaError } from "@medusajs/framework/utils"

async function* streamProductsFromApi() {
  // Initial setup...
  while (hasMore) {
    // Setup pipeline...
    try {
      for await (const { value } of pipeline) {
        yield value
        // ...
      }
    } catch (streamError: any) {
      // Handle stream errors (socket closed mid-stream)
      if (
        streamError.code === "UND_ERR_SOCKET" ||
        streamError.code === "ECONNRESET"
      ) {
        throw new MedusaError(
          MedusaError.Types.UNEXPECTED_STATE,
          `Stream interrupted after ${productCount} products: ${streamError.message}`
        )
      }
      throw streamError
    }
    // Update pagination...
  }
}

export default async function syncProductsJob(container: MedusaContainer) {
  // Initial setup...

  const productStream = streamProductsFromApi()
  // Sync products...
}
```

In the above snippet, you catch stream errors and check for specific error codes that indicate transient network issues. You can then decide how to handle these errors, such as retrying the fetch or logging the error.

Retrieve Only Necessary Fields#

A common performance pitfall when syncing data is retrieving more fields than necessary from third-party services or Medusa's Query. This leads to increased data size, slower performance, and higher memory usage.

When retrieving data from third-party services or with Medusa's Query, only request the necessary fields. Then, to efficiently match incoming records against existing data, build a Map for quick lookups.

For example, don't retrieve all product fields like this:

Code
```ts
// DON'T
const { data: existingProducts } = await query.graph(
  {
    entity: "product",
    fields: ["*"],
    filters: {
      external_id: externalIds,
    },
  }
)
```

Instead, only request the fields you need:

src/jobs/sync-products.ts
```ts
import { MedusaContainer } from "@medusajs/framework/types"
import { ContainerRegistrationKeys } from "@medusajs/framework/utils"

export default async function syncProductsJob(container: MedusaContainer) {
  const query = container.resolve(ContainerRegistrationKeys.QUERY)

  // Initial setup...

  const productStream = streamProductsFromApi()
  const batchedProducts = batchProducts(productStream, PROCESS_BATCH_SIZE)

  for await (const batch of batchedProducts) {
    // Increment data...

    // Extract external IDs from this batch to look up in Medusa
    const externalIds = batch.map((p) => p.id)

    // Query Medusa for products matching these external IDs
    const { data: existingProducts } = await query.graph(
      {
        entity: "product",
        fields: ["id", "updated_at", "external_id"],
        filters: {
          external_id: externalIds,
        },
      }
    )

    // Build a map for quick lookup
    const existingByExternalId = new Map(
      existingProducts.map((p) => [p.external_id, {
        id: p.id,
        updatedAt: p.updated_at,
      }])
    )

    // Process batch and sync to Medusa...
  }
}
```

In the above snippet, after retrieving a batch of products from the external API, you query Medusa's products to find existing products that match the external IDs. You only request the necessary fields for this operation.

Then, you build a map existingByExternalId that enables efficient lookups when determining whether to create or update products.

This approach minimizes the amount of data transferred and processed, leading to better performance and lower memory usage.
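For context, a simplified variant of the prepareProducts helper from the earlier scheduled job sketch (here it also receives the lookup map as an argument and is synchronous) could use this map to split a batch into create and update payloads, for example by comparing timestamps. The external fields (id, title, updated_at) are assumptions about the third-party API's shape:

```ts
// Hypothetical helper: splits a batch of external products into
// create and update payloads using the lookup map
function prepareProducts(
  batch: any[],
  existingByExternalId: Map<string, { id: string; updatedAt: Date }>
) {
  const productsToCreate: any[] = []
  const productsToUpdate: any[] = []

  for (const externalProduct of batch) {
    const existing = existingByExternalId.get(externalProduct.id)

    if (!existing) {
      // Not in Medusa yet, so create it
      productsToCreate.push({
        title: externalProduct.title,
        external_id: externalProduct.id,
        // ...other mapped fields
      })
    } else if (
      new Date(externalProduct.updated_at) > new Date(existing.updatedAt)
    ) {
      // The external record is newer, so update the Medusa product
      productsToUpdate.push({
        id: existing.id,
        title: externalProduct.title,
        // ...other mapped fields
      })
    }
    // Otherwise the Medusa product is already up to date; skip it
  }

  return { productsToCreate, productsToUpdate }
}
```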

Use Async Generators#

Async generators are a JavaScript feature for defining asynchronous iterators. They're particularly useful for processing large datasets, because they yield items one at a time instead of materializing the whole dataset in memory.

When retrieving large datasets from third-party services, use async generators to stream the data and process it incrementally.

Diagram showcasing how batches are loaded into memories one at a time with async generators

For example:

src/jobs/sync-products.ts
```ts
async function* streamProductsFromApi() {
  // Initial setup...

  while (hasMore) {
    // Setup pipeline...

    try {
      for await (const { value } of pipeline) {
        yield value // Yield one product at a time

        // TODO: Yield to event loop periodically to prevent blocking...
      }
    } catch (streamError: any) {
      // Handle stream errors...
    }

    // Update pagination...
  }
}

async function* batchProducts(
  products: AsyncGenerator,
  batchSize: number
): AsyncGenerator<any[]> {
  let batch: any[] = []

  for await (const product of products) {
    batch.push(product)

    if (batch.length >= batchSize) {
      yield batch
      // Release reference for GC
      batch = []
    }
  }

  // Yield remaining products
  if (batch.length > 0) {
    yield batch
  }
}

export default async function syncProductsJob(container: MedusaContainer) {
  // Initial setup...

  const productStream = streamProductsFromApi()
  const batchedProducts = batchProducts(productStream, PROCESS_BATCH_SIZE)

  for await (const batch of batchedProducts) {
    // Process batch and sync to Medusa...
  }
}
```

In the above snippet, you define two async generators:

  1. streamProductsFromApi: Yields individual products from the third-party service one at a time.
  2. batchProducts: Takes an async generator of products and yields them in batches of a specified size.

Then, in your scheduled job, you consume these generators using for await...of loops to process product batches incrementally.

This approach keeps memory usage low, as you only hold the current product or batch in memory at any given time.

Release References for Garbage Collection

When using async generators, it's important to release references to processed data to allow for garbage collection. This keeps memory usage low, especially when processing large datasets.

For example, the batchProducts generator demonstrates this:

src/jobs/sync-products.ts
```ts
async function* batchProducts(
  products: AsyncGenerator,
  batchSize: number
): AsyncGenerator<any[]> {
  let batch: any[] = []

  for await (const product of products) {
    batch.push(product)

    if (batch.length >= batchSize) {
      yield batch
      // Release reference for GC
      batch = []
    }
  }

  // Yield remaining products
  if (batch.length > 0) {
    yield batch
  }
}
```

Handle Backpressure#

Backpressure is a mechanism that manages the flow of data between producers (API fetches) and consumers (data processing workflows). Without proper backpressure handling, fast API fetches can overwhelm the processing workflow, leading to high memory usage and potential crashes.

Handle backpressure by controlling the pace at which data is processed. One effective way to do this in JavaScript is using for await...of loops, which naturally provide backpressure by waiting for each iteration to complete before fetching the next item.

Diagram showcasing timeline from start to end of scheduled job execution where batches are fetched and synced one at a time

For example, you can implement backpressure handling in your scheduled job:

src/jobs/sync-products.ts
```ts
export default async function syncProductsJob(container: MedusaContainer) {
  // Initial setup...

  const productStream = streamProductsFromApi()
  const batchedProducts = batchProducts(productStream, PROCESS_BATCH_SIZE)

  for await (const batch of batchedProducts) {
    // Process batch and sync to Medusa...
    // The next batch won't be fetched until processing of the current batch is complete
  }
}
```

In the above snippet, the for await...of loop processes each batch of products. This ensures that the next batch isn't fetched until the current batch has been fully processed, effectively implementing backpressure.

This approach keeps memory usage controlled and prevents the system from being overwhelmed by incoming data, ensuring stability during large data syncs.
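For contrast, the following sketch shows what to avoid. Collecting every batch before syncing, then running all syncs concurrently, removes backpressure: memory grows with the dataset, and the unbounded concurrency can overwhelm both Medusa and the third-party API. The syncBatch function is a hypothetical helper standing in for the batch-processing logic:

```ts
// DON'T: this loads every batch into memory before syncing anything
const allBatches: any[][] = []

for await (const batch of batchedProducts) {
  allBatches.push(batch)
}

// DON'T: unbounded concurrency, all batches are synced at once
await Promise.all(allBatches.map((batch) => syncBatch(batch)))
```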

Retry Errors with Exponential Backoff#

Errors can occur during data syncing due to transient network issues, rate limiting, or temporary unavailability of third-party services. To improve reliability, implement retry logic with exponential backoff for transient errors.

For example, implement a custom function that fetches data with retry logic, then use it to fetch data from the third-party service:

src/jobs/sync-products.ts
```ts
const MAX_RETRIES = 3
const RETRY_DELAY_MS = 1000

async function fetchWithRetry(
  url: string,
  retries = MAX_RETRIES
): Promise<Response> {
  let lastError: Error | null = null

  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      // TODO: Add request timeout...
      const response = await fetch(url)

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`)
      }

      return response
    } catch (error: any) {
      lastError = error
      const isRetryable =
        error.code === "UND_ERR_SOCKET" ||
        error.code === "ECONNREFUSED" ||
        error.code === "ECONNRESET" ||
        error.code === "ETIMEDOUT" ||
        error.name === "AbortError"

      if (isRetryable && attempt < retries) {
        // Exponential backoff: 1s → 2s → 4s
        const delay = RETRY_DELAY_MS * Math.pow(2, attempt - 1)
        await new Promise((resolve) => setTimeout(resolve, delay))
      } else {
        break
      }
    }
  }

  throw lastError
}

async function* streamProductsFromApi() {
  // Initial setup...

  while (hasMore) {
    const url = `https://third-party-api.com/products?limit=${API_FETCH_SIZE}&offset=${offset}`

    const response = await fetchWithRetry(url)
    // TODO: Setup pipeline...
  }
}

export default async function syncProductsJob(container: MedusaContainer) {
  // Initial setup...

  const productStream = streamProductsFromApi()
  // Process and sync products...
}
```

In the above snippet, the fetchWithRetry function attempts to fetch a URL multiple times if a retryable error occurs. It uses exponential backoff to increase the delay between retries, reducing the load on the third-party service.

This approach improves the reliability of your data syncing process by handling transient errors gracefully.

Set Request Timeouts#

When making API calls to third-party services, always set request timeouts. This prevents the event loop from being blocked indefinitely if the third-party service is unresponsive.

For example, set a request timeout on a fetch call using the AbortController:

src/jobs/sync-products.ts
```ts
const FETCH_TIMEOUT_MS = 30000

async function fetchWithRetry(
  url: string,
  retries = MAX_RETRIES
): Promise<Response> {
  let lastError: Error | null = null

  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      const controller = new AbortController()
      const timeoutId = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS)

      const response = await fetch(url, {
        signal: controller.signal,
        // Keep connection alive for efficiency with multiple requests
        headers: {
          "Connection": "keep-alive",
        },
      })

      clearTimeout(timeoutId)

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`)
      }

      return response
    } catch (error: any) {
      // Retry logic with exponential backoff...
    }
  }

  throw lastError
}

async function* streamProductsFromApi() {
  // Initial setup...

  while (hasMore) {
    const url = `https://third-party-api.com/products?limit=${API_FETCH_SIZE}&offset=${offset}`

    const response = await fetchWithRetry(url)
    // Setup streaming pipeline...
  }
}

export default async function syncProductsJob(container: MedusaContainer) {
  // Initial setup...

  const productStream = streamProductsFromApi()
  // Process and sync products...
}
```

In the above snippet, an AbortController sets a timeout for the fetch request. If the request takes longer than the specified timeout, it's aborted, preventing indefinite blocking of the event loop.

This approach ensures that your application remains responsive even when third-party services are slow or unresponsive.

Yield to the Event Loop#

When processing large amounts of data in loops, periodically yield control back to the event loop. This prevents blocking the event loop for extended periods, which can make your application unresponsive. For example, a blocked event loop can delay or prevent other scheduled jobs and subscribers from executing.

Yield to the event loop using setImmediate. For example:

src/jobs/sync-products.ts
```ts
async function* streamProductsFromApi() {
  // Initial setup...

  while (hasMore) {
    // Setup pipeline...

    try {
      // Yield each product one at a time
      for await (const { value } of pipeline) {
        yield value
        productCount++

        // Yield to event loop periodically to prevent blocking
        if (productCount % 100 === 0) {
          await new Promise((resolve) => setImmediate(resolve))
        }
      }
    } catch (streamError: any) {
      // Handle stream errors...
    }

    // If the products are less than expected, there are no more products
    if (productCount < API_FETCH_SIZE) {
      hasMore = false
    } else {
      offset += productCount
    }
  }
}

export default async function syncProductsJob(container: MedusaContainer) {
  const productStream = streamProductsFromApi()
  // Process and sync products...
}
```

In the above snippet, the code yields to the event loop every 100 products processed. This allows other tasks in the event loop to execute, improving overall responsiveness.
