Data Processing Pipeline
The TimeTiles data processing pipeline is a hook-driven, event-based system that transforms uploaded files into structured events. The pipeline automatically orchestrates a series of processing stages through Payload CMS collection hooks and background jobs.
Architecture Overview
The pipeline follows a single source of truth principle where:
- Files stay on disk: Raw data is never stored in the database during processing
- Metadata in database: Only processing state, progress, and results are tracked
- Event-driven flow: Collection hooks automatically advance the pipeline
- Batch processing: Large files are processed in manageable chunks
Core Collections
The pipeline uses four main collections that work together:
- import-files: File upload and metadata storage
- import-jobs: Processing state and orchestration
- datasets: Configuration and schema management
- events: Final processed data storage
Processing Flow
Processing Stages
1. File Upload & Dataset Detection
Trigger: File uploaded to import-files collection
Handler: dataset-detection-job
Purpose: Parse file structure and create import jobs
When a file is uploaded:
- File is stored on disk in the import-files directory
- afterChange hook in import-files triggers dataset detection
- For Excel files: Each sheet becomes a separate import job
- For CSV files: Single import job is created
- Jobs are linked to appropriate datasets (auto-created if needed)
// Multiple import jobs can be created from a single file
File: "events_2024.xlsx"
├── Sheet "Conferences" → Import Job #1 → Dataset "conferences"
├── Sheet "Workshops" → Import Job #2 → Dataset "workshops"
└── Sheet "Meetings" → Import Job #3 → Dataset "meetings"
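For illustration, this fan-out can be pictured as a small planning step. The sketch below is not the actual TimeTiles code; the file descriptor and job-plan shapes are simplified assumptions:
// Hypothetical sketch: plan one import job per Excel sheet, or a single job for CSV
interface ImportJobPlan {
  fileName: string;
  sheetName: string;
  sheetIndex: number;
}

const planImportJobs = (file: { name: string; sheets?: string[] }): ImportJobPlan[] => {
  // Excel files expose sheet names; CSV files fall back to a single default "sheet"
  const sheetNames = file.sheets && file.sheets.length > 0 ? file.sheets : ["default"];
  return sheetNames.map((sheetName, sheetIndex) => ({ fileName: file.name, sheetName, sheetIndex }));
};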
2. Duplicate Analysis
Stage: analyze-duplicates
Handler: analyze-duplicates-job
Purpose: Identify internal and external duplicates early in the process
The system analyzes the entire file to build a duplicate map before processing:
Internal Duplicates: Rows within the same import that are duplicates of each other
External Duplicates: Rows that match existing events in the database
// Duplicate detection strategies (configured per dataset)
- external-id: Use specific field (e.g., "id", "uuid")
- computed-hash: Hash specific fields (e.g., "name + date + location")
- content-hash: Hash entire row content
- hybrid: Try external ID first, fall back to computed hash
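As a rough illustration, the computed-hash strategy amounts to hashing a configured set of fields and flagging repeats; the helper names below are hypothetical, not the actual TimeTiles implementation:
import { createHash } from "node:crypto";

// Hash the configured fields so rows with the same values collapse to one key
const computeRowHash = (row: Record<string, unknown>, fields: string[]): string => {
  const material = fields.map((field) => String(row[field] ?? "")).join("|");
  return createHash("sha256").update(material).digest("hex");
};

// A row whose hash was already seen earlier in the same file is an internal duplicate
const findInternalDuplicates = (rows: Record<string, unknown>[], fields: string[]): number[] => {
  const seen = new Set<string>();
  const duplicates: number[] = [];
  rows.forEach((row, rowNumber) => {
    const hash = computeRowHash(row, fields);
    if (seen.has(hash)) duplicates.push(rowNumber);
    else seen.add(hash);
  });
  return duplicates;
};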
Benefits of early duplicate analysis:
- Reduces processing volume for subsequent stages
- Avoids unnecessary geocoding API calls
- Prevents creation of duplicate events
3. Schema Detection
Stage: detect-schema
Handler: schema-detection-job
Purpose: Build a progressive schema from non-duplicate data
The schema detection runs in batches, progressively building understanding of the data:
What it detects:
- Field types (string, number, date, boolean)
- Required vs optional fields
- Field statistics (min/max values, unique counts)
- Geocoding candidates (address, latitude/longitude fields)
Progressive building: Each batch refines the schema, handling type conflicts and null values intelligently.
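A minimal sketch of that progressive merge, assuming a simplified field-statistics shape (the real schema-detection module tracks more, such as formats and geocoding candidates):
type FieldType = "string" | "number" | "boolean" | "mixed";

interface FieldStats {
  type: FieldType | null;
  nullable: boolean;
  uniqueValues: Set<string>;
}

const detectType = (value: unknown): FieldType =>
  typeof value === "number" ? "number" : typeof value === "boolean" ? "boolean" : "string";

// Each batch refines the running schema: conflicting types widen to "mixed"
// instead of failing the import, and nulls only mark a field as optional
const mergeBatchIntoSchema = (schema: Map<string, FieldStats>, batch: Record<string, unknown>[]): Map<string, FieldStats> => {
  for (const row of batch) {
    for (const [field, value] of Object.entries(row)) {
      const entry = schema.get(field) ?? { type: null, nullable: false, uniqueValues: new Set<string>() };
      if (value === null || value === undefined) {
        entry.nullable = true;
      } else {
        const detected = detectType(value);
        entry.type = entry.type === null || entry.type === detected ? detected : "mixed";
        entry.uniqueValues.add(String(value));
      }
      schema.set(field, entry);
    }
  }
  return schema;
};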
4. Schema Validation
Stage: validate-schema
Handler: validate-schema-job
Purpose: Compare detected schema with existing dataset schema
The validator compares the new schema against the current dataset schema:
Breaking changes (require approval):
- Field type changes (string → number)
- Removed required fields
- Incompatible format changes
Non-breaking changes (can be auto-approved):
- New optional fields
- Expanded field constraints
- Additional enum values
Auto-approval conditions:
- Dataset has autoGrow enabled
- No breaking changes detected
- Dataset not locked for changes
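The classification can be sketched roughly as follows; the Schema shape and dataset flags are simplified assumptions, not the actual validate-schema-job code:
interface SchemaField {
  type: string;
  required: boolean;
}
type Schema = Record<string, SchemaField>;

interface SchemaDiff {
  breaking: string[];
  nonBreaking: string[];
}

const compareSchemas = (current: Schema, detected: Schema): SchemaDiff => {
  const diff: SchemaDiff = { breaking: [], nonBreaking: [] };
  for (const [field, existing] of Object.entries(current)) {
    const next = detected[field];
    if (!next) {
      // A missing optional field is tolerated; a missing required field is breaking
      if (existing.required) diff.breaking.push(`required field removed: ${field}`);
      continue;
    }
    if (next.type !== existing.type) {
      diff.breaking.push(`type change on ${field}: ${existing.type} -> ${next.type}`);
    }
  }
  for (const field of Object.keys(detected)) {
    if (!current[field]) diff.nonBreaking.push(`new optional field: ${field}`);
  }
  return diff;
};

// Auto-approval mirrors the conditions listed above
const canAutoApprove = (diff: SchemaDiff, dataset: { autoGrow: boolean; locked: boolean }): boolean =>
  dataset.autoGrow && !dataset.locked && diff.breaking.length === 0;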
5. Manual Approval (Optional)
Stage: await-approval
Purpose: Human review of schema changes
When changes require approval:
- Import pauses at this stage
- Notification sent to dataset administrators
- Approval/rejection updates the import job
- Approved changes create new schema version
6. Geocoding
Stage: geocode-batch
Handler: geocode-batch-job
Purpose: Enrich data with geographic coordinates
Address geocoding: Convert address strings to lat/lng coordinates
Coordinate validation: Validate provided latitude/longitude pairs
Rate limiting: Respects external API limits with small batch sizes
Geocoding happens before event creation so location data is available immediately.
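A simplified sketch of coordinate validation and a rate-limited batch loop; the provider call is injected by the caller, so no particular geocoding API is assumed:
interface Coordinates {
  lat: number;
  lng: number;
}

const isValidCoordinate = (lat: number, lng: number): boolean =>
  Number.isFinite(lat) && Number.isFinite(lng) && Math.abs(lat) <= 90 && Math.abs(lng) <= 180;

const geocodeBatch = async (
  addresses: string[],
  geocode: (address: string) => Promise<Coordinates | null>,
  delayMs = 200,
): Promise<Map<string, Coordinates>> => {
  const results = new Map<string, Coordinates>();
  for (const address of addresses) {
    const coords = await geocode(address);
    if (coords && isValidCoordinate(coords.lat, coords.lng)) {
      results.set(address, coords);
    }
    // Space out provider calls to stay under external API limits
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  return results;
};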
7. Event Creation
Stage: create-events
Handler: create-events-batch-job
Purpose: Create final event records with all enrichments
For each non-duplicate row:
- Generate unique ID using dataset strategy
- Apply geocoding results if available
- Extract timestamp and location display
- Create event with complete data
Error handling: Individual row failures don't stop the batch; errors are logged per row.
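A minimal sketch of that per-row isolation, where createEvent stands in for the real event-creation call:
interface RowError {
  rowNumber: number;
  message: string;
}

const createEventsForBatch = async (
  rows: Record<string, unknown>[],
  createEvent: (row: Record<string, unknown>) => Promise<void>,
): Promise<RowError[]> => {
  const errors: RowError[] = [];
  for (const [rowNumber, row] of rows.entries()) {
    try {
      await createEvent(row);
    } catch (error) {
      // A failing row is recorded and skipped; the rest of the batch continues
      errors.push({ rowNumber, message: error instanceof Error ? error.message : String(error) });
    }
  }
  return errors;
};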
Pipeline Orchestration
Hook-Driven Architecture
The pipeline uses Payload CMS collection hooks for automatic orchestration:
// import-jobs collection afterChange hook
export const advanceToNextStage = async ({ doc, previousDoc, req }) => {
// Only trigger on stage changes
if (doc.stage === previousDoc?.stage) return;
switch (doc.stage) {
case "detect-schema":
await req.payload.jobs.queue({
task: "detect-schema",
input: { importJobId: doc.id, batchNumber: 0 },
});
break;
// ... other stages
}
};
Batch Processing
Large files are processed in configurable batches:
export const BATCH_SIZES = {
DUPLICATE_ANALYSIS: 5000, // Memory efficient
SCHEMA_DETECTION: 10000, // Larger for efficiency
GEOCODING: 100, // Small for API limits
EVENT_CREATION: 1000, // Avoid transaction timeouts
};
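To illustrate how batches chain together, a batch handler might end like the sketch below; the task name and collection slug mirror the examples on this page, while the helper itself and the structural types are assumptions:
// Minimal structural types so the sketch stands alone; in the real jobs these come from Payload
interface PayloadLike {
  update: (args: { collection: string; id: string; data: Record<string, unknown> }) => Promise<unknown>;
  jobs: { queue: (args: { task: string; input: Record<string, unknown> }) => Promise<unknown> };
}

// After processing its own batch (omitted), the handler either queues the
// next batch or hands the import off to the next stage
const advanceSchemaDetection = async (
  payload: PayloadLike,
  job: { id: string; totalRows: number },
  batchNumber: number,
  batchSize: number,
): Promise<void> => {
  const hasMoreBatches = (batchNumber + 1) * batchSize < job.totalRows;
  if (hasMoreBatches) {
    await payload.jobs.queue({ task: "detect-schema", input: { importJobId: job.id, batchNumber: batchNumber + 1 } });
  } else {
    // Changing the stage triggers the afterChange hook shown above, which queues the next stage's job
    await payload.update({ collection: "import-jobs", id: job.id, data: { stage: "validate-schema" } });
  }
};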
Error Recovery
The system includes comprehensive error handling:
Stage-level failures: Mark import job as failed, preserve partial progress
Row-level failures: Log errors but continue processing other rows
Retry mechanisms: Built-in retry logic for transient failures
Manual recovery: Administrators can resume from last successful stage
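A generic retry helper with exponential backoff might look like the sketch below; the defaults echo the retryAttempts and retryBackoff settings under System Configuration, but the helper itself is illustrative:
const withRetry = async <T>(
  operation: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 1000,
): Promise<T> => {
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < attempts - 1) {
        // Back off exponentially: 1s, 2s, 4s, ...
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
      }
    }
  }
  throw lastError;
};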
Data Flow Principles
File-Based Processing
// Data flows from file → memory → database
const rows = await readBatchFromFile(filePath, {
sheetIndex: job.sheetIndex,
startRow: batchNumber * BATCH_SIZE,
limit: BATCH_SIZE,
});
// Process in memory
const processedRows = await transformRows(rows);
// Only final results stored in database
await createEvents(processedRows);
Metadata Tracking
Throughout processing, only metadata is stored:
{
stage: "geocode-batch",
progress: { current: 1500, total: 10000 },
duplicates: {
internal: [/* row numbers */],
external: [/* row numbers */],
summary: { totalRows: 10000, uniqueRows: 8500 }
},
geocodingResults: {
"42": { coordinates: { lat: 40.7128, lng: -74.0060 } }
// Key = row number, Value = geocoding result
}
}
Version Tracking
Payload's built-in versioning tracks the complete pipeline progression:
- Each stage change creates a new version
- Full audit trail of processing decisions
- Ability to analyze bottlenecks and failures
- Recovery information for debugging
Performance Characteristics
Scalability
- Memory efficient: Batch processing prevents memory exhaustion
- Database friendly: Minimal database writes during processing
- API respectful: Geocoding respects rate limits
- Parallelizable: Multiple import jobs can run concurrently
Monitoring
The pipeline provides extensive monitoring capabilities:
- Progress tracking: Real-time progress for each stage
- Performance metrics: Processing times and throughput
- Error logging: Detailed error information per row/batch
- Resource usage: Memory and API quota consumption
Configuration
Dataset Configuration
Each dataset can configure its processing behavior:
{
// ID Strategy - How to identify unique events
idStrategy: {
type: "external", // external, computed, auto, hybrid
externalIdPath: "id", // Path to ID field in data
computedIdFields: [ // Fields to hash for computed ID
{ fieldPath: "name" },
{ fieldPath: "date" }
],
duplicateStrategy: "skip" // skip, update, version
},
// Deduplication configuration
deduplicationConfig: {
enabled: true,
strategy: "external-id" // external-id, computed-hash, content-hash, hybrid
},
// Schema behavior
schemaConfig: {
locked: false, // Require approval for ALL changes
autoGrow: true, // Auto-approve safe changes
strictValidation: false, // Block import on validation failures
allowTransformations: true, // Enable type transformations
maxSchemaDepth: 3, // Max nesting depth
autoApproveNonBreaking: true
},
// Type transformations
typeTransformations: [
{
fieldPath: "temperature",
fromType: "string",
toType: "number",
transformStrategy: "parse", // parse, cast, custom, reject
enabled: true
}
],
// Enum detection
enumDetection: {
mode: "count", // count or percentage
threshold: 50 // Max values or percentage
},
// Geographic detection
geoFieldDetection: {
autoDetect: true,
latitudePath: null, // Auto-detected or manual override
longitudePath: null
},
// Processing limits
maxConcurrentJobs: 3,
processingTimeout: 3600000 // 1 hour
}
System Configuration
Global pipeline settings:
{
maxFileSize: 100 * 1024 * 1024, // 100MB
supportedFormats: [".csv", ".xlsx", ".xls"],
maxConcurrentImports: 10,
retryAttempts: 3,
retryBackoff: "exponential"
}
Best Practices
For Data Providers
- Consistent Structure: Keep your data structure consistent across imports
- Meaningful Field Names: Use descriptive field names that make sense to users
- Include IDs: Provide unique identifiers when possible to enable efficient deduplication
- Type Consistency: Keep field types consistent to avoid transformation overhead
- Geographic Data: Use standard field names like latitude, longitude, and address for auto-detection
For System Administrators
- Monitor Schema Changes: Check pending approvals after imports regularly
- Configure Transformations: Set up rules for known type mismatches to reduce manual intervention
- Monitor Field Growth: Watch for excessive field proliferation that may impact performance
- Document Decisions: Note why certain schema changes were approved/rejected
- Set Appropriate Batch Sizes: Tune batch sizes based on your system's memory and performance characteristics
For Large Datasets
- Monitor Field Count: Datasets with >1000 fields may impact performance
- Set Appropriate Thresholds: Configure enum detection thresholds for your data patterns
- Use Progressive Import: Let schema build incrementally for large files rather than forcing immediate completion
- Plan for Geocoding: Large address datasets should account for API rate limits and costs
- Consider Partitioning: Very large datasets may benefit from splitting across multiple imports
Troubleshooting
Common Issues
Import stuck in processing:
- Check job queue status and error logs
- Verify all required services are running
- Look for memory or disk space issues
- Check for database connection problems
Schema approval needed:
- Review breaking changes in the admin interface
- Check if dataset has locked schema configuration
- Verify user permissions for schema approval
- Consider enabling autoGrow for non-breaking changes
Duplicate events appearing:
- Check your ID strategy configuration (external vs computed vs auto)
- Verify external ID path is correct for your data structure
- For computed IDs, ensure selected fields create unique combinations
- Check duplicate handling strategy (skip/update/version)
- Review import logs for duplicate detection details
Geocoding failures:
- Verify API keys are configured and valid
- Check rate limit status and quotas
- Ensure address data is properly formatted
- Review geocoding candidate detection results
- Consider manual field mapping if auto-detection fails
Schema conflicts:
- Check pending schemas in the admin panel
- Configure type transformations for known mismatches
- Review schema history in dataset-schemas collection
- Use auto-grow for safe changes only
- Check import logs for transformation details
Memory issues:
- Reduce batch sizes for very large files
- Monitor system memory usage during imports
- Consider processing files in smaller chunks
- Check for memory leaks in custom transformation functions
Performance degradation:
- Monitor batch processing times
- Check database query performance
- Review geocoding API response times
- Consider scaling background job workers
- Optimize schema depth and field counts
Debugging Tools
Version history: Review processing progression through import job versions
Error logs: Detailed per-row error information in import job records
Performance metrics: Processing times and throughput logged per stage
Manual intervention: Resume processing from any stage using admin interface
Database queries: Direct inspection of import-jobs collection for detailed state
Recovery Procedures
Stage-level recovery:
- Identify last successful stage from import job record
- Reset stage to previous successful state
- Queue appropriate job to resume processing
- Monitor for successful completion
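A hypothetical admin-side helper for this procedure, mirroring the payload.jobs.queue call documented above; the structural type, helper name, and input shape are assumptions:
interface PayloadLike {
  update: (args: { collection: string; id: string; data: Record<string, unknown> }) => Promise<unknown>;
  jobs: { queue: (args: { task: string; input: Record<string, unknown> }) => Promise<unknown> };
}

const resumeImportFromStage = async (payload: PayloadLike, importJobId: string, stage: string): Promise<void> => {
  // Reset the job to the stage it should re-run from...
  await payload.update({ collection: "import-jobs", id: importJobId, data: { stage } });
  // ...and queue the matching task starting from the first batch
  await payload.jobs.queue({ task: stage, input: { importJobId, batchNumber: 0 } });
};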
Batch-level recovery:
- Check progress.current vs progress.total
- Resume from last processed batch number
- Verify data integrity of partial results
- Continue processing from interruption point
Complete restart:
- Mark current import job as failed
- Create new import job from same file
- Apply lessons learned from failure analysis
- Monitor new import for successful completion
This pipeline design provides a robust, scalable, and maintainable approach to data processing while leveraging Payload CMS's built-in capabilities for orchestration and state management.