| import pandas as pd |
| import networkx as nx |
| import tqdm |
| import plotly.graph_objects as go |
| from datasets import load_dataset |
| import pandas as pd |
|
|
|
|
def load_graph_from_edge_df(
    repo_name: str,
    edge_df: pd.DataFrame,
) -> nx.DiGraph:
    """
    Create a NetworkX directed graph from the dependency edge DataFrame.

    Every row whose ``repo_name`` column equals ``repo_name`` becomes one
    ``source -> target`` edge, regardless of its edge type. Each edge carries
    the attributes ``edge_type`` (from the row) and ``repo_name``.

    Args:
        repo_name: Name of the repository to filter by
        edge_df: DataFrame with columns [repo_name, target, source, edge_type]

    Returns:
        NetworkX DiGraph with edges and edge attributes. The graph is empty
        when no row matches ``repo_name``.
    """
    repo_edge_df = edge_df[edge_df["repo_name"] == repo_name]

    G = nx.DiGraph()
    # Walk the three columns in lockstep instead of using DataFrame.iterrows():
    # iterrows() materialises a Series per row, which is needlessly slow when
    # only three scalar values are read from each row.
    G.add_edges_from(
        (source, target, {"edge_type": edge_type, "repo_name": repo_name})
        for source, target, edge_type in zip(
            repo_edge_df["source"],
            repo_edge_df["target"],
            repo_edge_df["edge_type"],
        )
    )
    return G
|
|
|
|
def init_graphs():
    """Initialize graphs from dependency data on startup.

    Downloads ``sample_repo_dependency_records.parquet`` from the
    ``lambdaofgod/pwc_github_search`` dataset on the HuggingFace Hub, converts
    its ``train`` split to a DataFrame and builds one graph per distinct
    ``repo_name`` value via ``load_graph_from_edge_df``.

    Returns:
        dict mapping each repository name to its dependency graph, in order of
        first appearance in the data. Rows with a missing ``repo_name`` are
        skipped (pandas ``groupby`` drops null keys by default).
    """
    print("Loading dependency data from HuggingFace Hub...")
    dataset = load_dataset(
        "lambdaofgod/pwc_github_search",
        data_files="sample_repo_dependency_records.parquet",
    )
    graph_dependencies_df = dataset["train"].to_pandas()

    # Split the table once by repository rather than re-filtering the whole
    # DataFrame for every repository (which costs rows x repositories).
    # sort=False keeps the repositories in order of first appearance.
    edges_by_repo = graph_dependencies_df.groupby("repo_name", sort=False)
    repo_count = edges_by_repo.ngroups

    graphs = dict()
    print(f"Loading {repo_count} graphs...")
    for repo_name, repo_edges in tqdm.tqdm(edges_by_repo, total=repo_count):
        graphs[repo_name] = load_graph_from_edge_df(repo_name, repo_edges)

    print("Graphs loaded successfully!")
    return graphs
