#! /usr/bin/env python3 """ This app generates phased PCR primers with balanced nucleotide complexity—that is, sequences with even representation of A, T, C, and G across positions. This helps reduce issues like base-calling noise and phasing problems in sequencing platforms like Illumina. The app also includes a plot of the nucleotide balance by position of sequencing. Author: Paul Munn, Genomics Innovation Hub, Cornell University (based on a SHiny app written by Franziska Bonath: https://github.com/FranBonath/phased_primers_shiny/tree/main) Version history: - 07/28/2025: Original version """ import gradio as gr import pandas as pd import numpy as np import matplotlib.pyplot as plt import random, tempfile, os, uuid import string # ───────── global variables ───────── VERSION = "1.0" # bump when you release updates # ──────────────────────────────────── # Defines IUPAC degenerate bases: # Used to interpret ambiguous bases during complexity evaluation def compile_library_of_special_nucs(): return { "R": ["A", "G"], "Y": ["C", "T"], "S": ["G", "C"], "W": ["A", "T"], "K": ["G", "T"], "M": ["A", "C"], "B": ["C", "G", "T"], "D": ["A", "G", "T"], "H": ["A", "C", "T"], "V": ["A", "C", "G"], "N": ["A", "C", "G", "T"] } # This step normalizes special bases into standard nucleotides so we can assess actual base balance def adjust_special_nucleotides(nuc_table): """Convert degenerate bases into fractional A/T/C/G counts.""" special = compile_library_of_special_nucs() if nuc_table.shape[1] > 4: # Convert to float to avoid dtype clashes when adding fractions std = nuc_table.iloc[0, :4].astype(float).copy() for col in nuc_table.columns[4:]: if col in special: cnt = nuc_table.at[0, col] for nuc in special[col]: std[nuc] += cnt / len(special[col]) return pd.DataFrame([std]) # If only A/T/C/G in table, still make sure they're floats return nuc_table.iloc[:, :4].astype(float) # This is the core algorithm for creating phased primers with maximum nucleotide diversity at early positions # Primer complexity is engineered and evaluated through: # 1) Base balance: Calculated per-position relative abundance of A/T/C/G. # 2) Degenerate bases: Interpreted to contribute partially to standard nucleotides. # 3) Selection strategy: At each phase step, the least-represented base is chosen to improve uniformity. # This ensures that when multiple phased primers are pooled, the sequencer sees a mix of nucleotides at early cycles, # reducing issues with signal calibration and increasing sequencing quality. # # Detailed steps: # 1) Split the primer into a list of characters: # → CCTAHGGGRBGCAGCAG → ['C','C','T','A','H',...] # 2) Create amount_of_phasing + 1 identical copies of this primer. # 3) Iteratively prepend nucleotides to the copies to maximize base diversity at early sequencing cycles: # For each new position (phase_count from amount_of_phasing down to 1): # - Take a subset of original primer of length phase_count. # - Combine with previously added phasing bases. # - Count the frequency of all nucleotides (including special). # - Normalize using adjust_special_nucleotides(). # - Find the least-represented base at this position. # - If there's a tie, prefer one that’s in the current primer context. # - Add it to the beginning of appropriate phased copies. # 4) Result: a list of staggered primers (with added nucleotides up front) that differ only by a few bases, # but introduce complexity early in the read. def phase_primer(primer, phasing, chemistry): """ All chemistries enforce: • No 4 identical bases in a row at the front. • No 5-long run consisting only of C/G at the front. Random base preferences when needed: • Two-channels (original SBS): use A/C/T only for random picks (occasionally pick G at random). • Two-channels (XLEAP-SBS) : prefer C/T, allow A less often; occasionally pick G at random. • Others (Four-channel, One-channel): uniform among legal candidates. """ if phasing == 0: return [list(primer)] def violates_constraints(current_added, cand): # No 4 identical bases in a row at the front if len(current_added) >= 3 and all(x == cand for x in current_added[:3]): return True # No 5-long run consisting only of C/G at the front if cand in {"C", "G"} and len(current_added) >= 4 and all(x in {"C", "G"} for x in current_added[:4]): return True return False def pick_with_weights(pool, weights_map): try: w = [weights_map.get(b, 1.0) for b in pool] if all(v <= 0 for v in w): return random.choice(pool) return random.choices(pool, weights=w, k=1)[0] except Exception: return random.choice(pool) split = list(primer) phased = [split.copy() for _ in range(phasing + 1)] added = [] special = compile_library_of_special_nucs() for phase_cnt in range(phasing, 0, -1): subset = split[:phase_cnt] combined = added + subset counts = pd.Series(combined).value_counts() all_nucs = ["A", "T", "C", "G"] + list(special.keys()) data = {n: counts.get(n, 0) for n in all_nucs} df = adjust_special_nucleotides(pd.DataFrame([data])) # columns A/T/C/G (floats) # minimal count tie-set among A/T/C/G # compute the tie set among bases with the current minimum count two_channel = chemistry in {"Two-channels (original SBS)", "Two-channels (XLEAP-SBS)"} cols_to_consider = ["A", "C", "T"] if two_channel else ["A", "C", "T", "G"] df_min = df[cols_to_consider] min_val = df_min.min(axis=1).iat[0] choices = df_min.columns[df_min.iloc[0] == min_val].tolist() # enforce constraints first legal_choices = [b for b in choices if not violates_constraints(added, b)] # prefer choices present in the current subset when possible if legal_choices: base_pool = [c for c in legal_choices if c in subset] or legal_choices else: all_legal = [b for b in ["A", "C", "T", "G"] if not violates_constraints(added, b)] base_pool = [c for c in all_legal if c in subset] or all_legal or choices # chemistry-specific random selection rules (on constrained pool) # print('base pool: ', base_pool) if len(base_pool) == 1: chosen = base_pool[0] elif chemistry == "Two-channels (original SBS)": # occasionally pick G at random weights = {"A": 1.0, "C": 1.0, "T": 1.0, "G": 0.5} # G occasionally selected when random chosen = pick_with_weights(base_pool, weights) elif chemistry == "Two-channels (XLEAP-SBS)": # Prefer C/T; A less often; occasionally pick G at random weights = {"C": 1.0, "T": 1.0, "A": 0.5, "G": 0.5} # G occasionally selected when random chosen = pick_with_weights(base_pool, weights) else: # Four-channel & One-channel: uniform among constrained pool chosen = random.choice(base_pool) # prepend chosen base into all downstream phased primers added = [chosen] + added for i in range(phasing - phase_cnt + 1, phasing + 1): phased[i] = [chosen] + phased[i] return phased # Combines each phased primer with the adapter, and names them def design_primers(phased, adapter, raw): rows = [] for i, p in enumerate(phased): phase_len = len(p) - len(raw) phasing = ''.join(p[:phase_len]) gene = ''.join(p[phase_len:]) full = adapter + phasing + gene rows.append( dict(primer_name=f"primer_{i+1}", adapter=adapter, phasing_bases=phasing, gene_specific_primer=gene, full_sequence=full) ) return pd.DataFrame(rows) def plot_nuc(primer_list): # Use the shortest primer length so every position has data across all primers if not primer_list: fig, ax = plt.subplots(figsize=(10, 3)) ax.text(0.5, 0.5, "No primers to plot.", ha="center", va="center") ax.axis("off") return fig L = min(len(p) for p in primer_list) if L == 0: fig, ax = plt.subplots(figsize=(10, 3)) ax.text(0.5, 0.5, "Primers have zero length.", ha="center", va="center") ax.axis("off") return fig rows = [[ch.upper() for ch in p[:L]] for p in primer_list] mat = np.array(rows) total = mat.shape[0] # Targets and colors nucs = ["A", "C", "T", "G"] colors = {"A": "green", "C": "blue", "T": "red", "G": "orange"} # Degenerate-to-standard translation special = compile_library_of_special_nucs() # Floating counts for fractional contributions from degenerate bases counts = {n: np.zeros(L, dtype=float) for n in nucs} for j in range(L): col = mat[:, j] for b in col: if b in nucs: counts[b][j] += 1.0 elif b in special: # split among mapped bases mapped = special[b] frac = 1.0 / len(mapped) for m in mapped: counts[m][j] += frac else: pass # unknown character → ignore # ── Figure with a second subplot showing raw nucleotides per primer under the bars ── # ---- Dynamic sizing for the figure and subplots ---- TOP_IN = 4.2 # inches reserved for the bar plot PER_ROW_IN = 0.3 # inches per primer row BOTTOM_MIN_IN = 1.2 # clamp min height of the bottom panel BOTTOM_MAX_IN = 8.0 # clamp max height of the bottom panel bottom_in = float(np.clip(total * PER_ROW_IN, BOTTOM_MIN_IN, BOTTOM_MAX_IN)) fig = plt.figure(figsize=(10, TOP_IN + bottom_in)) gs = fig.add_gridspec( nrows=2, ncols=1, height_ratios=[TOP_IN, bottom_in], hspace=0.32 # ↑ more space between top and bottom subplots (was ~0.25–0.30) ) ax = fig.add_subplot(gs[0]) ax2 = fig.add_subplot(gs[1], sharex=ax) # ---- (your existing bar plotting code for ax) ---- x = np.arange(L) nucs = ["A", "C", "T", "G"] colors = {"A": "green", "C": "blue", "T": "red", "G": "orange"} group_width = 0.55 width = group_width / len(nucs) offsets = (np.arange(len(nucs)) - (len(nucs)-1)/2) * width for i, n in enumerate(nucs): vals = (counts[n] / total) * 100.0 ax.bar(x + offsets[i], vals, width=width, label=n, color=colors[n]) ax.set( xlabel="Base position from 5′ end (after phasing)", ylabel="Nucleotide frequency (%)", title="Nucleotide Composition by Position Across Phased Primers", ylim=(0, 100), xlim=(-0.5, L - 0.5), ) ax.set_xticks(x) ax.set_xticklabels([str(i + 1) for i in range(L)]) ax.axhline(25, color="black", linestyle="--", linewidth=1) ax.legend(title="Nucleotide", bbox_to_anchor=(1.02, 1), loc="upper left") # ---- Bottom subplot: adaptive row spacing + font size ---- # Make spacing slightly tighter as rows grow, looser when few rows row_spacing = float(np.interp(total, [5, 40], [1.80, 1.35])) # ↑ larger at high totals fontsize = float(np.interp(total, [5, 40], [10.0, 7.8])) # optional: tiny shrink with many rows ax2.set_xlim(-0.5, L - 0.5) ax2.set_ylim(row_spacing * (total - 1) + 0.5, -0.5) # accommodate spacing ax2.set_yticks([]) ax2.set_xticks(x) ax2.set_xticklabels([str(i + 1) for i in range(L)]) for spine in ["top", "right", "left"]: ax2.spines[spine].set_visible(False) # Draw letters (bold) for j in range(L): col = mat[:, j] for r, base in enumerate(col): c = colors.get(base, "#666666") y = r * row_spacing ax2.text( j, y, base, ha="center", va="center", fontsize=fontsize, fontweight="bold", fontfamily="DejaVu Sans Mono", color=c ) ax2.set_xlabel("Base position (per-primer bases shown below each position)") plt.subplots_adjust(left=0.075, right=0.87) plt.tight_layout() return fig def get_color_spec(chem): """ Returns a list of groups: (legend_label, bar_color, bases_set) Note: No 'Deg' group. Degenerate bases are split fractionally among A/C/G/T and then aggregated into these groups. """ if chem == "Four-channels (HiSeq & MiSeq)": # Two bars: A/C (red) and G/T (green) return [ ("A / C", "red", {"A", "C"}), ("G / T", "green", {"G", "T"}), ] elif chem == "Two-channels (original SBS)": return [ ("A", "orange", {"A"}), ("C", "red", {"C"}), ("T", "green", {"T"}), ("G", "gray", {"G"}), ] elif chem == "Two-channels (XLEAP-SBS)": return [ ("A", "blue", {"A"}), ("C", "cyan", {"C"}), ("T", "green", {"T"}), ("G", "gray", {"G"}), ] else: # "One-channel (iSeq 100)" return [ ("A", "green", {"A"}), ("C", "blue", {"C"}), ("T", "red", {"T"}), ("G", "gray", {"G"}), ] def plot_colors(primer_list, chemistry): # Use the shortest length so every position has data across all primers if not primer_list: fig, ax = plt.subplots(figsize=(10, 3)) ax.text(0.5, 0.5, "No primers to plot.", ha="center", va="center") ax.axis("off") return fig L = min(len(p) for p in primer_list) if L == 0: fig, ax = plt.subplots(figsize=(10, 3)) ax.text(0.5, 0.5, "Primers have zero length.", ha="center", va="center") ax.axis("off") return fig rows = [[ch.upper() for ch in p[:L]] for p in primer_list] mat = np.array(rows) total = mat.shape[0] # Fractional translation of degenerate bases → A/C/G/T special = compile_library_of_special_nucs() bases = ["A", "C", "G", "T"] base_counts = {b: np.zeros(L, dtype=float) for b in bases} for j in range(L): col = mat[:, j] for b in col: if b in base_counts: base_counts[b][j] += 1.0 elif b in special: mapped = special[b] frac = 1.0 / len(mapped) for m in mapped: if m in base_counts: base_counts[m][j] += frac else: pass # unknown → ignore # Build plotting groups per chemistry groups = [] if chemistry == "Two-channels (XLEAP-SBS)": # C contributes to BOTH A and T channels; remove separate C bar groups = [ ("A / C", "blue", base_counts["A"] + base_counts["C"]), ("T / C", "green", base_counts["T"] + base_counts["C"]), ("G", "gray", base_counts["G"]), ] elif chemistry == "Two-channels (original SBS)": # A contributes to BOTH C and T channels; remove separate A bar groups = [ ("A / C", "red", base_counts["A"] + base_counts["C"]), ("A / T", "green", base_counts["A"] + base_counts["T"]), ("G", "gray", base_counts["G"]), ] else: # Other chemistries from spec (no duplication) spec = get_color_spec(chemistry) # list of (label, color, set_of_bases) for lab, color, bset in spec: arr = np.zeros(L, dtype=float) for b in bset: arr += base_counts[b] groups.append((lab, color, arr)) # Plot with wider spacing between groups fig, ax = plt.subplots(figsize=(10, 5)) G = len(groups) x = np.arange(L) group_width = 0.55 # shrink per-position cluster to widen gaps width = group_width / G offsets = (np.arange(G) - (G - 1) / 2) * width # Allow y-axis to exceed 100% if a channel duplicates contributions (XLEAP/original SBS) max_pct = 0.0 for i, (lab, color, arr) in enumerate(groups): vals = (arr / total) * 100.0 max_pct = max(max_pct, float(np.max(vals)) if vals.size else 0.0) ax.bar(x + offsets[i], vals, width=width, label=lab, color=color) ax.set( xlabel="Base position from 5′ end (after phasing)", ylabel="Fluorescent signal representation (%)", title=f"Color Channel Composition by Position\n({chemistry})", xlim=(-0.5, L - 0.5), ylim=(0, max(100, max_pct * 1.05) if max_pct > 0 else 100), ) ax.set_xticks(x) ax.set_xticklabels([str(i + 1) for i in range(L)]) # --- add a black dotted horizontal line at 25% --- ax.axhline(25, color="black", linestyle="--", linewidth=1) ax.legend(title="Nucleotides", bbox_to_anchor=(1.02, 1), loc="upper left") plt.tight_layout() return fig # ---------- util: save fig / df to temp files ---------- # This is a utility function to save a figure to a temporary file # and return the path to the file. # It uses the tempfile module to get a temporary directory and a unique file name. # It then saves the figure to the file and closes it. # Finally, it returns the path to the file. def _tmp_path(filename: str) -> str: tmp_dir = tempfile.mkdtemp() # unique dir per run return os.path.join(tmp_dir, filename) def save_fig_fixed(fig, filename: str) -> str: path = _tmp_path(filename) fig.savefig(path, dpi=300, bbox_inches="tight") plt.close(fig) return path def save_csv_fixed(df, filename: str) -> str: path = _tmp_path(filename) df.to_csv(path, index=False) return path # ---------- main gradio function ---------- def run_tool(phasing, primer, adapter, chemistry): primers = phase_primer(primer, phasing, chemistry) df = design_primers(primers, adapter, primer) fig_nuc = plot_nuc(primers) fig_color = plot_colors(primers, chemistry) # fixed names for downloads nuc_png = save_fig_fixed(fig_nuc, "nucleotide_percentages_plot.png") clr_png = save_fig_fixed(fig_color, "color_percentages_plot.png") csv_path = save_csv_fixed(df, "primers.csv") return fig_nuc, fig_color, df, nuc_png, clr_png, csv_path chemistries = [ "Four-channels (HiSeq & MiSeq)", "Two-channels (original SBS)", "Two-channels (XLEAP-SBS)", "One-channel (iSeq 100)", ] with gr.Blocks(css=""" /* Inputs panel (light mode) */ .input-panel { background:#f3f3f3; padding:16px; border-radius:8px; } /* Inputs panel (dark mode overrides) */ html.dark .input-panel, body.dark .input-panel, [data-theme*="dark"] .input-panel { background:#2b2b2b !important; color:inherit; border:1px solid rgba(255,255,255,0.08); } .inline-row { align-items:center; gap:12px; } /* Ticks that adapt to theme */ .tick-grid { display:grid; grid-template-columns:repeat(22,1fr); column-gap:35px; font-size:11px; padding:0 0 8px 0; margin-left:20px; justify-items:start; user-select:none; color:inherit; } .tick-grid > span { justify-self:start; } @media (prefers-color-scheme: dark) { .tick-grid { color: rgba(255,255,255,.9); } } @media (prefers-color-scheme: light) { .tick-grid { color: #444; } } /* give the radio extra width */ #custom-mode { min-width: 160px; } """) as demo: gr.Markdown( f"""

