
Pipelines

The Pipeline class enables users to interact with the functional properties of the Pipelines infrastructure, such as creating, reading, or deleting pipelines. It can also be used to create pipeline runs and jobs.

Args: token (str): token copied from Polly.

Usage:

from polly.pipelines import Pipeline

pipeline = Pipeline(token)

create_run(pipeline_id, run_name=None, priority='low', tags={}, domain_context={})

This function creates a pipeline run.

A run is a collection of jobs. This function creates an empty run to which jobs can then be added.

Parameters:

pipeline_id (str): pipeline_id for which the run is to be created. Required.

run_name (str): name of the run. Default: None.

priority (str): priority of the run; one of low | medium | high. Default: 'low'.

tags (dict): a dict of key-value pairs mapping tag_name -> tag_value. Default: {}.

domain_context (dict): domain context for the run. Default: {}.

Returns:

A JSON object representing the pipeline run (see Examples).

Raises:

wrongParamException: invalid parameter passed.
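The constraints documented above (priority is one of low | medium | high, tags is a tag_name -> tag_value mapping) can be checked locally before calling create_run. The sketch below is a minimal illustration under assumptions: build_run_kwargs is a hypothetical helper, string tag names are assumed, and it raises ValueError as a stand-in for the library's wrongParamException, whose import path is not documented here.

```python
from typing import Any, Dict, Optional

# Priorities documented for create_run
ALLOWED_PRIORITIES = ("low", "medium", "high")


def build_run_kwargs(
    pipeline_id: str,
    run_name: Optional[str] = None,
    priority: str = "low",
    tags: Optional[Dict[str, Any]] = None,
    domain_context: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: check arguments locally and return kwargs for create_run."""
    if not pipeline_id:
        raise ValueError("pipeline_id is required")
    if priority not in ALLOWED_PRIORITIES:
        raise ValueError(
            f"priority must be one of {', '.join(ALLOWED_PRIORITIES)}; got {priority!r}"
        )
    tags = dict(tags or {})
    # tags is documented as a tag_name -> tag_value mapping; string names are assumed here
    bad_names = [name for name in tags if not isinstance(name, str)]
    if bad_names:
        raise ValueError(f"tag names must be strings; got {bad_names!r}")
    return {
        "pipeline_id": pipeline_id,
        "run_name": run_name,
        "priority": priority,
        # Fresh dicts per call, so callers never share a mutable default
        "tags": tags,
        "domain_context": dict(domain_context or {}),
    }
```

The result could then be unpacked into the real call, e.g. pipeline.create_run(**build_run_kwargs("<PIPELINE_ID>", run_name="TEST_RUN", priority="medium")).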

get_job(job_id)

This function returns the job data for the provided job_id.

Parameters:

job_id (str): the job_id for which the data is required. Required.

Returns:

A JSON object with the pipeline job data (see Examples).

Raises:

wrongParamException: invalid parameter passed.

get_jobs(run_id)

This function returns the list of jobs executed for a run.

Parameters:

run_id (str): the run_id for which the jobs are required. Required.

org_id (str, Optional): to filter results based on the org_id.

user_id (str, Optional): to filter results based on the user_id.

page_size (int, Optional): number of results to be fetched per request. Default: 10.

page_after (int, Optional): number of pages to be skipped. Default: 0.

Returns:

A list of JSON objects with the jobs of the run (see Examples).

Raises:

wrongParamException: invalid parameter passed.
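The job objects returned by get_jobs (see Examples) carry boolean errored and finished attributes, so a run's jobs can be tallied on the client. This is a sketch assuming that response shape; summarize_jobs is a hypothetical helper, and counting an errored job as errored even when finished is false is a choice made here, not documented behaviour.

```python
from typing import Any, Dict, List


def summarize_jobs(jobs: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count jobs as errored, finished, or in progress from their attributes."""
    summary = {"errored": 0, "finished": 0, "in_progress": 0}
    for job in jobs:
        attrs = job.get("attributes", {})
        if attrs.get("errored"):
            # An errored job is counted as errored even if it is not finished
            summary["errored"] += 1
        elif attrs.get("finished"):
            summary["finished"] += 1
        else:
            summary["in_progress"] += 1
    return summary
```

For example, summarize_jobs(pipeline.get_jobs(run_id="<RUN_ID>")) would give a quick per-state count for one run.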

get_pipeline(pipeline_id)

This function returns the pipeline data of the provided pipeline_id.

Parameters:

pipeline_id (str): the pipeline_id of the required pipeline. Required.

Returns:

A JSON object with the pipeline data (see Examples).

Raises:

wrongParamException: invalid parameter passed.

get_pipelines()

This function returns all the pipelines that the user has access to. Please use this function with the default values for the parameters.

Returns:

A list of JSON objects, one per pipeline (see Examples).

get_run(run_id)

This function returns the pipeline run data.

Parameters:

run_id (str): the run_id for which the data is required. Required.

Returns:

A JSON object with the pipeline run data (see Examples).

Raises:

wrongParamException: invalid parameter passed.
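A run object reports num_jobs, num_successful_jobs and num_failed_jobs (see Examples), from which the number of jobs still outstanding can be derived. A minimal sketch: run_job_counts is a hypothetical helper, and treating every job that is neither successful nor failed as still pending is an assumption.

```python
from typing import Any, Dict


def run_job_counts(run: Dict[str, Any]) -> Dict[str, int]:
    """Derive job counts for a run, including jobs not yet successful or failed."""
    attrs = run["attributes"]
    total = attrs["num_jobs"]
    successful = attrs["num_successful_jobs"]
    failed = attrs["num_failed_jobs"]
    return {
        "total": total,
        "successful": successful,
        "failed": failed,
        # Assumption: anything not counted as successful or failed is still pending
        "remaining": total - successful - failed,
    }
```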

get_runs(status=None, priority=None)

This function returns the list of pipeline runs.

Parameters:

status (str, Optional): to filter runs based on status, e.g. "PARTIALLY_COMPLETED". Default: None.

priority (str, Optional): to filter runs based on priority, e.g. "low". Default: None.

org_id (str, Optional): to filter runs based on the org_id.

user_id (str, Optional): to filter runs based on the user_id.

page_size (int, Optional): number of runs to be fetched per request. Default: 10.

page_after (int, Optional): number of pages to be skipped. Default: 0.

Returns:

A list of JSON objects with the pipeline runs (see Examples).

Raises:

wrongParamException: invalid parameter passed.
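The examples pass a single status string to get_runs. To group runs by status, or keep several statuses at once, the returned list can be processed on the client. This sketch assumes the run shape shown in the Examples; count_runs_by_status and filter_runs are hypothetical helpers.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List


def count_runs_by_status(runs: List[Dict[str, Any]]) -> Counter:
    """Tally runs by their attributes.status value."""
    return Counter(run["attributes"]["status"] for run in runs)


def filter_runs(runs: List[Dict[str, Any]], statuses: Iterable[str]) -> List[Dict[str, Any]]:
    """Keep runs whose status is in `statuses` (client-side, after get_runs)."""
    wanted = set(statuses)
    return [run for run in runs if run["attributes"]["status"] in wanted]
```

For example, filter_runs(pipeline.get_runs(), {"ERRORED", "PARTIALLY_COMPLETED"}) would keep the runs that had at least one failure, as far as those two statuses indicate.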

submit_job(run_id, parameters, config, job_name=None)

This function is used to create jobs for a particular run.

Parameters:

run_id (str): run_id in which the job is to be created.

parameters (dict): a key-value object of all the required parameters of the pipeline.

config (dict): config definition for the pipeline job. It should have the format {"infra": {"cpu": int, "memory": int, "storage": int}}.

job_name (str, Optional): name of the job; auto-generated if not assigned.

Returns:

A JSON object with pipeline data (see Examples).

Raises:

wrongParamException: invalid parameter passed.
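The config argument must follow the {"infra": {"cpu": int, "memory": int, "storage": int}} format described above. A small builder keeps that shape consistent across calls. This is a sketch: make_job_config is a hypothetical helper, the units for memory and storage are not documented here, and rejecting non-positive values is an assumption.

```python
from typing import Dict


def make_job_config(cpu: int, memory: int, storage: int) -> Dict[str, Dict[str, int]]:
    """Build a submit_job config in the documented {"infra": {...}} shape."""
    resources = {"cpu": cpu, "memory": memory, "storage": storage}
    for name, value in resources.items():
        # bool is a subclass of int, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError(f"{name} must be an int; got {value!r}")
        # Assumption: resource values must be positive
        if value <= 0:
            raise ValueError(f"{name} must be positive; got {value}")
    return {"infra": resources}
```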

Examples

The Pipeline class of polly-python can be initialised using the code block below:

# Install polly-python (run this in a shell, not in Python)
# pip install polly-python

# Import libraries
import os

from polly.auth import Polly
from polly.pipelines import Pipeline

# Create a Pipeline object and authenticate with a Polly token
AUTH_TOKEN = os.environ["POLLY_REFRESH_TOKEN"]
pipeline = Pipeline(token=AUTH_TOKEN)

Get All Pipelines

pipeline.get_pipelines()
[
    {
        "id":"fb40282e-05d8-49c5-bd84-e5ad04cd8233",
        "type":"pipelines",
        "attributes":{
            "name":"toy",
            "display_name":"toy",
            "description":"Pipeline description",
            "executor":"nextflow",
            "deployment_stage":"dev",
            "config":{

            },
            "org_id":"1",
            "user_id":"1703140688",
            "user_name":"polly@elucidata.io",
            "created_at":1709473284864,
            "last_updated_at":1713784141145
        }
    }
]
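Since create_run takes a pipeline_id, it can help to look that ID up by name in the get_pipelines() output shown above. A minimal sketch assuming that response shape; find_pipeline_id is a hypothetical helper, and because pipeline names are not documented as unique it refuses to pick between duplicates.

```python
from typing import Any, Dict, List


def find_pipeline_id(pipelines: List[Dict[str, Any]], name: str) -> str:
    """Return the id of the pipeline whose attributes.name matches `name`."""
    matches = [p["id"] for p in pipelines if p["attributes"].get("name") == name]
    if not matches:
        raise KeyError(f"no pipeline named {name!r}")
    if len(matches) > 1:
        # Names are not documented as unique, so refuse to guess
        raise ValueError(f"{len(matches)} pipelines named {name!r}: {matches}")
    return matches[0]
```

For example, find_pipeline_id(pipeline.get_pipelines(), "toy") would return "fb40282e-05d8-49c5-bd84-e5ad04cd8233" for the response above.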

Get Pipeline by ID

pipeline.get_pipeline(pipeline_id="fb40282e-05d8-49c5-bd84-e5ad04cd8233")
{
    "id":"fb40282e-05d8-49c5-bd84-e5ad04cd8233",
    "type":"pipelines",
    "attributes":{
        "name":"toy",
        "display_name":"toy",
        "description":"Pipeline description",
        "executor":"nextflow",
        "deployment_stage":"dev",
        "config":{

        },
        "org_id":"1",
        "user_id":"1703140688",
        "user_name":"polly@elucidata.io",
        "created_at":1709473284864,
        "last_updated_at":1713784141145
    }
}

Create a pipeline run

Users can create runs for pipelines they have access to, as shown below:

run = pipeline.create_run(
    pipeline_id="<PIPELINE_ID>",
    run_name="TEST_RUN",
    priority="medium",
    tags={},
    domain_context={},
)
{
    "id":"7c273042-a8dd-49dc-9208-a24ea4cc3296",
    "type":"runs",
    "attributes":{
        "name":"TEST_RUN",
        "created_at":17137412412778,
        "pipeline_id":"fb402852-05d8-49c5-bd84-e5ad04cd8233",
        "priority":"medium",
        "domain_context":{

        },
        "tags":{
            "org_id":""
        },
        "org_id":"1",
        "user_id":"1658226496",
        "user_name":"john.doe@elucidata.io",
        "num_jobs":0,
        "num_successful_jobs":0,
        "num_failed_jobs":0,
        "status":"PENDING",
        "last_updated_at":1713794894778
    }
}
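The run returned by create_run exposes its ID as the top-level "id" field, which is what submit_job expects as run_id. The sketch below chains the two calls: one run, then one job per parameter dict. It is an illustration under assumptions: create_run_with_jobs is a hypothetical helper, it accepts any object with create_run and submit_job methods (such as the pipeline object created earlier), and it leaves job_name unset so names are auto-generated as documented.

```python
from typing import Any, Dict, List, Optional, Tuple


def create_run_with_jobs(
    client: Any,
    pipeline_id: str,
    job_parameters: List[Dict[str, Any]],
    config: Dict[str, Any],
    run_name: Optional[str] = None,
    priority: str = "low",
) -> Tuple[str, List[Dict[str, Any]]]:
    """Create one run, then submit one job per parameter dict to it."""
    run = client.create_run(pipeline_id=pipeline_id, run_name=run_name, priority=priority)
    # The run id is the top-level "id" field of the returned run object
    run_id = run["id"]
    jobs = [
        client.submit_job(run_id=run_id, parameters=params, config=config)
        for params in job_parameters
    ]
    return run_id, jobs
```

Passing the client in, rather than constructing it inside, keeps the helper testable with a stub and leaves authentication to the caller.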

Submit pipeline job

Users can submit jobs to a pipeline run as shown below:

job = pipeline.submit_job(
    run_id="<RUN_ID>",
    parameters={
        "param_a": "value_a",
        "param_b": "value_b",
    },
    config={
        "infra": {
            "cpu": 1,
            "memory": 2,
            "storage": 120,
        }
    },
)

Get runs for the current user, and filter by status or priority

# Get runs for the current user
runs = pipeline.get_runs()

# Filter runs by status
filtered_runs_by_status = pipeline.get_runs(
    status="PARTIALLY_COMPLETED"
)

# Filter runs by priority
filtered_runs_by_priority = pipeline.get_runs(
    priority="low"
)
[
    {
        "id":"1659632141379__1686063841__1641216496",
        "type":"runs",
        "attributes":{
            "name":"Multiple OA",
            "created_at":1686063857177,
            "pipeline_id":"None",
            "priority":"high",
            "domain_context":{
                "source":"migration",
                "repo_id":"16596812811111",
                "repo_version":168412063841
            },
            "tags":{

            },
            "org_id":"1",
            "user_id":"1658226496",
            "user_name":"John Doe",
            "num_jobs":5,
            "num_successful_jobs":3,
            "num_failed_jobs":2,
            "status":"PARTIALLY_COMPLETED",
            "last_updated_at":1693139935542
        }
    }
]
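The created_at and last_updated_at values in these responses appear to be Unix epoch timestamps in milliseconds; this is an assumption based on their magnitude (1686063857177 falls on 6 June 2023). A sketch for turning them into timezone-aware datetimes, with ms_to_datetime and run_times as hypothetical helpers:

```python
from datetime import datetime, timezone
from typing import Any, Dict


def ms_to_datetime(ms: int) -> datetime:
    """Convert an epoch timestamp in milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def run_times(run: Dict[str, Any]) -> Dict[str, datetime]:
    """Return the created/last-updated times of a run object as datetimes."""
    attrs = run["attributes"]
    return {
        "created_at": ms_to_datetime(attrs["created_at"]),
        "last_updated_at": ms_to_datetime(attrs["last_updated_at"]),
    }
```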

Get a run by ID

pipeline.get_run(run_id="<RUN_ID>")
{
    "id":"8b3db5bd-12cf-4929-bdda-d1900553f5ad",
    "type":"runs",
    "attributes":{
        "name":"76afcb3d-f9d4-480c-91ec-9d23a2064e47",
        "created_at":1694256985387,
        "pipeline_id":"ce03e312-e9cf-46f0-985a-e14f93066cd3",
        "priority":"low",
        "domain_context":{

        },
        "tags":{

        },
        "org_id":"1",
        "user_id":"1658226496",
        "user_name":"aditya.asthana@elucidata.io",
        "num_jobs":1,
        "num_successful_jobs":0,
        "num_failed_jobs":1,
        "status":"ERRORED",
        "last_updated_at":1694257190419
    }
}
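A newly created run starts as PENDING, and the examples also show PARTIALLY_COMPLETED and ERRORED. A caller that needs to wait can poll get_run until the status leaves a set of not-yet-done statuses. This is a sketch under assumptions: wait_for_run is a hypothetical helper, the full list of run statuses is not documented here, so pending_statuses defaults to the only pre-completion status seen in these examples and should be extended if your runs report others; interval and timeout are in seconds.

```python
import time
from typing import Any, Callable, Dict, Iterable


def wait_for_run(
    get_run: Callable[..., Dict[str, Any]],
    run_id: str,
    pending_statuses: Iterable[str] = ("PENDING",),
    interval: float = 30.0,
    timeout: float = 3600.0,
) -> Dict[str, Any]:
    """Poll get_run until the run leaves the pending statuses or the timeout expires."""
    pending = set(pending_statuses)
    deadline = time.monotonic() + timeout
    while True:
        run = get_run(run_id=run_id)
        status = run["attributes"]["status"]
        if status not in pending:
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} still {status} after {timeout} s")
        time.sleep(interval)
```

With the pipeline object from the Examples, this could be called as wait_for_run(pipeline.get_run, "<RUN_ID>").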

Get jobs inside a run

pipeline.get_jobs(run_id="<RUN_ID>")
[
    {
        "id":"56cb3a52-c280-4433-b73f-13875dbb2480",
        "type":"jobs",
        "attributes":{
            "name":"job_name",
            "parameters":{
                "x":12,
                "y":13
            },
            "config":{

            },
            "run_id":"8b3db5bd-12cf-4929-bdda-d1900553f5ad",
            "stage":"Processing",
            "progress":"1/2",
            "errored":true,
            "finished":false,
            "created_at":1694256987866,
            "last_updated_at":1694257190419
        }
    }
]
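Each job reports its progress as a string such as "1/2". Reading it as "completed/total" is an assumption, since the format is only shown by example here. Under that assumption, a sketch with hypothetical helpers parse_progress and progress_fraction:

```python
from typing import Tuple


def parse_progress(progress: str) -> Tuple[int, int]:
    """Split a 'done/total' progress string such as '1/2' into two ints."""
    done_text, sep, total_text = progress.partition("/")
    if not sep:
        raise ValueError(f"expected 'done/total', got {progress!r}")
    done, total = int(done_text), int(total_text)
    if total <= 0 or not 0 <= done <= total:
        raise ValueError(f"inconsistent progress {progress!r}")
    return done, total


def progress_fraction(progress: str) -> float:
    """Return progress as a fraction between 0 and 1."""
    done, total = parse_progress(progress)
    return done / total
```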

Get a job by ID

pipeline.get_job(job_id='<JOB_ID>')
{
    "id":"561b1a52-c280-4433-b73f-13875dbb2480",
    "type":"jobs",
    "attributes":{
        "name":"job#0",
        "parameters":{
            "x":12,
            "y":13
        },
        "config":{

        },
        "run_id":"41245bd-12cf-4929-bdda-d1900553f5ad",
        "stage":"Processing",
        "progress":"1/2",
        "errored":true,
        "finished":false,
        "created_at":1694256987866,
        "last_updated_at":1694257190419
    }
}