|
|
|
|
def get_node_type(node, graph):
    """Determine node type based on edge relationships.

    The rules are tried in this order and the first match wins:

    1. ``"repository"`` - the node's name contains ``"/"`` and one of the
       edges reported by ``graph.edges(node)`` is a ``repo-file`` edge.
    2. ``"file"`` - the node's name contains ``".py"`` and it is either the
       target of a ``repo-file`` edge or one of the edges reported by
       ``graph.edges(node)`` has a type starting with ``"file-"``.
    3. ``"import"`` - target of a ``file-import`` edge, or either end of an
       ``import-import`` edge.
    4. ``"class"`` - target of a ``file-class`` edge, or source of a
       ``class-method`` / ``inheritance`` edge.
    5. ``"function"`` - target of a ``file-function`` edge, or either end of
       a ``function-function`` edge.
    6. ``"method"`` - target of a ``class-method`` edge.
    7. ``"unknown"`` otherwise.

    Edges whose ``edge_type`` attribute is missing or is not a string (for
    example ``None``/NaN coming from missing values) are ignored instead of
    raising.

    Args:
        node: Node identifier to classify.
        graph: Graph exposing ``edges(data=True)`` and
            ``edges(node, data=True)`` with an ``edge_type`` edge attribute.

    Returns:
        One of "repository", "file", "import", "class", "function",
        "method" or "unknown".
    """
    node_str = str(node)
    looks_like_repo = "/" in node_str
    looks_like_file = ".py" in node_str

    # Edge types reported for this node by graph.edges(node). Only the
    # repository/file rules need them, so the lookup is skipped otherwise.
    own_edge_types = set()
    if looks_like_repo or looks_like_file:
        for _, _, data in graph.edges(node, data=True):
            edge_type = data.get("edge_type")
            if isinstance(edge_type, str):
                own_edge_types.add(edge_type)

    # A single pass over all edges records which edge types arrive at the
    # node and which leave it; every remaining rule is a membership test.
    incoming = set()
    outgoing = set()
    for source, target, data in graph.edges(data=True):
        edge_type = data.get("edge_type")
        if not isinstance(edge_type, str):
            continue
        if target == node:
            incoming.add(edge_type)
        if source == node:
            outgoing.add(edge_type)
    touching = incoming | outgoing

    if looks_like_repo and "repo-file" in own_edge_types:
        return "repository"

    if looks_like_file and (
        "repo-file" in incoming
        or any(edge_type.startswith("file-") for edge_type in own_edge_types)
    ):
        return "file"

    if "file-import" in incoming or "import-import" in touching:
        return "import"

    if "file-class" in incoming or outgoing & {"class-method", "inheritance"}:
        return "class"

    if "file-function" in incoming or "function-function" in touching:
        return "function"

    if "class-method" in incoming:
        return "method"

    return "unknown"
|
|
|
|
def _compute_layout(graph, layout_type):
    """Return the node-position mapping for ``graph`` under ``layout_type``.

    Unrecognised layout names, and a "planar" request for which
    ``nx.planar_layout`` raises ``NetworkXException``, fall back to a
    50-iteration spring layout.
    """
    if layout_type == "spring":
        return nx.spring_layout(graph, k=1, iterations=100)
    if layout_type == "circular":
        return nx.circular_layout(graph)
    if layout_type == "kamada_kawai":
        return nx.kamada_kawai_layout(graph)
    if layout_type == "fruchterman_reingold":
        return nx.fruchterman_reingold_layout(graph, k=1, iterations=100)
    if layout_type == "shell":
        return nx.shell_layout(graph)
    if layout_type == "spectral":
        return nx.spectral_layout(graph)
    if layout_type == "planar":
        try:
            return nx.planar_layout(graph)
        except nx.NetworkXException:
            # No planar embedding could be produced; use the fallback below.
            pass
    return nx.spring_layout(graph, k=1, iterations=50)


