| | """ |
| | Load output files from AMLSim and create NetworkX graph for analytics |
| | """ |
| | import os |
| | import sys |
| | import csv |
| | from datetime import datetime, timedelta |
| | from dateutil.parser import parse |
| | from collections import Counter |
| | import networkx as nx |
| | import json |
| |
|
| |
|
| | |
# Node attribute key: whether the account is flagged as SAR
# (suspicious activity report)
ACCT_SAR = "sar"
# Edge attribute keys: transaction amount and transaction date string
TX_AMOUNT = "amount"
TX_DATE = "date"

# Step size (and default minimum threshold) for the degree thresholds
# used when counting "hub" (fan-in / fan-out) accounts
DEGREE_STEP = 5
| |
|
| |
|
def load_base_csv(acct_csv, tx_csv, schema_data):
    """Build the base transaction network from the transaction graph generator
    outputs (i.e. before AMLSim has been run).

    :param acct_csv: Path to the account list CSV
    :param tx_csv: Path to the transaction list CSV
    :param schema_data: Schema data loaded from the JSON file
    :return: Base transaction network as a NetworkX graph object
    """
    # Placeholder: not implemented yet, always returns None
    return None
| |
|
| |
|
def load_result_csv(acct_csv: str, tx_csv: str, schema_data) -> nx.MultiDiGraph:
    """Load account list CSV and transaction list CSV from AMLSim and generate
    the transaction graph.

    :param acct_csv: Path to the account list CSV (one node per account)
    :param tx_csv: Path to the transaction list CSV (one edge per transaction)
    :param schema_data: Schema data from JSON; the "account" and "transaction"
        entries describe the column layout of the corresponding CSV files
    :return: Transaction network as a NetworkX MultiDiGraph
        (node attribute: SAR flag; edge attributes: amount and date string)
    """
    acct_id_idx = None
    acct_sar_idx = None
    tx_src_idx = None
    tx_dst_idx = None
    tx_amt_idx = None
    tx_date_idx = None
    is_date_type = False  # True when the timestamp column holds a date string
    base_date = datetime(1970, 1, 1)  # Origin for integer day-offset timestamps

    # Resolve column indices from the schema definitions
    for idx, col in enumerate(schema_data["account"]):
        data_type = col.get("dataType")
        if data_type == "account_id":
            acct_id_idx = idx
        elif data_type == "sar_flag":
            acct_sar_idx = idx
    for idx, col in enumerate(schema_data["transaction"]):
        data_type = col.get("dataType")
        if data_type == "orig_id":
            tx_src_idx = idx
        elif data_type == "dest_id":
            tx_dst_idx = idx
        elif data_type == "amount":
            tx_amt_idx = idx
        elif data_type == "timestamp":
            tx_date_idx = idx
            is_date_type = col.get("valueType") == "date"

    _g = nx.MultiDiGraph()
    num_accts = 0
    num_sar = 0
    num_txs = 0

    print("Load account list CSV file", acct_csv)
    with open(acct_csv, "r") as rf:
        reader = csv.reader(rf)
        next(reader, None)  # Skip the header row (tolerate an empty file)
        for row in reader:
            acct_id = row[acct_id_idx]
            is_sar = row[acct_sar_idx].lower() == "true"
            attr = {ACCT_SAR: is_sar}
            _g.add_node(acct_id, **attr)
            num_accts += 1
            if is_sar:
                num_sar += 1
    # Guard against division by zero when the account file has no data rows
    sar_pct = num_sar / num_accts * 100 if num_accts else 0.0
    print("Number of total accounts: %d" % num_accts)
    print("Number of SAR accounts: %d (%.2f%%)" % (num_sar, sar_pct))

    print("Loading transaction list CSV file", tx_csv)
    with open(tx_csv, "r") as rf:
        reader = csv.reader(rf)
        next(reader, None)  # Skip the header row (tolerate an empty file)
        for row in reader:
            src_id = row[tx_src_idx]
            dst_id = row[tx_dst_idx]
            amount = float(row[tx_amt_idx])
            # Timestamp column is either a parseable date string or an
            # integer day offset from the base date
            date = parse(row[tx_date_idx]) if is_date_type else base_date + timedelta(int(row[tx_date_idx]))
            date_str = date.strftime("%Y-%m-%d")
            attr = {TX_AMOUNT: amount, TX_DATE: date_str}
            _g.add_edge(src_id, dst_id, **attr)
            num_txs += 1
            if num_txs % 100000 == 0:
                print("Loaded %d transactions" % num_txs)
    print("Number of transactions: %d" % num_txs)
    return _g
| |
|
| |
|
def load_alert_csv(_g, alert_acct_csv, alert_tx_csv, schema_data):
    """Load alert member and alert transaction lists onto the graph.

    :param _g: Transaction graph to annotate
    :param alert_acct_csv: Path to the alert member (account) list CSV
    :param alert_tx_csv: Path to the alert transaction list CSV
    :param schema_data: Schema data loaded from the JSON file
    """
    # Placeholder implementation: the column index is initialized but the
    # alert data is not loaded yet
    acct_id_idx = None
