in src/backend/domain/services/steps/calculate_indicators.py [0:0]
def _get_graph(session_dict):
vectors = {key: session_dict["data_by_symbol"][key]["close"] for key in session_dict["data_by_symbol"]}
for key in session_dict["data_by_indicator"]:
try:
vectors[key] = session_dict["data_by_indicator"][key].values.astype(np.float64)
except:
if key in session_dict["key_by_indicator"]:
del session_dict["key_by_indicator"][key]
del session_dict["data_by_indicator"][key]
max_len = max([len(vectors[i]) for i in vectors])
for key in vectors:
vectors[key] = np.pad(vectors[key], (0, max_len - vectors[key].shape[0]), mode="constant", constant_values=np.nan)
keys = list(vectors.keys())
sim_coef = defaultdict(lambda: dict())
for i in range(len(keys)):
for j in range(i + 1, len(keys)):
sim = get_sim_coef(vectors[keys[i]], vectors[keys[j]])
sim_coef[keys[i]][keys[j]] = sim
sim_coef[keys[j]][keys[i]] = sim
threshold = 0.2
roots = dict()
for s in session_dict["data_by_symbol"]:
roots[s] = []
root_by_id = {r: r for r in session_dict["data_by_symbol"]}
main_root_by_id = {r: r for r in session_dict["data_by_symbol"]}
main_roots = dict()
for s in session_dict["data_by_symbol"]:
main_roots[s] = []
for key, value in session_dict["key_by_indicator"].items():
if value is not None and value in main_root_by_id:
main_roots[main_root_by_id[value]].append(key)
main_root_by_id[key] = main_root_by_id[main_root_by_id[value]]
elif value is None:
for m_root in main_roots:
if m_root.lower() in [i.lower() for i in key.split("_")]:
main_roots[main_root_by_id[m_root]].append(key)
main_root_by_id[key] = main_root_by_id[main_root_by_id[m_root]]
break
for key in keys:
try:
if key in main_root_by_id and key not in root_by_id:
sorted_sims = sorted(sim_coef[key].items(), key=lambda x: x[1])
for sim_pair in sorted_sims:
if sim_pair[1] >= threshold:
break
if (
sim_pair[0] in main_root_by_id
and sim_pair[0] in root_by_id
and main_root_by_id[key] == main_root_by_id[sim_pair[0]]
):
main_roots[main_root_by_id[key]] = [i for i in main_roots[main_root_by_id[key]] if i != key]
roots[root_by_id[sim_pair[0]]].append(key)
root_by_id[key] = root_by_id[sim_pair[0]]
break
if key not in root_by_id:
roots[key] = []
root_by_id[key] = key
elif key not in main_root_by_id and key not in root_by_id:
sorted_sims = sorted(sim_coef[key].items(), key=lambda x: x[1])
for sim_pair in sorted_sims:
if sim_pair[1] >= threshold:
break
if sim_pair[0] in main_root_by_id and sim_pair[0] in root_by_id:
main_root_by_id[key] = main_root_by_id[sim_pair[0]]
roots[root_by_id[sim_pair[0]]].append(key)
root_by_id[key] = root_by_id[sim_pair[0]]
break
if key not in root_by_id:
roots[key] = []
root_by_id[key] = key
main_roots[key] = []
main_root_by_id[key] = key
except KeyError:
roots[key] = []
root_by_id[key] = key
main_roots[key] = []
main_root_by_id[key] = key
for key in session_dict["data_by_indicator"]:
if not key in root_by_id:
roots[key] = []
if not key in main_root_by_id:
main_roots[key] = []
return roots, main_roots