Asynchronous Web Scraping with Multithreading in Python

Optimizing web scraping: how to effectively combine aiohttp and multithreading in Python for processing 15,000+ pages. Complete guide with code examples.

How to implement both asynchronous programming and multithreading simultaneously in Python for web scraping?

I need to scrape a large number of pages (around 15,000) with 35 articles on each page for each category. If I do this synchronously using requests in a single thread, it will take a very long time (several hours), and I need the information urgently. I want to use aiohttp and multithreading.

I plan to implement it this way: each thread will process the thousands of pages of one category, in parallel with the other threads (there are about 15 categories in total). Within each thread, the requests will be made asynchronously with aiohttp.

The problem is that the executor doesn’t allow passing a function with await to it, and if I call the function directly in submit, multithreading doesn’t work - the threads execute sequentially.

How to correctly implement a combination of aiohttp asynchronous requests in a multithreaded environment for efficient web scraping?

My current code is provided below:

python
# Other imports
import bs4
import aiohttp, asyncio
from urllib import parse
import certifi, ssl
from concurrent.futures import ThreadPoolExecutor

# Project imports
from utils import to_dict, fetch

items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())

async def serialize_topic(topic: str, topics: list[str]) -> None:
    print(f'Starting to serialize {topics.index(topic)+1}. {topic}')

    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        topic_html = await fetch(func=session.get, url=topic, params={'limit' : 35})

    topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')

    # Getting the last page number to handle pagination
    pagination = list(topic_soup('ul', {'class' : 'pagination'}).children)
    max_page_url = pagination[-1].a.get('href')
    max_page_params = parse.urlparse(max_page_url).query # Parsing a link to the last page in order to get its number
    dict_from_query = parse.parse_qs(max_page_params)
    max_page = int(dict_from_query['page'][0])

    # Going through all pages
    for i in range(max_page):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
            page_html = await fetch(func=session.get, url=topic, params={'page' : i, 'limit' : 100})

        page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
        items = page_soup.find_all(class_='article')

        # Going through items on one page
        for item in items:

            # Getting item caption with its data
            # ... just extracting the article title and description here
            ...
            
        print(f'Serialized page number {i+1}')

    print(f'Serialized {topics.index(topic)+1}. {topic}')


async def main():
    # Getting all topics
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        html = await fetch(func=session.get, url='')

    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id' : 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]

    with ThreadPoolExecutor(max_workers=15) as executor:
        for topic in topics:
            executor.submit(await serialize_topic(), topic, topics)

    print('\n**************************\n')
    print(f'Total amount of articles: {len(items_list)}')


if __name__ == '__main__':
    asyncio.run(main())
NeuroAgent

The main problem in your code is the line executor.submit(await serialize_topic(), topic, topics). executor.submit() expects a plain synchronous callable plus its arguments, but here the coroutine is awaited on the spot in the main event loop (and serialize_topic() is even called without its arguments, which raises a TypeError by itself), so nothing is ever handed to the worker threads and the topics cannot be processed in parallel.

A working combination of aiohttp and ThreadPoolExecutor comes down to one rule: whatever you hand to the executor, whether through executor.submit() or loop.run_in_executor(), must be a synchronous callable. To run a coroutine inside a worker thread, that callable has to start its own event loop there, most simply with asyncio.run().
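
A minimal, self-contained sketch of that rule (a toy coroutine instead of your scraper, and no aiohttp involved): the main event loop dispatches to worker threads, and each thread drives its coroutine with asyncio.run().

python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def work(name: str) -> str:
    # Stand-in for an asynchronous scraping task
    await asyncio.sleep(1)
    return f'{name} done'

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as executor:
        # asyncio.run is a synchronous callable: in each worker thread it
        # creates a fresh event loop, runs the coroutine, and closes the loop
        futures = [
            loop.run_in_executor(executor, asyncio.run, work(f'task-{i}'))
            for i in range(3)
        ]
        print(await asyncio.gather(*futures))  # all three finish in ~1 second

asyncio.run(main())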

Solving the Current Problem

In your code, you need to replace:

python
# Incorrect
executor.submit(await serialize_topic(), topic, topics)

with a call that hands run_in_executor() a synchronous callable; here asyncio.run() drives the coroutine inside the worker thread:

python
# Correct: asyncio.run() gives the coroutine its own event loop in the worker thread
loop = asyncio.get_running_loop()
await loop.run_in_executor(executor, asyncio.run, serialize_topic(topic, topics))

Correct Implementation

Here’s how your main function should look:

python
async def main():
    # Get all topics
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        html = await fetch(func=session.get, url='')

    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id': 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]

    # run_in_executor() needs a synchronous callable, so each worker thread
    # runs the coroutine with its own event loop via asyncio.run()
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutor(max_workers=15) as executor:
        # Schedule one worker thread per topic
        tasks = [
            loop.run_in_executor(executor, asyncio.run, serialize_topic(topic, topics))
            for topic in topics
        ]

        # Wait for all threads to complete
        await asyncio.gather(*tasks)

    print('\n**************************\n')
    print(f'Total articles: {len(items_list)}')

Optimized Approach

For maximum performance, the following structure is recommended:

python
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse, parse_qs
import bs4
import certifi
import ssl

# Project imports
from utils import fetch  # fetch helper from the question's utils module

# Global variables
items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())

# Synchronous function for processing one topic
def serialize_topic_sync(topic: str, topics: list[str]) -> None:
    print(f'Starting processing {topics.index(topic)+1}. {topic}')
    
    # Create event loop for this thread
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    
    try:
        # Run the asynchronous function in this thread
        loop.run_until_complete(serialize_topic_async(topic, topics))
    finally:
        loop.close()

# Asynchronous function for processing one topic
async def serialize_topic_async(topic: str, topics: list[str]) -> None:
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        topic_html = await fetch(func=session.get, url=topic, params={'limit': 35})
    
    topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')
    
    # Get the last page number
    pagination = list(topic_soup.find('ul', {'class': 'pagination'}).children)
    max_page_url = pagination[-1].a.get('href')
    max_page_params = urlparse(max_page_url).query
    dict_from_query = parse_qs(max_page_params)
    max_page = int(dict_from_query['page'][0])
    
    # Process all pages
    for i in range(1, max_page + 1):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
            page_html = await fetch(func=session.get, url=topic, params={'page': i, 'limit': 100})
        
        page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
        items = page_soup.find_all(class_='article')
        
        # Process page items
        for item in items:
            # ... your processing logic ...
            pass
        
        print(f'Processed page {i} for topic {topic}')

    print(f'Completed processing {topics.index(topic)+1}. {topic}')

async def main():
    # Get all topics
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        html = await fetch(func=session.get, url='')
    
    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id': 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]

    # Use ThreadPoolExecutor for parallel topic processing
    with ThreadPoolExecutor(max_workers=min(15, len(topics))) as executor:
        # Create synchronous tasks for each thread
        futures = []
        for topic in topics:
            future = executor.submit(serialize_topic_sync, topic, topics)
            futures.append(future)
        
        # Wait for all threads to complete
        for future in futures:
            future.result()

    print(f'Total articles processed: {len(items_list)}')

if __name__ == '__main__':
    asyncio.run(main())
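
A note on the wrapper: serialize_topic_sync above manages the per-thread event loop by hand. On Python 3.7+ the same thing can be written more compactly with asyncio.run(), which creates the loop, runs the coroutine, and closes the loop for you:

python
# Equivalent, shorter wrapper for each worker thread
def serialize_topic_sync(topic: str, topics: list[str]) -> None:
    print(f'Starting processing {topics.index(topic)+1}. {topic}')
    # asyncio.run() builds a fresh event loop for this thread, runs the
    # coroutine to completion, and closes the loop afterwards
    asyncio.run(serialize_topic_async(topic, topics))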

Complete Working Example

Here’s a complete working example with additional optimizations:

python
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, parse_qs
import bs4
import certifi
import ssl
import time
from typing import List, Dict

# Global variables
items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())

async def fetch_with_retry(
    session: aiohttp.ClientSession, 
    url: str, 
    params: Dict = None, 
    max_retries: int = 3,
    delay: float = 1.0
) -> str:
    """Function for making requests with automatic retries"""
    for attempt in range(max_retries):
        try:
            async with session.get(url, params=params, timeout=30) as response:
                response.raise_for_status()
                return await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(delay * (2 ** attempt))

async def process_page(session: aiohttp.ClientSession, topic_url: str, page: int) -> List[Dict]:
    """Process one page with articles"""
    try:
        page_html = await fetch_with_retry(
            session, 
            topic_url, 
            params={'page': page, 'limit': 100}
        )
        
        page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
        items = page_soup.find_all(class_='article')
        
        articles = []
        for item in items:
            # Example data extraction
            title = item.find('h2').text.strip() if item.find('h2') else 'No title'
            description = item.find('p').text.strip() if item.find('p') else 'No description'
            
            articles.append({
                'title': title,
                'description': description,
                'page': page,
                'topic': topic_url
            })
        
        return articles
    except Exception as e:
        print(f"Error processing page {page} for {topic_url}: {e}")
        return []

async def process_topic_async(topic_url: str, topics: List[str]) -> List[Dict]:
    """Asynchronous processing of one topic"""
    print(f'Starting processing topic {topics.index(topic_url)+1}/{len(topics)}: {topic_url}')
    
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=SSL_CERT, limit=100, limit_per_host=30),
        timeout=aiohttp.ClientTimeout(total=60)
    ) as session:
        # Get first page to determine total number of pages
        try:
            first_page_html = await fetch_with_retry(session, topic_url, params={'limit': 35})
        except Exception as e:
            print(f"Failed to get first page for {topic_url}: {e}")
            return []
        
        topic_soup = bs4.BeautifulSoup(first_page_html, 'html.parser')
        
        try:
            # Determine number of pages
            pagination = list(topic_soup.find('ul', {'class': 'pagination'}).children)
            max_page_url = pagination[-1].a.get('href')
            max_page_params = urlparse(max_page_url).query
            dict_from_query = parse_qs(max_page_params)
            max_page = int(dict_from_query['page'][0])
        except (AttributeError, IndexError, KeyError, ValueError):
            max_page = 1  # If pagination not found, process only first page
        
        print(f'Topic {topic_url}: found {max_page} pages')
        
        # Create coroutines for all pages and run them concurrently
        tasks = [
            process_page(session, topic_url, page)
            for page in range(1, max_page + 1)
        ]

        # Wait for all pages of this topic to finish
        results = await asyncio.gather(*tasks)
        all_articles = [article for page_articles in results for article in page_articles]
        
        print(f'Completed processing topic {topics.index(topic_url)+1}: {len(all_articles)} articles')
        return all_articles

def process_topic_sync(topic_url: str, topics: List[str]) -> List[Dict]:
    """Synchronous wrapper for processing a topic in a separate thread"""
    # Create a new event loop for this thread
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    
    try:
        # Run the asynchronous function
        return loop.run_until_complete(process_topic_async(topic_url, topics))
    finally:
        loop.close()

async def main():
    # Get list of all topics
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=SSL_CERT),
        timeout=aiohttp.ClientTimeout(total=30)
    ) as session:
        try:
            html = await fetch_with_retry(session, '')
        except Exception as e:
            print(f"Failed to get topics list: {e}")
            return
        
        soup = bs4.BeautifulSoup(html, 'html.parser')
        topics_elements = soup.find('div', {'id': 'content'}).find_all(class_='topic')
        topics = [item.a.get('href') for item in topics_elements]
        
        print(f'Found {len(topics)} topics to process')
    
    # Determine optimal number of worker threads
    max_workers = min(15, len(topics))
    print(f'Using {max_workers} worker threads')
    
    # Start processing in ThreadPoolExecutor
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit tasks to threads
        futures = []
        for topic in topics:
            future = executor.submit(process_topic_sync, topic, topics)
            futures.append(future)
        
        # Collect results as they complete
        total_articles = 0
        for future in as_completed(futures):
            try:
                articles = future.result()
                items_list.extend(articles)
                total_articles += len(articles)
                print(f'Collected so far: {total_articles} articles')
            except Exception as e:
                print(f'Error processing topic: {e}')
    
    end_time = time.time()
    
    print('\n**************************')
    print(f'Total articles processed: {len(items_list)}')
    print(f'Execution time: {end_time - start_time:.2f} seconds')
    print(f'Average speed: {len(items_list)/(end_time - start_time):.2f} articles/second')

if __name__ == '__main__':
    asyncio.run(main())

Additional Optimizations

  1. Connection limit management:

    python
    connector = aiohttp.TCPConnector(
        ssl=SSL_CERT,
        limit=100,          # Maximum number of connections
        limit_per_host=30,  # Maximum number of connections per host
        force_close=True    # Disable keep-alive and close each connection after its request
    )
    
  2. Adding delays between requests:

    python
    import random
    
    # Inside page processing loop
    await asyncio.sleep(random.uniform(0.5, 2.0))  # Random delay
    
  3. Using a semaphore to limit parallel requests (a fuller sketch follows this list):

    python
    semaphore = asyncio.Semaphore(50)  # Maximum 50 concurrent requests
    
    async def fetch_with_semaphore(session, url, params=None):
        async with semaphore:
            return await fetch_with_retry(session, url, params)
    
  4. Saving results as they’re processed:

    python
    import json
    from pathlib import Path
    
    def save_articles(articles: List[Dict], filename: str):
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(articles, f, ensure_ascii=False, indent=2)
    
    # In main function
    save_articles(items_list, 'articles.json')
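
As referenced in item 3, here is one way such a semaphore can be wired into the per-topic page loop so that all pages are requested concurrently but never more than 50 at a time. This is a sketch that assumes the process_page coroutine from the complete example above; the helper name process_all_pages is illustrative:

python
async def process_all_pages(session: aiohttp.ClientSession, topic_url: str, max_page: int):
    # One semaphore per topic, created inside the coroutine so that it belongs
    # to the event loop of the thread running it (asyncio primitives must not
    # be shared between the per-thread event loops)
    semaphore = asyncio.Semaphore(50)

    async def limited(page: int):
        async with semaphore:  # at most 50 page requests in flight
            return await process_page(session, topic_url, page)

    # Launch every page of the topic concurrently and flatten the results
    results = await asyncio.gather(*(limited(page) for page in range(1, max_page + 1)))
    return [article for page_articles in results for article in page_articles]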
    

Performance Recommendations

  1. Determining optimal number of threads:

    python
    import multiprocessing
    
    def get_optimal_workers():
        cpu_count = multiprocessing.cpu_count()
        return min(15, max(1, cpu_count - 1))
    
  2. Using asynchronous data saving:

    python
    async def save_articles_async(articles: List[Dict], filename: str):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, save_articles, articles, filename)
    
  3. Progress monitoring:

    python
    from tqdm import tqdm
    from concurrent.futures import as_completed

    # Submit all topics, then advance the bar as each one completes
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_topic_sync, topic, topics) for topic in topics]
        with tqdm(total=len(topics), desc="Processing topics") as pbar:
            for future in as_completed(futures):
                future.result()
                pbar.update(1)
    
  4. Error handling and retries:

    python
    async def fetch_with_retry(
        session: aiohttp.ClientSession,
        url: str,
        params: Dict = None,
        max_retries: int = 5,
        base_delay: float = 1.0
    ) -> str:
        for attempt in range(max_retries):
            try:
                async with session.get(url, params=params, timeout=30) as response:
                    response.raise_for_status()
                    return await response.text()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                if attempt == max_retries - 1:
                    raise
                delay = base_delay * (2 ** attempt)  # Exponential backoff
                print(f"Retrying {attempt + 1}/{max_retries} after {delay}s...")
                await asyncio.sleep(delay)
    

With this structure the roughly 15,000 pages are processed concurrently: the topics are spread across worker threads, the pages of each topic are fetched in parallel inside that thread's event loop, and the retries, timeouts, and connection limits keep the scraper robust against transient errors.
