Skip to content
Snippets Groups Projects
Commit eab3033d authored by Hendrik Buschmeier's avatar Hendrik Buschmeier
Browse files

Refactoring: Merged factor elimination, particlefiltering in one file, respectively.

parent b65cd2f6
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
import numpy
import operator
class UtilityTable(object):
'''
self.variables -- list of the parent nodes
self.table -- utility table which contains the utility
'''
def __init__(self):
super(UtilityTable, self).__init__()
self.table = numpy.array(0)
self.variables = []
def add_variable(self, variable):
self.variables.append(variable)
ax = self.table.ndim
self.table=numpy.expand_dims(self.table,ax)
self.table=numpy.repeat(self.table,len(variable.value_range),axis = ax)
def get_ut_index(self, node_value_pairs):
nodes, values = zip(*node_value_pairs)
index = []
for node in self.variables:
index_in_values_list = nodes.index(node)
value = values[index_in_values_list]
index.append(node.value_range.index(value))
return tuple(index)
def set_utility_table(self, table, nodes):
if not set(nodes) == set(self.variables):
raise Exception("The list which should define the ordering of the variables does not match"
" the variables that this cpt depends on (plus the node itself)")
if not self.table.ndim == table.ndim:
raise Exception("The provided probability table does not have the right number of dimensions")
for d,node in enumerate(nodes):
if len(node.value_range) != table.shape[d]:
raise Exception("The size of the provided probability table does not match the number of possible values of the node "+node.name+" in dimension "+str(d))
self.table = table
self.variables = nodes
def set_utility(self, value, node_value_pairs):
index = self.get_ut_index(node_value_pairs)
self.table[index]=value
def get_utility_table(self):
return self.table
def get_variables(self):
return self.variables
def get_utility(self, node_value_pairs):
index = self.get_ut_index(node_value_pairs)
return self.table[index]
def __str__(self):
return str(self.table)
\ No newline at end of file
from UtilityNode import UtilityNode
from DecisionNode import DecisionNode
from UtilityTable import UtilityTable
\ No newline at end of file
from MakeDecision import MakeDecision
import networkx as nx
import copy
from primo.reasoning.factorelemination import Factor
from primo.reasoning.factorelemination import FactorTree
from random import choice
from operator import itemgetter
import primo.densities
class Factor(object):
def __init__(self,node):
self.node = node
self.calCPD = node.get_cpd().copy()
self.cluster = set()
self.isEvidence = False
def __str__(self):
return self.node.name
def set_evidence(self,evd):
self.calCPD = self.node.get_cpd().copy()
self.calCPD = self.calCPD.set_evidence(evd)
self.isEvidene = True
def clear_evidence(self):
self.calCPD = self.node.get_cpd().copy()
self.isEvidence = False
def set_cluster(self,cluster):
self.cluster = cluster
def get_variables(self):
return self.node.get_cpd().variables
def get_calculation_CDP(self):
return self.calCPD;
def get_node(self):
return self.node
def contains_node(self,node):
return self.node == node
class FactorTree(object):
'''The factor tree contains for each node of the BayesNet a factor. It
is a directed graph with one root node. To speed up the reasoning it uses
a message based approach which stores calculated intermediate results at
edges. Thus, the first query is expensive and all following are easy calculated.
The speed of the first message calculation depends on how the tree was build.
Literature: Modeling and Reasoning with Bayesian Networks - Adnan Darwiche
Chapter 7
'''
def __init__(self,graph,rootNode):
self.graph = graph
self.rootNode = rootNode
def calculate_PoE(self):
'''Calculates the probability of evidence with the set evidence'''
if not self.graph.graph['messagesValid']:
self.calculate_messages()
cpd = self.calculate_marginal_forOne(self.rootNode)
for v in cpd.get_variables()[:]:
cpd = cpd.marginalization(v)
return cpd
def calculate_marginal(self,variables):
''' If evidence is set, then this methods calculates the posterior marginal.
With an empty evidence this is automatically the prior marginal.'''
if not self.graph.graph['messagesValid']:
self.calculate_messages()
resPT = primo.densities.ProbabilityTable.get_neutral_multiplication_PT()
for f in self.graph.nodes():
if f.get_node() in variables:
resPT = resPT.multiplication(self.calculate_marginal_forOne(f))
resPT = resPT.normalize_as_jpt()
return resPT
def calculate_marginal_forOne(self,factor):
curCPD = factor.get_calculation_CDP().copy()
for p in self.graph.predecessors(factor):
tmpCPD = self.graph[p][factor]['msgRightWay']
curCPD = curCPD.multiplication(tmpCPD)
for p in self.graph.neighbors(factor):
tmpCPD = self.graph[factor][p]['msgAgainstWay']
curCPD = curCPD.multiplication(tmpCPD)
for v in curCPD.get_variables()[:]:
if v != factor.get_node():
curCPD = curCPD.marginalization(v)
return curCPD
def draw(self):
'''Draws the FactorTree'''
import matplotlib.pyplot as plt
nx.draw_circular(self.graph)
plt.show()
def calculate_messages(self):
''' Calculates the messages and stores the intermediate results.'''
self.pull_phase(self.rootNode,self.graph)
self.push_phase(self.rootNode,self.graph,primo.densities.ProbabilityTable.get_neutral_multiplication_PT())
self.graph.graph['messagesValid'] = True
def set_evidences(self,evidences):
self.graph.graph['messagesValid'] = False
evNodes = zip(*evidences)
for factor in self.graph.nodes():
if factor.get_node() in evNodes[0]:
idx = evNodes[0].index(factor.get_node())
factor.set_evidence(evidences[idx])
def pull_phase(self,factor,graph):
calCPD = factor.get_calculation_CDP()
#calculate the messages of the children
for child in graph.neighbors(factor):
tmpInput = self.pull_phase(child,graph)
#project each factor on the specific separator
separator = graph[factor][child]['separator']
for var in tmpInput.variables[:]:
if var not in separator:
tmpInput = tmpInput.marginalization(var)
#save message on edge: it's the opposite of the direction of the edge
graph[factor][child]['msgAgainstWay'] = tmpInput
#calculate the new message
calCPD = calCPD.multiplication(tmpInput)
return calCPD
def push_phase(self,factor,graph,inCPD):
for child in graph.neighbors(factor):
tmpCPD = inCPD.multiplication(factor.get_calculation_CDP())
for child2 in graph.neighbors(factor):
if (child != child2):
tmpCPD = tmpCPD.multiplication(graph[factor][child2]['msgAgainstWay'])
separator = graph[factor][child]['separator']
#project on outgoing edge separator
for var in tmpCPD.variables:
if var not in separator:
tmpCPD = tmpCPD.marginalization(var)
#add setOut to outgoing vars from child
#Message with the direction of the edge
graph[factor][child]['msgRightWay'] = tmpCPD
self.push_phase(child,graph,tmpCPD)
class FactorTreeFactory(object):
'''The FactorTreeFactory creates the FactorTree out of a BayesNet.'''
......@@ -155,12 +319,103 @@ class FactorTreeFactory(object):
factor.set_cluster(localCluster)
class EasiestFactorElimination(object):
'''This is the easiest way for factor elimination.It's has the worst runtime because:
1. Needed evidences are set (optional).
2. All nodes are multiplied.
3. The redundant variables are summed out
Literature: Modeling and Reasoning with Bayesian Networks - Adnan Darwiche
Chapter 6-7
'''
def __init__(self,bayesNet):
self.bn= bayesNet
def calculate_PriorMarginal(self,variables):
'''Calculates the prior marignal for the given variables. The resulting
CPD is returned.'''
nodes = self.bn.get_all_nodes()
finCpd = nodes.pop().get_cpd()
for n in nodes:
finCpd = finCpd.multiplication(n.get_cpd())
for v in finCpd.get_variables():
if v not in variables:
finCpd = finCpd.marginalization(v)
return finCpd
def calculate_PosteriorMarginal(self,variables,evidence):
'''Calculates the posterior marginal for given variables and evidence.
It returns the resulting cpd.'''
nodes = self.bn.get_all_nodes()
#List of evidences
ev_list = zip(*evidence)
# Special Case: First Node
node1 = nodes.pop()
if node1 in ev_list[0]:
ind = ev_list[0].index(node1)
finCpd = node1.get_cpd().set_evidence(evidence[ind])
else:
finCpd = node1.get_cpd()
# For all other nodes
for n in nodes:
if n in ev_list[0]:
#Set evidence and multiply
ind = ev_list[0].index(n)
nCPD = n.get_cpd().set_evidence(evidence[ind])
finCpd = finCpd.multiplication(nCPD)
else:
#only multiply
finCpd = finCpd.multiplication(n.get_cpd())
for v in finCpd.get_variables():
if v not in variables:
finCpd = finCpd.marginalization(v)
finCpd = finCpd.normalize_as_jpt()
return finCpd
def calculate_PoE(self,evidence):
''' Calculates the probabilty of evidence for the given evidence and returns the result.'''
nodes = self.bn.get_all_nodes()
unzipped_list = zip(*evidence)
node1 = nodes.pop()
if node1 in unzipped_list[0]:
ind = unzipped_list[0].index(node1)
finCpd = node1.get_cpd().set_evidence(evidence[ind])
else:
finCpd = node1.get_cpd()
for n in nodes:
if n in unzipped_list[0]:
ind = unzipped_list[0].index(n)
nCPD = n.get_cpd().set_evidence(evidence[ind])
finCpd = finCpd.multiplication(nCPD)
else:
finCpd = finCpd.multiplication(n.get_cpd())
for v in finCpd.get_variables():
finCpd = finCpd.marginalization(v)
return finCpd
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from primo.core import BayesNet
from primo.reasoning import DiscreteNode
import numpy
class EasiestFactorElimination(object):
'''This is the easiest way for factor elimination.It's has the worst runtime because:
1. Needed evidences are set (optional).
2. All nodes are multiplied.
3. The redundant variables are summed out
Literature: Modeling and Reasoning with Bayesian Networks - Adnan Darwiche
Chapter 6-7
'''
def __init__(self,bayesNet):
self.bn= bayesNet
def calculate_PriorMarginal(self,variables):
'''Calculates the prior marignal for the given variables. The resulting
CPD is returned.'''
nodes = self.bn.get_all_nodes()
finCpd = nodes.pop().get_cpd()
for n in nodes:
finCpd = finCpd.multiplication(n.get_cpd())
for v in finCpd.get_variables():
if v not in variables:
finCpd = finCpd.marginalization(v)
return finCpd
def calculate_PosteriorMarginal(self,variables,evidence):
'''Calculates the posterior marginal for given variables and evidence.
It returns the resulting cpd.'''
nodes = self.bn.get_all_nodes()
#List of evidences
ev_list = zip(*evidence)
# Special Case: First Node
node1 = nodes.pop()
if node1 in ev_list[0]:
ind = ev_list[0].index(node1)
finCpd = node1.get_cpd().set_evidence(evidence[ind])
else:
finCpd = node1.get_cpd()
# For all other nodes
for n in nodes:
if n in ev_list[0]:
#Set evidence and multiply
ind = ev_list[0].index(n)
nCPD = n.get_cpd().set_evidence(evidence[ind])
finCpd = finCpd.multiplication(nCPD)
else:
#only multiply
finCpd = finCpd.multiplication(n.get_cpd())
for v in finCpd.get_variables():
if v not in variables:
finCpd = finCpd.marginalization(v)
finCpd = finCpd.normalize_as_jpt()
return finCpd
def calculate_PoE(self,evidence):
''' Calculates the probabilty of evidence for the given evidence and returns the result.'''
nodes = self.bn.get_all_nodes()
unzipped_list = zip(*evidence)
node1 = nodes.pop()
if node1 in unzipped_list[0]:
ind = unzipped_list[0].index(node1)
finCpd = node1.get_cpd().set_evidence(evidence[ind])
else:
finCpd = node1.get_cpd()
for n in nodes:
if n in unzipped_list[0]:
ind = unzipped_list[0].index(n)
nCPD = n.get_cpd().set_evidence(evidence[ind])
finCpd = finCpd.multiplication(nCPD)
else:
finCpd = finCpd.multiplication(n.get_cpd())
for v in finCpd.get_variables():
finCpd = finCpd.marginalization(v)
return finCpd
class Factor(object):
def __init__(self,node):
self.node = node
self.calCPD = node.get_cpd().copy()
self.cluster = set()
self.isEvidence = False
def __str__(self):
return self.node.name
def set_evidence(self,evd):
self.calCPD = self.node.get_cpd().copy()
self.calCPD = self.calCPD.set_evidence(evd)
self.isEvidene = True
def clear_evidence(self):
self.calCPD = self.node.get_cpd().copy()
self.isEvidence = False
def set_cluster(self,cluster):
self.cluster = cluster
def get_variables(self):
return self.node.get_cpd().variables
def get_calculation_CDP(self):
return self.calCPD;
def get_node(self):
return self.node
def contains_node(self,node):
return self.node == node
import networkx as nx
import primo.reasoning.density.ProbabilityTable as ProbabilityTable
class FactorTree(object):
'''The factor tree contains for each node of the BayesNet a factor. It
is a directed graph with one root node. To speed up the reasoning it uses
a message based approach which stores calculated intermediate results at
edges. Thus, the first query is expensive and all following are easy calculated.
The speed of the first message calculation depends on how the tree was build.
Literature: Modeling and Reasoning with Bayesian Networks - Adnan Darwiche
Chapter 7
'''
def __init__(self,graph,rootNode):
self.graph = graph
self.rootNode = rootNode
def calculate_PoE(self):
'''Calculates the probability of evidence with the set evidence'''
if not self.graph.graph['messagesValid']:
self.calculate_messages()
cpd = self.calculate_marginal_forOne(self.rootNode)
for v in cpd.get_variables()[:]:
cpd = cpd.marginalization(v)
return cpd
def calculate_marginal(self,variables):
''' If evidence is set, then this methods calculates the posterior marginal.
With an empty evidence this is automatically the prior marginal.'''
if not self.graph.graph['messagesValid']:
self.calculate_messages()
resPT = ProbabilityTable.get_neutral_multiplication_PT()
for f in self.graph.nodes():
if f.get_node() in variables:
resPT = resPT.multiplication(self.calculate_marginal_forOne(f))
resPT = resPT.normalize_as_jpt()
return resPT
def calculate_marginal_forOne(self,factor):
curCPD = factor.get_calculation_CDP().copy()
for p in self.graph.predecessors(factor):
tmpCPD = self.graph[p][factor]['msgRightWay']
curCPD = curCPD.multiplication(tmpCPD)
for p in self.graph.neighbors(factor):
tmpCPD = self.graph[factor][p]['msgAgainstWay']
curCPD = curCPD.multiplication(tmpCPD)
for v in curCPD.get_variables()[:]:
if v != factor.get_node():
curCPD = curCPD.marginalization(v)
return curCPD
def draw(self):
'''Draws the FactorTree'''
import matplotlib.pyplot as plt
nx.draw_circular(self.graph)
plt.show()
def calculate_messages(self):
''' Calculates the messages and stores the intermediate results.'''
self.pull_phase(self.rootNode,self.graph)
self.push_phase(self.rootNode,self.graph,ProbabilityTable.get_neutral_multiplication_PT())
self.graph.graph['messagesValid'] = True
def set_evidences(self,evidences):
self.graph.graph['messagesValid'] = False
evNodes = zip(*evidences)
for factor in self.graph.nodes():
if factor.get_node() in evNodes[0]:
idx = evNodes[0].index(factor.get_node())
factor.set_evidence(evidences[idx])
def pull_phase(self,factor,graph):
calCPD = factor.get_calculation_CDP()
#calculate the messages of the children
for child in graph.neighbors(factor):
tmpInput = self.pull_phase(child,graph)
#project each factor on the specific separator
separator = graph[factor][child]['separator']
for var in tmpInput.variables[:]:
if var not in separator:
tmpInput = tmpInput.marginalization(var)
#save message on edge: it's the opposite of the direction of the edge
graph[factor][child]['msgAgainstWay'] = tmpInput
#calculate the new message
calCPD = calCPD.multiplication(tmpInput)
return calCPD
def push_phase(self,factor,graph,inCPD):
for child in graph.neighbors(factor):
tmpCPD = inCPD.multiplication(factor.get_calculation_CDP())
for child2 in graph.neighbors(factor):
if (child != child2):
tmpCPD = tmpCPD.multiplication(graph[factor][child2]['msgAgainstWay'])
separator = graph[factor][child]['separator']
#project on outgoing edge separator
for var in tmpCPD.variables:
if var not in separator:
tmpCPD = tmpCPD.marginalization(var)
#add setOut to outgoing vars from child
#Message with the direction of the edge
graph[factor][child]['msgRightWay'] = tmpCPD
self.push_phase(child,graph,tmpCPD)
from EasiestFactorElimination import EasiestFactorElimination
from Factor import Factor
from FactorTree import FactorTree
from FactorTreeFactory import FactorTreeFactory
\ No newline at end of file
# -*- coding: utf-8 -*-
import ParticleFilterDBN
\ No newline at end of file
# -*- coding: utf-8 -*-
from primo.core import BayesNet
from primo.core import DynamicBayesNet
import random
import copy
import random
import time
from primo.networks import BayesNet
from primo.networks import DynamicBayesNet
class Particle(object):
'''
This is the basic particle class used by the DBN particle filter.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment