from dataclasses import dataclass, field
from .compound import Ingredient
import mcol
import mrich
[docs]
class Recipe:
"""A Recipe stores data corresponding to a specific synthetic recipe involving several products, reactants, intermediates, and reactions."""
_db = None
def __init__(
self,
db: "Database",
*,
products: "IngredientSet | None" = None,
reactants: "IngredientSet | None" = None,
intermediates: "IngredientSet | None" = None,
reactions: "ReactionSet | None" = None,
compounds: "IngredientSet | None" = None,
):
from .cset import IngredientSet
from .rset import ReactionSet
if products is None:
products = IngredientSet(db)
if reactants is None:
reactants = IngredientSet(db)
if intermediates is None:
intermediates = IngredientSet(db)
if compounds is None:
compounds = IngredientSet(db)
if reactions is None:
reactions = ReactionSet(db)
# check typing
assert isinstance(products, IngredientSet)
assert isinstance(reactants, IngredientSet)
assert isinstance(intermediates, IngredientSet)
assert isinstance(compounds, IngredientSet)
assert isinstance(reactions, ReactionSet)
self._products = products
self._reactants = reactants
self._intermediates = intermediates
self._reactions = reactions
self._compounds = compounds
self._db = db
self._hash = None
self._score = None
# caches
self._product_compounds = None
self._poses = None
self._interactions = None
self._combined_compounds = None
### FACTORIES
[docs]
@classmethod
def from_reaction(
cls,
reaction,
amount=1,
*,
debug: bool = False,
pick_cheapest: bool = True,
permitted_reactions: "ReactionSet | None" = None,
quoted_only: bool = False,
supplier: None | str = None,
unavailable_reaction: str = "error",
reaction_checking_cache: dict[int, bool] = None,
reaction_reactant_cache: dict[int, bool] = None,
inner: bool = False,
get_ingredient_quotes: bool = True,
) -> "Recipe | list[Recipe]":
"""Create a :class:`.Recipe` from a :class:`.Reaction` and its upstream dependencies
:param reaction: reaction to create recipe from
:param amount: amount in ``mg`` (Default value = 1)
:param debug: bool: increase verbosity for debugging (Default value = False)
:param pick_cheapest: bool: choose the cheapest solution (Default value = True)
:param permitted_reactions: once consider reactions in this set (Default value = None)
:param quoted_only: bool: only allow reactants with quotes (Default value = False)
:param supplier: None | str: optionally restrict quotes to only this supplier (Default value = None)
:param unavailable_reaction: define the behaviour for when a reaction has unavailable reactants (Default value = 'error')
:param inner: used to indicate that this is a recursive call (Default value = False)
:param get_ingredient_quotes: get quotes for ingredients in this recipe
"""
from .reaction import Reaction
assert isinstance(reaction, Reaction)
from .cset import IngredientSet
from .rset import ReactionSet
if debug:
mrich.debug(
f"Recipe.from_reaction(R{reaction.id}, {amount=}, {pick_cheapest=})"
)
mrich.debug(f"{reaction.product.id=}")
mrich.debug(f"{reaction.reactants.ids=}")
if permitted_reactions:
assert reaction in permitted_reactions
# raise NotImplementedError
db = reaction.db
recipe = cls.__new__(cls)
recipe.__init__(
db,
products=IngredientSet(
db,
[
reaction.product.as_ingredient(
amount=amount, get_quote=get_ingredient_quotes
)
],
),
reactants=IngredientSet(db, [], supplier=supplier),
intermediates=IngredientSet(db, []),
reactions=ReactionSet(db, [reaction.id], sort=False),
)
recipes = [recipe]
if quoted_only or supplier:
if debug:
mrich.debug(f"Checking reactant_availability: {reaction=}")
if reaction_checking_cache and reaction.id in reaction_checking_cache:
ok = reaction_checking_cache[reaction.id]
print("reaction_checking_cache used")
else:
ok = reaction.check_reactant_availability(supplier=supplier)
# print('cache not used')
if reaction_checking_cache is not None:
reaction_checking_cache[reaction.id] = ok
if not ok:
if unavailable_reaction == "error":
mrich.error(f"Reactants not available for {reaction=}")
if pick_cheapest:
return None
else:
return []
def get_reactant_amount_pairs(reaction):
if reaction_reactant_cache and reaction.id in reaction_reactant_cache:
print("reaction_reactant_cache used")
return reaction_reactant_cache[reaction.id]
else:
pairs = reaction.get_reactant_amount_pairs(compound_object=False)
if reaction_reactant_cache is not None:
reaction_reactant_cache[reaction.id] = pairs
return pairs
if debug:
mrich.debug(f"get_reactant_amount_pairs({reaction.id})")
pairs = get_reactant_amount_pairs(reaction)
for reactant, reactant_amount in pairs:
reactant = db.get_compound(id=reactant)
if debug:
mrich.debug(f"{reactant.id=}, {reactant_amount=}")
# scale amount
reactant_amount *= amount
reactant_amount /= reaction.product_yield
inner_reactions = reactant.get_reactions(
none="quiet", permitted_reactions=permitted_reactions
)
if inner_reactions:
if debug:
if len(inner_reactions) == 1:
mrich.debug(f"Reactant has ONE inner reaction")
else:
mrich.warning(f"{reactant=} has MULTIPLE inner reactions")
new_recipes = []
inner_recipes = []
for reaction in inner_reactions:
reaction_recipes = Recipe.from_reaction(
reaction=reaction,
amount=reactant_amount,
debug=debug,
pick_cheapest=False,
quoted_only=quoted_only,
supplier=supplier,
unavailable_reaction=unavailable_reaction,
reaction_checking_cache=reaction_checking_cache,
reaction_reactant_cache=reaction_reactant_cache,
inner=True,
)
inner_recipes += reaction_recipes
for recipe in recipes:
for inner_recipe in inner_recipes:
combined_recipe = recipe.copy()
combined_recipe.reactants += inner_recipe.reactants
combined_recipe.intermediates += inner_recipe.intermediates
combined_recipe.reactions += inner_recipe.reactions
combined_recipe.intermediates.add(
reactant.as_ingredient(reactant_amount, supplier=supplier)
)
new_recipes.append(combined_recipe)
recipes = new_recipes
else:
ingredient = reactant.as_ingredient(reactant_amount, supplier=supplier)
for recipe in recipes:
recipe.reactants.add(ingredient)
# reverse ReactionSet's
if not inner:
for recipe in recipes:
recipe.reactions.reverse()
if pick_cheapest:
if debug:
mrich.debug("Picking cheapest")
priced = [r for r in recipes if r.get_price(supplier=supplier)]
# priced = [r for r in recipes if r.price]
if not priced:
mrich.error("0 recipes with prices, can't choose cheapest")
return recipes
sorted_recipes = sorted(
priced, key=lambda r: r.get_price(supplier=supplier)
)
if debug:
for recipe in recipes:
mrich.debug(f"{recipe}, {recipe.price}")
return sorted_recipes[0]
# return sorted(priced, key=lambda r: r.price)[0]
return recipes
[docs]
@classmethod
def from_reactions(
cls,
reactions: "ReactionSet",
amount: float = 1,
pick_cheapest: bool = True,
permitted_reactions: "ReactionSet | None" = None,
final_products_only: bool = True,
return_products: bool = False,
supplier: str | None = None,
use_routes: bool = False,
debug: bool = False,
**kwargs,
) -> "Recipe | list[Recipe] | CompoundSet":
"""Create a :class:`.Recipe` from a :class:`.ReactionSet` and its upstream dependencies
:param reactions: reactions to create recipe from
:param amount: amount in ``mg`` (Default value = 1)
:param debug: bool: increase verbosity for debugging (Default value = False)
:param pick_cheapest: bool: choose the cheapest solution (Default value = True)
:param permitted_reactions: once consider reactions in this set (Default value = None)
:param final_products_only: don't get routes to intermediates (Default value = True)
:param return_products: return the :class:`.CompoundSet` of products instead (Default value = False)
"""
from .rset import ReactionSet
from .cset import IngredientSet, CompoundSet
assert isinstance(reactions, ReactionSet)
db = reactions.db
if debug:
mrich.debug("Recipe.from_reactions()")
mrich.var("reactions", reactions)
mrich.var("amount", amount)
mrich.var("final_products_only", final_products_only)
mrich.var("permitted_reactions", permitted_reactions)
# get all the products
products = reactions.products
if debug:
mrich.var("products", products)
# return products
if final_products_only:
if debug:
mrich.var("products.str_ids", products.str_ids)
# raise NotImplementedError
ids = reactions.db.execute(
f"""
SELECT DISTINCT compound_id FROM compound
LEFT JOIN reactant ON compound_id = reactant_compound
WHERE reactant_compound IS NULL
AND compound_id IN {products.str_ids}
"""
).fetchall()
ids = [i for i, in ids]
products = CompoundSet(db, ids)
if debug:
mrich.var("final products", products)
# return ids
if return_products:
return products
recipe = Recipe.from_compounds(
compounds=products,
amount=amount,
permitted_reactions=reactions,
pick_cheapest=pick_cheapest,
supplier=supplier,
use_routes=use_routes,
**kwargs,
)
return recipe
[docs]
@classmethod
def from_compounds(
cls,
compounds: "CompoundSet",
amount: float = 1,
debug: bool = False,
pick_cheapest: bool = True,
permitted_reactions=None,
quoted_only: bool = False,
supplier: None | str = None,
solve_combinations: bool = True,
pick_first: bool = False,
warn_multiple_solutions: bool = True,
pick_cheapest_inner_routes: bool = False,
unavailable_reaction: str = "error",
reaction_checking_cache: dict[int, bool] | None = None,
reaction_reactant_cache: dict[int, bool] | None = None,
use_routes: bool = False,
**kwargs,
):
"""Create recipe(s) to synthesis products in the :class:`.CompoundSet`
:param compounds: set of compounds to find routes for
:param solve_combinations: bool: combinatorially combine all individual routes (Default value = True)
:param pick_first: return the first solution without comparison (Default value = False)
:param warn_multiple_solutions: warn if a compound has multiple routes (Default value = True)
:param pick_cheapest_inner_routes: for each compound choose the cheapest route (Default value = False)
:param reaction: reaction to create recipe from
:param amount: amount in ``mg`` (Default value = 1)
:param debug: bool: increase verbosity for debugging (Default value = False)
:param pick_cheapest: bool: choose the cheapest solution (Default value = True)
:param permitted_reactions: once consider reactions in this set (Default value = None)
:param quoted_only: bool: only allow reactants with quotes (Default value = False)
:param supplier: None | str: optionally restrict quotes to only this supplier (Default value = None)
:param unavailable_reaction: define the behaviour for when a reaction has unavailable reactants (Default value = 'error')
"""
from .cset import CompoundSet
assert isinstance(compounds, CompoundSet)
# if permitted_reactions:
# raise NotImplementedError
db = compounds.db
n_comps = len(compounds)
assert n_comps
if not hasattr(amount, "__iter__"):
amount = [amount] * n_comps
if use_routes:
route_lookup = db.get_product_id_routes_dict()
if supplier:
raise NotImplementedError
# supplier_lookup = db.get_compound_id_suppliers_dict()
options = []
ok = 0
mrich.var("#compounds", n_comps)
for comp, a in mrich.track(
zip(compounds, amount),
prefix="Solving individual compound recipes...",
total=n_comps,
):
comp_options = []
if use_routes:
if comp.id not in route_lookup:
mrich.error("No routes to", comp)
continue
comp_options = []
for route_id in route_lookup[comp.id]:
route = db.get_route(id=route_id)
comp_options.append(route)
else:
for reaction in comp.reactions:
if permitted_reactions and reaction not in permitted_reactions:
continue
sol = Recipe.from_reaction(
reaction=reaction,
amount=a,
pick_cheapest=pick_cheapest_inner_routes,
debug=debug,
permitted_reactions=permitted_reactions,
quoted_only=quoted_only,
supplier=supplier,
unavailable_reaction=unavailable_reaction,
reaction_checking_cache=reaction_checking_cache,
reaction_reactant_cache=reaction_reactant_cache,
**kwargs,
)
if pick_cheapest_inner_routes:
if sol:
comp_options.append(sol)
else:
assert isinstance(sol, list)
comp_options += sol
if not comp_options:
mrich.error(
f"No solutions for compound={comp} ({comp.reactions.ids=})"
)
continue
if pick_cheapest and len(comp_options) > 1:
if warn_multiple_solutions:
mrich.warning(
f"Multiple solutions for", comp, "(", len(comp_options), ")"
)
if debug:
mrich.debug("Picking cheapest...")
priced = [r for r in comp_options if r.price]
comp_options = sorted(priced, key=lambda r: r.price)[:1]
if warn_multiple_solutions and len(comp_options) > 1:
mrich.warning(f"Multiple solutions for compound={comp}")
if debug:
mrich.debug(f"{comp_options=}")
else:
if n_comps <= 200:
mrich.success(f"Found solution for compound={comp}")
ok += 1
mrich.set_progress_field("ok", ok)
mrich.set_progress_field("n", n_comps)
options.append(comp_options)
assert all(options)
from itertools import product
mrich.print("Solving recipe combinations...")
combinations = list(product(*options))
if not solve_combinations:
return combinations
# if pick_first:
# combinations = [combinations[0]]
solutions = []
if n_comps > 1:
generator = mrich.track(
combinations, prefix="Combining recipes...", total=len(combinations)
)
else:
generator = combinations
ok = 0
for combo in generator:
if debug:
mrich.debug(f"Combination of {len(combo)} recipes")
if not combo:
continue
solution = combo[0]
for i, recipe in enumerate(combo[1:]):
if debug:
mrich.debug(i + 1)
solution += recipe
solutions.append(solution)
ok += 1
mrich.set_progress_field("ok", ok)
mrich.set_progress_field("n", len(combinations))
if not solutions:
mrich.error("No solutions")
return None
if pick_first:
return solutions[0]
if pick_cheapest:
mrich.debug("Calculating prices...")
priced = [r for r in solutions if r.price]
mrich.print("Picking cheapest from", len(priced), "options")
if not priced:
mrich.error("0 recipes with prices, can't choose cheapest")
return solutions
return sorted(priced, key=lambda r: r.price)[0]
return solutions
[docs]
@classmethod
def from_reactants(
cls,
reactants: "CompoundSet | IngredientSet",
amount: float = 1,
debug: bool = False,
return_products: bool = False,
supplier: str | None = None,
pick_cheapest: bool = False,
use_routes: bool = False,
**kwargs,
) -> "list[Recipe] | Recipe | CompoundSet":
"""Find the maximal recipe from a given set of reactants
:param reactants: :class:`.CompoundSet` or :class:`.IngredientSet` for the reactants. Ingredient amounts are ignored
:param amount: amount of each product needed (Default value = 1)
:param debug: increase verbosity (Default value = False)
:param return_products: return products instead of recipe (Default value = False)
:param kwargs: passed to :meth:`.Recipe.from_reactions`
"""
from .cset import IngredientSet
if isinstance(reactants, IngredientSet):
reactant_ids = reactants.compound_ids
else:
reactant_ids = reactants.ids
db = reactants.db
all_reactants = set(reactant_ids)
possible_reactions = []
# recursively search for possible reactions
for i in range(300):
if debug:
mrich.debug(i)
# reaction_ids = db.get_possible_reaction_ids(compound_ids=compound_ids)
reaction_ids = db.get_possible_reaction_ids(compound_ids=all_reactants)
if not reaction_ids:
break
if debug:
mrich.debug(f"Adding {len(reaction_ids)} reactions")
possible_reactions += reaction_ids
if debug:
mrich.var("reaction_ids", reaction_ids)
product_ids = db.get_possible_reaction_product_ids(
reaction_ids=reaction_ids
)
if debug:
mrich.var("product_ids", product_ids)
n_prev = len(all_reactants)
all_reactants |= set(product_ids)
if n_prev == len(all_reactants):
break
else:
raise NotImplementedError("Maximum recursion depth exceeded")
possible_reactions = list(set(possible_reactions))
if debug:
mrich.var("all possible reactions", possible_reactions)
from .rset import ReactionSet
rset = ReactionSet(db, possible_reactions, sort=False)
recipe = cls.from_reactions(
rset,
amount=amount,
permitted_reactions=rset,
debug=debug,
return_products=return_products,
supplier=supplier,
use_routes=use_routes,
**kwargs,
)
return recipe
[docs]
@classmethod
def from_json(
cls,
db: "Database",
path: "str | Path",
debug: bool = True,
allow_db_mismatch: bool = False,
clear_quotes: bool = False,
data: dict = None,
db_mismatch_warning: bool = True,
):
"""Load a serialised recipe from a JSON file
:param db: database to link
:param path: path to JSON
:param debug: increase verbosity (Default value = True)
:param allow_db_mismatch: allow a database mismatch (Default value = False)
:param clear_quotes: ignore reactant quotes (Default value = False)
:param data: serialised data (Default value = None)
"""
# imports
import json
from .cset import IngredientSet
from .rset import ReactionSet
# load JSON
if not data:
if debug:
mrich.reading(path)
data = json.load(open(path, "rt"))
# check metadata
if str(db.path.resolve()) != data["database"]:
if db_mismatch_warning:
mrich.var("session", str(db.path.resolve()))
mrich.var("in file", data["database"])
if allow_db_mismatch:
if db_mismatch_warning:
mrich.warning("Database path mismatch")
else:
mrich.error(
"Database path mismatch, set allow_db_mismatch=True to ignore"
)
return None
if debug:
mrich.print(f'Recipe was generated at: {data["timestamp"]}')
price = data["price"]
# IngredientSets
products = IngredientSet.from_ingredient_dicts(db, data["products"])
intermediates = IngredientSet.from_ingredient_dicts(db, data["intermediates"])
reactants = IngredientSet.from_ingredient_dicts(
db, data["reactants"], supplier=data["reactant_supplier"]
)
if "compounds" in data:
compounds = IngredientSet.from_ingredient_dicts(
db, data["compounds"], supplier=data["compound_supplier"]
)
else:
compounds = IngredientSet(db)
if clear_quotes:
reactants.df["quote_id"] = None
reactants.df["quoted_amount"] = None
compounds.df["quote_id"] = None
compounds.df["quoted_amount"] = None
# ReactionSet
reactions = ReactionSet(db, data["reaction_ids"], sort=False)
if debug:
mrich.var("reactants", reactants)
mrich.var("intermediates", intermediates)
mrich.var("products", products)
mrich.var("reactions", reactions)
mrich.var("compounds", compounds)
# Create the object
self = cls.__new__(cls)
self.__init__(
db,
products=products,
reactants=reactants,
intermediates=intermediates,
reactions=reactions,
compounds=compounds,
)
return self
### PROPERTIES
@property
def db(self) -> "Database":
"""Associated :class:`.Database:"""
return self._db
@property
def products(self) -> "IngredientSet":
"""Product :class:`.IngredientSet`"""
return self._products
@property
def compounds(self) -> "IngredientSet":
"""Product :class:`.IngredientSet`"""
return self._compounds
@property
def poses(self) -> "PoseSet":
"""Product poses"""
if self._poses is None:
self._poses = self.combined_compounds.poses
self._poses._name = f"poses of {self}"
return self._poses
@property
def product_compounds(self) -> "CompoundSet":
"""Product compounds"""
if self._product_compounds is None:
self._product_compounds = self.products.compounds
self._product_compounds._name = f"products of {self}"
return self._product_compounds
@property
def combined_compound_ids(self) -> set[int]:
return set(self.product_compounds.ids) | set(self.compounds.ids)
@property
def combined_compounds(self) -> "CompoundSet":
"""Combined product and no-chem compounds"""
if self._combined_compounds is None:
from .cset import CompoundSet
self._combined_compounds = CompoundSet(self.db, self.combined_compound_ids)
self._combined_compounds._name = f"combined compounds of {self}"
return self._combined_compounds
@property
def interactions(self) -> "InteractionSet":
"""Product pose interactions"""
if self._interactions is None:
self._interactions = self.poses.interactions
return self._interactions
@property
def product(self) -> "Ingredient":
"""Return single product (if there's only one)"""
assert len(self.products) == 1
return self.products[0]
@products.setter
def products(self, a: "IngredientSet"):
"""Set the products"""
self._products = a
self.__flag_modification()
@property
def reactants(self):
"""Reactant :class:`.IngredientSet`"""
return self._reactants
@reactants.setter
def reactants(self, a: "IngredientSet"):
"""Set the reactants"""
self._reactants = a
self.__flag_modification()
@property
def intermediates(self) -> "IngredientSet":
"""Intermediates :class:`.IngredientSet`"""
return self._intermediates
@intermediates.setter
def intermediates(self, a: "IngredientSet"):
"""Set the intermediates"""
self._intermediates = a
self.__flag_modification()
@property
def reactions(self) -> "ReactionSet":
"""Intermediates :class:`.IngredientSet`"""
return self._reactions
@reactions.setter
def reactions(self, a: "ReactionSet"):
"""Set the reactions"""
self._reactions = a
self.__flag_modification()
@property
def price(self) -> "Price":
"""Get the price of the reactants"""
return self.reactants.get_price() + self.compounds.get_price()
@property
def num_products(self) -> int:
"""Return the number of products"""
return len(self.products)
@property
def num_compounds(self) -> int:
"""Return the number of compounds"""
return len(self.combined_compound_ids)
@property
def num_reactions(self):
"""Return the number of reactions"""
return len(self.reactions)
@property
def num_reactants(self):
"""Return the number of reactants"""
return len(self.reactants)
@property
def num_intermediates(self):
"""Return the number of intermediates"""
return len(self.intermediates)
@property
def hash(self) -> str:
"""Return the unique hash string"""
return self._hash
@property
def score(self):
"""Return the Recipe score"""
return self._score
@property
def type(self) -> str:
if self.empty:
return "EMPTY"
chem = bool(self.reactions)
nochem = bool(self.compounds)
if chem and nochem:
return "MIXED"
if chem and not nochem:
return "CHEM"
if nochem and not chem:
return "NOCHEM"
@property
def empty(self) -> bool:
"""Is this Recipe empty?"""
if self.reactants:
return False
if self.products:
return False
if self.intermediates:
return False
if self.reactions:
return False
if self.compounds:
return False
return True
### METHODS
[docs]
def get_price(self, supplier: str | None = None) -> "Price":
"""get the reactants price. See :meth:`.IngredientSet.get_price`
:param supplier: restrict quotes to this supplier
"""
return self.reactants.get_price(supplier=supplier)
[docs]
def draw(self, color_mapper=None, node_size=300, graph_only=False):
"""draw graph of the reaction network
:param color_mapper: (Default value = None)
:param node_size: (Default value = 300)
:param graph_only: (Default value = False)
"""
import networkx as nx
color_mapper = color_mapper or {}
colors = {}
sizes = {}
graph = nx.DiGraph()
for reaction in self.reactions:
for reactant in reaction.reactants:
key = str(reactant)
ingredient = self.get_ingredient(id=reactant.id)
graph.add_node(
key,
id=reactant.id,
smiles=reactant.smiles,
amount=ingredient.amount,
price=str(ingredient.price),
lead_time=ingredient.lead_time,
)
if not graph_only:
sizes[key] = self.get_ingredient(id=reactant.id).amount
if key in color_mapper:
colors[key] = color_mapper[key]
else:
colors[key] = (0.7, 0.7, 0.7)
for product in self.products:
key = str(product.compound)
ingredient = self.get_ingredient(id=product.id)
graph.add_node(
key,
id=product.id,
smiles=product.smiles,
amount=ingredient.amount,
price=str(ingredient.price),
lead_time=ingredient.lead_time,
)
if not graph_only:
sizes[key] = product.amount
if key in color_mapper:
colors[key] = color_mapper[key]
else:
colors[key] = (0.7, 0.7, 0.7)
for reaction in self.reactions:
for reactant in reaction.reactants:
graph.add_edge(
str(reactant),
str(reaction.product),
id=reaction.id,
type=reaction.type,
product_yield=reaction.product_yield,
)
# rescale sizes
if not graph_only:
s_min = min(sizes.values())
sizes = [s / s_min * node_size for s in sizes.values()]
if graph_only:
return graph
else:
import matplotlib as plt
# return nx.draw(graph, pos, with_labels=True, font_weight='bold')
# pos = nx.spring_layout(graph, iterations=200, k=30)
pos = nx.spring_layout(graph)
return nx.draw(
graph,
pos=pos,
with_labels=True,
font_weight="bold",
node_color=list(colors.values()),
node_size=sizes,
)
[docs]
def sankey(self, title: str | None = None) -> "graph_objects.Figure":
"""draw a plotly Sankey diagram
:param title: (Default value = None)
"""
graph = self.draw(graph_only=True)
import plotly.graph_objects as go
nodes = {}
for edge in graph.edges:
c = edge[0]
if c not in nodes:
nodes[c] = len(nodes)
c = edge[1]
if c not in nodes:
nodes[c] = len(nodes)
source = [nodes[a] for a, b in graph.edges]
target = [nodes[b] for a, b in graph.edges]
value = [1 for l in graph.edges]
# print(graph.nodes)
labels = list(nodes.keys())
# compound_ids = [n.id for n in nodes]
# smiles = [n.smiles for n in nodes]
# customdata = [(n.id, n.smiles) for n in ]
hoverkeys = None
customdata = []
for key in nodes.keys():
n = graph.nodes[key]
if not hoverkeys:
hoverkeys = list(n.keys())
if not n:
mrich.error(f"problem w/ node {key=}")
compound_id = int(key[1:])
customdata.append((compound_id, None))
else:
# customdata.append((n['id'], n['smiles']))
d = tuple(v if v is not None else "N/A" for v in n.values())
customdata.append(d)
# id=product.id, smiles=product.smiles, amount=ingredient.amount, price=ingredient.price, lead_time=ingredient.lead_time
hoverkeys_edges = None
# edgedata = [graph.edges[a,b]["reaction_id"] for a,b in graph.edges]
customdata_edges = []
for s, t in graph.edges.keys():
edge = graph.edges[s, t]
if not hoverkeys_edges:
hoverkeys_edges = list(edge.keys())
if not n:
mrich.error(f"problem w/ edge {s=} {t=}")
customdata_edges.append((None, None, None))
else:
d = tuple(v if v is not None else "N/A" for v in edge.values())
customdata_edges.append(d)
hoverlines = []
for i, key in enumerate(hoverkeys):
hoverlines.append(f"{key}=%" "{" f"customdata[{i}]" "}")
hovertemplate = "Compound " + "<br>".join(hoverlines) + "<extra></extra>"
hoverlines_edges = []
for i, key in enumerate(hoverkeys_edges):
hoverlines_edges.append(f"{key}=%" "{" f"customdata[{i}]" "}")
hovertemplate_edges = (
"Reaction " + "<br>".join(hoverlines_edges) + "<extra></extra>"
)
# print(hovertemplate)
# compound_ids = [int(s[1:]) for s in labels]
# from .cset import CompoundSet
# smiles = CompoundSet(self.db, compound_ids).smiles
# print(compound_ids)
fig = go.Figure(
data=[
go.Sankey(
node=dict(
# pad = 15,
# thickness = 20,
# line = dict(color = "black", width = 0.5),
label=labels,
# color = "blue"
customdata=customdata,
# customdata = ["Long name A1", "Long name A2", "Long name B1", "Long name B2",
# "Long name C1", "Long name C2"],
# hovertemplate='Compound %{label}<br><br>smiles=%{customdata}<extra></extra>',
hovertemplate=hovertemplate,
),
link=dict(
customdata=customdata_edges,
hovertemplate=hovertemplate_edges,
source=source,
target=target,
value=value,
),
)
]
)
if not title:
# title = f"Recipe<br><sup>price={self.price}, lead-time={self.lead_time}</sup>"
try:
title = f"Recipe<br><sup>price={self.price}</sup>"
except AssertionError:
title = f"Recipe"
fig.update_layout(title=title)
# link = dict(
# source = [0, 1, 0, 2, 3, 3], # indices correspond to labels, eg A1, A2, A2, B1, ...
# target = [2, 3, 3, 4, 4, 5],
# value = [8, 4, 2, 8, 4, 2],
# customdata = ["q","r","s","t","u","v"],
# hovertemplate='Link from node %{source.customdata}<br />'+
# 'to node%{target.customdata}<br />has value %{value}'+
# '<br />and data %{customdata}<extra></extra>',
# )
return fig
[docs]
def summary(self, price: bool = True) -> None:
"""Print a summary of this recipe
:param price: print the price (Default value = True)
"""
import mcol
mrich.h1(str(self))
if price:
price = self.price
if price:
mrich.var("\nprice", price.amount, price.currency)
# mrich.var('lead-time', self.lead_time, 'working days))
if self.products:
mrich.h3(f"{len(self.products)} products")
if len(self.products) < 100:
for product in self.products:
mrich.var(str(product.compound), f"{product.amount:.2f}", "mg")
if self.intermediates:
mrich.h3(f"{len(self.intermediates)} intermediates")
if len(self.intermediates) < 100:
for intermediate in self.intermediates:
mrich.var(
str(intermediate.compound),
f"{intermediate.amount:.2f}",
"mg",
)
if self.reactants:
mrich.h3(f"{len(self.reactants)} reactants")
if len(self.reactants) < 100:
for reactant in self.reactants:
mrich.var(str(reactant.compound), f"{reactant.amount:.2f}", "mg")
if self.reactions:
mrich.h3(f"{len(self.reactions)} reactions")
if len(self.reactions) < 100:
for reaction in self.reactions:
mrich.var(str(reaction), reaction.reaction_str, reaction.type)
if self.compounds:
mrich.h3(f"{len(self.compounds)} compounds")
if len(self.compounds) < 100:
for compound in self.compounds:
mrich.var(str(compound.compound), f"{compound.amount:.2f}", "mg")
[docs]
def get_ingredient(self, id) -> "Ingredient":
"""Get an ingredient by its compound ID
:param id: compound ID
"""
matches = [r for r in self.reactants if r.id == id]
if not matches:
matches = [r for r in self.intermediates if r.id == id]
if not matches:
matches = [r for r in self.products if r.id == id]
assert len(matches) == 1
return matches[0]
[docs]
def add_to_all_reactants(self, amount: float = 20) -> None:
"""Increment all reactants by this amount
:param amount: amount in ``mg`` (Default value = 20)
"""
self.reactants.df["amount"] += amount
[docs]
def write_json(
self,
file: "str | Path",
*,
extra: dict | None = None,
indent: str = "\t",
**kwargs,
) -> None:
"""Serialise this recipe object and write it to disk
:param file: write to this path
:param extra: extra data to serialise
:param indent: indentation whitespace (Default value = '\t')
"""
import json
from pathlib import Path
file = Path(file).resolve()
assert file.parent.exists()
data = self.get_dict(serialise_price=True, **kwargs)
if extra:
data.update(extra)
mrich.writing(file)
json.dump(data, open(file, "wt"), indent=indent)
[docs]
def get_dict(
self,
*,
price: bool = True,
reactant_supplier: bool = True,
compound_supplier: bool = True,
database: bool = True,
timestamp: bool = True,
compound_ids_only: bool = False,
products: bool = True,
serialise_price: bool = False,
):
"""Serialise this recipe object
Store
=====
- Path to database
- Timestamp
- Reactants (& their quotes, amounts)
- Intermediates (& their quotes)
- Products (& their poses/scores/fingerprints)
- Reactions
- Total Price
- Lead time
:param price: include the price (Default value = True)
:param reactant_supplier: include the supplier (Default value = True)
:param database: include the database (Default value = True)
:param timestamp: add a timestamp (Default value = True)
:param compound_ids_only: ID's only (instead of full :attr:`.IngredientSet.df`) (Default value = False)
:param products: include products (Default value = True)
:param serialise_price: serialise :class:`.Price` object (Default value = False)
"""
import json
from datetime import datetime
data = {}
# Database
if database:
data["database"] = str(self.db.path.resolve())
if timestamp:
data["timestamp"] = str(datetime.now())
# Recipe properties
try:
if price and serialise_price:
data["price"] = self.price.get_dict()
elif price:
data["price"] = self.price
except AssertionError as e:
mrich.warning(f"Could not get price: {e}")
data["price"] = None
if reactant_supplier:
data["reactant_supplier"] = self.reactants.supplier
if compound_supplier:
data["compound_supplier"] = self.compounds.supplier
# IngredientSets
if compound_ids_only:
data["reactant_ids"] = self.reactants.compound_ids
data["intermediate_ids"] = self.intermediates.compound_ids
if products:
data["products_ids"] = self.products.compound_ids
data["compound_ids"] = self.compounds.compound_ids
else:
data["reactants"] = self.reactants.df.to_dict(orient="list")
data["intermediates"] = self.intermediates.df.to_dict(orient="list")
if products:
data["products"] = self.products.df.to_dict(orient="list")
data["compounds"] = self.compounds.df.to_dict(orient="list")
# ReactionSet
data["reaction_ids"] = self.reactions.ids
return data
[docs]
def get_routes(self) -> "RouteSet":
"""Get routes"""
return self.products.get_routes(permitted_reactions=self.reactions)
[docs]
def write_CAR_csv(
self, file: "str | Path", return_df: bool = False
) -> "DataFrame | None":
"""Prepares CSVs for use with CAR.
.. attention::
This method requires a populated `route` table. For a workaround use :meth:`.CompoundSet.write_CAR_csv` instead
Columns:
* target-name
* no-steps
* concentration = None
* amount-required
* batch-tag
per reaction
* reactant-1-1
* reactant-2-1
* reaction-product-smiles-1
* reaction-name-1
* reaction-recipe-1
* reaction-groupby-column-1
:param file: file to write to
:param return_df: return the dataframe (Default value = False)
"""
from .cset import CompoundSet
from pandas import DataFrame
from pathlib import Path
# solve each product's reaction
file = str(Path(file).resolve())
rows = []
routes = self.get_routes()
for sub_recipe in routes:
product = sub_recipe.product
row = {
"target-names": str(product.compound),
"no-steps": 0,
"concentration-required-mM": None,
"amount-required-uL": None,
"batch-tag": None,
}
for i, reaction in enumerate(sub_recipe.reactions):
i = i + 1
row["no-steps"] += 1
match len(reaction.reactants):
case 1:
row[f"reactant-1-{i}"] = reaction.reactants[0].smiles
row[f"reactant-2-{i}"] = None
case 2:
row[f"reactant-1-{i}"] = reaction.reactants[0].smiles
row[f"reactant-2-{i}"] = reaction.reactants[1].smiles
case _:
# mrich.warning(f"More than two reactants for {reaction=}")
for j, r in enumerate(reaction.reactants):
row[f"reactant-{j+1}-{i}"] = reaction.reactants[j].smiles
row[f"reaction-product-smiles-{i}"] = reaction.product.smiles
row[f"reaction-name-{i}"] = reaction.type
row[f"reaction-recipe-{i}"] = None
row[f"reaction-groupby-column-{i}"] = None
# row[f'reaction-id-{i}'] = int(reaction.id)
rows.append(row)
df = DataFrame(rows)
if len(df[df.duplicated()]):
mrich.warning("Removing duplicates from CAR DataFrame")
df = df.drop_duplicates()
df = df.convert_dtypes()
for n_steps in set(df["no-steps"]):
subset = df[df["no-steps"] == n_steps]
this_file = file.replace(".csv", f"_{n_steps}steps.csv")
mrich.writing(this_file)
subset.to_csv(this_file, index=False)
mrich.writing(file)
df.to_csv(file, index=False)
return df
[docs]
def write_reactant_csv(
self, file: "str | Path", return_df: bool = False
) -> "DataFrame | None":
"""Detailed CSV output including reactant information for purchasing and information on the downstream synthetic use
Reactant
========
- ID
- SMILES
- Inchikey
Quote
=====
- Supplier
- Catalogue
- Entry
- Lead-time
- Quoted amount
- Quote currency
- Quote price
- Quote purity
Downstream
==========
- num_reaction_dependencies
- num_product_dependencies
- reaction_dependencies
- product_dependencies
"""
# - remove_with
from pandas import DataFrame
# from rich import print
from .cset import CompoundSet
from .rset import ReactionSet
data = []
routes = self.get_routes()
for reactant in mrich.track(
self.reactants, prefix="Constructing reactant DataFrame"
):
quote = reactant.quote
d = dict(
hippo_id=reactant.compound_id,
smiles=reactant.smiles,
inchikey=reactant.inchikey,
required_amount_mg=reactant.amount,
)
if quote:
d.update(
dict(
quoted_amount=quote.amount,
quote_currency=quote.currency,
quote_price=quote.price.amount,
quote_lead_time_days=quote.lead_time,
quote_supplier=quote.supplier,
quote_catalogue=quote.catalogue,
quote_entry=quote.entry,
quoted_smiles=quote.smiles,
quoted_purity=quote.purity,
)
)
downstream_routes = []
downstream_reactions = []
for route in routes:
if reactant in route.reactants:
downstream_routes.append(route)
for reaction in route.reactions:
if reactant in reaction.reactants:
downstream_reactions.append(reaction)
downstream_products = CompoundSet(
self.db, set(route.product.id for route in downstream_routes)
)
downstream_reactions = ReactionSet(
self.db, set(reaction.id for reaction in downstream_reactions)
)
if not downstream_products:
mrich.error("No downstream products for", reactant)
continue
if not downstream_reactions:
mrich.error("No downstream reactions for", reactant)
continue
def get_scaffold_series():
bases = downstream_products.bases
if not bases:
bases = downstream_products[0:]
return bases.ids
d["num_reaction_dependencies"] = len(downstream_reactions)
d["num_product_dependencies"] = len(downstream_products)
d["reaction_dependencies"] = downstream_reactions.ids
d["product_dependencies"] = downstream_products.ids
d["chemistry_types"] = ", ".join(set(downstream_reactions.types))
d["scaffold_series"] = get_scaffold_series()
data.append(d)
df = DataFrame(data)
mrich.writing(file)
df.to_csv(file, index=False)
if return_df:
return df
return None
[docs]
def write_product_csv(
self, file: "str | Path", return_df: bool = False
) -> "pd.DataFrame | None":
"""Detailed CSV output including product information for selection and synthesis"""
from pandas import DataFrame
# from rich import print
from .pset import PoseSet
from .cset import CompoundSet
from .rset import ReactionSet
data = []
routes = self.get_routes()
pose_map = self.db.get_compound_id_pose_ids_dict(self.products.compounds)
inspiration_map = self.db.get_compound_id_inspiration_ids_dict()
for product in mrich.track(
self.products, prefix="Constructing product DataFrame"
):
d = dict(
hippo_id=product.compound_id,
smiles=product.smiles,
inchikey=product.inchikey,
required_amount_mg=product.amount,
)
upstream_routes = []
upstream_reactions = []
for route in routes:
if product in route.products:
upstream_routes.append(route)
for reaction in route.reactions:
upstream_reactions.append(reaction)
upstream_reactions = ReactionSet(
self.db, set(reaction.id for reaction in upstream_reactions)
)
if not upstream_routes:
mrich.error("No upstream routes for", product)
continue
if not upstream_reactions:
mrich.error("No upstream reactions for", product)
continue
def get_scaffold_series():
if bases := product.bases:
return bases.ids, False
else:
return [product.id], True
poses = pose_map.get(product.id, set())
d["num_poses"] = len(poses)
d["poses"] = poses
d["tags"] = product.tags
d["num_routes"] = len(upstream_routes)
d["num_reaction_steps"] = set(
len(route.reactions) for route in upstream_routes
)
d["reaction_dependencies"] = upstream_reactions.ids
d["reactant_dependencies"] = set(
sum([route.reactants.ids for route in upstream_routes], [])
)
d["route_ids"] = [route.id for route in upstream_routes]
d["chemistry_types"] = ", ".join(set(upstream_reactions.types))
series, is_base = get_scaffold_series()
d["is_scaffold"] = is_base
d["scaffold_series"] = series
inspirations = inspiration_map.get(product.id, None)
if not inspirations and not is_base:
base = product.bases[0]
inspirations = inspiration_map.get(base.id, None)
if not inspirations and "inspiration_pose_ids" in base.metadata:
inspirations = base.metadata["inspiration_pose_ids"]
if (
not inspirations
and is_base
and "inspiration_pose_ids" in product.metadata
):
inspirations = product.metadata["inspiration_pose_ids"]
if inspirations:
inspirations = PoseSet(self.db, inspirations)
d["inspirations"] = ", ".join(n for n in inspirations.names)
else:
d["inspirations"] = ""
data.append(d)
df = DataFrame(data)
mrich.writing(file)
df.to_csv(file, index=False)
if return_df:
return df
return None
[docs]
def write_chemistry_csv(
self, file: "str | Path", return_df: bool = True
) -> "pd.DataFrame | None":
"""Detailed CSV output synthetis information for chemistry types in this set"""
from pandas import DataFrame
from rich import print
from .cset import CompoundSet
from .rset import ReactionSet
data = []
# get compounds
scaffolds = CompoundSet(self.db)
for product in self.products:
if bases := product.bases:
scaffolds += bases
else:
scaffolds.add(product.compound)
routes = self.get_routes()
route_types = {}
for compound in scaffolds:
elabs = (
self.products.compounds.get_by_base(base=compound, none="quiet") or []
)
d = dict(
scaffold_id=compound.id,
product_id=compound.id,
smiles=compound.smiles,
inchikey=compound.inchikey,
num_elaborations=len(elabs),
is_scaffold=True,
)
upstream_routes = []
for route in routes:
if compound in route.products:
upstream_routes.append(route)
if not upstream_routes:
mrich.warning(f"No routes to scaffold={compound}")
continue
d["num_routes"] = len(upstream_routes)
for j, route in enumerate(upstream_routes):
d[f"route_{j+1}_num_steps"] = len(route.reactions)
group = route_types.setdefault(compound.id, set())
group.add(tuple([r.type for r in route.reactions]))
for k, reaction in enumerate(route.reactions):
key = f"route_{j+1}_reaction_{k+1}"
product = reaction.product
d[f"{key}_type"] = reaction.type
d[f"{key}_product_smiles"] = product.smiles
d[f"{key}_product_id"] = product.id
d[f"{key}_product_yield"] = reaction.product_yield
for i, reactant in enumerate(reaction.reactants):
d[f"{key}_reactant_{i+1}_smiles"] = reactant.smiles
d[f"{key}_reactant_{i+1}_id"] = reactant.id
data.append(d)
missing_bases = {}
for compound in self.products.compounds:
if compound in scaffolds:
continue
upstream_routes = []
for route in routes:
if compound in route.products:
upstream_routes.append(route)
bases = compound.bases
for base in bases:
if base.id not in route_types:
group = missing_bases.setdefault(base.id, [])
group.append(compound.id)
continue
else:
for route in upstream_routes:
chem_types = tuple([r.type for r in route.reactions])
if chem_types not in route_types[base.id]:
mrich.success(base)
mrich.success(chem_types)
raise ValueError(
"Scaffold has route not present in dataframe"
)
for base_id, elab_ids in missing_bases.items():
compound = self.db.get_compound(id=sorted(elab_ids)[0])
d = dict(
scaffold_id=base_id,
product_id=compound.id,
smiles=compound.smiles,
inchikey=compound.inchikey,
num_elaborations=len(elab_ids),
is_scaffold=False,
)
upstream_routes = []
for route in routes:
if compound in route.products:
upstream_routes.append(route)
if not upstream_routes:
mrich.error(f"No routes to elab {compound}")
raise ValueError(f"No routes to elab {compound}")
d["num_routes"] = len(upstream_routes)
for j, route in enumerate(upstream_routes):
d[f"route_{j+1}_num_steps"] = len(route.reactions)
group = route_types.setdefault(compound.id, set())
group.add(tuple([r.type for r in route.reactions]))
for k, reaction in enumerate(route.reactions):
key = f"route_{j+1}_reaction_{k+1}"
product = reaction.product
d[f"{key}_type"] = reaction.type
d[f"{key}_product_smiles"] = product.smiles
d[f"{key}_product_id"] = product.id
d[f"{key}_product_yield"] = reaction.product_yield
for i, reactant in enumerate(reaction.reactants):
d[f"{key}_reactant_{i+1}_smiles"] = reactant.smiles
d[f"{key}_reactant_{i+1}_id"] = reactant.id
data.append(d)
df = DataFrame(data)
mrich.writing(file)
df.to_csv(file, index=False)
if return_df:
return df
return None
[docs]
def copy(self) -> "Recipe":
"""Copy this recipe"""
if hasattr(self, "compounds"):
compounds = self.compounds.copy()
else:
compounds = None
return Recipe(
self.db,
products=self.products.copy(),
reactants=self.reactants.copy(),
intermediates=self.intermediates.copy(),
reactions=self.reactions.copy(),
compounds=compounds,
# supplier=self.supplier
)
def __flag_modification(self) -> None:
"""Flag this recipe as modified"""
self._product_interactions = None
self._score = None
self._product_compounds = None
self._product_poses = None
[docs]
def check_integrity(self, debug: bool = False) -> bool:
"""Verify integrity of this recipe"""
# no duplicate ingredients
if debug:
mrich.debug("Checking integrity:", self)
mrich.debug("Checking for duplicate compounds")
if len(self.reactants.compound_ids) != len(set(self.reactants.compound_ids)):
mrich.error("Reactant compound ID's are not unique")
return False
if len(self.intermediates.compound_ids) != len(
set(self.intermediates.compound_ids)
):
mrich.error("Intermediate compound ID's are not unique")
return False
if len(self.products.compound_ids) != len(set(self.products.compound_ids)):
mrich.error("Product compound ID's are not unique")
return False
# all references should exist
if debug:
mrich.debug("Checking for missing references")
if self.db.count_where(
table="reaction", key=f"reaction_id IN {self.reactions.str_ids}"
) < len(self.reactions):
mrich.error("Not all Reactions in Database")
return False
if self.db.count_where(
table="compound", key=f"compound_id IN {self.product_compounds.str_ids}"
) < len(self.products):
mrich.error("Not all product Compounds in Database")
return False
if self.db.count_where(
table="compound", key=f"compound_id IN {self.reactants.compounds.str_ids}"
) < len(self.reactants):
mrich.error("Not all reactant Compounds in Database")
return False
if self.db.count_where(
table="compound",
key=f"compound_id IN {self.intermediates.compounds.str_ids}",
) < len(self.intermediates):
mrich.error("Not all intermediate Compounds in Database")
return False
reaction_intermediates = self.reactions.intermediates
reaction_products = self.reactions.products
reaction_reactants = self.reactions.reactants
if debug:
mrich.debug("Checking for missing reactions")
# all products should have a reaction
for product in self.products:
if product not in reaction_products:
mrich.error(f"Product: {product} does not have associated reaction")
return False
# intermediates
for intermediate in self.intermediates:
if intermediate not in reaction_intermediates:
mrich.error(
f"Intermediate: {intermediate} is not in self.reactions.intermediates"
)
return False
# reactants
for reactant in self.reactants:
if reactant not in reaction_reactants:
mrich.error(f"Reactant: {reactant} is not in self.reactions.reactants")
return False
# all reactions should have enough reactant
if debug:
mrich.debug("Checking reactant quantities")
for reaction in self.reactions:
product_ingredient = self.products(compound_id=reaction.product_id)
if product_ingredient is None:
product_ingredient = self.intermediates(compound_id=reaction.product_id)
if debug and reaction.product_yield < 1.0:
mrich.debug(f"{reaction}.product_yield={reaction.product_yield}")
for reactant in reaction.reactants:
reactant_ingredient = self.intermediates(compound_id=reactant.id)
if reactant_ingredient is None:
reactant_ingredient = self.reactants(compound_id=reactant.id)
required_amount = product_ingredient.amount / reaction.product_yield
if reactant_ingredient.amount < required_amount:
mrich.error(
f"Not enough of {reactant_ingredient.compound}: {reactant_ingredient.amount} < {required_amount}"
)
return False
if debug:
mrich.success(self, "OK")
return True
[docs]
def add_ingredient(self, ingredient: "Ingredient", amount: float = 1):
"""Add an :class:`.Ingredient` object for direct purchase (no associated reactions)"""
self.compounds.add(ingredient)
### DUNDERS
[docs]
def __str__(self) -> str:
"""Unformatted string representation"""
if self.score:
s = f"(score={self.score:.3f})"
else:
s = ""
if self.hash:
return f"Recipe_{self.hash}{s}"
return f"Recipe{s}"
def __longstr(self) -> str:
"""Unformatted string representation"""
if self.empty:
return f"Empty Recipe()"
if self.reactions:
if self.intermediates:
s = f"{self.reactants} --> {self.intermediates} --> {self.products} via {self.reactions}"
else:
s = f"{self.reactants} --> {self.products} via {self.reactions}"
if self.score:
s += f", score={self.score:.3f}"
if self.hash:
return f"Recipe_{self.hash}({s})"
return f"Recipe({s})"
else:
s = f"{self.compounds}"
if self.hash:
return f"Recipe_{self.hash}({s})"
return f"Recipe(#compounds={self.num_compounds} [no-chem])"
[docs]
def __repr__(self) -> str:
"""ANSI Formatted string representation"""
return f"{mcol.bold}{mcol.underline}{self.__longstr()}{mcol.unbold}{mcol.ununderline}"
def __rich__(self) -> str:
"""Rich Formatted string representation"""
return f"[bold underline]{self.__longstr()}"
def __add__(self, other: "Recipe"):
result = self.copy()
result.reactants += other.reactants
result.intermediates += other.intermediates
result.reactions += other.reactions
result.products += other.products
if hasattr(other, "compounds"):
result.compounds += other.compounds
return result
[docs]
class Route(Recipe):
"""A recipe with a single product, that is stored in the database"""
def __init__(self, db, *, route_id, product, reactants, intermediates, reactions):
from .cset import IngredientSet
from .rset import ReactionSet
# check typing
assert isinstance(product, IngredientSet)
assert isinstance(reactants, IngredientSet)
assert isinstance(intermediates, IngredientSet)
assert isinstance(reactions, ReactionSet)
assert len(product) == 1
assert isinstance(route_id, int)
assert route_id
self._id = route_id
self._products = product
self._product_id = product.ids[0]
self._reactants = reactants
self._intermediates = intermediates
self._reactions = reactions
self._db = db
### FACTORIES
[docs]
@classmethod
def from_json(
cls, db: "Database", path: "str | Path", data: dict = None
) -> "Route":
"""Load a serialised route from a JSON file
:param db: database to link
:param path: path to JSON
:param data: serialised data (Default value = None)
"""
import json
from .cset import IngredientSet
from .rset import ReactionSet
if data is None:
data = json.load(open(path, "rt"))
self = cls.__new__(cls)
self._db = db
self._id = data["id"]
self._product_id = data["product_id"]
self._products = IngredientSet.from_compounds(
compounds=None, ids=[self._product_id], db=db
) # IngredientSet
self._reactants = IngredientSet.from_json(
db=db,
path=None,
data=data["reactants"]["data"],
supplier=data["reactants"]["supplier"],
)
self._intermediates = IngredientSet.from_json(
db=db,
path=None,
data=data["intermediates"]["data"],
supplier=data["intermediates"]["supplier"],
)
self._reactions = ReactionSet(
db=db, indices=data["reactions"]["indices"]
) # ReactionSet
return self
### PROPERTIES
@property
def product(self) -> "Ingredient":
"""Product ingredient"""
return self._products[0]
@property
def product_compound(self) -> "Compound":
"""Product compound"""
return self.product.compound
@property
def id(self) -> int:
"""Route ID"""
return self._id
@property
def price(self) -> "Price":
"""Get the price of the reactants"""
return self.reactants.price
### METHODS
[docs]
def get_dict(self) -> dict:
"""Serialisable dictionary"""
data = {}
data["id"] = self.id
data["product_id"] = self.product.id
data["reactants"] = self.reactants.get_dict()
data["intermediates"] = self.intermediates.get_dict()
data["reactions"] = self.reactions.get_dict()
return data
### DUNDERS
[docs]
def __str__(self) -> str:
"""Unformatted string representation"""
return f"Route #{self.id}: {self.product_compound}"
[docs]
def __repr__(self) -> str:
"""ANSI Formatted string representation"""
return f"{mcol.bold}{mcol.underline}{self}{mcol.unbold}{mcol.ununderline}"
def __rich__(self) -> str:
"""Rich Formatted string representation"""
return f"[bold underline]{self}"
[docs]
class RouteSet:
"""A set of Route objects"""
def __init__(self, db, routes):
data = {}
for route in routes:
# assert isinstance(route, Route)
data[route.id] = route
self._data = data
self._db = db
self._cluster_map = None
self._permitted_clusters = None
self._current_cluster = None
### FACTORIES
[docs]
@classmethod
def from_json(
cls, db: "Database", path: "str | Path", data: dict = None
) -> "RouteSet":
"""Load a serialised routeset from a JSON file
:param db: database to link
:param path: path to JSON
:param data: serialised data (Default value = None)
"""
self = cls.__new__(cls)
if data is None:
import json
data = json.load(open(path, "rt"))
new_data = {}
for d in mrich.track(data["routes"].values(), prefix="Loading Routes..."):
route_id = d["id"]
new_data[route_id] = Route.from_json(db=db, path=None, data=d)
self._data = new_data
self._db = db
self._cluster_map = None
self._permitted_clusters = None
self._current_cluster = None
return self
### PROPERTIES
@property
def data(self) -> "dict[int, Route]":
"""Get internal data dictionary"""
return self._data
@property
def db(self):
"""Get associated database"""
return self._db
@property
def routes(self) -> "list[Route]":
"""Get route objects"""
return self.data.values()
@property
def product_ids(self) -> list[int]:
"""Get the :class:`.Compound` ID's of the products"""
ids = self.db.select_where(
table="route",
query="route_product",
key=f"route_id IN {self.str_ids}",
multiple=True,
)
return [i for i, in ids]
@property
def products(self) -> "CompoundSet":
"""Return a :class:`.CompoundSet` of all the route products"""
from .cset import CompoundSet
return CompoundSet(self.db, self.product_ids)
@property
def str_ids(self) -> str:
"""Return an SQL formatted tuple string of the :class:`.Route` ID's"""
return str(tuple(self.ids)).replace(",)", ")")
@property
def ids(self) -> list[int]:
"""Return the :class:`.Route` IDs"""
return self.data.keys()
@property
def cluster_map(self) -> dict[tuple, set]:
"""Create a dictionary grouping routes by their scaffold/base cluster.
:returns: A dictionary mapping a tuple of scaffold :class:`.Compound` IDs to a set of :class:`.Route` ID's to their superstructures.
"""
if self._cluster_map is None:
# get route mapping
pairs = self.db.select_where(
query="route_product, route_id",
key=f"route_id IN {self.str_ids}",
table="route",
multiple=True,
)
route_map = {route_product: route_id for route_product, route_id in pairs}
# group compounds by cluster
compound_clusters = self.db.get_compound_cluster_dict(cset=self.products)
# create the map
self._cluster_map = {}
for cluster, compounds in compound_clusters.items():
self._cluster_map[cluster] = []
for compound in compounds:
route_id = route_map.get(compound, None)
if not route_id:
continue
self._cluster_map[cluster].append(route_id)
if not self._cluster_map[cluster]:
del self._cluster_map[cluster]
return self._cluster_map
### METHODS
[docs]
def copy(self) -> "RouteSet":
"""Copy this RouteSet"""
return RouteSet(self.db, self.data.values())
[docs]
def set_db_pointers(self, db: "Database") -> None:
"""
:param db:
"""
self._db = db
for route in self.data.values():
route._db = db
# def clear_db_pointers(self):
# """ """
# self._db = None
# for route in self.data.values():
# route._db = None
# def get_dict(self):
# """Get serialisable dictionary"""
# data = dict(db=str(self.db), routes={})
# # populate with routes
# for route_id, route in self.data.items():
# data["routes"][route_id] = route.get_dict()
# return data
[docs]
def pop_id(self) -> int:
"""Pop the last route from the set and return it's id"""
route_id, route = self.data.popitem()
return route_id
[docs]
def pop(self) -> "Route":
"""Pop the last route from the set and return it's object"""
route_id, route = self.data.popitem()
return route
[docs]
def balanced_pop(
self, permitted_clusters: set[tuple] | None = None, debug: bool = False
) -> "Route":
"""Pop a route from this set, while maintaining the balance of scaffold clusters populations"""
if not self._data:
mrich.print("RouteSet depleted")
return None
if not self.cluster_map:
# mrich.warning("RouteSet.cluster_map depleted but _data isn't...")
return self.pop()
# store the permitted clusters (or all clusters) list as property
if self._permitted_clusters is None:
if permitted_clusters:
permitted_clusters = set(
(cluster,) if isinstance(cluster, int) else cluster
for cluster in permitted_clusters
)
self._permitted_clusters = []
for cluster in permitted_clusters:
if cluster not in self.cluster_map:
mrich.warning(
cluster, "in permitted_clusters but not cluster_map"
)
else:
self._permitted_clusters.append(cluster)
else:
self._permitted_clusters = list(self.cluster_map.keys())
if self._current_cluster is None:
self._current_cluster = self._permitted_clusters[0]
### pop a Route
if debug:
mrich.debug(f"Would pop Route from {self._current_cluster=}")
cluster = self._current_cluster
# pop the last route id from the given cluster
try:
route_id = self.cluster_map[cluster].pop()
except IndexError:
mrich.print(self._permitted_clusters)
mrich.print(self.cluster_map)
raise
except AttributeError:
mrich.print(cluster)
mrich.print(self.cluster_map)
raise
except KeyError:
mrich.print("cluster", cluster)
mrich.print("self._permitted_clusters", self._permitted_clusters)
mrich.print("self.cluster_map.keys()", self.cluster_map.keys())
raise
# clean up empty clusters
if debug:
mrich.debug("Popped route", route_id)
# get the Route object
if route_id in self._data:
route = self._data[route_id]
del self._data[route_id]
else:
# if debug:
mrich.debug("Route not present")
return self.balanced_pop()
### increment cluster
# def increment_cluster(cluster):
n = len(self._permitted_clusters)
if n > 1:
for i, cluster in enumerate(self._permitted_clusters):
if cluster == self._current_cluster:
if i == n - 1:
self._current_cluster = self._permitted_clusters[0]
else:
self._current_cluster = self._permitted_clusters[i + 1]
break
else:
raise IndexError("This should never be reached...")
# increment_cluster()
if not self.cluster_map[cluster]:
del self.cluster_map[cluster]
if not self.cluster_map:
mrich.debug("RouteSet.cluster_map depleted")
self._permitted_clusters = [
c for c in self._permitted_clusters if c != cluster
]
# if debug:
mrich.debug("Depleted cluster", cluster)
if not self._permitted_clusters:
mrich.debug("Depleted all permitted clusters", cluster)
mrich.debug("Removing cluster restriction", cluster)
self._permitted_clusters = list(self.cluster_map.keys())
self._current_cluster = None
if debug:
mrich.debug("#Routes in set", len(self._data))
return route
[docs]
def shuffle(self):
"""Randomly shuffle the routes in this set"""
import random
items = list(self.data.items())
random.shuffle(items)
self._data = dict(items)
### shuffle the cluster map as well
for cluster, routes in self.cluster_map.items():
random.shuffle(routes)
self.cluster_map[cluster] = routes
### DUNDERS
[docs]
def __len__(self) -> int:
"""Number of routes in this set"""
return len(self.data)
[docs]
def __str__(self) -> str:
"""Unformatted string representation"""
return "{" f"Route × {len(self)}" "}"
[docs]
def __repr__(self) -> str:
"""ANSI Formatted string representation"""
return f"{mcol.bold}{mcol.underline}{self}{mcol.unbold}{mcol.ununderline}"
def __rich__(self) -> str:
"""Rich Formatted string representation"""
return f"[bold underline]{self}"
def __iter__(self):
return iter(self.data.values())
class RecipeSet:
"""A set of recipes stored on disk"""
def __init__(self, db, directory, pattern="*.json"):
from pathlib import Path
from json import JSONDecodeError
self._db = db
self._json_directory = Path(directory)
self._json_pattern = pattern
self._json_paths = {}
for path in self._json_directory.glob(self._json_pattern):
self._json_paths[
path.name.removeprefix("Recipe_").removesuffix(".json")
] = path.resolve()
mrich.reading(f"{directory}/{pattern}")
self._recipes = {}
for key, path in mrich.track(
self._json_paths.items(), prefix="Loading recipes"
):
try:
recipe = Recipe.from_json(
db=self.db,
path=path,
allow_db_mismatch=True,
debug=False,
db_mismatch_warning=False,
)
except JSONDecodeError:
mrich.error(f"Bad JSON in {path}")
continue
recipe._hash = key
self._recipes[key] = recipe
mrich.success("Loaded", len(self), "Recipes")
### FACTORIES
### PROPERTIES
@property
def db(self) -> "Database":
"""Associated database"""
return self._db
### METHODS
def get_values(
self,
key: str,
progress: bool = False,
serialise_price: bool = False,
):
"""Get values of member recipes associated with attribute ``key``
:param key: attribute to query/calculate
:param progress: show a progress bar
:param serialise_price: serialise price objects
"""
values = []
recipes = self._recipes.values()
if progress:
recipes = mrich.track(recipes, prefix=f"Calculating {self} values...")
for recipe in recipes:
value = getattr(recipe, key)
if serialise_price and key == "price":
value = value.amount
values.append(value)
return values
def get_df(self, **kwargs) -> "pandas.DataFrame":
"""Get dataframe of recipe dictionaries. See :meth:`.Recipe.get_dict`"""
data = []
for recipe in self:
d = recipe.get_dict(
# reactant_supplier=False,
database=False,
timestamp=False,
**kwargs,
# timestamp=False,
)
data.append(d)
from pandas import DataFrame
return DataFrame(data)
def items(self) -> "list[tuple[str, Recipe]]":
"""Get data dictionary items"""
return self._recipes.items()
def keys(self) -> list[str]:
"""Get data dictionary keys (recipe hashes)"""
return self._recipes.keys()
### DUNDERS
def __len__(self) -> int:
"""Number of recipes in this set"""
return len(self._recipes)
def __getitem__(
self,
key: int | str,
) -> Recipe:
match key:
case int():
return list(self._recipes.values())[key]
case str():
return self._recipes[key]
case _:
mrich.error(
f"Unsupported type for RecipeSet.__getitem__(): {key=} {type(key)}"
)
return None
def __iter__(self):
return iter(self._recipes.values())
def __contains__(self, key):
assert isinstance(key, str)
return key in self._recipes
def __str__(self) -> str:
"""Unformatted string representation"""
return "{" f"Recipe × {len(self)}" "}"
def __repr__(self) -> str:
"""ANSI Formatted string representation"""
return f"{mcol.bold}{mcol.underline}{self}{mcol.unbold}{mcol.ununderline}"
def __rich__(self) -> str:
"""Rich Formatted string representation"""
return f"[bold underline]{self}"