📝 Python

Data Pipelines — The Processing Conveyor! 🏭

Author: 04e5cc8b-58ac-4bdc-bdee-661bbb
Published: 03.04.2026
Reading time: 8 min
Level: Beginner

What Is a Data Pipeline?

A Data Pipeline is a sequential chain of functions where the output of one becomes the input of the next.

Without a pipeline (many variables):

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Step 1: filter even numbers
even = [x for x in numbers if x % 2 == 0]

# Step 2: double them
doubled = [x * 2 for x in even]

# Step 3: sum
total = sum(doubled)

print(total)  # 60

With a pipeline (one chain!):

from functools import reduce

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Pipeline: filter → map → reduce
total = reduce(
    lambda acc, x: acc + x,
    map(lambda x: x * 2, filter(lambda x: x % 2 == 0, numbers)),
    0
)

print(total)  # 60

Data flows through the pipeline! 🌊


Pipeline Visualization

Input data
    ↓
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    ↓
filter(even)
    ↓
[2, 4, 6, 8, 10]
    ↓
map(double)
    ↓
[4, 8, 12, 16, 20]
    ↓
reduce(sum)
    ↓
60

Each stage transforms the data and passes it along!


Basic Pipeline: filter → map → reduce

Example 1: Sum of Squares of Even Numbers

from functools import reduce

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

result = reduce(
    lambda acc, x: acc + x,  # Step 3: sum
    map(
        lambda x: x ** 2,  # Step 2: square
        filter(lambda x: x % 2 == 0, numbers)  # Step 1: even numbers
    ),
    0
)

print(result)  # 220
# Even: [2, 4, 6, 8, 10]
# Squares: [4, 16, 36, 64, 100]
# Sum: 220

Example 2: Average Price of Expensive Products

products = [
    {"name": "Phone", "price": 500},
    {"name": "Laptop", "price": 1200},
    {"name": "Mouse", "price": 25},
    {"name": "Monitor", "price": 300}
]

# Pipeline: filter (> 100) → map (extract prices) → average (sum / count)
expensive = list(filter(lambda p: p["price"] > 100, products))
prices = list(map(lambda p: p["price"], expensive))
average = sum(prices) / len(prices) if prices else 0

print(round(average, 2))  # 666.67
# Expensive: Phone(500), Laptop(1200), Monitor(300)
# Average: (500 + 1200 + 300) / 3 = 666.67
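`sum(prices) / len(prices)` is already a reduction. If you want the average computed in a single `reduce` pass, one minimal sketch is to fold every price into a `(total, count)` pair and divide at the end. The pair-shaped accumulator is our own choice for illustration, not something the original example uses.

```python
from functools import reduce

products = [
    {"name": "Phone", "price": 500},
    {"name": "Laptop", "price": 1200},
    {"name": "Mouse", "price": 25},
    {"name": "Monitor", "price": 300},
]

# filter + map stages as one lazy generator expression
expensive_prices = (p["price"] for p in products if p["price"] > 100)

# reduce stage: fold each price into a running (total, count) pair
total, count = reduce(
    lambda acc, price: (acc[0] + price, acc[1] + 1),
    expensive_prices,
    (0, 0),  # starting accumulator: nothing summed, nothing counted
)

average = total / count if count else 0
print(round(average, 2))  # 666.67
```

The `if count else 0` guard plays the same role as `if prices else 0` above: it avoids dividing by zero when no product passes the filter.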

Pipeline with List Comprehension

A list comprehension expresses the same filter → map pipeline, and is often more readable!

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# With map/filter
result = sum(map(lambda x: x ** 2, filter(lambda x: x % 2 == 0, numbers)))

# With comprehension (SIMPLER!)
result = sum([x ** 2 for x in numbers if x % 2 == 0])

print(result)  # 220

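The comprehension reads like a sentence ("x squared, for each x in numbers, if x is even"), with no nested calls to unwind from the inside out!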
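Dropping the square brackets turns the comprehension into a generator expression: `sum()` then consumes the squares one at a time instead of first building a temporary list. The result is identical; this is a small variation, not something the original example requires.

```python
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# List comprehension: builds [4, 16, 36, 64, 100] first, then sums it
with_list = sum([x ** 2 for x in numbers if x % 2 == 0])

# Generator expression: no intermediate list, values are summed as produced
with_generator = sum(x ** 2 for x in numbers if x % 2 == 0)

print(with_list, with_generator)  # 220 220
```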


Building a Custom Pipeline

The pipe() Function

def pipe(data, *functions):
    """Apply functions sequentially."""
    result = data
    for func in functions:
        result = func(result)
    return result

# Define the steps
def filter_even(numbers):
    return [x for x in numbers if x % 2 == 0]

def double_all(numbers):
    return [x * 2 for x in numbers]

def sum_all(numbers):
    return sum(numbers)

# Use the pipeline
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
result = pipe(numbers, filter_even, double_all, sum_all)

print(result)  # 60
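The loop inside `pipe()` is itself a fold: the running result is the accumulator and each function is one step. Read that way, `pipe()` can be sketched with `functools.reduce`, and the same idea gives a `compose()` helper that bundles the steps into one reusable function. Both `pipe_reduce` and `compose` are illustrative names, not standard-library functions.

```python
from functools import reduce


def pipe_reduce(data, *functions):
    """Same behaviour as pipe(): feed data through each function in order."""
    return reduce(lambda result, func: func(result), functions, data)


def compose(*functions):
    """Bundle several steps into one function, applied left to right."""
    def pipeline(data):
        return pipe_reduce(data, *functions)
    return pipeline


def filter_even(numbers):
    return [x for x in numbers if x % 2 == 0]


def double_all(numbers):
    return [x * 2 for x in numbers]


numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(pipe_reduce(numbers, filter_even, double_all, sum))  # 60

# Build the pipeline once, reuse it on different inputs
sum_of_doubled_evens = compose(filter_even, double_all, sum)
print(sum_of_doubled_evens(numbers))  # 60
print(sum_of_doubled_evens([3, 4]))   # 8
```

This `compose()` applies its steps left to right to match `pipe()`; mathematical function composition is usually written in the opposite order.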

Pipeline Class

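import functools

_NO_INITIAL = object()  # sentinel: lets callers pass None as a real initial value


class Pipeline:
    """Convenient pipeline with chained methods."""

    def __init__(self, data):
        self.data = data

    def filter(self, predicate):
        """Keep only the items for which predicate(x) is true."""
        self.data = [x for x in self.data if predicate(x)]
        return self  # returning self is what makes chaining possible

    def map(self, transform):
        """Apply transform to every item."""
        self.data = [transform(x) for x in self.data]
        return self

    def reduce(self, reducer, initial=_NO_INITIAL):
        """Fold the items into a single value (ends the chain)."""
        if initial is _NO_INITIAL:
            return functools.reduce(reducer, self.data)
        return functools.reduce(reducer, self.data, initial)

    def collect(self):
        """Return the current list of items (ends the chain)."""
        return self.data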

# Usage
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

result = (Pipeline(numbers)
    .filter(lambda x: x % 2 == 0)  # Even numbers
    .map(lambda x: x * 2)           # Double
    .reduce(lambda acc, x: acc + x, 0))  # Sum

print(result)  # 60
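Because each method above overwrites `self.data` and returns the same object, a half-built pipeline cannot be safely reused: calling `.map()` twice on the same intermediate object applies both transforms to one shared list. One way around this, sketched below under the assumption that you want each step to leave earlier stages untouched, is to return a *new* object from every step. `FrozenPipeline` is a hypothetical name for this variant.

```python
from functools import reduce


class FrozenPipeline:
    """Hypothetical immutable variant: every step returns a new pipeline."""

    def __init__(self, data):
        self._data = tuple(data)  # tuple: the stored sequence cannot be modified in place

    def filter(self, predicate):
        return FrozenPipeline(x for x in self._data if predicate(x))

    def map(self, transform):
        return FrozenPipeline(transform(x) for x in self._data)

    def reduce(self, reducer, initial):
        return reduce(reducer, self._data, initial)

    def collect(self):
        return list(self._data)


evens = FrozenPipeline(range(1, 11)).filter(lambda x: x % 2 == 0)

# Two branches built from the same intermediate stage
doubled = evens.map(lambda x: x * 2)
tripled = evens.map(lambda x: x * 3)

print(doubled.collect())  # [4, 8, 12, 16, 20]
print(tripled.collect())  # [6, 12, 18, 24, 30]
print(evens.collect())    # [2, 4, 6, 8, 10]
print(doubled.reduce(lambda acc, x: acc + x, 0))  # 60
```

The trade-off is an extra copy per step, which is usually acceptable for small and medium datasets.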

Practical Pipeline Examples

Example 1: Processing Students

students = [
    {"name": "Алиса", "grade": 95, "active": True},
    {"name": "Боб", "grade": 67, "active": False},
    {"name": "Карл", "grade": 88, "active": True},
    {"name": "Дима", "grade": 72, "active": True},
    {"name": "Ева", "grade": 91, "active": False}
]

# Pipeline: active → high grades (>= 80) → names → uppercase
result = [
    student["name"].upper()
    for student in students
    if student["active"] and student["grade"] >= 80
]

print(result)  # ['АЛИСА', 'КАРЛ']

# Or step by step:
active = [s for s in students if s["active"]]
high_achievers = [s for s in active if s["grade"] >= 80]
names = [s["name"] for s in high_achievers]
upper_names = [n.upper() for n in names]
print(upper_names)  # ['АЛИСА', 'КАРЛ']
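For comparison, the same student pipeline can be written with `filter()` and `map()`. The standard library's `operator.itemgetter` builds the small "get this key" functions, so fewer lambdas are needed; this is a stylistic alternative, not a change in behaviour.

```python
from operator import itemgetter

students = [
    {"name": "Алиса", "grade": 95, "active": True},
    {"name": "Боб", "grade": 67, "active": False},
    {"name": "Карл", "grade": 88, "active": True},
    {"name": "Дима", "grade": 72, "active": True},
    {"name": "Ева", "grade": 91, "active": False},
]

active = filter(itemgetter("active"), students)              # keep active students
high_achievers = filter(lambda s: s["grade"] >= 80, active)  # grade >= 80
names = map(itemgetter("name"), high_achievers)              # extract names
shortlist = list(map(str.upper, names))                      # uppercase

print(shortlist)  # ['АЛИСА', 'КАРЛ']
```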

Example 2: Text Analysis

text = "Python is AWESOME! Learn Python, love Python."

# Pipeline: words → lowercase → strip punctuation → unique → sorted
words = text.split()
lower_words = [w.lower() for w in words]
clean_words = [w.strip(",.!") for w in lower_words]
unique_words = list(set(clean_words))
sorted_words = sorted(unique_words)

print(sorted_words)  # ['awesome', 'is', 'learn', 'love', 'python']

# Compact pipeline version
result = sorted(set(w.strip(",.!").lower() for w in text.split()))
print(result)  # ['awesome', 'is', 'learn', 'love', 'python']
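The same stages can end in a count instead of a set. Swapping the `set()` stage for `collections.Counter` turns the pipeline into a word-frequency counter. Here `string.punctuation` is used as a broader strip set than the original `",.!"`; that choice of what counts as punctuation is our assumption.

```python
from collections import Counter
import string

text = "Python is AWESOME! Learn Python, love Python."

# words → strip surrounding punctuation → lowercase → count
counts = Counter(w.strip(string.punctuation).lower() for w in text.split())

# Ties are listed in the order the words were first seen
print(counts.most_common(2))  # [('python', 3), ('is', 1)]
```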

Example 3: Processing AI Model Data

# Raw AI model data
raw_data = [
    {"model": "GPT-3", "accuracy": 0.92, "loss": 0.15},
    {"model": "BERT", "accuracy": 0.88, "loss": 0.22},
    {"model": "T5", "accuracy": 0.95, "loss": 0.12},
    {"model": "RoBERTa", "accuracy": 0.90, "loss": 0.18}
]

# Pipeline: accuracy > 0.9 → sort by loss → extract names
high_accuracy = [m for m in raw_data if m["accuracy"] > 0.9]
sorted_models = sorted(high_accuracy, key=lambda m: m["loss"])
model_names = [m["model"] for m in sorted_models]

print(model_names)  # ['T5', 'GPT-3']
# RoBERTa (0.90) is excluded because 0.90 > 0.9 is False; use >= 0.9 to include it
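If you only need the best few models rather than the full ranking, the sort stage can be replaced by `heapq.nsmallest`, which accepts the same `key=` argument as `sorted()` and, for small `n`, avoids sorting everything. This sketch keeps the `> 0.9` threshold from above and asks for the single lowest-loss model.

```python
import heapq

raw_data = [
    {"model": "GPT-3", "accuracy": 0.92, "loss": 0.15},
    {"model": "BERT", "accuracy": 0.88, "loss": 0.22},
    {"model": "T5", "accuracy": 0.95, "loss": 0.12},
    {"model": "RoBERTa", "accuracy": 0.90, "loss": 0.18},
]

high_accuracy = (m for m in raw_data if m["accuracy"] > 0.9)  # lazy filter stage

# Take the n items with the smallest loss instead of sorting the whole list
best = heapq.nsmallest(1, high_accuracy, key=lambda m: m["loss"])

print([m["model"] for m in best])  # ['T5']
```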

Data Cleaning Pipeline

Cleaning and Validation

def clean_data_pipeline(raw_data):
    """Data cleaning pipeline."""

    # Step 1: remove None
    step1 = [x for x in raw_data if x is not None]

    # Step 2: trim whitespace
    step2 = [x.strip() if isinstance(x, str) else x for x in step1]

    # Step 3: remove empty strings (after trimming, so "   " is removed too)
    step3 = [x for x in step2 if x != ""]

    # Step 4: remove duplicates
    step4 = list(set(step3))

    # Step 5: sort
    step5 = sorted(step4)

    return step5

# Dirty data
dirty = [None, "  Python  ", "", "Java", "Python", "  Go  ", None, "Java"]

clean = clean_data_pipeline(dirty)
print(clean)  # ['Go', 'Java', 'Python']
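Step 4 uses `set()`, which discards the original order; that is harmless here only because Step 5 sorts anyway. If a pipeline should remove duplicates but keep first-seen order, `dict.fromkeys()` is a common idiom, since dictionaries preserve insertion order (Python 3.7+). `dedupe_keep_order` is an illustrative helper name.

```python
def dedupe_keep_order(items):
    """Remove duplicates, keeping the first occurrence of each item."""
    return list(dict.fromkeys(items))


cleaned = ["Python", "Java", "Python", "Go", "Java"]
print(dedupe_keep_order(cleaned))  # ['Python', 'Java', 'Go']
```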

Email Validation

def validate_emails_pipeline(emails):
    """Email validation pipeline."""

    # Step 1: remove empty
    non_empty = [e for e in emails if e]

    # Step 2: must contain @
    with_at = [e for e in non_empty if "@" in e]

    # Step 3: must have a domain
    with_domain = [e for e in with_at if "." in e.split("@")[-1]]

    # Step 4: lowercase
    normalized = [e.lower().strip() for e in with_domain]

    # Step 5: remove duplicates
    unique = list(set(normalized))

    return sorted(unique)

# Dirty emails
dirty_emails = [
    "alice@gmail.com",
    "BOB@YAHOO.COM",
    "invalid-email",
    "",
    "alice@gmail.com",  # Duplicate
    "charlie@example"   # No domain
]

clean_emails = validate_emails_pipeline(dirty_emails)
print(clean_emails)  # ['alice@gmail.com', 'bob@yahoo.com']
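The `@` and `.` checks are deliberately loose. If you prefer the validation stage to be a single test, one option is a simple regular expression with `re.fullmatch`. The pattern below is an illustrative assumption that accepts "something@something.tld" with no spaces; it is nowhere near a full RFC 5322 email validator. `is_plausible_email` and `validate_emails_regex` are hypothetical names.

```python
import re

# Illustrative pattern: local part, "@", a domain containing at least one dot, no spaces
EMAIL_PATTERN = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


def is_plausible_email(email):
    return EMAIL_PATTERN.fullmatch(email) is not None


def validate_emails_regex(emails):
    normalized = (e.strip().lower() for e in emails if e)  # normalize first
    return sorted({e for e in normalized if is_plausible_email(e)})  # validate + dedupe


dirty_emails = [
    "alice@gmail.com",
    "BOB@YAHOO.COM",
    "invalid-email",
    "",
    "alice@gmail.com",
    "charlie@example",
]
print(validate_emails_regex(dirty_emails))  # ['alice@gmail.com', 'bob@yahoo.com']
```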

Generator-Based Pipeline (for Big Data)

For LARGE datasets, use generators!

def filter_even_gen(numbers):
    """Generator for filtering."""
    for n in numbers:
        if n % 2 == 0:
            yield n

def double_gen(numbers):
    """Generator for doubling."""
    for n in numbers:
        yield n * 2

# Pipeline with generators (lazy processing!)
numbers = range(1000000)  # One million numbers!

pipeline = double_gen(filter_even_gen(numbers))

# Processed ON DEMAND!
result = sum(pipeline)
print(result)  # 499999000000, computed one item at a time

Each generator hands over one item at a time, so no intermediate list is ever built in memory! 💡
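The two generator functions above can also be written as generator expressions. Laziness is easiest to see with an infinite input: `itertools.count()` never ends, yet `itertools.islice` pulls only the first few results through the pipeline, so nothing is computed beyond what is requested.

```python
from itertools import count, islice

numbers = count()  # 0, 1, 2, 3, ... forever

evens = (n for n in numbers if n % 2 == 0)  # lazy filter stage
doubled = (n * 2 for n in evens)            # lazy map stage

# Only five results are ever produced; a list-based pipeline would never finish
first_five = list(islice(doubled, 5))
print(first_five)  # [0, 4, 8, 12, 16]
```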


Error Handling in a Pipeline

Safe Pipeline

def safe_pipeline(data, *functions):
    """Pipeline with error handling."""
    result = data

    for func in functions:
        try:
            result = func(result)
        except Exception as e:
            print(f"Error in {func.__name__}: {e}")
            return None

    return result

def divide_by_zero(x):
    """Dangerous function."""
    return x / 0

def double(x):
    return x * 2

# Usage
result = safe_pipeline(10, double, divide_by_zero)
# Output: Error in divide_by_zero: division by zero
print(result)  # None
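Returning `None` on failure is ambiguous when a step can legitimately return `None`, and printing hides the error from the caller. A minimal sketch of an alternative, assuming you want the caller to decide what to do, returns a `(result, error)` pair instead. `safe_pipeline_with_error` is a hypothetical name; `getattr(func, "__name__", repr(func))` is used because some callables, such as `functools.partial` objects, have no `__name__`.

```python
def safe_pipeline_with_error(data, *functions):
    """Run the steps in order; return (result, None) or (None, error message)."""
    result = data
    for func in functions:
        try:
            result = func(result)
        except Exception as exc:  # broad on purpose: any step may fail
            name = getattr(func, "__name__", repr(func))
            return None, f"{name} failed: {exc!r}"
    return result, None


def double(x):
    return x * 2


def divide_by_zero(x):
    return x / 0


value, error = safe_pipeline_with_error(10, double, double)
print(value, error)  # 40 None

value, error = safe_pipeline_with_error(10, double, divide_by_zero)
print(value)  # None
print(error)  # message naming the failing step and the exception
```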

Pipeline + map/filter/reduce

Combining All the Tools

from functools import reduce

data = [
    {"product": "Phone", "quantity": 2, "price": 500},
    {"product": "Laptop", "quantity": 1, "price": 1200},
    {"product": "Mouse", "quantity": 5, "price": 25},
    {"product": "Monitor", "quantity": 2, "price": 300}
]

# Pipeline: quantity > 1 → compute total → sum up
filtered = filter(lambda item: item["quantity"] > 1, data)
totals = map(lambda item: item["quantity"] * item["price"], filtered)
grand_total = reduce(lambda acc, x: acc + x, totals, 0)

print(grand_total)  # 1725
# Phone: 2 * 500 = 1000
# Mouse: 5 * 25 = 125
# Monitor: 2 * 300 = 600
# Total: 1725
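One detail of this version: `filter()` and `map()` return lazy iterators, not lists. They are consumed by the first pass, so reusing `totals` after it has been folded yields nothing. The sketch below uses `sum()` (the same addition fold as the `reduce` call) to show the effect, plus the usual fix of materializing a stage into a list when its output is needed more than once.

```python
data = [
    {"product": "Phone", "quantity": 2, "price": 500},
    {"product": "Laptop", "quantity": 1, "price": 1200},
    {"product": "Mouse", "quantity": 5, "price": 25},
    {"product": "Monitor", "quantity": 2, "price": 300},
]

filtered = filter(lambda item: item["quantity"] > 1, data)
totals = map(lambda item: item["quantity"] * item["price"], filtered)

print(sum(totals))  # 1725
print(sum(totals))  # 0 (the iterator is already exhausted)

# Materialize once if the same stage output is needed twice
totals_list = [item["quantity"] * item["price"] for item in data if item["quantity"] > 1]
print(sum(totals_list), max(totals_list))  # 1725 1000
```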

Common Mistakes

Mistake 1: Mutating the Original Data

def bad_pipeline(data):
    data.sort()  # Mutates the original!
    return [x * 2 for x in data]

numbers = [3, 1, 2]
result = bad_pipeline(numbers)
print(numbers)  # [1, 2, 3]  ← Changed!

# ✅ CORRECT: pure function
def good_pipeline(data):
    sorted_data = sorted(data)  # New list
    return [x * 2 for x in sorted_data]

numbers = [3, 1, 2]
result = good_pipeline(numbers)
print(numbers)  # [3, 1, 2]  ← Unchanged!
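The same trap applies to the dictionaries used in earlier examples: assigning to `p["price"]` inside a step changes the caller's records. A minimal sketch of a pure alternative builds a new dictionary per record with `{**p, ...}`; `apply_discount` and the 10% rate are hypothetical.

```python
def apply_discount(products):
    # ✅ new dict per product; the caller's dicts are left untouched
    return [{**p, "price": p["price"] * 0.9} for p in products]


products = [{"name": "Phone", "price": 500}, {"name": "Mouse", "price": 25}]

discounted = apply_discount(products)
print(products[0]["price"])    # 500
print(discounted[0]["price"])  # 450.0
```

Note that `{**p, ...}` is a shallow copy: nested lists or dicts inside a record would still be shared.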

Mistake 2: Pipeline Too Long

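from functools import reduce

data = list(range(1, 21))  # sample input: the numbers 1 to 20

# ❌ BAD: unreadable!
result = reduce(
    lambda a, x: a + x,
    map(
        lambda x: x ** 2,
        filter(
            lambda x: x > 0,
            map(
                lambda x: x - 10,
                filter(lambda x: x % 2 == 0, data)
            )
        )
    ),
    0
)
print(result)  # 220

# ✅ BETTER: break into steps
step1 = [x for x in data if x % 2 == 0]  # even numbers
step2 = [x - 10 for x in step1]          # subtract 10
step3 = [x for x in step2 if x > 0]      # keep positives
step4 = [x ** 2 for x in step3]          # square
result = sum(step4)
print(result)  # 220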

Summary

A Data Pipeline Is:

  • ✅ A sequence of data transformations
  • ✅ Output of one function → input of the next
  • ✅ Pure functions with no side effects
  • ✅ Readable and predictable code

Core Stages:

Data → filter → map → reduce → Result

Tools:

# filter() — select
filter(lambda x: x > 0, data)

# map() — transform
map(lambda x: x * 2, data)

# reduce() — fold
reduce(lambda acc, x: acc + x, data, 0)  # requires: from functools import reduce

# List comprehension — all in one
[x * 2 for x in data if x > 0]

Rules for a Good Pipeline:

  • ✅ Each step is a pure function
  • ✅ Don’t mutate the source data
  • ✅ Break complex pipelines into steps
  • ✅ Use generators for large datasets
  • ✅ Handle errors

What’s Next?

Now you know Data Pipelines! 🎉

Next topics:
- Function composition
- Generator expressions — lazy evaluation
- Decorators — function wrappers

Data pipelines are the foundation of data processing in AI! 🤖📊


🔗 Similar

Similar articles

Continue learning with these materials

📝

Setting Up Your Environment: Python, pip, and VS …

Before writing code locally, you need to set up three tools: Python, pip, and VS...

📅 04.06.2026 👁️ 17
📝

The datetime Module: Working with Dates and Times

datetime is Python's standard module for working with dates and times. It's part of the...

📅 08.05.2026 👁️ 67
📝

.env Files and Environment Variables: Keeping Sec…

Imagine you wrote a program with an API key hardcoded in the source and pushed...

📅 08.05.2026 👁️ 76

Did you like the article?

Subscribe to our updates and receive new articles first. Grow with PyLand!