Processing Stages & Jobs

This document describes each processing stage in the TimeTiles data import pipeline: the job that handles it, its inputs and outputs, and the relevant implementation details.

Stage Overview

The import pipeline consists of eight distinct stages, each handled by a specialized background job (the approval stage is the single manual step):

| Stage | Job Handler | Purpose | Batch Size |
| --- | --- | --- | --- |
| File Upload | dataset-detection-job | Parse file structure | N/A |
| Analyze Duplicates | analyze-duplicates-job | Find duplicate rows | 5,000 |
| Detect Schema | schema-detection-job | Build progressive schema | 10,000 |
| Validate Schema | validate-schema-job | Compare with existing schema | 10,000 |
| Await Approval | Manual process | Human review of changes | N/A |
| Create Schema Version | create-schema-version-job | Create approved schema version | N/A |
| Geocode Batch | geocode-batch-job | Add location data | 100 |
| Create Events | create-events-batch-job | Generate final records | 1,000 |

Detailed Stage Documentation

1. Dataset Detection Stage

Job: dataset-detection-job
Trigger: import-files collection afterChange hook
Stage: N/A (pre-processing)

Purpose

Parse the uploaded file to detect datasets/sheets and create an individual import job for each detected dataset.

Process Flow

  1. File Analysis: Determine file type (CSV, Excel) and structure
  2. Sheet Detection: For Excel files, enumerate all sheets
  3. Dataset Mapping: Match sheets to existing datasets or create new ones
  4. Job Creation: Create one import-job per dataset/sheet combination

Implementation Details

// Excel file with multiple sheets
File: "company_events.xlsx"
├── Sheet "Conferences" → Dataset "conferences" → Import Job #1
├── Sheet "Workshops" → Dataset "workshops" → Import Job #2
└── Sheet "Webinars" → Dataset "webinars" → Import Job #3
 
// CSV file
File: "events.csv" → Dataset "events" → Import Job #1

Outputs

  • Updates import-files.status to "processing"
  • Creates multiple import-jobs records
  • Each job starts at analyze-duplicates stage
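
The trigger itself is an afterChange hook on the import-files collection. A minimal sketch of what that hook might look like is shown below, using the same payload.jobs.queue API that appears later in this document; the hook name, task slug, and input field names are assumptions, not the actual implementation.

// Hypothetical afterChange hook on the import-files collection (Payload 3 style).
// The hook name, task slug, and input field names are illustrative.
import type { CollectionAfterChangeHook } from "payload";

export const queueDatasetDetection: CollectionAfterChangeHook = async ({ doc, operation, req }) => {
  if (operation !== "create") return doc; // only react to freshly uploaded files

  await req.payload.jobs.queue({
    task: "dataset-detection",            // assumed task slug for dataset-detection-job
    input: { importFileId: doc.id },
  });

  return doc;
};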

2. Analyze Duplicates Stage

Job: analyze-duplicates-job
Stage: analyze-duplicates
Batch Size: 5,000 rows

Purpose

Identify duplicate rows within the import file and against existing database records to reduce processing volume for subsequent stages.

Process Flow

  1. Strategy Selection: Use dataset's configured deduplication strategy
  2. Internal Analysis: Scan entire file to find duplicates within import
  3. External Check: Query existing events to find external duplicates
  4. Summary Generation: Create duplicate statistics and row mappings

Deduplication Strategies

External ID Strategy

// Use specific field as unique identifier
idStrategy: {
  type: "external-id",
  field: "event_id"
}
// Example: Row with event_id="EVT-123" is duplicate if EVT-123 already exists

Computed Hash Strategy

// Hash combination of specific fields
idStrategy: {
  type: "computed-hash",
  fields: ["name", "date", "location"]
}
// Example: Hash of "Tech Conference" + "2024-03-15" + "San Francisco"

Content Hash Strategy

// Hash entire row content
idStrategy: {
  type: "content-hash"
}
// Example: Hash of all field values concatenated
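
Both hash strategies boil down to hashing a normalized concatenation of field values. The sketch below shows one way this could be computed; the helper name, normalization rules, and choice of SHA-256 are assumptions, not the project's actual implementation.

// Illustrative hashing helper; not the actual implementation.
import { createHash } from "node:crypto";

const hashRow = (row: Record<string, unknown>, fields?: string[]): string => {
  // computed-hash uses the configured fields; content-hash uses every field
  const keys = fields ?? Object.keys(row).sort();
  const concatenated = keys.map((key) => `${key}=${String(row[key] ?? "")}`).join("|");
  return createHash("sha256").update(concatenated).digest("hex");
};

// computed-hash: hashRow(row, ["name", "date", "location"])
// content-hash:  hashRow(row)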

Output Data Structure

{
  duplicates: {
    strategy: "computed-hash",
    internal: [
      { rowNumber: 42, uniqueId: "hash123", firstOccurrence: 10 },
      { rowNumber: 156, uniqueId: "hash456", firstOccurrence: 89 }
    ],
    external: [
      { rowNumber: 23, uniqueId: "hash789", existingEventId: 5432 }
    ],
    summary: {
      totalRows: 10000,
      uniqueRows: 8500,
      internalDuplicates: 1200,
      externalDuplicates: 300
    }
  }
}

Performance Optimizations

  • Processes the file in 5,000-row batches to manage memory
  • Uses a Map data structure for O(1) duplicate lookups
  • Chunks external duplicate queries to avoid database limits
  • Skipped entirely if deduplication is disabled for the dataset
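
The internal pass can be pictured as a single Map shared across batches, keyed by each row's unique ID. The sketch below is illustrative; the names and shapes are assumptions, though the output matches the internal duplicate entries shown above.

// Illustrative internal-duplicate scan for one batch; the Map is shared
// across batches so cross-batch duplicates are still caught.
interface InternalDuplicate {
  rowNumber: number;
  uniqueId: string;
  firstOccurrence: number;
}

const findInternalDuplicates = (
  rows: { rowNumber: number; uniqueId: string }[],
  firstOccurrence: Map<string, number>,
): InternalDuplicate[] => {
  const duplicates: InternalDuplicate[] = [];
  for (const { rowNumber, uniqueId } of rows) {
    const seenAt = firstOccurrence.get(uniqueId); // O(1) lookup
    if (seenAt === undefined) {
      firstOccurrence.set(uniqueId, rowNumber);   // first time this ID appears
    } else {
      duplicates.push({ rowNumber, uniqueId, firstOccurrence: seenAt });
    }
  }
  return duplicates;
};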

3. Detect Schema Stage

Job: schema-detection-job
Stage: detect-schema
Batch Size: 10,000 rows

Purpose

Progressively build a JSON Schema representation of the data by analyzing non-duplicate rows across multiple batches.

Process Flow

  1. Batch Processing: Read file in 10,000 row chunks
  2. Duplicate Filtering: Skip rows identified as duplicates
  3. Schema Building: Use ProgressiveSchemaBuilder to analyze data types
  4. Geocoding Detection: Identify potential address/coordinate fields
  5. State Persistence: Save builder state for continuity across batches

Schema Detection Features

Type Inference

// Automatically detects field types
{
  properties: {
    "event_name": { type: "string", maxLength: 255 },
    "attendance": { type: "integer", minimum: 0, maximum: 5000 },
    "event_date": { type: "string", format: "date" },
    "is_public": { type: "boolean" },
    "cost": { type: "number", minimum: 0 }
  },
  required: ["event_name", "event_date"]
}
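
At the level of a single value, type inference can be sketched as below; the real builder aggregates such per-value observations across rows and batches into the JSON Schema shown above. The helper and its rules are illustrative assumptions, not the actual detection logic.

// Illustrative per-value type inference; not the actual detection rules.
type InferredType = "string" | "integer" | "number" | "boolean" | "null";

const inferType = (value: unknown): InferredType => {
  if (value === null || value === undefined || value === "") return "null";
  if (typeof value === "boolean" || value === "true" || value === "false") return "boolean";
  const asNumber = Number(value);
  if (!Number.isNaN(asNumber) && String(value).trim() !== "") {
    return Number.isInteger(asNumber) ? "integer" : "number";
  }
  return "string";
};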

Geocoding Field Detection

// Identifies fields suitable for geocoding
geocodingCandidates: {
  addressField: "venue_address",      // String field with address patterns
  latitudeField: "venue_lat",         // Numeric latitude field
  longitudeField: "venue_lng",        // Numeric longitude field
}
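
One way such candidates might be found is by matching field names against common patterns, as sketched below; the actual detector likely also inspects the values themselves, and the regexes here are assumptions.

// Illustrative name-based heuristic only; the patterns are assumptions.
const looksLikeAddress = (name: string) => /address|street|city/i.test(name);
const looksLikeLatitude = (name: string) => /(^|_)(lat|latitude)$/i.test(name);
const looksLikeLongitude = (name: string) => /(^|_)(lng|lon|longitude)$/i.test(name);

const detectGeocodingCandidates = (fieldNames: string[]) => ({
  addressField: fieldNames.find(looksLikeAddress),
  latitudeField: fieldNames.find(looksLikeLatitude),
  longitudeField: fieldNames.find(looksLikeLongitude),
});

// detectGeocodingCandidates(["venue_address", "venue_lat", "venue_lng"])
// → { addressField: "venue_address", latitudeField: "venue_lat", longitudeField: "venue_lng" }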

Progressive Building

  • Each batch refines type detection
  • Handles type conflicts intelligently
  • Tracks field statistics and null rates
  • Maintains state across processing interruptions

Builder State Structure

{
  fieldStats: {
    "event_name": {
      totalValues: 8500,
      nullCount: 0,
      uniqueCount: 8200,
      typeDistribution: { string: 8500 }
    },
    "attendance": {
      totalValues: 8300,
      nullCount: 200,
      minValue: 5,
      maxValue: 4500,
      typeDistribution: { integer: 8300 }
    }
  },
  schemaVersion: 1,
  lastProcessedBatch: 3
}

4. Validate Schema Stage

Job: validate-schema-job
Stage: validate-schema
Batch Size: 10,000 rows

Purpose

Compare the detected schema against the dataset's current schema to identify breaking and non-breaking changes.

Process Flow

  1. Schema Finalization: Complete schema building using saved state
  2. Current Schema Retrieval: Get active schema from dataset-schemas
  3. Schema Comparison: Identify differences and classify changes
  4. Approval Decision: Determine if changes can be auto-approved
  5. Version Creation: Create new schema version if auto-approved

Change Classification

Breaking Changes (Require Approval)

[
  {
    field: "attendance",
    change: "type_change",
    from: "string",
    to: "integer",
  },
  {
    field: "registration_required",
    change: "required_field_removed",
  },
];

Non-Breaking Changes (Can Auto-Approve)

[
  {
    field: "event_category",
    type: "string",
    optional: true,
  },
  {
    field: "max_capacity",
    type: "integer",
    optional: true,
  },
];

Type Transformations

The system can automatically handle type mismatches through configured transformations:

Built-in Strategies:

  • Parse: Smart parsing (string "123" → number 123)
  • Cast: Simple type conversion (123 → "123")
  • Reject: Fail validation for type mismatches

Custom Transformations:

// Example: Handle European number format
const typeTransformations = [
  {
    fieldPath: "temperature",
    fromType: "string",
    toType: "number",
    transformStrategy: "custom",
    customFunction: async (value, context) => {
      if (typeof value === "string") {
        return parseFloat(value.replace(",", "."));
      }
      return value;
    },
    enabled: true,
  },
];

Auto-Approval Logic

const canAutoApprove =
  dataset.schemaConfig?.autoGrow && // Auto-grow enabled
  !comparison.hasBreakingChanges && // No breaking changes
  comparison.newFields.every((f) => f.optional); // All new fields optional
 
const requiresApproval =
  comparison.hasBreakingChanges || // Has breaking changes
  dataset.schemaConfig?.locked || // Schema locked
  !dataset.schemaConfig?.autoApproveNonBreaking; // Manual approval required

Next Stage Decision

  • Auto-approved: Proceed to create-schema-version
  • Requires approval: Move to await-approval
  • Schema unchanged: Proceed to geocode-batch
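
That decision can be summarized as a small mapping from the comparison result to the next stage; the helper below is an assumption about how the job encodes it, but the stage names match the pipeline.

// Illustrative mapping from schema comparison to the next stage.
type NextStage = "create-schema-version" | "await-approval" | "geocode-batch";

const decideNextStage = (comparison: { hasChanges: boolean; canAutoApprove: boolean }): NextStage => {
  if (!comparison.hasChanges) return "geocode-batch";            // schema unchanged
  if (comparison.canAutoApprove) return "create-schema-version"; // auto-approved growth
  return "await-approval";                                       // needs human review
};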

5. Await Approval Stage

Job: Manual process (no background job)
Stage: await-approval

Purpose

Pause processing for human review of schema changes that require approval.

Process Flow

  1. Notification: Alert dataset administrators of pending approval
  2. Review Interface: Present schema changes for human review
  3. Decision Capture: Record approval/rejection with timestamp and user
  4. Schema Creation: If approved, create new schema version
  5. Stage Transition: Move to next stage or mark as failed

Approval Data Structure

{
  schemaValidation: {
    requiresApproval: true,
    approvalReason: "Breaking schema changes detected",
    approved: true,
    approvedBy: 1234,                    // User ID
    approvedAt: "2024-01-15T10:30:00Z",
    breakingChanges: [...],              // List of breaking changes
    newFields: [...]                     // List of new fields
  }
}

Manual Intervention

  • Administrators can approve/reject via API or admin interface
  • Rejection marks import as failed
  • Approval triggers create-schema-version stage
  • Approval workflow service handles the transition logic

6. Create Schema Version Stage

Job: create-schema-version-job
Stage: create-schema-version
Batch Size: N/A (single operation)

Purpose

Create a new schema version in the database after schema approval, handling the version creation in a separate transaction to avoid circular dependencies and deadlocks.

Process Flow

  1. Validation Check: Verify import job exists and schema is approved
  2. Duplicate Prevention: Skip if schema version already exists
  3. Dataset Retrieval: Get the associated dataset record
  4. Version Creation: Use SchemaVersioningService to create schema version
  5. Job Update: Link the schema version to the import job
  6. Stage Transition: Move to geocode-batch stage

Implementation Details

// Schema version creation process
const schemaVersion = await SchemaVersioningService.createSchemaVersion(payload, {
  dataset: dataset.id,
  schema: job.schema,                    // Detected/approved schema
  fieldMetadata: fieldStats || {},       // Field statistics from detection
  autoApproved: false,                   // Manual approval workflow
  approvedBy: approvedById,              // User who approved
  importSources: [],                     // Will be populated later
});

Error Handling

  • Missing Job: Throws error if import job not found
  • No Approval: Skips processing if schema not approved
  • Missing Dataset: Throws error if dataset not found
  • Creation Failure: Marks import job as failed and stops processing

Transaction Safety

This job runs in a separate transaction from the approval process to prevent:

  • Circular dependency issues with schema validation
  • Database deadlocks during concurrent operations
  • Partial state corruption if approval fails after version creation

Output Data Structure

{
  output: {
    schemaVersionId: "64f1a2b3c4d5e6f7a8b9c0d1" // Created version ID
  }
}
 
// Or if skipped
{
  output: {
    skipped: true
  }
}

Next Stage

Upon successful completion, the job automatically transitions to geocode-batch stage.


7. Geocode Batch Stage

Job: geocode-batch-job
Stage: geocode-batch
Batch Size: 100 rows

Purpose

Enrich data with geographic coordinates by geocoding addresses or validating provided coordinates.

Process Flow

  1. Candidate Evaluation: Check if geocoding candidates were detected
  2. Batch Processing: Process 100 rows at a time (API rate limit friendly)
  3. Duplicate Skipping: Skip rows identified as duplicates
  4. Geocoding/Validation: Either geocode addresses or validate coordinates
  5. Result Storage: Store geocoding results by row number
  6. Progress Tracking: Update geocoding progress

Geocoding Scenarios

Address Geocoding

// Input: Address string
row["venue_address"] = "123 Main St, San Francisco, CA";
 
// Output: Coordinates with confidence
{
  rowNumber: 42,
  coordinates: { lat: 37.7749, lng: -122.4194 },
  confidence: 0.95,
  formattedAddress: "123 Main Street, San Francisco, CA 94102, USA"
}

Coordinate Validation

// Input: Latitude/longitude fields
row["venue_lat"] = "37.7749";
row["venue_lng"] = "-122.4194";
 
// Output: Validated coordinates
{
  rowNumber: 42,
  coordinates: { lat: 37.7749, lng: -122.4194 },
  confidence: 1.0,
  source: "provided"
}

Error Handling

  • Individual geocoding failures don't stop the batch
  • Failed geocoding attempts are logged but processing continues
  • Rate limit errors trigger exponential backoff
  • Malformed addresses are skipped with a warning
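
The exponential backoff mentioned above could look roughly like the generic retry wrapper below; the geocoding client and its error shape are not part of this document and are assumed.

// Generic retry-with-backoff sketch; the 429 status check is an assumed error shape.
const withBackoff = async <T>(fn: () => Promise<T>, maxRetries = 5): Promise<T> => {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      const rateLimited = (error as { status?: number }).status === 429;
      if (!rateLimited || attempt >= maxRetries) throw error;
      const delayMs = 2 ** attempt * 1000; // 1s, 2s, 4s, ...
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
};

// Usage (geocodeAddress is a hypothetical client call):
// const result = await withBackoff(() => geocodeAddress(row["venue_address"]));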

Result Storage Structure

{
  geocodingResults: {
    "42": {
      coordinates: { lat: 37.7749, lng: -122.4194 },
      confidence: 0.95,
      formattedAddress: "123 Main Street, San Francisco, CA"
    },
    "156": {
      coordinates: { lat: 40.7128, lng: -74.0060 },
      confidence: 0.88,
      formattedAddress: "New York, NY, USA"
    }
  },
  geocodingProgress: {
    current: 200,
    total: 8500
  }
}

8. Create Events Stage

Job: create-events-batch-job
Stage: create-events
Batch Size: 1,000 rows

Purpose

Create the final event records in the database with all processing results applied.

Process Flow

  1. Batch Reading: Read 1,000 rows from file
  2. Duplicate Filtering: Skip rows marked as duplicates
  3. ID Generation: Generate unique ID using dataset strategy
  4. Data Enrichment: Apply geocoding results and extract metadata
  5. Event Creation: Create records in events collection
  6. Progress Tracking: Update processing progress
  7. Completion: Mark import as completed when all batches processed

Event Creation Process

For each non-duplicate row:

const event = {
  dataset: dataset.id,
  importJob: importJobId,
  data: row, // Original row data
  uniqueId: generateUniqueId(row, strategy),
  eventTimestamp: extractTimestamp(row),
 
  // Apply geocoding if available
  location: geocoding
    ? {
        latitude: geocoding.coordinates.lat,
        longitude: geocoding.coordinates.lng,
      }
    : undefined,
 
  coordinateSource: geocoding
    ? {
        type: "geocoded",
        confidence: geocoding.confidence,
      }
    : { type: "none" },
 
  validationStatus: "pending",
  schemaVersionNumber: job.datasetSchemaVersion,
};

Timestamp Extraction

// Looks for common timestamp fields in order
const timestampFields = ["timestamp", "date", "datetime", "created_at", "event_date", "event_time"];
 
// Falls back to current time if no valid timestamp found
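
A sketch of that lookup, matching the extractTimestamp call in the event-creation example above; the body here is illustrative.

// Illustrative implementation of the fallback behaviour described above.
const extractTimestamp = (row: Record<string, unknown>): Date => {
  for (const field of timestampFields) {
    const value = row[field];
    if (value === undefined || value === null || value === "") continue;
    const parsed = new Date(String(value));
    if (!Number.isNaN(parsed.getTime())) return parsed; // first valid timestamp wins
  }
  return new Date(); // no valid timestamp found → current time
};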

Error Handling

  • Individual row failures are logged but don't stop the batch
  • Failed events are recorded in the import job's errors array
  • Successful events update the progress counter
  • Database transaction failures trigger a batch retry
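
Per-row isolation can be sketched as below; the helper, its parameters, and the errors-array shape are assumptions, while payload.create is the standard Payload local API call.

// Illustrative per-row error isolation; names and shapes are assumptions.
import type { Payload } from "payload";

const createEventsForBatch = async (
  payload: Payload,
  rows: { rowNumber: number; data: Record<string, unknown> }[],
  buildEvent: (data: Record<string, unknown>) => Record<string, unknown>,
) => {
  const errors: { rowNumber: number; message: string }[] = [];
  let created = 0;

  for (const row of rows) {
    try {
      await payload.create({ collection: "events", data: buildEvent(row.data) });
      created += 1; // successful events advance the progress counter
    } catch (error) {
      // Individual failures are recorded but do not stop the batch
      errors.push({ rowNumber: row.rowNumber, message: (error as Error).message });
    }
  }

  return { created, errors };
};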

Completion Processing

// When all batches complete, update final results
{
  stage: "completed",
  results: {
    totalEvents: 8200,              // Successfully created events
    duplicatesSkipped: 1500,        // Internal + external duplicates
    geocoded: 7800,                 // Events with location data
    errors: 300                     // Individual row failures
  }
}

Stage Transitions

Automatic Transitions

Most stage transitions happen automatically via collection hooks:

// import-jobs afterChange hook triggers next stage
switch (doc.stage) {
  case "analyze-duplicates":
    // Queue first schema detection batch
    await payload.jobs.queue({
      task: "detect-schema",
      input: { importJobId: doc.id, batchNumber: 0 },
    });
    break;
}

Manual Transitions

Only the approval stage requires manual intervention:

// API endpoint for manual approval
POST /api/import-jobs/:id/approve
{
  approved: true,
  notes: "Approved new optional fields"
}
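
For example, from a script or admin tool the call might look like this (authentication is environment-specific and omitted):

// Example request to the approval endpoint; auth headers omitted.
const response = await fetch(`/api/import-jobs/${importJobId}/approve`, {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ approved: true, notes: "Approved new optional fields" }),
});
if (!response.ok) throw new Error(`Approval failed with status ${response.status}`);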

Error Recovery

Stage-Level Recovery

  • Each stage can be retried from its starting point
  • Previous stage results are preserved
  • Failed jobs maintain reference to last successful stage

Batch-Level Recovery

  • Batch processing allows resuming from specific row ranges
  • Progress tracking enables precise resume points
  • Partial results are saved incrementally
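
As a concrete illustration, a resume point can be derived from the saved progress counters (the shape mirrors the geocodingProgress example earlier); the helper itself is an assumption.

// Illustrative resume-point calculation from saved progress.
const GEOCODE_BATCH_SIZE = 100;

const nextGeocodeBatch = (progress: { current: number; total: number }): number | null => {
  if (progress.current >= progress.total) return null;       // stage already complete
  return Math.floor(progress.current / GEOCODE_BATCH_SIZE);  // next unprocessed batch index
};

// nextGeocodeBatch({ current: 200, total: 8500 }) → 2 (rows 200–299)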

Data Integrity

  • All database operations are transactional
  • Failed stages don't corrupt partial data
  • Version history provides complete audit trail

This detailed breakdown provides the foundation for understanding, debugging, and extending the TimeTiles data processing pipeline.