import codecs
import os
from os import walk
import json
from itertools import chain
from itertools import count
try:
from itertools import imap
except ImportError:
imap = map
from itertools import repeat
from itertools import starmap
from functools import partial
from operator import itemgetter
from iter_karld_tools import i_batch
from karld import is_py3
from karld.unicode_io import csv_reader
from karld.unicode_io import get_csv_row_writer
# Number of lines written per batch by write_as_csv.
LINE_BUFFER_SIZE = 5000
# Default open() buffer size in bytes (10 MiB).
FILE_BUFFER_SIZE = 10485760
# Indexes into the 3-tuples yielded by os.walk():
# (dirpath, dirnames, filenames).
WALK_SUB_DIR = 0
WALK_FILES = 2
# Public API of this module, kept sorted. The previous list contained
# 'i_read_buffered_text_file' twice and omitted several public names
# defined below (i_read_buffered_binary_file, i_get_unicode_lines,
# split_multi_line_csv_file).
__all__ = ['dump_dicts_to_json_file',
           'ensure_dir',
           'ensure_file_path_dir',
           'file_path_and_name',
           'i_get_csv_data',
           'i_get_json_data',
           'i_get_unicode_lines',
           'i_read_buffered_binary_file',
           'i_read_buffered_file',
           'i_read_buffered_text_file',
           'i_walk_dir_for_filepaths_names',
           'i_walk_dir_for_paths_names',
           'identity',
           'is_file_csv',
           'is_file_json',
           'raw_line_reader',
           'split_csv_file',
           'split_file',
           'split_file_output',
           'split_file_output_csv',
           'split_file_output_json',
           'split_multi_line_csv_file',
           'write_as_csv',
           'write_as_json']
def ensure_dir(directory):
    """
    If directory doesn't exist, make it.

    Uses EAFP rather than exists()-then-makedirs(): the old check was
    racy, since the directory could be created by another process
    between the two calls, making makedirs raise.

    :param directory: path to directory
    :type directory: str
    """
    try:
        os.makedirs(directory)
    except OSError:
        # An already-existing directory is fine; anything else
        # (permissions, a regular file in the way) is re-raised.
        if not os.path.isdir(directory):
            raise
def ensure_file_path_dir(file_path):
    """
    Ensure the parent directory of the file path exists.

    :param file_path: Path to file.
    :type file_path: str
    """
    parent_dir = os.path.dirname(file_path)
    ensure_dir(os.path.abspath(parent_dir))
def i_read_buffered_file(file_name, buffering=FILE_BUFFER_SIZE, binary=True,
                         py3_csv_read=False, encoding='utf-8'):
    """
    Generator of the lines of a file, opened with a large
    buffer for speed.

    :param file_name: Path to the file.
    :param buffering: number of bytes to buffer the file.
    :param binary: open in binary mode when True, text mode otherwise.
    :param py3_csv_read: on Python 3, also open with newline='' as
        the csv module requires.
    :param encoding: text encoding, applied only for Python 3
        text-mode reads.
    """
    open_kwargs = {'buffering': buffering}
    if is_py3():
        if not binary:
            open_kwargs['encoding'] = encoding
        if py3_csv_read:
            open_kwargs['newline'] = ''
    mode = 'rb' if binary else 'rt'
    with open(file_name, mode, **open_kwargs) as stream:
        for line in stream:
            yield line
