-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfbd.py
92 lines (82 loc) · 2.81 KB
/
fbd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# Federated Bayes Decision
# Dependencies
import pandas as pd
import numpy as np
class FBD:
    """
    Federated Bayes Decision classifier.

    For every class label the model keeps three statistics in ``self.values``:
    the per-feature mean, the per-feature standard deviation and the number of
    training rows.  Models fitted on separate data partitions can be combined
    with :meth:`merge`.
    """

    def __init__(
        self,
        values: "dict[int, tuple[np.ndarray, np.ndarray, int]] | None" = None  # mean, std, count
    ):
        """
        Create a model, optionally from precomputed per-class statistics.

        Parameters
        ----------
        values : dict, optional
            Mapping ``label -> (mean, std, count)``.  The dict is stored as
            given (not copied).  When omitted, the model starts empty.
        """
        # A new dict per instance: a shared ``{}`` default would let merge()
        # on one empty model leak classes into every other default instance.
        self.values = {} if values is None else values

    def fit(
        self,
        X: pd.DataFrame,
        Y: list
    ):
        """
        Replace the model's statistics with ones computed from ``X`` and ``Y``.

        Parameters
        ----------
        X : pd.DataFrame
            Training rows, one feature per column.
        Y : list
            Class label of each row of ``X``, in the same order.

        Returns
        -------
        FBD
            ``self``, so calls can be chained.
        """
        labels = np.array(Y)
        self.values = {}
        for k in np.unique(labels):
            mask = labels == k
            rows = X[mask]
            self.values[k] = (
                np.array(np.mean(rows, axis=0)),
                # The tiny offset keeps the std usable as a divisor when a
                # feature is constant within the class.
                np.array(np.std(rows, axis=0)) + 1e-150,
                int(np.count_nonzero(mask)),
            )
        return self

    @staticmethod
    def probability(X, vmean, vstd):
        """
        Score every row of the 2-D array ``X`` against one class.

        Computes ``exp(-sum(((x - vmean) / vstd) ** 2)) / sqrt(2 * pi * sum(vstd ** 2))``
        per row and returns the scores as a 1-D array.

        NOTE(review): this is not the textbook Gaussian density (there is no
        1/2 in the exponent and the normaliser sums the variances); it is kept
        exactly as the model defines it.
        """
        z = (X - vmean) / vstd
        return np.exp(-np.sum(z ** 2, axis=1)) / np.sqrt(np.sum(vstd ** 2) * 2 * np.pi)

    def predict(self, X):
        """
        Predict the class of each row of ``X``.

        Each class is scored as ``count * probability(row)`` and the highest
        score wins; ties go to the class that comes first in ``self.values``.
        Rows whose scores are all zero or NaN also fall back to that first
        class, so the result is always one of the model's labels.

        Parameters
        ----------
        X : pd.DataFrame
            Rows to classify, with the same columns used for fitting.

        Returns
        -------
        label or list
            A single label when ``X`` has exactly one row, otherwise a list
            with one label per row.

        Raises
        ------
        ValueError
            If the model holds no classes yet.
        """
        if not self.values:
            raise ValueError("FBD model has no classes; call fit() or merge() first")
        npx = X.to_numpy()
        labels = list(self.values)
        # One score row per class, one column per sample.
        scores = np.vstack([
            self.values[k][2] * self.probability(npx, self.values[k][0], self.values[k][1])
            for k in labels
        ])
        # A NaN score must never win; argmax would otherwise select it.
        scores = np.where(np.isnan(scores), -np.inf, scores)
        best = np.argmax(scores, axis=0)
        if len(X) == 1:
            return labels[best[0]]
        return [labels[i] for i in best]

    @staticmethod
    def probability_one(X1, vmean, vstd):
        """
        Score a single sample ``X1`` against one class.

        Same formula as :meth:`probability`, but the squared terms are summed
        over all of ``X1``, giving one scalar score.
        """
        z = (X1 - vmean) / vstd
        return np.exp(-np.sum(z ** 2)) / np.sqrt(np.sum(vstd ** 2) * 2 * np.pi)

    def predict_one(self, X1):
        """
        Return the score of a single sample for every class.

        Returns
        -------
        dict
            Mapping ``label -> count * probability_one(X1, mean, std)``.
            Unlike :meth:`predict`, no winning label is selected.
        """
        return {
            k: self.values[k][2] * self.probability_one(X1, self.values[k][0], self.values[k][1])
            for k in self.values
        }

    @staticmethod
    def weightedAvg(v1, c1, v2, c2):
        """Return the average of ``v1`` and ``v2`` weighted by ``c1`` and ``c2``."""
        return (v1 * c1 + v2 * c2) / (c1 + c2)

    def merge(self, bd):
        """
        Fold the statistics of another model into this one, in place.

        For classes known to both models the mean is the count-weighted mean
        and the standard deviation is obtained by pooling the second moments
        around the new mean, so the result matches a model fitted on the
        combined data.  Classes known only to ``bd`` are copied over.

        Parameters
        ----------
        bd : FBD
            Model to merge in.  A falsy value (e.g. ``None``) is ignored.
        """
        if not bd:
            return
        for k in self.values:
            if k not in bd.values:
                continue
            mean_a, std_a, n_a = self.values[k]
            mean_b, std_b, n_b = bd.values[k]
            mean = self.weightedAvg(mean_a, n_a, mean_b, n_b)
            # Average the squared deviations (not the deviations themselves):
            # each term is a partition's mean squared distance from ``mean``.
            variance = self.weightedAvg(
                std_a ** 2 + (mean_a - mean) ** 2, n_a,
                std_b ** 2 + (mean_b - mean) ** 2, n_b,
            )
            self.values[k] = (mean, np.sqrt(variance), n_a + n_b)
        for k in bd.values:
            if k not in self.values:
                mean_b, std_b, n_b = bd.values[k]
                # Copy the arrays so the two models do not share state.
                self.values[k] = (np.copy(mean_b), np.copy(std_b), n_b)

    def copy(self):
        """Return a new model whose statistic arrays are copies of this one's."""
        values = {
            k: (np.copy(mean), np.copy(std), count)
            for k, (mean, std, count) in self.values.items()
        }
        return FBD(values)