Source code for watchful.client

"""
This script provides the functions required for interacting directly with
Watchful client application.
"""
################################################################################


import base64
import csv
import io
import json
import os
import socket
import subprocess
import sys
import time
import urllib
from typing import Callable, Dict, Generator, List, Literal, Optional, Union
from uuid import uuid4
import requests


THIS_DIR_PATH = os.path.dirname(os.path.abspath(__file__))
SCHEME: Literal["http", "https"] = "http"
HOST: str = "localhost"
PORT: Optional[str] = "9002"
API_TIMEOUT_SEC: int = 600


def _refresh() -> None:
    """
    This function explicitly refreshes the imported ``watchful`` package.
    """

    del sys.modules["watchful"]
    import watchful

    print(f"watchful version: {watchful.__version__}")


def _get_conn_url() -> str:
    """
    This function creates the HTTP connection url from the global ``SCHEME``,
    ``HOST`` and ``PORT``.

    :return: The HTTP connection url.
    :rtype: str
    """

    assert SCHEME in [
        "http",
        "https",
    ], '`SCHEME` must be either "http" or "https"!'

    return f"{SCHEME}://{HOST}:{PORT}" if PORT else f"{SCHEME}://{HOST}"


[docs]def await_port_opening(port: int, timeout_sec: int = 10) -> None: """ This function waits for the port to be open; it returns None if ``port`` was opened within the timeout, otherwise it raises an exception. It is used for awaiting Watchful process startup. :param port: The port. :type port: int :param timeout_sec: The timeout in seconds, defaults to 10. :type timeout_sec: int, optional """ end = time.time_ns() + (timeout_sec * 1_000_000_000) while time.time_ns() < end: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout_sec) res = sock.connect_ex((HOST, port)) if res == 0: return None time.sleep(0.001) raise Exception("Timed out waiting for Watchful to start")
[docs]def spawn_cmd(cmd: str, env: str = None) -> int: """ This function spawns a command and returns the PID of the spawned process. :param cmd: The command. :type cmd: str :param env: The environment, defaults to None. :type env: str, optional :return: The PID of the spawned process. :rtype: int """ with subprocess.Popen( cmd + " &\n echo $!", shell=True, stdout=subprocess.PIPE, env=env ) as proc: out, _ = proc.communicate() pid = int(out.decode("utf-8")[:-1]) return pid
[docs]def await_summary( pred: Callable, halt_fn: Callable = lambda x: False, unchanged_timeout: int = 60, ) -> Optional[Dict]: """ This function returns the summary once ``pred(summary)`` returns true, or stops waiting once ``halt_fn`` returns true and then returns None, or raises an exception if the summary is unchanged for ``unchanged_timeout`` seconds. :param pred: The predicate function. :type pred: Callable :param halt_fn: The halt function, defaults to lambda x: False. :type halt_fn: Callable, optional :param unchanged_timeout: The timeout in seconds, defaults to 60. :type unchanged_timeout: int, optional :return: The dictionary of the HTTP response from :func:`get()`. :rtype: Dict, optional """ prev_summary = None end = float("inf") while time.time_ns() < end: summary = get() if halt_fn(summary): return None if pred(summary): return summary if prev_summary == summary: end = time.time_ns() + (unchanged_timeout * 1_000_000_000) prev_summary = summary time.sleep(0.03) # 30ms sleep for ~30 requests per second raise Exception( "Timed out awaiting summary. Summary went stale for " + str(unchanged_timeout) + " seconds" )
def _assert_success(summary: Dict) -> Optional[Dict]: """ This function raises an exception if summary contains "error_msg", otherwise it returns the summary. :param summary: The dictionary of the HTTP response from a connection request. :type summary: Dict :return: The summary. :rtype: Dict, optional """ if "error_msg" in summary and summary["error_msg"]: verb_str = "" if "error_verb" in summary and summary["error_verb"]: verb_str = f' ({summary["error_verb"]})' raise Exception(f'Summary error{verb_str}: {summary["error_msg"]}') return summary # as used by register_summary_string_hook API_SUMMARY_HOOK_CALLBACK = None
[docs]def register_summary_hook(function: Callable) -> None: """ This function allows you to provide a function that will be called with every summary object that is returned from any API call to the /api endpoint, that being the raw response body before JSON parsing. This can be used, for example, to instrument a test suite with a function that writes every summary object to disk and then creating a dataset of Watchful summary objects for further analysis. Most SDK users probably won't be reaching for this function every day, but if you find a clever use for it, let us know! :param function: Your function to be called with every summary string :type function: Callable """ global API_SUMMARY_HOOK_CALLBACK API_SUMMARY_HOOK_CALLBACK = function
def _read_response( response: requests.models.Response, response_is_summary: bool = False ) -> Optional[Dict]: """ This function raises an exception if ``response.status_code`` is not 200, otherwise it returns ``ret``. If ``response_is_summary`` then we will also run the ``API_SUMMARY_HOOK_CALLBACK`` (any hook that was provided), so this is only appropriate for endpoints that return a summary, that is, the /api endpoint rather than some other JSON object like /config, /remote, etc. :param resp: The HTTP response from a connection request. :type resp: requests.models.Response :param response_is_summary: Boolean indicating if the response is known to be a summary object. :type response_is_summary: bool, optional :return: The dictionary of ``resp``. :rtype: Dict, optional """ # the assertion is here because that's what our API endpoints always return assert ( 200 == response.status_code ), f"Request could have failed with status {response.status_code}." json_str = response.text if response_is_summary and API_SUMMARY_HOOK_CALLBACK: API_SUMMARY_HOOK_CALLBACK(json_str) ret = json.loads(json_str) # One idea: # if ret["error_msg"]: # raise Exception(ret["error_msg"]) if "error_msg" in ret and ret["error_msg"]: print(f'{ret["error_verb"]}: {ret["error_msg"]}') return ret
[docs]def request( method: str = "GET", path: str = "/", **kwargs: Dict ) -> requests.models.Response: """ This is a wrapper function for API calls; made up of the API method, path and optional keyword arguments. :param method: The API method string in uppercase. :type method: str :param path: The path string after the hostname and port. :type path: str :param kwargs: Optional parameters to include in the API call. :type kwargs: Dict :return: The HTTP response from the connection request. :rtype: requests.models.Response """ methods = ["GET", "POST", "PUT"] assert ( method in methods ), f"{method} is not one of the currently implemented methods: {methods}!" default_headers = {} version_filepath = os.path.join(THIS_DIR_PATH, "VERSION") with open(version_filepath, encoding="utf-8") as f: version = f.readline() default_headers.update({"x-watchful-sdk": version}) headers = kwargs.get("headers", {}) if headers == {}: headers.update(default_headers) kwargs["headers"] = headers else: headers.update(default_headers) return getattr(requests, method.lower())( f"{_get_conn_url()}{path}", **kwargs )
[docs]def api(verb: str, **kwargs: Dict) -> Optional[Dict]: """ This is a convenience function for API calls; made up of a verb and optional keyword arguments. :param verb: The verb for the API. :type verb: str :param kwargs: Optional parameters to support the API for ``verb``. :type kwargs: Dict :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ action = kwargs # already a dictionary action["verb"] = verb return api_send_action(action)
[docs]def api_send_action(action: Dict) -> Optional[Dict]: """ This is a convenience function for API calls with an action. :param action: The ``verb`` for the API with optional parameters. :type action: Dict :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ fields = { "data": json.dumps(action), "headers": {"Content-Type": "application/json"}, "timeout": API_TIMEOUT_SEC, } response = request("POST", "/api", **fields) return _read_response(response, response_is_summary=True)
[docs]def ephemeral(port: str = "9002") -> None: """ This function starts the backend using the specified ``port`` for an interactive session without persistence. :param port: The port, defaults to "9002". :type port: str, optional """ _ = spawn_cmd( f"watchful -p {port} --no-persistence " f">watchful_ephemeral_output.txt 2>&1" ) await_port_opening(int(port)) external(port=port)
[docs]def external( host: str = "localhost", port: str = "9001", scheme: str = "http" ) -> None: """ This function changes the global ``HOST``, ``PORT`` and ``SCHEME`` values. :param host: The host, defaults to "localhost". :type host: str, optional :param port: The port, defaults to "9001". :type port: str, optional :param scheme: The scheme, either "http" or "https", defaults to "http". :type scheme: str, optional """ assert scheme in [ "http", "https", ], '`scheme` must be either "http" or "https"!' global SCHEME, HOST, PORT SCHEME = scheme HOST = host PORT = port
[docs]def list_projects() -> Dict: """ This function lists the available projects. :return: The dictionary of the HTTP response from the connection request. :rtype: Dict """ response = request("GET", "/projects", timeout=API_TIMEOUT_SEC) return json.loads(response.text)
[docs]def open_project(id_: str) -> str: """ This function opens a project via its project id, which is the path to its hints file. :param id_: The project id. :type id_: str :return: The read HTTP response. :rtype: str """ response = request( "POST", "/projects", data=json.dumps(id_), headers={"Content-Type": "application/json"}, timeout=API_TIMEOUT_SEC, ) ret = response.content.decode("utf-8") await_plabels() return ret
[docs]def create_project(title_: Optional[str] = None) -> Union[str, Optional[Dict]]: """ This function creates a new project. Additionally, if title is supplied, a title is given to the newly created project. :param title: The title for the project. :type title: str, optional :return: If a title is supplied and ``open_project("new")`` is successful, the dictionary of the HTTP response from the connection request from ``title(title)``; otherwise the read HTTP response from ``open_project("new")``. :rtype: Union[str, Optional[Dict]] """ ret = open_project("new") if title_ is not None and ret == '"OK"\n': return title(title_) return ret
[docs]def title(title_: str) -> Optional[Dict]: """ This function gives a title to a newly created project. :param title_: The title for the project. :type title_: str :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("title", title=title_)
[docs]def get_project_id(summary: Dict) -> str: """ This function gets the active project id from ``summary``. For correctness, we use the ``summary`` that has been success asserted via ``_assert_success``. :param summary: The dictionary of the HTTP response from a connection request. :type summary: Dict :return: The project id. :rtype: str """ if "project_id" in summary: return summary["project_id"] raise Exception("No project is currently active.")
[docs]def get_dataset_id(summary: Dict) -> str: """ This function gets the active dataset id from ``summary``. For correctness, we use the ``summary`` that has been success asserted via ``_assert_success``. :param summary: The dictionary of the HTTP response from a connection request. :type summary: Dict :return: The dataset id. :rtype: str """ if "datasets" in summary: # ``dataset_ids`` should either be empty or contain one dataset id. dataset_ids = summary["datasets"] if len(dataset_ids) == 0: raise Exception("No dataset is currently opened.") dataset_id = dataset_ids[0] return dataset_id raise Exception("`datasets` is currently not available.")
[docs]def get_watchful_home(summary: Dict, is_local: bool = True) -> str: """ This function gets Watchful home from ``summary``. For correctness, we use the summary that has been success asserted via ``_assert_success``. If Watchful home is not available and the Watchful application is local, we derive Watchful home from the user home. :param summary: The dictionary of the HTTP response from a connection request. :type summary: Dict :param is_local: The boolean indicating if the Watchful application is local, defaults to True :type is_local: bool, optional :return: Watchful home. :rtype: str """ if "watchful_home" in summary: return summary["watchful_home"] if is_local: user_home = os.path.expanduser("~") watchful_home = os.path.join(user_home, "watchful") return watchful_home raise Exception("`watchful_home` is currently not available.")
[docs]def get_datasets_dir(summary: Dict, is_local: bool = True) -> str: """ This function infers the datasets directory from ``summary``. For correctness, we use the summary that has been success asserted via ``_assert_success``. :param summary: The dictionary of the HTTP response from a connection request. :type summary: Dict :param is_local: The boolean indicating if the Watchful application is local, defaults to True :type is_local: bool, optional :return: The datasets directory. :rtype: str """ watchful_home = get_watchful_home(summary, is_local) datasets_dir = os.path.join(watchful_home, "datasets") return datasets_dir
[docs]def get_dataset_filepath(summary: Dict, is_local: bool = True) -> str: """ This function infers the datasets filepath from ``summary``. For correctness, we use the summary that has been success asserted via ``_assert_success``. As this function uses file operations, it does not work when the Watchful application is remote, and in such case returns "". :param summary: The dictionary of the HTTP response from a connection request. :type summary: Dict :param is_local: The boolean indicating if the Watchful application is local, defaults to True :type is_local: bool, optional :return: The dataset filepath. :rtype: str """ if not is_local: return "" datasets_dir = get_datasets_dir(summary, is_local) dataset_id = get_dataset_id(summary) dataset_ref_path = os.path.join(datasets_dir, "refs", dataset_id) # Check that ``dataset_ref_path`` exists. if not os.path.isfile(dataset_ref_path): raise Exception(f"File {dataset_ref_path} does not exist.") with open(dataset_ref_path, encoding="utf-8") as f: dataset_ref = f.readline() dataset_filepath = os.path.join(datasets_dir, "raw", dataset_ref) # Check that ``dataset_filepath`` exists. if not os.path.isfile(dataset_filepath): raise Exception(f"File {dataset_filepath} does not exist.") return dataset_filepath
# def poll(): # pass
[docs]def records(csv_: str) -> Optional[Dict]: """ This function loads the csv dataset. :param csv_: The csv dataset. :type csv_: str :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("records", data=csv_, content_type="text/csv")
def _column_flag( columns: List[bool], flag: Literal["inferenceable"] = "inferenceable", ) -> Optional[Dict]: """ This function sets a flag for each of the columns of the dataset in the currently active project. :param columns: A list of true/false values, specifying whether the flag should be set for each column. :type columns: List :param flag: The flag to be set; "inferenceable" is currently the only supported flag. :type flag: str, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("column_flag", flag=flag, columns=columns)
[docs]def set_column_flag( columns: Optional[List[str]] = None, flag: Literal["inferenceable"] = "inferenceable", pos_sense: bool = True, ) -> Optional[Dict]: """ This function sets a flag for each of the columns of the dataset in the currently active project. As a default, given columns will be set to ``True`` and all other columns will be set to ``False``. A helper to indicate all available columns as given is to omit ``columns``. :param flag: The flag to be set; "inferenceable" is currently the only supported flag. :type flag: str, optional :param columns: A list of column names specifying whether the flag should be set. :type columns: List, optional :param pos_sense: A boolean specifying whether the setting of the flag is in positive or negative sense. :type pos_sense: bool, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ col_names = get()["field_names"] n_col_names = len(col_names) if columns is None: col_bools = [pos_sense] * n_col_names else: n_columns = len(columns) assert ( len(set(columns)) == n_columns ), f"At least one of the given columns in {columns} is duplicate!" assert len(set(columns) - set(col_names)) == 0, ( f"At least one of the given columns in {columns} is not available " f"in the dataset columns {col_names}!" ) def __f(x): i = x in columns return i if pos_sense else not i col_bools = list(map(__f, col_names)) return _column_flag(col_bools, flag)
[docs]def ignore_column_flag( columns: Optional[List[str]] = None, flag: Literal["inferenceable"] = "inferenceable", ) -> Optional[Dict]: """ This function sets a flag for each of the columns of the dataset in the currently active project. Given columns will be set to ``False`` and all other columns will be set to ``True``. A helper to indicate all available columns as given is to omit ``columns``. :param flag: The flag to be set; "inferenceable" is currently the only supported flag. :type flag: str, optional :param columns: A list of column names specifying whether the flag should be set to ``False``. :type columns: List, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return set_column_flag(columns, flag, False)
[docs]def class_(class__: str) -> Optional[Dict]: """ This function creates a class. :param class__: The class. :type class__: str :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("class", name=class__)
[docs]def create_class(class__: str, class_type: str = "ftc") -> Optional[Dict]: """ This function creates a class. :param class__: The class. :type class__: str :param class_type: The class type, it can be either "ftc" or "ner", defaults to "ftc". :type class_type: str, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("class", name=class__, class_type=class_type)
[docs]def query_async(q: str, page: int = 0) -> Optional[Dict]: """ This function queries for a page. As it is asynchronous, the immediate HTTP response is likely not updated yet. :param q: The query. :type q: str :param page: The page, defaults to 0. :type page: int, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("query", query=q, page=page)
[docs]def query(q: str, page: int = 0) -> Optional[Dict]: """ This function queries for a page and returns an updated HTTP response. :param q: The query. :type q: str :param page: The page, defaults to 0. :type page: int, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ query_async(q, page) return await_summary( lambda s: s["query_completed"], lambda s: s["query"] != q )
[docs]def query_all(q: str, max_pages: int = 0) -> Generator[List[str], None, None]: """ This function evaluates the query returning the results as opposed to the summary. By default, it returns all results for the query (all pages). This can be limited by setting max_pages to the positive number of pages you want. Each query result is a vector with a string for each field that is returned. Note that TOKS, SENTS, CELLS queries only return one field and each result will be wrapped in a vector of one string. :param q: The query. :type q: str :param max_pages: The maximum page, defaults to 0. :type max_pages: int, optional :return: The fields. :rtype: Generator[List[str], None, None] """ page = 0 while True: summary = query(q, page) for fields in [cand["fields"] for cand in summary["candidates"]]: yield fields page += 1 if summary["query_end"] or max_pages and page == max_pages: break
[docs]def base_rate(class__: str, rate: int) -> Optional[Dict]: """ This function sets the base rate for a class. :param class__: The class to set a base rate for. :type class__: str :param rate: The base rate for the class. :type rate: int :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("base_rate", label=class__, rate=rate)
[docs]def await_plabels() -> Optional[Dict]: """ This function gets the updated HTTP response. :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return await_summary(lambda s: s["status"] == "current")
[docs]def hinter_async(class__: str, query_: str, weight: int) -> Optional[Dict]: """ This function creates a hinter. As it is asynchronous, the immediate HTTP response is likely not updated yet. :param class__: The class for the hinter. :type class__: str :param query_: The query for the hinter. :type query_: str :param weight: The weight for the hinter. :type weight: int :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("hinter", label=class__, weight=weight, query=query_)
[docs]def hinter(class__: str, query_: str, weight: int) -> Optional[Dict]: """ This function creates a hinter and returns an updated HTTP response. :param class__: The class for the hinter. :type class__: str :param query_: The query for the hinter. :type query_: str :param weight: The weight for the hinter. :type weight: int :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ _assert_success(hinter_async(class__, query_, weight)) return await_plabels()
[docs]def delete(id_: int) -> Optional[Dict]: """ This function deletes a hinter. :param id_: The hinter id to delete. :type id_: int :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("delete", id=id_)
[docs]def delete_class(class__: str) -> Optional[Dict]: """ This function deletes a class. :param class__: The class to delete. :type class__: str :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("delete", class_name=class__)
[docs]def get() -> Optional[Dict]: """ This function gets the current status of the Watchful application, containing information such as your currently active project, dataset examples (candidates) and classes, hinters created, hand labels and label distribution, confidences and error rate, recall and precision and many more. :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("nop")
[docs]def external_hinter(class__: str, name: str, weight: int) -> Optional[Dict]: """ This function creates an external hinter. :param class__: The class for the hinter. :type class__: str :param name: The name for the hinter. :type name: str :param weight: The weight for the hinter. :type weight: int :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ _assert_success( api( "hinter", query="[external]", name=name, label=class__, weight=weight, ) ) return await_plabels()
[docs]def upload_attributes( dataset_id: str, attributes_filepath: str, ) -> Optional[Dict]: """ This function uploads the attributes for the ``dataset_id`` to the remote Watchful application, where the Watchful application then saves it to a filepath according to its stable application logic. :param dataset_id: The dataset id. :type dataset_id: str :param attributes_filepath: The attributes filepath. :type attributes_filepath: str :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ with open(attributes_filepath, encoding="utf-8") as attributes_file: response = request( "PUT", f"/datasets/{dataset_id}/attributes", data=attributes_file, headers={"Content-Type": "text/plain"}, timeout=API_TIMEOUT_SEC, ) assert ( response.status_code == 200 ), f"Request could have failed with status {response.status_code}." return _assert_success(_read_response(response))
[docs]def load_attributes( dataset_id: str, attributes_filename: str, ) -> Optional[Dict]: """ This function is used in the case of Watchful application being on the same machine as the data enrichment. :param dataset_id: The dataset id. :type dataset_id: str :param attributes_filename: The attributes filename. :type attributes_filename: str :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("attributes", id=dataset_id, file=attributes_filename)
def _dump(offset: int) -> Optional[Dict]: """ This function returns a chunk of candidates in "hint API order". :param offset: The offset chunk. :type offset: int :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("dump", offset=offset)
[docs]def dump() -> Generator[List[str], None, None]: """ This function returns all the candidates in "hint API order". :return: The generator of all the candidates. :rtype: Generator[List[str], None, None] """ n_cands = get()["n_candidates"] offset = 0 chunk = [] while offset < n_cands: summary = _dump(offset) # TODO: Add error handling. chunk = summary["candidates"] offset += len(chunk) i = 0 while i < len(chunk): yield chunk[i] i += 1
[docs]def dump_dicts() -> Generator[Dict[str, str], None, None]: """ This function returns all the candidates in "hint API order", together with the column names for all values. :return: The generator of all the candidates, each as a dictionary of named values. :rtype: Generator[Dict[str, str], None, None] """ field_names = get()["field_names"] for c in dump(): yield dict(zip(field_names, c))
[docs]def hint(name: str, offset: int, values: List[bool]) -> Optional[Dict]: """ This function adds the hints for an external hinter. :param name: The hinter name. :type name: str :param offset: The offset. :type offset: int :param values: The hints. :type values: List[bool] :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional TODO: Come up with a better streaming Python API here. """ values = list(map(lambda x: x and 1 or 0, values)) return _assert_success(api("hint", name=name, offset=offset, values=values))
[docs]def apply_hints(name: str) -> Optional[Dict]: """ This function applies the hints for an external hinter. :param name: The hinter name. :type name: str :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ _assert_success(api("apply_hints", name=name)) return await_plabels()
[docs]def hint_all(name: str, values: List[bool]) -> Optional[Dict]: """ This function applies the hints for an external hinter. :param name: The hinter name. :type name: str :param values: The hints. :type values: List[bool] :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ hint(name, 0, values) return apply_hints(name)
[docs]def export_stream( content_type: str = "text/csv", mode: str = "ftc", ) -> requests.models.Response: """ This function begins the export using the export_stream call. The result is not JSON, but is data to be processed directly. For FTC mode, content_type must be text/csv and mode must be ftc. For NER mode, content_type must be application/jsonlines and mode must be ner. On success, it returns the requests.models.Response object from which you can read the data. :param content_type: The content type of the export, defaults to "text/csv". :type content_type: str, optional :param mode: The mode of the export, it can be either "ftc" or "ner", defaults to "ftc". :type mode: str, optional :return: The HTTP response from the connection request. :rtype: requests.models.Response """ _content_type = urllib.parse.quote_plus(content_type) _mode = urllib.parse.quote_plus(mode) response = request( "GET", f"/export_stream?content-type={_content_type}&mode={_mode}", stream=True, timeout=API_TIMEOUT_SEC, ) assert ( 200 == response.status_code ), f"Request could have failed with status {response.status_code}." return response
[docs]def export_dataset_to_path(out_file: str, fields: List[str] = None) -> None: """ This function exports the original dataset via a buffered stream to the specified output file path. It takes ``fields`` as an optional argument for the header (column names), for the case where the callee expects to use specific columns; otherwise it uses the column names returned by the Watchful application. An exception is raised when the dataset's column names do not match the user's expected column names. :param out_file: The file path to export the original dataset to. :type out_file: str :param fields: The list of column names to use for the dataset export. :type fields: List, optional """ if not fields: fields = get()["field_names"] n_cols = len(fields) with open(out_file, "w", encoding="utf-8") as f: writer = csv.writer(f) stream = export_stream().raw stream.auto_close = False reader = csv.reader( io.TextIOWrapper(stream, encoding="utf-8", newline="") ) header = next(reader) if header[:n_cols] != fields: raise Exception( f"Dataset's column names {header} did not match the expected " f"column names {fields}." ) writer.writerow(fields) for row in reader: writer.writerow(row[:n_cols])
[docs]def export_async() -> Optional[Dict]: """ This function exports the dataset. As it is asynchronous, the immediate HTTP response is likely not updated yet. :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("export", query="", content_type="text/csv")
[docs]def export() -> Optional[Dict]: """ This function exports the dataset and returns an updated HTTP response. :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ n_exports = len(get()["exports"]) export_async() return await_summary(lambda s: len(s["exports"]) == n_exports + 1)
[docs]def export_preview(mode: str = "ftc") -> Optional[Dict]: """ Returns a preview of the export. :param mode: The mode of the export preview, it can be either "ftc" or "ner", defaults to "ftc". :type mode: str, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return api("export_preview", mode=mode)
[docs]def export_project() -> requests.models.Response: """ This function returns a consolidated version (a single *.hints file) of the currently open project. Unlike other GET endpoints that return a summary object, this function returns a streamed file, the *.hints project file. :return: The HTTP response from the connection request. :rtype: requests.models.Response """ response = request("GET", "/export_project", timeout=API_TIMEOUT_SEC) assert ( 200 == response.status_code ), f"Request could have failed with status {response.status_code}." return response
[docs]def is_utf8( csv_bytes: bytes = None, filepath: str = None, threshold: float = 0.5, is_fast: bool = True, ) -> bool: """ This function attempts to detect if the encoding of the given bytes or the content of the given filepath is utf-8. It returns `True` if the detected encoding is utf-8 and has a confidence of the given threshold or more, otherwise `False`. This function may need some tweaking for a very large dataset, but should work with the ``is_fast`` argument set to `True` by default. :param csv_bytes: The csv dataset bytes. :type csv_bytes: bytes :param filepath: The path of the csv dataset file. :type filepath: str :param threshold: The minimum confidence required to accept the detected encoding. :type threshold: float, optional :param is_fast: Whether to use fast encoding detection with a lower accuracy, or not. :type is_fast: bool, optional :return: `True` if the detected encoding is utf-8 and has a confidence of the given threshold or more, otherwise `False`. :rtype: bool """ if csv_bytes is None and filepath is None: raise Exception( "Both filepath and csv_bytes are not specified. " "One of them needs to be specified." ) if csv_bytes and filepath: raise Exception( "Both filepath and csv_bytes are specified. " "Only one of them needs to be specified." ) if is_fast: import cchardet as chardet else: import chardet if csv_bytes: res = chardet.detect(csv_bytes) else: if os.path.isfile(filepath): with open(filepath, "rb") as f: res = chardet.detect(f.read()) else: raise Exception( f"There is no file at the given file path {filepath}!" ) # Because cChardet likes to be specific with encoding, # ASCII will be specified if the valid UTF-8 characters # fit within that subset. valid_encodings = ["utf-8", "ascii"] enc = res["encoding"].lower() if enc in valid_encodings and res["confidence"] >= threshold: return True return False
[docs]def create_dataset( csv_bytes: bytes, columns: List[str], filename: str = "none", has_header: bool = True, threshold_detect: float = 0.5, is_fast_detect: bool = True, force_load: bool = True, ) -> str: """ This function loads the specified columns of a csv dataset and returns the dataset id if its encoding is detected to be utf-8 or if dataset loading is forced. :param csv_bytes: The csv dataset bytes. :type csv_bytes: bytes :param columns: The list of column names to use. :type columns: List[str] :param filename: The csv dataset filename, defaults to "none". :type filename: str, optional :param has_header: The boolean indicating if the csv dataset has a header, defaults to True. :type has_header: bool, optional :param threshold_detect: The minimum confidence required to accept the detected encoding. :type threshold_detect: float, optional :param is_fast_detect: Whether to use fast encoding detection with a lower accuracy, or not. :type is_fast_detect: bool, optional :param force_load: The boolean indicating if the csv dataset will be loaded even when its encoding is detected to be non-utf-8, defaults to True. This is useful in rare cases where the csv dataset is detected to be non-utf-8 encoded and the user is sure about the csv dataset being utf-8 encoded. :type force_load: bool, optional :return: The dataset id. :rtype: str TODO: Add error handling. """ is_csv_bytes_utf8 = is_utf8( csv_bytes, None, threshold_detect, is_fast_detect ) if is_csv_bytes_utf8 or force_load: id_ = str(uuid4()) response = request( "POST", f"/api/_stream/{id_}/0/true", data=csv_bytes, headers={"Content-Type": "text/csv"}, timeout=API_TIMEOUT_SEC, ) _ = _read_response(response) params = json.dumps({"filename": filename, "has_header": has_header}) response = request( "POST", f"/api/_stream/{id_}", data=params, headers={"Content-Type": "application/json"}, timeout=API_TIMEOUT_SEC, ) dataset_id = _read_response(response)["id"] api("dataset_add", id=dataset_id, columns=columns) return dataset_id raise Exception( "Dataset is not loaded as the encoding of the csv dataset is not " "detected to be utf-8 and dataset loading is not forced." )
[docs]def label_single(row: List[str]) -> List[str]: """ This function labels a candidate row. :param row: The candidate row. :type row: List[str] :return: The plabels for the candidate row. :rtype: List[str] """ sio = io.StringIO() w = csv.writer(sio) w.writerow(row) csv_row = sio.getvalue().encode("utf-8") sio.close() response = request( "POST", "/label", data=csv_row, headers={"Content-Type": "text/csv"}, timeout=API_TIMEOUT_SEC, ) response_body = response.content.decode("utf-8") csv_str = io.StringIO(response_body) rdr = csv.reader(csv_str) rdr_list = list(rdr) if len(rdr_list) == 2: fields = get()["field_names"] assert ( fields == rdr_list[0][: len(fields)] ), "server prepended the header to the labeled row" else: assert len(rdr_list) == 1, "server returned a single row" return rdr_list[-1]
[docs]def config_set(key: str, value: str) -> Optional[Dict]: """ This function sets one app instance configuration parameter using a key and value pair. :param key: The parameter name. :type key: str :param value: The parameter value. :type value: str :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ params = json.dumps({"verb": "set", "key": key, "value": value}) response = request( "POST", "/config", data=params, headers={"Content-Type": "application/json"}, timeout=API_TIMEOUT_SEC, ) return _read_response(response)
[docs]def config() -> Optional[Dict]: """ This function retrieves the app instance configuration parameters ``remote``, ``username``, ``role`` and ``authorization`` and their values. :return: A dictionary of key value pairs :rtype: Dict, optional """ response = request( "GET", "/config", data=None, headers={"Content-Type": "application/json"}, timeout=API_TIMEOUT_SEC, ) return _read_response(response)
[docs]def set_hub_url(url: str) -> Optional[Dict]: """ This function sets the Watchful hub URL of the Watchful client. The Watchful hub URL should not change after data has been fetched or published to a hub. :param url: The Watchful hub URL. :type url: str :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ response = request( "POST", "/set_hub_url", data=url, headers={"Content-Type": "text/plain"}, timeout=API_TIMEOUT_SEC, ) return _read_response(response)
[docs]def candidate_dicts(summary: Optional[Dict] = None) -> List[Dict[str, str]]: """ This function retrieves and returns all the candidates, together with the column names for all values. :param summary: The dictionary of the HTTP response from a connection request, defaults to None. :type summary: Dict, optional :return: The list of all the candidates, each as a dictionary of named values. :rtype: List[Dict[str, str]] """ if summary is None: summary = get() return list( map( lambda c: dict(zip(summary["field_names"], c["fields"])), summary["candidates"], ) )
[docs]def exit_backend() -> None: """ This function exits the backend. Note that the API call will usually fail because the backend exits before returning a HTTP response so we suppress the error. This is useful locally, for tests, and during development, but not in dockerized Watchful application instances. """ try: api("exit") # see docstring above except requests.exceptions.ConnectionError: pass
## Hub API
[docs]def hub_api(verb: str, token: str, **kwargs: Dict) -> Optional[Dict]: """ This is a convenience function for collaboration API calls with Watchful; made up of a verb, a token and optional keyword arguments. :param verb: The verb for the hub API. :type verb: str :param verb: The user's auth token. :type token: str :param kwargs: Optional parameters to support the hub API for ``verb``. :type kwargs: Dict :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ headers = {"Content-Type": "application/json"} headers.update({"Authorization": "Bearer " + token}) action = kwargs # already a dictionary action["verb"] = verb response = request( "POST", "/remote", data=json.dumps(action), headers=headers, timeout=API_TIMEOUT_SEC, ) return _assert_success(_read_response(response))
[docs]def login(email: str, password: str) -> Optional[Dict]: """ This function performs login with the email and password with Watchful hub. :param email: The user's email. :type email: str :param password: The user's password. :type password: str :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ headers = {"Content-Type": "application/json"} credentials = base64.b64encode(str.encode(f"{email}:{password}")).decode( "utf-8" ) headers.update({"Authorization": f"Basic {credentials}"}) response = request( "POST", "/remote", data=json.dumps({"verb": "login"}), headers=headers, timeout=API_TIMEOUT_SEC, ) return _assert_success(_read_response(response))
[docs]def publish(token: Optional[str] = None) -> Optional[Dict]: """ This function performs publish with Watchful hub. :param token: The user's auth token, defaults to None. :type token: str, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return hub_api("publish", token)
[docs]def fetch(token: Optional[str] = None) -> Optional[Dict]: """ This function performs fetch with Watchful hub. :param token: The user's auth token, defaults to None. :type token: str, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return hub_api("fetch", token)
[docs]def pull(token: Optional[str] = None) -> Optional[Dict]: """ This function performs pull with Watchful hub. :param token: The user's auth token, defaults to None. :type token: str, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return hub_api("pull", token)
[docs]def push(token: Optional[str] = None) -> Optional[Dict]: """ This function performs push with Watchful hub. :param token: The user's auth token, defaults to None. :type token: str, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return hub_api("push", token)
[docs]def peek(token: Optional[str] = None) -> Optional[Dict]: """ This function performs peek with Watchful hub. :param token: The user's auth token, defaults to None. :type token: str, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return hub_api("peek", token)
[docs]def whoami(token: Optional[str] = None) -> Optional[Dict]: """ This function performs whoami with Watchful hub. :param token: The user's auth token, defaults to None. :type token: str, optional :return: The dictionary of the HTTP response from the connection request. :rtype: Dict, optional """ return hub_api("whoami", token)