Working with Jobs

A Job object is a mutable handle for a single quantum program submission. It tracks its own lifecycle — from creation through execution to the final result — and records every execution attempt in its history, so you can inspect, re-run, or reconfigure without losing previous results.

For submitting multiple programs at once, see Composite Jobs.

Tip

Coming from oqc-qcaas-client? See Migrating from oqc-qcaas-client for a side-by-side comparison: async non-blocking execution, result attached to the job object, built-in history, reconfiguration, and persistence.

QPU Selection

Every job requires a QPU (Quantum Processing Unit) ID. The most direct approach is to supply it explicitly on each create_job() call, as shown in Quick Start. The SDK also supports two alternative patterns.

Set a default at initialisation — pass default_qpu_id once and omit it from subsequent calls:

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(
...         url=os.environ["OQC_URL"],
...         authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"],
...         default_qpu_id='qpu:uk:2:d865b5a184'
...     ) as client:
...         program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job = client.create_job(program=program)  # qpu_id not required
...         outputs = await job.execute(timeout_s=10)
...         return job.completed
>>> asyncio.run(run())
True

Discover then set — query available QPUs at runtime and set a default before creating jobs:

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpus = await client.list_qpus()
...         simulator = next(
...             (q for q in qpus if q.get("name") == "Simulator" and q.get("status") == "ACTIVE"),
...             None
...         )
...         client.set_default_qpu(simulator["id"] if simulator else 'qpu:uk:2:d865b5a184')
...         program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job = client.create_job(program=program)
...         outputs = await job.execute(timeout_s=10)
...         return job.completed
>>> asyncio.run(run())
True

Accessing Results

After execute() completes, the most natural way to read the result is directly from the job via job.result:

await job.execute(timeout_s=30)

print(job.result.data)              # {'c': {'0': 512, '1': 512}}
print(job.result.program)           # QASM string that ran
print(job.result.qpu_id)            # QPU it ran on
print(job.result.received)          # datetime of completion

job.result returns None before the first run completes; always check job.completed or job.error first if the state is uncertain.
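When the state is uncertain, a small guard keeps the read safe. A minimal sketch — latest_counts is a hypothetical helper, not part of the SDK — relying only on the documented job.completed and job.result attributes:

```python
from typing import Any, Optional


def latest_counts(job: Any) -> Optional[dict]:
    """Return the latest run's measurement data, or None if nothing completed.

    Hypothetical helper: uses only the documented job.completed and
    job.result attributes, so it is safe to call in any job state.
    """
    if not job.completed or job.result is None:
        return None
    return job.result.data
```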

execute() also returns a JobOutputProxy — a collection wrapper that unlocks filtering, mapping, and the CompositeJob API. Use outputs.first when you need that collection interface for a single job:

outputs = await job.execute(timeout_s=30)

result = outputs.first              # same JobResult as job.result
counts = result.data                # {'c': {'0': 512, '1': 512}}

Use .one instead of .first when you want to assert there is exactly one item — it raises ValueError if the count is not 1.
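The .one contract can be sketched in plain Python (illustrative only, not the SDK's implementation):

```python
from typing import Sequence, TypeVar

T = TypeVar("T")


def one(items: Sequence[T]) -> T:
    """Return the sole item; raise ValueError unless there is exactly one."""
    if len(items) != 1:
        raise ValueError(f"expected exactly 1 item, got {len(items)}")
    return items[0]
```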

Note

job.result vs job.outputs — what’s the difference?

job.result is the raw result object for the latest run — a single JobResult (or None if the job hasn’t finished). It is the quickest way to read measurement counts from a single Job.

job.outputs wraps that same result in a JobOutputProxy: a collection with a query interface (all(), filter(), map(), results(), errors()). They refer to the same underlying data — job.outputs.first and job.result point to the same object — the difference is purely the wrapper. job.outputs becomes important when you need to call reexecute() or resubmit() on the output, or when you want your code to work uniformly with CompositeJob (which has no .result shortcut because N results exist). outputs.all() is the universal accessor that works identically for both types.

See Accessing Results for the full cross-type pattern.
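Because .all() behaves identically on both wrapper types, code written against it ports between Job and CompositeJob unchanged. A sketch — count_errors is a hypothetical helper — using only .all() and the documented .is_error flag:

```python
from typing import Any


def count_errors(outputs: Any) -> int:
    """Count failed runs in any JobOutputProxy-like collection.

    Hypothetical helper: works for a single Job's outputs and a
    CompositeJob's alike, because both expose .all().
    """
    return sum(1 for item in outputs.all() if item.is_error)
```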

Fields on JobResult:

  • .data (dict[str, Any]) — Measurement data, keyed by classical register name. The inner value depends on the compiler config: the default BinaryCount format produces dict[str, int] (bitstring → count, e.g. {'c': {'0': 512, '1': 512}}); other formats such as raw may return list[list] or other nested structures.

  • .program (str | None) — QASM program that produced this result.

  • .qpu_id (str | None) — QPU the program ran on.

  • .task_id (UUID | None) — Task identifier.

  • .received (datetime | None) — Timestamp of completion.

  • .ok (bool) — Always True for a result (False on a JobError).

  • .is_error (bool) — Always False for a result (True on a JobError).
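For the default BinaryCount format, the total number of shots can be recovered by summing the counts. A minimal sketch (total_shots is a hypothetical helper, not part of the SDK):

```python
def total_shots(data: dict) -> int:
    """Sum shot counts across all registers in BinaryCount-format data.

    Hypothetical helper; expects the documented shape
    {register_name: {bitstring: count}}.
    """
    return sum(
        count
        for register_counts in data.values()
        for count in register_counts.values()
    )
```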

For accessing results from CompositeJob — including iterating all child results and writing code that handles both types uniformly — see Accessing Results in Composite Jobs.

Execution History

After execute() returns, the full output of that run is accessible via job.history. History is cumulative: each call to execute() appends a new entry, so you can compare results across runs.

>>> from oqc_qcaas_sdk import OqcSdk
>>> from oqc_qcaas_sdk.job_output import JobResult
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job = client.create_job(program=program, qpu_id=qpu_id)
...         result = await job.execute(timeout_s=10)
...         history = job.history.all()
...         return len(history), isinstance(history[0], JobResult)
>>> asyncio.run(run())
(1, True)

history is a JobOutputProxy, so you can filter, map, and access items from it — see the JobOutputProxy section in Composite Jobs for the full API.
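Comparing runs often needs only .all() plus the JobResult attributes documented above. A sketch — successful_programs is a hypothetical helper; the same filtering could presumably be expressed with history.filter()/map(), but this version sticks to the documented .all() accessor and the .ok / .program attributes:

```python
from typing import Any, List


def successful_programs(job: Any) -> List[str]:
    """List the QASM program of every successful run in a job's history.

    Hypothetical helper: iterates history.all() and keeps entries
    whose .ok flag is True.
    """
    return [entry.program for entry in job.history.all() if entry.ok]
```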

Re-running a Job

Job state is tracked through a lifecycle:

  • Active states: CREATED, SUBMITTED, RUNNING — the job is in-flight

  • Terminal states: COMPLETED, FAILED, CANCELLED — the job has ended

Calling execute() a second time behaves differently depending on the current state:

  • Terminal job — execute() automatically resets the job and submits a fresh run with a new task ID. The previous result remains in history.

  • Active job — execute() resumes polling without re-submitting. This means calling execute() again on an in-flight job is safe: it will not produce a duplicate submission. This is also useful if a previous execute() call timed out and you want to continue waiting.

If the job is currently active but you want a fresh submission rather than resuming the in-flight run, call cancel_if_active() first. This transitions the job to CANCELLED (a terminal state), so the subsequent execute() resets and resubmits:

await job.cancel_if_active()  # ensures the job is in a terminal state
await job.execute()           # always does a fresh submission

A full example of the reset-and-resubmit behaviour for a terminal job:

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job = client.create_job(program=program, qpu_id=qpu_id)
...         # First run
...         await job.execute(timeout_s=10)
...         first_task = job.task_id
...         # Second run: job is terminal, so execute() resets and resubmits
...         await job.execute(timeout_s=10)
...         second_task = job.task_id
...         return first_task != second_task, len(job.history.all()), job.completed
>>> asyncio.run(run())
(True, 2, True)

Reconfiguration and Re-execution

Each JobOutput stored in job.history (or returned by execute()) carries the settings that produced it — program, QPU ID, and compiler configuration. You can use any of these outputs as a template to reconfigure the job and re-run from a known baseline.

The simplest approach is reexecute() on a JobOutput, which restores its settings onto the parent job and immediately submits a new run:

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job = client.create_job(program=program, qpu_id=qpu_id)
...         first_outputs = await job.execute(timeout_s=10)
...         # Resubmit with the exact same settings as the first run
...         second_outputs = await first_outputs.first.reexecute()
...         return job.completed, len(job.history.all())
>>> asyncio.run(run())
(True, 2)

For more control, call reconfigure() on the job directly. Pass a memento to restore settings from a specific historical output; keyword overrides are applied on top, so you can change individual fields while keeping everything else the same:

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         bad_program = 'OPENQASM 5.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job = client.create_job(program=bad_program, qpu_id=qpu_id)
...         first_outputs = await job.execute(timeout_s=10)
...         # Restore the first run's settings, but swap in a corrected program
...         good_program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job.reconfigure(memento=first_outputs.first, program=good_program)
...         await job.execute(timeout_s=10)
...         history = job.history.all()
...         return len(history), sum(1 for h in history if h.is_error), sum(1 for h in history if h.ok)
>>> asyncio.run(run())
(2, 1, 1)

Omitting memento and passing only keyword arguments is a shorthand when you just want to change a field without referencing a specific historical run:

job.reconfigure(program=new_program)
await job.execute()

Exception-based Error Handling

By default, execute() stores errors on the job without raising exceptions, letting you decide whether an error is fatal. If you prefer exception-based control flow, call raise_for_status() after execution:

>>> from oqc_qcaas_sdk import OqcSdk
>>> from oqc_qcaas_sdk.exceptions import JobFailed
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         bad_program = 'OPENQASM 5.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job = client.create_job(program=bad_program, qpu_id=qpu_id)
...         await job.execute(timeout_s=10)
...         try:
...             job.raise_for_status()
...             return "Success"
...         except JobFailed:
...             return "Failed"
>>> asyncio.run(run())
'Failed'

You can also inspect which program was executed when a failure occurred by examining job.history:

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         bad_program = 'OPENQASM 5.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job = client.create_job(program=bad_program, qpu_id=qpu_id)
...         await job.execute(timeout_s=15)
...         good_program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job.reconfigure(program=good_program)
...         await job.execute(timeout_s=15)
...         history = job.history.all()
...         failed_runs = [h for h in history if h.is_error]
...         successful_runs = [h for h in history if h.ok]
...         if failed_runs:
...             failed_program = failed_runs[0].program
...             return len(history), len(failed_runs), len(successful_runs), 'OPENQASM 5.0' in failed_program
...         return None
>>> asyncio.run(run())
(2, 1, 1, True)

Cancellation

Call cancel() to send a cancellation request for the active run:

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         job = client.create_job(program=program, qpu_id=qpu_id)
...         await job.submit()
...         await job.cancel()
...         return job.state
>>> asyncio.run(run())
<JobState.CANCELLED: 'CANCELLED'>

Note

Cancellation is timing-dependent: if the job completes before the cancel request is processed, cancel() may have no effect.

Use cancel_if_active() when you want to cancel only if the job has not already finished. It is a no-op for terminal jobs, so it is safe to call at any point without first checking state:

# Safe to call regardless of current job state
await job.cancel_if_active()

Execution Diagnostics

By default, a Job stores only the measurement result (or error). Pass fetch_diagnostics=True to create_job() to additionally populate timing, metrics, and metadata on the job after each run reaches a terminal state.

job = sdk.create_job(
    program=qasm,
    qpu_id="qpu:uk:2:d865b5a184",
    fetch_diagnostics=True,
)
await job.execute()

print(job.timings)    # dict | None — wall-clock and queue timings
print(job.metrics)    # dict | None — gate counts, circuit depth, etc.
print(job.metadata)   # dict | None — compiler and hardware metadata

The three attributes are None until the job has completed at least one run. Each attribute is fetched independently. If one call fails (e.g. the data is not yet available), that attribute remains None while the others may still be populated — the rest of the job and its result are unaffected.
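Since each attribute may independently be None, it can be convenient to gather only what actually arrived. A sketch — diagnostics_summary is a hypothetical helper, not part of the SDK:

```python
from typing import Any, Dict


def diagnostics_summary(job: Any) -> Dict[str, dict]:
    """Collect whichever diagnostics were populated, skipping None values.

    Hypothetical helper: .timings, .metrics and .metadata are fetched
    independently, so any subset of them may be missing.
    """
    names = ("timings", "metrics", "metadata")
    return {
        name: value
        for name in names
        if (value := getattr(job, name)) is not None
    }
```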

Diagnostics are preserved across save() / load() round-trips, and the flag itself is also persisted so a reloaded job continues to fetch diagnostics on subsequent runs.