#!/usr/bin/env python
# -*- coding: utf-8 -*-
import csv
import io
from csv import DictReader, DictWriter
from io import TextIOBase
from os import PathLike
from pathlib import Path
from typing import (
Any,
Dict,
Iterable,
List,
Literal,
Mapping,
Optional,
TypeVar,
Union,
get_args,
overload,
)
from pythonwrench._core import _setup_output_fpath
from pythonwrench.cast import as_builtin
from pythonwrench.collections import dict_list_to_list_dict, list_dict_to_dict_list
from pythonwrench.functools import function_alias
from pythonwrench.typing import isinstance_generic
# Generic type variable (no use of it is visible in this part of the file).
T = TypeVar("T")
# Accepted values for the ``orient`` argument of the CSV loaders:
# "list" yields a list of row dicts, "dict" yields a dict of column lists.
Orient = Literal["list", "dict"]
# -- Dump / Save / Serialize content to CSV --
def dump_csv(
    data: Union[Iterable[Mapping[str, Any]], Mapping[str, Iterable[Any]], Iterable],
    file: Union[str, Path, PathLike, TextIOBase, None] = None,
    /,
    *,
    overwrite: bool = True,
    make_parents: bool = True,
    to_builtins: bool = False,
    header: Union[bool, Literal["auto"]] = "auto",
    align_content: bool = False,
    replace_newline_by: Optional[str] = "\\n",
    **csv_writer_kwds,
) -> str:
    r"""Dump content to CSV format into string and/or file.

    Args:
        data: Data to serialize. Can be a list of dicts, dicts of lists or list of lists.
        file: File path or text buffer to write serialized data. If None, nothing is written and the content is only returned. defaults to None.
        overwrite: If True, overwrite target filepath. defaults to True.
        make_parents: Build intermediate directories to filepath. defaults to True.
        to_builtins: If True, converts data to builtin equivalent before saving. defaults to False.
        header: Indicates if CSV must have header. If "auto", an header is added when a dict of list or list of dicts is passed. defaults to "auto".
        align_content: If True, pad and center each cell to the width of its column for better visualization. defaults to False.
        replace_newline_by: Replace newline character to avoid newline in CSV content. defaults to "\\n".
        \*\*csv_writer_kwds: Others optional arguments passed to CSV writer object.

    Returns:
        Dumped content as string.

    Raises:
        TypeError: If ``file`` is not a str, Path, PathLike, TextIOBase or None.
    """
    # Reject an unusable target before doing any serialization work.
    if file is not None and not isinstance(file, (str, Path, PathLike, TextIOBase)):
        msg = f"Invalid argument type {type(file)}. (expected one of str, Path, PathLike, TextIOBase, None)"
        raise TypeError(msg)

    content = dumps_csv(
        data,
        to_builtins=to_builtins,
        header=header,
        align_content=align_content,
        replace_newline_by=replace_newline_by,
        **csv_writer_kwds,
    )

    if isinstance(file, (str, Path, PathLike)):
        fpath = _setup_output_fpath(
            file, overwrite=overwrite, make_parents=make_parents
        )
        # newline="" disables newline translation: the content already holds
        # the line terminators emitted by the csv writer.
        with open(fpath, "w", newline="") as opened_file:
            opened_file.write(content)
    elif isinstance(file, TextIOBase):
        file.write(content)

    return content
def dumps_csv(
    data: Union[Iterable[Mapping[str, Any]], Mapping[str, Iterable[Any]], Iterable],
    /,
    *,
    to_builtins: bool = False,
    header: Union[bool, Literal["auto"]] = "auto",
    align_content: bool = False,
    replace_newline_by: Optional[str] = "\\n",
    **csv_writer_kwds,
) -> str:
    r"""Dump content to CSV format into string.

    Args:
        data: Data to serialize. Can be a list of dicts, dicts of lists or list of lists.
        to_builtins: If True, converts data to builtin equivalent before saving. defaults to False.
        header: Indicates if CSV must have header. If "auto", an header is added when a dict of list or list of dicts is passed. defaults to "auto".
        align_content: If True, pad and center each cell to the width of its column for better visualization. defaults to False.
        replace_newline_by: Replace newline character to avoid newline in CSV content. defaults to "\\n".
        \*\*csv_writer_kwds: Others optional arguments passed to CSV writer object.

    Returns:
        Dumped content as string.
    """
    # Serialize into an in-memory text buffer and hand back its full content.
    with io.StringIO() as buffer:
        _serialize_csv(
            data,
            buffer,
            to_builtins=to_builtins,
            header=header,
            align_content=align_content,
            replace_newline_by=replace_newline_by,
            **csv_writer_kwds,
        )
        content = buffer.getvalue()
    return content
