Source code for watchful.enrich

"""
This script is run from the Python command line to execute data enrichment. If
a custom enricher is used, the :func:`enrich_dataset` function should be used
instead.
"""
################################################################################


import argparse
import datetime
import os
import pathlib
import shutil
import sys
from typing import List, Type
from watchful import client, attributes
from watchful.enricher import Enricher


def enrich_dataset(
    custom_enricher_cls: Type[Enricher],
    args: List[str] = None,
) -> None:
    """
    This is the function to use for performing custom data enrichment. Custom
    data enrichment variables, functions and models can be defined in
    ``custom_enricher_cls`` and are used to perform the data enrichment.

    :param custom_enricher_cls: A custom enricher class that inherits
        :class:`Enricher` and implements its abstract methods.
    :type custom_enricher_cls: Enricher
    :param args: A list containing optional input arguments as defined in
        :func:`main`.
    :type args: List[str]
    """

    if args is None:
        args = []

    attributes.set_multiprocessing(False)

    custom_enricher = custom_enricher_cls()
    main(args, custom_enricher)
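
# A minimal sketch of how :func:`enrich_dataset` might be called. The class
# name ``MyEnricher`` and its body are hypothetical; the only details taken
# from this module are that the custom enricher inherits :class:`Enricher`,
# implements its abstract methods, and is passed to :func:`enrich_dataset` as
# a class rather than an instance:
#
#     from watchful.enrich import enrich_dataset
#     from watchful.enricher import Enricher
#
#     class MyEnricher(Enricher):
#         ...  # Implement the abstract methods of ``Enricher`` here.
#
#     enrich_dataset(MyEnricher, ["--host", "localhost", "--port", "9001"])
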

def main(args: List[str] = None, custom_enricher: Enricher = None) -> None:
    """
    This is the utility function for performing data enrichment without a
    custom enricher; it is generally not called directly but invoked via the
    Python command line by the user. To perform data enrichment with a custom
    enricher, use :func:`enrich_dataset` instead. This function contains the
    logic and pipelining for the data enrichment.

    :param args: A list containing optional input arguments as defined in the
        ``parser`` arguments below.
    :type args: List[str]
    """

    if args is None:
        args = []

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    # The dataset file; use the dataset currently opened in Watchful if it is
    # not provided. This is not used in remote enrichment as we will retrieve
    # the dataset from the remote Watchful application.
    parser.add_argument(
        "--in_file",
        type=str,
        default="",
        help="Optional original csv dataset filepath; if not given, use the "
        "current dataset opened in Watchful.",
    )
    # The attributes output file that is specifically formatted for
    # integration with the Watchful application.
    parser.add_argument(
        "--out_file",
        type=str,
        default="",
        help='Optional output attribute filepath; if given, it must end with '
        'the ".attrs" extension.',
    )
    # The attribute file to ingest from, if it is available.
    parser.add_argument(
        "--attr_file",
        type=str,
        default="",
        help="Optional input csv attribute filepath; if not given, create "
        "the initial spacy attributes.",
    )
    # The columns in the ``attr_file`` csv file to use as attributes; use all
    # attributes if it is not provided.
    parser.add_argument(
        "--attr_names",
        type=str,
        default="",
        help="Optional comma-delimited string of attribute names to be used; "
        "if not given, use all attribute names.",
    )
    # The host to use; use "localhost" if it is not provided.
    parser.add_argument(
        "--host",
        type=str,
        default="localhost",
        help='Optional string host running Watchful; if not given, use '
        '"localhost".',
    )
    # The port to use; use "9001" if it is not provided.
    parser.add_argument(
        "--port",
        type=str,
        default="9001",
        help='Optional string port number running Watchful; if not given, '
        'use "9001".',
    )
    # The out-of-the-box NLP to use if no ``attr_file`` is provided.
    parser.add_argument(
        "--standard_nlp",
        type=str,
        default="spacy",
        help='Optional out-of-the-box NLP to use; currently "spacy" and '
        '"flair" are available; "spacy" if unspecified.',
    )
    # Option to use multiprocessing. This is still in internal alpha mode and
    # is not expected to be used by users.
    parser.add_argument(
        "--multiprocessing",
        action="store_true",
        help="Optional explicit use of multiprocessing on the available "
        "physical cpu cores; no explicit use if unspecified.",
    )
    # Option to use the Watchful binary. This is for backward compatibility
    # and is not expected to be used by users.
    parser.add_argument(
        "--is_local",
        action="store_true",
        help="Optional use of a local Watchful application instead of a "
        "hosted application; hosted application if unspecified.",
    )

    args = parser.parse_args(args=args)

    attributes.set_multiprocessing(args.multiprocessing)

    client.external(host=args.host, port=args.port)
    summary = client.get()
    project_id = client.get_project_id(summary)
    (
        dataset_id,
        datasets_dir,
        args.in_file,
    ) = attributes.get_dataset_id_dir_filepath(
        summary, args.in_file, args.is_local
    )

    # ``args.in_file`` will still be returned as "" if the Watchful
    # application is remote. Therefore, we create a temporary filepath for
    # ``args.in_file`` to download the original dataset to.
    if not args.is_local:
        if args.in_file == "":
            user_home_path = os.path.expanduser("~")
            working_dir = os.path.join(user_home_path, "watchful", "working")
            os.makedirs(working_dir, exist_ok=True)
            args.in_file = os.path.join(working_dir, dataset_id)
            client.export_dataset_to_path(
                args.in_file, summary["field_names"]
            )
        else:
            print(
                'in_file must be initially "" for enrichment to a remote '
                f'Watchful application; got "{args.in_file}" instead.'
            )
            sys.exit(1)

    if args.out_file:
        # Check that ``out_file`` has the ".attrs" extension.
        try:
            is_ext_attrs = os.path.splitext(args.out_file)[1] == ".attrs"
            if is_ext_attrs:
                del_out_file = False
                # Check that ``out_file``'s directory exists.
                out_file_dir = pathlib.Path(args.out_file).parent
                if not os.path.isdir(out_file_dir):
                    print(f"Directory {out_file_dir} does not exist.")
                    sys.exit(1)
            else:
                print(
                    f'out_file {args.out_file} must end with the ".attrs" '
                    "extension."
                )
                sys.exit(1)
        except OSError as err_msg:
            print(err_msg)
            print(
                f'out_file {args.out_file} must end with the ".attrs" '
                "extension."
            )
            sys.exit(1)
    else:
        del_out_file = True
        # Create a temporary ``out_file`` and mark it for deletion.
        args.out_file = f"{os.path.splitext(args.in_file)[0]}.attrs"

    # Enrich with attributes from a csv file, that is, one already created by
    # an external pipeline.
    if args.attr_file:
        # Enrich with all attributes.
        if not args.attr_names:
            print(
                f"Enriching {args.in_file} using all attributes from "
                f"{args.attr_file} ..."
            )
            attributes.enrich(
                args.in_file,
                args.out_file,
                attributes.enrich_row_with_attribute_data,
                attributes.get_vars_for_enrich_row_with_attribute_data(
                    args.attr_names, args.attr_file
                ),
            )
            if not del_out_file:
                print(f"Wrote attributes to {args.out_file}.")
        # Enrich with the specified attributes.
        else:
            val_success = attributes.validate_attribute_names(
                args.attr_names, args.attr_file
            )
            if val_success:
                print(
                    f"Enriching {args.in_file} using {args.attr_names} "
                    f"attributes from {args.attr_file} ..."
                )
                attributes.enrich(
                    args.in_file,
                    args.out_file,
                    attributes.enrich_row_with_attribute_data,
                    attributes.get_vars_for_enrich_row_with_attribute_data(
                        args.attr_names, args.attr_file
                    ),
                )
                if not del_out_file:
                    print(f"Wrote attributes to {args.out_file}.")
            else:
                print(
                    f"At least one of your attribute names in "
                    f"{args.attr_names} does not match those in the "
                    f"attribute input file {args.attr_file}."
                )
                sys.exit(1)

    # Enrich using the out-of-the-box NLPs.
    elif custom_enricher is None:
        # SpaCy NLP.
        if args.standard_nlp == "spacy":
            # Want to know what pipes are used? Uncomment these:
            # nlp = attributes.load_spacy()
            # import pprint
            # pprint.PrettyPrinter(indent=4).pprint(nlp.analyze_pipes())

            # ``enrich_row`` is the user custom function for enriching every
            # row of the dataset. ``spacy_atterize_fn``, ``spacy_atterize``
            # and ``load_spacy()`` are the additional user variables to
            # perform the data enrichment.
            print(f"Using {args.standard_nlp} ...")
            print(f"Enriching {args.in_file} ...")
            attributes.enrich(
                args.in_file,
                args.out_file,
                attributes.enrich_row,
                (
                    attributes.spacy_atterize_fn,
                    attributes.spacy_atterize,
                    *attributes.load_spacy(),
                ),
            )
            if not del_out_file:
                print(f"Wrote attributes to {args.out_file}.")
        # Flair NLP.
        elif args.standard_nlp == "flair":
            # ``enrich_row`` is the user custom function for enriching every
            # row of the dataset. ``flair_atterize_fn``, ``flair_atterize``
            # and ``*load_flair()`` are the additional user variables to
            # perform the data enrichment.
            print(f"Using {args.standard_nlp} ...")
            print(f"Enriching {args.in_file} ...")
            attributes.enrich(
                args.in_file,
                args.out_file,
                attributes.enrich_row,
                (
                    attributes.flair_atterize_fn,
                    attributes.flair_atterize,
                    *attributes.load_flair(),
                ),
            )
            if not del_out_file:
                print(f"Wrote attributes to {args.out_file}.")
        else:
            print(
                f"The nlp {args.standard_nlp} is not implemented.\nNo "
                "enrichment done."
            )
            sys.exit(1)

    # Enrich using the custom enricher class.
    else:
        # Perform custom data enrichment. The custom data enrichment
        # variables, functions and models in ``custom_enricher`` are used to
        # perform the data enrichment.
        print("Using your custom enricher ...")
        print(f"Enriching {args.in_file} ...")
        attributes.enrich(
            args.in_file,
            args.out_file,
            custom_enricher.enrich_row,
            custom_enricher.enrichment_args,
        )
        if not del_out_file:
            print(f"Wrote attributes to {args.out_file}.")

    # If the Watchful application is remote, delete the downloaded dataset as
    # the data enrichment is completed.
    if not args.is_local:
        try:
            os.remove(args.in_file)
        except FileNotFoundError as err_msg:
            print(err_msg)
            print(
                f"Error removing downloaded dataset file from {args.in_file}."
            )
            sys.exit(1)

    # Check that the active project and the opened dataset have not changed.
    summary = client.get()
    current_project_id = client.get_project_id(summary)
    if project_id != current_project_id:
        print(
            f"Current project {current_project_id} is different from the "
            f"enriched project {project_id}!"
        )
        sys.exit(1)
    current_dataset_id = client.get_dataset_id(summary)
    if dataset_id != current_dataset_id:
        print(
            f"Current dataset {current_dataset_id} is different from the "
            f"enriched dataset {dataset_id}!"
        )
        sys.exit(1)

    # Format the attributes file timestamp as "yyyy-mm-dd_hh-mm-ss-ssssss".
    # Use the full timestamp for completeness, though it is reasonable that
    # "yyyy-mm-dd_hh-mm-ss" should be unique too.
    # This timestamp format is:
    # - usable for pulling out the latest attributes
    # - usable for pulling out a time-series progression of attributes
    # - consistent with the Kubeflow pipelines integration
    # The final filename format is
    # "filename__yyyy-mm-dd_hh-mm-ss-ssssss.attrs".
    timestamp = (
        str(datetime.datetime.now())
        .replace(" ", "_")
        .replace(":", "-")
        .replace(".", "-")
    )
    dest_attr_filename = f"__{timestamp}.".join(
        os.path.basename(args.out_file).split(".")
    )

    # Copy the created attributes output file to the Watchful home attributes
    # directory if the Watchful application is local.
    if args.is_local:
        dest_attr_filepath = os.path.join(
            datasets_dir, "attrs", dest_attr_filename
        )
        try:
            os.makedirs(os.path.dirname(dest_attr_filepath), exist_ok=True)
            shutil.copyfile(args.out_file, dest_attr_filepath)
        except OSError as err_msg:
            print(err_msg)
            print(
                f"Error copying attribute output file from {args.out_file} "
                f"to {dest_attr_filepath}."
            )
            sys.exit(1)

        # Load the attributes filepath into the Watchful application.
        # Example usage:
        #     curl -iX POST http://localhost:9001/api \
        #         --header "Content-Type: application/json" \
        #         --data "{\"verb\":\"attributes\",\
        #             \"id\":\"9570b0b5-4a58-445f-9b51-b434caca2650\",\
        #             \"filepath\":\"/path/to/attributes_file.attrs\"}"
        # Arguments:
        #     id: dataset id
        #     filepath: attributes filepath
        res = client.load_attributes(dataset_id, dest_attr_filename)

    # Upload the created attributes output file to the Watchful application
    # if it is remote.
    else:
        res = client.upload_attributes(dataset_id, args.out_file)

    msg = (
        f"attributes via watchful {args.host}:{args.port} to dataset "
        f"id {dataset_id}."
    )
    if "error_msg" in res and res["error_msg"]:
        print(f"Error ingesting {msg}")
    else:
        print(f"Ingested {msg}")

    # Remove the temporary attributes output file.
    if del_out_file:
        try:
            os.remove(args.out_file)
        except FileNotFoundError as err_msg:
            print(err_msg)
            print(
                "Error removing temporary attribute output file from "
                f"{args.out_file}."
            )
            sys.exit(1)
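
# A hypothetical programmatic call of :func:`main` that ingests attributes
# already written to a csv file by an external pipeline. The filepaths and
# attribute names below are placeholders; the flags are the ones defined in
# the parser above:
#
#     main([
#         "--attr_file", "/path/to/attributes.csv",
#         "--attr_names", "sentiment,entity",
#         "--out_file", "/path/to/attributes.attrs",
#     ])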

if __name__ == "__main__":
    # This is the Python command line to use for performing data enrichment
    # without a custom enricher. To perform data enrichment with a custom
    # enricher, use :func:`enrich_dataset` instead.
    # Refer to :func:`main` for the optional input arguments.
    main(sys.argv[1:])
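
# Example command-line invocations, assuming the package is installed and the
# module is runnable with ``python -m`` (the host and port values shown are
# the parser defaults):
#
#     # Enrich the dataset currently opened in Watchful with the default
#     # spacy NLP:
#     python -m watchful.enrich --host localhost --port 9001
#
#     # Enrich with the out-of-the-box flair NLP instead:
#     python -m watchful.enrich --standard_nlp flair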