Source code for hippo.recipe

"""Classes for working with Recipes (reaction networks)"""

import mcol
import mrich

from dataclasses import dataclass, field

from .compound import Ingredient


[docs] class Recipe: """A Recipe stores data corresponding to a specific synthetic recipe involving several products, reactants, intermediates, and reactions.""" _db = None def __init__( self, db: "Database", *, products: "IngredientSet | None" = None, reactants: "IngredientSet | None" = None, intermediates: "IngredientSet | None" = None, reactions: "ReactionSet | None" = None, compounds: "IngredientSet | None" = None, ) -> None: """Recipe initialisation""" from .cset import IngredientSet from .rset import ReactionSet if products is None: products = IngredientSet(db) if reactants is None: reactants = IngredientSet(db) if intermediates is None: intermediates = IngredientSet(db) if compounds is None: compounds = IngredientSet(db) if reactions is None: reactions = ReactionSet(db) # check typing assert isinstance(products, IngredientSet) assert isinstance(reactants, IngredientSet) assert isinstance(intermediates, IngredientSet) assert isinstance(compounds, IngredientSet) assert isinstance(reactions, ReactionSet) self._products = products self._reactants = reactants self._intermediates = intermediates self._reactions = reactions self._compounds = compounds self._db = db self._hash = None self._score = None # caches self._product_compounds = None self._poses = None self._interactions = None self._combined_compounds = None ### FACTORIES
[docs] @classmethod def from_reaction( cls, reaction, amount=1, *, debug: bool = False, pick_cheapest: bool = True, permitted_reactions: "ReactionSet | None" = None, quoted_only: bool = False, supplier: None | str = None, unavailable_reaction: str = "error", reaction_checking_cache: dict[int, bool] = None, reaction_reactant_cache: dict[int, bool] = None, inner: bool = False, get_ingredient_quotes: bool = True, ) -> "Recipe | list[Recipe]": """Create a :class:`.Recipe` from a :class:`.Reaction` and its upstream dependencies :param reaction: reaction to create recipe from :param amount: amount in ``mg`` (Default value = 1) :param debug: bool: increase verbosity for debugging (Default value = False) :param pick_cheapest: bool: choose the cheapest solution (Default value = True) :param permitted_reactions: once consider reactions in this set (Default value = None) :param quoted_only: bool: only allow reactants with quotes (Default value = False) :param supplier: None | str: optionally restrict quotes to only this supplier (Default value = None) :param unavailable_reaction: define the behaviour for when a reaction has unavailable reactants (Default value = 'error') :param inner: used to indicate that this is a recursive call (Default value = False) :param get_ingredient_quotes: get quotes for ingredients in this recipe """ from .reaction import Reaction assert isinstance(reaction, Reaction) from .cset import IngredientSet from .rset import ReactionSet if debug: mrich.debug( f"Recipe.from_reaction(R{reaction.id}, {amount=}, {pick_cheapest=})" ) mrich.debug(f"{reaction.product.id=}") mrich.debug(f"{reaction.reactants.ids=}") if permitted_reactions: assert reaction in permitted_reactions # raise NotImplementedError db = reaction.db recipe = cls.__new__(cls) recipe.__init__( db, products=IngredientSet( db, [ reaction.product.as_ingredient( amount=amount, get_quote=get_ingredient_quotes ) ], ), reactants=IngredientSet(db, [], supplier=supplier), intermediates=IngredientSet(db, []), reactions=ReactionSet(db, [reaction.id], sort=False), ) recipes = [recipe] if quoted_only or supplier: if debug: mrich.debug(f"Checking reactant_availability: {reaction=}") if reaction_checking_cache and reaction.id in reaction_checking_cache: ok = reaction_checking_cache[reaction.id] print("reaction_checking_cache used") else: ok = reaction.check_reactant_availability(supplier=supplier) # print('cache not used') if reaction_checking_cache is not None: reaction_checking_cache[reaction.id] = ok if not ok: if unavailable_reaction == "error": mrich.error(f"Reactants not available for {reaction=}") if pick_cheapest: return None else: return [] def get_reactant_amount_pairs(reaction: "Reaction") -> list[tuple[int, float]]: """Get pairs of reactant ID and float amounts""" if reaction_reactant_cache and reaction.id in reaction_reactant_cache: print("reaction_reactant_cache used") return reaction_reactant_cache[reaction.id] else: pairs = reaction.get_reactant_amount_pairs(compound_object=False) if reaction_reactant_cache is not None: reaction_reactant_cache[reaction.id] = pairs return pairs if debug: mrich.debug(f"get_reactant_amount_pairs({reaction.id})") pairs = get_reactant_amount_pairs(reaction) for reactant, reactant_amount in pairs: reactant = db.get_compound(id=reactant) if debug: mrich.debug(f"{reactant.id=}, {reactant_amount=}") # scale amount reactant_amount *= amount reactant_amount /= reaction.product_yield inner_reactions = reactant.get_reactions( none="quiet", permitted_reactions=permitted_reactions ) if inner_reactions: if debug: if len(inner_reactions) == 1: mrich.debug(f"Reactant has ONE inner reaction") else: mrich.warning(f"{reactant=} has MULTIPLE inner reactions") new_recipes = [] inner_recipes = [] for reaction in inner_reactions: reaction_recipes = Recipe.from_reaction( reaction=reaction, amount=reactant_amount, debug=debug, pick_cheapest=False, quoted_only=quoted_only, supplier=supplier, unavailable_reaction=unavailable_reaction, reaction_checking_cache=reaction_checking_cache, reaction_reactant_cache=reaction_reactant_cache, inner=True, ) inner_recipes += reaction_recipes for recipe in recipes: for inner_recipe in inner_recipes: combined_recipe = recipe.copy() combined_recipe.reactants += inner_recipe.reactants combined_recipe.intermediates += inner_recipe.intermediates combined_recipe.reactions += inner_recipe.reactions combined_recipe.intermediates.add( reactant.as_ingredient(reactant_amount, supplier=supplier) ) new_recipes.append(combined_recipe) recipes = new_recipes else: ingredient = reactant.as_ingredient(reactant_amount, supplier=supplier) for recipe in recipes: recipe.reactants.add(ingredient) # reverse ReactionSet's if not inner: for recipe in recipes: recipe.reactions.reverse() if pick_cheapest: if debug: mrich.debug("Picking cheapest") priced = [r for r in recipes if r.get_price(supplier=supplier)] # priced = [r for r in recipes if r.price] if not priced: mrich.error("0 recipes with prices, can't choose cheapest") return recipes sorted_recipes = sorted( priced, key=lambda r: r.get_price(supplier=supplier) ) if debug: for recipe in recipes: mrich.debug(f"{recipe}, {recipe.price}") return sorted_recipes[0] # return sorted(priced, key=lambda r: r.price)[0] return recipes
[docs] @classmethod def from_reactions( cls, reactions: "ReactionSet", amount: float = 1, pick_cheapest: bool = True, permitted_reactions: "ReactionSet | None" = None, final_products_only: bool = True, return_products: bool = False, supplier: str | None = None, use_routes: bool = False, debug: bool = False, **kwargs, ) -> "Recipe | list[Recipe] | CompoundSet": """Create a :class:`.Recipe` from a :class:`.ReactionSet` and its upstream dependencies :param reactions: reactions to create recipe from :param amount: amount in ``mg`` (Default value = 1) :param debug: bool: increase verbosity for debugging (Default value = False) :param pick_cheapest: bool: choose the cheapest solution (Default value = True) :param permitted_reactions: once consider reactions in this set (Default value = None) :param final_products_only: don't get routes to intermediates (Default value = True) :param return_products: return the :class:`.CompoundSet` of products instead (Default value = False) """ from .rset import ReactionSet from .cset import IngredientSet, CompoundSet assert isinstance(reactions, ReactionSet) db = reactions.db if debug: mrich.debug("Recipe.from_reactions()") mrich.var("reactions", reactions) mrich.var("amount", amount) mrich.var("final_products_only", final_products_only) mrich.var("permitted_reactions", permitted_reactions) # get all the products products = reactions.products if debug: mrich.var("products", products) # return products if final_products_only: if debug: mrich.var("products.str_ids", products.str_ids) # raise NotImplementedError ids = reactions.db.execute( f""" SELECT DISTINCT compound_id FROM compound LEFT JOIN reactant ON compound_id = reactant_compound WHERE reactant_compound IS NULL AND compound_id IN {products.str_ids} """ ).fetchall() ids = [i for i, in ids] products = CompoundSet(db, ids) if debug: mrich.var("final products", products) # return ids if return_products: return products recipe = Recipe.from_compounds( compounds=products, amount=amount, permitted_reactions=reactions, pick_cheapest=pick_cheapest, supplier=supplier, use_routes=use_routes, **kwargs, ) return recipe
[docs] @classmethod def from_compounds( cls, compounds: "CompoundSet", amount: float = 1, debug: bool = False, pick_cheapest: bool = True, permitted_reactions=None, quoted_only: bool = False, supplier: None | str = None, solve_combinations: bool = True, pick_first: bool = False, warn_multiple_solutions: bool = True, pick_cheapest_inner_routes: bool = False, unavailable_reaction: str = "error", reaction_checking_cache: dict[int, bool] | None = None, reaction_reactant_cache: dict[int, bool] | None = None, use_routes: bool = False, **kwargs, ): """Create recipe(s) to synthesis products in the :class:`.CompoundSet` :param compounds: set of compounds to find routes for :param solve_combinations: bool: combinatorially combine all individual routes (Default value = True) :param pick_first: return the first solution without comparison (Default value = False) :param warn_multiple_solutions: warn if a compound has multiple routes (Default value = True) :param pick_cheapest_inner_routes: for each compound choose the cheapest route (Default value = False) :param reaction: reaction to create recipe from :param amount: amount in ``mg`` (Default value = 1) :param debug: bool: increase verbosity for debugging (Default value = False) :param pick_cheapest: bool: choose the cheapest solution (Default value = True) :param permitted_reactions: once consider reactions in this set (Default value = None) :param quoted_only: bool: only allow reactants with quotes (Default value = False) :param supplier: None | str: optionally restrict quotes to only this supplier (Default value = None) :param unavailable_reaction: define the behaviour for when a reaction has unavailable reactants (Default value = 'error') """ from .cset import CompoundSet assert isinstance(compounds, CompoundSet) db = compounds.db n_comps = len(compounds) assert n_comps if not hasattr(amount, "__iter__"): amount = [amount] * n_comps if use_routes: route_lookup = db.get_product_id_routes_dict() if supplier: raise NotImplementedError # supplier_lookup = db.get_compound_id_suppliers_dict() options = [] ok = 0 mrich.var("#compounds", n_comps) for comp, a in mrich.track( zip(compounds, amount), prefix="Solving individual compound recipes...", total=n_comps, ): comp_options = [] if use_routes: if comp.id not in route_lookup: mrich.error("No routes to", comp) continue comp_options = [] for route_id in route_lookup[comp.id]: route = db.get_route(id=route_id) comp_options.append(route) else: for reaction in comp.reactions: if permitted_reactions and reaction not in permitted_reactions: continue sol = Recipe.from_reaction( reaction=reaction, amount=a, pick_cheapest=pick_cheapest_inner_routes, debug=debug, permitted_reactions=permitted_reactions, quoted_only=quoted_only, supplier=supplier, unavailable_reaction=unavailable_reaction, reaction_checking_cache=reaction_checking_cache, reaction_reactant_cache=reaction_reactant_cache, **kwargs, ) if pick_cheapest_inner_routes: if sol: comp_options.append(sol) else: assert isinstance(sol, list) comp_options += sol if not comp_options: mrich.error( f"No solutions for compound={comp} ({comp.reactions.ids=})" ) continue if pick_cheapest and len(comp_options) > 1: if warn_multiple_solutions: mrich.warning( f"Multiple solutions for", comp, "(", len(comp_options), ")" ) if debug: mrich.debug("Picking cheapest...") priced = [r for r in comp_options if r.price] comp_options = sorted(priced, key=lambda r: r.price)[:1] if warn_multiple_solutions and len(comp_options) > 1: mrich.warning(f"Multiple solutions for compound={comp}") if debug: mrich.debug(f"{comp_options=}") else: if n_comps <= 200: mrich.success(f"Found solution for compound={comp}") ok += 1 mrich.set_progress_field("ok", ok) mrich.set_progress_field("n", n_comps) options.append(comp_options) assert all(options) from itertools import product mrich.print("Solving recipe combinations...") combinations = list(product(*options)) if not solve_combinations: return combinations solutions = [] if n_comps > 1: generator = mrich.track( combinations, prefix="Combining recipes...", total=len(combinations) ) else: generator = combinations ok = 0 for combo in generator: if debug: mrich.debug(f"Combination of {len(combo)} recipes") if not combo: continue solution = combo[0] for i, recipe in enumerate(combo[1:]): if debug: mrich.debug(i + 1) solution += recipe solutions.append(solution) ok += 1 mrich.set_progress_field("ok", ok) mrich.set_progress_field("n", len(combinations)) if not solutions: mrich.error("No solutions") return None if pick_first: return solutions[0] if pick_cheapest: mrich.debug("Calculating prices...") priced = [r for r in solutions if r.price] mrich.print("Picking cheapest from", len(priced), "options") if not priced: mrich.error("0 recipes with prices, can't choose cheapest") return solutions return sorted(priced, key=lambda r: r.price)[0] return solutions
[docs] @classmethod def from_reactants( cls, reactants: "CompoundSet | IngredientSet", amount: float = 1, debug: bool = False, return_products: bool = False, supplier: str | None = None, pick_cheapest: bool = False, use_routes: bool = False, **kwargs, ) -> "list[Recipe] | Recipe | CompoundSet": """Find the maximal recipe from a given set of reactants :param reactants: :class:`.CompoundSet` or :class:`.IngredientSet` for the reactants. Ingredient amounts are ignored :param amount: amount of each product needed (Default value = 1) :param debug: increase verbosity (Default value = False) :param return_products: return products instead of recipe (Default value = False) :param kwargs: passed to :meth:`.Recipe.from_reactions` """ from .cset import IngredientSet if isinstance(reactants, IngredientSet): reactant_ids = reactants.compound_ids else: reactant_ids = reactants.ids db = reactants.db all_reactants = set(reactant_ids) possible_reactions = [] # recursively search for possible reactions for i in range(300): if debug: mrich.debug(i) # reaction_ids = db.get_possible_reaction_ids(compound_ids=compound_ids) reaction_ids = db.get_possible_reaction_ids(compound_ids=all_reactants) if not reaction_ids: break if debug: mrich.debug(f"Adding {len(reaction_ids)} reactions") possible_reactions += reaction_ids if debug: mrich.var("reaction_ids", reaction_ids) product_ids = db.get_possible_reaction_product_ids( reaction_ids=reaction_ids ) if debug: mrich.var("product_ids", product_ids) n_prev = len(all_reactants) all_reactants |= set(product_ids) if n_prev == len(all_reactants): break else: raise NotImplementedError("Maximum recursion depth exceeded") possible_reactions = list(set(possible_reactions)) if debug: mrich.var("all possible reactions", possible_reactions) from .rset import ReactionSet rset = ReactionSet(db, possible_reactions, sort=False) recipe = cls.from_reactions( rset, amount=amount, permitted_reactions=rset, debug=debug, return_products=return_products, supplier=supplier, use_routes=use_routes, **kwargs, ) return recipe
[docs] @classmethod def from_json( cls, db: "Database", path: "str | Path", debug: bool = True, allow_db_mismatch: bool = False, clear_quotes: bool = False, data: dict = None, db_mismatch_warning: bool = True, ): """Load a serialised recipe from a JSON file :param db: database to link :param path: path to JSON :param debug: increase verbosity (Default value = True) :param allow_db_mismatch: allow a database mismatch (Default value = False) :param clear_quotes: ignore reactant quotes (Default value = False) :param data: serialised data (Default value = None) """ # imports import json from .cset import IngredientSet from .rset import ReactionSet # load JSON if not data: if debug: mrich.reading(path) data = json.load(open(path, "rt")) # check metadata if str(db.path.resolve()) != data["database"]: if db_mismatch_warning: mrich.var("session", str(db.path.resolve())) mrich.var("in file", data["database"]) if allow_db_mismatch: if db_mismatch_warning: mrich.warning("Database path mismatch") else: mrich.error( "Database path mismatch, set allow_db_mismatch=True to ignore" ) return None if debug: mrich.print(f'Recipe was generated at: {data["timestamp"]}') price = data["price"] # IngredientSets products = IngredientSet.from_ingredient_dicts(db, data["products"]) intermediates = IngredientSet.from_ingredient_dicts(db, data["intermediates"]) reactants = IngredientSet.from_ingredient_dicts( db, data["reactants"], supplier=data["reactant_supplier"] ) if "compounds" in data: compounds = IngredientSet.from_ingredient_dicts( db, data["compounds"], supplier=data["compound_supplier"] ) else: compounds = IngredientSet(db) if clear_quotes: reactants.df["quote_id"] = None reactants.df["quoted_amount"] = None compounds.df["quote_id"] = None compounds.df["quoted_amount"] = None # ReactionSet reactions = ReactionSet(db, data["reaction_ids"], sort=False) if debug: mrich.var("reactants", reactants) mrich.var("intermediates", intermediates) mrich.var("products", products) mrich.var("reactions", reactions) mrich.var("compounds", compounds) # Create the object self = cls.__new__(cls) self.__init__( db, products=products, reactants=reactants, intermediates=intermediates, reactions=reactions, compounds=compounds, ) return self
### PROPERTIES @property def db(self) -> "Database": """Associated :class:`.Database:""" return self._db @property def products(self) -> "IngredientSet": """Product :class:`.IngredientSet`""" return self._products @property def compounds(self) -> "IngredientSet": """Product :class:`.IngredientSet`""" return self._compounds @compounds.setter def compounds(self, a: "IngredientSet"): """Set the compounds""" self._compounds = a self.__flag_modification() @property def poses(self) -> "PoseSet": """Product poses""" if self._poses is None: self._poses = self.combined_compounds.poses self._poses._name = f"poses of {self}" return self._poses @property def product_compounds(self) -> "CompoundSet": """Product compounds""" if self._product_compounds is None: self._product_compounds = self.products.compounds self._product_compounds._name = f"products of {self}" return self._product_compounds @property def combined_compound_ids(self) -> set[int]: """Combined :class:`.Compound` IDs from :meth:`.Recipe.product_compounds` and :meth:`.Recipe.compounds`""" return set(self.product_compounds.ids) | set(self.compounds.ids) @property def combined_compounds(self) -> "CompoundSet": """Combined product and no-chem compounds""" if self._combined_compounds is None: from .cset import CompoundSet self._combined_compounds = CompoundSet(self.db, self.combined_compound_ids) self._combined_compounds._name = f"combined compounds of {self}" return self._combined_compounds @property def interactions(self) -> "InteractionSet": """Product pose interactions""" if self._interactions is None: self._interactions = self.poses.interactions return self._interactions @property def product(self) -> "Ingredient": """Return single product (if there's only one)""" assert len(self.products) == 1 return self.products[0] @products.setter def products(self, a: "IngredientSet"): """Set the products""" self._products = a self.__flag_modification() @property def reactants(self): """Reactant :class:`.IngredientSet`""" return self._reactants @reactants.setter def reactants(self, a: "IngredientSet"): """Set the reactants""" self._reactants = a self.__flag_modification() @property def intermediates(self) -> "IngredientSet": """Intermediates :class:`.IngredientSet`""" return self._intermediates @intermediates.setter def intermediates(self, a: "IngredientSet"): """Set the intermediates""" self._intermediates = a self.__flag_modification() @property def reactions(self) -> "ReactionSet": """Intermediates :class:`.IngredientSet`""" return self._reactions @reactions.setter def reactions(self, a: "ReactionSet"): """Set the reactions""" self._reactions = a self.__flag_modification() @property def price(self) -> "Price": """Get the price of the reactants""" return self.reactants.get_price() + self.compounds.get_price() @property def num_products(self) -> int: """Return the number of products""" return len(self.products) @property def num_compounds(self) -> int: """Return the number of compounds""" return len(self.combined_compound_ids) @property def num_reactions(self): """Return the number of reactions""" return len(self.reactions) @property def num_reaction_types(self): """Return the number of reactions""" return self.reactions.num_types @property def num_reactants(self): """Return the number of reactants""" return len(self.reactants) @property def num_intermediates(self): """Return the number of intermediates""" return len(self.intermediates) @property def hash(self) -> str: """Return the unique hash string""" return self._hash @property def score(self): """Return the Recipe score""" return self._score @property def type(self) -> str: """Get Recipe type (EMPTY/MIXED/CHEM/NOCHEM)""" if self.empty: return "EMPTY" chem = bool(self.reactions) nochem = bool(self.compounds) if chem and nochem: return "MIXED" if chem and not nochem: return "CHEM" if nochem and not chem: return "NOCHEM" @property def empty(self) -> bool: """Is this Recipe empty?""" if self.reactants: return False if self.products: return False if self.intermediates: return False if self.reactions: return False if self.compounds: return False return True ### METHODS
[docs] def get_price(self, supplier: str | None = None) -> "Price": """get the reactants price. See :meth:`.IngredientSet.get_price` :param supplier: restrict quotes to this supplier """ return self.reactants.get_price(supplier=supplier)
[docs] def draw(self, color_mapper=None, node_size=300, graph_only=False): """draw graph of the reaction network :param color_mapper: (Default value = None) :param node_size: (Default value = 300) :param graph_only: (Default value = False) """ import networkx as nx color_mapper = color_mapper or {} colors = {} sizes = {} graph = nx.DiGraph() for reaction in self.reactions: for reactant in reaction.reactants: key = str(reactant) ingredient = self.get_ingredient(id=reactant.id) graph.add_node( key, id=reactant.id, smiles=reactant.smiles, amount=ingredient.amount, price=str(ingredient.price), lead_time=ingredient.lead_time, ) if not graph_only: sizes[key] = self.get_ingredient(id=reactant.id).amount if key in color_mapper: colors[key] = color_mapper[key] else: colors[key] = (0.7, 0.7, 0.7) for product in self.products: key = str(product.compound) ingredient = self.get_ingredient(id=product.id) graph.add_node( key, id=product.id, smiles=product.smiles, amount=ingredient.amount, price=str(ingredient.price), lead_time=ingredient.lead_time, ) if not graph_only: sizes[key] = product.amount if key in color_mapper: colors[key] = color_mapper[key] else: colors[key] = (0.7, 0.7, 0.7) for reaction in self.reactions: for reactant in reaction.reactants: graph.add_edge( str(reactant), str(reaction.product), id=reaction.id, type=reaction.type, product_yield=reaction.product_yield, ) # rescale sizes if not graph_only: s_min = min(sizes.values()) sizes = [s / s_min * node_size for s in sizes.values()] if graph_only: return graph else: import matplotlib as plt # return nx.draw(graph, pos, with_labels=True, font_weight='bold') # pos = nx.spring_layout(graph, iterations=200, k=30) pos = nx.spring_layout(graph) return nx.draw( graph, pos=pos, with_labels=True, font_weight="bold", node_color=list(colors.values()), node_size=sizes, )
[docs] def sankey(self, title: str | None = None) -> "graph_objects.Figure": """draw a plotly Sankey diagram :param title: (Default value = None) """ graph = self.draw(graph_only=True) import plotly.graph_objects as go nodes = {} for edge in graph.edges: c = edge[0] if c not in nodes: nodes[c] = len(nodes) c = edge[1] if c not in nodes: nodes[c] = len(nodes) source = [nodes[a] for a, b in graph.edges] target = [nodes[b] for a, b in graph.edges] value = [1 for l in graph.edges] labels = list(nodes.keys()) hoverkeys = None customdata = [] for key in nodes.keys(): n = graph.nodes[key] if not hoverkeys: hoverkeys = list(n.keys()) if not n: mrich.error(f"problem w/ node {key=}") compound_id = int(key[1:]) customdata.append((compound_id, None)) else: d = tuple(v if v is not None else "N/A" for v in n.values()) customdata.append(d) hoverkeys_edges = None customdata_edges = [] for s, t in graph.edges.keys(): edge = graph.edges[s, t] if not hoverkeys_edges: hoverkeys_edges = list(edge.keys()) if not n: mrich.error(f"problem w/ edge {s=} {t=}") customdata_edges.append((None, None, None)) else: d = tuple(v if v is not None else "N/A" for v in edge.values()) customdata_edges.append(d) hoverlines = [] for i, key in enumerate(hoverkeys): hoverlines.append(f"{key}=%" "{" f"customdata[{i}]" "}") hovertemplate = "Compound " + "<br>".join(hoverlines) + "<extra></extra>" hoverlines_edges = [] for i, key in enumerate(hoverkeys_edges): hoverlines_edges.append(f"{key}=%" "{" f"customdata[{i}]" "}") hovertemplate_edges = ( "Reaction " + "<br>".join(hoverlines_edges) + "<extra></extra>" ) fig = go.Figure( data=[ go.Sankey( node=dict( # pad = 15, # thickness = 20, # line = dict(color = "black", width = 0.5), label=labels, # color = "blue" customdata=customdata, # customdata = ["Long name A1", "Long name A2", "Long name B1", "Long name B2", # "Long name C1", "Long name C2"], # hovertemplate='Compound %{label}<br><br>smiles=%{customdata}<extra></extra>', hovertemplate=hovertemplate, ), link=dict( customdata=customdata_edges, hovertemplate=hovertemplate_edges, source=source, target=target, value=value, ), ) ] ) if not title: try: title = f"Recipe<br><sup>price={self.price}</sup>" except AssertionError: title = f"Recipe" fig.update_layout(title=title) return fig
[docs] def summary(self, price: bool = True) -> None: """Print a summary of this recipe :param price: print the price (Default value = True) """ import mcol mrich.h1(str(self)) if price: price = self.price if price: mrich.var("\nprice", price.amount, price.currency) # mrich.var('lead-time', self.lead_time, 'working days)) if self.products: mrich.h3(f"{len(self.products)} products") if len(self.products) < 100: for product in self.products: mrich.var(str(product.compound), f"{product.amount:.2f}", "mg") if self.intermediates: mrich.h3(f"{len(self.intermediates)} intermediates") if len(self.intermediates) < 100: for intermediate in self.intermediates: mrich.var( str(intermediate.compound), f"{intermediate.amount:.2f}", "mg", ) if self.reactants: mrich.h3(f"{len(self.reactants)} reactants") if len(self.reactants) < 100: for reactant in self.reactants: mrich.var(str(reactant.compound), f"{reactant.amount:.2f}", "mg") if self.reactions: mrich.h3(f"{len(self.reactions)} reactions") if len(self.reactions) < 100: for reaction in self.reactions: mrich.var(str(reaction), reaction.reaction_str, reaction.type) if hasattr(self, "_compounds") and self.compounds: mrich.h3(f"{len(self.compounds)} compounds") if len(self.compounds) < 100: for compound in self.compounds: mrich.var(str(compound.compound), f"{compound.amount:.2f}", "mg")
[docs] def get_ingredient(self, id) -> "Ingredient": """Get an ingredient by its compound ID :param id: compound ID """ matches = [r for r in self.reactants if r.id == id] if not matches: matches = [r for r in self.intermediates if r.id == id] if not matches: matches = [r for r in self.products if r.id == id] assert len(matches) == 1 return matches[0]
[docs] def add_to_all_reactants(self, amount: float = 20) -> None: """Increment all reactants by this amount :param amount: amount in ``mg`` (Default value = 20) """ self.reactants.df["amount"] += amount
[docs] def write_json( self, file: "str | Path", *, extra: dict | None = None, indent: str = "\t", **kwargs, ) -> None: """Serialise this recipe object and write it to disk :param file: write to this path :param extra: extra data to serialise :param indent: indentation whitespace (Default value = '\t') """ import json from pathlib import Path file = Path(file).resolve() assert file.parent.exists(), f"Directory does not exist: {file.parent}" data = self.get_dict(serialise_price=True, **kwargs) if extra: data.update(extra) mrich.writing(file) json.dump(data, open(file, "wt"), indent=indent)
[docs] def get_dict( self, *, price: bool = True, reactant_supplier: bool = True, compound_supplier: bool = True, database: bool = True, timestamp: bool = True, compound_ids_only: bool = False, products: bool = True, serialise_price: bool = False, ): """Serialise this recipe object Store ===== - Path to database - Timestamp - Reactants (& their quotes, amounts) - Intermediates (& their quotes) - Products (& their poses/scores/fingerprints) - Reactions - Total Price - Lead time :param price: include the price (Default value = True) :param reactant_supplier: include the supplier (Default value = True) :param database: include the database (Default value = True) :param timestamp: add a timestamp (Default value = True) :param compound_ids_only: ID's only (instead of full :attr:`.IngredientSet.df`) (Default value = False) :param products: include products (Default value = True) :param serialise_price: serialise :class:`.Price` object (Default value = False) """ import json from datetime import datetime data = {} # Database if database: data["database"] = str(self.db.path.resolve()) if timestamp: data["timestamp"] = str(datetime.now()) # Recipe properties try: if price and serialise_price: data["price"] = self.price.get_dict() elif price: data["price"] = self.price except AssertionError as e: mrich.warning(f"Could not get price: {e}") data["price"] = None if reactant_supplier: data["reactant_supplier"] = self.reactants.supplier if compound_supplier: data["compound_supplier"] = self.compounds.supplier # IngredientSets if compound_ids_only: data["reactant_ids"] = self.reactants.compound_ids data["intermediate_ids"] = self.intermediates.compound_ids if products: data["products_ids"] = self.products.compound_ids data["compound_ids"] = self.compounds.compound_ids else: data["reactants"] = self.reactants.df.to_dict(orient="list") data["intermediates"] = self.intermediates.df.to_dict(orient="list") if products: data["products"] = self.products.df.to_dict(orient="list") data["compounds"] = self.compounds.df.to_dict(orient="list") # ReactionSet data["reaction_ids"] = self.reactions.ids return data
[docs] def get_routes(self, return_ids: bool = False) -> "RouteSet": """Get routes""" return self.products.get_routes( permitted_reactions=self.reactions, return_ids=return_ids )
[docs] def write_CAR_csv( self, file: "str | Path", return_df: bool = False ) -> "DataFrame | None": """Prepares CSVs for use with CAR. .. attention:: This method requires a populated `route` table. For a workaround use :meth:`.CompoundSet.write_CAR_csv` instead Columns: * target-name * no-steps * concentration = None * amount-required * batch-tag per reaction * reactant-1-1 * reactant-2-1 * reaction-product-smiles-1 * reaction-name-1 * reaction-recipe-1 * reaction-groupby-column-1 :param file: file to write to :param return_df: return the dataframe (Default value = False) """ from .cset import CompoundSet from pandas import DataFrame from pathlib import Path # solve each product's reaction file = str(Path(file).resolve()) rows = [] routes = self.get_routes() for sub_recipe in routes: product = sub_recipe.product row = { "target-names": str(product.compound), "no-steps": 0, "concentration-required-mM": None, "amount-required-uL": None, "batch-tag": None, } for i, reaction in enumerate(sub_recipe.reactions): i = i + 1 row["no-steps"] += 1 match len(reaction.reactants): case 1: row[f"reactant-1-{i}"] = reaction.reactants[0].smiles row[f"reactant-2-{i}"] = None case 2: row[f"reactant-1-{i}"] = reaction.reactants[0].smiles row[f"reactant-2-{i}"] = reaction.reactants[1].smiles case _: # mrich.warning(f"More than two reactants for {reaction=}") for j, r in enumerate(reaction.reactants): row[f"reactant-{j+1}-{i}"] = reaction.reactants[j].smiles row[f"reaction-product-smiles-{i}"] = reaction.product.smiles row[f"reaction-name-{i}"] = reaction.type row[f"reaction-recipe-{i}"] = None row[f"reaction-groupby-column-{i}"] = None # row[f'reaction-id-{i}'] = int(reaction.id) rows.append(row) df = DataFrame(rows) if len(df[df.duplicated()]): mrich.warning("Removing duplicates from CAR DataFrame") df = df.drop_duplicates() df = df.convert_dtypes() for n_steps in set(df["no-steps"]): subset = df[df["no-steps"] == n_steps] this_file = file.replace(".csv", f"_{n_steps}steps.csv") mrich.writing(this_file) subset.to_csv(this_file, index=False) mrich.writing(file) df.to_csv(file, index=False) return df
[docs] def write_reactant_csv( self, file: "str | Path", return_df: bool = False ) -> "DataFrame | None": """Detailed CSV output including reactant information for purchasing and information on the downstream synthetic use Reactant ======== - ID - SMILES - Inchikey Quote ===== - Supplier - Catalogue - Entry - Lead-time - Quoted amount - Quote currency - Quote price - Quote purity Downstream ========== - num_reaction_dependencies - num_product_dependencies - reaction_dependencies - product_dependencies """ # - remove_with from pandas import DataFrame # from rich import print from .cset import CompoundSet from .rset import ReactionSet data = [] ### Get lookup data route_ids = self.get_routes(return_ids=True) sql = f""" SELECT component_ref, route_product FROM component INNER JOIN route ON route_id = component_route WHERE component_type = 2 AND component_ref IN {self.reactants.compounds.str_ids} AND component_route IN {str(tuple(route_ids)).replace(',)',')')} """ product_lookup = {} for reactant_id, product_id in self.db.execute(sql): product_lookup.setdefault(reactant_id, set()) product_lookup[reactant_id].add(product_id) sql = f""" WITH reactants AS ( SELECT component_ref AS reactant_id, component_route AS route_id FROM component WHERE component_type = 2 AND component_ref IN {self.reactants.compounds.str_ids} ), reactions AS ( SELECT component_ref AS reaction_id, component_route AS route_id, reaction_type FROM component INNER JOIN reaction ON component_ref = reaction_id WHERE component_type = 1 AND component_ref IN {self.reactions.str_ids} ) SELECT reactants.reactant_id, reactions.reaction_id, reactions.reaction_type FROM reactants INNER JOIN reactions ON reactants.route_id = reactions.route_id """ reaction_lookup = {} for reactant_id, reaction_id, reaction_type in self.db.execute(sql): reaction_lookup.setdefault(reactant_id, dict(ids=set(), types=set())) reaction_lookup[reactant_id]["ids"].add(reaction_id) reaction_lookup[reactant_id]["types"].add(reaction_type) smiles_lookup = self.db.get_compound_id_smiles_dict(self.reactants.compounds) inchikey_lookup = self.db.get_compound_id_inchikey_dict( self.reactants.compounds ) ### Reactant Dataframe df = self.reactants.df df["smiles"] = df["compound_id"].apply(lambda x: smiles_lookup[x]) df["inchikey"] = df["compound_id"].apply(lambda x: inchikey_lookup[x]) df = df.drop(columns=["supplier", "max_lead_time", "quoted_amount"]) ### Quote DataFrame qdf = self.db.get_quote_df(self.reactants.quote_ids) qdf = qdf.rename( columns={ "id": "quote_id", "smiles": "quoted_smiles", "purity": "quoted_purity", "date": "quote_date", "lead_time": "quote_lead_time_days", "price": "quote_price", "currency": "quote_currency", "catalogue": "quote_catalogue", "supplier": "quote_supplier", "entry": "quote_entry", "amount": "quoted_amount_mg", } ) qdf = qdf.drop(columns=["compound"]) ### Downstream info df["downstream_product_ids"] = df["compound_id"].apply( lambda x: product_lookup.get(x, set()) ) df["downstream_reaction_ids"] = df["compound_id"].apply( lambda x: reaction_lookup[x]["ids"] ) df["downstream_reaction_types"] = df["compound_id"].apply( lambda x: reaction_lookup[x]["types"] ) df["num_downstream_reactions"] = df["downstream_reaction_ids"].apply(len) df["num_downstream_reaction_types"] = df["downstream_reaction_types"].apply(len) df["num_downstream_products"] = df["downstream_product_ids"].apply(len) ### Join and reformat df = df.merge(qdf, on="quote_id", how="left") df = df.rename( columns={ "amount": "required_amount_mg", } ) cols = [ "compound_id", "smiles", "inchikey", "required_amount_mg", "quoted_amount_mg", "quote_id", "quote_supplier", "quote_catalogue", "quote_entry", "quote_price", "quote_currency", "quote_lead_time_days", "quoted_purity", "quoted_smiles", "quote_date", "num_downstream_reaction_types", "num_downstream_reactions", "num_downstream_products", "downstream_reaction_types", "downstream_reaction_ids", "downstream_product_ids", ] df = df[[c for c in cols if c in df.columns]] ### N.B. scaffold series no longer output mrich.writing(file) df.to_csv(file, index=False) if return_df: return df return None
[docs] def write_product_csv( self, file: "str | Path", return_df: bool = False ) -> "pd.DataFrame | None": """Detailed CSV output including product information for selection and synthesis""" from pandas import DataFrame # from rich import print from .pset import PoseSet from .cset import CompoundSet from .rset import ReactionSet data = [] routes = self.get_routes() pose_map = self.db.get_compound_id_pose_ids_dict(self.products.compounds) inspiration_map = self.db.get_compound_id_inspiration_ids_dict() for product in mrich.track( self.products, prefix="Constructing product DataFrame" ): d = dict( hippo_id=product.compound_id, smiles=product.smiles, inchikey=product.inchikey, required_amount_mg=product.amount, ) upstream_routes = [] upstream_reactions = [] for route in routes: if product in route.products: upstream_routes.append(route) for reaction in route.reactions: upstream_reactions.append(reaction) upstream_reactions = ReactionSet( self.db, set(reaction.id for reaction in upstream_reactions) ) if not upstream_routes: mrich.error("No upstream routes for", product) continue if not upstream_reactions: mrich.error("No upstream reactions for", product) continue def get_scaffold_series() -> tuple[list[int], bool]: """Get scaffold series value""" if scaffolds := product.scaffolds: return scaffolds.ids, False else: return [product.id], True poses = pose_map.get(product.id, set()) d["num_poses"] = len(poses) d["poses"] = poses d["tags"] = product.tags d["num_routes"] = len(upstream_routes) d["num_reaction_steps"] = set( len(route.reactions) for route in upstream_routes ) d["reaction_dependencies"] = upstream_reactions.ids d["reactant_dependencies"] = set( sum([route.reactants.ids for route in upstream_routes], []) ) d["route_ids"] = [route.id for route in upstream_routes] d["chemistry_types"] = ", ".join(upstream_reactions.types) series, is_scaffold = get_scaffold_series() d["is_scaffold"] = is_scaffold d["scaffold_series"] = series inspirations = inspiration_map.get(product.id, None) if not inspirations and not is_scaffold: scaffold = product.scaffolds[0] inspirations = inspiration_map.get(scaffold.id, None) if not inspirations and "inspiration_pose_ids" in scaffold.metadata: inspirations = scaffold.metadata["inspiration_pose_ids"] if ( not inspirations and is_scaffold and "inspiration_pose_ids" in product.metadata ): inspirations = product.metadata["inspiration_pose_ids"] if inspirations: inspirations = PoseSet(self.db, inspirations) d["inspirations"] = ", ".join(n for n in inspirations.names) else: d["inspirations"] = "" data.append(d) df = DataFrame(data) mrich.writing(file) df.to_csv(file, index=False) if return_df: return df return None
[docs] def write_chemistry_csv( self, file: "str | Path", return_df: bool = True ) -> "pd.DataFrame | None": """Detailed CSV output synthetis information for chemistry types in this set""" from pandas import DataFrame from rich import print from .cset import CompoundSet from .rset import ReactionSet data = [] # get compounds scaffolds = CompoundSet(self.db) for product in self.products: if scaffolds := product.scaffolds: scaffolds += scaffolds else: scaffolds.add(product.compound) routes = self.get_routes() route_types = {} for compound in scaffolds: elabs = ( self.products.compounds.get_by_scaffold(scaffold=compound, none="quiet") or [] ) d = dict( scaffold_id=compound.id, product_id=compound.id, smiles=compound.smiles, inchikey=compound.inchikey, num_elaborations=len(elabs), is_scaffold=True, ) upstream_routes = [] for route in routes: if compound in route.products: upstream_routes.append(route) if not upstream_routes: mrich.warning(f"No routes to scaffold={compound}") continue d["num_routes"] = len(upstream_routes) for j, route in enumerate(upstream_routes): d[f"route_{j+1}_num_steps"] = len(route.reactions) group = route_types.setdefault(compound.id, set()) group.add(tuple([r.type for r in route.reactions])) for k, reaction in enumerate(route.reactions): key = f"route_{j+1}_reaction_{k+1}" product = reaction.product d[f"{key}_type"] = reaction.type d[f"{key}_product_smiles"] = product.smiles d[f"{key}_product_id"] = product.id d[f"{key}_product_yield"] = reaction.product_yield for i, reactant in enumerate(reaction.reactants): d[f"{key}_reactant_{i+1}_smiles"] = reactant.smiles d[f"{key}_reactant_{i+1}_id"] = reactant.id data.append(d) missing_scaffolds = {} for compound in self.products.compounds: if compound in scaffolds: continue upstream_routes = [] for route in routes: if compound in route.products: upstream_routes.append(route) scaffolds = compound.scaffolds for scaffold in scaffolds: if scaffold.id not in route_types: group = missing_scaffolds.setdefault(scaffold.id, []) group.append(compound.id) continue else: for route in upstream_routes: chem_types = tuple([r.type for r in route.reactions]) if chem_types not in route_types[base.id]: mrich.success(scaffold) mrich.success(chem_types) raise ValueError( "Scaffold has route not present in dataframe" ) for scaffold_id, elab_ids in missing_scaffolds.items(): compound = self.db.get_compound(id=sorted(elab_ids)[0]) d = dict( scaffold_id=scaffold_id, product_id=compound.id, smiles=compound.smiles, inchikey=compound.inchikey, num_elaborations=len(elab_ids), is_scaffold=False, ) upstream_routes = [] for route in routes: if compound in route.products: upstream_routes.append(route) if not upstream_routes: mrich.error(f"No routes to elab {compound}") raise ValueError(f"No routes to elab {compound}") d["num_routes"] = len(upstream_routes) for j, route in enumerate(upstream_routes): d[f"route_{j+1}_num_steps"] = len(route.reactions) group = route_types.setdefault(compound.id, set()) group.add(tuple([r.type for r in route.reactions])) for k, reaction in enumerate(route.reactions): key = f"route_{j+1}_reaction_{k+1}" product = reaction.product d[f"{key}_type"] = reaction.type d[f"{key}_product_smiles"] = product.smiles d[f"{key}_product_id"] = product.id d[f"{key}_product_yield"] = reaction.product_yield for i, reactant in enumerate(reaction.reactants): d[f"{key}_reactant_{i+1}_smiles"] = reactant.smiles d[f"{key}_reactant_{i+1}_id"] = reactant.id data.append(d) df = DataFrame(data) mrich.writing(file) df.to_csv(file, index=False) if return_df: return df return None
[docs] def to_syndirella( self, out_key: "str | Path", poses: "PoseSet", *, separate: bool = False, ) -> "DataFrame": """Generate inputs for running syndirella elaboration""" import shutil from pathlib import Path out_key = Path(".") / out_key out_dir = out_key.parent out_key = out_key.name mrich.var("out_key", out_key) mrich.var("out_dir", out_dir) if not out_dir.exists(): mrich.writing(out_dir) out_dir.mkdir(parents=True, exist_ok=True) template_dir = out_dir / "templates" if not template_dir.exists(): mrich.writing(template_dir) template_dir.mkdir(parents=True, exist_ok=True) """ Need to create dataframe with columns: - compound_id - pose_id - smiles - reaction_name_step1 - reactant_step1 - reactant2_step1 - product_step1 ... - hit1 - hit2 ... - template - compound_set """ pose_compounds = poses.compounds assert set(self.products.compound_ids) == set( pose_compounds.ids ), "supplied poses have different compounds to Recipe products" assert len(poses) == len( self.products ), "some duplicate compounds in supplied poses" df = poses.get_df( inchikey=False, alias=False, name=False, compound_id=True, reference_id=True, inspiration_aliases=True, ) df = df.reset_index() df = df.rename(columns={"id": "pose_id"}) df["compound_set"] = df["compound_id"].apply(lambda x: f"C{x}") df = df.set_index(["compound_id", "pose_id"]) ## CHECKS no_refs = df[df["reference_id"].isna()] if len(no_refs): mrich.error(len(no_refs), "poses without reference!") ids = set(no_refs.index.get_level_values("pose_id")) mrich.print(ids) from .pset import PoseSet no_insps = bool([1 for i in df["inspiration_aliases"].values if not len(i)]) if no_insps: mrich.error(len(no_insps), "poses without inspirations!") return None ## TEMPLATES references = poses.references ref_lookup = self.db.get_pose_id_alias_dict(references) df["template"] = df["reference_id"].apply(lambda x: ref_lookup[x]) for ref_pose in references: assert ref_pose.apo_path, f"Reference {ref_pose} has no apo_path" template = template_dir / ref_pose.apo_path.name if not template.exists(): mrich.writing(template) shutil.copy(ref_pose.apo_path, template) ## INSPIRATIONS for i, row in df.iterrows(): for j, alias in enumerate(row["inspiration_aliases"]): df.loc[i, f"hit{j+1}"] = alias inspirations = poses.inspirations sdf_name = out_dir / f"{out_key}_syndirella_inspiration_hits.sdf" inspirations.write_sdf( sdf_name, tags=False, metadata=False, name_col="name", ) ## ADD ROUTE INFO routes = self.get_routes() for sub_recipe in mrich.track(routes, prefix="Adding chemistry info..."): product = sub_recipe.product product_id = product.compound_id matches = df.xs(product_id, level="compound_id") if len(matches) > 1: mrich.warning("Multiple rows for compound", product_id) for i, row in matches.iterrows(): key = (product_id, i) for j, reaction in enumerate(sub_recipe.reactions): j = j + 1 match len(reaction.reactants): case 1: df.loc[key, f"reactant_step{j}"] = reaction.reactants[ 0 ].smiles df.loc[key, f"reactant2_step{j}"] = None case 2: df.loc[key, f"reactant_step{j}"] = reaction.reactants[ 0 ].smiles df.loc[key, f"reactant2_step{j}"] = reaction.reactants[ 1 ].smiles case 3: df.loc[key, f"reactant_step{j}"] = reaction.reactants[ 0 ].smiles df.loc[key, f"reactant2_step{j}"] = reaction.reactants[ 1 ].smiles df.loc[key, f"reactant3_step{j}"] = reaction.reactants[ 2 ].smiles case _: raise NotImplementedError("Too many reactants") df.loc[key, f"product_step{j}"] = reaction.product.smiles df.loc[key, f"reaction_name_step{j}"] = reaction.type break ## REMOVE UNECESSARY COLS df = df.drop(columns=["reference_id", "inspiration_aliases"]) ## REORDER COLUMNS cols = [ "smiles", "reaction_name_step1", "reactant_step1", "reactant2_step1", "reactant3_step1", "product_step11", "hit1", "hit2", "hit3", "hit4", "hit5", "hit6", "hit7", "hit8", "hit9", "template", "compound_set", ] if not any([c not in cols for c in df.columns]): df = df[[c for c in cols if c in df.columns]] if not separate: out_path = out_dir / f"{out_key}_syndirella_input.csv" mrich.writing(out_path) df.to_csv(out_path) return df for idx, row in df.iterrows(): out_path = out_dir / f"{out_key}_{row['compound_set']}_syndirella_input.csv" mrich.writing(out_path) single_df = row.to_frame().T single_df = single_df.dropna(axis=1, how="all") single_df.to_csv(out_path, index=False) return df
[docs] def copy(self) -> "Recipe": """Copy this recipe""" if hasattr(self, "compounds"): compounds = self.compounds.copy() else: compounds = None return Recipe( self.db, products=self.products.copy(), reactants=self.reactants.copy(), intermediates=self.intermediates.copy(), reactions=self.reactions.copy(), compounds=compounds, # supplier=self.supplier )
def __flag_modification(self) -> None: """Flag this recipe as modified""" self._product_interactions = None self._score = None self._product_compounds = None self._product_poses = None
[docs] def check_integrity(self, debug: bool = False) -> bool: """Verify integrity of this recipe""" # no duplicate ingredients if debug: mrich.debug("Checking integrity:", self) mrich.debug("Checking for duplicate compounds") if len(self.reactants.compound_ids) != len(set(self.reactants.compound_ids)): mrich.error("Reactant compound ID's are not unique") return False if len(self.intermediates.compound_ids) != len( set(self.intermediates.compound_ids) ): mrich.error("Intermediate compound ID's are not unique") return False if len(self.products.compound_ids) != len(set(self.products.compound_ids)): mrich.error("Product compound ID's are not unique") return False # all references should exist if debug: mrich.debug("Checking for missing references") if self.db.count_where( table="reaction", key=f"reaction_id IN {self.reactions.str_ids}" ) < len(self.reactions): mrich.error("Not all Reactions in Database") return False if self.db.count_where( table="compound", key=f"compound_id IN {self.product_compounds.str_ids}" ) < len(self.products): mrich.error("Not all product Compounds in Database") return False if self.db.count_where( table="compound", key=f"compound_id IN {self.reactants.compounds.str_ids}" ) < len(self.reactants): mrich.error("Not all reactant Compounds in Database") return False if self.db.count_where( table="compound", key=f"compound_id IN {self.intermediates.compounds.str_ids}", ) < len(self.intermediates): mrich.error("Not all intermediate Compounds in Database") return False reaction_intermediates = self.reactions.intermediates reaction_products = self.reactions.products reaction_reactants = self.reactions.reactants if debug: mrich.debug("Checking for missing reactions") # all products should have a reaction for product in self.products: if product not in reaction_products: mrich.error(f"Product: {product} does not have associated reaction") return False # intermediates for intermediate in self.intermediates: if intermediate not in reaction_intermediates: mrich.error( f"Intermediate: {intermediate} is not in self.reactions.intermediates" ) return False # reactants for reactant in self.reactants: if reactant not in reaction_reactants: mrich.error(f"Reactant: {reactant} is not in self.reactions.reactants") return False # all reactions should have enough reactant if debug: mrich.debug("Checking reactant quantities") for reaction in self.reactions: product_ingredient = self.products(compound_id=reaction.product_id) if product_ingredient is None: product_ingredient = self.intermediates(compound_id=reaction.product_id) if debug and reaction.product_yield < 1.0: mrich.debug(f"{reaction}.product_yield={reaction.product_yield}") for reactant in reaction.reactants: reactant_ingredient = self.intermediates(compound_id=reactant.id) if reactant_ingredient is None: reactant_ingredient = self.reactants(compound_id=reactant.id) required_amount = product_ingredient.amount / reaction.product_yield if reactant_ingredient.amount < required_amount: mrich.error( f"Not enough of {reactant_ingredient.compound}: {reactant_ingredient.amount} < {required_amount}" ) return False if debug: mrich.success(self, "OK") return True
[docs] def add_ingredient(self, ingredient: "Ingredient", amount: float = 1): """Add an :class:`.Ingredient` object for direct purchase (no associated reactions)""" self.compounds.add(ingredient)
### DUNDERS
[docs] def __str__(self) -> str: """Unformatted string representation""" if self.score: s = f"(score={self.score:.3f})" else: s = "" if self.hash: return f"Recipe_{self.hash}{s}" return f"Recipe{s}"
def __longstr(self) -> str: """Unformatted string representation""" if self.empty: return f"Empty Recipe()" if self.reactions: if self.intermediates: s = f"{self.reactants} --> {self.intermediates} --> {self.products} via {self.reactions}" else: s = f"{self.reactants} --> {self.products} via {self.reactions}" if self.score: s += f", score={self.score:.3f}" if self.hash: return f"Recipe_{self.hash}({s})" return f"Recipe({s})" else: s = f"{self.compounds}" if self.hash: return f"Recipe_{self.hash}({s})" return f"Recipe(#compounds={self.num_compounds} [no-chem])"
[docs] def __repr__(self) -> str: """ANSI Formatted string representation""" return f"{mcol.bold}{mcol.underline}{self.__longstr()}{mcol.unbold}{mcol.ununderline}"
def __rich__(self) -> str: """Rich Formatted string representation""" return f"[bold underline]{self.__longstr()}"
[docs] def __add__(self, other: "Recipe"): """Add another :class:`.Recipe` to this one""" result = self.copy() result.reactants += other.reactants result.intermediates += other.intermediates result.reactions += other.reactions result.products += other.products if hasattr(other, "compounds"): result.compounds += other.compounds return result
[docs] class Route(Recipe): """A recipe with a single product, that is stored in the database""" def __init__( self, db, *, route_id: int, product: "IngredientSet", reactants: "IngredientSet", intermediates: "IngredientSet", reactions: "ReactionSet", ) -> None: """Route initialisation""" from .cset import IngredientSet from .rset import ReactionSet # check typing assert isinstance(product, IngredientSet) assert isinstance(reactants, IngredientSet) assert isinstance(intermediates, IngredientSet) assert isinstance(reactions, ReactionSet) assert len(product) == 1 assert isinstance(route_id, int) assert route_id self._id = route_id self._products = product self._product_id = product.ids[0] self._reactants = reactants self._intermediates = intermediates self._reactions = reactions self._db = db ### FACTORIES
[docs] @classmethod def from_json( cls, db: "Database", path: "str | Path", data: dict = None ) -> "Route": """Load a serialised route from a JSON file :param db: database to link :param path: path to JSON :param data: serialised data (Default value = None) """ import json from .cset import IngredientSet from .rset import ReactionSet if data is None: data = json.load(open(path, "rt")) self = cls.__new__(cls) self._db = db self._id = data["id"] self._product_id = data["product_id"] self._products = IngredientSet.from_compounds( compounds=None, ids=[self._product_id], db=db ) # IngredientSet self._reactants = IngredientSet.from_json( db=db, path=None, data=data["reactants"]["data"], supplier=data["reactants"]["supplier"], ) self._intermediates = IngredientSet.from_json( db=db, path=None, data=data["intermediates"]["data"], supplier=data["intermediates"]["supplier"], ) self._reactions = ReactionSet( db=db, indices=data["reactions"]["indices"] ) # ReactionSet return self
### PROPERTIES @property def product(self) -> "Ingredient": """Product ingredient""" return self._products[0] @property def product_compound(self) -> "Compound": """Product compound""" return self.product.compound @property def id(self) -> int: """Route ID""" return self._id @property def price(self) -> "Price": """Get the price of the reactants""" return self.reactants.price ### METHODS
[docs] def get_dict(self) -> dict: """Serialisable dictionary""" data = {} data["id"] = self.id data["product_id"] = self.product.id data["reactants"] = self.reactants.get_dict() data["intermediates"] = self.intermediates.get_dict() data["reactions"] = self.reactions.get_dict() return data
### DUNDERS
[docs] def __str__(self) -> str: """Unformatted string representation""" return f"Route #{self.id}: {self.product_compound}"
[docs] def __repr__(self) -> str: """ANSI Formatted string representation""" return f"{mcol.bold}{mcol.underline}{self}{mcol.unbold}{mcol.ununderline}"
def __rich__(self) -> str: """Rich Formatted string representation""" return f"[bold underline]{self}"
[docs] class RouteSet: """A set of Route objects""" def __init__(self, db: "Database", routes: "list[Route]") -> None: """RouteSet initialisation""" data = {} for route in routes: # assert isinstance(route, Route) data[route.id] = route self._data = data self._db = db self._cluster_map = None self._permitted_clusters = None self._current_cluster = None ### FACTORIES
[docs] @classmethod def from_ids(cls, db: "Database", ids: list | set): """Generate a routeset from a set of :class:`.Route` IDs :param db: database to link :param ids: :class:`.Route` database IDs """ routes = [ db.get_route(id=route_id) for route_id in mrich.track(ids, prefix="Getting routes") ] self = cls.__new__(cls) return RouteSet(db, routes)
[docs] @classmethod def from_product_ids(cls, db: "Database", ids: list | set): """Generate a routeset from a set of product :class:`.Compound` IDs :param db: database to link :param ids: :class:`.Compound` database IDs """ str_ids = str(tuple(ids)).replace(",)", ")") records = db.select_where( table="route", query="route_id", key=f"route_product IN {str_ids}", multiple=True, ) route_ids = [i for i, in records] return cls.from_ids(db, route_ids)
[docs] @classmethod def from_json( cls, db: "Database", path: "str | Path", data: dict = None ) -> "RouteSet": """Load a serialised routeset from a JSON file :param db: database to link :param path: path to JSON :param data: serialised data (Default value = None) """ self = cls.__new__(cls) if data is None: import json data = json.load(open(path, "rt")) new_data = {} for d in mrich.track(data["routes"].values(), prefix="Loading Routes..."): route_id = d["id"] new_data[route_id] = Route.from_json(db=db, path=None, data=d) self._data = new_data self._db = db self._cluster_map = None self._permitted_clusters = None self._current_cluster = None return self
### PROPERTIES @property def data(self) -> "dict[int, Route]": """Get internal data dictionary""" return self._data @property def db(self): """Get associated database""" return self._db @property def routes(self) -> "list[Route]": """Get route objects""" return self.data.values() @property def product_ids(self) -> list[int]: """Get the :class:`.Compound` ID's of the products""" ids = self.db.select_where( table="route", query="route_product", key=f"route_id IN {self.str_ids}", multiple=True, ) return [i for i, in ids] @property def products(self) -> "CompoundSet": """Return a :class:`.CompoundSet` of all the route products""" from .cset import CompoundSet return CompoundSet(self.db, self.product_ids) @property def str_ids(self) -> str: """Return an SQL formatted tuple string of the :class:`.Route` ID's""" return str(tuple(self.ids)).replace(",)", ")") @property def ids(self) -> list[int]: """Return the :class:`.Route` IDs""" return self.data.keys() @property def cluster_map(self) -> dict[tuple, set]: """Create a dictionary grouping routes by their scaffold/base cluster. :returns: A dictionary mapping a tuple of scaffold :class:`.Compound` IDs to a set of :class:`.Route` ID's to their superstructures. """ if self._cluster_map is None: # get route mapping pairs = self.db.select_where( query="route_product, route_id", key=f"route_id IN {self.str_ids}", table="route", multiple=True, ) route_map = {route_product: route_id for route_product, route_id in pairs} # group compounds by cluster compound_clusters = self.db.get_compound_cluster_dict(cset=self.products) # create the map self._cluster_map = {} for cluster, compounds in compound_clusters.items(): self._cluster_map[cluster] = [] for compound in compounds: route_id = route_map.get(compound, None) if not route_id: continue self._cluster_map[cluster].append(route_id) if not self._cluster_map[cluster]: del self._cluster_map[cluster] return self._cluster_map ### METHODS
[docs] def copy(self) -> "RouteSet": """Copy this RouteSet""" return RouteSet(self.db, self.data.values())
[docs] def set_db_pointers(self, db: "Database") -> None: """ :param db: """ self._db = db for route in self.data.values(): route._db = db
# def clear_db_pointers(self): # """ """ # self._db = None # for route in self.data.values(): # route._db = None
[docs] def get_dict(self): """Get serialisable dictionary""" data = dict(db=str(self.db), routes={}) # populate with routes for route_id, route in self.data.items(): data["routes"][route_id] = route.get_dict() return data
[docs] def prune_unavailable(self, suppliers: list[str]): """Remove routes that don't have all reactants available from given suppliers""" suppliers_str = str(tuple(suppliers)).replace(",)", ")") sql = f""" WITH possible_reactants AS ( SELECT quote_compound, COUNT( CASE WHEN quote_supplier IN {suppliers_str} THEN 1 END) AS [count_valid] FROM quote GROUP BY quote_compound ), route_reactants AS ( SELECT route_id, route_product, COUNT( CASE WHEN count_valid = 0 THEN 1 WHEN count_valid IS NULL THEN 1 END) AS [count_unavailable] FROM route INNER JOIN component ON component_route = route_id LEFT JOIN possible_reactants ON quote_compound = component_ref WHERE component_type = 2 GROUP BY route_id ) SELECT route_id FROM route_reactants WHERE count_unavailable = 0 AND route_id IN {self.str_ids} """ route_ids = self.db.execute(sql).fetchall() route_ids = [i for i, in route_ids] mrich.var("#routes before pruning", len(self)) mrich.var("#routes after pruning", len(route_ids)) return RouteSet.from_ids(self.db, route_ids)
[docs] def pop_id(self) -> int: """Pop the last route from the set and return it's id""" route_id, route = self.data.popitem() return route_id
[docs] def pop(self) -> "Route": """Pop the last route from the set and return it's object""" route_id, route = self.data.popitem() return route
[docs] def balanced_pop( self, permitted_clusters: set[tuple] | None = None, debug: bool = False ) -> "Route": """Pop a route from this set, while maintaining the balance of scaffold clusters populations""" if not self._data: mrich.print("RouteSet depleted") return None if not self.cluster_map: # mrich.warning("RouteSet.cluster_map depleted but _data isn't...") return self.pop() # store the permitted clusters (or all clusters) list as property if self._permitted_clusters is None: if permitted_clusters: permitted_clusters = set( (cluster,) if isinstance(cluster, int) else cluster for cluster in permitted_clusters ) self._permitted_clusters = [] for cluster in permitted_clusters: if cluster not in self.cluster_map: mrich.warning( cluster, "in permitted_clusters but not cluster_map" ) else: self._permitted_clusters.append(cluster) else: self._permitted_clusters = list(self.cluster_map.keys()) if self._current_cluster is None: self._current_cluster = self._permitted_clusters[0] ### pop a Route if debug: mrich.debug(f"Would pop Route from {self._current_cluster=}") cluster = self._current_cluster # pop the last route id from the given cluster try: route_id = self.cluster_map[cluster].pop() except IndexError: mrich.print(self._permitted_clusters) mrich.print(self.cluster_map) raise except AttributeError: mrich.print(cluster) mrich.print(self.cluster_map) raise except KeyError: mrich.print("cluster", cluster) mrich.print("self._permitted_clusters", self._permitted_clusters) mrich.print("self.cluster_map.keys()", self.cluster_map.keys()) raise # clean up empty clusters if debug: mrich.debug("Popped route", route_id) # get the Route object if route_id in self._data: route = self._data[route_id] del self._data[route_id] else: # if debug: mrich.debug("Route not present") return self.balanced_pop() ### increment cluster # def increment_cluster(cluster): n = len(self._permitted_clusters) if n > 1: for i, cluster in enumerate(self._permitted_clusters): if cluster == self._current_cluster: if i == n - 1: self._current_cluster = self._permitted_clusters[0] else: self._current_cluster = self._permitted_clusters[i + 1] break else: raise IndexError("This should never be reached...") # increment_cluster() if not self.cluster_map[cluster]: del self.cluster_map[cluster] if not self.cluster_map: mrich.debug("RouteSet.cluster_map depleted") self._permitted_clusters = [ c for c in self._permitted_clusters if c != cluster ] # if debug: mrich.debug("Depleted cluster", cluster) if not self._permitted_clusters: mrich.debug("Depleted all permitted clusters", cluster) mrich.debug("Removing cluster restriction", cluster) self._permitted_clusters = list(self.cluster_map.keys()) self._current_cluster = None if debug: mrich.debug("#Routes in set", len(self._data)) return route
[docs] def shuffle(self): """Randomly shuffle the routes in this set""" import random items = list(self.data.items()) random.shuffle(items) self._data = dict(items) ### shuffle the cluster map as well for cluster, routes in self.cluster_map.items(): random.shuffle(routes) self.cluster_map[cluster] = routes
### DUNDERS
[docs] def __len__(self) -> int: """Number of routes in this set""" return len(self.data)
[docs] def __str__(self) -> str: """Unformatted string representation""" return "{" f"Route × {len(self)}" "}"
[docs] def __repr__(self) -> str: """ANSI Formatted string representation""" return f"{mcol.bold}{mcol.underline}{self}{mcol.unbold}{mcol.ununderline}"
def __rich__(self) -> str: """Rich Formatted string representation""" return f"[bold underline]{self}"
[docs] def __iter__(self): """Iterate over routes in this set""" return iter(self.data.values())
class RecipeSet: """A set of recipes stored on disk""" def __init__( self, db: "Database", directory: "str | Path", pattern: str = "*.json" ): """RecipeSet initialisation""" from pathlib import Path from json import JSONDecodeError self._db = db self._json_directory = Path(directory) self._json_pattern = pattern self._json_paths = {} for path in self._json_directory.glob(self._json_pattern): self._json_paths[ path.name.removeprefix("Recipe_").removesuffix(".json") ] = path.resolve() mrich.reading(f"{directory}/{pattern}") self._recipes = {} for key, path in mrich.track( self._json_paths.items(), prefix="Loading recipes" ): try: recipe = Recipe.from_json( db=self.db, path=path, allow_db_mismatch=True, debug=False, db_mismatch_warning=False, ) except JSONDecodeError: mrich.error(f"Bad JSON in {path}") continue recipe._hash = key self._recipes[key] = recipe mrich.success("Loaded", len(self), "Recipes") ### FACTORIES ### PROPERTIES @property def db(self) -> "Database": """Associated database""" return self._db ### METHODS def get_values( self, key: str, progress: bool = False, serialise_price: bool = False, ): """Get values of member recipes associated with attribute ``key`` :param key: attribute to query/calculate :param progress: show a progress bar :param serialise_price: serialise price objects """ values = [] recipes = self._recipes.values() if progress: recipes = mrich.track(recipes, prefix=f"Calculating {self} values...") for recipe in recipes: value = getattr(recipe, key) if serialise_price and key == "price": value = value.amount values.append(value) return values def get_df(self, **kwargs) -> "pandas.DataFrame": """Get dataframe of recipe dictionaries. See :meth:`.Recipe.get_dict`""" data = [] for recipe in self: d = recipe.get_dict( # reactant_supplier=False, database=False, timestamp=False, **kwargs, # timestamp=False, ) data.append(d) from pandas import DataFrame return DataFrame(data) def items(self) -> "list[tuple[str, Recipe]]": """Get data dictionary items""" return self._recipes.items() def keys(self) -> list[str]: """Get data dictionary keys (recipe hashes)""" return self._recipes.keys() ### DUNDERS def __len__(self) -> int: """Number of recipes in this set""" return len(self._recipes) def __getitem__( self, key: int | str, ) -> Recipe: """Get a :class:`.Recipe` in this set by it's index or key/hash""" match key: case int(): return list(self._recipes.values())[key] case str(): return self._recipes[key] case _: mrich.error( f"Unsupported type for RecipeSet.__getitem__(): {key=} {type(key)}" ) return None def __iter__(self): """Iterate over recipes""" return iter(self._recipes.values()) def __contains__(self, key: str): """Is this hash contained in the set""" assert isinstance(key, str) return key in self._recipes def __str__(self) -> str: """Unformatted string representation""" return "{" f"Recipe × {len(self)}" "}" def __repr__(self) -> str: """ANSI Formatted string representation""" return f"{mcol.bold}{mcol.underline}{self}{mcol.unbold}{mcol.ununderline}" def __rich__(self) -> str: """Rich Formatted string representation""" return f"[bold underline]{self}"