Parallel Processing
The parallel_processing module provides utilities for efficient parallel computation of MagGeo's geomagnetic field analysis pipeline, enabling faster processing of large GPS trajectories.
parallel_processing
Parallel processing utilities for MagGeo interpolation and CHAOS calculations.
This module provides functions to parallelize the MagGeo pipeline correctly:

- Only GPS trajectory data is chunked for parallel processing
- Complete Swarm data (A, B, C) is passed to each worker process
- Each GPS point can find its matches across all Swarm data for proper interpolation
- CHAOS calculations are performed after interpolation with correct data flow
Functions
get_optimal_chunk_size(total_gps_points, n_cores, min_chunk_size=50)
Calculate optimal chunk size for GPS trajectory parallel processing.
Parameters
total_gps_points : int
    Total number of GPS points in the trajectory
n_cores : int
    Number of CPU cores available
min_chunk_size : int, default 50
    Minimum chunk size to ensure efficiency
Returns
int
    Optimal chunk size for GPS trajectory chunking
split_gps_trajectory_into_chunks(gps_df, chunk_size)
Split GPS trajectory DataFrame into chunks for parallel processing.
IMPORTANT: Only the GPS trajectory is chunked. Swarm data must remain complete for each worker to find proper matches for interpolation.
Parameters
gps_df : pd.DataFrame
    GPS trajectory DataFrame to split
chunk_size : int
    Size of each GPS chunk
Returns
List[pd.DataFrame]
    List of GPS trajectory chunks
process_gps_chunk_complete_pipeline(chunk_data)
Process a GPS trajectory chunk through the complete MagGeo pipeline.
This function follows the correct MagGeo logic:

1. For each GPS point in the chunk, interpolate using ALL Swarm data
2. Calculate CHAOS ground values for all interpolated points
3. Calculate additional geomagnetic components
Parameters
chunk_data : tuple
    Tuple containing (gps_chunk, complete_swarm_a, complete_swarm_b, complete_swarm_c, dt_seconds)
Returns
pd.DataFrame
    Complete annotated DataFrame chunk with all geomagnetic values
parallel_maggeo_annotation(gps_df, swarm_a, swarm_b, swarm_c, dt_seconds=14400, n_cores=None, chunk_size=None)
Perform parallel MagGeo annotation following the correct logic.
- Only the GPS trajectory is chunked for parallel processing
- Complete Swarm data (A, B, C) is passed to each core
- Each GPS point finds matches across ALL Swarm data for proper interpolation (this matching step could be enhanced with a more efficient approach in the future)
- CHAOS calculations follow after interpolation with correct data flow
Parameters
gps_df : pd.DataFrame
    GPS trajectory DataFrame
swarm_a, swarm_b, swarm_c : pd.DataFrame
    Complete Swarm satellite data DataFrames (NOT chunked)
dt_seconds : int, default 14400
    Time window in seconds for interpolation
n_cores : int, optional
    Number of cores to use. If None, uses all available cores.
chunk_size : int, optional
    Size of GPS chunks for processing. If None, calculated automatically.
Returns
pd.DataFrame
    Complete annotated DataFrame with all geomagnetic values
Overview
This module implements parallel processing strategies specifically optimized for MagGeo's workflow:
- GPS Trajectory Chunking: Split large GPS trajectories for parallel processing
- Complete Swarm Data Access: Each worker process gets access to complete Swarm datasets
- Proper Interpolation: Ensures each GPS point can find matches across all Swarm data
- Integrated Pipeline: Handles interpolation, CHAOS calculations, and component derivation
Key Functions
parallel_maggeo_annotation
Perform parallel MagGeo annotation following the correct logic.
- Only the GPS trajectory is chunked for parallel processing
- Complete Swarm data (A, B, C) is passed to each core
- Each GPS point finds matches across ALL Swarm data for proper interpolation (this matching step could be enhanced with a more efficient approach in the future)
- CHAOS calculations follow after interpolation with correct data flow
Parameters
gps_df : pd.DataFrame
    GPS trajectory DataFrame
swarm_a, swarm_b, swarm_c : pd.DataFrame
    Complete Swarm satellite data DataFrames (NOT chunked)
dt_seconds : int, default 14400
    Time window in seconds for interpolation
n_cores : int, optional
    Number of cores to use. If None, uses all available cores.
chunk_size : int, optional
    Size of GPS chunks for processing. If None, calculated automatically.
Returns
pd.DataFrame
    Complete annotated DataFrame with all geomagnetic values
Example:
```python
from maggeo.parallel_processing import parallel_maggeo_annotation
import pandas as pd

# Load GPS trajectory and Swarm data
gps_df = pd.read_csv('large_trajectory.csv')
swarm_a = pd.read_csv('swarm_a_data.csv')
swarm_b = pd.read_csv('swarm_b_data.csv')
swarm_c = pd.read_csv('swarm_c_data.csv')

# Process in parallel
result = parallel_maggeo_annotation(
    gps_df=gps_df,
    swarm_a=swarm_a,
    swarm_b=swarm_b,
    swarm_c=swarm_c,
    dt_seconds=14400,  # 4-hour time window
    n_cores=4,
    chunk_size=100
)
print(f"Processed {len(result)} points using 4 cores")
```
get_optimal_chunk_size
Calculate optimal chunk size for GPS trajectory parallel processing.
Parameters
total_gps_points : int
    Total number of GPS points in the trajectory
n_cores : int
    Number of CPU cores available
min_chunk_size : int, default 50
    Minimum chunk size to ensure efficiency
Returns
int
    Optimal chunk size for GPS trajectory chunking
Example:
```python
from maggeo.parallel_processing import get_optimal_chunk_size

# Calculate optimal chunk size
chunk_size = get_optimal_chunk_size(
    total_gps_points=50000,
    n_cores=4,
    min_chunk_size=50
)
print(f"Optimal chunk size: {chunk_size}")
```
split_gps_trajectory_into_chunks
Split GPS trajectory DataFrame into chunks for parallel processing.
IMPORTANT: Only the GPS trajectory is chunked. Swarm data must remain complete for each worker to find proper matches for interpolation.
Parameters
gps_df : pd.DataFrame
    GPS trajectory DataFrame to split
chunk_size : int
    Size of each GPS chunk
Returns
List[pd.DataFrame]
    List of GPS trajectory chunks
Example:
```python
from maggeo.parallel_processing import split_gps_trajectory_into_chunks

# Split GPS trajectory into chunks
chunks = split_gps_trajectory_into_chunks(
    gps_df=large_gps_df,
    chunk_size=100
)
print(f"Created {len(chunks)} GPS chunks")
```
process_gps_chunk_complete_pipeline
Process a GPS trajectory chunk through the complete MagGeo pipeline.
This function follows the correct MagGeo logic:

1. For each GPS point in the chunk, interpolate using ALL Swarm data
2. Calculate CHAOS ground values for all interpolated points
3. Calculate additional geomagnetic components
Parameters
chunk_data : tuple
    Tuple containing (gps_chunk, complete_swarm_a, complete_swarm_b, complete_swarm_c, dt_seconds)
Returns
pd.DataFrame
    Complete annotated DataFrame chunk with all geomagnetic values
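Example:
In practice this function is invoked by parallel_maggeo_annotation's worker pool; the sketch below calls it directly, assuming gps_chunk and the three complete Swarm DataFrames are already in memory.

```python
from maggeo.parallel_processing import process_gps_chunk_complete_pipeline

# Package one GPS chunk together with the COMPLETE Swarm datasets,
# matching the documented tuple layout
chunk_data = (gps_chunk, swarm_a, swarm_b, swarm_c, 14400)
annotated_chunk = process_gps_chunk_complete_pipeline(chunk_data)
print(f"Annotated {len(annotated_chunk)} points in this chunk")
```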
Processing Strategy
Correct Parallel Architecture
Unlike typical parallel processing approaches, MagGeo uses a specialized strategy:
```python
gps_chunks = split_gps_trajectory_into_chunks(gps_df, chunk_size=100)

# Each worker gets:
# - a small GPS chunk (e.g., 100 points)
# - the complete Swarm data (A, B, C) for proper spatiotemporal matching
results = [
    process_gps_chunk_complete_pipeline(
        (gps_chunk, complete_swarm_a, complete_swarm_b, complete_swarm_c, dt_seconds)
    )
    for gps_chunk in gps_chunks
]
```
Why This Architecture?
- Spatiotemporal Interpolation: Each GPS point needs to find the best matches across ALL Swarm data
- Temporal Windows: GPS points may need Swarm data from hours before/after
- Quality Filtering: Workers need access to complete datasets to filter by quality
- Proper CHAOS Integration: CHAOS calculations require complete interpolated datasets
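To make the first two bullets concrete, the sketch below shows the core idea: each GPS fix first gets a ±dt_seconds temporal window over the Swarm data, then inverse distance weighting, so a worker restricted to a slice of the Swarm data would starve points near chunk boundaries of their matches. This is a simplified illustration with assumed column names (timestamp, Latitude, Longitude, F); MagGeo's real st_idw_process is more involved.

```python
import numpy as np

def idw_one_point(gps_time, gps_lat, gps_lon, swarm_df, dt_seconds=14400):
    # Temporal window: keep Swarm samples within +/- dt_seconds of the GPS fix
    dt = (swarm_df["timestamp"] - gps_time).abs().dt.total_seconds()
    window = swarm_df[dt <= dt_seconds]
    if window.empty:
        return float("nan")
    # Illustrative planar distance in degrees (the real pipeline is smarter)
    d = np.hypot(window["Latitude"] - gps_lat, window["Longitude"] - gps_lon)
    w = 1.0 / np.maximum(d, 1e-6) ** 2  # inverse distance weights
    return float((w * window["F"]).sum() / w.sum())
```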
Performance Optimization
Automatic Chunk Sizing
```python
# Automatic optimization based on data size and available cores
result = parallel_maggeo_annotation(
    gps_df=gps_df,
    swarm_a=swarm_a,
    swarm_b=swarm_b,
    swarm_c=swarm_c,
    # chunk_size automatically calculated
    # n_cores automatically detected
)
```
Manual Tuning
```python
# Manual optimization for specific scenarios
optimal_chunk = get_optimal_chunk_size(
    total_gps_points=len(gps_df),
    n_cores=8,
    min_chunk_size=50  # Prevent tiny chunks
)
result = parallel_maggeo_annotation(
    gps_df=gps_df,
    swarm_a=swarm_a,
    swarm_b=swarm_b,
    swarm_c=swarm_c,
    chunk_size=optimal_chunk,
    n_cores=8
)
```
Integration with Main Workflow
Automatic Parallel Processing
The main MagGeo function automatically uses parallel processing for large datasets:
```python
import maggeo

params = {
    'data_dir': 'data',
    'gpsfilename': 'large_trajectory.csv',  # 10,000+ points
    'lat_col': 'latitude',
    'long_col': 'longitude',
    'datetime_col': 'timestamp',
    'token': 'your_token',
    # Parallel processing enabled automatically for large datasets
    'parallel': True,  # Optional: force parallel processing
    'n_cores': 4       # Optional: specify core count
}
result = maggeo.annotate_gps_with_geomag(params)
```
Complete Processing Pipeline
Each worker process follows this pipeline:
1. Interpolation Phase
```python
# For each GPS point in the chunk:
# - Find spatiotemporally closest Swarm measurements using st_idw_process
# - Apply inverse distance weighting
# - Calculate interpolated magnetic field components
```
2. CHAOS Calculation Phase
```python
# For the complete chunk:
# - Calculate CHAOS model predictions using chaos_ground_values
# - Derive observed vs. model components
# - Add N, E, C, N_Obs, E_Obs, C_Obs columns
```
3. Component Derivation Phase
```python
# Calculate derived magnetic components:
# - H: Horizontal intensity = √(N² + E²)
# - D: Declination = arctan(E/N)
# - I: Inclination = arctan(C/H)
# - F: Total intensity = √(N² + E² + C²)
```
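These formulas map directly onto vectorized numpy. A minimal sketch, assuming the chunk DataFrame df already carries the N, E, and C columns (arctan2 is used for quadrant safety; the module's exact conventions, e.g. degrees vs. radians, may differ):

```python
import numpy as np

df["H"] = np.hypot(df["N"], df["E"])                     # horizontal intensity
df["D"] = np.degrees(np.arctan2(df["E"], df["N"]))       # declination
df["I"] = np.degrees(np.arctan2(df["C"], df["H"]))       # inclination
df["F"] = np.sqrt(df["N"]**2 + df["E"]**2 + df["C"]**2)  # total intensity
```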
Error Handling
Graceful Degradation
When a GPS point fails interpolation, the system creates a "bad point" result:
```python
bad_point_result = {
    'Latitude': gps_lat,
    'Longitude': gps_lon,
    'N_res': float('nan'),
    'E_res': float('nan'),
    'C_res': float('nan'),
    'TotalPoints': 0,
    'Minimum_Distance': float('nan'),
    'Average_Distance': float('nan'),
    'Kp': float('nan')
}
```
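A worker loop can apply this pattern defensively so that one failing point does not abort the whole chunk. A hypothetical sketch (interpolate_point stands in for the real per-point routine, and the GPS column names are assumptions):

```python
rows = []
for _, gps_point in gps_chunk.iterrows():
    try:
        rows.append(interpolate_point(gps_point, swarm_a, swarm_b, swarm_c))
    except Exception:
        # Substitute the "bad point" record and keep going
        rows.append({
            'Latitude': gps_point['Latitude'],
            'Longitude': gps_point['Longitude'],
            'N_res': float('nan'), 'E_res': float('nan'), 'C_res': float('nan'),
            'TotalPoints': 0,
            'Minimum_Distance': float('nan'),
            'Average_Distance': float('nan'),
            'Kp': float('nan'),
        })
```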
CHAOS Calculation Failure
If the CHAOS calculation fails for a chunk, NaN values are added for all CHAOS-derived columns: `N`, `E`, `C`, `N_Obs`, `E_Obs`, `C_Obs`, `H`, `D`, `I`, `F`.
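In code, this fallback amounts to filling the affected columns with NaN; a sketch, not the module's exact code:

```python
import numpy as np

CHAOS_COLUMNS = ['N', 'E', 'C', 'N_Obs', 'E_Obs', 'C_Obs', 'H', 'D', 'I', 'F']
for col in CHAOS_COLUMNS:
    annotated_chunk[col] = np.nan  # mark every CHAOS-derived value as missing
```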
Performance Guidelines
When to Use Parallel Processing:

- GPS trajectories > 1,000 points: significant speedup
- GPS trajectories > 10,000 points: major performance improvement
- Multiple CPU cores available
- Sufficient RAM to hold the complete Swarm datasets
Performance Expectations:
| GPS Points | Cores | Expected Speedup |
|---|---|---|
| 1,000 | 4 | 2-3x |
| 10,000 | 4 | 3-4x |
| 50,000 | 8 | 5-7x |
| 100,000+ | 8 | 6-8x |
Actual performance depends on data complexity, system specifications, and Swarm data density.
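To measure the speedup on your own hardware, time the same call with one core and with many (assuming n_cores=1 behaves as a serial baseline):

```python
import time

start = time.perf_counter()
parallel_maggeo_annotation(gps_df, swarm_a, swarm_b, swarm_c, n_cores=1)
t_serial = time.perf_counter() - start

start = time.perf_counter()
parallel_maggeo_annotation(gps_df, swarm_a, swarm_b, swarm_c, n_cores=8)
t_parallel = time.perf_counter() - start

print(f"Speedup: {t_serial / t_parallel:.1f}x")
```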