You passed a TaskFlow output directly into a non-templated argument of a traditional operator without resolving it.
def task2(**kwargs):
    # Retrieve data from XCom
    customer_data = kwargs['ti'].xcom_pull(key='customer_data')
    if customer_data:
        # Process customer data
        print(customer_data)
from airflow.decorators import task

@task
def generate_config():
    return {"batch_size": 64, "threshold": 0.05}

@task
def process_batch(config):
    # Airflow automatically resolves 'config' via XCom behind the scenes
    print(f"Processing with batch size: {config['batch_size']}")

# DAG Layout (inside a DAG definition)
config_data = generate_config()
process_batch(config_data)

2. The Dangerous Pitfall: The Metadata Database Bottleneck
[Task A] ---> (Returns Data) ---> [Custom XCom Backend] ---> Uploads to Cloud Storage (S3/GCS)
                                            |
                                            v
                       Stores URI Reference in Airflow Metadata DB

Step-by-Step Custom Backend Implementation
class MyCustomXComBackend(BaseXCom):
When a task returns a value, the Custom Backend intercepts it, uploads the payload to a secure bucket, and stores only a small URI reference string in the Airflow database.
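The intercept, upload, and store-a-reference flow described above can be sketched without Airflow installed. This is a minimal illustration under stated assumptions: `FAKE_BUCKET` and the `fake-s3://` scheme are made-up stand-ins for a real object store, and the two function names only mirror the `serialize_value` / `deserialize_value` hooks that a custom backend usually overrides; the simplified signatures here are not Airflow's.

```python
import json
import uuid

# Hypothetical in-memory "bucket" standing in for S3/GCS.
FAKE_BUCKET = {}
URI_PREFIX = "fake-s3://xcom-bucket/"  # made-up scheme for this sketch


def serialize_value(value):
    """Upload the payload and return only a small URI reference."""
    uri = f"{URI_PREFIX}{uuid.uuid4()}.json"
    FAKE_BUCKET[uri] = json.dumps(value)
    return uri  # this short string is what the metadata DB would hold


def deserialize_value(stored):
    """Resolve a URI reference back to the payload; pass other values through."""
    if isinstance(stored, str) and stored.startswith(URI_PREFIX):
        return json.loads(FAKE_BUCKET[stored])
    return stored


payload = {"rows": list(range(1000))}
reference = serialize_value(payload)
restored = deserialize_value(reference)
print(len(reference), len(json.dumps(payload)))
```

The reference stays the same short length however large the payload grows, which is the point of keeping only the URI in the database.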
Downstream tasks explicitly request this data by querying the XCom table using the upstream task's ID and the matching key.

Implicit vs. Explicit XComs

You can interact with XComs in two primary ways:

Explicit Pushing and Pulling
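As a rough illustration of explicit pushing and pulling, the sketch below writes two task callables in the usual `kwargs['ti']` style and runs them against a stub. `FakeTaskInstance` is a hypothetical stand-in, not Airflow's `TaskInstance`; it only imitates `xcom_push` and `xcom_pull` with a dictionary so the example can run locally, and the sample payload is invented.

```python
class FakeTaskInstance:
    """Hypothetical stub: keeps XComs in a dict shared by all tasks in one run."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # maps (task_id, key) -> value

    def xcom_push(self, key, value):
        # The pushing task's ID is recorded alongside the key
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        # Look up by upstream task ID and key; None if nothing was pushed
        return self._store.get((task_ids, key))


def task1(**kwargs):
    # Explicit push under a custom key
    kwargs["ti"].xcom_push(key="customer_data", value={"id": 42, "tier": "gold"})


def task2(**kwargs):
    # Explicit pull: name the upstream task and the matching key
    return kwargs["ti"].xcom_pull(task_ids="task1", key="customer_data")


store = {}
task1(ti=FakeTaskInstance("task1", store))
pulled = task2(ti=FakeTaskInstance("task2", store))
print(pulled)
```

In a real DAG the scheduler supplies `ti`, and the store is the metadata database (or a custom backend) rather than a local dictionary.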
The most popular and recommended method for scaling your XCom usage is to adopt a custom XCom backend. This approach exclusively uses an external object store, such as AWS S3, Google Cloud Storage, or Azure Blob Storage, to persist large XCom values. This eliminates the database bottleneck and bypasses the 48KB size limit.
Since Airflow 2.0, the TaskFlow API makes handling data between tasks much cleaner. When you return a value from a @task-decorated function, it is automatically pushed as an XCom.
XComs are small pieces of metadata, such as file paths, API response tokens, or status flags, that tasks push to the Airflow metadata database. By default, tasks in Airflow are fully isolated and may run on completely different workers. XCom bridges this gap. An XCom is defined by a few key attributes:

Key: The name of the data (default is return_value).
Task ID: The task that pushed the data.
DAG ID: The DAG to which the task belongs.

Defining "Airflow XCom Exclusive"
import pandas as pd

def transform_large_data(**kwargs):
    # Pull the file path (small metadata)
    s3_path = kwargs['ti'].xcom_pull(task_ids='extract', key='s3_path')
    # Read and process the large file from S3
    df = pd.read_parquet(s3_path)
    # Process and write results back
Serialization overhead: Airflow must serialize data to JSON or strings, causing high CPU utilization for complex Python objects.
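To get a feel for how quickly serialized payloads grow, the sketch below compares the JSON size of a file path with that of a modest list of records. It is only a back-of-the-envelope check: `xcom_size_report` is a hypothetical helper, the 48KB figure is the limit quoted in this article, and Airflow's own serialization may produce different byte counts.

```python
import json

MAX_XCOM_BYTES = 48 * 1024  # the 48KB limit quoted in this article


def xcom_size_report(value):
    """Return (serialized_bytes, fits_under_limit) for a JSON-serializable value."""
    size = len(json.dumps(value).encode("utf-8"))
    return size, size <= MAX_XCOM_BYTES


# A file path is tiny once serialized
small_size, small_ok = xcom_size_report("s3://bucket/data.parquet")

# A few thousand records already exceed the limit
records = [{"id": i, "score": i * 0.5} for i in range(5000)]
large_size, large_ok = xcom_size_report(records)

print(small_size, small_ok)
print(large_size, large_ok)
```

A check like this could run before pushing, so that oversized values are written to storage and only their path is passed along.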
Tasks interact with XComs through two main methods on the TaskInstance object: xcom_push and xcom_pull.
Run ID: The specific execution instance (DAG run) of the pipeline.

The Explicit vs. Implicit Paradox

XComs can be pushed and pulled in two ways:
In Apache Airflow, XCom (short for "cross-communication") is the primary mechanism for tasks to share small pieces of data within a DAG run. Unlike global Variables, which are designed for static configuration, XComs are tied to specific task instances and the lifecycle of a single execution.

Core Functionality: Push & Pull
); anything smaller stays in the DB, while larger objects are offloaded to storage automatically.

Modern Usage: TaskFlow API

Starting with Airflow 2.0, the TaskFlow API
When using the PythonOperator or TaskFlow API, any value returned by the function is automatically pushed to XCom with the key return_value.

2. Pulling Data
    def clear(self, dag_id, task_id, run_id, **kwargs):
        """Custom cleanup logic"""
        # Delete stored XComs from your backend
        pass
The data you pass between tasks (JSON, DataFrames, file paths)