Using Apache Airflow to Build a Pipeline for Scraped Data

If you need to gather and process data from multiple websites on a regular schedule, orchestrating and managing all those scraping jobs can become complex quickly. This is where Apache Airflow comes in – it is an invaluable tool for building robust and reliable pipelines for your scraped data.

In this comprehensive guide, we'll walk through how to leverage Airflow to scrape data from e-commerce sites, handle processing and storage, set up recurring workflows, monitor pipeline health, and more.

Why Build Scraping Pipelines?

Let's first discuss why you may need managed scraping pipelines in the first place. Some common use cases:

Price Monitoring – Track prices for products on e-commerce sites like Amazon to analyze price trends over time.

Inventory Monitoring – Check inventory levels and availability for key products you rely on.

Content Monitoring – Monitor changes to content on news sites or discussion forums.

Data Aggregation – Gather structured data from multiple sources to build a dataset.

Machine Learning – Feed scraped data into ML models for training.

For these applications, scraping ad hoc is not sustainable. You need recurring, reliable workflows to gather the latest data.

Some data points you might collect from e-commerce sites:

  • Product title, description, categories
  • Pricing information like MSRP and sale price
  • Inventory counts and availability
  • Images and media
  • Ratings and reviews

This data can power everything from pricing analytics to inventory management. But effectively gathering it at scale requires robust orchestration.
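As a rough illustration, those fields might map onto a record type like the following (a minimal sketch; the field names and types are placeholders rather than a fixed schema):

from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class ProductRecord:
  """One scraped product listing; the fields mirror the data points listed above."""
  title: str
  description: str = ''
  categories: List[str] = field(default_factory=list)
  msrp: Optional[float] = None            # list price, if published
  sale_price: Optional[float] = None      # price currently shown on the page
  in_stock: Optional[bool] = None
  inventory_count: Optional[int] = None
  image_urls: List[str] = field(default_factory=list)
  rating: Optional[float] = None          # e.g. average of 0-5 stars
  review_count: Optional[int] = None

Defining an explicit record type like this makes the downstream validation and storage steps easier to reason about.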

Scraping E-Commerce Sites with Oxylabs

There are many ways to build web scrapers – you can code them from scratch in Python using libraries like Selenium and Beautiful Soup. However, for robust commercial scraping, purpose-built tools are recommended.

Oxylabs provides a powerful scraping API that handles proxies, rotations, retries, and more under the hood. This simplifies your scraping code significantly.

Some key benefits of using a scraping API:

  • Avoid blocks – Rotating proxies and residential IPs make you look like a normal user.
  • Higher success rates – Built-in retries and logic to handle errors.
  • Simplified code – No need to orchestrate proxies and retries yourself.
  • Enterprise scale – Scales to thousands of requests per minute.
  • Push-pull architecture – Submit jobs to a queue, then pull the data when it is ready.

Let's see a Python example using the Oxylabs API to scrape a product page:

import time
import requests

API_KEY = 'YOUR_API_KEY'

url = 'https://books.toscrape.com/catalogue/the-hound-of-the-baskervilles_987/index.html'

# Submit the scraping job
job = requests.post('https://api.oxylabs.io/v1/queries',
  json={'url': url},
  auth=(API_KEY, '')
).json()

job_id = job['id']

# Poll until the job is done
status = 'pending'
while status != 'done':
  res = requests.get(f'https://api.oxylabs.io/v1/queries/{job_id}',
    auth=(API_KEY, ''))
  status = res.json()['status']
  time.sleep(5)

# Retrieve the scraped content
data = requests.get(f'https://api.oxylabs.io/v1/queries/{job_id}/data',
  auth=(API_KEY, '')).json()

print(data[0]['content'])

This submits the scraping job, polls its status in a loop, and retrieves the results once the job completes.

All the complexity of proxy handling, rotations, and retries is abstracted away. We can focus on integrating scraped data into our pipelines.
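When you move this into a pipeline, it helps to wrap the submit/poll/fetch steps in a single reusable function. Below is a minimal sketch under the same assumptions as the snippet above (scrape_url is a hypothetical helper name, and real code should also handle failed job statuses):

import time
import requests

API_KEY = 'YOUR_API_KEY'
BASE = 'https://api.oxylabs.io/v1/queries'

def scrape_url(url, poll_seconds=5, timeout_seconds=300):
  """Submit a scraping job, poll until it finishes, and return the results."""
  job = requests.post(BASE, json={'url': url}, auth=(API_KEY, '')).json()
  job_id = job['id']

  deadline = time.time() + timeout_seconds
  while True:
    status = requests.get(f'{BASE}/{job_id}', auth=(API_KEY, '')).json()['status']
    if status == 'done':
      break
    if time.time() > deadline:
      raise TimeoutError(f'Scraping job {job_id} did not finish in time')
    time.sleep(poll_seconds)

  return requests.get(f'{BASE}/{job_id}/data', auth=(API_KEY, '')).json()

A pipeline task can then call scrape_url(...) and return the result so downstream tasks can pick it up.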

Overview of Apache Airflow

Apache Airflow is an open-source workflow management platform designed to programmatically author, schedule and monitor workflows.

Some key features:

  • Dynamic pipelines created as directed acyclic graphs (DAGs)
  • Built-in scheduling of workflows
  • Monitoring task execution and handling failures
  • Scalable to very complex pipelines
  • Extensible and open source

Airflow architecture

As shown above, the main Airflow components are:

  • Web Server – GUI to interact with and visualize pipelines
  • Scheduler – Handles triggering and scheduling of workflows
  • Metadata DB – Stores pipeline definitions and state
  • Executor – Manages compute resources and running tasks
  • Workers – Perform the actual task execution

Together this provides a framework to define workflows programmatically while Airflow handles the execution.

Airflow pipelines are defined as Python scripts, allowing full programmatic control. You have the power to build very complex and custom pipelines without hand-coding all the orchestration logic yourself.

Setting Up Airflow with Docker

Airflow can be installed natively or using Docker. For this guide, we'll use the official Airflow Docker images as they provide an easy way to spin up all components.

The docker-compose.yaml file from the Apache Airflow GitHub repository brings up the required services:

services:
  postgres: # Metadata database 
  redis: # Message broker 
  airflow-webserver:
  airflow-scheduler:
  airflow-worker:

To get started:

  1. Create a folder for the project
  2. Save the docker-compose.yaml file into it
  3. Run docker-compose up airflow-init to initialize the metadata database
  4. Run docker-compose up
  5. Access the UI at http://localhost:8080

This brings up Airflow with connections to a Postgres database and Redis message broker. You can also mount local folders for DAGs, logs, plugins, and configuration.

The containers provide everything needed to start building pipelines. As you scale, switch to a more robust multi-node production deployment.

Defining DAGs and Tasks

The core of Airflow is the DAG (directed acyclic graph), which defines the tasks in a pipeline and dependencies between them.

DAGs are defined programmatically in Python scripts. For example:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
  'owner': 'airflow',
  ...
}

dag = DAG(
  'demo_pipeline',
  default_args=default_args,
  schedule_interval='@daily',
  ...
)

scrape_task = PythonOperator(
  task_id='scrape',
  python_callable=scrape,
  dag=dag
)

process_task = PythonOperator(
  task_id='process',
  python_callable=process,
  dag=dag
)

scrape_task >> process_task

This DAG runs daily. It has two tasks – scrape and process – with a dependency between them.

The scrape task calls a scrape() function when run. Airflow dynamically loads and executes the code.
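The callables themselves are plain Python functions. Here is a minimal sketch of what scrape() and process() might look like, with the scraped payload handed between tasks via XCom (a PythonOperator pushes its return value to XCom automatically):

def scrape(**context):
  """Fetch raw data; the return value is pushed to XCom automatically."""
  raw = [{'title': 'Example product', 'price': '19.99'}]  # placeholder payload
  return raw

def process(**context):
  """Pull the scrape task's output from XCom and clean it up."""
  raw = context['ti'].xcom_pull(task_ids='scrape')
  return [{'title': r['title'], 'price': float(r['price'])} for r in raw]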

We can further expand this example:

from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator

scrape_task = PythonOperator(task_id='scrape', ...)

process_task = PythonOperator(task_id='process', ...)

store_task = BashOperator(task_id='store', bash_command='...')

error_email_task = EmailOperator(task_id='send_error_email', ...)

def check_errors(**context):
  # Logic to check for errors (placeholder)
  errors = []
  if errors:
    return 'send_error_email'
  else:
    return 'end'

check_errors_task = BranchPythonOperator(task_id='check_errors', ...)

scrape_task >> process_task >> store_task >> check_errors_task >> error_email_task

This showcases different operators like Bash and Email. The BranchPythonOperator performs conditional logic – powerful for handling failures and branching execution.

When defining pipelines, modularize work into discrete tasks connected by dependencies. This provides flexibility to build very sophisticated orchestrations.
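To make the branching concrete, the BranchPythonOperator needs its python_callable wired in, plus a downstream task for the 'end' path, since a branch can only return task IDs that are directly downstream. A minimal sketch, assuming Airflow 2.3+ where EmptyOperator is available (older versions use DummyOperator):

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

check_errors_task = BranchPythonOperator(
  task_id='check_errors',
  python_callable=check_errors,  # returns 'send_error_email' or 'end'
  dag=dag
)

end_task = EmptyOperator(task_id='end', dag=dag)

# The branch chooses exactly one downstream path at runtime.
check_errors_task >> [error_email_task, end_task]

If check_errors returns 'end', the email task is simply skipped rather than failed.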

Scraping Pipeline Example

Let's walk through an example Airflow pipeline that:

  1. Scrapes product data from an e-commerce site
  2. Processes and validates the data
  3. Stores the data as CSV and in a database
  4. Sends an email summary

We can orchestrate this with Airflow as follows:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator

dag = DAG(
  'book_scraper',
  schedule_interval='@daily',
  start_date=datetime(2022, 1, 1),
  catchup=False
)

scrape_task = PythonOperator(
  task_id='scrape',
  python_callable=scrape_books, # Custom function
  dag=dag
)

validate_task = PythonOperator(
  task_id='validate',
  python_callable=validate, # Custom function
  dag=dag
)

write_csv_task = BashOperator(
  task_id='write_csv',
  bash_command='echo "{{ task_instance.xcom_pull("scrape") }}" > data.csv',
  dag=dag
)

store_db_task = PythonOperator(
  task_id='store_db',
  python_callable=store_data, # Writes to database
  dag=dag
)

email_task = EmailOperator(
  task_id='send_email',
  to='[email protected]',
  subject='Book Extract Complete',
  html_content='Scrape and load finished successfully!',
  dag=dag
)

(scrape_task >>
 [validate_task, write_csv_task] >>
 store_db_task >>
 email_task)

This DAG defines the end-to-end pipeline. Key steps:

  • Scrape data with a Python function
  • Validate and write CSV in parallel
  • Store in database
  • Email team upon completion

The bitshift operators (>>) chain tasks in dependency order. Airflow handles execution, retry logic, and monitoring for us.
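For completeness, here is a hedged sketch of what the store_data callable might do: pull the validated records from XCom and insert them into a database. It assumes validate() returns the cleaned records; the SQLite file and table name are illustrative placeholders, not part of the pipeline above:

import sqlite3

def store_data(**context):
  """Pull validated records from XCom and insert them into a local database."""
  records = context['ti'].xcom_pull(task_ids='validate') or []
  conn = sqlite3.connect('/tmp/books.db')  # placeholder; point at your warehouse in production
  conn.execute('CREATE TABLE IF NOT EXISTS books (title TEXT, price REAL)')
  conn.executemany(
    'INSERT INTO books (title, price) VALUES (?, ?)',
    [(r['title'], r['price']) for r in records]
  )
  conn.commit()
  conn.close()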

Scheduling and Triggers

Airflow provides many options to control when and how often pipelines run:

  • schedule_interval – Cron expression for periodic scheduling:

dag = DAG(
  schedule_interval='0 0 * * *' # Daily at midnight
)

  • start_date – Earliest date on which the scheduler will trigger the pipeline:

dag = DAG(
  ...,
  start_date=datetime(2022, 1, 1)
)
  • max_active_runs – Number of concurrent runs allowed

  • execution_timeout – Maximum runtime for an individual task before it is failed (dagrun_timeout sets the equivalent limit for a whole DAG run)

  • External Triggers – API calls to trigger pipelines

  • Manual Triggers – Ability to trigger pipelines on demand

Take advantage of these to control and tune how Airflow schedules workflows based on business needs.
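Here is a minimal sketch that pulls several of these knobs together in one DAG definition (catchup=False, which prevents backfilling missed intervals, is an extra setting not listed above):

from datetime import datetime, timedelta
from airflow import DAG

dag = DAG(
  'scheduled_scraper',
  schedule_interval='0 0 * * *',        # daily at midnight
  start_date=datetime(2022, 1, 1),      # earliest schedulable date
  max_active_runs=1,                    # no overlapping runs
  dagrun_timeout=timedelta(hours=2),    # fail the run if it exceeds 2 hours
  catchup=False                         # skip missed intervals
)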

Best Practices as Complexity Grows

As your pipelines grow in complexity, here are some best practices to structure Airflow projects:

Separate environment configuration – Set up an .env file or use Docker secrets to manage environment variables like credentials. Load these into your DAGs dynamically (a short sketch appears at the end of this section).

Parameterize reusable logic – For logic needed across DAGs like sending emails, use parameterized functions rather than repeating code.

Split up large DAGs – Break large DAG files into multiple small DAGs grouped by business process. Easier to maintain and troubleshoot.

Version control DAGs – Store DAG files in a Git repository for easy versioning.

Use custom Docker images – Extend the base Airflow image with your own dependencies and packages.

Add testing – Implement unit and integration tests for critical logic and fragile dependencies.

Enable HA – For production, run Airflow in High Availability mode across multiple nodes.

Monitor metrics – Instrument key DAG metrics like duration, success rate, and output size. Export them via StatsD to your monitoring stack.

Following best practices avoids bottlenecks and technical debt as your pipelines grow.
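As an illustration of the configuration point above, credentials can be read from environment variables or Airflow Variables instead of being hard-coded in DAG files. A minimal sketch (the variable names are placeholders):

import os
from airflow.models import Variable

# Credentials come from the environment (e.g. an .env file loaded by Docker)
# or from Airflow Variables; both names here are illustrative placeholders.
OXYLABS_API_KEY = os.environ.get('OXYLABS_API_KEY', '')
ALERT_EMAIL = Variable.get('alert_email', default_var='ops@example.com')
# Note: fetching Variables at parse time hits the metadata DB, so keep such calls sparing.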

Monitoring Pipeline Health

Once you have Airflow pipelines in production, real-time monitoring and observability become critical.

The Airflow UI provides valuable insight into pipeline health:

Airflow dashboard

Key metrics to monitor include:

  • DAG status – Failed vs successful runs
  • Task success rate – Frequency of task failures
  • Duration – Runtime of DAGs and tasks
  • Output metrics – Count/volume of data produced
  • Errors – Exceptions and stacktraces
  • Load – Number of task instances running

For large workflows, visualizing this data is key. Tools like Grafana make it easy to build custom dashboards tied to your Airflow metrics:

Grafana Airflow Dashboard

Reviewing these metrics regularly helps you spot issues such as:

  • Overall pipeline failures
  • Poor task performance
  • Data output decreases
  • Runtime spikes indicating instability

Continuous monitoring lets you improve stability, performance, and data quality over time.
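Airflow can also push failures straight to your alerting channel through callbacks, complementing the dashboards above. A minimal sketch using a task-level on_failure_callback (notify_failure is a hypothetical helper; wire it to Slack, PagerDuty, or email as needed):

from airflow.operators.python import PythonOperator

def notify_failure(context):
  """Called by Airflow when the task fails; context includes the task instance and exception."""
  ti = context['task_instance']
  print(f'Task {ti.task_id} in DAG {ti.dag_id} failed: {context.get("exception")}')

scrape_task = PythonOperator(
  task_id='scrape',
  python_callable=scrape_books,
  on_failure_callback=notify_failure,
  retries=2,                      # retry twice before alerting
  dag=dag
)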

Final Thoughts

Apache Airflow provides extremely powerful capabilities for managing complex workflows involving web scraping.

Key takeaways:

  • Easily build reusable pipelines with Airflow's rich library of operators
  • Native scheduling, monitoring, retries and failure handling
  • Ability to orchestrate sophisticated multi-stage workflows
  • Integration with tools like Oxylabs allows focusing on pipeline logic rather than data extraction
  • Robust framework scales to large production workflows
  • Improved visibility into pipeline health and data through monitoring

If you are running recurring scraping jobs, I highly recommend evaluating Airflow as it can save tremendous engineering time. Let me know if you have any other questions on how to leverage Airflow for your web scraping needs!
