Module tripleblind.job

A Job represents a computational run coordinated by the TripleBlind Router amongst one or more Access Points.

For example, a Job could train a neural network on one or many datasets; train a regression; or use an already built and trained neural network to perform an inference.

Jobs have X basic elements: * An operation * One or more input datasets * A set of params to define options * Optionally, a preprocessor to assist in prepping the input dataset

The operation fits into one of several classes. It can be a predefined protocol, commonly used when training a model. It can also be a predefined operator which executes an well known procedure against the data, such as outlier detection. Finally, it can be the Asset identifier of a trained model, which will run the input data through that model to perform inference.

The typical workflow follows this pattern:

import tripleblind as tb

job = job = tb.create_job(
    job_name="...something unique for reference...",
    operation=op,  # protocol, operation or Asset of a model
    dataset=[data1,data2],  # filename(s) or Assets for input
    params={"security": "smpc", ...},  # options
}

if job.submit():
    job.wait_for_completion()

    if job.success:
        output = job.result.asset  # get the results
        output.retrieve(save_as="output.csv")  # download results locally
    else:
        print(f"Failed")

Functions

def create_job(job_name: str, operation: Union[AssetOperation], dataset: Union[Asset, str, pathlib.Path, preprocessor.package.Package, List[Union[Asset, str, pathlib.Path, preprocessor.package.Package]], ForwardRef(None)] = None, preprocessor=None, params: Optional[dict] = None, session: Optional[Session] = None) -> Job

Create a job for execution.

Here is a job to train an already defined CNN network for 3 epochs on the standard MNIST dataset: job = tb.create_job( job_name="train-mnist-cnn", operation=network_asset, dataset=[tb.DATASET_MNIST], params={"epochs": 3}, )

Args

job_name : str
Plain name of the job (appears in UI)
operation : Asset or Operation
An algorithm Asset or one of the built-in protocols, such as training a network.
dataset : Asset, str, Path, Package or list of same, optional
A dataset or list of datasets. Datasets can be specified as Assets or as a filename. When a filename is given it will automatically be converted to a temporary Asset which gets deleted at the completion of the Job.
preprocessor : Builder
A preprocessor builder that will transform input data assets before operation.
params : dict, optional
A dictionary containing parameters appropriate to the operation being invoked.
session : Session, optional
A connection session. If not specified the default session is used.

Raises

TripleblindAPIError
number of preprocessors is not 1 OR number of preprocessors does not match number of datasets

Returns

Job
A job descriptor, None if Operation is invalid
def fetch_asset(algorithm: Optional[Asset] = None, dataset: Union[Asset, List[Asset], ForwardRef(None)] = None, session: Optional[Session] = None) -> AssetUsageProps

Retrieve the permission information for an asset

Args

algorithm : Asset, optional
An algorithm
dataset : Asset or [Asset], optional
A dataset
session : Session, optional
A connection session. If not specified the default session is used.

Returns

AssetUsageProps
The asset description, or None if not found
def status_list(session: Optional[Session] = None, job_id: str = None) -> List[JobStatus]

Fetch the status reports of all jobs.

Args

session : Session, optional
A connection session. If not specified the default session is used.

Returns

List[JobStatus]
List of all available jobs (running or completed)

Classes

class AssetUsageProps (status: str)

Properties for this usage of an asset in the context of a specific job

Class variables

var status : str

Required permission to use this asset: both | alg | data

class Job (id: uuid.UUID, job_name: str, metadata: object, created: datetime.datetime, owner: dict, dataset: Asset, preprocessor: str, operation: Asset, waiting_on: List[dict] = None, status: int = 0, model_status: dict = None)

A single operation involving an algorithm or training protocol, some form of data (either a data asset or an input file) and any parameters used by the algorithm/protocol.

Subclasses

Class variables

var created : datetime.datetime

When the job was started

var datasetAsset

List of datasets used by this job.

var id : uuid.UUID

The unique identifier for this job

var job_name : str

Display name for the job

var metadata : object

Parameters and other properties associated with the job.

var model_status : dict

Additional metrics on model training

var operationAsset

The algorithm or protocol being run in this job

var owner : dict

Data about the user that started the job

var preprocessor : str

Serialized str of preprocessor that will transform input datasets before use AP side.

var status : int

Status code from AP status updates. Will not see 1 (requested) or 2 (submitted) because we only check marketplace status until permissions are granted to job. Then self.update_status gets this status from AP and starts at 3 (in progress) and completes with either 4 (complete) or 10 (error). See message_schemas/status.py for any additional error codes.

var waiting_on : List[dict]

List of required items needing permissions (valid when _market_status = 'perm')

Static methods

def active() -> Job

The last submitted Job object

Returns

Job
The active Job, or None if not is running.
def find(job_name: Union[str, re.Pattern, ForwardRef(None)] = None, job_id: Optional[str] = None, job_status: Optional[str] = None, session: Optional[Session] = None) -> Job