# Convenience partials of i_read_buffered_file: a text-mode and a
# binary-mode line generator sharing the same buffering default.
i_read_buffered_text_file = partial(i_read_buffered_file, binary=False)
i_read_buffered_binary_file = partial(i_read_buffered_file, binary=True)
def i_get_unicode_lines(file_name, encoding='utf-8', **kwargs):
    """
    A generator for reading a text file as unicode lines.

    :param file_name: Path to file.
    :param encoding: Encoding of the file.
    :param kwargs: may contain ``buffering`` (bytes of file buffer);
        any remaining keyword args are forwarded to
        ``codecs.iterdecode`` on the Python 2 path.
    :yields: Lines of the file decoded from encoding to unicode.
    """
    # pop() rather than get(): 'buffering' must be consumed here so
    # it does not leak into the **kwargs forwarded to
    # codecs.iterdecode below, whose incremental decoder would
    # raise TypeError on the unexpected keyword.
    buffering = kwargs.pop('buffering', FILE_BUFFER_SIZE)
    read_file_kwargs = dict(buffering=buffering, encoding=encoding)
    if is_py3():
        stream = i_read_buffered_text_file(file_name, **read_file_kwargs)
        for line in stream:
            yield line
    else:
        stream = i_read_buffered_binary_file(file_name, **read_file_kwargs)
        for line in codecs.iterdecode(stream, encoding, **kwargs):
            yield line
def i_get_csv_data(file_name, *args, **kwargs):
    """A generator for reading a csv file.

    :param file_name: Path to the csv file.
    :param kwargs: may contain ``buffering`` (bytes of file buffer);
        remaining args/kwargs are forwarded to ``csv_reader``.
    :yields: Parsed rows of the csv file.
    """
    # pop() rather than get(): 'buffering' is a file-open option, so
    # consume it here instead of forwarding it to csv_reader along
    # with the csv dialect options.
    buffering = kwargs.pop('buffering', FILE_BUFFER_SIZE)
    read_file_kwargs = dict(buffering=buffering)
    if is_py3():
        # Python 3's csv module needs text-mode input opened
        # with newline=''.
        read_file_kwargs.update(dict(binary=False, py3_csv_read=True))
    data = i_read_buffered_file(file_name, **read_file_kwargs)
    for row in csv_reader(data, *args, **kwargs):
        yield row
def i_get_json_data(file_name, *args, **kwargs):
    """A generator for reading a file of json documents
    delimited by newlines.

    :param file_name: Path to the file.
    :yields: One deserialized json document per line.
    """
    buffering = kwargs.get('buffering', FILE_BUFFER_SIZE)
    lines = i_read_buffered_file(file_name, buffering=buffering)
    for line in lines:
        # The file is read in binary mode, so decode each line
        # before parsing it as json.
        yield json.loads(line.decode())
def write_as_csv(items, file_name, append=False,
                 line_buffer_size=None, buffering=FILE_BUFFER_SIZE,
                 get_csv_row_writer=get_csv_row_writer):
    """
    Write out items to a csv file in groups.

    :param items: An iterable collection of collections.
    :param file_name: path to the output file.
    :param append: whether to append or overwrite the file.
    :param line_buffer_size: number of lines to write at a time.
    :param buffering: number of bytes to buffer files
    :type buffering: int
    :param get_csv_row_writer: callable that returns a csv row writer
        function, customize this for non-default options:
        `custom_writer = partial(get_csv_row_writer, delimiter="|");`
        `write_as_csv(items, 'my_out_file', get_csv_row_writer=custom_writer)`
    """
    if line_buffer_size is None:
        line_buffer_size = LINE_BUFFER_SIZE
    mode = 'a' if append else 'w'
    open_kwargs = {'buffering': buffering}
    if is_py3():
        # Python 3's csv writer wants a text stream with newline=''.
        mode += 't'
        open_kwargs['newline'] = ''
    else:
        # Python 2's csv writer wants a binary stream.
        mode += 'b'
    with open(file_name, mode, **open_kwargs) as csv_file:
        write_row = get_csv_row_writer(csv_file)
        for batch in i_batch(line_buffer_size, items):
            for row in batch:
                write_row(row)
def is_file_csv(file_path_name):
    """
    Is the file a csv file? Identified by its extension.

    :param file_path_name: tuple of (file path, file name).
    :type file_path_name: tuple
    :return: True when the file name ends with ``.csv`` in any case.
    """
    _, file_name = file_path_name
    return file_name.lower().endswith('.csv')
def is_file_json(file_path_name):
    """
    Is the file a json file? Identified by its extension.

    :param file_path_name: tuple of (file path, file name).
    :type file_path_name: tuple
    :return: True when the file name ends with ``.json`` in any case.
    """
    _, file_name = file_path_name
    return file_name.lower().endswith('.json')