def save_csv(
    data: Union[Iterable[Mapping[str, Any]], Mapping[str, Iterable[Any]], Iterable],
    file: Union[str, Path, PathLike, TextIOBase],
    /,
    *,
    overwrite: bool = True,
    make_parents: bool = True,
    to_builtins: bool = False,
    header: Union[bool, Literal["auto"]] = "auto",
    align_content: bool = False,
    replace_newline_by: Optional[str] = "\\n",
    **csv_writer_kwds,
) -> None:
    r"""Save content to CSV format into a file or buffer.

    Args:
        data: Data to serialize. Can be a list of dicts, dicts of lists or list of lists.
        file: File path or text buffer to write serialized data. A buffer passed by the caller is left open.
        overwrite: If True, overwrite target filepath. defaults to True.
        make_parents: Build intermediate directories to filepath. defaults to True.
        to_builtins: If True, converts data to builtin equivalent before saving. defaults to False.
        header: Indicates if CSV must have header. If "auto", an header is added when a dict of list or list of dicts is passed. defaults to "auto".
        align_content: If True, pad and center each cell to the width of its column for better visualization. defaults to False.
        replace_newline_by: Replace newline character to avoid newline in CSV content. defaults to "\\n".
        \*\*csv_writer_kwds: Others optional arguments passed to CSV writer object.

    Raises:
        TypeError: If ``file`` is not a str, Path, PathLike or TextIOBase.
    """
    serialize_kwds = dict(
        to_builtins=to_builtins,
        header=header,
        align_content=align_content,
        replace_newline_by=replace_newline_by,
        **csv_writer_kwds,
    )

    if isinstance(file, (str, Path, PathLike)):
        fpath = _setup_output_fpath(
            file, overwrite=overwrite, make_parents=make_parents
        )
        # The context manager closes the file even when serialization raises.
        # newline="" lets the csv writer control line terminators itself.
        with open(fpath, "w", newline="") as opened_file:
            _serialize_csv(data, opened_file, **serialize_kwds)
    elif isinstance(file, TextIOBase):
        # Caller-owned buffer: write into it but never close it.
        _serialize_csv(data, file, **serialize_kwds)
    else:
        msg = f"Invalid argument type {type(file)}. (expected one of str, Path, PathLike, TextIOBase)"
        raise TypeError(msg)
