Source code for pythonwrench.concurrent
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, TypeVar
from typing_extensions import ParamSpec
logger = logging.getLogger(__name__)
P = ParamSpec("P")
T = TypeVar("T")
[docs]
class ThreadPoolExecutorHelper(Generic[P, T]):
# Note: use commas for typing because Future is not generic in older python versions
def __init__(
self,
fn: Callable[P, T],
*,
executor_kwds: Optional[Dict[str, Any]] = None,
executor: Optional[ThreadPoolExecutor] = None,
futures: "Iterable[Future[T]]" = (),
**default_fn_kwds,
) -> None:
futures = list(futures)
super().__init__()
self.fn = fn
self.executor_kwds = executor_kwds
self.executor = executor
self.futures = futures
self.default_kwargs = default_fn_kwds
[docs]
def submit(self, *args: P.args, **kwargs: P.kwargs) -> "Future[T]":
if self.executor is None:
executor_kwds = self.executor_kwds
if executor_kwds is None:
executor_kwds = {}
self.executor = ThreadPoolExecutor(**executor_kwds)
kwargs = self.default_kwargs | kwargs # type: ignore
future = self.executor.submit(self.fn, *args, **kwargs)
self.futures.append(future)
return future
[docs]
def wait_all(self, shutdown: bool = True, verbose: bool = True) -> List[T]:
futures = self.futures
if verbose:
try:
import tqdm # type: ignore
futures = tqdm.tqdm(futures, disable=not verbose)
except ImportError:
msg = "Cannot display verbose bar because tqdm is not installed."
logger.warning(msg)
results = [future.result() for future in futures]
self.futures.clear()
if shutdown and self.executor is not None:
self.executor.shutdown()
self.executor = None
return results