#!/usr/bin/python # -*- coding: utf-8 -*- ''' Ce script calcule les distances de désenclavement d'une liste de douars Les douars sont placés dans un graphe comprenant les pistes, les routes et les services de base Pour calculer la distance entre un douar et un service de base, on explore le graphe selon une méthode similaire à Dijkstra, jusqu'à ce qu'on trouve un douar ayant l'accès à ce service. Le script prend en entrée deux fichiers : - un fichier csv (séparateur espaces) contenant la liste des arêtes : noeud 1, noeud 2, distance (m), type de route (route, piste, aucun) - un fichier json détaillant l'emplacement des services de base : école, collège et centre de santé En sortie, le script retourne un fichier csv contenant la liste des distances de désenclavement pour chaque douar Les fichiers d'entrée et de sortie peuvent être modifiés à la fin du script dans la partie __main__ Dépendance : networkx ''' import networkx as nx import json, heapq import psycopg2 # Distances maximales : au delà de ces distances, on retournera Inf # (permet de réduire le temps de calcul en limitant l'exploration) # Dans le modèle usuel, l'indice s'annule au delà de ces distances # Pour supprimer la condition, il suffit de fixer les valeurs à Inf PISTEMAX = 4000 ROUTEMAX = 15000 ECOLEMAX = 4000 COLLEGEMAX = 30000 SANTEMAX = 20000 CLIEUMAX = 20000 def create_graph(edges_file, nodes_file) : ''' Crée un graphe à partir de deux fichiers de données :param edges_file: fichier csv contenant la liste des arêtes :param nodes_file: fichier json contenant les services de base :returns: graphe du réseau routier et des services ''' G = nx.Graph() f = open(edges_file) for line in f : l = line.split() n1, n2 = int(l[0]), int(l[1]) dis = float(l[2]) t = l[3] G.add_edge(n1, n2, {'length' : dis, 'type' : t}) f.close() nodes = json.load(open(nodes_file)) for i in nodes : for n in nodes[i] : G.node[n][i] = True return G def has_road(G, n, codes) : ''' Evalue si un douar est relié à la route ou la piste :param G: graphe complet du réseau :param n: id du noeud étudié :param codes : liste des codes à rechercher ['p', 'r'] pour la route, ['p'] pour la piste :returns: True si le douar n est relié à la piste / route False sinon ''' res = False for n2 in G.neighbors(n) : res = res or (G[n][n2]['type'] in codes) return res def has_service(G, n, b) : ''' Evalue si le douar possède une service donnné :param G: graphe complet du réseau :param n: id du noeud étudié :param b : bâtiment recherché (ecole, college, sante) :returns: True si le douar n possède le batiment False sinon ''' try : return G.node[n][b] except KeyError : return False def distance(G, source, test_fun, param, cutoff= None, weight= 'length') : ''' Calcule la plus courte distance à un service donné Algorithme de Dijkstra modifié Basé sur la fonction single_source_dijkstra_path_length de networkx :param G: graphe :param source: noeud étudié :param test_fun: fonction qui évalue la présence du service :param param: troisième argument de la fonction précédente :cutoff: distance au delà de laquelle on arrête de chercher :weight: paramètre des arêtes utilisé comme poids :returns: distance du douar au service recherché Inf, si pas de résultat trouvé dans le périmètre défini par cutoff ''' dist = {} seen = {source : 0} fringe = [] heapq.heappush(fringe,(0,source)) while fringe : (d,v) = heapq.heappop(fringe) if v in dist: continue dist[v] = d if test_fun(G, v, param) : # On s'arrête dès qu'on trouvé un noeud qui valide le test # Par construction c'est le plus proche de la source return(d) edata = iter(G[v].items()) for w,edgedata in edata : vw_dist = dist[v] + edgedata.get(weight,1) # if cutoff is not None and vw_dist>cutoff: # continue if w in dist : if vw_dist < dist[w] : raise ValueError('Contradictory paths found:', 'negative weights?') elif w not in seen or vw_dist < seen[w]: seen[w] = vw_dist heapq.heappush(fringe,(vw_dist,w)) return (vw_dist) def all_dis(G, n) : ''' Calcule toutes les distances de désenclavement à un noeud :param G: graphe complet du réseau :param n: id du noeud étudié :returns: dictionnaire {service : distance} ''' res = { 'piste' : distance(G, n, has_road, ['r', 'p'], PISTEMAX), 'route' : distance(G, n, has_road, ['r'], ROUTEMAX), 'ecole' : distance(G, n, has_service, 'ecole', ECOLEMAX), 'college' : distance(G, n, has_service, 'college', COLLEGEMAX), 'sante' : distance(G, n, has_service, 'sante', SANTEMAX), 'cheflieu' : distance(G, n, has_service, 'cheflieu', CLIEUMAX) } return res def ma_fonction(d) : conn = psycopg2.connect(" dbname='test3' user = 'postgres' host='localhost' password='postgres'") cursor= conn.cursor() for n in d : cursor.execute(""" Update douar SET distance_route=%s, distance_piste=%s, distance_ecole=%s, distance_college=%s, distance_sante=%s, distance_cheflieu=%s WHERE id=%s """, (d[n]['route'] / 1000.0, d[n]['piste'] / 1000.0, d[n]['ecole'] / 1000.0, d[n]['college'] / 1000.0, d[n]['sante'] / 1000.0, d[n]['cheflieu'] / 1000.0, n) ) conn.commit() # print(commit) # def write_csv_output(file, d) : # ''' # Ecrit un fichier csv # :param file: nom du fichier de sortie # :param d: dictionnaire {id : {service : distance}} # :returns: None # ''' # out = open(file, 'w') # out.write('id,piste,route,ecole,college,sante\n') # for n in d : # l = [ # str(n), # format(d[n]['piste'] / 1000.0, '0.2f'), # format(d[n]['route'] / 1000.0, '0.2f'), # format(d[n]['ecole'] / 1000.0, '0.2f'), # format(d[n]['college'] / 1000.0, '0.2f'), # format(d[n]['sante'] / 1000.0, '0.2f') # ] # out.write(','.join(l) + '\n') if __name__ == '__main__' : ''' Partie qui est executée lorsqu'on appelle directement le script ''' # Crée le graphe G = create_graph('edges.csv', 'nodes.json') # Calcule les distances pour chaque douar distances = {} for n in G : if n > 0 : distances[n] = all_dis(G, n) # Ecrit le fichier csv de sortie ma_fonction(distances)