def _serialize_csv(
    data: Union[Iterable[Mapping[str, Any]], Mapping[str, Iterable[Any]], Iterable],
    buffer: TextIOBase,
    *,
    to_builtins: bool = False,
    header: Union[bool, Literal["auto"]] = "auto",
    align_content: bool = False,
    replace_newline_by: Optional[str] = "\\n",
    **csv_writer_kwds,
) -> None:
    r"""Write ``data`` as CSV rows into ``buffer``.

    Args:
        data: A mapping of columns, an iterable of row mappings, or (only when no header is written) an iterable of values or of rows.
        buffer: Text buffer receiving the CSV output.
        to_builtins: If True, pass ``data`` through ``as_builtin`` first.
        header: Write a header row. "auto" writes one when ``data`` is a mapping of columns or an iterable of row mappings.
        align_content: If True, convert cells to strings and center them to the width of their column.
        replace_newline_by: If not None, replaces "\n" in string keys and values by this string.
        \*\*csv_writer_kwds: Forwarded to ``csv.DictWriter`` (with header) or ``csv.writer`` (without header).

    Raises:
        TypeError: If ``data`` is a str, or has a shape that is not supported with the given ``header``.
    """
    if to_builtins:
        data = as_builtin(data)

    is_mapping_iterable = isinstance_generic(data, Mapping[str, Iterable])
    if is_mapping_iterable:
        is_iterable_mapping = False
    else:
        is_iterable_mapping = isinstance_generic(data, Iterable[Mapping[str, Any]])

    if header == "auto":
        header = is_mapping_iterable or is_iterable_mapping

    # Normalize every supported input shape to a list of row dicts.
    if is_mapping_iterable:
        data_lst = dict_list_to_list_dict(data, "same")
    elif is_iterable_mapping:
        data_lst = [dict(data_i.items()) for data_i in data]
    elif isinstance(data, str):
        msg = f"Invalid argument type {type(data)}."
        raise TypeError(msg)
    elif not header and isinstance_generic(data, Iterable[str]):
        data_lst = [{"0": data_i} for data_i in data]
    elif not header and isinstance_generic(data, Iterable[Iterable]):
        # One column per position of the row itself (keys are "0", "1", ...).
        data_lst = [
            {str(j): data_ij for j, data_ij in enumerate(data_i)} for data_i in data
        ]
    elif not header and isinstance(data, Iterable):
        data_lst = [{"0": data_i} for data_i in data]
    else:
        msg = f"Invalid argument type {type(data)} with {header=}."
        raise TypeError(msg)
    del data

    # Column names come from the first row only.
    if len(data_lst) == 0:
        fieldnames = []
    else:
        fieldnames = [str(k) for k in data_lst[0].keys()]

    if replace_newline_by is not None:

        def _replace_newline(s):
            if not isinstance(s, str):
                return s
            return s.replace("\n", replace_newline_by)

        # Field names must get the same replacement as the row keys, otherwise
        # DictWriter rejects rows whose keys no longer match its fieldnames.
        fieldnames = [_replace_newline(k) for k in fieldnames]
        data_lst = [
            {_replace_newline(k): _replace_newline(v) for k, v in data_i.items()}
            for data_i in data_lst
        ]

    if align_content:
        # Done after the newline replacement so widths match the final text.
        # Cells are converted with str() so any value type can be measured.
        data_lst = [
            {str(k): str(v) for k, v in data_i.items()} for data_i in data_lst
        ]
        max_num_chars = {
            k: max(max(len(data_i[k]) for data_i in data_lst), len(k)) + 1
            for k in fieldnames
        }
        old_to_new_fieldnames = {
            k: format(k, f"^{max_num_chars[k]}s") for k in fieldnames
        }
        data_lst = [
            {
                old_to_new_fieldnames[k]: format(v, f"^{max_num_chars[k]}s")
                for k, v in data_i.items()
            }
            for data_i in data_lst
        ]
        fieldnames = list(old_to_new_fieldnames.values())

    if header:
        # Always use the computed field names, even if the caller passed some.
        csv_writer_kwds["fieldnames"] = fieldnames
        writer = DictWriter(buffer, **csv_writer_kwds)
        writer.writeheader()
        writer.writerows(data_lst)
    else:
        writer = csv.writer(buffer, **csv_writer_kwds)
        writer.writerows([tuple(data_i.values()) for data_i in data_lst])
def _stringify(x: Any) -> Any:
    """Recursively convert ``x`` to strings.

    Strings are returned unchanged. Dicts are rebuilt as plain dicts with
    stringified keys and values. Lists, tuples, sets and frozensets keep their
    type and get stringified items. Any other object is converted with ``str``.
    """
    if isinstance(x, str):
        return x
    if isinstance(x, dict):
        return {_stringify(k): _stringify(v) for k, v in x.items()}  # type: ignore
    if isinstance(x, (list, tuple, set, frozenset)):
        items = [_stringify(xi) for xi in x]
        if isinstance(x, tuple) and hasattr(x, "_fields"):
            # namedtuple constructors take one positional argument per field,
            # not a single iterable.
            return type(x)(*items)
        return type(x)(items)
    return str(x)
