#! /usr/bin/env python3
"""
This app generates phased PCR primers with balanced nucleotide complexity—that is, sequences with even representation of A, T, C, and G across positions.
This helps reduce issues like base-calling noise and phasing problems in sequencing platforms like Illumina.
The app also includes a plot of the nucleotide balance by position of sequencing.
Author: Paul Munn, Genomics Innovation Hub, Cornell University
(based on a SHiny app written by Franziska Bonath: https://github.com/FranBonath/phased_primers_shiny/tree/main)
Version history:
- 07/28/2025: Original version
"""
import gradio as gr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random, tempfile, os, uuid
import string
# ───────── global variables ─────────
VERSION = "1.0" # bump when you release updates
# ────────────────────────────────────
# Defines IUPAC degenerate bases:
# Used to interpret ambiguous bases during complexity evaluation
def compile_library_of_special_nucs():
return {
"R": ["A", "G"], "Y": ["C", "T"], "S": ["G", "C"], "W": ["A", "T"],
"K": ["G", "T"], "M": ["A", "C"], "B": ["C", "G", "T"], "D": ["A", "G", "T"],
"H": ["A", "C", "T"], "V": ["A", "C", "G"], "N": ["A", "C", "G", "T"]
}
# This step normalizes special bases into standard nucleotides so we can assess actual base balance
def adjust_special_nucleotides(nuc_table):
"""Convert degenerate bases into fractional A/T/C/G counts."""
special = compile_library_of_special_nucs()
if nuc_table.shape[1] > 4:
# Convert to float to avoid dtype clashes when adding fractions
std = nuc_table.iloc[0, :4].astype(float).copy()
for col in nuc_table.columns[4:]:
if col in special:
cnt = nuc_table.at[0, col]
for nuc in special[col]:
std[nuc] += cnt / len(special[col])
return pd.DataFrame([std])
# If only A/T/C/G in table, still make sure they're floats
return nuc_table.iloc[:, :4].astype(float)
# This is the core algorithm for creating phased primers with maximum nucleotide diversity at early positions
# Primer complexity is engineered and evaluated through:
# 1) Base balance: Calculated per-position relative abundance of A/T/C/G.
# 2) Degenerate bases: Interpreted to contribute partially to standard nucleotides.
# 3) Selection strategy: At each phase step, the least-represented base is chosen to improve uniformity.
# This ensures that when multiple phased primers are pooled, the sequencer sees a mix of nucleotides at early cycles,
# reducing issues with signal calibration and increasing sequencing quality.
#
# Detailed steps:
# 1) Split the primer into a list of characters:
# → CCTAHGGGRBGCAGCAG → ['C','C','T','A','H',...]
# 2) Create amount_of_phasing + 1 identical copies of this primer.
# 3) Iteratively prepend nucleotides to the copies to maximize base diversity at early sequencing cycles:
# For each new position (phase_count from amount_of_phasing down to 1):
# - Take a subset of original primer of length phase_count.
# - Combine with previously added phasing bases.
# - Count the frequency of all nucleotides (including special).
# - Normalize using adjust_special_nucleotides().
# - Find the least-represented base at this position.
# - If there's a tie, prefer one that’s in the current primer context.
# - Add it to the beginning of appropriate phased copies.
# 4) Result: a list of staggered primers (with added nucleotides up front) that differ only by a few bases,
# but introduce complexity early in the read.
def phase_primer(primer, phasing, chemistry):
"""
All chemistries enforce:
• No 4 identical bases in a row at the front.
• No 5-long run consisting only of C/G at the front.
Random base preferences when needed:
• Two-channels (original SBS): use A/C/T only for random picks (occasionally pick G at random).
• Two-channels (XLEAP-SBS) : prefer C/T, allow A less often; occasionally pick G at random.
• Others (Four-channel, One-channel): uniform among legal candidates.
"""
if phasing == 0:
return [list(primer)]
def violates_constraints(current_added, cand):
# No 4 identical bases in a row at the front
if len(current_added) >= 3 and all(x == cand for x in current_added[:3]):
return True
# No 5-long run consisting only of C/G at the front
if cand in {"C", "G"} and len(current_added) >= 4 and all(x in {"C", "G"} for x in current_added[:4]):
return True
return False
def pick_with_weights(pool, weights_map):
try:
w = [weights_map.get(b, 1.0) for b in pool]
if all(v <= 0 for v in w):
return random.choice(pool)
return random.choices(pool, weights=w, k=1)[0]
except Exception:
return random.choice(pool)
split = list(primer)
phased = [split.copy() for _ in range(phasing + 1)]
added = []
special = compile_library_of_special_nucs()
for phase_cnt in range(phasing, 0, -1):
subset = split[:phase_cnt]
combined = added + subset
counts = pd.Series(combined).value_counts()
all_nucs = ["A", "T", "C", "G"] + list(special.keys())
data = {n: counts.get(n, 0) for n in all_nucs}
df = adjust_special_nucleotides(pd.DataFrame([data])) # columns A/T/C/G (floats)
# minimal count tie-set among A/T/C/G
# compute the tie set among bases with the current minimum count
two_channel = chemistry in {"Two-channels (original SBS)", "Two-channels (XLEAP-SBS)"}
cols_to_consider = ["A", "C", "T"] if two_channel else ["A", "C", "T", "G"]
df_min = df[cols_to_consider]
min_val = df_min.min(axis=1).iat[0]
choices = df_min.columns[df_min.iloc[0] == min_val].tolist()
# enforce constraints first
legal_choices = [b for b in choices if not violates_constraints(added, b)]
# prefer choices present in the current subset when possible
if legal_choices:
base_pool = [c for c in legal_choices if c in subset] or legal_choices
else:
all_legal = [b for b in ["A", "C", "T", "G"] if not violates_constraints(added, b)]
base_pool = [c for c in all_legal if c in subset] or all_legal or choices
# chemistry-specific random selection rules (on constrained pool)
# print('base pool: ', base_pool)
if len(base_pool) == 1:
chosen = base_pool[0]
elif chemistry == "Two-channels (original SBS)":
# occasionally pick G at random
weights = {"A": 1.0, "C": 1.0, "T": 1.0, "G": 0.5} # G occasionally selected when random
chosen = pick_with_weights(base_pool, weights)
elif chemistry == "Two-channels (XLEAP-SBS)":
# Prefer C/T; A less often; occasionally pick G at random
weights = {"C": 1.0, "T": 1.0, "A": 0.5, "G": 0.5} # G occasionally selected when random
chosen = pick_with_weights(base_pool, weights)
else:
# Four-channel & One-channel: uniform among constrained pool
chosen = random.choice(base_pool)
# prepend chosen base into all downstream phased primers
added = [chosen] + added
for i in range(phasing - phase_cnt + 1, phasing + 1):
phased[i] = [chosen] + phased[i]
return phased
# Combines each phased primer with the adapter, and names them
def design_primers(phased, adapter, raw):
rows = []
for i, p in enumerate(phased):
phase_len = len(p) - len(raw)
phasing = ''.join(p[:phase_len])
gene = ''.join(p[phase_len:])
full = adapter + phasing + gene
rows.append(
dict(primer_name=f"primer_{i+1}",
adapter=adapter,
phasing_bases=phasing,
gene_specific_primer=gene,
full_sequence=full)
)
return pd.DataFrame(rows)
def plot_nuc(primer_list):
# Use the shortest primer length so every position has data across all primers
if not primer_list:
fig, ax = plt.subplots(figsize=(10, 3))
ax.text(0.5, 0.5, "No primers to plot.", ha="center", va="center")
ax.axis("off")
return fig
L = min(len(p) for p in primer_list)
if L == 0:
fig, ax = plt.subplots(figsize=(10, 3))
ax.text(0.5, 0.5, "Primers have zero length.", ha="center", va="center")
ax.axis("off")
return fig
rows = [[ch.upper() for ch in p[:L]] for p in primer_list]
mat = np.array(rows)
total = mat.shape[0]
# Targets and colors
nucs = ["A", "C", "T", "G"]
colors = {"A": "green", "C": "blue", "T": "red", "G": "orange"}
# Degenerate-to-standard translation
special = compile_library_of_special_nucs()
# Floating counts for fractional contributions from degenerate bases
counts = {n: np.zeros(L, dtype=float) for n in nucs}
for j in range(L):
col = mat[:, j]
for b in col:
if b in nucs:
counts[b][j] += 1.0
elif b in special: # split among mapped bases
mapped = special[b]
frac = 1.0 / len(mapped)
for m in mapped:
counts[m][j] += frac
else:
pass # unknown character → ignore
# ── Figure with a second subplot showing raw nucleotides per primer under the bars ──
# ---- Dynamic sizing for the figure and subplots ----
TOP_IN = 4.2 # inches reserved for the bar plot
PER_ROW_IN = 0.3 # inches per primer row
BOTTOM_MIN_IN = 1.2 # clamp min height of the bottom panel
BOTTOM_MAX_IN = 8.0 # clamp max height of the bottom panel
bottom_in = float(np.clip(total * PER_ROW_IN, BOTTOM_MIN_IN, BOTTOM_MAX_IN))
fig = plt.figure(figsize=(10, TOP_IN + bottom_in))
gs = fig.add_gridspec(
nrows=2, ncols=1,
height_ratios=[TOP_IN, bottom_in],
hspace=0.32 # ↑ more space between top and bottom subplots (was ~0.25–0.30)
)
ax = fig.add_subplot(gs[0])
ax2 = fig.add_subplot(gs[1], sharex=ax)
# ---- (your existing bar plotting code for ax) ----
x = np.arange(L)
nucs = ["A", "C", "T", "G"]
colors = {"A": "green", "C": "blue", "T": "red", "G": "orange"}
group_width = 0.55
width = group_width / len(nucs)
offsets = (np.arange(len(nucs)) - (len(nucs)-1)/2) * width
for i, n in enumerate(nucs):
vals = (counts[n] / total) * 100.0
ax.bar(x + offsets[i], vals, width=width, label=n, color=colors[n])
ax.set(
xlabel="Base position from 5′ end (after phasing)",
ylabel="Nucleotide frequency (%)",
title="Nucleotide Composition by Position Across Phased Primers",
ylim=(0, 100),
xlim=(-0.5, L - 0.5),
)
ax.set_xticks(x)
ax.set_xticklabels([str(i + 1) for i in range(L)])
ax.axhline(25, color="black", linestyle="--", linewidth=1)
ax.legend(title="Nucleotide", bbox_to_anchor=(1.02, 1), loc="upper left")
# ---- Bottom subplot: adaptive row spacing + font size ----
# Make spacing slightly tighter as rows grow, looser when few rows
row_spacing = float(np.interp(total, [5, 40], [1.80, 1.35])) # ↑ larger at high totals
fontsize = float(np.interp(total, [5, 40], [10.0, 7.8])) # optional: tiny shrink with many rows
ax2.set_xlim(-0.5, L - 0.5)
ax2.set_ylim(row_spacing * (total - 1) + 0.5, -0.5) # accommodate spacing
ax2.set_yticks([])
ax2.set_xticks(x)
ax2.set_xticklabels([str(i + 1) for i in range(L)])
for spine in ["top", "right", "left"]:
ax2.spines[spine].set_visible(False)
# Draw letters (bold)
for j in range(L):
col = mat[:, j]
for r, base in enumerate(col):
c = colors.get(base, "#666666")
y = r * row_spacing
ax2.text(
j, y, base,
ha="center", va="center",
fontsize=fontsize,
fontweight="bold",
fontfamily="DejaVu Sans Mono",
color=c
)
ax2.set_xlabel("Base position (per-primer bases shown below each position)")
plt.subplots_adjust(left=0.075, right=0.87)
plt.tight_layout()
return fig
def get_color_spec(chem):
"""
Returns a list of groups: (legend_label, bar_color, bases_set)
Note: No 'Deg' group. Degenerate bases are split fractionally among A/C/G/T
and then aggregated into these groups.
"""
if chem == "Four-channels (HiSeq & MiSeq)":
# Two bars: A/C (red) and G/T (green)
return [
("A / C", "red", {"A", "C"}),
("G / T", "green", {"G", "T"}),
]
elif chem == "Two-channels (original SBS)":
return [
("A", "orange", {"A"}),
("C", "red", {"C"}),
("T", "green", {"T"}),
("G", "gray", {"G"}),
]
elif chem == "Two-channels (XLEAP-SBS)":
return [
("A", "blue", {"A"}),
("C", "cyan", {"C"}),
("T", "green", {"T"}),
("G", "gray", {"G"}),
]
else: # "One-channel (iSeq 100)"
return [
("A", "green", {"A"}),
("C", "blue", {"C"}),
("T", "red", {"T"}),
("G", "gray", {"G"}),
]
def plot_colors(primer_list, chemistry):
# Use the shortest length so every position has data across all primers
if not primer_list:
fig, ax = plt.subplots(figsize=(10, 3))
ax.text(0.5, 0.5, "No primers to plot.", ha="center", va="center")
ax.axis("off")
return fig
L = min(len(p) for p in primer_list)
if L == 0:
fig, ax = plt.subplots(figsize=(10, 3))
ax.text(0.5, 0.5, "Primers have zero length.", ha="center", va="center")
ax.axis("off")
return fig
rows = [[ch.upper() for ch in p[:L]] for p in primer_list]
mat = np.array(rows)
total = mat.shape[0]
# Fractional translation of degenerate bases → A/C/G/T
special = compile_library_of_special_nucs()
bases = ["A", "C", "G", "T"]
base_counts = {b: np.zeros(L, dtype=float) for b in bases}
for j in range(L):
col = mat[:, j]
for b in col:
if b in base_counts:
base_counts[b][j] += 1.0
elif b in special:
mapped = special[b]
frac = 1.0 / len(mapped)
for m in mapped:
if m in base_counts:
base_counts[m][j] += frac
else:
pass # unknown → ignore
# Build plotting groups per chemistry
groups = []
if chemistry == "Two-channels (XLEAP-SBS)":
# C contributes to BOTH A and T channels; remove separate C bar
groups = [
("A / C", "blue", base_counts["A"] + base_counts["C"]),
("T / C", "green", base_counts["T"] + base_counts["C"]),
("G", "gray", base_counts["G"]),
]
elif chemistry == "Two-channels (original SBS)":
# A contributes to BOTH C and T channels; remove separate A bar
groups = [
("A / C", "red", base_counts["A"] + base_counts["C"]),
("A / T", "green", base_counts["A"] + base_counts["T"]),
("G", "gray", base_counts["G"]),
]
else:
# Other chemistries from spec (no duplication)
spec = get_color_spec(chemistry) # list of (label, color, set_of_bases)
for lab, color, bset in spec:
arr = np.zeros(L, dtype=float)
for b in bset:
arr += base_counts[b]
groups.append((lab, color, arr))
# Plot with wider spacing between groups
fig, ax = plt.subplots(figsize=(10, 5))
G = len(groups)
x = np.arange(L)
group_width = 0.55 # shrink per-position cluster to widen gaps
width = group_width / G
offsets = (np.arange(G) - (G - 1) / 2) * width
# Allow y-axis to exceed 100% if a channel duplicates contributions (XLEAP/original SBS)
max_pct = 0.0
for i, (lab, color, arr) in enumerate(groups):
vals = (arr / total) * 100.0
max_pct = max(max_pct, float(np.max(vals)) if vals.size else 0.0)
ax.bar(x + offsets[i], vals, width=width, label=lab, color=color)
ax.set(
xlabel="Base position from 5′ end (after phasing)",
ylabel="Fluorescent signal representation (%)",
title=f"Color Channel Composition by Position\n({chemistry})",
xlim=(-0.5, L - 0.5),
ylim=(0, max(100, max_pct * 1.05) if max_pct > 0 else 100),
)
ax.set_xticks(x)
ax.set_xticklabels([str(i + 1) for i in range(L)])
# --- add a black dotted horizontal line at 25% ---
ax.axhline(25, color="black", linestyle="--", linewidth=1)
ax.legend(title="Nucleotides", bbox_to_anchor=(1.02, 1), loc="upper left")
plt.tight_layout()
return fig
# ---------- util: save fig / df to temp files ----------
# This is a utility function to save a figure to a temporary file
# and return the path to the file.
# It uses the tempfile module to get a temporary directory and a unique file name.
# It then saves the figure to the file and closes it.
# Finally, it returns the path to the file.
def _tmp_path(filename: str) -> str:
tmp_dir = tempfile.mkdtemp() # unique dir per run
return os.path.join(tmp_dir, filename)
def save_fig_fixed(fig, filename: str) -> str:
path = _tmp_path(filename)
fig.savefig(path, dpi=300, bbox_inches="tight")
plt.close(fig)
return path
def save_csv_fixed(df, filename: str) -> str:
path = _tmp_path(filename)
df.to_csv(path, index=False)
return path
# ---------- main gradio function ----------
def run_tool(phasing, primer, adapter, chemistry):
primers = phase_primer(primer, phasing, chemistry)
df = design_primers(primers, adapter, primer)
fig_nuc = plot_nuc(primers)
fig_color = plot_colors(primers, chemistry)
# fixed names for downloads
nuc_png = save_fig_fixed(fig_nuc, "nucleotide_percentages_plot.png")
clr_png = save_fig_fixed(fig_color, "color_percentages_plot.png")
csv_path = save_csv_fixed(df, "primers.csv")
return fig_nuc, fig_color, df, nuc_png, clr_png, csv_path
chemistries = [
"Four-channels (HiSeq & MiSeq)",
"Two-channels (original SBS)",
"Two-channels (XLEAP-SBS)",
"One-channel (iSeq 100)",
]
with gr.Blocks(css="""
/* Inputs panel (light mode) */
.input-panel { background:#f3f3f3; padding:16px; border-radius:8px; }
/* Inputs panel (dark mode overrides) */
html.dark .input-panel, body.dark .input-panel, [data-theme*="dark"] .input-panel {
background:#2b2b2b !important; color:inherit; border:1px solid rgba(255,255,255,0.08);
}
.inline-row { align-items:center; gap:12px; }
/* Ticks that adapt to theme */
.tick-grid {
display:grid;
grid-template-columns:repeat(22,1fr);
column-gap:35px;
font-size:11px;
padding:0 0 8px 0;
margin-left:20px;
justify-items:start;
user-select:none;
color:inherit;
}
.tick-grid > span { justify-self:start; }
@media (prefers-color-scheme: dark) { .tick-grid { color: rgba(255,255,255,.9); } }
@media (prefers-color-scheme: light) { .tick-grid { color: #444; } }
/* give the radio extra width */
#custom-mode { min-width: 160px; }
""") as demo:
gr.Markdown(
f"""
Phased PCR1 Amplicon Primer Designer
(version {VERSION})
This app generates phased PCR primers with balanced nucleotide complexity — that is, sequences with even representation of A, T, C, and G across positions.
"""
)
# ───── inputs on light‑gray panel ─────
with gr.Column(elem_classes="input-panel"):
phasing_slider = gr.Slider(
minimum=0, maximum=21, value=0, step=1,
label="Amount of primer phasing (integer)"
)
ticks = ''.join(f'{i}' for i in range(22))
gr.HTML(f"{ticks}
")
primer_in = gr.Textbox("CCTAHGGGRBGCAGCAG",
label="Gene‑specific primer (5′→3′)")
# adapter_in = gr.Textbox("ACACTCTTTCCCTACACGACGCTCTTCCGATCT",
# label="Adapter sequence (5′→3′)")
adapter_in = gr.Dropdown(
choices=[
"ACACTCTTTCCCTACACGACGCTCTTCCGATCT",
"GTGACTGGAGTTCAGACGTGTGCTCTTCCGATCT",
"TCGTCGGCAGCGTCAGATGTGTATAAGAGACAG",
"GTCTCGTGGGCTCGGAGATGTGTATAAGAGACAG",
],
value="ACACTCTTTCCCTACACGACGCTCTTCCGATCT",
label="Adapter"
)
chem_in = gr.Dropdown(
["Four-channels (HiSeq & MiSeq)",
"Two-channels (original SBS)",
"Two-channels (XLEAP-SBS)",
"One-channel (iSeq 100)"],
value="Four-channels (HiSeq & MiSeq)",
label="Sequencing chemistry"
)
run_btn = gr.Button("Generate Primers")
# ───── outputs: plots then table & downloads ─────
nuc_plot = gr.Plot(label="Nucleotide Composition by Position")
clr_plot = gr.Plot(label="Color Channel Composition by Position")
gr.Markdown("
")
table_out = gr.Dataframe(label="Generated Primers")
file_nuc = gr.File(label="Download nucleotide_percentages_plot.png")
file_clr = gr.File(label="Download color_percentages_plot.png")
file_csv = gr.File(label="Download primers.csv")
run_btn.click(
run_tool,
inputs=[phasing_slider, primer_in, adapter_in, chem_in],
outputs=[nuc_plot, clr_plot, table_out, file_nuc, file_clr, file_csv]
)
if __name__ == "__main__":
demo.launch()
# demo.launch(server_name="0.0.0.0", server_port=7860, share=True)