| |
|
| |
|
class __TransactionGraphLoader:
    """Base loader: reads the simulation configuration and schema JSON files
    and holds the transaction graph used for analytics.
    """

    def __init__(self, _conf_json):
        # Load the simulation configuration JSON
        with open(_conf_json, "r") as rf:
            self.conf = json.load(rf)
        # Load the schema JSON referenced from the "input" section of the config
        schema_json = os.path.join(self.conf["input"]["directory"], self.conf["input"]["schema"])
        with open(schema_json, "r") as rf:
            self.schema = json.load(rf)
        self.output_conf = self.conf["output"]
        self.g = nx.MultiDiGraph()  # Transaction graph (populated by subclasses)
        self.sim_name = self.conf["general"]["simulation_name"]

    def get_graph(self):
        """Return the loaded transaction graph."""
        return self.g

    def count_hub_accounts(self, min_degree=DEGREE_STEP, max_degree=10):
        """Count the number of "hub" accounts (fan-in / fan-out patterns)
        for each degree threshold.

        :param min_degree: Minimum degree threshold
        :param max_degree: Maximum degree threshold (inclusive)
        """
        # dict(...) makes this work with both the dict-returning degree API
        # (networkx 1.x) and the DegreeView-returning API (networkx 2.x+),
        # where calling .values() directly on in_degree() would fail
        in_deg = Counter(dict(self.g.in_degree()).values())
        out_deg = Counter(dict(self.g.out_degree()).values())
        for th in range(min_degree, max_degree + 1, DEGREE_STEP):
            num_fan_in = sum(c for d, c in in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in out_deg.items() if d >= th)
            print("\tNumber of fan-in / fan-out patterns with", th, "or more neighbors:", num_fan_in, "/", num_fan_out)
| |
|
| |
|
class BaseGraphLoader(__TransactionGraphLoader):
    """Graph loader for the base (pre-simulation) transaction network."""

    def __init__(self, _conf_json):
        super().__init__(_conf_json)
| |
|
| |
|
class ResultGraphLoader(__TransactionGraphLoader):
    """Graph loader for the AMLSim result (output) transaction network."""

    def __init__(self, _conf_json):
        super(ResultGraphLoader, self).__init__(_conf_json)

        # Create a transaction graph from the simulation output files
        output_dir = os.path.join(self.output_conf["directory"], self.sim_name)
        acct_file = self.output_conf["accounts"]
        tx_file = self.output_conf["transactions"]
        alert_acct_file = self.output_conf["alert_members"]
        alert_tx_file = self.output_conf["alert_transactions"]

        acct_path = os.path.join(output_dir, acct_file)
        tx_path = os.path.join(output_dir, tx_file)
        self.g = load_result_csv(acct_path, tx_path, self.schema)
        # Pre-compute normal / SAR account counts from the node attributes
        sar_flags = nx.get_node_attributes(self.g, ACCT_SAR)
        self.num_normal_accts = len([n for n, flag in sar_flags.items() if not flag])
        self.num_sar_accts = len([n for n, flag in sar_flags.items() if flag])

    def count_hub_accounts(self, min_degree=DEGREE_STEP, max_degree=10):
        """Count fan-in / fan-out hub accounts per degree threshold, then
        break the counts down by normal vs. SAR accounts.

        :param min_degree: Minimum degree threshold
        :param max_degree: Maximum degree threshold (inclusive)
        """
        super(ResultGraphLoader, self).count_hub_accounts(min_degree, max_degree)

        # Degree distributions split by the SAR node attribute.
        # dict(...) keeps this compatible with both the dict-returning
        # (networkx 1.x) and DegreeView-returning (networkx 2.x+) degree APIs;
        # ".nodes" (not the removed ".node") works on networkx 2.x and later.
        nodes = self.g.nodes
        normal_in_deg = Counter(v for k, v in dict(self.g.in_degree()).items() if not nodes[k][ACCT_SAR])
        normal_out_deg = Counter(v for k, v in dict(self.g.out_degree()).items() if not nodes[k][ACCT_SAR])
        sar_in_deg = Counter(v for k, v in dict(self.g.in_degree()).items() if nodes[k][ACCT_SAR])
        sar_out_deg = Counter(v for k, v in dict(self.g.out_degree()).items() if nodes[k][ACCT_SAR])

        print("Number of fan-in / fan-out patterns for %d normal accounts" % self.num_normal_accts)
        for th in range(min_degree, max_degree + 1, DEGREE_STEP):
            num_fan_in = sum(c for d, c in normal_in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in normal_out_deg.items() if d >= th)
            # Guard against division by zero when there are no normal accounts
            ratio_fan_in = num_fan_in / self.num_normal_accts if self.num_normal_accts else 0.0
            ratio_fan_out = num_fan_out / self.num_normal_accts if self.num_normal_accts else 0.0
            print("\tNumber of fan-in / fan-out patterns with %d or more neighbors: %d (%.2f%%)/ %d (%.2f%%)" %
                  (th, num_fan_in, ratio_fan_in * 100, num_fan_out, ratio_fan_out * 100))

        print("Number of fan-in / fan-out patterns for %d SAR accounts" % self.num_sar_accts)
        for th in range(min_degree, max_degree + 1, DEGREE_STEP):
            num_fan_in = sum(c for d, c in sar_in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in sar_out_deg.items() if d >= th)
            # Guard against division by zero when there are no SAR accounts
            ratio_fan_in = num_fan_in / self.num_sar_accts if self.num_sar_accts else 0.0
            ratio_fan_out = num_fan_out / self.num_sar_accts if self.num_sar_accts else 0.0
            print("\tNumber of fan-in / fan-out patterns with %d or more neighbors: %d (%.2f%%)/ %d (%.2f%%)" %
                  (th, num_fan_in, ratio_fan_in * 100, num_fan_out, ratio_fan_out * 100))
| |
|
| |
|
if __name__ == "__main__":
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: python3 %s [ConfJSON]" % argv[0])
        # sys.exit is the reliable way to exit a script; the bare exit()
        # builtin is an interactive helper added by the site module and is
        # not guaranteed to exist in every execution environment
        sys.exit(1)

    conf_json = argv[1]

    # Load the result graph and report hub-account (fan-in / fan-out) statistics
    rgl = ResultGraphLoader(conf_json)
    rgl.count_hub_accounts(5, 25)
| |
|