# -- Load / Read / Parse CSV content --
@overload
def load_csv(
    file: Union[str, Path, PathLike, TextIOBase],
    /,
    *,
    orient: Literal["list"] = "list",
    header: bool = True,
    comment_start: Optional[str] = None,
    strip_content: bool = False,
    # CSV reader kwargs
    delimiter: Optional[str] = ",",
    **csv_reader_kwds,
) -> List[Dict[str, Any]]: ...


@overload
def load_csv(
    file: Union[str, Path, PathLike, TextIOBase],
    /,
    *,
    orient: Literal["dict"],
    header: bool = True,
    comment_start: Optional[str] = None,
    strip_content: bool = False,
    # CSV reader kwargs
    delimiter: Optional[str] = ",",
    **csv_reader_kwds,
) -> Dict[str, List[Any]]: ...


def load_csv(
    file: Union[str, Path, PathLike, TextIOBase],
    /,
    *,
    orient: Orient = "list",
    header: bool = True,
    comment_start: Optional[str] = None,
    strip_content: bool = False,
    # CSV reader kwargs
    delimiter: Optional[str] = ",",
    **csv_reader_kwds,
) -> Union[List[Dict[str, Any]], Dict[str, List[Any]]]:
    r"""Load content from a CSV filepath or text buffer.

    Args:
        file: File path or text buffer to read. A buffer passed by the caller is left open.
        orient: Orientation of the output value. Can be "list" or "dict". defaults to "list".
        header: Specify if CSV has header column. defaults to True.
        comment_start: If this string is not None and a line starts with this string, the line will be ignored. defaults to None.
        strip_content: If True, strip leading and trailing whitespace from keys and values. defaults to False.
        delimiter: Value delimiter. If None and ``file`` is a path, "\t" is used for a ".tsv" suffix and "," otherwise. defaults to ",".
        \*\*csv_reader_kwds: Other optional csv arguments.

    Returns:
        The loaded values as a list of dicts (orient="list") or a dict of lists (orient="dict").

    Raises:
        ValueError: If ``delimiter`` is None while ``file`` is a buffer, or if ``orient`` is invalid.
    """
    if isinstance(file, (str, Path, PathLike)):
        fpath = Path(file)
        if delimiter is None:
            delimiter = "\t" if fpath.suffix == ".tsv" else ","
        # The context manager closes the file even when parsing raises.
        # newline="" keeps newlines embedded in quoted fields intact.
        with fpath.open("r", newline="") as opened_file:
            return _parse_csv(
                opened_file,
                orient=orient,
                header=header,
                comment_start=comment_start,
                strip_content=strip_content,
                delimiter=delimiter,
                **csv_reader_kwds,
            )

    # Caller-owned buffer: read from it but never close it.
    return _parse_csv(
        file,
        orient=orient,
        header=header,
        comment_start=comment_start,
        strip_content=strip_content,
        delimiter=delimiter,
        **csv_reader_kwds,
    )
@overload
def loads_csv(
    content: str,
    /,
    *,
    orient: Literal["list"] = "list",
    header: bool = True,
    comment_start: Optional[str] = None,
    strip_content: bool = False,
    # CSV reader kwargs
    delimiter: Optional[str] = ",",
    **csv_reader_kwds,
) -> List[Dict[str, Any]]: ...


@overload
def loads_csv(
    content: str,
    /,
    *,
    orient: Literal["dict"],
    header: bool = True,
    comment_start: Optional[str] = None,
    strip_content: bool = False,
    # CSV reader kwargs
    delimiter: Optional[str] = ",",
    **csv_reader_kwds,
) -> Dict[str, List[Any]]: ...