def write_as_json(items, file_name, buffering=FILE_BUFFER_SIZE):
    """Writes each item of the iterable to a line of the
    file as json.

    :param items: A sequence of json dumpable objects.
    :param file_name: the path of the output file.
    :param buffering: number of bytes to buffer files
    :type buffering: int
    """
    # 'w' instead of 'w+': the file is only written, never read back.
    with open(file_name, 'w', buffering=buffering) as json_file:
        for item in items:
            # '\n' rather than os.linesep: the file is open in text
            # mode, so os.linesep ('\r\n' on Windows) would be
            # newline-translated into '\r\r\n' on write.
            json_file.write(json.dumps(item) + '\n')
def dump_dicts_to_json_file(file_name, dicts, buffering=FILE_BUFFER_SIZE):
    """writes each dictionary in the dicts iterable
    to a line of the file as json.

    NOTE: Deprecated. Replaced by write_as_json, to match the
    (items, file_name) argument order of write_as_csv.

    :param file_name: the path of the output file.
    :param dicts: iterable of json dumpable objects.
    :param buffering: number of bytes to buffer files
    :type buffering: int
    """
    return write_as_json(dicts, file_name, buffering=buffering)
def split_file_output_json(filename, dict_list, out_dir=None, max_lines=1100,
                           buffering=FILE_BUFFER_SIZE):
    """
    Split an iterable of JSON serializable rows of data
    into groups and write each to a shard.

    :param filename: name used as the suffix of each shard's name.
    :param dict_list: iterable of json dumpable objects.
    :param out_dir: directory for the shards; defaults to the
        directory of ``filename``.
    :param max_lines: max rows per shard.
    :param buffering: number of bytes to buffer files
    :type buffering: int
    """
    if out_dir is None:
        out_dir = os.path.abspath(os.path.dirname(filename))
    basename = os.path.basename(filename)
    # Shards are numbered from 0 and named "<number>_<basename>".
    for shard_num, group in enumerate(i_batch(max_lines, dict_list)):
        write_as_json(
            group,
            os.path.join(out_dir, "{0}_{1}".format(shard_num, basename)),
            buffering=buffering)
def split_file_output_csv(filename, data, out_dir=None, max_lines=1100,
                          buffering=FILE_BUFFER_SIZE,
                          write_as_csv=write_as_csv):
    """
    Split an iterable of csv serializable rows of data
    into groups and write each to a csv shard.

    :param filename: name used as the suffix of each shard's name.
    :param data: iterable of csv writable rows.
    :param out_dir: directory for the shards; defaults to the
        directory of ``filename``.
    :param max_lines: max rows per shard.
    :param buffering: number of bytes to buffer files
    :type buffering: int
    :param write_as_csv: csv writing function, injectable so callers
        can customize the csv dialect.
    """
    if out_dir is None:
        out_dir = os.path.abspath(os.path.dirname(filename))
    basename = os.path.basename(filename)
    # Shards are numbered from 0 and named "<number>_<basename>".
    for shard_num, group in enumerate(i_batch(max_lines, data)):
        write_as_csv(
            group,
            os.path.join(out_dir, "{0}_{1}".format(shard_num, basename)),
            buffering=buffering)
def split_file_output(name, data, out_dir, max_lines=1100,
                      buffering=FILE_BUFFER_SIZE):
    """
    Split an iterable of lines into groups and write each group
    to its own shard file.

    :param name: Each shard will use this in its name.
    :type name: str
    :param data: Iterable of data to write.
    :type data: iter
    :param out_dir: Path to directory to write the shards.
    :type out_dir: str
    :param max_lines: Max number of lines per shard.
    :type max_lines: int
    :param buffering: number of bytes to buffer files
    :type buffering: int
    """
    # The shards are written in binary mode, so on Python 3 the
    # separator used to join the lines must be bytes.
    join_str = b'' if is_py3() else ''
    for shard_num, group in enumerate(i_batch(max_lines, data)):
        shard_path = os.path.join(out_dir,
                                  "{0}_{1}".format(shard_num, name))
        with open(shard_path, 'wb', buffering=buffering) as shard_file:
            shard_file.write(join_str.join(group))
