Working with Jobs¶
A Job object is a mutable handle for a single quantum program submission.
It tracks its own lifecycle — from creation through execution to the final
result — and records every execution attempt in its history, so you can
inspect, re-run, or reconfigure without losing previous results.
For submitting multiple programs at once see Composite Jobs.
Tip
Coming from oqc-qcaas-client? See Migrating from oqc-qcaas-client for a
side-by-side comparison: async non-blocking execution, result attached
to the job object, built-in history, reconfiguration, and persistence.
QPU Selection¶
Every job requires a QPU (Quantum Processing Unit) ID. The most direct
approach is to supply it explicitly on each create_job() call, as shown in
Quick Start. The SDK also supports two alternative patterns.
Set a default at initialisation — pass default_qpu_id once and omit
it from subsequent calls:
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
... async with OqcSdk(
... url=os.environ["OQC_URL"],
... authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"],
... default_qpu_id='qpu:uk:2:d865b5a184'
... ) as client:
... program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job = client.create_job(program=program) # qpu_id not required
... outputs = await job.execute(timeout_s=10)
... return job.completed
>>> asyncio.run(run())
True
Discover then set — query available QPUs at runtime and set a default before creating jobs:
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
... async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
... qpus = await client.list_qpus()
... simulator = next(
... (q for q in qpus if q.get("name") == "Simulator" and q.get("status") == "ACTIVE"),
... None
... )
... client.set_default_qpu(simulator["id"] if simulator else 'qpu:uk:2:d865b5a184')
... program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job = client.create_job(program=program)
... outputs = await job.execute(timeout_s=10)
... return job.completed
>>> asyncio.run(run())
True
Accessing Results¶
After execute() completes, the most natural way to read the result is
directly from the job via job.result:
await job.execute(timeout_s=30)
print(job.result.data) # {'c': {'0': 512, '1': 512}}
print(job.result.program) # QASM string that ran
print(job.result.qpu_id) # QPU it ran on
print(job.result.received) # datetime of completion
job.result returns None before the first run completes; always check
job.completed or job.error first if the state is uncertain.
execute() also returns a JobOutputProxy — a collection wrapper that
unlocks filtering, mapping, and the CompositeJob API. Use outputs.first
when you need that collection interface for a single job:
outputs = await job.execute(timeout_s=30)
result = outputs.first # same JobResult as job.result
counts = result.data # {'c': {'0': 512, '1': 512}}
Use .one instead of .first when you want to assert there is exactly
one item — it raises ValueError if the count is not 1.
Note
``job.result`` vs ``job.outputs`` — what’s the difference?
job.result is the raw result object for the latest run — a single
JobResult (or None if the job hasn’t finished). It is the quickest
way to read measurement counts from a single Job.
job.outputs wraps that same result in a JobOutputProxy: a
collection with a query interface (all(), filter(), map(),
results(), errors()). They refer to the same underlying data —
job.outputs.first and job.result point to the same object — the
difference is purely the wrapper. job.outputs becomes important when
you need to call reexecute() or resubmit() on the output, or when
you want your code to work uniformly with CompositeJob (which has no
.result shortcut because N results exist). outputs.all() is the
universal accessor that works identically for both types.
See Accessing Results for the full cross-type pattern.
Fields on JobResult:
Attribute |
Type |
Description |
|---|---|---|
|
|
Measurement data, keyed by classical register name. The inner value
depends on the compiler config: the default |
|
|
QASM program that produced this result |
|
|
QPU the program ran on |
|
|
Task identifier |
|
|
Timestamp of completion |
|
|
Always |
|
|
Always |
For accessing results from CompositeJob — including iterating all child
results and writing code that handles both types uniformly — see
Accessing Results in Composite Jobs.
Execution History¶
After execute() returns, the full output of that run is accessible via
job.history. History is cumulative: each call to execute() appends a
new entry, so you can compare results across runs.
>>> from oqc_qcaas_sdk import OqcSdk
>>> from oqc_qcaas_sdk.job_output import JobResult
>>> import os
>>> import asyncio
>>> async def run():
... async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
... qpu_id = 'qpu:uk:2:d865b5a184'
... program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job = client.create_job(program=program, qpu_id=qpu_id)
... result = await job.execute(timeout_s=10)
... history = job.history.all()
... return len(history), isinstance(history[0], JobResult)
>>> asyncio.run(run())
(1, True)
history is a JobOutputProxy, so you can filter, map, and access items
from it — see the JobOutputProxy section in Composite Jobs for the
full API.
Re-running a Job¶
Job state is tracked through a lifecycle:
Active states: CREATED, SUBMITTED, RUNNING — the job is in-flight
Terminal states: COMPLETED, FAILED, CANCELLED — the job has ended
Calling execute() a second time behaves differently depending on the
current state:
Terminal job —
execute()automatically resets the job and submits a fresh run with a new task ID. The previous result remains inhistory.Active job —
execute()resumes polling without re-submitting. This means callingexecute()again on an in-flight job is safe: it will not produce a duplicate submission. This is also useful if a previousexecute()call timed out and you want to continue waiting.
If the job is currently active but you want a fresh submission rather than
resuming the in-flight run, call cancel_if_active() first. This
transitions the job to CANCELLED (a terminal state), so the subsequent
execute() resets and resubmits:
await job.cancel_if_active() # ensures the job is in a terminal state
await job.execute() # always does a fresh submission
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
... async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
... qpu_id = 'qpu:uk:2:d865b5a184'
... program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job = client.create_job(program=program, qpu_id=qpu_id)
... # First run
... await job.execute(timeout_s=10)
... first_task = job.task_id
... # Second run: job is terminal, so execute() resets and resubmits
... await job.execute(timeout_s=10)
... second_task = job.task_id
... return first_task != second_task, len(job.history.all()), job.completed
>>> asyncio.run(run())
(True, 2, True)
Reconfiguration and Re-execution¶
Each JobOutput stored in job.history (or returned by execute())
carries the settings that produced it — program, QPU ID, and compiler
configuration. You can use any of these outputs as a template to reconfigure
the job and re-run from a known baseline.
The simplest approach is reexecute() on a JobOutput, which restores
its settings onto the parent job and immediately submits a new run:
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
... async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
... qpu_id = 'qpu:uk:2:d865b5a184'
... program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job = client.create_job(program=program, qpu_id=qpu_id)
... first_outputs = await job.execute(timeout_s=10)
... # Resubmit with the exact same settings as the first run
... second_outputs = await first_outputs.first.reexecute()
... return job.completed, len(job.history.all())
>>> asyncio.run(run())
(True, 2)
For more control, call reconfigure() on the job directly. Pass a
memento to restore settings from a specific historical output; keyword
overrides are applied on top, so you can change individual fields while
keeping everything else the same:
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
... async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
... qpu_id = 'qpu:uk:2:d865b5a184'
... bad_program = 'OPENQASM 5.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job = client.create_job(program=bad_program, qpu_id=qpu_id)
... first_outputs = await job.execute(timeout_s=10)
... # Restore the first run's settings, but swap in a corrected program
... good_program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job.reconfigure(memento=first_outputs.first, program=good_program)
... await job.execute(timeout_s=10)
... history = job.history.all()
... return len(history), sum(1 for h in history if h.is_error), sum(1 for h in history if h.ok)
>>> asyncio.run(run())
(2, 1, 1)
Omitting memento and passing only keyword arguments is a shorthand when
you just want to change a field without referencing a specific historical run:
job.reconfigure(program=new_program)
await job.execute()
Exception-based Error Handling¶
By default execute() stores errors on the job without raising exceptions,
letting you decide whether an error is fatal. If you prefer exception-based
control flow, call raise_for_status() after execution:
>>> from oqc_qcaas_sdk import OqcSdk
>>> from oqc_qcaas_sdk.exceptions import JobFailed
>>> import os
>>> import asyncio
>>> async def run():
... async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
... qpu_id = 'qpu:uk:2:d865b5a184'
... bad_program = 'OPENQASM 5.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job = client.create_job(program=bad_program, qpu_id=qpu_id)
... await job.execute(timeout_s=10)
... try:
... job.raise_for_status()
... return "Success"
... except JobFailed:
... return "Failed"
>>> asyncio.run(run())
'Failed'
You can also inspect which program was executed when a failure occurred by
examining job.history:
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
... async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
... qpu_id = 'qpu:uk:2:d865b5a184'
... bad_program = 'OPENQASM 5.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job = client.create_job(program=bad_program, qpu_id=qpu_id)
... await job.execute(timeout_s=15)
... good_program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job.reconfigure(program=good_program)
... await job.execute(timeout_s=15)
... history = job.history.all()
... failed_runs = [h for h in history if h.is_error]
... successful_runs = [h for h in history if h.ok]
... if failed_runs:
... failed_program = failed_runs[0].program
... return len(history), len(failed_runs), len(successful_runs), 'OPENQASM 5.0' in failed_program
... return None
>>> asyncio.run(run())
(2, 1, 1, True)
Cancellation¶
Call cancel() to send a cancellation request for the active run:
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
... async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
... qpu_id = 'qpu:uk:2:d865b5a184'
... program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
... job = client.create_job(program=program, qpu_id=qpu_id)
... await job.submit()
... await job.cancel()
... return job.state
>>> asyncio.run(run())
<JobState.CANCELLED: 'CANCELLED'>
Note
Cancellation is timing-dependent: if the job completes before the cancel
request is processed, cancel() may have no effect.
Use cancel_if_active() when you want to cancel only if the job has not
already finished. It is a no-op for terminal jobs, so it is safe to call at
any point without first checking state:
# Safe to call regardless of current job state
await job.cancel_if_active()
Execution Diagnostics¶
By default a Job stores only the measurement result (or error). Pass
fetch_diagnostics=True to create_job() to additionally populate
timing, metrics, and metadata on the job after each run reaches a terminal
state.
job = sdk.create_job(
program=qasm,
qpu_id="qpu:uk:2:d865b5a184",
fetch_diagnostics=True,
)
await job.execute()
print(job.timings) # dict | None — wall-clock and queue timings
print(job.metrics) # dict | None — gate counts, circuit depth, etc.
print(job.metadata) # dict | None — compiler and hardware metadata
The three attributes are None until the job has completed at least one run.
Each attribute is fetched independently. If one call fails (e.g. the data is
not yet available), that attribute remains None while the others may still
be populated — the rest of the job and its result are unaffected.
Diagnostics are preserved across save() / load() round-trips, and the
flag itself is also persisted so a reloaded job continues to fetch diagnostics
on subsequent runs.