Processing Stages & Jobs
This document provides detailed information about each processing stage in the TimeTiles data import pipeline, including the specific jobs that handle each stage, their inputs/outputs, and implementation details.
Stage Overview
The import pipeline consists of 8 distinct stages, each handled by a specialized background job (except the manual approval step):
Stage | Job Handler | Purpose | Batch Size |
---|---|---|---|
File Upload | dataset-detection-job | Parse file structure | N/A |
Analyze Duplicates | analyze-duplicates-job | Find duplicate rows | 5,000 |
Detect Schema | schema-detection-job | Build progressive schema | 10,000 |
Validate Schema | validate-schema-job | Compare with existing schema | 10,000 |
Await Approval | Manual process | Human review of changes | N/A |
Create Schema Version | create-schema-version-job | Create approved schema version | N/A |
Geocode Batch | geocode-batch-job | Add location data | 100 |
Create Events | create-events-batch-job | Generate final records | 1,000 |
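For reference, the stage identifiers used throughout this document can be modeled as a simple union type. This is an illustrative sketch only; the exact slugs and terminal states (such as "failed") in the codebase may differ.

```typescript
// Illustrative only: stage slugs as referenced in this document.
// The exact values and terminal states in the codebase may differ.
type ImportStage =
  | "analyze-duplicates"
  | "detect-schema"
  | "validate-schema"
  | "await-approval"
  | "create-schema-version"
  | "geocode-batch"
  | "create-events"
  | "completed"
  | "failed";

// Typical ordering of the pipeline stages.
const STAGE_ORDER: ImportStage[] = [
  "analyze-duplicates",
  "detect-schema",
  "validate-schema",
  "await-approval",
  "create-schema-version",
  "geocode-batch",
  "create-events",
  "completed",
];
```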
Detailed Stage Documentation
1. Dataset Detection Stage
Job: dataset-detection-job
Trigger: import-files collection afterChange hook
Stage: N/A (pre-processing)
Purpose
Parse uploaded file to detect datasets/sheets and create individual import jobs for each detected dataset.
Process Flow
- File Analysis: Determine file type (CSV, Excel) and structure
- Sheet Detection: For Excel files, enumerate all sheets
- Dataset Mapping: Match sheets to existing datasets or create new ones
- Job Creation: Create one import-job per dataset/sheet combination
Implementation Details
// Excel file with multiple sheets
File: "company_events.xlsx"
├── Sheet "Conferences" → Dataset "conferences" → Import Job #1
├── Sheet "Workshops" → Dataset "workshops" → Import Job #2
└── Sheet "Webinars" → Dataset "webinars" → Import Job #3
// CSV file
File: "events.csv" → Dataset "events" → Import Job #1
Outputs
- Updates import-files.status to "processing"
- Creates multiple import-jobs records
- Each job starts at the analyze-duplicates stage
2. Analyze Duplicates Stage
Job: analyze-duplicates-job
Stage: analyze-duplicates
Batch Size: 5,000 rows
Purpose
Identify duplicate rows within the import file and against existing database records to reduce processing volume for subsequent stages.
Process Flow
- Strategy Selection: Use dataset's configured deduplication strategy
- Internal Analysis: Scan entire file to find duplicates within import
- External Check: Query existing events to find external duplicates
- Summary Generation: Create duplicate statistics and row mappings
Deduplication Strategies
External ID Strategy
// Use specific field as unique identifier
idStrategy: {
type: "external-id",
field: "event_id"
}
// Example: Row with event_id="EVT-123" is duplicate if EVT-123 already exists
Computed Hash Strategy
// Hash combination of specific fields
idStrategy: {
type: "computed-hash",
fields: ["name", "date", "location"]
}
// Example: Hash of "Tech Conference" + "2024-03-15" + "San Francisco"
Content Hash Strategy
// Hash entire row content
idStrategy: {
type: "content-hash";
}
// Example: Hash of all field values concatenated
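A minimal sketch of how a row's unique ID might be derived under each strategy (the hashing details are assumptions; the real implementation may normalize values differently):

```typescript
import { createHash } from "crypto";

type IdStrategy =
  | { type: "external-id"; field: string }
  | { type: "computed-hash"; fields: string[] }
  | { type: "content-hash" };

function computeUniqueId(row: Record<string, unknown>, strategy: IdStrategy): string {
  const hash = (input: string) => createHash("sha256").update(input).digest("hex");
  switch (strategy.type) {
    case "external-id":
      return String(row[strategy.field]); // e.g. "EVT-123"
    case "computed-hash":
      return hash(strategy.fields.map((f) => String(row[f] ?? "")).join("|"));
    case "content-hash":
      return hash(JSON.stringify(row)); // hash of the full row content
  }
}
```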
Output Data Structure
{
duplicates: {
strategy: "computed-hash",
internal: [
{ rowNumber: 42, uniqueId: "hash123", firstOccurrence: 10 },
{ rowNumber: 156, uniqueId: "hash456", firstOccurrence: 89 }
],
external: [
{ rowNumber: 23, uniqueId: "hash789", existingEventId: 5432 }
],
summary: {
totalRows: 10000,
uniqueRows: 8500,
internalDuplicates: 1200,
externalDuplicates: 300
}
}
}
Performance Optimizations
- Processes file in 5,000 row batches to manage memory
- Uses Map data structure for O(1) duplicate lookups
- Chunks external duplicate queries to avoid database limits
- Skipped if deduplication disabled for dataset
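For illustration, the internal duplicate scan might look like the sketch below, reusing the hypothetical IdStrategy type and computeUniqueId helper from the sketch above. Row numbers and the result shape follow the output structure shown earlier.

```typescript
function findInternalDuplicates(rows: Record<string, unknown>[], strategy: IdStrategy) {
  const firstSeen = new Map<string, number>(); // uniqueId -> first row number (O(1) lookups)
  const internal: { rowNumber: number; uniqueId: string; firstOccurrence: number }[] = [];

  rows.forEach((row, index) => {
    const rowNumber = index + 1;
    const uniqueId = computeUniqueId(row, strategy);
    const firstOccurrence = firstSeen.get(uniqueId);
    if (firstOccurrence === undefined) {
      firstSeen.set(uniqueId, rowNumber); // first time we see this ID
    } else {
      internal.push({ rowNumber, uniqueId, firstOccurrence }); // duplicate within the import
    }
  });

  return { internal, uniqueRows: firstSeen.size };
}
```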
3. Detect Schema Stage
Job: schema-detection-job
Stage: detect-schema
Batch Size: 10,000 rows
Purpose
Progressively build a JSON Schema representation of the data by analyzing non-duplicate rows across multiple batches.
Process Flow
- Batch Processing: Read file in 10,000 row chunks
- Duplicate Filtering: Skip rows identified as duplicates
- Schema Building: Use ProgressiveSchemaBuilder to analyze data types
- Geocoding Detection: Identify potential address/coordinate fields
- State Persistence: Save builder state for continuity across batches
Schema Detection Features
Type Inference
// Automatically detects field types
{
properties: {
"event_name": { type: "string", maxLength: 255 },
"attendance": { type: "integer", minimum: 0, maximum: 5000 },
"event_date": { type: "string", format: "date" },
"is_public": { type: "boolean" },
"cost": { type: "number", minimum: 0 }
},
required: ["event_name", "event_date"]
}
Geocoding Field Detection
// Identifies fields suitable for geocoding
geocodingCandidates: {
addressField: "venue_address", // String field with address patterns
latitudeField: "venue_lat", // Numeric latitude field
longitudeField: "venue_lng", // Numeric longitude field
}
Progressive Building
- Each batch refines type detection
- Handles type conflicts intelligently
- Tracks field statistics and null rates
- Maintains state across processing interruptions
Builder State Structure
{
fieldStats: {
"event_name": {
totalValues: 8500,
nullCount: 0,
uniqueCount: 8200,
typeDistribution: { string: 8500 }
},
"attendance": {
totalValues: 8300,
nullCount: 200,
minValue: 5,
maxValue: 4500,
typeDistribution: { integer: 8300 }
}
},
schemaVersion: 1,
lastProcessedBatch: 3
}
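A simplified sketch of how such per-field statistics might be accumulated batch by batch; the real ProgressiveSchemaBuilder tracks more than this (e.g. min/max values and unique counts), so treat the shape as an approximation.

```typescript
interface FieldStats {
  totalValues: number;
  nullCount: number;
  typeDistribution: Record<string, number>;
}

function updateFieldStats(stats: Record<string, FieldStats>, batch: Record<string, unknown>[]) {
  for (const row of batch) {
    for (const [field, value] of Object.entries(row)) {
      const entry = (stats[field] ??= { totalValues: 0, nullCount: 0, typeDistribution: {} });
      if (value === null || value === undefined || value === "") {
        entry.nullCount += 1;
        continue;
      }
      entry.totalValues += 1;
      const type =
        typeof value === "number" && Number.isInteger(value) ? "integer" : typeof value;
      entry.typeDistribution[type] = (entry.typeDistribution[type] ?? 0) + 1;
    }
  }
  return stats;
}
```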
4. Validate Schema Stage
Job: validate-schema-job
Stage: validate-schema
Batch Size: 10,000 rows
Purpose
Compare the detected schema against the dataset's current schema to identify breaking and non-breaking changes.
Process Flow
- Schema Finalization: Complete schema building using saved state
- Current Schema Retrieval: Get active schema from dataset-schemas
- Schema Comparison: Identify differences and classify changes
- Approval Decision: Determine if changes can be auto-approved
- Version Creation: Create new schema version if auto-approved
Change Classification
Breaking Changes (Require Approval)
[
{
field: "attendance",
change: "type_change",
from: "string",
to: "integer",
},
{
field: "registration_required",
change: "required_field_removed",
},
];
Non-Breaking Changes (Can Auto-Approve)
[
{
field: "event_category",
type: "string",
optional: true,
},
{
field: "max_capacity",
type: "integer",
optional: true,
},
];
Type Transformations
The system can automatically handle type mismatches through configured transformations:
Built-in Strategies:
- Parse: Smart parsing (string "123" → number 123)
- Cast: Simple type conversion (123 → "123")
- Reject: Fail validation for type mismatches
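A rough sketch of what these built-in strategies amount to (the actual implementation and its error handling are assumptions); the custom example below shows how this behavior can be overridden per field.

```typescript
function applyTransformStrategy(
  value: unknown,
  toType: "string" | "number",
  strategy: "parse" | "cast" | "reject"
): unknown {
  switch (strategy) {
    case "parse": {
      // Smart parsing: "123" -> 123; non-numeric strings are left untouched
      if (toType === "number" && typeof value === "string") {
        const parsed = Number(value.trim());
        if (!Number.isNaN(parsed)) return parsed;
      }
      return value;
    }
    case "cast":
      // Simple conversion: 123 -> "123", "123" -> 123
      return toType === "string" ? String(value) : Number(value);
    case "reject":
      throw new Error(`Type mismatch: expected ${toType}, got ${typeof value}`);
  }
}
```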
Custom Transformations:
// Example: Handle European number format
const typeTransformations = [
{
fieldPath: "temperature",
fromType: "string",
toType: "number",
transformStrategy: "custom",
customFunction: async (value, context) => {
if (typeof value === "string") {
return parseFloat(value.replace(",", "."));
}
return value;
},
enabled: true,
},
];
Auto-Approval Logic
const canAutoApprove =
dataset.schemaConfig?.autoGrow && // Auto-grow enabled
!comparison.hasBreakingChanges && // No breaking changes
comparison.newFields.every((f) => f.optional); // All new fields optional
const requiresApproval =
comparison.hasBreakingChanges || // Has breaking changes
dataset.schemaConfig?.locked || // Schema locked
!dataset.schemaConfig?.autoApproveNonBreaking; // Manual approval required
Next Stage Decision
- Auto-approved: Proceed to create-schema-version
- Requires approval: Move to await-approval
- Schema unchanged: Proceed to geocode-batch
5. Await Approval Stage
Job: Manual process (no background job)
Stage: await-approval
Purpose
Pause processing for human review of schema changes that require approval.
Process Flow
- Notification: Alert dataset administrators of pending approval
- Review Interface: Present schema changes for human review
- Decision Capture: Record approval/rejection with timestamp and user
- Schema Creation: If approved, create new schema version
- Stage Transition: Move to next stage or mark as failed
Approval Data Structure
{
schemaValidation: {
requiresApproval: true,
approvalReason: "Breaking schema changes detected",
approved: true,
approvedBy: 1234, // User ID
approvedAt: "2024-01-15T10:30:00Z",
breakingChanges: [...], // List of breaking changes
newFields: [...] // List of new fields
}
}
Manual Intervention
- Administrators can approve/reject via API or admin interface
- Rejection marks import as failed
- Approval triggers the create-schema-version stage
- The approval workflow service handles the transition logic
6. Create Schema Version Stage
Job: create-schema-version-job
Stage: create-schema-version
Batch Size: N/A (single operation)
Purpose
Create a new schema version in the database after schema approval, handling the version creation in a separate transaction to avoid circular dependencies and deadlocks.
Process Flow
- Validation Check: Verify import job exists and schema is approved
- Duplicate Prevention: Skip if schema version already exists
- Dataset Retrieval: Get the associated dataset record
- Version Creation: Use SchemaVersioningService to create the schema version
- Job Update: Link the schema version to the import job
- Stage Transition: Move to the geocode-batch stage
Implementation Details
// Schema version creation process
const schemaVersion = await SchemaVersioningService.createSchemaVersion(payload, {
dataset: dataset.id,
schema: job.schema, // Detected/approved schema
fieldMetadata: fieldStats || {}, // Field statistics from detection
autoApproved: false, // Manual approval workflow
approvedBy: approvedById, // User who approved
importSources: [], // Will be populated later
});
Error Handling
- Missing Job: Throws error if import job not found
- No Approval: Skips processing if schema not approved
- Missing Dataset: Throws error if dataset not found
- Creation Failure: Marks import job as failed and stops processing
Transaction Safety
This job runs in a separate transaction from the approval process to prevent:
- Circular dependency issues with schema validation
- Database deadlocks during concurrent operations
- Partial state corruption if approval fails after version creation
Output Data Structure
{
output: {
schemaVersionId: "64f1a2b3c4d5e6f7a8b9c0d1" // Created version ID
}
}
// Or if skipped
{
output: {
skipped: true
}
}
Next Stage
Upon successful completion, the job automatically transitions to the geocode-batch stage.
7. Geocode Batch Stage
Job: geocode-batch-job
Stage: geocode-batch
Batch Size: 100 rows
Purpose
Enrich data with geographic coordinates by geocoding addresses or validating provided coordinates.
Process Flow
- Candidate Evaluation: Check if geocoding candidates were detected
- Batch Processing: Process 100 rows at a time (API rate limit friendly)
- Duplicate Skipping: Skip rows identified as duplicates
- Geocoding/Validation: Either geocode addresses or validate coordinates
- Result Storage: Store geocoding results by row number
- Progress Tracking: Update geocoding progress
Geocoding Scenarios
Address Geocoding
// Input: Address string
row["venue_address"] = "123 Main St, San Francisco, CA";
// Output: Coordinates with confidence
{
rowNumber: 42,
coordinates: { lat: 37.7749, lng: -122.4194 },
confidence: 0.95,
formattedAddress: "123 Main Street, San Francisco, CA 94102, USA"
}
Coordinate Validation
// Input: Latitude/longitude fields
row["venue_lat"] = "37.7749";
row["venue_lng"] = "-122.4194";
// Output: Validated coordinates
{
rowNumber: 42,
coordinates: { lat: 37.7749, lng: -122.4194 },
confidence: 1.0,
source: "provided"
}
Error Handling
- Individual geocoding failures don't stop the batch
- Failed geocoding attempts are logged but processing continues
- Rate limit errors trigger exponential backoff
- Malformed addresses are skipped with warning
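For example, the rate-limit backoff might look like the following sketch; the geocoder interface and the HTTP 429 check are assumptions rather than the actual service API.

```typescript
interface GeocodeResult {
  coordinates: { lat: number; lng: number };
  confidence: number;
  formattedAddress?: string;
}

async function geocodeWithBackoff(
  address: string,
  geocode: (address: string) => Promise<GeocodeResult>,
  maxRetries = 5
): Promise<GeocodeResult | null> {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await geocode(address);
    } catch (error) {
      const isRateLimit = (error as { status?: number }).status === 429; // assumed rate-limit signal
      if (!isRateLimit) {
        // Individual failures are logged but don't stop the batch.
        console.warn(`Geocoding failed for "${address}"`, error);
        return null;
      }
      const delayMs = 2 ** attempt * 1000; // exponential backoff: 1s, 2s, 4s, ...
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return null;
}
```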
Result Storage Structure
{
geocodingResults: {
"42": {
coordinates: { lat: 37.7749, lng: -122.4194 },
confidence: 0.95,
formattedAddress: "123 Main Street, San Francisco, CA"
},
"156": {
coordinates: { lat: 40.7128, lng: -74.0060 },
confidence: 0.88,
formattedAddress: "New York, NY, USA"
}
},
geocodingProgress: {
current: 200,
total: 8500
}
}
8. Create Events Stage
Job: create-events-batch-job
Stage: create-events
Batch Size: 1,000 rows
Purpose
Create the final event records in the database with all processing results applied.
Process Flow
- Batch Reading: Read 1,000 rows from file
- Duplicate Filtering: Skip rows marked as duplicates
- ID Generation: Generate unique ID using dataset strategy
- Data Enrichment: Apply geocoding results and extract metadata
- Event Creation: Create records in the events collection
- Progress Tracking: Update processing progress
- Completion: Mark import as completed when all batches processed
Event Creation Process
For each non-duplicate row:
const event = {
dataset: dataset.id,
importJob: importJobId,
data: row, // Original row data
uniqueId: generateUniqueId(row, strategy),
eventTimestamp: extractTimestamp(row),
// Apply geocoding if available
location: geocoding
? {
latitude: geocoding.coordinates.lat,
longitude: geocoding.coordinates.lng,
}
: undefined,
coordinateSource: geocoding
? {
type: "geocoded",
confidence: geocoding.confidence,
}
: { type: "none" },
validationStatus: "pending",
schemaVersionNumber: job.datasetSchemaVersion,
};
Timestamp Extraction
// Looks for common timestamp fields in order
const timestampFields = ["timestamp", "date", "datetime", "created_at", "event_date", "event_time"];
// Falls back to current time if no valid timestamp found
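A minimal sketch of that lookup-and-fallback logic; the exact parsing rules in the real extractTimestamp are assumptions.

```typescript
function extractTimestamp(row: Record<string, unknown>): Date {
  const timestampFields = ["timestamp", "date", "datetime", "created_at", "event_date", "event_time"];
  for (const field of timestampFields) {
    const value = row[field];
    if (value === null || value === undefined || value === "") continue;
    const parsed = new Date(String(value));
    if (!Number.isNaN(parsed.getTime())) return parsed; // first valid timestamp wins
  }
  return new Date(); // fall back to current time
}
```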
Error Handling
- Individual row failures are logged but don't stop batch
- Failed events are recorded in import job errors array
- Successful events update progress counter
- Database transaction failures trigger batch retry
Completion Processing
// When all batches complete, update final results
{
stage: "completed",
results: {
totalEvents: 8200, // Successfully created events
duplicatesSkipped: 1500, // Internal + external duplicates
geocoded: 7800, // Events with location data
errors: 300 // Individual row failures
}
}
Stage Transitions
Automatic Transitions
Most stage transitions happen automatically via collection hooks:
// import-jobs afterChange hook triggers next stage
switch (doc.stage) {
case "analyze-duplicates":
// Queue first schema detection batch
await payload.jobs.queue({
task: "detect-schema",
input: { importJobId: doc.id, batchNumber: 0 },
});
break;
}
Manual Transitions
Only the approval stage requires manual intervention:
// API endpoint for manual approval
POST /api/import-jobs/:id/approve
{
approved: true,
notes: "Approved new optional fields"
}
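A hedged usage example of that endpoint from a script or admin UI; the base URL and authentication headers are omitted and will depend on the deployment.

```typescript
async function approveImportJob(importJobId: string, notes: string): Promise<void> {
  const response = await fetch(`/api/import-jobs/${importJobId}/approve`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ approved: true, notes }),
  });
  if (!response.ok) {
    throw new Error(`Approval failed with status ${response.status}`);
  }
}
```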
Error Recovery
Stage-Level Recovery
- Each stage can be retried from its starting point
- Previous stage results are preserved
- Failed jobs maintain reference to last successful stage
Batch-Level Recovery
- Batch processing allows resuming from specific row ranges
- Progress tracking enables precise resume points
- Partial results are saved incrementally
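As a sketch, a resumable stage can derive its next batch from the saved progress counters; the field names follow the progress structures shown earlier, and the real resume logic is likely richer.

```typescript
function nextBatchIndex(progress: { current: number; total: number }, batchSize: number): number | null {
  if (progress.current >= progress.total) return null; // stage already finished
  return Math.floor(progress.current / batchSize); // zero-based batch to resume from
}
```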
Data Integrity
- All database operations are transactional
- Failed stages don't corrupt partial data
- Version history provides complete audit trail
This detailed breakdown provides the foundation for understanding, debugging, and extending the TimeTiles data processing pipeline.