Return a job with a name or id that matches the search pattern

Args

job_name : str, optional
Name of job to find, optional
job_id : str, optional
ID of job to find, optional
job_status : str, optional
status of job being searched for. options include 'active', 'waiting', 'history'
session : Session, optional
A connection session. If not specified the default session is used.

Raises

TripleblindProcessError
Search matches multiple jobs

Returns

Job
The searched-for Job object, or None if not found
def find_all(job_name: Union[str, re.Pattern, ForwardRef(None)] = None, job_id: Optional[str] = None, job_status: Optional[str] = None, session: Optional[Session] = None, max: Optional[int] = None) -> List[Job]

Return a list of jobs with a name or id that matches the search pattern

Args

job_name : str, optional
Name of job to find, optional
job_id : str, optional
ID of job to find, optional
job_status : str, optional
status of job being searched for. options include 'active', 'waiting', 'history'
session : Session, optional
A connection session. If not specified the default session is used.
max : int, optional
maximum number of jobs to return, optional

Returns

List[Job]
A list of found jobs, or None if no match found

Instance variables

var resultJobResult
var success : bool

Methods

def cancel(self, session: Optional[Session] = None) -> bool

Cancel a pending (waiting on permissions or queued) job

This will NOT stop a job which has already begun calculation. Use Job.kill() if you wish to unconditionally stop a job which is waiting on permission, queued or already calculating.

Args

session : Session, optional
A connection session. If not specified, the default session is used.

Returns

bool
True if job was canceled.
def get_status_stream(self, session: Optional[Session] = None) -> StatusOutputStream

Obtain a generator which produces process status messages

Args

session : Session, optional
A connection session. If not specified, the default session is used.

Returns

StatusOutputStream
The status message generator.
def handle_keyboard_interrupt(self, auto_cleanup: bool = True, session: Optional[Session] = None)

For interactive scripts, prompt user when a Ctrl+C is detected

When a local script that launches a process is stopped, the remote operation can be allowed to continue or can be stopped also. This method determines if that question is appropriate and acts accordingly.

Args

auto_cleanup : bool
If True, temporary assets will be deleted.
session : Session, optional
A connection session. If not specified, the default session is used.
def kill(self, session: Optional[Session] = None) -> bool

Immediately terminate an active job.

This will also cancel a job which is waiting on an access request response or waiting in the queue for computational resources.

Args

session : Session, optional
A connection session. If not specified, the default session is used.

Returns

bool
True if job was terminated.
def pretty_status_msg(self)
def submit(self, input_file=None, session: Optional[Session] = None) -> bool

Begin execution of a job.

To submit a file along with your job you would do something like this

with open("/path/to/myfile.png", "rb") as f:
    job.submit(input_file=f})

Args

input_file : file, optional
File to submit with the job (deprecated)
session : Session, optional
A connection session. If not specified, the default session is used.

Returns

bool
True if successfully started, False if failed
def update_router_status(self, session: Session = None) -> bool

Fetch the latest status from the Router

NOTE: This updates both self._market_status AND the self.waiting_on

Args

session : Session, optional
A connection session. If not specified, the default session is used.

Result

bool: True if updated, False if unchanged

def update_status(self, session: Optional[Session] = None, update_throttle=None)

Fetch the latest status for this job

Args

session : Session, optional
A connection session. If not specified, the default session is used.
update_throttle : float
Number of seconds to wait before reaching out to the Router. Useful to avoid flooding the network needlessly. Default is a 1 sec throttle.

Returns

bool
True if found and updated, False otherwise
def wait_for_completion(self, silent: bool = False, wait_for_permission: bool = True, session: Optional[Session] = None)

Wait for a job to complete, optionally waiting for permissions.

Args

silent : bool, optional
Suppress status messages during execution? Default is to show messages.
wait_for_permission : bool, optional
When True this will block until a dataset owner grants permission. Otherwise it will return immediately if permission is required. Defaults to blocking behavior.
session : Session, optional
A connection session. If not specified, the default session is used.

Raises

TripleblindProcessError
Process failed or cancelled.
def waiting_on_permission(self) -> bool

Tests if the job is waiting for one or more access permission grant(s)

Returns

bool
True if waiting, False otherwise
def waiting_on_queue(self) -> bool

Tests if the job is in the queue

Returns

bool
True if queued, False otherwise
class JobResult (raw_status: object)

JobResult wraps the text output by a completed job.

Job results are all returned as simple strings. These strings can be parsed differently depending on the type of job. Some have simple text output (e.g. an inference), some contain the asset IDs of algorithms or datasets generated by the process.

Class variables

var raw_status : object

Raw, unparsed output from the job.

Instance variables

var assetAsset

Identifier of the dataset created by the job.

var dataframe : pd.DataFrame

Dataframe of tabular output (shortcut for table.dataframe)

var table : TableAsset

Table version of the dataset created by the job.

class JobStatus (id: uuid.UUID, job_id: uuid.UUID, code: int, status: str, when: datetime.datetime, communications: int, error: str = '')

