Source code for hippo.rset

"""Classes for working with sets of :class:`.Reaction` objects"""

import mcol
import mrich

import os
from numpy import int64

from .db import Database
from .reaction import Reaction


class ReactionTable:
    """Class representing all :class:`.Reaction` objects in the 'reaction' table of the :class:`.Database`.

    .. attention::

            :class:`.ReactionTable` objects should not be created directly. Instead use the :meth:`.HIPPO.reactions` property. See :doc:`getting_started`.

    Use as an iterable
    ==================

    Iterate through :class:`.Reaction` objects in the table:

    ::

            for reaction in animal.reactions:
                ...


    Selecting reactions in the table
    ================================

    The :class:`.ReactionTable` can be indexed with :class:`.Reaction` ID, or list/sets/tuples/slices thereof:

    ::

            rtable = animal.reactions

            # indexing individual compounds
            reaction = rtable[13]                            # using the ID

            # getting a subset of compounds
            rset = rtable[13,15,18]      # using IDs (tuple)
            rset = rtable[[13,15,18]]    # using IDs (list)
            rset = rtable[set(13,15,18)] # using IDs (set)
            rset = rtable[13:18]         # using a slice

    """

    _name = "all reactions"

    def __init__(
        self,
        db: Database,
        table: str = "reaction",
    ) -> None:
        """ReactionTable initialisation"""

        self._db = db
        self._table = table

    ### PROPERTIES

    @property
    def db(self) -> "Database":
        """Returns the associated :class:`.Database`"""
        return self._db

    @property
    def table(self) -> str:
        """Returns the name of the :class:`.Database` table"""
        return self._table

    @property
    def name(self) -> str | None:
        """Returns the name of set"""
        return self._name

    @property
    def types(self) -> list[str]:
        """Returns a list of the unique reaction types present in the table"""
        result = self.db.select(
            table=self.table, query="DISTINCT reaction_type", multiple=True
        )
        return [q for q, in result]

    @property
    def ids(self) -> list[int]:
        """Returns the IDs of child reactions"""
        result = self.db.select(table=self.table, query="reaction_id", multiple=True)
        return [q for q, in result]

    ### METHODS

    def interactive(self) -> None:
        """Interactive widget to navigate reactions in the table

        .. attention::

                This method instantiates a :class:`.ReactionSet` containing all poses, it is recommended to instead select a subset for display. This method is only intended for use within a Jupyter Notebook.

        """
        return self[self.ids].interactive()

    def get_by_type(self, reaction_type: str) -> "ReactionSet":
        """Get all child reactions of the given type

        :param reaction_type: reaction type to filter by

        """
        result = self.db.select_where(
            table=self.table,
            query="reaction_id",
            key="type",
            value=reaction_type,
            multiple=True,
        )
        rset = self[[q for q, in result]]
        rset._name = f"all {reaction_type} reactions"
        return rset

    def get_df(self, *, smiles: bool = True, mols: bool = True) -> "pandas.DataFrame":
        """Construct a pandas.DataFrame of all reactions in the database

        :param smiles: Include smiles column (Default value = True)
        :param mols: Include `rdkit.Chem.Mol` column (Default value = True)

        """

        from rdkit.Chem import Mol
        from pandas import DataFrame

        ### SQL QUERY

        data = {}

        if not smiles and not mols:

            sql = "SELECT reaction_id, reaction_type, reaction_product, reactant_compound FROM reaction INNER JOIN reactant ON reaction.reaction_id = reactant.reactant_reaction"

            triples = self.db.execute(sql).fetchall()

            for reaction_id, product_id, reactant_id in triples:
                if reaction_id not in data:
                    data[reaction_id] = dict(product_id=product_id, reactant_ids=[])
                else:
                    assert data[reaction_id]["product_id"] == product_id

                data[reaction_id]["reactant_ids"].append(reactant_id)

        else:

            sql = """
            SELECT {query}
            FROM reaction 

            INNER JOIN reactant 
                ON reaction.reaction_id = reactant.reactant_reaction

            INNER JOIN compound c_r
                ON c_r.compound_id = reactant.reactant_compound

            INNER JOIN compound c_p
                ON c_p.compound_id = reaction.reaction_product
            """

            if not mols:
                sql = sql.format(
                    query="reaction_id, reaction_type, reaction_product, reactant_compound, c_p.compound_smiles, c_r.compound_smiles"
                )

            else:
                sql = sql.format(
                    query="reaction_id, reaction_type, reaction_product, reactant_compound, c_p.compound_smiles, c_r.compound_smiles, mol_to_binary_mol(c_p.compound_mol), mol_to_binary_mol(c_r.compound_mol)"
                )

            results = self.db.execute(sql).fetchall()

            for result in results:

                (
                    reaction_id,
                    reaction_type,
                    product_id,
                    reactant_id,
                    product_smiles,
                    reactant_smiles,
                ) = result[:6]

                if mols:
                    product_mol, reactant_mol = result[6:]

                if reaction_id not in data:
                    data[reaction_id] = dict(
                        reaction_id=reaction_id,
                        reaction_type=reaction_type,
                        product_id=product_id,
                        reactant_ids=set(),
                        product_smiles=product_smiles,
                        reactant_smiles=set(),
                    )
                    if mols:
                        data[reaction_id]["product_mol"] = Mol(product_mol)
                        data[reaction_id]["reactant_mols"] = set()
                else:
                    assert data[reaction_id]["product_id"] == product_id

                data[reaction_id]["reactant_ids"].add(reactant_id)
                data[reaction_id]["reactant_smiles"].add(reactant_smiles)
                if mols:
                    data[reaction_id]["reactant_mols"].add(Mol(reactant_mol))

        data = data.values()
        return DataFrame(data)

    def set_product_yields(
        self, *, type: str, product_yield: float, commit: bool = True
    ) -> None:
        """Set the product_yield for all member :class:`.Reaction` entries with given type

        :param type: the :class:`.Reaction` type to filter by
        :param product_yield: the :class:`.Reaction` product_yield to assign

        """

        assert isinstance(product_yield, float)
        assert product_yield > 0
        assert product_yield <= 1.0

        sql = f"""
        UPDATE reaction
        SET reaction_product_yield = :reaction_product_yield
        WHERE reaction_type = :reaction_type;
        """

        self.db.execute(
            sql,
            dict(
                reaction_product_yield=product_yield,
                reaction_type=type,
            ),
        )

        if commit:
            self.db.commit()

    ### DUNDERS

    def __getitem__(self, key) -> "Reaction | ReactionSet | None":
        """Get a member :class:`.Reaction` object or subset :class:`.ReactionSet` thereof.

        :param key: Can be an integer ID, negative integer index, list/set/tuple of IDs, or slice of IDs

        """

        match key:

            case int():

                if key == 0:
                    return self.__getitem__(key=1)

                if key < 0:
                    key = len(self) + 1 + key
                    return self.__getitem__(key=key)

                else:
                    return self.db.get_reaction(id=key)

            case key if (
                isinstance(key, list) or isinstance(key, tuple) or isinstance(key, set)
            ):
                return ReactionSet(self.db, key)

            case slice():
                ids = self.db.slice_ids(
                    table=self.table, start=key.start, stop=key.stop, step=key.step
                )
                return self[ids]

            case _:
                mrich.error(
                    f"Unsupported type for ReactionTable.__getitem__(): {key=} {type(key)}"
                )

        return None

    def __str__(self) -> str:
        """Unformatted string representation"""

        if self.name:
            s = f"{self.name}: "
        else:
            s = ""

        s += "{" f"R × {len(self)}" "}"

        return s

    def __repr__(self) -> str:
        """ANSI Formatted string representation"""
        return f"{mcol.bold}{mcol.underline}{self}{mcol.unbold}{mcol.ununderline}"

    def __rich__(self) -> str:
        """Rich Formatted string representation"""
        return f"[bold underline]{self}"

    def __len__(self) -> int:
        """Number of reactions in this set"""
        return self.db.count(self.table)

    def __iter__(self):
        """Iterate through poses in this set"""
        return iter(self[i + 1] for i in range(len(self)))

    def __call__(
        self,
        *,
        type: str = None,
    ) -> "ReactionSet":
        """Filter reactions by a given type

        :param type: reaction type to filter by
        :returns: :class:`.ReactionSet`

        """

        if type:
            return self.get_by_type(type)
        else:
            mrich.error("Must provide type argument")
            return None