def loads_csv(
    content: str,
    /,
    *,
    orient: Orient = "list",
    header: bool = True,
    comment_start: Optional[str] = None,
    strip_content: bool = False,
    # CSV reader kwargs
    delimiter: Optional[str] = ",",
    **csv_reader_kwds,
) -> Union[List[Dict[str, Any]], Dict[str, List[Any]]]:
    r"""Load content from a CSV string.

    Args:
        content: CSV text to parse.
        orient: Orientation of the output value. Can be "list" or "dict". defaults to "list".
        header: Specify if CSV has header column. defaults to True.
        comment_start: If this string is not None and a line starts with this string, the line will be ignored. defaults to None.
        strip_content: If True, strip leading and trailing whitespace from keys and values. defaults to False.
        delimiter: Value delimiter. Must not be None. defaults to ",".
        \*\*csv_reader_kwds: Other optional csv arguments.

    Returns:
        The loaded values as a list of dicts (orient="list") or a dict of lists (orient="dict").

    Raises:
        ValueError: If ``delimiter`` is None or ``orient`` is invalid.
    """
    # Wrap the text in an in-memory buffer so the shared parser can read it.
    with io.StringIO(content) as buffer:
        return _parse_csv(
            buffer,
            orient=orient,
            header=header,
            comment_start=comment_start,
            strip_content=strip_content,
            delimiter=delimiter,
            **csv_reader_kwds,
        )
# ``read_csv`` is declared as an alias of ``load_csv``; the stub body below
# holds no logic of its own.
# NOTE(review): presumably ``function_alias`` makes calls forward to
# ``load_csv`` -- confirm in pythonwrench.functools.
@function_alias(load_csv)
def read_csv(*args, **kwargs): ...
def _parse_csv(
    file: TextIOBase,
    /,
    *,
    orient: "Orient" = "list",
    header: bool = True,
    comment_start: Optional[str] = None,
    strip_content: bool = False,
    # CSV reader kwargs
    delimiter: Optional[str] = ",",
    **csv_reader_kwds,
) -> Union[List[Dict[str, Any]], Dict[str, List[Any]]]:
    r"""Parse CSV rows from an opened text buffer.

    Args:
        file: Text buffer to read from.
        orient: "list" returns a list of row dicts, "dict" returns a dict of column lists.
        header: If True, the first row gives the keys. Otherwise keys are the column positions "0", "1", ...
        comment_start: If not None, rows whose first value starts with this string are dropped. The filter runs after parsing, so with a header a comment on the first line is still read as the header.
        strip_content: If True, strip whitespace around string keys and values.
        delimiter: Value delimiter. Must not be None.
        \*\*csv_reader_kwds: Forwarded to ``csv.DictReader`` (with header) or ``csv.reader`` (without header).

    Returns:
        The parsed rows, shaped according to ``orient``.

    Raises:
        ValueError: If ``delimiter`` is None or ``orient`` is invalid.
    """
    if delimiter is None:
        msg = f"Invalid argument {delimiter=}. (expected not None when {type(file)=})"
        raise ValueError(msg)

    data_lst: List[Dict[str, Any]]
    if header:
        reader = DictReader(file, delimiter=delimiter, **csv_reader_kwds)
        data_lst = list(reader)  # type: ignore
    else:
        reader = csv.reader(file, delimiter=delimiter, **csv_reader_kwds)
        data_lst = [
            {str(j): data_ij for j, data_ij in enumerate(data_i)} for data_i in reader
        ]

    if comment_start is not None:

        def _is_comment(row: Dict[str, Any]) -> bool:
            # Blank lines give empty rows when there is no header: they have
            # no first value and are therefore never treated as comments.
            first = next(iter(row.values()), None)
            return isinstance(first, str) and first.startswith(comment_start)

        data_lst = [row for row in data_lst if not _is_comment(row)]

    if strip_content:

        def _strip(x: Any) -> Any:
            # DictReader fills short rows with None and stores surplus values
            # as a list under the None key, so only strings are stripped.
            if isinstance(x, str):
                return x.strip()
            if isinstance(x, list):
                return [_strip(xi) for xi in x]
            return x

        data_lst = [
            {_strip(k): _strip(v) for k, v in data_i.items()} for data_i in data_lst
        ]

    if orient == "dict":
        result = list_dict_to_dict_list(data_lst, key_mode="same")  # type: ignore
    elif orient == "list":
        result = data_lst
    else:
        msg = f"Invalid argument {orient=}. (expected one of {get_args(Orient)})"
        raise ValueError(msg)
    return result  # type: ignore