def raw_line_reader(file_object):
    """Return an iterator over the lines of the open file
    object, unchanged."""
    return iter(file_object)
def split_file(file_path, out_dir=None, max_lines=200000,
               buffering=FILE_BUFFER_SIZE, line_reader=raw_line_reader,
               split_file_writer=split_file_output, read_binary=True):
    """
    Open the file, then write it back out as shards.

    :param file_path: Path to the large input file.
    :type file_path: str
    :param out_dir: Path of directory to put the shards; defaults to
        the input file's own directory.
    :type out_dir: str
    :param max_lines: Max number of lines in each shard.
    :type max_lines: int
    :param buffering: number of bytes to buffer files
    :type buffering: int
    :param line_reader: callable turning the open file into an
        iterable of lines.
    :param split_file_writer: callable that writes the shards.
    :param read_binary: whether to read the input in binary mode.
    """
    # The shards are named after the input file.
    base_name = os.path.basename(file_path)
    if out_dir is None:
        # Default the output directory to the input file's directory.
        out_dir = os.path.abspath(os.path.dirname(file_path))
    else:
        ensure_dir(out_dir)
    lines = line_reader(
        i_read_buffered_file(file_path, buffering, binary=read_binary))
    split_file_writer(base_name, lines, out_dir, max_lines=max_lines,
                      buffering=buffering)
# One definition replaces the old pattern of defining the partial for
# Python 2 and then immediately re-defining it when is_py3() was true:
# the two versions differed only in read_binary, so that flag is
# computed directly. Python 3's csv module requires text-mode input;
# Python 2's requires bytes.
split_csv_file = partial(split_file,
                         line_reader=csv_reader,
                         split_file_writer=split_file_output_csv,
                         read_binary=not is_py3())
split_csv_file.__doc__ = """
Split a large csv file without separating newlines in quotes. Runs slower
than split_file.

the csv reader and writer use the default dialect.
customize this for non-default options:
`custom_reader = partial(csv_reader, delimiter="|");`
`split_multi_line_csv_file('input_file.csv', line_reader=custom_reader)`

Writing the csv data with a non-default dialect requires defining
a split_file_writer with a custom write_as_csv with a custom
csv row writer factory.

```my_split_file_writer = partial(
    split_file_output_csv,
    write_as_csv=partial(
        write_as_csv,
        get_csv_row_writer=partial(
            get_csv_row_writer, delimiter="|")))```
`split_multi_line_csv_file('input_file.csv',
                           split_file_writer=my_split_file_writer)`
"""
split_multi_line_csv_file = split_csv_file
def file_path_and_name(path, base_name):
    """
    Join the path and base_name and return the pair of the
    joined path and the base_name.

    :param path: Directory path
    :type path: str
    :param base_name: File name
    :type base_name: str
    :return: `tuple` of file path and file name.
    """
    full_path = os.path.join(path, base_name)
    return full_path, base_name
def identity(*args):
    """Return the positional arguments, unchanged, as a tuple."""
    return args
def i_walk_dir_for_paths_names(root_dir):
    """
    Walks a directory yielding pairs of the directory of each
    file and the file's name.

    :param root_dir: path to a directory.
    :type root_dir: str
    :return: iterator of (directory path, file name) tuples.
    """
    # os.walk yields (dirpath, dirnames, filenames); each file name
    # is paired with its containing directory's path.
    return ((dir_path, file_name)
            for dir_path, _, file_names in walk(root_dir)
            for file_name in file_names)
def i_walk_dir_for_filepaths_names(root_dir):
    """
    Walks a directory yielding the paths and names
    of files.

    :param root_dir: path to a directory.
    :type root_dir: str
    :return: iterator of (file path, file name) tuples.
    """
    return (file_path_and_name(dir_path, file_name)
            for dir_path, file_name
            in i_walk_dir_for_paths_names(root_dir))