How to implement both asynchronous programming and multithreading simultaneously in Python for web scraping?
I need to scrape a large number of pages (around 15,000) with 35 articles on each page for each category. If I do this synchronously using requests in a single thread, it will take a very long time (several hours), and I need the information urgently. I want to use aiohttp and multithreading.
I plan to implement it this way: each thread will process thousands of pages for a specific category in parallel with the other threads (there are about 15 categories in total). Within each thread, the aiohttp requests will be executed asynchronously.
The problem is that the executor doesn’t allow passing a function with await to it, and if I call the function directly in submit, multithreading doesn’t work - the threads execute sequentially.
How to correctly implement a combination of aiohttp asynchronous requests in a multithreaded environment for efficient web scraping?
My current code is provided below:
```python
# Other imports
import bs4
import aiohttp, asyncio
from urllib import parse
import certifi, ssl
from concurrent.futures import ThreadPoolExecutor
# Project imports
from utils import to_dict, fetch

items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())


async def serialize_topic(topic: str, topics: list[str]) -> None:
    print(f'Starting to serialize {topics.index(topic)+1}. {topic}')
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        topic_html = await fetch(func=session.get, url=topic, params={'limit': 35})
    topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')
    # Getting the last page number to handle pagination
    pagination = list(topic_soup('ul', {'class': 'pagination'}).children)
    max_page_url = pagination[-1].a.get('href')
    max_page_params = parse.urlparse(max_page_url).query  # Parsing the link to the last page in order to get its number
    dict_from_query = parse.parse_qs(max_page_params)
    max_page = int(dict_from_query['page'][0])
    # Going through all pages
    for i in range(max_page):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
            page_html = await fetch(func=session.get, url=topic, params={'page': i, 'limit': 100})
        page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
        items = page_soup.find_all(class_='article')
        # Going through items on one page
        for item in items:
            # Getting item caption with its data
            # ... here I just extract the article description and title
            ...
        print(f'Serialized page number {i+1}')
    print(f'Serialized {topics.index(topic)+1}. {topic}')


async def main():
    # Getting all topics
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        html = await fetch(func=session.get, url='')
    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id': 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]
    with ThreadPoolExecutor(max_workers=15) as executor:
        for topic in topics:
            executor.submit(await serialize_topic(), topic, topics)
    print('\n**************************\n')
    print(f'Total amount of articles: {len(items_list)}')


if __name__ == '__main__':
    asyncio.run(main())
```
The main problem in your code is the call executor.submit(await serialize_topic(), topic, topics). As written, serialize_topic() is invoked without its required arguments, which raises a TypeError straight away. Even with the arguments supplied, the await would run the coroutine to completion right inside the main event loop, so the topics would still be processed one after another, and the value handed to executor.submit() would be the coroutine's result rather than a callable. executor.submit() expects a plain synchronous function.
To combine aiohttp with a ThreadPoolExecutor, give each worker thread a synchronous entry point that starts its own event loop (for example via asyncio.run()) and runs the coroutine there; from the main loop you schedule those calls with loop.run_in_executor() and await the resulting futures.
Table of Contents
- Solving the Current Problem
- Correct Implementation
- Optimized Approach
- Complete Working Example
- Additional Optimizations
- Performance Recommendations
Solving the Current Problem
In your code, you need to replace:
```python
# Incorrect
executor.submit(await serialize_topic(), topic, topics)
```
with a synchronous entry point scheduled through run_in_executor():
```python
# Correct: run_in_executor() needs a synchronous callable, so hand it asyncio.run,
# which gives the worker thread its own event loop for the coroutine
loop = asyncio.get_running_loop()
await loop.run_in_executor(executor, asyncio.run, serialize_topic(topic, topics))
```
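To see the pattern in isolation, here is a minimal, self-contained sketch; work and demo are just placeholder names standing in for serialize_topic and your main coroutine:
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Placeholder coroutine standing in for serialize_topic
async def work(name: str) -> str:
    await asyncio.sleep(1)  # pretend this is a batch of aiohttp requests
    return f'{name} done'

async def demo() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Each thread runs its own event loop via asyncio.run; the outer loop awaits the futures
        futures = [
            loop.run_in_executor(executor, asyncio.run, work(f'topic-{i}'))
            for i in range(3)
        ]
        print(await asyncio.gather(*futures))  # all three finish after ~1 s, not ~3 s

asyncio.run(demo())
```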
Correct Implementation
Here’s how your main function should look:
```python
async def main():
    # Get all topics
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        html = await fetch(func=session.get, url='')
    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id': 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]

    # Schedule one topic per worker thread
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=15) as executor:
        # Create a future for each topic; asyncio.run gives every thread its own event loop
        tasks = []
        for topic in topics:
            task = loop.run_in_executor(executor, asyncio.run, serialize_topic(topic, topics))
            tasks.append(task)

        # Wait for all tasks to complete
        await asyncio.gather(*tasks)

    print('\n**************************\n')
    print(f'Total articles: {len(items_list)}')
```
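With this pattern, asyncio.run() builds a private event loop inside each worker thread and closes it when serialize_topic finishes, so the aiohttp sessions created there never touch the loop that is running main(); the outer loop only waits on the executor futures. The next section makes that per-thread wrapper explicit.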
Optimized Approach
A more robust structure is to give each worker thread an explicit synchronous entry point that creates and owns its own event loop:
```python
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse, parse_qs
import bs4
import certifi
import ssl

from utils import fetch  # the project helper from the question

# Global variables
items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())


# Synchronous function for processing one topic
def serialize_topic_sync(topic: str, topics: list[str]) -> None:
    print(f'Starting processing {topics.index(topic)+1}. {topic}')
    # Create an event loop for this thread
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Run the asynchronous function in this thread
        loop.run_until_complete(serialize_topic_async(topic, topics))
    finally:
        loop.close()


# Asynchronous function for processing one topic
async def serialize_topic_async(topic: str, topics: list[str]) -> None:
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        topic_html = await fetch(func=session.get, url=topic, params={'limit': 35})

    topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')
    # Get the last page number
    pagination = list(topic_soup.find('ul', {'class': 'pagination'}).children)
    max_page_url = pagination[-1].a.get('href')
    max_page_params = urlparse(max_page_url).query
    dict_from_query = parse_qs(max_page_params)
    max_page = int(dict_from_query['page'][0])

    # Process all pages
    for i in range(1, max_page + 1):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
            page_html = await fetch(func=session.get, url=topic, params={'page': i, 'limit': 100})
        page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
        items = page_soup.find_all(class_='article')
        # Process page items
        for item in items:
            # ... your processing logic ...
            pass
        print(f'Processed page {i} for topic {topic}')

    print(f'Completed processing {topics.index(topic)+1}. {topic}')


async def main():
    # Get all topics
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        html = await fetch(func=session.get, url='')
    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id': 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]

    # Use a ThreadPoolExecutor for parallel topic processing
    with ThreadPoolExecutor(max_workers=min(15, len(topics))) as executor:
        # Submit one synchronous task per thread
        futures = []
        for topic in topics:
            future = executor.submit(serialize_topic_sync, topic, topics)
            futures.append(future)

        # Wait for all threads to complete
        for future in futures:
            future.result()

    print(f'Total articles processed: {len(items_list)}')


if __name__ == '__main__':
    asyncio.run(main())
```
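The code above still calls the fetch helper imported from utils in your question. Since that module isn't shown, the examples assume it does something roughly like this; adjust to match your actual implementation:
```python
# Assumed shape of the fetch helper from utils (not shown in the question):
# call the passed request method (e.g. session.get) and return the body as text.
async def fetch(func, url: str, params: dict = None) -> str:
    async with func(url, params=params) as response:
        response.raise_for_status()
        return await response.text()
```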
Complete Working Example
Here’s a complete working example with additional optimizations:
```python
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, parse_qs
import bs4
import certifi
import ssl
import time
from typing import List, Dict

# Global variables
items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())


async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    params: Dict = None,
    max_retries: int = 3,
    delay: float = 1.0
) -> str:
    """Make a request with automatic retries and exponential backoff."""
    for attempt in range(max_retries):
        try:
            async with session.get(url, params=params, timeout=30) as response:
                response.raise_for_status()
                return await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(delay * (2 ** attempt))


async def process_page(session: aiohttp.ClientSession, topic_url: str, page: int) -> List[Dict]:
    """Process one page of articles."""
    try:
        page_html = await fetch_with_retry(
            session,
            topic_url,
            params={'page': page, 'limit': 100}
        )
        page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
        items = page_soup.find_all(class_='article')

        articles = []
        for item in items:
            # Example data extraction
            title = item.find('h2').text.strip() if item.find('h2') else 'No title'
            description = item.find('p').text.strip() if item.find('p') else 'No description'
            articles.append({
                'title': title,
                'description': description,
                'page': page,
                'topic': topic_url
            })
        return articles
    except Exception as e:
        print(f"Error processing page {page} for {topic_url}: {e}")
        return []


async def process_topic_async(topic_url: str, topics: List[str]) -> List[Dict]:
    """Asynchronously process one topic."""
    print(f'Starting processing topic {topics.index(topic_url)+1}/{len(topics)}: {topic_url}')

    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=SSL_CERT, limit=100, limit_per_host=30),
        timeout=aiohttp.ClientTimeout(total=60)
    ) as session:
        # Get the first page to determine the total number of pages
        try:
            first_page_html = await fetch_with_retry(session, topic_url, params={'limit': 35})
        except Exception as e:
            print(f"Failed to get first page for {topic_url}: {e}")
            return []

        topic_soup = bs4.BeautifulSoup(first_page_html, 'html.parser')
        try:
            # Determine the number of pages
            pagination = list(topic_soup.find('ul', {'class': 'pagination'}).children)
            max_page_url = pagination[-1].a.get('href')
            max_page_params = urlparse(max_page_url).query
            dict_from_query = parse_qs(max_page_params)
            max_page = int(dict_from_query['page'][0])
        except (AttributeError, IndexError, KeyError, ValueError):
            max_page = 1  # If pagination is not found, process only the first page

        print(f'Topic {topic_url}: found {max_page} pages')

        # Create tasks for all pages and run them concurrently
        tasks = [process_page(session, topic_url, page) for page in range(1, max_page + 1)]
        results = await asyncio.gather(*tasks)

        all_articles = []
        for articles in results:
            all_articles.extend(articles)

    print(f'Completed processing topic {topics.index(topic_url)+1}: {len(all_articles)} articles')
    return all_articles


def process_topic_sync(topic_url: str, topics: List[str]) -> List[Dict]:
    """Synchronous wrapper that processes a topic in a separate thread."""
    # Create a new event loop for this thread
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Run the asynchronous function
        return loop.run_until_complete(process_topic_async(topic_url, topics))
    finally:
        loop.close()


async def main():
    # Get the list of all topics
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=SSL_CERT),
        timeout=aiohttp.ClientTimeout(total=30)
    ) as session:
        try:
            html = await fetch_with_retry(session, '')  # base URL omitted here, as in the question
        except Exception as e:
            print(f"Failed to get topics list: {e}")
            return

    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics_elements = soup.find('div', {'id': 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics_elements]

    print(f'Found {len(topics)} topics to process')

    # Determine the number of worker threads
    max_workers = min(15, len(topics))
    print(f'Using {max_workers} worker threads')

    # Start processing in a ThreadPoolExecutor
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit one task per topic
        futures = []
        for topic in topics:
            future = executor.submit(process_topic_sync, topic, topics)
            futures.append(future)

        # Collect results as they complete
        total_articles = 0
        for future in as_completed(futures):
            try:
                articles = future.result()
                items_list.extend(articles)
                total_articles += len(articles)
                print(f'Total articles processed: {total_articles}')
            except Exception as e:
                print(f'Error processing topic: {e}')

    end_time = time.time()

    print('\n**************************')
    print(f'Total articles processed: {len(items_list)}')
    print(f'Execution time: {end_time - start_time:.2f} seconds')
    print(f'Average speed: {len(items_list)/(end_time - start_time):.2f} articles/second')


if __name__ == '__main__':
    asyncio.run(main())
```
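Note that in this version items_list is only ever extended from the main thread, inside the as_completed() loop; each worker thread returns its own list of articles instead of touching shared state, so no locking is needed.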
Additional Optimizations
- Connection limit management:

```python
connector = aiohttp.TCPConnector(
    ssl=SSL_CERT,
    limit=100,            # Maximum number of connections in the pool
    limit_per_host=30,    # Maximum number of connections per host
    force_close=True      # Close the connection after each request (disables keep-alive)
)
```

- Adding delays between requests:

```python
import random

# Inside the page processing loop
await asyncio.sleep(random.uniform(0.5, 2.0))  # Random delay
```

- Using a semaphore to limit parallel requests (combined with the other pieces in the sketch after this list):

```python
semaphore = asyncio.Semaphore(50)  # At most 50 concurrent requests

async def fetch_with_semaphore(session, url, params=None):
    async with semaphore:
        return await fetch_with_retry(session, url, params)
```

- Saving results as they are processed:

```python
import json
from pathlib import Path

def save_articles(articles: List[Dict], filename: str):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(articles, f, ensure_ascii=False, indent=2)

# In the main function
save_articles(items_list, 'articles.json')
```
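As a rough sketch of how these pieces could be wired together, the connector limits, the semaphore, and the random delay might be combined per worker thread like this; polite_fetch and scrape_topic_throttled are illustrative names, while fetch_with_retry and SSL_CERT are the ones defined in the complete example above:
```python
import asyncio
import random
from typing import Dict, List
import aiohttp

async def polite_fetch(session: aiohttp.ClientSession,
                       semaphore: asyncio.Semaphore,
                       url: str,
                       params: Dict = None) -> str:
    # Bounded concurrency plus a small random delay before every request
    async with semaphore:
        await asyncio.sleep(random.uniform(0.5, 2.0))
        return await fetch_with_retry(session, url, params)  # defined in the complete example

async def scrape_topic_throttled(topic_url: str, max_page: int) -> List[str]:
    # One semaphore and one connector per event loop, i.e. per worker thread
    semaphore = asyncio.Semaphore(50)
    connector = aiohttp.TCPConnector(ssl=SSL_CERT, limit=100, limit_per_host=30)
    async with aiohttp.ClientSession(connector=connector) as session:
        pages = [polite_fetch(session, semaphore, topic_url, {'page': p, 'limit': 100})
                 for p in range(1, max_page + 1)]
        return await asyncio.gather(*pages)  # raw HTML of every page
```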
Performance Recommendations
- Determining the optimal number of threads:

```python
import multiprocessing

def get_optimal_workers():
    cpu_count = multiprocessing.cpu_count()
    return min(15, max(1, cpu_count - 1))
```

- Using asynchronous data saving:

```python
async def save_articles_async(articles: List[Dict], filename: str):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, save_articles, articles, filename)
```

- Progress monitoring (advance the bar as topics finish, not as they are submitted):

```python
from tqdm import tqdm

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(process_topic_sync, topic, topics) for topic in topics]
    with tqdm(total=len(futures), desc="Processing topics") as pbar:
        for future in as_completed(futures):
            future.result()
            pbar.update(1)
```

- Error handling and retries:

```python
async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    params: Dict = None,
    max_retries: int = 5,
    base_delay: float = 1.0
) -> str:
    for attempt in range(max_retries):
        try:
            async with session.get(url, params=params, timeout=30) as response:
                response.raise_for_status()
                return await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)  # Exponential backoff
            print(f"Retrying {attempt + 1}/{max_retries} after {delay}s...")
            await asyncio.sleep(delay)
```
With this setup each of the 15 worker threads drives its own batch of asynchronous aiohttp requests, so the roughly 15,000 pages are fetched concurrently instead of one at a time.