[docs] class ReactionSet: """Object representing a subset of the 'reaction' table in the :class:`.Database`. .. attention:: :class:`.ReactionSet` objects should not be created directly. Instead use the :meth:`.HIPPO.reactions` property. See :doc:`getting_started` and :doc:`insert_elaborations`. Use as an iterable ================== Iterate through :class:`.Reaction` objects in the set: :: rset = animal.reactions[:100] for reaction in rset: ... Check membership ================ To determine if a :class:`.Reaction` is present in the set: :: is_member = reaction in cset Selecting compounds in the set ============================== The :class:`.ReactionSet` can be indexed like standard Python lists by their indices :: rset = animal.reactions[1:100] # indexing individual compounds reaction = rset[0] # get the first reaction reaction = rset[1] # get the second reaction reaction = rset[-1] # get the last reaction # getting a subset of compounds using a slice rset2 = rset[13:18] # using a slice """ _table = "reaction" def __init__( self, db: Database, indices: list = None, *, sort: bool = True, name: str | None = None, ) -> None: """ReactionSet initialisation""" self._db = db indices = indices or [] if not isinstance(indices, list): indices = list(indices) assert all(isinstance(i, int) or isinstance(i, int64) for i in indices) if sort: self._indices = sorted(list(set(indices))) else: self._indices = list(set(indices)) self._name = name ### PROPERTIES @property def db(self) -> Database: """Returns the associated :class:`.Database`""" return self._db @property def table(self) -> str: """Returns the name of the :class:`.Database` table""" return self._table @property def name(self) -> str | None: """Returns the name of set""" return self._name @property def indices(self) -> list[int]: """Returns the ids of reactions in this set""" return self._indices @property def ids(self) -> list[int]: """Returns the ids of reactions in this set""" return self._indices @property def types(self) -> list[str]: """Returns the types of reactions in this set""" records = self.db.select_where( table="reaction", key=f"reaction_id IN {self.str_ids}", query="DISTINCT reaction_type", multiple=True, ) return [t for t, in records] @property def num_types(self) -> int: """Returns the number of reaction types in this set""" (count,) = self.db.select_where( table="reaction", key=f"reaction_id IN {self.str_ids}", query="COUNT(DISTINCT reaction_type)", ) return count @property def str_ids(self) -> str: """Return an SQL formatted tuple string of the :class:`.Compound` IDs""" return str(tuple(self.ids)).replace(",)", ")") @property def products(self) -> "CompoundSet": """Get all product compounds that can be synthesised with these reactions (no intermediates)""" from .cset import CompoundSet intermediates = self.intermediates product_ids = self.db.execute( f""" SELECT compound_id FROM compound INNER JOIN reaction ON compound_id = reaction_product WHERE reaction_id IN {self.str_ids} AND compound_id NOT IN {intermediates.str_ids} """ ).fetchall() cset = CompoundSet(self.db, [i for i, in product_ids]) if self.name: cset._name = f"products of {self}" return cset @property def intermediates(self) -> "CompoundSet": """Get all intermediate compounds that can be synthesised with these reactions""" from .cset import CompoundSet sql = f""" SELECT DISTINCT compound_id FROM compound INNER JOIN reaction ON compound_id = reaction_product INNER JOIN reactant ON compound_id = reactant_compound WHERE reactant_reaction IN {self.str_ids} """ intermediate_ids = self.db.execute(sql).fetchall() cset = CompoundSet(self.db, [i for i, in intermediate_ids]) if self.name: cset._name = f"intermediates of {self}" return cset @property def reactants(self) -> "CompoundSet": """Get all reactant compounds that are used by these reactions""" from .cset import CompoundSet sql = f""" SELECT DISTINCT reactant_compound FROM reactant WHERE reactant_reaction IN {self.str_ids} """ reactant_ids = self.db.execute(sql).fetchall() cset = CompoundSet(self.db, [i for i, in reactant_ids]) if self.name: cset._name = f"reactants of {self}" return cset ### METHODS
[docs] def add(self, r: Reaction) -> None: """Add a :class:`.Reaction` to this set :param r: :class:`.Reaction` to be added """ assert isinstance(r, Reaction) if (id := r.id) not in self._indices: self._indices.append(id)
[docs] def interactive(self): """Creates a ipywidget to interactively navigate this PoseSet.""" from ipywidgets import ( interactive, BoundedIntText, Checkbox, interactive_output, HBox, GridBox, Layout, VBox, ) from IPython.display import display from pprint import pprint a = BoundedIntText( value=0, min=0, max=len(self) - 1, step=1, description=f"Rs (/{len(self)}):", disabled=False, ) b = Checkbox(description="Name", value=True) c = Checkbox(description="Summary", value=False) d = Checkbox(description="Draw", value=True) e = Checkbox(description="Check chemistry", value=False) f = Checkbox(description="Reactant Quotes", value=False) ui1 = GridBox( [b, c, d], layout=Layout(grid_template_columns="repeat(5, 100px)") ) ui2 = GridBox([e, f], layout=Layout(grid_template_columns="repeat(2, 150px)")) ui = VBox([a, ui1, ui2]) def widget( i, name=True, summary=True, draw=True, check_chemistry=True, reactants=False ): """ :param i: :param name: (Default value = True) :param summary: (Default value = True) :param draw: (Default value = True) :param check_chemistry: (Default value = True) :param reactants: (Default value = False) """ reaction = self[i] if name: print(repr(reaction)) if summary: reaction.summary(draw=False) if draw: reaction.draw() if check_chemistry: reaction.check_chemistry(debug=True) if reactants: for comp in reaction.reactants: # if summary: # comp.summary(draw=False) # elif name: print(repr(comp)) quotes = comp.get_quotes(df=True) display(quotes) # break # if draw: # comp.draw() out = interactive_output( widget, { "i": a, "name": b, "summary": c, "draw": d, "check_chemistry": e, "reactants": f, }, ) display(ui, out)
[docs] def get_df(self, smiles=True, mols=True, **kwargs) -> "pandas.DataFrame": """Construct a pandas.DataFrame of this ReactionSet :param smiles: Include smiles column (Default value = True) :param mols: Include `rdkit.Chem.Mol` column (Default value = True) :param kwargs: keyword arguments are passed on to :meth:`.Reaction.get_dict: """ from pandas import DataFrame from rdkit.Chem import Mol mrich.debug("Using slower Reaction.dict rather than direct SQL query...") data = [] for r in mrich.track(self, prefix="ReactionSet --> DataFrame"): data.append(r.get_dict(smiles=smiles, mols=mols, **kwargs)) return DataFrame(data)
[docs] def copy(self) -> "ReactionSet": """Return a copy of this set""" return ReactionSet(self.db, self.ids, sort=False, name=self.name)
[docs] def get_recipes( self, amounts: float | list[float] = 1.0, **kwargs ) -> "Recipe | list[Recipe]": """Get the :class:`.Recipe` object(s) from this set of recipes :param amounts: float or list/generator of product amounts in mg, (Default value = 1.0) :param kwargs: keyword arguments are passed on to :meth:`.Recipe.from_reactions: """ from .recipe import Recipe return Recipe.from_reactions(db=self.db, reactions=self, amounts=1, **kwargs)
[docs] def reverse(self) -> None: """In-place reversal of indices""" self._indices = list(reversed(self._indices))
[docs] def get_dict(self) -> dict[str]: """Serializable dictionary""" return dict(db=str(self.db), indices=self.indices)
[docs] def summary(self) -> None: """Print a summary of the Reactions""" mrich.header(self) for reaction in self: print(repr(reaction))
### DUNDERS
[docs] def __str__(self) -> str: """Unformatted string representation""" if self.name: s = f"{self.name}: " else: s = "" s += "{" f"R × {len(self)}" "}" return s
[docs] def __repr__(self) -> str: """ANSI Formatted string representation""" return f"{mcol.bold}{mcol.underline}{self}{mcol.unbold}{mcol.ununderline}"
def __rich__(self) -> str: """Rich Formatted string representation""" return f"[bold underline]{self}"
[docs] def __len__(self) -> int: """Number of member :class:`.Reaction` objects""" return len(self.indices)
[docs] def __iter__(self): """Iterate through member :class:`.Reaction` objects""" return iter(self.db.get_reaction(id=i) for i in self.indices)
[docs] def __getitem__(self, key) -> "Reaction | ReactionSet": """Get member :class:`.Reaction` object by single, slice or list/set/tuple of ID""" match key: case int(): try: index = self.indices[key] except IndexError: mrich.error(f"list index out of range: {key=} for {self}") raise return self.db.get_reaction(id=index) case slice(): ids = self.ids[key] return ReactionSet(self.db, ids) case key if ( isinstance(key, list) or isinstance(key, tuple) or isinstance(key, set) ): ids = self.ids[key] return ReactionSet(self.db, ids) case _: mrich.error( f"Unsupported type for ReactionSet.__getitem__(): {key=} {type(key)}" ) return None
[docs] def __add__(self, other: "ReactionSet") -> "ReactionSet": """Add a :class:`.ReactionSet` to this one""" if other: for reaction in other: self.add(reaction) self._name = None return self