Source code for dscript.glider

import numpy as np
import scipy.spatial.distance as spatial


[docs]def get_dim(edgelist): """Given an adjacency list for a graph, returns the number of nodes in the graph. :param edgelist: Graph adjacency list :type edgelist: list :return: Number of nodes in the graph :rtype: int """ node_dict = {} node_count = 0 for edge in edgelist: p, q = edge[:2] if p not in node_dict: node_dict[p] = True node_count += 1 if q not in node_dict: node_dict[q] = True node_count += 1 return node_count
[docs]def densify(edgelist, dim=None, directed=False): """Given an adjacency list for the graph, computes the adjacency matrix. :param edgelist: Graph adjacency list :type edgelist: list :param dim: Number of nodes in the graph :type dim: int :param directed: Whether the graph should be treated as directed :type directed: bool :return: Graph as an adjacency matrix :rtype: np.ndarray """ if dim is None: dim = get_dim(edgelist) A = np.zeros((dim, dim), dtype=np.double) for edge in edgelist: p, q, wt = edge A[p, q] = wt if not directed: A[q, p] = wt return A
[docs]def compute_pinverse_diagonal(D): D_i = D.copy() for i in range(D_i.shape[0]): D_i[i, i] = 1 / D[i, i] if D[i, i] != 0 else 0 return D_i
[docs]def compute_X_normalized(A, D, t=-1, lm=1, is_normalized=True): D_i = compute_pinverse_diagonal(D) P = np.matmul(D_i, A) Identity = np.identity(A.shape[0]) e = np.ones((A.shape[0], 1)) # Compute W scale = np.matmul(e.T, np.matmul(D, e))[0, 0] W = np.multiply(1 / scale, np.matmul(e, np.matmul(e.T, D))) up_P = np.multiply(lm, P - W) X_ = Identity - up_P X_i = np.linalg.pinv(X_) if t > 0: LP_t = Identity - np.linalg.matrix_power(up_P, t) X_i = np.matmul(X_i, LP_t) if not is_normalized: return X_i # Normalize with steady state SS = np.sqrt(np.matmul(D, e)) SS = compute_pinverse_diagonal(np.diag(SS.flatten())) return np.matmul(X_i, SS)
######################################################
[docs]def compute_cw_score(p, q, edgedict, ndict, params=None): """ Computes the common weighted score between p and q. :param p: A node of the graph :param q: Another node in the graph :param edgedict: A dictionary with key `(p, q)` and value `w`. :type edgedict: dict :param ndict: A dictionary with key `p` and the value a set `{p1, p2, ...}` :type ndict: dict :param params: Should always be none here :type params: None :return: A real value representing the score :rtype: float """ if len(ndict[p]) > len(ndict[q]): temp = p p = q q = temp score = 0 for elem in ndict[p]: if elem in ndict[q]: p_elem = edgedict[(p, elem)] if (p, elem) in edgedict else edgedict[(elem, p)] q_elem = edgedict[(q, elem)] if (q, elem) in edgedict else edgedict[(elem, q)] score += p_elem + q_elem return score
[docs]def compute_cw_score_normalized(p, q, edgedict, ndict, params=None): """ Computes the common weighted normalized score between p and q. :param p: A node of the graph :param q: Another node in the graph :param edgedict: A dictionary with key `(p, q)` and value `w`. :type edgedict: dict :param ndict: A dictionary with key `p` and the value a set `{p1, p2, ...}` :type ndict: dict :param params: Should always be none here :type params: None :return: A real value representing the score :rtype: float """ if len(ndict[p]) > len(ndict[q]): temp = p p = q q = temp score = 0 for elem in ndict[p]: if elem in ndict[q]: p_elem = edgedict[(p, elem)] if (p, elem) in edgedict else edgedict[(elem, p)] q_elem = edgedict[(q, elem)] if (q, elem) in edgedict else edgedict[(elem, q)] score += p_elem + q_elem degrees = params["deg"] return score / np.sqrt(degrees[p] * degrees[q])
[docs]def compute_l3_unweighted_mat(A): A_u = np.where(A > 0, 1, 0) d, _ = A_u.shape e = np.ones((d, 1)) deg = A_u @ e ideg = np.where(deg > 0, 1 / deg, 0) sdeg = np.diag(np.sqrt(ideg).flatten()) A1 = sdeg @ A_u @ sdeg return A1
[docs]def compute_l3_weighted_mat(A): d, _ = A.shape e = np.ones((d, 1)) deg = A @ e ideg = np.where(deg > 0, 1 / deg, 0) sdeg = np.diag(np.sqrt(ideg).flatten()) A1 = sdeg @ A @ sdeg return A1
[docs]def compute_l3_score_mat(p, q, edgedict, ndict, params=None): L3 = params["l3"] return L3[p, q]
[docs]def compute_degree_vec(edgelist): A = densify(edgelist) e = np.ones((A.shape[0], 1)) deg = A @ e return deg.flatten()
##############################################################
[docs]def create_edge_dict(edgelist): """ Creates an edge dictionary with the edge `(p, q)` as the key, and weight `w` as the value. :param edgelist: list with elements of form `(p, q, w)` :type edgelist: list :return: A dictionary with key `(p, q)` and value `w`. :rtype: dict """ edgedict = {} for p, q, w in edgelist: edgedict[(p, q)] = w return edgedict
[docs]def create_neighborhood_dict(edgelist): """ Create a dictionary with nodes as key and a list of neighborhood nodes as the value :param edgelist: A list with elements of form `(p, q, w)` :type edgelist: list :return: neighborhood_dict -> A dictionary with key `p` and value, a set `{p1, p2, p3, ...}` :rtype: dict """ ndict = {} for ed in edgelist: p, q, _ = ed if p not in ndict: ndict[p] = set() if q not in ndict: ndict[q] = set() ndict[p].add(q) ndict[q].add(p) return ndict
[docs]def glide_compute_map(pos_df, thres_p=0.9, params={}): """ Return glide_mat and glide_map. :param pos_df: Dataframe of weighted edges :type pos_df: pd.DataFrame :param thres_p: Threshold to treat an edge as positive :type thres_p: float :param params: Parameters for GLIDE :type params: dict :return: glide_matrix and corresponding glide_map :rtype: tuple(np.ndarray, dict) """ params["lam"] = 1 if "lam" not in params else params["lam"] params["norm"] = False if "norm" not in params else params["norm"] params["glide"] = ( {"alpha": 1.0, "beta": 1000.0, "loc": "cw_normalized", "delta": 1.0} if "glide" not in params else params["glide"] ) def a_d(u_edges, n_nodes): A = np.zeros((n_nodes, n_nodes)) for p, q, w in u_edges: A[p, q] = w A[q, p] = w D = np.diag((A @ np.ones((n_nodes, 1))).flatten()) return A, D glide_map = {} count = 0 u_edges = [] for _, (p, q, w) in pos_df.iterrows(): for m in [p, q]: if m not in glide_map: glide_map[m] = count count += 1 u_edges.append((glide_map[p], glide_map[q], w)) A, D = a_d(u_edges, count) X = compute_X_normalized(A, D, lm=params["lam"], is_normalized=params["norm"]) glide_mat = glide_predict_links(u_edges, X, params=params["glide"], thres_p=thres_p) return glide_mat, glide_map
[docs]def glider_score(p, q, glider_map, glider_mat): for m in [p, q]: if m not in glider_map: return 0 p_ = glider_map[p] q_ = glider_map[q] return glider_mat[p_, q_]