import networkx as nx
From raw networkx to a basic DAG
= nx.DiGraph()
G '0', 'a1')
G.add_edge('a1': 'A' }, "site")
nx.set_node_attributes(G, { '0', 'b1')
G.add_edge('b1': 'B' }, "site")
nx.set_node_attributes(G, { 'b1', 'b2')
G.add_edge('b2': 'B' }, "site")
nx.set_node_attributes(G, {
= G.nodes
nodes for (k,v) in map(lambda x: (x, 'node'), nodes)} {k:v
{'0': 'node', 'a1': 'node', 'b1': 'node', 'b2': 'node'}
nx.is_directed_acyclic_graph(G)
True
nx.dag_longest_path(G)
['0', 'b1', 'b2']
def flatten(l):
return [item for sublist in l for item in sublist]
def ancetres(node):
= [pred for pred in G.predecessors(node)]
predecessors return predecessors + flatten([ancetres(pred) for pred in predecessors])
'b2') ancetres(
['b1', '0']
def concurrents(node_a, avec=list(G.nodes)):
= [n for n in avec if n not in ancetres(node_a)]
H
H.remove(node_a)return [node_b for node_b in H if node_a not in ancetres(node_b)]
'b2') concurrents(
['a1']
def conflits(node_a, avec=None):
= avec or concurrents(node_a)
avec = nx.dijkstra_path_length(G, source='0', target=node_a) % 2
parité return [node_b for node_b in avec if parité is not nx.dijkstra_path_length(G, source='0', target=node_b) % 2]
'b2') conflits(
['a1']
= nx.get_node_attributes(G, "site")
site 'b1'] site[
'B'
'a1'] > site['b2'] site[
False
from functools import total_ordering
= [
ORDER 'A',
'B',
'C'
]
@total_ordering
class Site:
def __init__(self, label):
self.label = label
def __eq__(self, other):
return ORDER.index(self.label) == ORDER.index(other.label)
def __lt__(self, other):
return ORDER.index(self.label) > ORDER.index(other.label)
='A') > Site(label='B') Site(label
True
'a1': Site(label='A') }, "site")
nx.set_node_attributes(G, { 'b1': Site(label='B') }, "site")
nx.set_node_attributes(G, { 'b2': Site(label='B') }, "site")
nx.set_node_attributes(G, { = nx.get_node_attributes(G, "site")
site
'a1'] > site['b2'] site[
True
def résolution(node_a, node_b):
# on peut imaginer d'autres stratégies…
return site[node_a] > site[node_b]
def estValide(node):
= conflits(node)
confs = all([résolution(node, conf) for conf in confs]) if len(confs) > 0 else True
gagne_conflits = [pred for pred in G.predecessors(node)]
predecessors = all([estValide(pred) for pred in predecessors]) if len(predecessors) > 0 else True
parents_valides return gagne_conflits and parents_valides
for i in G.nodes:
print("""{:2}:
concurrent avec : {}
en conflit avec : {}
validité : {}""".format(i, concurrents(i), conflits(i), estValide(i)))
0 :
concurrent avec : []
en conflit avec : []
validité : True
a1:
concurrent avec : ['b1', 'b2']
en conflit avec : ['b2']
validité : True
b1:
concurrent avec : ['a1']
en conflit avec : []
validité : True
b2:
concurrent avec : ['a1']
en conflit avec : ['a1']
validité : False
from typing import List, Dict
class DAG(nx.DiGraph):
def ancetres(self, node: str) -> List[str]:
= [pred for pred in self.predecessors(node)]
predecessors return predecessors + flatten([self.ancetres(pred) for pred in predecessors])
def concurrents(self, node_a) -> List[str]:
= [n for n in self.nodes if n not in self.ancetres(node_a)]
H # list.remove
H.remove(node_a) return [node_b for node_b in H if node_a not in self.ancetres(node_b)]
def conflits(self, node_a: str) -> List[str]:
= nx.dijkstra_path_length(self, source='0', target=node_a) % 2
parité return [
node_bfor node_b
in self.concurrents(node_a)
if parité is not nx.dijkstra_path_length(self, source='0', target=node_b) % 2
]
def résolution(self, node_a: str, node_b: str):
= nx.get_node_attributes(self, "site")
site # on peut imaginer d'autres stratégies…
return site[node_a] > site[node_b]
def estValide(self, node: str) -> bool:
= self.conflits(node)
_conflits = all([
gagne_conflits self.résolution(node, _conflit)
for _conflit
in _conflits
if len(_conflits) > 0 else True
]) = [pred for pred in self.predecessors(node)]
predecessors = all([
parents_valides self.estValide(pred)
for pred
in predecessors
if len(predecessors) > 0 else True
]) = gagne_conflits and parents_valides
valide self, { node: valide }, "valide")
nx.set_node_attributes(return valide
def dag_longest_valid_path(self) -> str:
= self.copy()
_self = nx.dag_longest_path(_self)
last_path = last_path[-1]
last while not self.estValide(last):
_self.remove_node(last)= nx.dag_longest_path(_self)
last_path = last_path[-1]
last return last_path
def draw(self):
import matplotlib.pyplot as plt
= nx.spring_layout(self)
pos self, pos, node_color='gray', with_labels=True)
nx.draw(
= nx.get_node_attributes(self, "valide")
valide = [node for node in self.nodes if not self.estValide(node)]
invalides for node in self.nodes:
self, pos, nodelist=invalides, node_color='r')
nx.draw_networkx_nodes(
= self.dag_longest_valid_path()
path = list(zip(path,path[1:]))
path_edges self, pos, nodelist=path, node_color='g')
nx.draw_networkx_nodes(self, pos, edgelist=path_edges, edge_color='g', width=5)
nx.draw_networkx_edges(
'equal')
plt.axis(
plt.show()
# Test ancetres
= DAG()
G '0', 'a1')
G.add_edge('a1': Site(label='A') }, "site")
nx.set_node_attributes(G, { '0', 'b1')
G.add_edge('b1': Site(label='B') }, "site")
nx.set_node_attributes(G, { 'b1', 'b2')
G.add_edge('b2': Site(label='B') }, "site")
nx.set_node_attributes(G, { assert G.ancetres('b2') == ['b1', '0']
# Test concurrents
assert G.concurrents('b1') == ['a1']
# Test conflits
assert G.conflits('b2') == ['a1']
# Test estValide
assert G.estValide('b2') == False
assert G.estValide('b1') == True
# Test plus long chemin valide
assert G.dag_longest_valid_path() == ['0', 'a1'] # et non 'b2' le plus long chemin
G.draw()
= [
ORDER 'B',
'A'
]
= DAG()
G '0', 'a1')
G.add_edge('a1': Site(label='A') }, "site")
nx.set_node_attributes(G, { '0', 'b1')
G.add_edge('b1': Site(label='B') }, "site")
nx.set_node_attributes(G, { 'b1', 'b2')
G.add_edge('b2': Site(label='B') }, "site")
nx.set_node_attributes(G, { G.draw()
= [
ORDER 'C',
'B',
'A'
]
= DAG()
G_d '0', 'a1'), ('a1', 'a2'), ('0', 'b1'), ('0', 'c1'), ('c1', 'c2')])
G_d.add_edges_from([('a1': Site(label='A') }, "site")
nx.set_node_attributes(G_d, { 'a2': Site(label='A') }, "site")
nx.set_node_attributes(G_d, { 'b1': Site(label='B') }, "site")
nx.set_node_attributes(G_d, { 'c1': Site(label='C') }, "site")
nx.set_node_attributes(G_d, { 'c2': Site(label='C') }, "site")
nx.set_node_attributes(G_d, { print(G_d.nodes)
'c1')
G_d.estValide( G_d.draw()
['0', 'a1', 'a2', 'b1', 'c1', 'c2']
= DAG()
G_d '0', 'a1'), ('a1', 'a2'), ('0', 'b1'), ('b1', 'b2'), ('0', 'c1')])
G_d.add_edges_from([('a1': Site(label='A') }, "site")
nx.set_node_attributes(G_d, { 'a2': Site(label='A') }, "site")
nx.set_node_attributes(G_d, { 'b1': Site(label='B') }, "site")
nx.set_node_attributes(G_d, { 'b2': Site(label='B') }, "site")
nx.set_node_attributes(G_d, { 'c1': Site(label='C') }, "site")
nx.set_node_attributes(G_d, { 'c2': Site(label='C') }, "site")
nx.set_node_attributes(G_d, { G_d.draw()
= DAG()
G_d '0', 'a1'), ('0', 'b1'), ('b1', 'b2'), ('b2', 'b3')])
G_d.add_edges_from([('a1': Site(label='A') }, "site")
nx.set_node_attributes(G_d, { 'b1': Site(label='B') }, "site")
nx.set_node_attributes(G_d, { 'b2': Site(label='B') }, "site")
nx.set_node_attributes(G_d, { 'b3': Site(label='B') }, "site")
nx.set_node_attributes(G_d, { G_d.draw()
= DAG()
G_d '0', 'a1'), ('a1', 'a2'), ('0', 'b1'), ('b1', 'b2'), ('b2', 'b3')])
G_d.add_edges_from([('a1': Site(label='A') }, "site")
nx.set_node_attributes(G_d, { 'a2': Site(label='A') }, "site")
nx.set_node_attributes(G_d, { 'b1': Site(label='B') }, "site")
nx.set_node_attributes(G_d, { 'b2': Site(label='B') }, "site")
nx.set_node_attributes(G_d, { 'b3': Site(label='B') }, "site")
nx.set_node_attributes(G_d, { G_d.draw()