Flow: Add/Update/Delete Documents (Product Indexing)

How products are written to a Marqo ecommerce search index via direct API calls. This is the primary data ingestion path for the ecommerce platform.

Request Path

graph TD
    A["Client (e.g. customer service)"]
    B["Ecom API Cloudflare Worker ({env}-ecom-api)"]

    subgraph lambda["Ecom API Lambda ({env}-ShopifyAppAdminFunction)"]
        C["Product transform → S3 (JSONL chunks of max 1000 docs)"]
        D["SQS message per chunk"]
    end

    Q["Queue ({env}-{shop_id}-doc-queue)"]

    subgraph indexer["Ecom Indexer Lambda ({env}-EcomIndexerFunction)"]
        E["Download chunks from S3"]
        F["Validation, write protection check, internal field enrichment"]
        G["Marqo add/update/delete_documents API (via data plane cell gateway)"]
        H["Job status update in DynamoDB ({env}-EcomIndexerJobsTable)"]
    end

    A --> B
    B --> C
    C --> D
    D --> Q
    Q --> E
    E --> F
    F --> G
    G --> H

Step-by-Step

1. Trigger: Client -> Ecom API Call

Where: cloud_control_plane/components/search_proxy/

2. Trigger: Ecom API -> Ecom Lambda Call

Where: cloud_control_plane/components/shopify/admin_server/

Inspect: Check Ecom Lambda logs — see Lambda.

aws logs tail /aws/lambda/{env}-ShopifyAppAdminFunction --since 15m

3. Product Transformation & Chunking

Where: components/shopify/admin_server/admin_server/handlers/bulk_sync_handler.py

  1. Download bulk JSONL from Shopify (streamed, not loaded into memory)
  2. ProductAccumulator reconstructs complete products from flattened JSONL (variants, metafields, images linked via __parentId)
  3. ProductTransformer converts to Marqo document format with tensor fields
  4. Documents chunked (configurable chunk size) and written to S3

S3 location: {env}-ecom-product-data-bucket / shopify/{shop_id}/bulk/{job_id}/chunk_{NNNN}.jsonl
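
The chunking step and key layout above can be sketched as follows. This is a minimal illustration, not the handler's actual code: `chunk_key` and `iter_chunks` are hypothetical names, the four-digit zero padding is inferred from `chunk_0000.jsonl`, and the default of 1000 comes from the diagram's "max 1000 docs".

```python
import json
from typing import Iterable, Iterator, List, Tuple


def chunk_key(shop_id: str, job_id: str, index: int) -> str:
    """Build the S3 key for one chunk (four-digit padding is an assumption)."""
    return f"shopify/{shop_id}/bulk/{job_id}/chunk_{index:04d}.jsonl"


def iter_chunks(docs: Iterable[dict], chunk_size: int = 1000) -> Iterator[Tuple[int, str]]:
    """Yield (chunk_index, jsonl_body) pairs without holding all documents in memory."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    batch: List[str] = []
    index = 0
    for doc in docs:
        batch.append(json.dumps(doc, separators=(",", ":")))
        if len(batch) == chunk_size:
            yield index, "\n".join(batch) + "\n"
            batch, index = [], index + 1
    if batch:  # flush the final, possibly short, chunk
        yield index, "\n".join(batch) + "\n"
```

Each yielded body would then be uploaded under `chunk_key(...)`; the upload call itself is left out so the sketch stays independent of any AWS client.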

Inspect: Check S3 for chunk files — see S3.

4. SQS Message Per Chunk

Where: components/shopify/admin_server/admin_server/services/aws_operations.py

Each chunk gets an SQS message with:

{
  "job_id": "uuid",
  "shop_id": "example.myshopify.com",
  "index_name": "shopify-example-store",
  "operation": "ADD_DOCUMENTS",
  "s3_bucket": "{env}-ecom-product-data-bucket",
  "s3_key": "shopify/.../chunk_0000.jsonl"
}

Queue: {env}-ecom-indexer-{shop_id}.fifo — per-shop FIFO queues (created dynamically). Message group ID ensures ordering per shop.
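
A hedged sketch of how one chunk message might be assembled for a FIFO queue. `build_chunk_message` is a hypothetical helper. Using the shop ID as the message group ID and a hash of job ID plus S3 key as the deduplication ID are assumptions, not confirmed behavior of `aws_operations.py`. The returned keys match the parameter names of boto3's SQS `send_message`.

```python
import hashlib
import json


def build_chunk_message(job_id: str, shop_id: str, index_name: str,
                        s3_bucket: str, s3_key: str,
                        operation: str = "ADD_DOCUMENTS") -> dict:
    """Return send_message keyword arguments for one chunk (QueueUrl omitted)."""
    body = {
        "job_id": job_id,
        "shop_id": shop_id,
        "index_name": index_name,
        "operation": operation,
        "s3_bucket": s3_bucket,
        "s3_key": s3_key,
    }
    # Assumption: a stable hash of (job, chunk) so a re-sent chunk is deduplicated.
    dedup = hashlib.sha256(f"{job_id}:{s3_key}".encode("utf-8")).hexdigest()
    return {
        "MessageBody": json.dumps(body),
        "MessageGroupId": shop_id,  # assumption: one ordering group per shop
        "MessageDeduplicationId": dedup,
    }
```

With a real client this would be passed as `sqs.send_message(QueueUrl=..., **build_chunk_message(...))`.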

Inspect: Check SQS queue depth — see SQS.

5. Job Creation in DynamoDB

Where: components/shopify/admin_server/admin_server/services/sync_service.py

A job record is created in {env}-EcomIndexerJobsTable:

  • pk: PLATFORM#shopify#SHOP#{shop_id}, sk: JOB#{created_at}#{job_id}
  • Status: PENDING
  • Includes total_items count for completion tracking
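
For illustration, the key layout above could be produced like this. `new_job_item` is a hypothetical helper, and the ISO 8601 format of `created_at` is an assumption; only the `pk`/`sk` patterns, the PENDING status, and `total_items` come from the description.

```python
from datetime import datetime, timezone
from typing import Optional


def new_job_item(shop_id: str, job_id: str, total_items: int,
                 now: Optional[datetime] = None) -> dict:
    """Build a PENDING job record using the pk/sk patterns described above."""
    created_at = (now or datetime.now(timezone.utc)).isoformat()
    return {
        "pk": f"PLATFORM#shopify#SHOP#{shop_id}",
        "sk": f"JOB#{created_at}#{job_id}",
        "job_id": job_id,
        "status": "PENDING",
        "total_items": total_items,
    }
```

Because the sort key starts with the creation timestamp, jobs for one shop sort chronologically as long as every timestamp uses the same format.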

Inspect: Query jobs table — see DynamoDB and Ecommerce for schema.

aws dynamodb query --table-name {env}-EcomIndexerJobsTable \
  --index-name GSI_JobLookup \
  --key-condition-expression "job_id = :jid" \
  --expression-attribute-values '{":jid": {"S": "the-job-id"}}'

6. Indexer Lambda Processes Chunk

Where: components/ecom_indexer/ecom_indexer/lambda_function.py

  1. Parse SQS records from event
  2. Prefetch credentials: API key + AddDocsConfig (tensor fields, mappings) from EcomIndexSettingsTable
  3. Time budget management: If <120s Lambda time remaining, defer remaining messages back to SQS
  4. Mark job IN_PROGRESS in DynamoDB
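
The time budget check in step 3 can be sketched as below. `split_by_time_budget` is a hypothetical name and the real handler may structure this differently. In a Lambda handler, `remaining_ms` would typically be `context.get_remaining_time_in_millis`.

```python
from typing import Callable, List, Sequence, Tuple

MIN_REMAINING_MS = 120_000  # the 120s threshold from the step above


def split_by_time_budget(records: Sequence[dict],
                         remaining_ms: Callable[[], int],
                         process: Callable[[dict], None],
                         min_remaining_ms: int = MIN_REMAINING_MS
                         ) -> Tuple[List[dict], List[dict]]:
    """Process records in order until the budget runs low; return (done, deferred)."""
    done: List[dict] = []
    deferred: List[dict] = []
    for i, record in enumerate(records):
        if remaining_ms() < min_remaining_ms:
            deferred = list(records[i:])  # everything not yet started is deferred
            break
        process(record)
        done.append(record)
    return done, deferred
```

The deferred records are the ones that would be handed back to SQS; how they are returned (re-send versus partial batch failure) is not specified here.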

Inspect: Check indexer Lambda logs — see Lambda.

aws logs tail /aws/lambda/{env}-EcomIndexerFunction --since 15m

7. Download & Parse Documents from S3

Where: components/ecom_indexer/ecom_indexer/document_operations.py

Downloads JSONL from S3, parses each line. Invalid JSON lines are tracked in skipped_items_details.
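
A minimal sketch of tolerant JSONL parsing. `parse_jsonl` is a hypothetical name, and the shape of each skipped entry is an assumption about what `skipped_items_details` might hold.

```python
import json
from typing import List, Tuple


def parse_jsonl(text: str) -> Tuple[List[dict], List[dict]]:
    """Parse JSONL text, returning (documents, skipped) instead of failing on bad lines."""
    docs: List[dict] = []
    skipped: List[dict] = []
    for line_no, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # blank lines are ignored, not reported
        try:
            doc = json.loads(line)
        except json.JSONDecodeError as exc:
            skipped.append({"line": line_no, "reason": f"invalid JSON: {exc.msg}"})
            continue
        if not isinstance(doc, dict):
            skipped.append({"line": line_no, "reason": "not a JSON object"})
            continue
        docs.append(doc)
    return docs, skipped
```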

8. Write Protection (Optimistic Locking)

Where: components/ecom_indexer/ecom_indexer/document_operations.py

Before writing, fetches existing documents from Marqo via get_documents API:

  • Compares updated_at timestamps (ISO 8601 string comparison)
  • Falls back to _mq_version integer comparison
  • Skips write if incoming version <= stored version

Conflicts tracked in conflict_details on the job record.
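
The comparison rules above might look like the following sketch. `should_write` is a hypothetical name, and allowing the write when no comparable version exists is an assumption. Note that comparing ISO 8601 strings lexicographically only matches chronological order when both timestamps use the same format and UTC offset.

```python
from typing import Optional


def should_write(incoming: dict, stored: Optional[dict]) -> bool:
    """Return True when the incoming document is strictly newer than the stored one."""
    if stored is None:
        return True  # nothing stored yet, so there is nothing to protect
    inc_ts, cur_ts = incoming.get("updated_at"), stored.get("updated_at")
    if inc_ts and cur_ts:
        return inc_ts > cur_ts  # ISO 8601 string comparison
    inc_v, cur_v = incoming.get("_mq_version"), stored.get("_mq_version")
    if inc_v is not None and cur_v is not None:
        return int(inc_v) > int(cur_v)  # fallback: integer version comparison
    return True  # assumption: no comparable version means the write proceeds
```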

9. Call Marqo add_documents API

Where: components/ecom_indexer/ecom_indexer/document_operations.py

POST {index_endpoint}/indexes/{index_name}/documents
{
  "documents": [...],
  "tensorFields": [...],
  "useExistingTensors": true/false,
  "mappings": {...}
}

Dual endpoint strategy:

  • Primary: index-specific endpoint
  • Fallback: data plane cell gateway (on transport errors)

Retry on 409 conflict: Exponential backoff, up to 9 attempts, max 40s sleep.
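
A sketch of the 409 retry policy, assuming a 1s base delay that doubles each attempt (the base is not stated above; only the 9 attempts and the 40s cap are). `ConflictError` and `call_with_conflict_retry` are hypothetical names standing in for however the indexer surfaces a 409.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class ConflictError(Exception):
    """Hypothetical stand-in for an HTTP 409 response from the documents API."""


def backoff_delays(max_attempts: int = 9, base: float = 1.0, cap: float = 40.0) -> List[float]:
    """Delays slept between attempts: base * 2**n, capped at `cap`."""
    return [min(cap, base * 2 ** n) for n in range(max_attempts - 1)]


def call_with_conflict_retry(call: Callable[[], T], max_attempts: int = 9,
                             base: float = 1.0, cap: float = 40.0,
                             sleep: Callable[[float], None] = time.sleep) -> T:
    """Invoke `call`, retrying on ConflictError with capped exponential backoff."""
    delays = backoff_delays(max_attempts, base, cap)
    for attempt in range(max_attempts):
        try:
            return call()
        except ConflictError:
            if attempt == max_attempts - 1:
                raise  # attempts exhausted, surface the conflict
            sleep(delays[attempt])
    raise RuntimeError("unreachable when max_attempts >= 1")
```

Under these assumptions the waits are 1, 2, 4, 8, 16, 32, 40, and 40 seconds, so a chunk that keeps conflicting spends up to 143 seconds sleeping, which is worth weighing against the Lambda time budget.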

Inspect: Check data plane cell gateway — see API Gateway and Ecommerce.

10. Readback Integrity Check (Optional)

Where: components/ecom_indexer/ecom_indexer/readback_consistency.py

If ecom_indexer_readback feature flag enabled:

  • Queries Marqo for written documents to verify consistency
  • Retries 2 times with 15s sleep between
  • Cleans up partial documents (incomplete tensor fields)
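
One possible shape for the readback retry loop, shown only as an illustration. `readback_check` and its `fetch_partial_ids` callback are hypothetical, and how a document is judged "partial" is left to that callback because the criteria are not described here. The sketch reads "retries 2 times" as two re-checks after the initial query.

```python
import time
from typing import Callable, Iterable, Sequence, Set, Tuple


def readback_check(doc_ids: Sequence[str],
                   fetch_partial_ids: Callable[[Sequence[str]], Iterable[str]],
                   retries: int = 2, sleep_s: float = 15.0,
                   sleep: Callable[[float], None] = time.sleep
                   ) -> Tuple[Set[str], Set[str]]:
    """Return (still_partial, recovered) after up to `retries` re-checks."""
    partial = set(fetch_partial_ids(doc_ids))
    initially_partial = set(partial)
    for _ in range(retries):
        if not partial:
            break
        sleep(sleep_s)
        # Only re-query the documents that still looked partial.
        partial = set(fetch_partial_ids(sorted(partial)))
    return partial, initially_partial - partial
```

In this reading, `initially_partial` would feed PartialDocumentsDetected, `recovered` would feed PartialDocumentsRecoveredAfterRetry, and `still_partial` would be the cleanup candidates.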

CloudWatch metrics:

  • PartialDocumentsDetected — triggers alarm
  • PartialDocumentsRecoveredAfterRetry

Inspect: Check CloudWatch alarms — see CloudWatch.

11. Job Completion

Where: components/ecom_indexer/ecom_indexer/job_management.py

After each chunk, atomically updates job counters using DynamoDB conditional writes:

  • completed_chunks set ensures idempotence (Lambda retries won't double-count)
  • When processed + failed + skipped + conflict == total_items, the job status transitions to one of:
      • COMPLETED (all succeeded)
      • CONFLICT (some version conflicts)
      • FAILED (some hard failures)
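
The counting and transition logic can be modeled in memory as below. This is a stand-in for the DynamoDB conditional write, not the real update expression. `apply_chunk` and `final_status` are hypothetical names, and the precedence of FAILED over CONFLICT when both occur is an assumption.

```python
from typing import Optional

COUNTERS = ("processed", "failed", "skipped", "conflict")


def final_status(job: dict) -> Optional[str]:
    """Return the terminal status, or None while items are still outstanding."""
    if sum(job[name] for name in COUNTERS) != job["total_items"]:
        return None
    if job["failed"]:
        return "FAILED"  # assumption: hard failures take precedence
    if job["conflict"]:
        return "CONFLICT"
    return "COMPLETED"


def apply_chunk(job: dict, chunk_id: str, counts: dict) -> bool:
    """Fold one chunk's counts into the job exactly once; return False for a replay."""
    if chunk_id in job["completed_chunks"]:
        return False  # a Lambda retry delivered this chunk again, so do not double-count
    job["completed_chunks"].add(chunk_id)
    for name in COUNTERS:
        job[name] += counts.get(name, 0)
    status = final_status(job)
    if status is not None:
        job["status"] = status
    return True
```

In DynamoDB the membership test and the increments would need to happen in a single conditional update so that two concurrent invocations cannot both pass the check.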

12. Error Handling & DLQ

Transient errors (SQS retry): 429 rate limits, image download timeouts, 404 document not found.

Terminal errors (self-DLQ): Enriched with stack trace and job context before sending to DLQ. Job marked FAILED.
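
A rough sketch of the transient/terminal split and the DLQ enrichment. The exception classes, `is_transient`, and `build_dlq_payload` are hypothetical, and the payload field names are assumptions. Only the list of transient conditions comes from the text above.

```python
import traceback

TRANSIENT_HTTP_STATUSES = frozenset({429, 404})  # rate limit, document not found


class UpstreamHTTPError(Exception):
    """Hypothetical wrapper for a non-2xx response from an upstream API."""

    def __init__(self, status: int, message: str = ""):
        super().__init__(message or f"HTTP {status}")
        self.status = status


class ImageDownloadTimeout(Exception):
    """Hypothetical marker for an image fetch that timed out."""


def is_transient(exc: BaseException) -> bool:
    """True when the error should be left for SQS to retry."""
    if isinstance(exc, ImageDownloadTimeout):
        return True
    return isinstance(exc, UpstreamHTTPError) and exc.status in TRANSIENT_HTTP_STATUSES


def build_dlq_payload(exc: BaseException, job_context: dict) -> dict:
    """Attach the stack trace and job context to a terminal error."""
    return {
        "error_type": type(exc).__name__,
        "error_message": str(exc),
        "stack_trace": "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
        "job": dict(job_context),
    }
```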

Inspect: Check DLQ — see SQS.

What to Look For

| Symptom | Where to Check |
|---|---|
| HTTP 4XX errors | Ecom API Lambda logs. |
| Job stuck in PENDING | SQS queue: are messages being consumed? Indexer Lambda errors? |
| Job stuck in IN_PROGRESS | Indexer Lambda timeouts. Check remaining chunks via completed_chunks count. |
| Partial documents | PartialDocumentsDetected CloudWatch alarm. Readback check logs. |
| Version conflicts | conflict_details on job record. Another sync running concurrently? |
| Slow indexing | Marqo latency (data plane cell). Image download timeouts. Queue depth. |
| DLQ messages | Read DLQ messages for enriched error context. Check error_message on job. |
| Missing products | Check S3 chunks: are all products accounted for? Check skipped_items_details. |

Related

  • Ecommerce — all DDB table schemas, Lambda details
  • Lambda — how to check Lambda logs and errors
  • SQS — queue depth, DLQ inspection
  • S3 — product data bucket
  • DynamoDB — query job records
  • CloudWatch — partial document alarms
  • Settings Sync — how index settings reach the indexer