Source code for qwak.qwak

from __future__ import annotations
from typing import Union

import networkx as nx
import numpy as np
import copy
import json
from qwak.Errors import (
    StateOutOfBounds,
    NonUnitaryState,
    UndefinedTimeList,
    EmptyProbDistList,
    MissingNodeInput,
    MissingGraphInput,
)
from qwak.State import State
from qwak.Operator import Operator
from qwak.QuantumWalk import QuantumWalk
from qwak.ProbabilityDistribution import (
    ProbabilityDistribution,
)


class QWAK:
    """Facade bundling the state, operator, walk and probability distribution
    objects needed to run a continuous-time quantum walk on a graph."""

    def __init__(
            self,
            graph: nx.Graph,
            time: float = 0,
            timeList: list = None,
            gamma: float = 1,
            initStateList: list = None,
            customStateList: list = None,
            laplacian: bool = False,
            markedElements: list = None,
            qwakId: str = 'userUndef',
    ) -> None:
        """Data access class that combines all the components required to
        perform a continuous-time quantum walk, given by the multiplication
        of an operator (represented by the Operator class) by an initial
        state (State class). This multiplication is achieved in the
        QuantumWalk class, which returns a final state (State class)
        representing the amplitudes of each state associated with a graph
        node. These amplitudes can then be transformed to probability
        distributions (ProbabilityDistribution class) suitable for plotting
        with matplotlib, or your package of choice.

        Methods such as runWalk or runExpmWalk must then be used to generate
        the results of the quantum walk.

        Parameters
        ----------
        graph : nx.Graph
            NetworkX graph where the walk takes place. Its length defines
            the dimension of the quantum walk.
        time : float
            Time forwarded to the Operator, by default 0.
        timeList : list
            List with times for multiple walks, by default None. The list is
            copied. When None, a list of ``len(graph)`` zeros is stored.
        gamma : float
            Value forwarded to the Operator as ``gamma``, by default 1.
        initStateList : list[int], optional
            List with chosen initial states for uniform superposition, by
            default None.
        customStateList : list[(int,complex)], optional
            Custom init state, by default None.
        laplacian : bool, optional
            Allows the user to choose whether to use the Laplacian or simple
            adjacency matrix, by default False.
        markedElements : list, optional
            List with marked elements for search, by default None, in which
            case a fresh empty list is handed to the Operator.
        qwakId : str, optional
            User-defined ID for the QWAK instance, by default 'userUndef'.
        """
        self._graph = graph
        self._n = len(self._graph)
        # NOTE(review): because of this default, _timeList is never None
        # after construction, so the UndefinedTimeList branch of the
        # runMultiple*Walks methods can only trigger if _timeList is reset
        # to None from outside.
        if timeList is not None:
            self._timeList = list(timeList)
        else:
            self._timeList = [0] * self._n
        self._qwakId = qwakId
        # A new list per instance: a literal ``[]`` default would be one
        # object shared by every QWAK built without marked elements.
        if markedElements is None:
            markedElements = []
        self._operator = Operator(
            self._graph,
            time=time,
            gamma=gamma,
            laplacian=laplacian,
            markedElements=markedElements)
        self._initState = State(
            self._n,
            nodeList=initStateList,
            customStateList=customStateList)
        self._quantumWalk = QuantumWalk(self._initState, self._operator)
        self._probDist = ProbabilityDistribution(
            self._quantumWalk.getFinalState())
        self._probDistList = []
        # Also assigned by resetWalk; created here so it always exists.
        self._walkList = []

    @staticmethod
    def _applyRounding(value, resultRounding):
        """Return ``value`` untouched, or rounded when a precision is given."""
        if resultRounding is None:
            return value
        return round(value, resultRounding)

    def _runSingleWalk(
            self,
            buildOperator,
            time,
            initStateList,
            customStateList) -> None:
        """Build the state, build the operator with ``buildOperator`` and
        refresh the walk and its probability distribution."""
        # Exceptions raised by buildState propagate to the caller unchanged.
        self._initState.buildState(
            nodeList=initStateList,
            customStateList=customStateList)
        buildOperator(time=time)
        self._quantumWalk.buildWalk(self._initState, self._operator)
        self._probDist.buildProbDist(self._quantumWalk.getFinalState())

    def _runWalkSeries(
            self,
            runSingle,
            timeList,
            initStateList,
            customStateList) -> None:
        """Call ``runSingle`` once per stored time, collecting a deep copy
        of the probability distribution after each call."""
        self._probDistList = []
        if timeList is not None:
            self._timeList = timeList
        elif self._timeList is None:
            raise UndefinedTimeList(f"TimeList is {self._timeList}.")
        for time in self._timeList:
            runSingle(
                time=time,
                initStateList=initStateList,
                customStateList=customStateList)
            # self._probDist is reused on every iteration, so a snapshot is
            # required to keep the per-time results apart.
            self._probDistList.append(copy.deepcopy(self.getProbDist()))

    def runWalk(
            self,
            time: float = 0,
            initStateList: list = None,
            customStateList: list = None) -> None:
        """Builds the initial state, builds the operator through
        ``Operator.buildDiagonalOperator`` and recomputes the walk and its
        probability distribution. Results can be accessed with the
        respective get methods.

        Parameters
        ----------
        time : float, optional
            Time for which to calculate the quantum walk, by default 0.
        initStateList : list[int], optional
            List with chosen initial states for uniform superposition, by
            default None.
        customStateList : list[(int,complex)], optional
            Custom init state, by default None.

        Raises
        ------
        StateOutOfBounds
            State out of bounds exception, propagated from the state build.
        NonUnitaryState
            State not unitary exception, propagated from the state build.
        """
        self._runSingleWalk(
            self._operator.buildDiagonalOperator,
            time,
            initStateList,
            customStateList)

    def runExpmWalk(
            self,
            time: float = 0,
            initStateList: list = None,
            customStateList: list = None) -> None:
        """Same as runWalk, but the operator is built through
        ``Operator.buildExpmOperator``.

        Parameters
        ----------
        time : float, optional
            Time for which to calculate the quantum walk, by default 0.
        initStateList : list[int], optional
            List with chosen initial states for uniform superposition, by
            default None.
        customStateList : list[(int,complex)], optional
            Custom init state, by default None.

        Raises
        ------
        StateOutOfBounds
            State out of bounds exception, propagated from the state build.
        NonUnitaryState
            State not unitary exception, propagated from the state build.
        """
        self._runSingleWalk(
            self._operator.buildExpmOperator,
            time,
            initStateList,
            customStateList)

    def runMultipleWalks(
            self,
            timeList: list = None,
            initStateList: list = None,
            customStateList: list = None) -> None:
        """Runs runWalk once per time and stores a copy of each resulting
        probability distribution in a list.

        Parameters
        ----------
        timeList : list, optional
            List of times for which to calculate the quantum walk, by
            default None, in which case the stored time list is used. When
            given, it replaces the stored time list.
        initStateList : list, optional
            List with chosen initial states for uniform superposition, by
            default None.
        customStateList : list, optional
            Custom init state, by default None.

        Raises
        ------
        UndefinedTimeList
            Raised when no timeList is given and the stored one is None.
        """
        self._runWalkSeries(
            self.runWalk, timeList, initStateList, customStateList)

    def runMultipleExpmWalks(
            self,
            timeList: list = None,
            initStateList: list = None,
            customStateList: list = None) -> None:
        """Runs runExpmWalk once per time and stores a copy of each
        resulting probability distribution in a list.

        Parameters
        ----------
        timeList : list, optional
            List of times for which to calculate the quantum walk, by
            default None, in which case the stored time list is used. When
            given, it replaces the stored time list.
        initStateList : list, optional
            List with chosen initial states for uniform superposition, by
            default None.
        customStateList : list, optional
            Custom init state, by default None.

        Raises
        ------
        UndefinedTimeList
            Raised when no timeList is given and the stored one is None.
        """
        self._runWalkSeries(
            self.runExpmWalk, timeList, initStateList, customStateList)

    def resetWalk(self) -> None:
        """Resets the components of a walk and empties the result lists."""
        self._initState.resetState()
        self._operator.resetOperator()
        self._quantumWalk.resetWalk()
        self._probDist.resetProbDist()
        self._probDistList = []
        self._walkList = []

    def setDim(
            self,
            newDim: int,
            graphStr: str = None,
            graph: nx.Graph = None,
            initStateList: list = None) -> None:
        """Replaces the graph and resizes every component to match it.

        The new graph comes either from evaluating ``graphStr(newDim)`` or
        from ``graph``; ``graphStr`` wins when both are given. The dimension
        propagated to the components is the length of the resulting graph.

        Parameters
        ----------
        newDim : int
            New dimension for the quantum walk; used as the argument of the
            ``graphStr`` generator.
        graphStr : str
            Graph string to generate the graph with the new dimension, e.g.
            the name of a NetworkX graph generator.
        graph : nx.Graph, optional
            Graph with the new dimension.
        initStateList : list[int], optional
            Init state list with new dimension.

        Raises
        ------
        MissingGraphInput
            Raised when neither graphStr nor graph is provided. The instance
            is left unmodified in that case.
        """
        # Validate before touching any attribute so a failed call cannot
        # leave the stored dimension out of sync with the graph.
        if graphStr is None and graph is None:
            raise MissingGraphInput(
                f"You tried to set QWAK dim without providing a graph with updated dimensions: {self._graph}")
        if graphStr is not None:
            # SECURITY: eval executes arbitrary code. graphStr must never
            # come from untrusted input.
            self._graph = eval(f"{graphStr}({newDim})")
        else:
            self._graph = graph
        # The graph is the source of truth: a generator may yield a node
        # count different from newDim, and a supplied graph may too.
        self._n = len(self._graph)
        self._initState.setDim(self._n, newNodeList=initStateList)
        self._operator.setDim(self._n, self._graph)
        self._quantumWalk.setDim(self._n)
        self._probDist.setDim(self._n)

    def getDim(self) -> int:
        """Gets the current graph dimension.

        Returns
        -------
        int
            Dimension of graph.
        """
        return self._n

    def setGraph(self, newGraph: nx.Graph, initStateList=None) -> None:
        """Stores a new graph and updates the stored dimension to its
        length. The state, operator, walk and probability distribution are
        NOT touched here; call setDim afterwards to resize them.

        Parameters
        ----------
        newGraph : nx.Graph
            New NetworkX graph.
        initStateList : list, optional
            Currently unused; kept for interface compatibility.
        """
        self._graph = newGraph
        self._n = len(self._graph)

    def getGraph(self) -> nx.Graph:
        """Gets the current graph.

        Returns
        -------
        nx.Graph
            Current graph.
        """
        return self._graph

    def setCustomGraph(self, customAdjMatrix: np.ndarray) -> None:
        """Builds a graph from an adjacency matrix, makes it the current
        graph and resizes all components, using the middle node
        (``n // 2``) as the new initial state list.

        Parameters
        ----------
        customAdjMatrix : np.ndarray
            Adjacency matrix of the new graph.
        """
        # from_numpy_matrix was removed in NetworkX 3.0; from_numpy_array
        # is the supported way to build a graph from a matrix.
        self._graph = nx.from_numpy_array(customAdjMatrix)
        self.setGraph(newGraph=self._graph)
        self._initStateList = [self._n // 2]
        self.setDim(
            self._n,
            graph=self._graph,
            initStateList=self._initStateList)

    def setInitState(self, newInitState: State) -> None:
        """Sets the current initial state to a user defined one and caches
        its node list.

        Parameters
        ----------
        newInitState : State
            New initial state.
        """
        self._initState.setState(newInitState)
        self._initStateList = self._initState.getNodeList()

    def getInitState(self) -> State:
        """Gets the initial state.

        Returns
        -------
        State
            Initial State.
        """
        return self._initState

    def setTime(self, newTime: float) -> None:
        """Sets the operator time to a user defined one.

        Parameters
        ----------
        newTime : float
            New time.
        """
        self._operator.setTime(newTime)

    def setTimeList(self, newTimeList: list) -> None:
        """Replaces the stored time list with evenly spaced samples.

        Only the first two entries are read: ``newTimeList[0]`` is the start
        and ``newTimeList[1]`` the stop of ``np.linspace``, and the number
        of samples is ``int(newTimeList[1])``.

        Parameters
        ----------
        newTimeList : list
            Two-element ``[start, stop]`` sequence.
        """
        start, stop = newTimeList[0], newTimeList[1]
        self._timeList = np.linspace(start, stop, int(stop)).tolist()

    def getTime(self) -> float:
        """Gets the current operator time.

        Returns
        -------
        float
            Current value of time.
        """
        return self._operator.getTime()

    def getTimeList(self) -> list:
        """Gets the stored list of times used for multiple walks.

        Returns
        -------
        list
            Current time list.
        """
        return self._timeList

    def setAdjacencyMatrix(
            self,
            newAdjMatrix: np.ndarray,
            initStateList: list = None) -> None:
        """Sets the operator adjacency matrix to a user defined one and
        rebuilds the state, walk and probability distribution for the new
        dimension. The stored graph is not updated.

        Parameters
        ----------
        newAdjMatrix : np.ndarray
            New adjacency matrix.
        initStateList : list, optional
            New initial state list, by default None.
        """
        # Install the matrix first so the dimension read back below is the
        # new one rather than the one being replaced.
        self._operator.setAdjacencyMatrix(newAdjMatrix)
        self._n = len(self._operator.getAdjacencyMatrix())
        self._initState = State(self._n, initStateList)
        self._quantumWalk = QuantumWalk(self._initState, self._operator)
        self._probDist = ProbabilityDistribution(
            self._quantumWalk.getFinalState())

    def getAdjacencyMatrix(self) -> np.ndarray:
        """Gets the current adjacency matrix.

        Returns
        -------
        np.ndarray
            Current adjacency matrix.
        """
        return self._operator.getAdjacencyMatrix()

    def setHamiltonian(self, newHamiltonian: np.ndarray) -> None:
        """Sets the current Hamiltonian to a user defined one.

        Parameters
        ----------
        newHamiltonian : np.ndarray
            New Hamiltonian.
        """
        self._operator.setHamiltonian(newHamiltonian)

    def getHamiltonian(self) -> np.ndarray:
        """Gets the current Hamiltonian.

        Returns
        -------
        np.ndarray
            Current Hamiltonian.
        """
        return self._operator.getHamiltonian()

    def setOperator(self, newOperator: Operator) -> None:
        """Sets the current walk operator to a user defined one.

        Parameters
        ----------
        newOperator : Operator
            New operator object.
        """
        self._operator.setOperator(newOperator)

    def getOperator(self) -> Operator:
        """Gets the current walk operator.

        Returns
        -------
        Operator
            Current operator object.
        """
        return self._operator

    def setWalk(self, newWalk: State) -> None:
        """Forwards ``newWalk`` to ``QuantumWalk.setWalk``. This might not
        be needed and removed in the future.

        Parameters
        ----------
        newWalk : State
            New walk amplitudes.
            NOTE(review): setQWAK and from_json pass a QuantumWalk object
            here, not a State; confirm the intended type.
        """
        self._quantumWalk.setWalk(newWalk)

    def getWalk(self) -> QuantumWalk:
        """Gets the current QuantumWalk object.

        Returns
        -------
        QuantumWalk
            Current QuantumWalk object.
        """
        return self._quantumWalk

    def getFinalState(self) -> State:
        """Gets the final state of the current QuantumWalk.

        Returns
        -------
        State
            State of the QuantumWalk.
        """
        return self._quantumWalk.getFinalState()

    def getAmpVec(self) -> np.ndarray:
        """Gets the array of the QuantumWalk state.

        Returns
        -------
        np.ndarray
            Array of the QuantumWalk state.
        """
        return self._quantumWalk.getAmpVec()

    def setProbDist(self, newProbDist: ProbabilityDistribution) -> None:
        """Sets current walk probability distribution to a user defined
        one. This might not be needed and removed in the future.

        Parameters
        ----------
        newProbDist : ProbabilityDistribution
            New probability distribution.
        """
        self._probDist.setProbDist(newProbDist)

    def getProbDist(self) -> ProbabilityDistribution:
        """Gets the current probability distribution.

        Returns
        -------
        ProbabilityDistribution
            ProbabilityDistribution object.
        """
        return self._probDist

    def getProbDistList(self) -> list:
        """Returns the list of probability distributions collected by the
        multiple-walk methods.

        Returns
        -------
        list
            List of ProbabilityDistribution objects.
        """
        return self._probDistList

    def setProbDistList(self, newProbDistList: list) -> None:
        """Sets the current probability distribution list to a user defined
        one.

        Parameters
        ----------
        newProbDistList : list
            New probability distribution list.
        """
        self._probDistList = newProbDistList

    def getProbVec(self) -> np.ndarray:
        """Gets the current probability distribution vector.

        Returns
        -------
        np.ndarray
            Probability Distribution vector.
        """
        return self._probDist.getProbVec()

    def getProbVecList(self) -> list:
        """Returns a list of probability distribution vectors in the case
        of multiple walks.

        Returns
        -------
        list
            List of probability distribution vectors.
        """
        return [probDist.getProbVec() for probDist in self._probDistList]

    def searchNodeAmplitude(self, searchNode: int) -> complex:
        """Gets the amplitude associated with a given node.

        Parameters
        ----------
        searchNode : int
            User inputted node for the search.

        Returns
        -------
        complex
            Amplitude associated with the search node.
        """
        return self._quantumWalk.searchNodeAmplitude(searchNode)

    def searchNodeProbability(self, searchNode: int) -> float:
        """Gets the probability associated with a given node.

        Parameters
        ----------
        searchNode : int
            User inputted node for the search.

        Returns
        -------
        float
            Probability associated with the search node.
        """
        return self._probDist.searchNodeProbability(searchNode)

    def getMean(self, resultRounding: int = None) -> float:
        """Gets the mean (first moment) of the probability distribution.

        Parameters
        ----------
        resultRounding : int, optional
            Rounding of the result, by default None.

        Returns
        -------
        float
            Mean of the probability distribution.
        """
        return self._applyRounding(self._probDist.moment(1), resultRounding)

    def getMeanList(self, resultRounding: int = None) -> list:
        """Gets the mean of every distribution in the probability
        distribution list.

        Parameters
        ----------
        resultRounding : int, optional
            Rounding of the results, by default None.

        Returns
        -------
        list
            List of means of the probability distributions.
        """
        return [
            self._applyRounding(probDist.moment(1), resultRounding)
            for probDist in self._probDistList]

    def getSndMoment(self, resultRounding: int = None) -> float:
        """Gets the second moment of the probability distribution.

        Parameters
        ----------
        resultRounding : int, optional
            Rounding of the result, by default None.

        Returns
        -------
        float
            Second moment of the probability distribution.
        """
        return self._applyRounding(self._probDist.moment(2), resultRounding)

    def getStDev(self, resultRounding: int = None) -> float:
        """Gets the standard deviation of the probability distribution.

        Parameters
        ----------
        resultRounding : int, optional
            Rounding of the result, by default None.

        Returns
        -------
        float
            Standard deviation of the probability distribution.
        """
        return self._applyRounding(self._probDist.stDev(), resultRounding)

    def getStDevList(self, resultRounding: int = None) -> list:
        """Gets the standard deviation of every distribution in the
        probability distribution list.

        Parameters
        ----------
        resultRounding : int, optional
            Rounding of the results, by default None.

        Returns
        -------
        list
            List of standard deviations of the probability distributions.
        """
        return [
            self._applyRounding(probDist.stDev(), resultRounding)
            for probDist in self._probDistList]

    def getInversePartRatio(self, resultRounding: int = None) -> float:
        """Gets the inverse participation ratio of the probability
        distribution.

        Parameters
        ----------
        resultRounding : int, optional
            Rounding of the result, by default None.

        Returns
        -------
        float
            Inverse participation ratio of the probability distribution.
        """
        return self._applyRounding(
            self._probDist.invPartRatio(), resultRounding)

    def getInversePartRatioList(
            self, resultRounding: int = None) -> list:
        """Gets the inverse participation ratio of every distribution in
        the probability distribution list.

        Parameters
        ----------
        resultRounding : int, optional
            Rounding of the results, by default None.

        Returns
        -------
        list
            List of inverse participation ratios of the probability
            distributions.
        """
        return [
            self._applyRounding(probDist.invPartRatio(), resultRounding)
            for probDist in self._probDistList]

    def getSurvivalProb(
            self,
            fromNode,
            toNode,
            resultRounding: int = None) -> float:
        """Gets the survival probability of the probability distribution
        between two nodes.

        Parameters
        ----------
        fromNode
            Starting node, forwarded to ``survivalProb``.
        toNode
            Ending node, forwarded to ``survivalProb``.
        resultRounding : int, optional
            Rounding of the result, by default None.

        Returns
        -------
        float
            Survival probability of the probability distribution.

        Raises
        ------
        MissingNodeInput
            Missing input node error, propagated unchanged.
        """
        return self._applyRounding(
            self._probDist.survivalProb(fromNode, toNode), resultRounding)

    def getSurvivalProbList(
            self,
            fromNode,
            toNode,
            resultRounding: int = None) -> list:
        """Gets the survival probability of every distribution in the
        probability distribution list.

        Parameters
        ----------
        fromNode
            Starting node, forwarded to ``survivalProb``.
        toNode
            Ending node, forwarded to ``survivalProb``.
        resultRounding : int, optional
            Rounding of the results, by default None.

        Returns
        -------
        list
            List of survival probabilities of the probability
            distributions.

        Raises
        ------
        MissingNodeInput
            Missing input node error, propagated unchanged.
        """
        return [
            self._applyRounding(
                probDist.survivalProb(fromNode, toNode), resultRounding)
            for probDist in self._probDistList]

    def checkPST(self, fromNode, toNode) -> Union[str, bool]:
        """Checks if a structure allows for PST between certain nodes by
        delegating to ``Operator.checkPST``.

        Parameters
        ----------
        fromNode
            Starting node.
        toNode
            Ending node.

        Returns
        -------
        Union[str, bool]
            Whatever ``Operator.checkPST`` returns for the two nodes.

        Raises
        ------
        MissingNodeInput
            Missing input node error, propagated unchanged.
        """
        return self._operator.checkPST(fromNode, toNode)

    def checkPST_sympy(self, fromNode, toNode) -> Union[str, bool]:
        """Checks if a structure allows for PST between certain nodes by
        delegating to ``Operator.checkPST_sympy``.

        Parameters
        ----------
        fromNode
            Starting node.
        toNode
            Ending node.

        Returns
        -------
        Union[str, bool]
            Whatever ``Operator.checkPST_sympy`` returns for the two nodes.

        Raises
        ------
        MissingNodeInput
            Missing input node error, propagated unchanged.
        """
        return self._operator.checkPST_sympy(fromNode, toNode)

    def getTransportEfficiency(self) -> float:
        """Gets the transport efficiency of the quantum walk.

        Returns
        -------
        float
            Transport efficiency of the quantum walk.
        """
        return self._quantumWalk.transportEfficiency()

    def getMarkedElements(self) -> list:
        """Gets the marked elements of the quantum walk.

        Returns
        -------
        list
            Marked elements of the quantum walk.
        """
        return self._operator.getMarkedElements()

    def setMarkedElements(self, markedElements: list) -> None:
        """Sets the marked elements of the quantum walk.

        Parameters
        ----------
        markedElements : list
            Marked elements of the quantum walk.
        """
        self._operator.setMarkedElements(markedElements)

    def getQWAK(self) -> QWAK:
        """Gets the QWAK instance.

        Returns
        -------
        QWAK
            This QWAK instance.
        """
        # No ``_qwak`` attribute is ever assigned in this class, so reading
        # it could only raise AttributeError; the instance itself is the
        # object the docstring promises.
        return self

    def setQWAK(self, newQWAK: QWAK) -> None:
        """Copies the graph, dimension, initial state, operator, walk and
        probability distribution of another QWAK instance into this one.

        Parameters
        ----------
        newQWAK : QWAK
            QWAK instance to copy the attributes from.
        """
        self.setGraph(newQWAK.getGraph())
        self.setDim(newQWAK.getDim(), graph=self._graph)
        self.setInitState(newQWAK.getInitState())
        self.setOperator(newQWAK.getOperator())
        self.setWalk(newQWAK.getWalk())
        self.setProbDist(newQWAK.getProbDist())

    def getQWAKId(self) -> str:
        """Gets the QWAK instance's ID.

        Returns
        -------
        str
            QWAK instance's ID.
        """
        return self._qwakId

    def to_json(self, isDynamic=False) -> str:
        """Returns a JSON representation of the QWAK instance.

        Parameters
        ----------
        isDynamic : bool, optional
            If True, the JSON contains the ``timeList`` and
            ``probDistList`` entries instead of the single ``probDist``
            entry, by default False.

        Returns
        -------
        str
            JSON representation of the QWAK instance.
        """
        if isDynamic:
            # NOTE(review): unlike the other components, the probDistList
            # entries are left as JSON strings (no json.loads), so they are
            # encoded twice. Kept as-is to preserve the emitted format.
            payload = {
                'dim': self._n,
                'graph': nx.node_link_data(self._graph),
                'timeList': self._timeList,
                'initState': json.loads(self._initState.to_json()),
                'operator': json.loads(self._operator.to_json()),
                'quantumWalk': json.loads(self._quantumWalk.to_json()),
                'probDistList': [
                    probDist.to_json() for probDist in self._probDistList],
                'qwakId': self._qwakId,
            }
        else:
            payload = {
                'dim': self._n,
                'graph': nx.node_link_data(self._graph),
                'initState': json.loads(self._initState.to_json()),
                'operator': json.loads(self._operator.to_json()),
                'quantumWalk': json.loads(self._quantumWalk.to_json()),
                'probDist': json.loads(self._probDist.to_json()),
                'qwakId': self._qwakId,
            }
        return json.dumps(payload)

    @classmethod
    def from_json(
            cls,
            json_var: Union[str, dict],
            isDynamic=False) -> QWAK:
        """Returns a QWAK instance from a JSON representation.

        Parameters
        ----------
        json_var : Union[str, dict]
            JSON representation of the QWAK instance, either as a JSON
            string or as an already decoded dictionary.
        isDynamic : bool, optional
            If True, the ``timeList`` and ``probDistList`` entries are read
            instead of ``probDist``, by default False.

        Returns
        -------
        QWAK
            QWAK instance from a JSON representation.

        Raises
        ------
        TypeError
            Raised when json_var is neither a str nor a dict.
        """
        if isinstance(json_var, str):
            data = json.loads(json_var)
        elif isinstance(json_var, dict):
            data = json_var
        else:
            # Fail with a clear message instead of hitting an unbound
            # local a few lines further down.
            raise TypeError(
                "json_var must be a str or a dict, "
                f"got {type(json_var).__name__}.")

        qwakId = data['qwakId']
        graph = nx.node_link_graph(data['graph'])
        initState = State.from_json(data['initState'])
        operator = Operator.from_json(data['operator'])
        quantumWalk = QuantumWalk.from_json(data['quantumWalk'])

        if isDynamic:
            timeList = data['timeList']
            newQwak = cls(graph=graph, timeList=timeList, qwakId=qwakId)
            probDistList = [
                ProbabilityDistribution.from_json(probDist)
                for probDist in data['probDistList']]
            newQwak.setProbDistList(probDistList)
        else:
            probDist = ProbabilityDistribution.from_json(data['probDist'])
            newQwak = cls(graph=graph, qwakId=qwakId)
            newQwak.setProbDist(probDist)

        newQwak.setInitState(initState)
        newQwak.setOperator(operator)
        newQwak.setWalk(quantumWalk)
        return newQwak