#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
from functools import partial
import os
from karld import is_py3
from iter_karld_tools import i_batch
# Python 3 has no ``unicode`` builtin; alias it to ``str`` so the name
# resolves the same way on both major versions.
if is_py3():
    unicode = str
from karld.loadump import is_file_csv
from karld.loadump import i_get_csv_data
from karld.run_together import csv_file_consumer
from karld.run_together import pool_run_files_to_files
from karld.run_together import serial_run_files_to_files
from karld.run_together import distribute_run_to_runners
from karld.run_together import distribute_multi_run_to_runners
from stream_tap import Bucket
from stream_tap import stream_tap
def get_fruit(item):
    """Get things that are fruit.

    :param item: A sized, indexable record. Only a record of exactly
        two elements whose second element equals ``u"fruit"`` matches.
    :returns: The first element of ``item`` if it's a fruit,
        otherwise ``None``.
    """
    if len(item) == 2 and item[1] == u"fruit":
        return item[0]
    # Anything else (wrong length or a different kind) is not a fruit.
    return None
def get_metal(item):
    """Get things that are metal.

    Counterpart of ``get_fruit``; ``certain_kind_tap`` hands it to a
    ``Bucket`` as the getter for its metal spigot.

    :param item: A sized, indexable record. Only a record of exactly
        two elements whose second element equals ``u"metal"`` matches.
    :returns: The first element of ``item`` if it's metal,
        otherwise ``None``.
    """
    if len(item) == 2 and item[1] == u"metal":
        return item[0]
    return None


def certain_kind_tap(data_items):
    """
    As the stream of data items go by, get different
    kinds of information from them, in this case,
    the things that are fruit and metal, collecting
    each kind with a different spigot.

    stream_tap doesn't consume the data_items iterator
    by itself, it's a generator and must be consumed
    by something else. In this case, it's consuming
    the items by casting the iterator to a tuple,
    but doing it in batches.

    Since each batch is not referenced by anything
    the memory can be freed by the garbage collector,
    so no matter the size of the data_items, only a little
    memory is needed. The only things retained
    are the results, which should just be a subset
    of the items and in this case, the getter functions
    only return a portion of each item it matches.

    :param data_items: An iterable of items. The getters used here
        only match two-element items whose second element names the
        kind (``u"fruit"`` or ``u"metal"``).
    :returns: A 2-tuple of the fruit spigot's contents and the
        metal spigot's contents, in that order.
    """
    fruit_spigot = Bucket(get_fruit)
    metal_spigot = Bucket(get_metal)

    items = stream_tap((fruit_spigot, metal_spigot), data_items)

    # Drain the tap 100 items at a time; each tuple is discarded
    # immediately, only the spigots keep what they collected.
    for batch in i_batch(100, items):
        tuple(batch)

    return fruit_spigot.contents(), metal_spigot.contents()
def run(in_dir, pool):
    """
    Run the composition of csv_file_consumer and information tap
    with the csv files in the input directory, and collect
    the results from each file and merge them together,
    printing both kinds of results.

    :param in_dir: Directory handed to the files-to-files runner;
        ``is_file_csv`` is passed as its ``filter_func``.
    :param pool: If truthy, print ``multi-processing`` and use
        ``pool_run_files_to_files``; otherwise use
        ``serial_run_files_to_files``.
    """
    if pool:
        print("multi-processing")
        files_to_files_runner = pool_run_files_to_files
    else:
        files_to_files_runner = serial_run_files_to_files

    results = files_to_files_runner(
        partial(csv_file_consumer, certain_kind_tap),
        in_dir, filter_func=is_file_csv)

    # Each result is a (fruits, metals) pair as returned by
    # certain_kind_tap; merge them into one list per kind.
    fruit_results = []
    metal_results = []
    for fruits, metals in results:
        fruit_results.extend(fruits)
        metal_results.extend(metals)

    print("=== fruits ===")
    for fruit in fruit_results:
        print(fruit)

    print("=== metals ===")
    for metal in metal_results:
        print(metal)
def run_distribute(in_path):
    """
    Run the information tap over the data of a single csv file
    using distribute_run_to_runners, then merge the results it
    yields and print both kinds of results.

    The file is read with ``i_get_csv_data`` and the runner is
    called with ``batch_size=3``.

    :param in_path: Path of the data source file.
    """
    results = distribute_run_to_runners(
        certain_kind_tap,
        in_path, reader=i_get_csv_data, batch_size=3)

    # Each result is a (fruits, metals) pair as returned by
    # certain_kind_tap; merge them into one list per kind.
    fruit_results = []
    metal_results = []
    for fruits, metals in results:
        fruit_results.extend(fruits)
        metal_results.extend(metals)

    print("=== fruits ===")
    for fruit in fruit_results:
        print(fruit)

    print("=== metals ===")
    for metal in metal_results:
        print(metal)
def run_distribute_multi(in_dir):
    """
    Run the information tap over the csv files in the input
    directory using distribute_multi_run_to_runners, then merge
    the results it yields and print both kinds of results.

    Files are selected with ``is_file_csv``, read with
    ``i_get_csv_data``, and the runner is called with
    ``batch_size=3``.

    :param in_dir: Data source directory.
    """
    results = distribute_multi_run_to_runners(
        certain_kind_tap,
        in_dir, reader=i_get_csv_data, batch_size=3,
        filter_func=is_file_csv)

    # Each result is a (fruits, metals) pair as returned by
    # certain_kind_tap; merge them into one list per kind.
    fruit_results = []
    metal_results = []
    for fruits, metals in results:
        fruit_results.extend(fruits)
        metal_results.extend(metals)

    print("=== fruits ===")
    for fruit in fruit_results:
        print(fruit)

    # Same header as run() and run_distribute() print.
    print("=== metals ===")
    for metal in metal_results:
        print(metal)
def main(*args):
    """
    Parse the command line and run the matching example.

    Try it::

        python stream_searcher.py

    or::

        python stream_searcher.py --single True

    or::

        python stream_searcher.py --in-dir test_data/things_kinds

    or::

        python stream_searcher.py --single True --in-dir test_data/things_kinds

    or::

        python stream_searcher.py --in-file path/to/some.csv

    Dispatch, in order of precedence:

    * ``--in-file`` given: print ``file`` and call ``run_distribute``.
    * ``--single`` given: print ``single`` and call ``run`` without
      a pool.
    * otherwise: print ``multi`` and call ``run_distribute_multi``.

    :param args: Forwarded positionally to
        ``argparse.ArgumentParser``. The command line itself is
        always taken from ``sys.argv``.
    """
    parser = argparse.ArgumentParser(*args)
    parser.add_argument("--in-dir",
                        default=os.path.join("test_data", "things_kinds"),
                        help="Data source directory")
    parser.add_argument("--in-file",
                        default=None,
                        help="Data source file")
    # nargs="?" keeps ``--single VALUE`` working as before while also
    # accepting a bare ``--single``, which the help text implies.
    parser.add_argument("--single",
                        nargs="?", const=True, default=None,
                        help="Use a single process. "
                             "Helpful for debugging")

    # Separate name so the parsed namespace doesn't shadow *args.
    options = parser.parse_args()

    if options.in_file:
        print("file")
        run_distribute(options.in_file)
    elif options.single:
        print("single")
        run(options.in_dir, False)
    else:
        print("multi")
        run_distribute_multi(options.in_dir)
# Run the example only when executed as a script, not on import.
if __name__ == "__main__":
    main()