Phased PCR1 Amplicon Primer Designer (version {VERSION})

This app generates phased PCR primers with balanced nucleotide complexity — that is, sequences with even representation of A, T, C, and G across positions.

""" ) # ───── inputs on light‑gray panel ───── with gr.Column(elem_classes="input-panel"): phasing_slider = gr.Slider( minimum=0, maximum=21, value=0, step=1, label="Amount of primer phasing (integer)" ) ticks = ''.join(f'{i}' for i in range(22)) gr.HTML(f"
{ticks}
") primer_in = gr.Textbox("CCTAHGGGRBGCAGCAG", label="Gene‑specific primer (5′→3′)") # adapter_in = gr.Textbox("ACACTCTTTCCCTACACGACGCTCTTCCGATCT", # label="Adapter sequence (5′→3′)") adapter_in = gr.Dropdown( choices=[ "ACACTCTTTCCCTACACGACGCTCTTCCGATCT", "GTGACTGGAGTTCAGACGTGTGCTCTTCCGATCT", "TCGTCGGCAGCGTCAGATGTGTATAAGAGACAG", "GTCTCGTGGGCTCGGAGATGTGTATAAGAGACAG", ], value="ACACTCTTTCCCTACACGACGCTCTTCCGATCT", label="Adapter" ) chem_in = gr.Dropdown( ["Four-channels (HiSeq & MiSeq)", "Two-channels (original SBS)", "Two-channels (XLEAP-SBS)", "One-channel (iSeq 100)"], value="Four-channels (HiSeq & MiSeq)", label="Sequencing chemistry" ) run_btn = gr.Button("Generate Primers") # ───── outputs: plots then table & downloads ───── nuc_plot = gr.Plot(label="Nucleotide Composition by Position") clr_plot = gr.Plot(label="Color Channel Composition by Position") gr.Markdown("
") table_out = gr.Dataframe(label="Generated Primers") file_nuc = gr.File(label="Download nucleotide_percentages_plot.png") file_clr = gr.File(label="Download color_percentages_plot.png") file_csv = gr.File(label="Download primers.csv") run_btn.click( run_tool, inputs=[phasing_slider, primer_in, adapter_in, chem_in], outputs=[nuc_plot, clr_plot, table_out, file_nuc, file_clr, file_csv] ) if __name__ == "__main__": demo.launch() # demo.launch(server_name="0.0.0.0", server_port=7860, share=True)