- [[python - concurrent futures]]
- [[processes vs threads vs cores vs sockets]]: how many processes/threads are possible?
- [[python - threading issues]]

# Idea

Example from the [official documentation](https://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor) (minor modifications made to highlight the async nature).

Max parallel workers: the `max_workers` default for `ThreadPoolExecutor` is `min(32, os.cpu_count() + 4)` (since Python 3.8; a quick way to check it is sketched after the commented version). **See the commented version below for a detailed explanation.**

```python
# request urls with parallel threads
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import os
import threading

URLS = [
    "http://www.google.com/",
    "http://www.bbc.co.uk/",
    "http://europe.wsj.com/",
    "http://some-made-up-domain.com/",
    "http://www.foxnews.com/",
]

def load_url(url, timeout):
    pid = os.getpid()
    tid = threading.get_ident()
    print(f"process id: {pid}, thread id: {tid}")
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return pid, url, conn.read()

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(load_url, url, 60): url for url in URLS}
        for i, f in enumerate(as_completed(futures), start=1):
            url = futures[f]
            try:
                pid, url, data = f.result()
            except Exception as e:
                print(f"{url} generated an exception: {e}")
            else:
                print(f"{url} page is {len(data)} bytes")
            print(f"finished {i} jobs")
```

## Commented version

```python
# request urls with parallel threads/processes
from concurrent.futures import ThreadPoolExecutor, as_completed
# import ProcessPoolExecutor for parallel processes instead of threads
# (a sketch of that variant follows this block):
# from concurrent.futures import ProcessPoolExecutor, as_completed
import urllib.request
import os
import threading

URLS = [
    "http://www.google.com/",
    "http://www.bbc.co.uk/",
    "http://europe.wsj.com/",
    "http://some-made-up-domain.com/",
    "http://www.foxnews.com/",
]

# retrieve a single page and report the URL and contents
def load_url(url, timeout):
    pid = os.getpid()  # always good to check the process id (changes if using ProcessPoolExecutor)
    tid = threading.get_ident()  # get thread id
    print(f"INSIDE FUNCTION!\nprocess id: {pid}, thread id: {tid}; url: {url}")
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return pid, url, conn.read()

# test the function once (beware: with ProcessPoolExecutor under the spawn start
# method, every child re-imports this module and would run this call again;
# move it under the __main__ guard in that case)
load_url(URLS[0], 30)

urlsdict = {url: i for i, url in enumerate(URLS)}  # not required (shows the output for educational purposes)
complete_order = {}  # not required (shows the output for educational purposes)

# the __name__ == "__main__" guard avoids a RuntimeError with the spawn/forkserver
# start methods (see https://docs.python.org/3.7/library/multiprocessing.html#the-spawn-and-forkserver-start-methods)
if __name__ == "__main__":
    # use a with statement to ensure threads/processes are cleaned up promptly
    with ThreadPoolExecutor(max_workers=5) as executor:
        # start the load operations and mark each future with its URL
        futures = {executor.submit(load_url, url, 60): url for url in URLS}
        # process each job as it completes (not necessarily in submission order)
        for i, f in enumerate(as_completed(futures), start=1):
            url = futures[f]  # recover this future's URL (deals with the async nature of submit)
            print(f"\nOUTSIDE FUNCTION: FINISH JOB {i}: {url}")
            print(f"url idx in URLS list: {urlsdict[url]}")
            # process completed jobs
            try:
                pid, url, data = f.result()  # retrieve the output of load_url (same effect as calling load_url(url, timeout) directly)
            except Exception as e:  # f.result() re-raises any exception raised inside load_url
                print(f"{url} generated an exception: {e}")
            else:
                print(f"{url} page is {len(data)} bytes")
            print(f"finished {i} jobs")
            # educational block: illustrates that jobs are not always completed in
            # the order in which they were submitted (_state is a private Future
            # attribute, inspected here only for illustration)
            states = [fut._state for fut in futures]
            print(f"states of jobs: {states}")
            idx = states.count("FINISHED")  # count no. of finished jobs
            complete_order[url] = idx - 1
            print(f"input order: {URLS}")
            print(f"order in which jobs were completed: {complete_order}\n\n")
    print(f"input order:\n{URLS}")
    print(f"order in which jobs were completed (async! i.e., not in order of input!):\n{list(complete_order.keys())}")
```
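To verify the `max_workers` default mentioned at the top, a minimal sketch (assuming Python 3.8+; `_max_workers` is a private attribute, read here only to confirm the formula):

```python
import os
from concurrent.futures import ThreadPoolExecutor

# documented default formula for ThreadPoolExecutor on Python 3.8+
print(min(32, os.cpu_count() + 4))

with ThreadPoolExecutor() as executor:  # no max_workers argument -> the default is used
    print(executor._max_workers)  # private attribute; inspected only for illustration
```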
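The commented version mentions swapping in `ProcessPoolExecutor`. A minimal sketch of that variant, assuming CPU-bound work (`sum_of_squares` is a made-up stand-in); each result now reports a different process id, whereas the thread version reports one shared process id:

```python
# same submit/as_completed fan-out, but with worker processes instead of threads;
# processes suit CPU-bound work because each worker has its own interpreter (and GIL)
from concurrent.futures import ProcessPoolExecutor, as_completed
import os

def sum_of_squares(n):  # hypothetical stand-in for CPU-bound work
    return os.getpid(), n, sum(i * i for i in range(n))

if __name__ == "__main__":  # required: under spawn, each child re-imports this module
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(sum_of_squares, n): n for n in (10**5, 10**6, 10**7)}
        for f in as_completed(futures):
            pid, n, total = f.result()
            print(f"process id: {pid}; sum of squares below {n}: {total}")
```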
# References

- [concurrent.futures — Launching parallel tasks — Python 3.11.0a5 documentation](https://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor)
- [multiprocessing — Process-based parallelism — Python 3.7.12 documentation](https://docs.python.org/3.7/library/multiprocessing.html#the-spawn-and-forkserver-start-methods)