Composite Jobs
A CompositeJob submits multiple quantum programs as a single unit and
polls them all concurrently. Pass a list of programs to create_job()
and the SDK creates a CompositeJob automatically — one child Job per
program.
CompositeJob supports the same core operations as Job
(execute(), outputs, completed, cancel_if_active()), so
code written against one type works with the other. Where they differ is
described below.
Tip
Coming from oqc-qcaas-client? The qcaas-client requires separate
calls per QPU and has no broadcasting. See Migrating from oqc-qcaas-client for
side-by-side examples of batch and cross-QPU execution.
Creating a Composite Job
Pass a list of programs to create_job() to create a CompositeJob.
All programs are batch-submitted in a single call and polled concurrently:
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         programA = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         programB = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; y q[0]; h q[0]; measure q[0] -> c[0];'
...         # A single program produces a Job; a list produces a CompositeJob
...         sjob = client.create_job(program=programA, qpu_id=qpu_id)
...         cjob = client.create_job(program=[programA, programB], qpu_id=qpu_id)
...         single_outputs = await sjob.execute()
...         batch_outputs = await cjob.execute()
...         return single_outputs, batch_outputs, programA
>>> single_outputs, batch_outputs, programA = asyncio.run(run())
Both execute() calls return a JobOutputProxy, so the filtering and
mapping operations below work identically for single and composite jobs.
Broadcasting
create_job() accepts a list for program, qpu_id, and/or config.
All three follow the same broadcasting rule: every argument must have
length 1 or share one common length. A length-1 value (including a scalar)
is repeated to match the others; equal-length lists are zipped
element by element.
Many programs on one QPU (config omitted, so every child gets config=None):
cjob = client.create_job(
    program=['prog_a', 'prog_b', 'prog_c'],
    qpu_id='qpu:uk:1:...',
)
# Three jobs, all on the same QPU, config=None for each
One program on many QPUs:
cjob = client.create_job(
    program=program,
    qpu_id=['qpu:uk:1:...', 'qpu:uk:2:...'],
)
# Same circuit run on each QPU
Paired programs, QPUs, and configs — all lists of the same length are
zipped: job n runs program[n] on qpu_id[n] with config[n]:
cjob = client.create_job(
    program=[prog_a, prog_b],
    qpu_id=['qpu:uk:1:...', 'qpu:uk:2:...'],
    config=[config_a, config_b],
)
# Job 0: prog_a on qpu:uk:1 with config_a
# Job 1: prog_b on qpu:uk:2 with config_b
One config for many programs — scalar config broadcasts to every child:
cjob = client.create_job(
    program=[prog_a, prog_b, prog_c],
    qpu_id='qpu:uk:1:...',
    config=CompilerConfig(repeats=1000),
)
# All three jobs share the same config
If any two list arguments have lengths greater than 1 and differ, create_job()
raises a ValueError. Passing an empty list for any argument also raises.
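The broadcasting rule can be modelled in a few lines of plain Python. This is only an illustrative sketch of the rule as described above, not the SDK's implementation: `broadcast_args` is a hypothetical helper, it treats any non-list value as a length-1 scalar, and raising `ValueError` for an empty list is an assumption (the text above only says that an empty list raises).

```python
from typing import Any, List, Tuple


def broadcast_args(program: Any, qpu_id: Any, config: Any = None) -> List[Tuple[Any, Any, Any]]:
    """Hypothetical model of the broadcasting rule (not SDK code)."""
    # Wrap scalars so that every argument is a list.
    columns = [v if isinstance(v, list) else [v] for v in (program, qpu_id, config)]
    if any(len(col) == 0 for col in columns):
        # Assumption: the exception type for empty lists is not stated in the docs.
        raise ValueError("empty list is not allowed")
    # All lengths greater than 1 must agree.
    long_lengths = {len(col) for col in columns if len(col) > 1}
    if len(long_lengths) > 1:
        raise ValueError(f"mismatched list lengths: {sorted(long_lengths)}")
    n = long_lengths.pop() if long_lengths else 1
    # Repeat length-1 columns, then zip element by element.
    expanded = [col * n if len(col) == 1 else col for col in columns]
    return list(zip(*expanded))


# Placeholder program and QPU names, for illustration only.
triples = broadcast_args(["prog_a", "prog_b"], "qpu-x")
```

Each tuple in `triples` corresponds to one child: the program, the QPU it runs on, and its config.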
Filtering Results
results() returns only successful outputs; errors() returns only
failed ones. Both return a new collection so operations can be chained:
single_outputs.results() # successful outputs only
single_outputs.errors() # failed outputs only
Supply a callable to filter() to apply custom criteria:
# Keep only outputs whose program matches programA
batch_outputs.filter(lambda o: o.program == programA)
Mapping over Results
map() applies a function to every item in the collection and returns a
plain list. Chain it with results() to operate only on successful outputs:
zeros = sum(batch_outputs.results().map(lambda o: o.data['c']['0']))
ones = sum(batch_outputs.results().map(lambda o: o.data['c']['1']))
total_shots = zeros + ones # combined shot count across all programs
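The tally above indexes `data['c']['0']` and `data['c']['1']` directly. Whether a bitstring that was never measured still appears as a key is not stated here, so a defensive variant may be worth considering. The sketch below uses plain dictionaries with made-up counts as stand-ins for each output's `data` field; `tally` is a hypothetical helper, not part of the SDK.

```python
from typing import Dict, Iterable

# Made-up counts standing in for the `data` field of two successful outputs.
sample_data = [
    {"c": {"0": 510, "1": 490}},
    {"c": {"1": 1000}},  # no "0" key at all
]


def tally(data_items: Iterable[Dict[str, Dict[str, int]]], register: str = "c") -> Dict[str, int]:
    """Sum counts per bitstring across outputs, tolerating missing keys."""
    totals: Dict[str, int] = {}
    for data in data_items:
        for bitstring, count in data.get(register, {}).items():
            totals[bitstring] = totals.get(bitstring, 0) + count
    return totals


totals = tally(sample_data)
total_shots = sum(totals.values())
```

With real outputs, the list passed to `tally` could come from `batch_outputs.results().map(lambda o: o.data)`.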
Accessing Results
The JobOutputProxy returned by execute() holds one entry per child
job, in the same order as cjob.jobs. Use all() to iterate every
result, or first / last for quick access to a single child. For the
full JobResult field reference see Accessing Results in Working with Jobs.
outputs = await cjob.execute(timeout_s=30)
# Iterate every child's result (or error)
for item in outputs.all():
    if item.ok:
        print(item.data) # measurement counts for this program
    else:
        print(item.error_message) # error for this program
# Quick access to a single child
outputs.first.data['c'] # first child's counts only
outputs.last.data['c'] # last child's counts only
Accessor summary:
first / last: positional. first is index 0, last is the final item. For outputs this means first/last program submitted; for Job.history this means oldest/most-recent run.
one: returns the sole item; raises ValueError if count ≠ 1.
all(): returns a plain list of all items.
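The accessor semantics can be illustrated with a small stand-in class. `ItemView` is a hypothetical name used only for this sketch; it mirrors the behaviour listed in the summary and is not the SDK's `JobOutputProxy`. What `first` and `last` do on an empty collection is not documented here, so the sketch simply lets Python's `IndexError` propagate.

```python
from typing import Generic, List, Sequence, TypeVar

T = TypeVar("T")


class ItemView(Generic[T]):
    """Hypothetical stand-in mirroring the accessor semantics (not SDK code)."""

    def __init__(self, items: Sequence[T]) -> None:
        self._items = list(items)

    @property
    def first(self) -> T:
        return self._items[0]  # index 0

    @property
    def last(self) -> T:
        return self._items[-1]  # final item

    @property
    def one(self) -> T:
        # Only valid when the collection holds exactly one item.
        if len(self._items) != 1:
            raise ValueError(f"expected exactly 1 item, found {len(self._items)}")
        return self._items[0]

    def all(self) -> List[T]:
        return list(self._items)  # plain list copy


view = ItemView(["run-a", "run-b", "run-c"])
single = ItemView(["only-run"])
```

Here `view.first` is `"run-a"`, `view.last` is `"run-c"`, `single.one` is `"only-run"`, and `view.one` raises `ValueError` because the view holds three items.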
Note
outputs is a JobOutputProxy, so outputs.first / outputs.last
work. cjob.jobs is a plain Python list, so .first is not
available there — use cjob.jobs[0], cjob.jobs[-1], or a normal
for loop.
Accessing child Jobs directly
CompositeJob does not have a single aggregate history. Instead, each
child Job maintains its own independent history — one entry per
execute() call made on that child. To inspect historical results, index
into cjob.jobs and use the child’s history property:
first_child = cjob.jobs[0] # the Job for program[0]
second_child = cjob.jobs[1] # the Job for program[1]
# Each child's history is a full JobOutputProxy (oldest-to-newest)
first_child.history.all() # all runs of child 0
first_child.history.results().map(lambda r: r.data["c"]) # counts per run
To compare all children’s results from a specific execute() round, use
the JobOutputProxy it returned — that is the authoritative record of that
round. Do not reconstruct it from per-child history, which has no cross-child
ordering guarantee:
outputs_run1 = await cjob.execute(timeout_s=30)
outputs_run2 = await cjob.execute(timeout_s=30)
for item in outputs_run1.results().all():
print(item.qpu_id, item.data) # results from round 1 only
# Reconfigure one child independently and re-run only that child
first_child.reconfigure(program=new_qasm)
await first_child.execute(timeout_s=10)
Writing code that handles both Job and CompositeJob
Both types return a JobOutputProxy from execute(). outputs.all()
is the safe, universal accessor — it returns a one-element list for a Job
and an N-element list for a CompositeJob, so the same loop handles both:
outputs = await job_or_composite.execute(timeout_s=30)
for item in outputs.all():
    if item.ok:
        print(item.data)
    else:
        print(item.error_message)
Concurrent Execution
The SDK submits all child programs in a single batched request and then polls
them concurrently using asyncio. You do not need to manage concurrency
manually:
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run_composite():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         programA = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         programB = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; y q[0]; h q[0]; measure q[0] -> c[0];'
...         cjob = client.create_job(program=[programA, programB], qpu_id=qpu_id)
...         result = await cjob.execute(timeout_s=10)
...         return len(cjob)
>>> asyncio.run(run_composite())
2
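For intuition about what polling concurrently with asyncio means, the sketch below runs two simulated pollers side by side. It is a generic asyncio illustration, not SDK code: `poll_child` is a hypothetical stand-in that sleeps instead of querying a server, and the use of `asyncio.gather` is an assumption about one way such polling could be arranged.

```python
import asyncio
from typing import List


async def poll_child(name: str, delay_s: float) -> str:
    # Stand-in for polling one child until it reaches a terminal state.
    await asyncio.sleep(delay_s)
    return f"{name}: done"


async def poll_all() -> List[str]:
    # gather() runs the coroutines concurrently and returns their results
    # in argument order, regardless of which one finishes first.
    return await asyncio.gather(
        poll_child("child-0", 0.02),
        poll_child("child-1", 0.01),
    )


results = asyncio.run(poll_all())
```

Although `child-1` finishes first in this simulation, `results` keeps the submission order, which matches the ordering behaviour described for the outputs of a composite job.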
Re-running a Composite Job
Calling execute() on a CompositeJob that has already completed applies
the same re-execution logic as Re-running a Job in Working with Jobs, but
per-child:
Terminal children (COMPLETED/FAILED/CANCELLED) are reset and included in a fresh batch submission.
Active children (SUBMITTED/RUNNING) are only polled — they are not re-submitted.
Inspect child job states at any time via active_jobs and terminal_jobs:
cjob.terminal_jobs # children that have finished
cjob.active_jobs # children still in-flight
await cjob.execute() # resubmits terminal children, polls active ones
If you want every child to start fresh regardless of state — discarding any
in-flight work — call cancel_if_active() before re-executing. This cancels
all active children concurrently, putting them into a terminal state so
execute() resubmits them all:
await cjob.cancel_if_active() # cancels any in-flight children
await cjob.execute() # all children resubmitted fresh
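The per-child decision described in this section can be sketched as a simple partition by state. The state names are taken from the list above; the `State` enum and `plan_rerun` helper are hypothetical and exist only for this illustration, and the sketch assumes every non-terminal state counts as active.

```python
from enum import Enum
from typing import Dict, List


class State(Enum):
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


TERMINAL = {State.COMPLETED, State.FAILED, State.CANCELLED}


def plan_rerun(states: List[State]) -> Dict[str, List[int]]:
    """Return child indices to resubmit and child indices to poll only."""
    plan: Dict[str, List[int]] = {"resubmit": [], "poll_only": []}
    for index, state in enumerate(states):
        # Terminal children are resubmitted; active ones are only polled.
        key = "resubmit" if state in TERMINAL else "poll_only"
        plan[key].append(index)
    return plan


plan = plan_rerun([State.COMPLETED, State.RUNNING, State.FAILED])
after_cancel = plan_rerun([State.COMPLETED, State.CANCELLED, State.FAILED])
```

In `plan`, children 0 and 2 are resubmitted and child 1 is only polled. Once the running child has been cancelled, as in `after_cancel`, all three are resubmitted.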
Matching Errors to Programs
When one or more child programs fail, the child Job objects retain the
error details alongside the program that caused them.
Via child jobs
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         good_program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         bad_program = 'OPENQASM 5.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         programs = [good_program, bad_program, good_program]
...         cjob = client.create_job(program=programs, qpu_id=qpu_id)
...         outputs = await cjob.execute(timeout_s=15)
...         failed_jobs = cjob.failed_jobs
...         successful_jobs = cjob.completed_jobs
...         failed_programs = [(j.program, j.error.error_message) for j in failed_jobs]
...         return len(cjob.jobs), len(failed_jobs), len(successful_jobs), 'OPENQASM 5.0' in failed_programs[0][0]
>>> asyncio.run(run())
(3, 1, 2, True)
Via the returned outputs
The collection returned by execute() also supports filtering — use
results() for successful outputs and errors() for failed ones. Each
item carries the original program attribute:
>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         good_program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         bad_program = 'OPENQASM 5.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         cjob = client.create_job(program=[good_program, bad_program], qpu_id=qpu_id)
...         outputs = await cjob.execute(timeout_s=30)
...         error_programs = [err.program for err in outputs.errors().all()]
...         success_programs = [res.program for res in outputs.results().all()]
...         total = len(error_programs) + len(success_programs)
...         return total, len(success_programs) > 0
>>> asyncio.run(run())
(2, True)
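When the same program text is submitted more than once, matching failures by program string alone cannot tell the copies apart. Because the outputs are documented to follow the order of cjob.jobs, pairing by position is one way around that. The sketch below uses `SimpleNamespace` objects as stand-ins for the items returned by `outputs.all()`; the program names and the error text are placeholders, not real SDK output.

```python
from types import SimpleNamespace

# Placeholders for the submitted QASM strings, in submission order.
programs = ["good", "bad", "good"]

# Stand-ins for outputs.all(): one item per child, same order as programs.
items = [
    SimpleNamespace(ok=True, error_message=None),
    SimpleNamespace(ok=False, error_message="placeholder error text"),
    SimpleNamespace(ok=True, error_message=None),
]

# Pair each item with its position and program, keeping only the failures.
failures = [
    (index, program, item.error_message)
    for index, (program, item) in enumerate(zip(programs, items))
    if not item.ok
]
```

Each entry in `failures` records which submission failed, which program it ran, and the associated message, even when several children share identical program text.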
Cancellation
cancel() sends cancellation requests to all child jobs concurrently.
cancel_if_active() does the same but skips any children that have already
reached a terminal state:
# Cancel everything still in-flight; no-op for finished children
await cjob.cancel_if_active()