Pipelines
The Pipeline class lets users work with the Pipelines infrastructure, for example to create, read, or delete pipelines. It can also be used to create pipeline batches and runs.
Parameters:
- token (str, default: None) – token copied from Polly.
Usage
from polly.pipelines import Pipeline
pipeline = Pipeline(token)
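The token does not have to be hard-coded. As a minimal sketch, it could be read from an environment variable before the Pipeline object is created. The variable name POLLY_REFRESH_TOKEN follows the Examples section; `load_token` is a hypothetical helper and not part of polly-python.

```python
import os


def load_token(env=None, var_name="POLLY_REFRESH_TOKEN"):
    """Return the Polly token stored in an environment variable.

    Illustrative helper only; it is not part of polly-python.
    """
    env = os.environ if env is None else env
    token = env.get(var_name, "").strip()
    if not token:
        raise RuntimeError(f"Set {var_name} before creating a Pipeline object")
    return token


# Usage (requires polly-python and a valid token):
#   from polly.pipelines import Pipeline
#   pipeline = Pipeline(token=load_token())
```

Failing early with a clear message is usually easier to debug than an authentication error raised later by the service.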
cancel_run
This function cancels a run.
Parameters:
- run_id (str) – the run_id of the run to be cancelled
Returns:
- object – It will return a JSON object with pipeline run data. (See Examples)
Raises:
- wrongParamException – invalid parameter passed
create_batch
This function creates a pipeline batch.
A batch is a collection of runs. This function creates an empty batch to which runs can be added.
Parameters:
- pipeline_id (str) – pipeline_id for which the batch is to be created
- name (str, default: None) – name of the batch
- description (str, default: None) – description of the batch
- priority (str, default: 'low') – priority of the batch; one of low, medium, or high
- tags (dict, default: {}) – a dict of key-value pairs mapping tag_name to tag_value
- domain_context (dict, default: {}) – domain context for a batch
Returns:
- object – It will return a JSON object which is the pipeline batch. (See Examples)
Raises:
- wrongParamException – invalid parameter passed
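The parameter list above can be checked on the client side before the call is made. The sketch below is one possible approach: `build_batch_kwargs` and `ALLOWED_PRIORITIES` are hypothetical names introduced here, and the allowed priority values are the three listed above.

```python
ALLOWED_PRIORITIES = {"low", "medium", "high"}


def build_batch_kwargs(pipeline_id, name=None, description=None,
                       priority="low", tags=None, domain_context=None):
    """Validate inputs and return keyword arguments for create_batch.

    Illustrative helper only; it is not part of polly-python.
    """
    if not isinstance(pipeline_id, str) or not pipeline_id:
        raise ValueError("pipeline_id must be a non-empty string")
    if priority not in ALLOWED_PRIORITIES:
        raise ValueError(f"priority must be one of {sorted(ALLOWED_PRIORITIES)}")
    return {
        "pipeline_id": pipeline_id,
        "name": name,
        "description": description,
        "priority": priority,
        # Copy the dicts so the caller's objects are never shared or mutated.
        "tags": dict(tags or {}),
        "domain_context": dict(domain_context or {}),
    }


# Usage (requires an authenticated Pipeline object):
#   batch = pipeline.create_batch(**build_batch_kwargs("<pipeline_id>", name="demo"))
```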
get_batch
This function returns the pipeline batch data.
Parameters:
- batch_id (str) – the batch_id for which the data is required
Returns:
- list – It will return a list of JSON objects with pipeline batch data. (See Examples)
Raises:
- wrongParamException – invalid parameter passed
get_pipeline
This function returns the pipeline data of the provided pipeline_id.
Parameters:
- pipeline_id (str) – pipeline_id for required pipeline
Returns:
- object – It will return a JSON object with pipeline data. (See Examples)
Raises:
- wrongParamException – invalid parameter passed
get_run
This function returns the run data for the provided run_id.
Parameters:
- run_id (str) – the run_id for which the data is required
Returns:
- object – It will return a JSON object with pipeline run data. (See Examples)
Raises:
- wrongParamException – invalid parameter passed
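A common use of get_run is to poll until a run has finished. The sketch below is a generic polling loop and makes no claim about how polly-python itself tracks state: `wait_until` is a hypothetical helper, and the field checked by the `is_done` predicate is an assumption that should be adapted to the actual response (see Examples).

```python
import time


def wait_until(fetch, is_done, interval=5.0, timeout=600.0, sleep=time.sleep):
    """Call fetch() repeatedly until is_done(result) is true.

    Raises TimeoutError once roughly `timeout` seconds have been spent sleeping.
    Illustrative helper only; it is not part of polly-python.
    """
    waited = 0.0
    while True:
        result = fetch()
        if is_done(result):
            return result
        if waited >= timeout:
            raise TimeoutError("run did not reach a finished state in time")
        sleep(interval)
        waited += interval


# Usage (assumes the response carries a "finished" flag under "attributes"):
#   run = wait_until(
#       fetch=lambda: pipeline.get_run(run_id),
#       is_done=lambda r: r["attributes"].get("finished", False),
#   )
```

Passing `sleep` as an argument keeps the loop testable without real delays.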
list_batches
This function returns the list of pipeline batches.
Parameters:
- status (str, optional, default: None) – to filter the batches based on status
- priority (str, optional, default: None) – to filter the batches based on priority
- search_term (str, optional, default: None) – to filter the batches based on a search term
Returns:
- list – It will return a list of JSON objects with pipeline batches. (See Examples)
Raises:
- wrongParamException – invalid parameter passed
list_pipelines
This function returns all the pipelines that the user has access to. Please use this function with the default values for its parameters.
Returns:
- list – It will return a list of JSON objects. (See Examples)
list_runs
This function returns the list of runs executed for a batch.
Parameters:
- batch_id (str) – the batch_id for which the runs are required
Returns:
- list – It will return a list of JSON objects with pipeline runs. (See Examples)
Raises:
- wrongParamException – invalid parameter passed
submit_run
This function creates a run in a particular batch.
Parameters:
- batch_id (str) – batch_id in which the run is to be created
- parameters (dict) – a key-value object of all the required parameters of the pipeline
- config (dict, default: {}) – config definition for the pipeline run; should be of the format {"infra": {"cpu": int, "memory": int, "storage": int}}
- run_name (str, optional, default: None) – name of the run, auto-generated if not assigned
Returns:
- object – It will return a JSON object with pipeline data. (See Examples)
Raises:
- wrongParamException – invalid parameter passed
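The config format described above can be assembled by a small helper that rejects non-integer values before the run is submitted. This is a sketch only: `build_run_config` is a hypothetical name, and the requirement that values be positive is an assumption, not something the reference states.

```python
def build_run_config(cpu, memory, storage):
    """Return a config dict in the {"infra": {...}} format used by submit_run.

    Illustrative helper only; it is not part of polly-python.
    """
    values = {"cpu": cpu, "memory": memory, "storage": storage}
    for key, value in values.items():
        # bool is a subclass of int, so it is rejected explicitly.
        if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
            raise ValueError(f"{key} must be a positive integer, got {value!r}")
    return {"infra": values}


config = build_run_config(cpu=1, memory=2, storage=120)
print(config)

# Usage (requires an authenticated Pipeline object and an existing batch):
#   run = pipeline.submit_run(batch_id="<batch_id>", parameters={...}, config=config)
```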
Examples
The Pipeline class of polly-python can be initialised using the code block below:
# Install polly-python (shell command)
pip install polly-python
# Import libraries
import os
from polly.auth import Polly
from polly.pipelines import Pipeline
# Create a Pipeline object and authenticate
AUTH_TOKEN = os.environ['POLLY_REFRESH_TOKEN']
pipeline = Pipeline(token=AUTH_TOKEN)
Get All Pipelines
[
{
"id":"fb40282e-05d8-49c5-bd84-e5ad04cd8233",
"type":"pipelines",
"attributes":{
"name":"toy",
"display_name":"toy",
"description":"Pipeline description",
"executor":"nextflow",
"deployment_stage":"dev",
"config":{
},
"org_id":"1",
"user_id":"1703140688",
"user_name":"polly@elucidata.io",
"created_at":1709473284864,
"last_updated_at":1713784141145
}
}
]
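The list response above can be turned into a lookup table on the client side. The sketch below uses a trimmed copy of the sample response; `summarize_pipelines` is a hypothetical helper, and treating `created_at` as milliseconds since the Unix epoch is an assumption based on the magnitude of the sample values.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def summarize_pipelines(pipelines):
    """Map each pipeline name to its id, executor, and creation time."""
    summary = {}
    for item in pipelines:
        attrs = item["attributes"]
        summary[attrs["name"]] = {
            "id": item["id"],
            "executor": attrs["executor"],
            # Assumption: created_at is milliseconds since the Unix epoch.
            "created_at": EPOCH + timedelta(milliseconds=attrs["created_at"]),
        }
    return summary


# Trimmed copy of the sample response shown above.
sample_pipelines = [
    {
        "id": "fb40282e-05d8-49c5-bd84-e5ad04cd8233",
        "type": "pipelines",
        "attributes": {
            "name": "toy",
            "executor": "nextflow",
            "created_at": 1709473284864,
        },
    }
]

summary = summarize_pipelines(sample_pipelines)
print(summary["toy"]["id"], summary["toy"]["created_at"].date())
```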
Get Pipeline by ID
{
"id":"fb40282e-05d8-49c5-bd84-e5ad04cd8233",
"type":"pipelines",
"attributes":{
"name":"toy",
"display_name":"toy",
"description":"Pipeline description",
"executor":"nextflow",
"deployment_stage":"dev",
"config":{
},
"org_id":"1",
"user_id":"1703140688",
"user_name":"polly@elucidata.io",
"created_at":1709473284864,
"last_updated_at":1713784141145
}
}
Create a pipeline run
Users can create runs for pipelines they have access to, as shown below:
run = pipeline.create_run(
    pipeline_id=demo_pipeline_id,
    run_name="TEST_RUN",
    priority="medium",
    tags={},
    domain_context={},
)
{
"id":"7c273042-a8dd-49dc-9208-a24ea4cc3296",
"type":"runs",
"attributes":{
"name":"TEST_RUN",
"created_at":17137412412778,
"pipeline_id":"fb402852-05d8-49c5-bd84-e5ad04cd8233",
"priority":"high",
"domain_context":{
},
"tags":{
"org_id":""
},
"org_id":"1",
"user_id":"1658226496",
"user_name":"john.doe@elucidata.io",
"num_jobs":0,
"num_successful_jobs":0,
"num_failed_jobs":0,
"status":"PENDING",
"last_updated_at":1713794894778
}
}
Submit pipeline job
Users can submit jobs for a pipeline, as shown below:
job = pipeline.submit_job(
    run_id="run_id",
    parameters={
        "param_a": "value_a",
        "param_b": "value_b"
    },
    config={
        "infra": {
            "cpu": 1,
            "memory": 2,
            "storage": 120
        }
    }
)
Get runs for the current user, and filter by status or priority
# Get runs for the current user
runs = pipeline.get_runs()
# Get runs filtered by status
filtered_runs_by_status = pipeline.get_runs(
    status="PARTIALLY_COMPLETED"
)
# Get runs filtered by priority
filtered_runs_by_priority = pipeline.get_runs(
    priority="low"
)
[
{
"id":"1659632141379__1686063841__1641216496",
"type":"runs",
"attributes":{
"name":"Multiple OA",
"created_at":1686063857177,
"pipeline_id":"None",
"priority":"high",
"domain_context":{
"source":"migration",
"repo_id":"16596812811111",
"repo_version":168412063841
},
"tags":{
},
"org_id":"1",
"user_id":"1658226496",
"user_name":"John Doe",
"num_jobs":5,
"num_successful_jobs":3,
"num_failed_jobs":2,
"status":"PARTIALLY_COMPLETED",
"last_updated_at":1693139935542
}
}
]
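The job counters in the response above make it straightforward to compute a success rate per entry. The sketch below works on a trimmed copy of the sample response; `success_rate` and `runs_with_failures` are hypothetical helpers and not part of polly-python.

```python
def success_rate(run):
    """Return the fraction of successful jobs, or None if there are no jobs."""
    attrs = run["attributes"]
    total = attrs["num_jobs"]
    if total == 0:
        return None
    return attrs["num_successful_jobs"] / total


def runs_with_failures(runs):
    """Return (name, failed_count) pairs for entries with at least one failed job."""
    return [
        (run["attributes"]["name"], run["attributes"]["num_failed_jobs"])
        for run in runs
        if run["attributes"]["num_failed_jobs"] > 0
    ]


# Trimmed copy of the sample response shown above.
sample_runs = [
    {
        "id": "1659632141379__1686063841__1641216496",
        "type": "runs",
        "attributes": {
            "name": "Multiple OA",
            "num_jobs": 5,
            "num_successful_jobs": 3,
            "num_failed_jobs": 2,
            "status": "PARTIALLY_COMPLETED",
        },
    }
]

print(success_rate(sample_runs[0]))
print(runs_with_failures(sample_runs))
```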
Get a run by ID
{
"id":"8b3db5bd-12cf-4929-bdda-d1900553f5ad",
"type":"runs",
"attributes":{
"name":"76afcb3d-f9d4-480c-91ec-9d23a2064e47",
"created_at":1694256985387,
"pipeline_id":"ce03e312-e9cf-46f0-985a-e14f93066cd3",
"priority":"low",
"domain_context":{
},
"tags":{
},
"org_id":"1",
"user_id":"1658226496",
"user_name":"aditya.asthana@elucidata.io",
"num_jobs":1,
"num_successful_jobs":0,
"num_failed_jobs":1,
"status":"ERRORED",
"last_updated_at":1694257190419
}
}
Get jobs inside a run
[
{
"id":"56cb3a52-c280-4433-b73f-13875dbb2480",
"type":"jobs",
"attributes":{
"name":"job_name",
"parameters":{
"x":12,
"y":13
},
"config":{
},
"run_id":"8b3db5bd-12cf-4929-bdda-d1900553f5ad",
"stage":"Processing",
"progress":"1/2",
"errored":true,
"finished":false,
"created_at":1694256987866,
"last_updated_at":1694257190419
}
}
]
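The `errored`, `finished`, and `progress` fields in the response above can be inspected with a few lines of client-side code. The sketch below uses a trimmed copy of the sample response. `parse_progress` and `errored_job_ids` are hypothetical helpers, and reading `progress` as "done/total" is an assumption based on the sample value "1/2".

```python
def parse_progress(progress):
    """Split a progress string such as "1/2" into (done, total) integers."""
    done, total = progress.split("/")
    return int(done), int(total)


def errored_job_ids(jobs):
    """Return the ids of entries whose "errored" flag is true."""
    return [job["id"] for job in jobs if job["attributes"]["errored"]]


# Trimmed copy of the sample response shown above.
sample_jobs = [
    {
        "id": "56cb3a52-c280-4433-b73f-13875dbb2480",
        "type": "jobs",
        "attributes": {
            "name": "job_name",
            "stage": "Processing",
            "progress": "1/2",
            "errored": True,
            "finished": False,
        },
    }
]

print(errored_job_ids(sample_jobs))
print(parse_progress(sample_jobs[0]["attributes"]["progress"]))
```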
Get a job by ID
{
"id":"561b1a52-c280-4433-b73f-13875dbb2480",
"type":"jobs",
"attributes":{
"name":"job#0",
"parameters":{
"x":12,
"y":13
},
"config":{
},
"run_id":"41245bd-12cf-4929-bdda-d1900553f5ad",
"stage":"Processing",
"progress":"1/2",
"errored":true,
"finished":false,
"created_at":1694256987866,
"last_updated_at":1694257190419
}
}