def create_interactive_plotly_graph(
    repo_name, graph, layout_type="spring", selected_edge_types=None
):
    """Create an interactive Plotly graph with node names and edge types.

    Args:
        repo_name: Repository name, shown in the figure title.
        graph: NetworkX graph whose edges carry an ``edge_type`` attribute.
        layout_type: One of "spring", "circular", "kamada_kawai",
            "fruchterman_reingold", "shell", "spectral" or "planar"; any
            other value yields a spring layout.
        selected_edge_types: Collection of edge types to draw. ``None`` or
            an empty collection draws every edge and every node; otherwise
            only matching edges and the nodes they touch are drawn.

    Returns:
        A ``plotly.graph_objects.Figure`` with one line trace for the edges
        and one marker+text trace for the nodes. Node colour encodes the
        type reported by ``get_node_type``; marker size and opacity grow
        with the node's degree in the full (unfiltered) graph.
    """
    if selected_edge_types is None:
        selected_edge_types = set()

    # Positions are computed on the whole graph so that nodes keep the same
    # coordinates whichever edge types are currently shown.
    pos = _compute_layout(graph, layout_type)

    filtered_edges = [
        edge
        for edge in graph.edges(data=True)
        if not selected_edge_types
        or edge[2].get("edge_type", "unknown") in selected_edge_types
    ]

    # All edges share one Scatter trace; a None entry after each pair of
    # endpoints breaks the line so that separate edges are not joined.
    edge_x = []
    edge_y = []
    for source, target, _ in filtered_edges:
        x0, y0 = pos[source]
        x1, y1 = pos[target]
        edge_x.extend([x0, x1, None])
        edge_y.extend([y0, y1, None])

    edge_trace = go.Scatter(
        x=edge_x,
        y=edge_y,
        line=dict(width=1, color="#888"),
        hoverinfo="none",
        mode="lines",
        name="Edges",
    )

    node_type_colors = {
        "repository": "#FF6B6B",
        "file": "#4ECDC4",
        "class": "#45B7D1",
        "function": "#96CEB4",
        "method": "#FFEAA7",
        "import": "#FF9F43",
        "unknown": "#DDA0DD",
    }

    # With an active filter only endpoints of visible edges are drawn;
    # without one every node is drawn, including isolated ones.
    if selected_edge_types:
        connected_nodes = set()
        for source, target, _ in filtered_edges:
            connected_nodes.add(source)
            connected_nodes.add(target)
    else:
        connected_nodes = set(graph.nodes())

    # Look each degree up once; it drives size, opacity and the hover text.
    degree_by_node = {node: graph.degree(node) for node in connected_nodes}
    min_degree = min(degree_by_node.values(), default=0)
    max_degree = max(degree_by_node.values(), default=1)
    # Guard against division by zero when all visible nodes share a degree.
    degree_range = max_degree - min_degree if max_degree > min_degree else 1

    node_x = []
    node_y = []
    node_text = []
    node_info = []
    node_colors = []
    node_sizes = []
    node_opacities = []

    for node, degree in degree_by_node.items():
        x, y = pos[node]
        node_x.append(x)
        node_y.append(y)

        node_type = get_node_type(node, graph)

        # Marker size grows with degree and is clamped to the range [8, 25].
        node_sizes.append(max(8, min(25, 8 + degree * 1.5)))

        # Opacity runs from 0.3 (lowest degree) to 1.0 (highest degree).
        normalized_degree = (degree - min_degree) / degree_range
        node_opacities.append(0.3 + (normalized_degree * 0.7))

        # Long names are truncated to 30 characters for the on-plot label;
        # the hover text keeps the full name.
        display_name = str(node)
        if len(display_name) > 30:
            display_name = display_name[:27] + "..."
        node_text.append(display_name)
        node_info.append(f"Node: {node}<br>Type: {node_type}<br>Degree: {degree}")

        node_colors.append(node_type_colors.get(node_type, node_type_colors["unknown"]))

    node_trace = go.Scatter(
        x=node_x,
        y=node_y,
        mode="markers+text",
        hoverinfo="text",
        hovertext=node_info,
        text=node_text,
        textposition="middle center",
        textfont=dict(size=8, color="rgba(0,0,0,1)"),
        marker=dict(
            size=node_sizes,
            color=node_colors,
            line=dict(width=1, color="black"),
            opacity=node_opacities,
        ),
        name="Nodes",
    )

    fig = go.Figure(data=[edge_trace, node_trace])

    fig.update_layout(
        title=dict(
            text=f"Interactive Dependency Graph: {repo_name}", font=dict(size=16)
        ),
        showlegend=True,
        hovermode="closest",
        margin=dict(b=20, l=5, r=5, t=40),
        annotations=[
            dict(
                text="Hover over nodes for details. Zoom and pan to explore.",
                showarrow=False,
                xref="paper",
                yref="paper",
                x=0.005,
                y=-0.002,
            )
        ],
        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        plot_bgcolor="white",
    )

    return fig