JobStatus is a snapshot of the state of a Job performing an operation

NOTE: This object is static, you will need to request a new status snapshot if the job is still running.

Class variables

var code : int
var communications : int
var error : str
var id : uuid.UUID
var job_id : uuid.UUID
var status : str
var when : datetime.datetime

Instance variables

var error_code : int

Failure code, None if the job succeeded.

var resultJobResult

A JobResult for a successful run, None if the job failed.

class RemoteStatusOutputStream (job_id: str, session=None)

Remote Continuous Inference Output Stream

Similar to the StatusOutputStream, but used when connecting to a Process initiated by another party (if allowed).

See StatusOutputStream for more detail on the output format.

Methods

def handle_exception(self, e: BaseException)
def remote_status(self, max_retries: int = 5, initial_wait: float = 0.5, backoff_factor: int = 2) -> Generator[dict, None, Optional[Exception]]

Obtain a generator which produces process status messages

Note: Use the StatusOutputStream.format_status_message() to convert the emitted status messages into nicely formatted output.

Args

max_retries : int
How many times to re-attempt a connection if a network failure occurs.
initial_wait : float
Seconds to wait in the initial retry.
backoff_factor : int
Step factor used in retry pauses.

Returns

RemoteStatusOutputStream
The status message generator.
class StatusOutputStream (job: Job, session: Session)

Job Output Status Message interface

During the running of a Job it can generate status messages to communicate intermediate results (e.g. in a Continuous Inference), checkpoint information (e.g. in a training at each epoch), or any kind of message the protocol wants to send.

Typical usage pattern is::

output_stream = model.infer(..., stream_output=True)
try:
    for status in output_stream.status():
        # status is either a string with control messages, or a dict
        if isinstance(status, dict):
            print(f"Calculated at: {status['data_gathered_timestamp']}")
            do_something(status["result"])
except BaseException as e:
    output_stream.handle_exception(e)

Each protocol has its own status output format. The basic format of the status messages is::

{
    "__type": "MSG_TYPE"
    # custom fields for the type
}

Below are details of formats from current protocols.


Continuous Federated Inference::

{
    "__type": "InferenceResults",

    # Protocol specific data
    "data_gathered_timestamp": 1680441227.5509229,
    "result": [[5.0], [5.0], [5.0], [5.0], [5.0], [5.0], [5.0], [5.0], [5.0], [5.0]],
}

Type = "SplitTrainMetrics"::

{
    '__type': 'SplitTrainMetrics',
    'test_results': 'Client0: Test records: 2500, Accuracy: 10.08%, Loss: 2.30\nClient1: Test records: 2500, Accuracy: 10.48%, Loss: 2.30\n',
    'train_reports': [
                        {'accuracy': 10.035555555555556, 'loss': 2.4022168402777777, 'client_id': 0, '__type': 'SplitTrainClassificationReport'},
                        {'accuracy': 9.933333333333334, 'loss': 2.3709133680555556, 'client_id': 1, '__type': 'SplitTrainClassificationReport'}
                        ],
    'epoch_count': 1,
    'epoch': 0
}

Type = "FederatedTrainMetrics"::

{
    '__type': 'FederatedTrainMetrics',
}

Type = "NNSMPCMetrics"::

{
    '__type': 'NNSMPCMetrics',
}

Type = "NNFEDMetrics"::

{
    '__type': 'NNFEDMetrics',
}

Type = "RegressionTrainingMetrics"::

{
    '__type': 'RegressionTrainingMetrics',
}

Type = "PSIMetrics"::

{
    '__type': 'PSIMetrics',
    'hash_table_size': 100
}

Type = "RecommenderTrainMetrics"::

{
    '__type': 'RecommenderTrainMetrics',
}

Static methods

def format_status_message(msg: dict) -> List[str]

Convert a message from the status stream into a human-readable form

The msg dict is assumed to be in the standard output format returned by the Access Point for a running job.

Args

msg : dict
The status message structure.

Returns

List[str], str
The formatted output (may be multi-line) and the next single-line message

Methods

def handle_exception(self, e: BaseException, auto_cleanup: bool = True)

Generic handler for exceptions that can occur during stream handling

Args

e : BaseException
Any exception, including the expected ones of KeyboardInterrupt and StopIteration

Raises

e
If the exception is truly unexpected, it is re-raised.
def status(self, max_retries: int = 5, initial_wait: float = 0.5, backoff_factor: int = 2) -> Generator[Union[dict, Literal['waiting on permission', 'starting']], None, Optional[Exception]]

Obtain a generator which produces process status messages

Note: Use the format_status_message() to convert the emitted status messages into nicely formatted output.

Args

max_retries : int
How many times to re-attempt a connection if a network failure occurs.
initial_wait : float
Seconds to wait in the initial retry.
backoff_factor : int
Step factor used in retry pauses.

Returns

StatusOutputStream
The status message generator.