Skip to content

Running Job

When you run the Conduit, all steps will be iterated and underlying functions will be executed. If a function raises and exception, Conduit will catch it and store the exception in the return_value of ConduitStep object.

To run the job you can execute Conduit.run(), if you want to execute jobs one by one then you can use await keyword, however this will suspend your app until it completes executing all steps.

Before Conduit executes the underlying function, it performs several checks to verify if step has configured correctly (If it has a duplicate ID, if the block doesn't exists and so on.) and changes the status of the step from one of ConduitStatus enum.

ConduitStatus

Name Value Is used for raised exceptions in the block? Description
NONE -1 The step has not executed yet. This will be set as first value to every created step.
DONE 0 Step has executed without any errors.
SKIPPED 100 This step has skipped because of previous failed steps.
UNHANDLED_EXCEPTION 101 All general unexpected errors in the block.
BLOCK_NOT_FOUND 102 The action / block doesn't exists.
INVALID_ARGUMENT 103 Returned if block raised an ValueError.
DUPLICATE_STEP_IDS 104 Duplicate step IDs has detected.
FORBIDDEN_BLOCK 105 This block is forbidden to use because of missing tags in the job.
ASSERTION_ERROR 106 Returned if block raised an AssertionError.
INVALID_TYPE 107 Returned if user passed a type of value that doesn't match with the parameter type. If pydantic has not installed, then this will not be used as there will be no type validation without pydantic.
STEP_LIMIT_EXCEED 108 Step limit has been exceed.
BLOCK_LIMIT_EXCEED 109 Block limit has been exceed.
IF_CONDITION_FAILED 110 if_condition of the step returned False.

Events

If you want Conduit to execute the specified function when a step updates or/and job finishes, you can use on_step_update and on_job_finish.

job = Conduit()

job.on_step_update = lambda j, step: print(step.id, "has finished!")

You can also provide a async function.

job = Conduit()

async def done(j, step):
    await do_something(...)

job.on_step_update = done