|
|
|
|
def get_available_edge_types(graph):
    """Get all unique edge types in the graph.

    Edges without an ``edge_type`` attribute are reported as ``"unknown"``.

    Args:
        graph: Graph exposing ``edges(data=True)``.

    Returns:
        List of distinct edge types. Well-known types come first, in a fixed
        preferred order; all other types follow, sorted by their string
        form so that non-string values (e.g. ``None`` from missing data) do
        not make the sort fail.
    """
    edge_types = {
        data.get("edge_type", "unknown") for _, _, data in graph.edges(data=True)
    }

    preferred_order = [
        "repo-file",
        "file-class",
        "file-import",
        "inheritance",
        "import-import",
        "file-function",
        "class-method",
        "function-function",
    ]

    ordered_types = [
        edge_type for edge_type in preferred_order if edge_type in edge_types
    ]

    # key=str leaves an all-string collection in its usual order while still
    # giving a defined order when strings are mixed with other values.
    leftover_types = edge_types.difference(preferred_order)
    ordered_types.extend(sorted(leftover_types, key=str))

    return ordered_types
|
|
|
|
def visualize_graph(
    repo_name, graphs_dict, layout_type="spring", selected_edge_types=None
):
    """Visualize the selected repository's graph.

    Args:
        repo_name: Key of the repository in ``graphs_dict``; ``None`` means
            nothing has been selected yet.
        graphs_dict: Mapping of repository name to its dependency graph.
        layout_type: Layout name forwarded to
            ``create_interactive_plotly_graph``.
        selected_edge_types: Collection of edge types to show; ``None`` or an
            empty collection shows everything.

    Returns:
        ``(figure, stats_text)``. When no repository is selected or the name
        is unknown, the figure is ``None`` and the text is a message
        explaining why.
    """
    # The None check has to come first: None is (normally) not a key of
    # graphs_dict, so testing membership first would report
    # "Repository 'None' not found" and never reach the friendlier prompt.
    if repo_name is None:
        return None, "Please select a repository."

    if repo_name not in graphs_dict:
        return None, f"Repository '{repo_name}' not found in loaded graphs."

    graph = graphs_dict[repo_name]

    fig = create_interactive_plotly_graph(
        repo_name, graph, layout_type, selected_edge_types
    )

    # One pass over the edges gathers the per-type counts, the number of
    # visible edges and the endpoints of the visible edges.
    edge_types = {}
    filtered_edge_count = 0
    edge_endpoints = set()
    for source, target, data in graph.edges(data=True):
        edge_type = data.get("edge_type", "unknown")
        if not selected_edge_types or edge_type in selected_edge_types:
            edge_types[edge_type] = edge_types.get(edge_type, 0) + 1
            filtered_edge_count += 1
            edge_endpoints.add(source)
            edge_endpoints.add(target)

    edge_type_summary = "\n".join(
        [f" {edge_type}: {count}" for edge_type, count in edge_types.items()]
    )

    # With an active filter only nodes touched by a visible edge count as
    # visible; without one every node does, including isolated ones.
    if selected_edge_types:
        connected_nodes = edge_endpoints
    else:
        connected_nodes = set(graph.nodes())

    node_types = {}
    for node in connected_nodes:
        node_type = get_node_type(node, graph)
        node_types[node_type] = node_types.get(node_type, 0) + 1

    node_type_summary = "\n".join(
        [f" {node_type}: {count}" for node_type, count in node_types.items()]
    )

    stats = f"""Repository: {repo_name}
Visible nodes: {len(connected_nodes)} / {graph.number_of_nodes()}
Visible edges: {filtered_edge_count} / {graph.number_of_edges()}

Visible node types:
{node_type_summary}

Visible edge types:
{edge_type_summary}
"""

    return fig, stats
|
|