- [[python - concurrent futures]]
- [[processes vs threads vs cores vs sockets]]: how many processes/threads is possible?
- [[python - threading issues]]
# Idea
Example from [official documentation](https://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor) (minor modifications made to highlight the async nature).
Max parallel workers: `max_workers` default is `min(32, os.cpu_count() + 4)`
**See next code chunk for detailed explanation.**
```python
# request urls with parallel threads/processes
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import os
import threading
# Sample URLs to fetch in parallel; the made-up domain is expected to
# fail, exercising the exception-handling path in the loop below.
URLS = [
"http://www.google.com/",
"http://www.bbc.co.uk/",
"http://europe.wsj.com/",
"http://some-made-up-domain.com/",
"http://www.foxnews.com/",
]
def load_url(url, timeout):
    """Fetch *url* and return a ``(pid, url, page_bytes)`` tuple.

    Prints the worker's process and thread ids so the caller can see
    which pool worker handled the request.  Propagates whatever
    ``urllib`` raises on failure (``URLError``, socket timeout, ...).
    """
    pid = os.getpid()  # constant across threads; differs per worker with ProcessPoolExecutor
    tid = threading.get_ident()
    print(f"process id: {pid}, thread id: {tid}")
    # context manager guarantees the connection is closed even on error
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return pid, url, conn.read()
if __name__ == "__main__":
    # Submit every download up front and map each Future back to its URL
    # so results can be labelled as they complete.  Completion order is
    # NOT submission order — that is the point of as_completed().
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(load_url, url, 60): url for url in URLS}
        for i, f in enumerate(as_completed(futures)):
            url = futures[f]
            try:
                # result() returns load_url's value or re-raises its exception
                pid, url, data = f.result()
            except Exception as e:
                print(f"{url} generated an exception: {e}")
            else:
                print(f"{url} page is {len(data)} bytes")
            # i is zero-based, so i + 1 jobs have finished at this point
            # (the original printed i, claiming "finished 0 jobs" first)
            print(f"finished {i + 1} jobs")
```
## Commented version
```python
# request urls with parallel threads/processes
from concurrent.futures import ThreadPoolExecutor, as_completed
# import ProcessPoolExecutor for parallel processes instead of threads
# from concurrent.futures import ProcessPoolExecutor, as_completed
import urllib.request
import os
import threading
# Sample URLs to fetch in parallel; the made-up domain is expected to
# fail, exercising the exception-handling path in the loop below.
URLS = [
"http://www.google.com/",
"http://www.bbc.co.uk/",
"http://europe.wsj.com/",
"http://some-made-up-domain.com/",
"http://www.foxnews.com/",
]
# retrieve a single page and report the URL and contents
def load_url(url, timeout):
    """Fetch *url* and return a ``(pid, url, page_bytes)`` tuple.

    Propagates whatever ``urllib`` raises on failure (``URLError``,
    socket timeout, ...); the caller sees that exception via
    ``Future.result()``.
    """
    pid = os.getpid()  # always good to check the process id (changes if using ProcessPoolExecutor)
    tid = threading.get_ident()  # get thread id
    print(f"INSIDE FUNCTION! process id: {pid}, thread id: {tid}; url: {url}")
    # context manager guarantees the connection is closed even on error
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return pid, url, conn.read()
# test function
# NOTE(review): this runs at import time and performs network I/O; with a
# spawn-based ProcessPoolExecutor every worker process would re-run it —
# consider moving it under the __main__ guard.
load_url(URLS[0], 30)
# url -> submission index; not required (shown for educational purposes)
urlsdict = {url: i for i, url in enumerate(URLS)}
# url -> completion rank; not required (shown for educational purposes)
complete_order = {}
# __name__ == "__main__" to avoid RuntimeError (see https://docs.python.org/3.7/library/multiprocessing.html#the-spawn-and-forkserver-start-methods)
if __name__ == "__main__":
    # use a with statement to ensure threads/processes are cleaned up promptly
    with ThreadPoolExecutor(max_workers=5) as executor:
        # start the load operations and mark each future with its URL
        futures = {executor.submit(load_url, url, 60): url for url in URLS}
        # process each job as it completes — NOT in submission order
        for i, f in enumerate(as_completed(futures)):
            url = futures[f]  # recover which URL this future belongs to
            print(f"\nOUTSIDE FUNCTION: FINISH JOB {i}: {url}")
            print(f"url idx in URLS list: {urlsdict[url]}")
            # process completed jobs
            try:
                # result() returns load_url's value — same effect as calling
                # load_url(url, timeout) directly, but it may re-raise
                pid, url, data = f.result()
            except Exception as e:  # f.result() re-raised an exception from load_url
                print(f"{url} generated an exception: {e}")
            else:
                print(f"{url} page is {len(data)} bytes")
            # i is zero-based, so i + 1 jobs have finished at this point
            # (the original printed i, claiming "finished 0 jobs" first)
            print(f"finished {i + 1} jobs")
            # educational: illustrates that jobs are not always completed in
            # the order in which they were submitted.
            # NOTE: _state is a private attribute of Future — acceptable for
            # a demo, not for production code.
            states = [fut._state for fut in futures]
            print(f"states of jobs: {states}")
            finished_count = states.count("FINISHED")  # count no. of finished jobs
            complete_order[url] = finished_count - 1  # zero-based completion rank
            print(f"input order: {URLS}")
            print(f"order in which jobs were completed: {complete_order}\n\n")
    print(f"input order:\n{URLS}")
    print(f"order in which jobs were completed (async! i.e., not in order of input!):\n{list(complete_order.keys())}")
```
# References
- [concurrent.futures — Launching parallel tasks — Python 3.11.0a5 documentation](https://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor)
- [multiprocessing — Process-based parallelism — Python 3.7.12 documentation](https://docs.python.org/3.7/library/multiprocessing.html#the-spawn-and-forkserver-start-methods)