Composite Jobs

A CompositeJob submits multiple quantum programs as a single unit and polls them all concurrently. Pass a list of programs to create_job() and the SDK creates a CompositeJob automatically — one child Job per program.

CompositeJob supports the same core operations as Job (execute(), outputs, completed, cancel_if_active()), so code written against one type works with the other. The differences are described below.

Tip

Coming from oqc-qcaas-client? The qcaas-client requires separate calls per QPU and has no broadcasting. See Migrating from oqc-qcaas-client for side-by-side examples of batch and cross-QPU execution.

Creating a Composite Job

Pass a list of programs to create_job() to create a CompositeJob. All programs are batch-submitted in a single call and polled concurrently:

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         programA = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         programB = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; y q[0]; h q[0]; measure q[0] -> c[0];'
...         # A single program produces a Job; a list produces a CompositeJob
...         sjob = client.create_job(program=programA, qpu_id=qpu_id)
...         cjob = client.create_job(program=[programA, programB], qpu_id=qpu_id)
...         single_outputs = await sjob.execute()
...         batch_outputs = await cjob.execute()
...         return single_outputs, batch_outputs, programA
>>> single_outputs, batch_outputs, programA = asyncio.run(run())

Both execute() calls return a JobOutputProxy, so the filtering and mapping operations below work identically for single and composite jobs.

Broadcasting

create_job() accepts a list for program, qpu_id, and/or config. All three participate in the same broadcasting rule: lengths must be equal or 1, with a scalar counting as length 1. A length-1 value is repeated to match the others; equal-length lists are zipped element-by-element.

Many programs on one QPU — one config for all:

cjob = client.create_job(
    program=['prog_a', 'prog_b', 'prog_c'],
    qpu_id='qpu:uk:1:...',
)
# Three jobs, all on the same QPU, config=None for each

One program on many QPUs:

cjob = client.create_job(
    program=program,
    qpu_id=['qpu:uk:1:...', 'qpu:uk:2:...'],
)
# Same circuit run on each QPU

Paired programs, QPUs, and configs — all lists of the same length are zipped, so job n runs program[n] on qpu_id[n] with config[n]:

cjob = client.create_job(
    program=[prog_a, prog_b],
    qpu_id=['qpu:uk:1:...', 'qpu:uk:2:...'],
    config=[config_a, config_b],
)
# Job 0: prog_a on qpu:uk:1 with config_a
# Job 1: prog_b on qpu:uk:2 with config_b

One config for many programs — scalar config broadcasts to every child:

cjob = client.create_job(
    program=[prog_a, prog_b, prog_c],
    qpu_id='qpu:uk:1:...',
    config=CompilerConfig(repeats=1000),
)
# All three jobs share the same config

If two list arguments both have length greater than 1 but their lengths differ, create_job() raises a ValueError. Passing an empty list for any argument also raises ValueError.
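
For example, zipping two programs against three configs violates the rule. A sketch of the failure (config_c here is just a third illustrative CompilerConfig):

try:
    client.create_job(
        program=[prog_a, prog_b],               # length 2
        qpu_id='qpu:uk:1:...',                  # length 1: broadcasts fine
        config=[config_a, config_b, config_c],  # length 3: cannot zip with 2
    )
except ValueError:
    print('list lengths differ and neither is 1')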

Filtering Results

results() returns only successful outputs; errors() returns only failed ones. Both return a new collection so operations can be chained:

single_outputs.results()  # successful outputs only
single_outputs.errors()   # failed outputs only

Supply a callable to filter() to apply custom criteria:

# Keep only outputs whose program matches programA
batch_outputs.filter(lambda o: o.program == programA)

Mapping over Results

map() applies a function to every item in the collection and returns a plain list. Chain it with results() to operate only on successful outputs:

zeros = sum(batch_outputs.results().map(lambda o: o.data['c']['0']))
ones  = sum(batch_outputs.results().map(lambda o: o.data['c']['1']))
total_shots = zeros + ones  # combined shot count across all programs
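
Because each of these calls returns a new collection, the steps compose. A sketch, assuming filter() returns the same chainable collection type as results() and errors():

# '0'-counts for successful runs of programA only
a_zeros = sum(
    batch_outputs.results()
    .filter(lambda o: o.program == programA)
    .map(lambda o: o.data['c']['0'])
)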

Accessing Results

The JobOutputProxy returned by execute() holds one entry per child job, in the same order as cjob.jobs. Use all() to iterate every result, or first / last for quick access to a single child. For the full JobResult field reference see Accessing Results in Working with Jobs.

outputs = await cjob.execute(timeout_s=30)

# Iterate every child's result (or error)
for item in outputs.all():
    if item.ok:
        print(item.data)            # measurement counts for this program
    else:
        print(item.error_message)   # error for this program

# Quick access to a single child
outputs.first.data['c']             # first child's counts only
outputs.last.data['c']              # last child's counts only

Accessor summary:

  • first / last — positional: first is index 0, last is the final item. For outputs this means first/last program submitted; for Job.history this means oldest/most-recent run.

  • one — returns the sole item; raises ValueError if count ≠ 1 (see the sketch after this list)

  • all() — returns a plain list of all items
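
For instance, one fits the single-program job from the first example but not the two-program composite. A sketch using single_outputs and batch_outputs from above:

single_outputs.one.data['c']   # exactly one output, so .one succeeds
batch_outputs.one              # two outputs: raises ValueError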

Note

outputs is a JobOutputProxy, so outputs.first / outputs.last work. cjob.jobs is a plain Python list, so .first is not available there — use cjob.jobs[0], cjob.jobs[-1], or a normal for loop.

Accessing child Jobs directly

CompositeJob does not have a single aggregate history. Instead, each child Job maintains its own independent history — one entry per execute() call made on that child. To inspect historical results, index into cjob.jobs and use the child’s history property:

first_child  = cjob.jobs[0]   # the Job for program[0]
second_child = cjob.jobs[1]   # the Job for program[1]

# Each child's history is a full JobOutputProxy (oldest-to-newest)
first_child.history.all()                                # all runs of child 0
first_child.history.results().map(lambda r: r.data["c"])   # counts per run

To compare all children’s results from a specific execute() round, use the JobOutputProxy it returned — that is the authoritative record of that round. Do not reconstruct it from per-child history, which has no cross-child ordering guarantee:

outputs_run1 = await cjob.execute(timeout_s=30)
outputs_run2 = await cjob.execute(timeout_s=30)

for item in outputs_run1.results().all():
    print(item.qpu_id, item.data)   # results from round 1 only

# Reconfigure one child independently and re-run only that child
first_child.reconfigure(program=new_qasm)
await first_child.execute(timeout_s=10)

Writing code that handles both Job and CompositeJob

Both types return a JobOutputProxy from execute(). outputs.all() is the safe, universal accessor — it returns a one-element list for a Job and an N-element list for a CompositeJob, so the same loop handles both:

outputs = await job_or_composite.execute(timeout_s=30)

for item in outputs.all():
    if item.ok:
        print(item.data)
    else:
        print(item.error_message)

Concurrent Execution

The SDK submits all child programs in a single batched request and then polls them concurrently using asyncio. You do not need to manage concurrency manually:

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run_composite():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         programA = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         programB = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; y q[0]; h q[0]; measure q[0] -> c[0];'
...         cjob = client.create_job(program=[programA, programB], qpu_id=qpu_id)
...         await cjob.execute(timeout_s=10)
...         return len(cjob)
>>> asyncio.run(run_composite())
2

Re-running a Composite Job

Calling execute() on a CompositeJob that has already completed applies the same re-execution logic as Re-running a Job in Working with Jobs, but per-child:

  • Terminal children (COMPLETED/FAILED/CANCELLED) are reset and included in a fresh batch submission.

  • Active children (SUBMITTED/RUNNING) are only polled — they are not re-submitted.

Inspect child job states at any time via active_jobs and terminal_jobs:

cjob.terminal_jobs    # children that have finished
cjob.active_jobs      # children still in-flight
await cjob.execute()  # resubmits terminal children, polls active ones

If you want every child to start fresh regardless of state — discarding any in-flight work — call cancel_if_active() before re-executing. This cancels all active children concurrently, putting them into a terminal state so execute() resubmits them all:

await cjob.cancel_if_active()  # cancels any in-flight children
await cjob.execute()           # all children resubmitted fresh

Matching Errors to Programs

When one or more child programs fail, the child Job objects retain the error details alongside the program that caused them.

Via child jobs

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         good_program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         bad_program = 'OPENQASM 5.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         programs = [good_program, bad_program, good_program]
...         cjob = client.create_job(program=programs, qpu_id=qpu_id)
...         outputs = await cjob.execute(timeout_s=15)
...         failed_jobs = cjob.failed_jobs
...         successful_jobs = cjob.completed_jobs
...         failed_programs = [(j.program, j.error.error_message) for j in failed_jobs]
...         return len(cjob.jobs), len(failed_jobs), len(successful_jobs), 'OPENQASM 5.0' in failed_programs[0][0]
>>> asyncio.run(run())
(3, 1, 2, True)

Via the returned outputs

The collection returned by execute() also supports filtering — use results() for successful outputs and errors() for failed ones. Each item carries the original program attribute:

>>> from oqc_qcaas_sdk import OqcSdk
>>> import os
>>> import asyncio
>>> async def run():
...     async with OqcSdk(url=os.environ["OQC_URL"], authentication_token=os.environ["OQC_AUTHENTICATION_TOKEN"]) as client:
...         qpu_id = 'qpu:uk:2:d865b5a184'
...         good_program = 'OPENQASM 2.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         bad_program = 'OPENQASM 5.0; include "qelib1.inc"; qreg q[1]; creg c[1]; h q[0]; measure q[0] -> c[0];'
...         cjob = client.create_job(program=[good_program, bad_program], qpu_id=qpu_id)
...         outputs = await cjob.execute(timeout_s=30)
...         error_programs = [err.program for err in outputs.errors().all()]
...         success_programs = [res.program for res in outputs.results().all()]
...         total = len(error_programs) + len(success_programs)
...         return total, len(success_programs) > 0
>>> asyncio.run(run())
(2, True)

Cancellation

cancel() sends cancellation requests to all child jobs concurrently. cancel_if_active() does the same but skips any children that have already reached a terminal state:

# Cancel everything still in-flight; no-op for finished children
await cjob.cancel_if_active()
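
cancel() is the blunter of the two. A sketch, assuming it is safe to call even when some children have already finished:

# Request cancellation of every child concurrently, regardless of state
await cjob.cancel()

# Inspect the split using the properties from Re-running a Composite Job
print(len(cjob.terminal_jobs), 'terminal,', len(cjob.active_jobs), 'still in flight')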