What Is a Data Pipeline?
A Data Pipeline is a sequential chain of functions where the output of one becomes the input of the next.
Without a pipeline (many variables):
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# Step 1: filter even numbers
even = [x for x in numbers if x % 2 == 0]
# Step 2: double them
doubled = [x * 2 for x in even]
# Step 3: sum
total = sum(doubled)
print(total) # 60
With a pipeline (one chain!):
from functools import reduce
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# Pipeline: filter → map → reduce
total = reduce(
    lambda acc, x: acc + x,
    map(lambda x: x * 2, filter(lambda x: x % 2 == 0, numbers)),
    0
)
print(total) # 60
Data flows through the pipeline! 🌊
Pipeline Visualization
Input data
↓
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
↓
filter(even)
↓
[2, 4, 6, 8, 10]
↓
map(double)
↓
[4, 8, 12, 16, 20]
↓
reduce(sum)
↓
60
Each stage transforms the data and passes it along!
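To watch the stages from the diagram run, one option is a small tracing helper. This is only a sketch: `trace` is a hypothetical name, not a standard function, and it turns each stage into a list purely so the stage can be printed.

```python
def trace(label, values):
    """Hypothetical debugging helper: print a stage's output and pass it on."""
    values = list(values)  # materialise the lazy filter/map object so it can be printed
    print(f"{label}: {values}")
    return values

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

evens = trace("filter(even)", filter(lambda x: x % 2 == 0, numbers))
doubled = trace("map(double)", map(lambda x: x * 2, evens))
total = sum(doubled)
print(f"reduce(sum): {total}")
# filter(even): [2, 4, 6, 8, 10]
# map(double): [4, 8, 12, 16, 20]
# reduce(sum): 60
```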
Basic Pipeline: filter → map → reduce
Example 1: Sum of Squares of Even Numbers
from functools import reduce
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
result = reduce(
    lambda acc, x: acc + x,  # Step 3: sum
    map(
        lambda x: x ** 2,  # Step 2: square
        filter(lambda x: x % 2 == 0, numbers)  # Step 1: even numbers
    ),
    0
)
print(result) # 220
# Even: [2, 4, 6, 8, 10]
# Squares: [4, 16, 36, 64, 100]
# Sum: 220
Example 2: Average Price of Expensive Products
products = [
    {"name": "Phone", "price": 500},
    {"name": "Laptop", "price": 1200},
    {"name": "Mouse", "price": 25},
    {"name": "Monitor", "price": 300}
]
# Pipeline: filter (>100) → map (extract prices) → aggregate (average)
expensive = list(filter(lambda p: p["price"] > 100, products))
prices = list(map(lambda p: p["price"], expensive))
average = sum(prices) / len(prices) if prices else 0
print(round(average, 2)) # 666.67
# Expensive: Phone(500), Laptop(1200), Monitor(300)
# Average: (500 + 1200 + 300) / 3 = 666.67
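The averaging step above relies on sum() and len(). If the last stage should be a real reduce, one possible approach is to carry a (running sum, count) pair as the accumulator. This is a sketch under that assumption, not the only way to compute an average in one pass.

```python
from functools import reduce

products = [
    {"name": "Phone", "price": 500},
    {"name": "Laptop", "price": 1200},
    {"name": "Mouse", "price": 25},
    {"name": "Monitor", "price": 300},
]

# filter and map in one lazy generator expression
prices = (p["price"] for p in products if p["price"] > 100)

# The accumulator is a (running_sum, count) pair; each price updates both parts.
total, count = reduce(
    lambda acc, price: (acc[0] + price, acc[1] + 1),
    prices,
    (0, 0),
)
average = total / count if count else 0
print(round(average, 2))  # 666.67
```

The initial value (0, 0) also covers the empty case: with no expensive products, count stays 0 and the average falls back to 0.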
Pipeline with List Comprehension
A list comprehension is also a pipeline, and often a more readable one!
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# With map/filter
result = sum(map(lambda x: x ** 2, filter(lambda x: x % 2 == 0, numbers)))
# With comprehension (SIMPLER!)
result = sum([x ** 2 for x in numbers if x % 2 == 0])
print(result) # 220
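The comprehension states the transformation, the source, and the condition in one flat expression, so there are no nested calls to read from the inside out!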
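To make the correspondence with the pipeline stages explicit, the same expression can be laid out one clause per line. The sketch below drops the square brackets, which turns it into a generator expression, so sum() consumes the values without building an intermediate list.

```python
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

result = sum(
    x ** 2              # map: what each element becomes
    for x in numbers    # source: where the elements come from
    if x % 2 == 0       # filter: which elements are kept
)
print(result)  # 220
```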
Building a Custom Pipeline
The pipe() Function
def pipe(data, *functions):
    """Apply functions sequentially."""
    result = data
    for func in functions:
        result = func(result)
    return result
# Define the steps
def filter_even(numbers):
    return [x for x in numbers if x % 2 == 0]
def double_all(numbers):
    return [x * 2 for x in numbers]
def sum_all(numbers):
    return sum(numbers)
# Use the pipeline
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
result = pipe(numbers, filter_even, double_all, sum_all)
print(result) # 60
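The steps passed to pipe() must each take exactly one argument. When a step needs extra configuration, such as a predicate, functools.partial can pre-fill it. In this sketch, `keep_if` and `apply_each` are hypothetical helper names introduced only for illustration.

```python
from functools import partial

def pipe(data, *functions):
    """Apply functions sequentially (same helper as above)."""
    result = data
    for func in functions:
        result = func(result)
    return result

def keep_if(predicate, items):
    """Hypothetical generic filter step."""
    return [x for x in items if predicate(x)]

def apply_each(transform, items):
    """Hypothetical generic map step."""
    return [transform(x) for x in items]

result = pipe(
    range(1, 11),
    partial(keep_if, lambda x: x % 2 == 0),  # filter step with its predicate bound
    partial(apply_each, lambda x: x * 2),    # map step with its transform bound
    sum,                                     # built-ins work as steps too
)
print(result)  # 60
```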
Pipeline Class
from functools import reduce
class Pipeline:
    """Convenient pipeline with chained methods."""
    def __init__(self, data):
        self.data = data
    def filter(self, predicate):
        """Filter data (replaces self.data with a new list)."""
        self.data = [x for x in self.data if predicate(x)]
        return self
    def map(self, transform):
        """Transform data (replaces self.data with a new list)."""
        self.data = [transform(x) for x in self.data]
        return self
    def reduce(self, reducer, initial=None):
        """Fold data into a single value."""
        if initial is None:
            return reduce(reducer, self.data)
        return reduce(reducer, self.data, initial)
    def collect(self):
        """Get the result."""
        return self.data
# Usage
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
result = (Pipeline(numbers)
    .filter(lambda x: x % 2 == 0)  # Even numbers
    .map(lambda x: x * 2)  # Double
    .reduce(lambda acc, x: acc + x, 0))  # Sum
print(result) # 60
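One thing to keep in mind about the class above: filter() and map() overwrite self.data, so a Pipeline object cannot be branched or reused once a step has run. A possible alternative, sketched here under the hypothetical name `ImmutablePipeline`, returns a new object from every step.

```python
class ImmutablePipeline:
    """Sketch of a pipeline whose steps never change the object they are called on."""

    def __init__(self, data):
        self._data = list(data)  # private copy, independent of the caller's data

    def filter(self, predicate):
        return ImmutablePipeline(x for x in self._data if predicate(x))

    def map(self, transform):
        return ImmutablePipeline(transform(x) for x in self._data)

    def collect(self):
        return list(self._data)

base = ImmutablePipeline(range(1, 11))
evens = base.filter(lambda x: x % 2 == 0)
doubled = evens.map(lambda x: x * 2)

print(evens.collect())      # [2, 4, 6, 8, 10]
print(doubled.collect())    # [4, 8, 12, 16, 20]
print(len(base.collect()))  # 10, the base pipeline is untouched
```

The price is one extra list copy per step, which is usually acceptable for small and medium inputs.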
Practical Pipeline Examples
Example 1: Processing Students
students = [
    {"name": "Алиса", "grade": 95, "active": True},
    {"name": "Боб", "grade": 67, "active": False},
    {"name": "Карл", "grade": 88, "active": True},
    {"name": "Дима", "grade": 72, "active": True},
    {"name": "Ева", "grade": 91, "active": False}
]
# Pipeline: active → high grades (>=80) → names → uppercase
result = [
    student["name"].upper()
    for student in students
    if student["active"] and student["grade"] >= 80
]
print(result) # ['АЛИСА', 'КАРЛ']
# Or step by step:
active = [s for s in students if s["active"]]
high_achievers = [s for s in active if s["grade"] >= 80]
names = [s["name"] for s in high_achievers]
upper_names = [n.upper() for n in names]
print(upper_names) # ['АЛИСА', 'КАРЛ']
Example 2: Text Analysis
text = "Python is AWESOME! Learn Python, love Python."
# Pipeline: words → lowercase → letters only → unique → sorted
words = text.split()
lower_words = [w.lower() for w in words]
clean_words = [w.strip(",.!") for w in lower_words]
unique_words = list(set(clean_words))
sorted_words = sorted(unique_words)
print(sorted_words) # ['awesome', 'is', 'learn', 'love', 'python']
# Compact pipeline version
result = sorted(set(w.strip(",.!").lower() for w in text.split()))
print(result) # ['awesome', 'is', 'learn', 'love', 'python']
Example 3: Processing AI Model Data
# Raw AI model data
raw_data = [
    {"model": "GPT-3", "accuracy": 0.92, "loss": 0.15},
    {"model": "BERT", "accuracy": 0.88, "loss": 0.22},
    {"model": "T5", "accuracy": 0.95, "loss": 0.12},
    {"model": "RoBERTa", "accuracy": 0.90, "loss": 0.18}
]
# Pipeline: accuracy > 0.9 → sort by loss → extract names
high_accuracy = [m for m in raw_data if m["accuracy"] > 0.9]
sorted_models = sorted(high_accuracy, key=lambda m: m["loss"])
model_names = [m["model"] for m in sorted_models]
print(model_names) # ['T5', 'GPT-3']
# RoBERTa (0.90) is excluded because the comparison is strict; use >= 0.9 to include it
Data Cleaning Pipeline
Cleaning and Validation
def clean_data_pipeline(raw_data):
    """Data cleaning pipeline."""
    # Step 1: remove None
    step1 = [x for x in raw_data if x is not None]
    # Step 2: trim whitespace
    step2 = [x.strip() if isinstance(x, str) else x for x in step1]
    # Step 3: remove empty strings (after trimming, so "   " is dropped too)
    step3 = [x for x in step2 if x != ""]
    # Step 4: remove duplicates
    step4 = list(set(step3))
    # Step 5: sort
    step5 = sorted(step4)
    return step5
# Dirty data
dirty = [None, " Python ", "", "Java", "Python", " Go ", None, "Java"]
clean = clean_data_pipeline(dirty)
print(clean) # ['Go', 'Java', 'Python']
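Deduplicating with set() discards the original order, which is why the function sorts afterwards. If first-seen order matters instead, dict.fromkeys() is one way to keep it, since dicts preserve insertion order. The sketch below assumes every value worth keeping is a string and silently drops anything else.

```python
dirty = [None, " Python ", "", "Java", "Python", " Go ", None, "Java"]

# Assumption: only strings are meaningful here; None and other types are dropped.
trimmed = (x.strip() for x in dirty if isinstance(x, str))
non_empty = (x for x in trimmed if x)

# dict keys are unique and keep insertion order, so this deduplicates in order.
unique_in_order = list(dict.fromkeys(non_empty))
print(unique_in_order)  # ['Python', 'Java', 'Go']
```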
Email Validation
def validate_emails_pipeline(emails):
    """Email validation pipeline."""
    # Step 1: remove empty
    non_empty = [e for e in emails if e]
    # Step 2: must contain @
    with_at = [e for e in non_empty if "@" in e]
    # Step 3: the part after @ must contain a dot
    with_domain = [e for e in with_at if "." in e.split("@")[-1]]
    # Step 4: trim and lowercase
    normalized = [e.strip().lower() for e in with_domain]
    # Step 5: remove duplicates
    unique = list(set(normalized))
    return sorted(unique)
# Dirty emails
dirty_emails = [
    "alice@gmail.com",
    "BOB@YAHOO.COM",
    "invalid-email",
    "",
    "alice@gmail.com",  # Duplicate
    "charlie@example"  # No dot in the domain part
]
clean_emails = validate_emails_pipeline(dirty_emails)
print(clean_emails) # ['alice@gmail.com', 'bob@yahoo.com']
Generator-Based Pipeline (for Big Data)
For LARGE datasets, use generators!
def filter_even_gen(numbers):
    """Generator for filtering."""
    for n in numbers:
        if n % 2 == 0:
            yield n
def double_gen(numbers):
    """Generator for doubling."""
    for n in numbers:
        yield n * 2
# Pipeline with generators (lazy processing!)
numbers = range(1000000) # One million numbers!
pipeline = double_gen(filter_even_gen(numbers))
# Processed ON DEMAND!
result = sum(pipeline)
print(result) # 499999000000, computed without building an intermediate list
Generators don’t load everything into memory! 💡
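Laziness is easiest to see with a source that never ends. In this sketch the input is itertools.count(1), an infinite counter, and itertools.islice pulls only the first five results, so the pipeline does just enough work to produce them.

```python
from itertools import count, islice

def filter_even_gen(numbers):
    """Generator for filtering (same as above)."""
    for n in numbers:
        if n % 2 == 0:
            yield n

def double_gen(numbers):
    """Generator for doubling (same as above)."""
    for n in numbers:
        yield n * 2

# count(1) yields 1, 2, 3, ... forever; nothing runs until values are requested.
pipeline = double_gen(filter_even_gen(count(1)))

first_five = list(islice(pipeline, 5))
print(first_five)  # [4, 8, 12, 16, 20]
```

A list-based version of the same pipeline would never finish on this input, because it would try to filter the whole sequence before doubling anything.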
Error Handling in a Pipeline
Safe Pipeline
def safe_pipeline(data, *functions):
    """Pipeline with error handling."""
    result = data
    for func in functions:
        try:
            result = func(result)
        except Exception as e:
            print(f"Error in {func.__name__}: {e}")
            return None
    return result
def divide_by_zero(x):
    """Dangerous function."""
    return x / 0
def double(x):
    return x * 2
# Usage
result = safe_pipeline(10, double, divide_by_zero)
# Output: Error in divide_by_zero: division by zero
print(result) # None
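Returning None is ambiguous when None could also be a legitimate result, and printing hides the error from the caller. One alternative, sketched here, wraps the failure in a custom exception that names the failing stage. `PipelineError` and `strict_pipeline` are hypothetical names for this illustration.

```python
class PipelineError(Exception):
    """Hypothetical wrapper exception that records which stage failed."""

    def __init__(self, stage, original):
        super().__init__(f"stage {stage!r} failed: {original}")
        self.stage = stage

def strict_pipeline(data, *functions):
    """Run the stages in order; re-raise any failure with the stage name attached."""
    result = data
    for func in functions:
        try:
            result = func(result)
        except Exception as exc:
            # "from exc" keeps the original traceback available for debugging
            raise PipelineError(func.__name__, exc) from exc
    return result

def double(x):
    return x * 2

def divide_by_zero(x):
    return x / 0

try:
    strict_pipeline(10, double, divide_by_zero)
except PipelineError as err:
    failed_stage = err.stage
    print(err)  # stage 'divide_by_zero' failed: division by zero
```

The caller now decides how to react: log the error, retry, or let it propagate.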
Pipeline + map/filter/reduce
Combining All the Tools
from functools import reduce
data = [
    {"product": "Phone", "quantity": 2, "price": 500},
    {"product": "Laptop", "quantity": 1, "price": 1200},
    {"product": "Mouse", "quantity": 5, "price": 25},
    {"product": "Monitor", "quantity": 2, "price": 300}
]
# Pipeline: quantity > 1 → compute total → sum up
filtered = filter(lambda item: item["quantity"] > 1, data)
totals = map(lambda item: item["quantity"] * item["price"], filtered)
grand_total = reduce(lambda acc, x: acc + x, totals, 0)
print(grand_total) # 1725
# Phone: 2 * 500 = 1000
# Mouse: 5 * 25 = 125
# Monitor: 2 * 300 = 600
# Total: 1725
Common Mistakes
Mistake 1: Mutating the Original Data
# ❌ BAD: sorts the caller's list in place
def bad_pipeline(data):
    data.sort()  # Mutates the original!
    return [x * 2 for x in data]
numbers = [3, 1, 2]
result = bad_pipeline(numbers)
print(numbers) # [1, 2, 3] ← Changed!
# ✅ CORRECT: pure function
def good_pipeline(data):
    sorted_data = sorted(data)  # New list
    return [x * 2 for x in sorted_data]
numbers = [3, 1, 2]
result = good_pipeline(numbers)
print(numbers) # [3, 1, 2] ← Unchanged!
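The same mistake is easy to make with lists of dicts: building a new list is not enough if a step writes into the dicts themselves. A minimal sketch follows, with `add_total_bad` and `add_total_good` as hypothetical step names. The fix copies each dict with {**item, ...} before adding a field.

```python
def add_total_bad(items):
    for item in items:
        item["total"] = item["quantity"] * item["price"]  # writes into the caller's dicts
    return items

def add_total_good(items):
    # {**item, ...} builds a new dict per item, leaving the originals alone
    return [{**item, "total": item["quantity"] * item["price"]} for item in items]

order = [{"product": "Mouse", "quantity": 5, "price": 25}]

with_totals = add_total_good(order)
print(with_totals[0]["total"])  # 125
print("total" in order[0])      # False, the source data is unchanged

add_total_bad(order)
print("total" in order[0])      # True, the source data was modified
```

Note that {**item} is a shallow copy: nested lists or dicts inside an item would still be shared.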
Mistake 2: Pipeline Too Long
from functools import reduce
data = [4, 12, 15, 20, 7, 30]  # sample input (illustrative)
# ❌ BAD: unreadable!
result = reduce(
    lambda a, x: a + x,
    map(
        lambda x: x ** 2,
        filter(
            lambda x: x > 0,
            map(
                lambda x: x - 10,
                filter(lambda x: x % 2 == 0, data)
            )
        )
    ),
    0
)
# ✅ BETTER: break into steps
step1 = [x for x in data if x % 2 == 0]
step2 = [x - 10 for x in step1]
step3 = [x for x in step2 if x > 0]
step4 = [x ** 2 for x in step3]
result = sum(step4)
Summary
A Data Pipeline Is:
- ✅ A sequence of data transformations
- ✅ Output of one function → input of the next
- ✅ Pure functions with no side effects
- ✅ Readable and predictable code
Core Stages:
Data → filter → map → reduce → Result
Tools:
# filter() — select
filter(lambda x: x > 0, data)
# map() — transform
map(lambda x: x * 2, data)
# reduce() — fold
reduce(lambda acc, x: acc + x, data, 0)
# List comprehension — all in one
[x * 2 for x in data if x > 0]
Rules for a Good Pipeline:
- ✅ Each step is a pure function
- ✅ Don’t mutate the source data
- ✅ Break complex pipelines into steps
- ✅ Use generators for large datasets
- ✅ Handle errors
What’s Next?
Now you know Data Pipelines! 🎉
Next topics:
- Function composition
- Generator expressions — lazy evaluation
- Decorators — function wrappers
Data pipelines are the foundation of data processing in AI! 🤖📊