From 4cdc847d2e22d7349618c52883f9284756cb73d0 Mon Sep 17 00:00:00 2001 From: Tyler Phillips Date: Tue, 5 Jul 2022 14:13:14 -0400 Subject: [PATCH] add retinaface --- .gitignore | 4 +- notes/{ => kai}/face.py | 0 notes/{ => kai}/tps2020.py | 0 notes/kai/tps2020_b1.py | 279 +++++++++++++ {src => notes/tyler}/identification.py | 0 {src => notes/tyler}/identification_model.py | 0 {src => notes/tyler}/verification.py | 0 {src => notes/tyler}/verification_leakage.py | 0 {src => notes/tyler}/verification_model.py | 0 {src => notes/tyler}/visualize.py | 0 requirements.txt | 5 +- src/biocapsule.py | 34 +- src/face.py | 124 ++++-- src/face_models/__init__.py | 2 +- src/face_models/face_model.py | 356 +++++++++-------- src/face_models/face_preprocess.py | 95 ----- src/face_models/face_setup.sh | 12 + src/face_models/facenet/__init__.py | 1 + src/face_models/{ => facenet}/facenet.py | 366 +++++++++++------- src/face_models/mtcnn/__init__.py | 1 + src/face_models/{ => mtcnn}/helper.py | 24 +- src/face_models/{ => mtcnn}/mtcnn_detector.py | 302 +++++++++------ src/face_models/utils.py | 111 ++++++ src/tps2020.py | 223 +++++++---- src/utils.py | 105 ++--- 25 files changed, 1358 insertions(+), 686 deletions(-) rename notes/{ => kai}/face.py (100%) rename notes/{ => kai}/tps2020.py (100%) create mode 100644 notes/kai/tps2020_b1.py rename {src => notes/tyler}/identification.py (100%) rename {src => notes/tyler}/identification_model.py (100%) rename {src => notes/tyler}/verification.py (100%) rename {src => notes/tyler}/verification_leakage.py (100%) rename {src => notes/tyler}/verification_model.py (100%) rename {src => notes/tyler}/visualize.py (100%) delete mode 100644 src/face_models/face_preprocess.py create mode 100644 src/face_models/facenet/__init__.py rename src/face_models/{ => facenet}/facenet.py (68%) create mode 100644 src/face_models/mtcnn/__init__.py rename src/face_models/{ => mtcnn}/helper.py (87%) rename src/face_models/{ => mtcnn}/mtcnn_detector.py (73%) create mode 100644 src/face_models/utils.py diff --git a/.gitignore b/.gitignore index 4b7896a6b..960530b53 100644 --- a/.gitignore +++ b/.gitignore @@ -10,9 +10,7 @@ __pycache__ *.npz *.hdf5 -src/face_models/model-ir-v1 -src/face_models/model-mtcnn -src/face_models/model-r100-ii +src/face_models/models images/rs images/rs_aligned diff --git a/notes/face.py b/notes/kai/face.py similarity index 100% rename from notes/face.py rename to notes/kai/face.py diff --git a/notes/tps2020.py b/notes/kai/tps2020.py similarity index 100% rename from notes/tps2020.py rename to notes/kai/tps2020.py diff --git a/notes/kai/tps2020_b1.py b/notes/kai/tps2020_b1.py new file mode 100644 index 000000000..d31719dfe --- /dev/null +++ b/notes/kai/tps2020_b1.py @@ -0,0 +1,279 @@ +#this module is alone and not used by others. 
+#Uses two datasets: lfw and gtdb +import os +import cv2 +import shutil +#from queue import Queue #FIFO +#from threading import Thread +import numpy as np +from sklearn.metrics import confusion_matrix +from sklearn.linear_model import LogisticRegression +from sklearn.model_selection import StratifiedKFold +#from sklearn.neighbors import KNeighborsClassifier +#from sklearn.svm import SVC +from argparse import ArgumentParser +from face import ArcFace, extract_dataset +#from utils import progress_bar +from biocapsule import BioCapsuleGenerator +import datetime + +np.random.seed(42) + +def filter_lfw(features): #only used in this module; second input features_flip removed by Kai + y = np.unique(features[:, -1]) + mask = np.ones(features[:, -1].shape, dtype=bool) + for y_i in y: + if features[features[:, -1] == y_i].shape[0] < 5: + idxes = np.where(features[:, -1] == y_i) + mask[idxes] = False + features = features[mask] #only retain those rows of features corresponding to subjects with at least 5 features in the 2D features array + #features_flip = features_flip[mask] + + y_map = {} + y = np.unique(features[:, -1]) + for i, y_i in enumerate(y): + y_map[y_i] = i + 1 + #print("subject id",y_i,"is mapped to",i+1) #ok + #print("subject id %d is mapped to %d" % (y_i,i+1)) #ok and equivalent + + for i in range(features[:, -1].shape[0]): + #print("feature index %d: oid %d with mapped sid %d" % (i,features[i, -1],y_map[features[i, -1]])) + features[i, -1] = y_map[features[i, -1]] + #features_flip[i, -1] = y_map[features_flip[i, -1]] + + #return features, features_flip #second return value features_flip removed by Kai + return features + +#returns a 2D array of 6 by 512 +def get_rs_features(): #only used in this module + arcface = ArcFace() #an object of ArcFace class; this can be customized with a different feature extraction method + + # if os.path.isdir(os.path.join(os.path.abspath(""), "images", "rs_aligned")): + # shutil.rmtree(os.path.join(os.path.abspath(""), "images", "rs_aligned")) + + rs_features = np.zeros((6, 512)) + #os.mkdir(os.path.join(os.path.abspath(""), "images", "rs_aligned")) + for s_id, subject in enumerate(os.listdir(os.path.join(os.path.abspath(""), "images", "rs"))[4:]): #here listdir should return a list of 10 directory names rs_00 to rs_09 + for image in os.listdir(os.path.join(os.path.abspath(""), "images", "rs", subject)): #image will be sth like rs_04.jpg ... rs_09.jpg; subject is rs_04 ... rs_09 + img = cv2.imread(os.path.join(os.path.abspath(""), "images", "rs", subject, image)) #img is of class 'numpy.ndarray' + img_aligned = arcface.preprocess(img) #get an aligned image with just facial region (five facial landmarks) + feature = arcface.extract(img_aligned, align=False) #the return value of extract here should be a row vector of 512 elements + rs_features[s_id] = feature + + if img_aligned.shape != (3, 112, 112): #this is unnecessary since extract function has already done this? 
+ img_aligned = cv2.resize(img_aligned, (112, 112)) + img_aligned = np.rollaxis(cv2.cvtColor(img_aligned, cv2.COLOR_RGB2BGR), 2, 0) + + #cv2.imwrite(os.path.join(os.path.abspath(""), "images", "rs_aligned", image), cv2.cvtColor(np.rollaxis(img_aligned, 0, 3), cv2.COLOR_RGB2BGR)) + + return rs_features + +#yLen is the number of subjects (LFW: 423; GTDB: 50) +#return a vector of random values (0~5) of length yLen +def rs_rbac(yLen, dist): #only used in this module; REVISED BY KAI + if dist == "unbal": + rs_map = np.random.choice(6, yLen, p=[0.05, 0.1, 0.15, 0.2, 0.25, 0.25]) + else: + rs_map = np.random.choice(6, yLen) + return rs_map + +#return biocapsules +#input features is a 2D array: number of rows is the image count; number of columns is 513 +#input rs_features is of shape 6 by 512 +#original input rs_map is also removed by Kai +def get_bcs(features, rs_features): #only used in this module; #second input features_flip and second return value bcs_flip removed by Kai + bcs = np.zeros((rs_features.shape[0], features.shape[0], 513)) # 3D array of 6 by image_count by 513 + #bcs_flip = np.zeros((rs_features.shape[0], features_flip.shape[0], 513)) +#features[:, :-1] is of shape image_count by 512 + bc_gen = BioCapsuleGenerator() + for i in range(rs_features.shape[0]): #i: 0~5 + bcs[i, :, :] = np.hstack([bc_gen.biocapsule_batch(features[:, :-1], rs_features[i]), features[:, -1][:, np.newaxis]]) #note: the features 2D array will be updated here (but its last column remains the same)! + #bcs_flip[i, :, :] = np.hstack([bc_gen.biocapsule_batch(features_flip[:, :-1], rs_features[i]), features_flip[:, -1][:, np.newaxis]]) +#last column features[:, -1][:, np.newaxis] is subject_id + #return bcs, bcs_flip #second return value bcs_flip removed by Kai + return bcs + +if __name__ == "__main__": + parser = ArgumentParser() + parser.add_argument("-d", "--dataset", required=True, choices=["lfw", "gtdb"], help="dataset to use in experiment") + parser.add_argument("-m", "--mode", required=True, choices=["under", "bc"], help="feature mode to use in experiment") + parser.add_argument("-r", "--role_dist", required=False, choices=["bal", "unbal"], default="unbal", help="role distribution to use in experiment") + parser.add_argument("-t", "--thread_cnt", required=False, type=int, default=1, help="thread count to use in classifier training") + parser.add_argument("-gpu", "--gpu", required=False, type=int, default=-1, help="gpu to use in feature extraction") + args = vars(parser.parse_args()) + + if args["mode"] == "under": + fi = open(os.path.join(os.path.abspath(""), "results", "tps2020_{}_under.txt".format(args["dataset"])), "w") + else: + fi = open(os.path.join(os.path.abspath(""), "results", "tps2020_{}_bc_{}.txt".format(args["dataset"], args["role_dist"])), "w") + print("computing features:",datetime.datetime.now()) + # extract features for experiment: extract_dataset is in face.py + if args["dataset"]=="lfw" and os.path.exists("data/lfw_arcface_feat.npz"): + features = np.load(os.path.join(os.path.abspath(""), "data", "lfw_arcface_feat.npz"))["arr_0"] + elif args["dataset"]=="gtdb" and os.path.exists("data/gtdb_arcface_feat.npz"): + features = np.load(os.path.join(os.path.abspath(""), "data", "gtdb_arcface_feat.npz"))["arr_0"] + else: + features = extract_dataset(args["dataset"], "arcface", args["gpu"]) #second return value features_flip removed by Kai +# features is a 2D array: number of rows is the image count; number of columns is 513 (last column is 1-based subject_id). 
Each row is a feature vector plus subject_id. + print("done computing features",datetime.datetime.now()) + # remove all subjects with less than 5 images from LFW dataset + print("num_of_raw_subjects =",len(np.unique(features[:, -1]))) + print("features.shape =",features.shape) + if args["dataset"] == "lfw": #filter_lfw is in this module + print("filtering lfw features.") + features = filter_lfw(features) #second input and return value features_flip removed by Kai + print("filtered lfw features.shape =",features.shape) + print("filtered lfw num_of_subjects =",len(np.unique(features[:, -1]))) + + # if biocapsules are used, we can perform authn-authz operation using reference subjects + if args["mode"] == "bc": + # get reference subjects for roles; get_rs_features is in this module + print("computing bcs:",datetime.datetime.now()) + rs_features = get_rs_features() #a 2D array of 6 by 512 + + # assign subjects their reference subjects/roles; rs_rbac is in this module + rs_map = rs_rbac(len(np.unique(features[:, -1])), args["role_dist"]) #each element (0~5) of the vector rs_map (of length number_of_subjects) represents a reference_subject/role for a subject + cnts = np.unique(rs_map, return_counts=True)[1] + for i, cnt in enumerate(cnts): #histogram: how many subjects for each role 0~5 + fi.write("Role {} -- {} Subjects\n".format(i + 1, cnt)) + all_but_one_cnts=[np.sum(cnts)-cnt for cnt in cnts] + + # create all possible biocapsules: get_bcs is in this module; note: features will get updated by the get_bcs call + bcs = get_bcs(features, rs_features) #second input features_flip and fourth input rs_map and second return value bcs_flip removed by Kai + + # tn, fp, fn, tp + conf_mat = np.zeros((4,)) + ctp=0 + ctn=0 + cfp=0 + cfn=0 + print(f"Starting KFold Experiment: {datetime.datetime.now()}") + skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42) + for k, (train_index, test_index) in enumerate(skf.split(features[:, :-1], features[:, -1])): #k will be 0 to 4; test_index is a vector of length image_count/5; train_index is a vector of length image_count*4/5 + print(f"Fold {k} : {datetime.datetime.now()}") + #print(f"train_size = {len(train_index)} ; test_size = {len(test_index)}") + if args["mode"] == "under": + X_train = features[:, :-1][train_index] #2D array of shape train_image_count by 512 + y_train = features[:, -1][train_index] #a vector of subject_id's of length train_image_count + X_test = features[:, :-1][test_index] #2D array of shape test_image_count by 512 + y_test = features[:, -1][test_index] #a vector of subject_id's of length test_image_count + # labels = np.unique(y_train) #a vector of unique subject_id's + # labels_test=np.unique(y_test) + # assert labels.size==labels_test.size + # knn = KNeighborsClassifier() #typically no better than LR? + # print("fold",k,"KNN score:", knn.fit(X_train, y_train).score(X_test, y_test)) + # clfsvm = SVC(kernel="linear", probability=True, random_state=42).fit(X_train, y_train) #typically no better than LR? 
(occasionally better) + # print("fold",k,"SVM score:", clfsvm.score(X_test, y_test)) + clf = LogisticRegression(class_weight="balanced", random_state=42).fit(X_train, y_train) + #print("fold",k,"LR score:", clf.score(X_test, y_test)) + y_pred=clf.predict(X_test) + for subject_id in np.unique(y_test): + y_test_subject = (y_test == subject_id).astype(int) + y_pred_subject = (y_pred == subject_id).astype(int) + conf_mat += confusion_matrix(y_test_subject, y_pred_subject).ravel() + #print(f"c0 = {conf_mat[0]} ; c1 = {conf_mat[1]} ; c2 = {conf_mat[2]} ; c3 = {conf_mat[3]}") + #print(f"m0 = {conf_mat[0]} ; m1 = {conf_mat[1]} ; m2 = {conf_mat[2]} ; m3 = {conf_mat[3]}") + for j in range(len(test_index)): + if y_pred[j]==y_test[j]: + ctp=ctp+1 + else: #we don't need to compute ctn here because we know ctn+cfp is the number of authentication attempts that should be rejected, which is equal to the total number of authentication attempts minus the number of authentication attempts that should be accepted (i.e. ctp+cfn) + cfn=cfn+1 + cfp=cfp+1 + #print("known subject", y_test[j],"is predicted as",y_pred[j]) + #print("known subject %d (with feature test_index %d) is predicted as %d" % (y_test[j],test_index[j], y_pred[j])) + else: #args["mode"] == "bc" + for i in range(len(rs_features)): #i: 0~5 + X_train = bcs[i, :, :-1][train_index] + y_train = bcs[i, :, -1][train_index] #based on bcs construction, equivalent to features[:, -1][train_index] + X_test = bcs[i, :, :-1][test_index] + y_test = bcs[i, :, -1][test_index] #based on bcs construction, equivalent to features[:, -1][test_index] + # knn = KNeighborsClassifier() #typically no better than LR? + # print("fold",k,"rs",i,"KNN score:", knn.fit(X_train, y_train).score(X_test, y_test)) + # clfsvm = SVC(kernel="linear", probability=True, random_state=42).fit(X_train, y_train) #typically no better than LR? (occasionally better) + # print("fold",k,"rs",i,"SVM score:", clfsvm.score(X_test, y_test)) + clf = LogisticRegression(class_weight="balanced", random_state=42).fit(X_train, y_train) + #print("fold",k,"rs",i," LR score:", clf.score(X_test, y_test)) + y_pred=clf.predict(X_test) + #indices = [idx+1 for idx, el in enumerate(rs_map) if el == i] #subject ids who are assigned rs role i + indexes=[ j for j,el in enumerate(y_test) if rs_map[int(el-1)]==i ] + y_test_i=y_test[indexes] + y_pred_i=y_pred[indexes] + for subject_id in np.unique(y_test_i): + y_test_subject = (y_test_i == subject_id).astype(int) + y_pred_subject = (y_pred_i == subject_id).astype(int) + conf_mat += confusion_matrix(y_test_subject, y_pred_subject).ravel() + #print(f"c0 = {conf_mat[0]} ; c1 = {conf_mat[1]} ; c2 = {conf_mat[2]} ; c3 = {conf_mat[3]}") + conf_mat[0]+=len(y_test_i)*all_but_one_cnts[i] #compensation: these images of subjects of role i are not compared against those subjects of role I!=i so we compensate TN (conf_mat[0]). 
+ #print(f"m0 = {conf_mat[0]} ; m1 = {conf_mat[1]} ; m2 = {conf_mat[2]} ; m3 = {conf_mat[3]}") + + lcfn=0 #three local variables for each role for the purpose of calculating the increment of ctn + lctp=0 + lcfp=0 #lcfp and lcfn may be unequal for each iteration of i + for j in range(len(test_index)): + if rs_map[int(y_test[j]-1)]==i: #subject y_test[j] is known to be in role i + if y_pred[j]==y_test[j]: + ctp=ctp+1 + lctp=lctp+1 + else: #we must have ctp+cfn=number of images + lcfn=lcfn+1 + cfn=cfn+1 + if rs_map[int(y_pred[j]-1)]==i: + cfp=cfp+1 + lcfp=lcfp+1 + #print("1known subject", y_test[j],"in role",rs_map[int(y_test[j]-1)],"is predicted as",y_pred[j],"in role",rs_map[int(y_pred[j]-1)]) + #print("1known subject %d (with feature test_index %d) in role %d is predicted as %d in role %d" % (y_test[j],test_index[j],rs_map[int(y_test[j]-1)],y_pred[j],rs_map[int(y_pred[j]-1)])) + # else: #subject y_test[j] is known to be not in role i + # if rs_map[int(y_pred[j]-1)]!=i: + # if y_pred[j]==y_test[j]: + # cfp=cfp+1 + # ctn+=cnts[i] #compensate + # elif y_pred[j]!=y_test[j]: + # ctn+=1 + #if y_pred[j]!=y_test[j]: + #cfp1=cfp1+1 + #ctn+=1 + #elif rs_map[int(y_pred[j]-1)]==i: + # cfp=cfp+1 + #print("2known subject %d (with feature test_index %d) in role %d is predicted as %d in role %d" % (y_test[j],test_index[j],rs_map[int(y_test[j]-1)],y_pred[j],rs_map[int(y_pred[j]-1)])) + #else: + # interestingc=interestingc+1 + ctn+=len(np.unique(y_test_i))*len(y_test_i)-lcfn-lcfp-lctp #increment of ctn: the logic should be equivalent to that of confusion_matrix + ctn+=len(y_test_i)*all_but_one_cnts[i] #compensation: these images of subjects of role i are not compared against those subjects of role I!=i so we compensate ctn. + #print(f"cm0 = {conf_mat[0]} ; cm1 = {conf_mat[1]} ; cm2 = {conf_mat[2]} ; cm3 = {conf_mat[3]}") + + if args["mode"] == "under": #logic for ctn in under mode + ctn=len(features[:, -1])*len(np.unique(features[:, -1]))-cfp-cfn-ctp + print("ctn =",ctn) + print("cfp =",cfp) #cfp and cfn are necessarily equal in under mode + print("cfn =",cfn) #cfp and cfn may be unequal in bc mode + print("ctp =",ctp) + + print("TN =",conf_mat[0]) + print("FP =",conf_mat[1]) + print("FN =",conf_mat[2]) + print("TP =",conf_mat[3]) + + # (tn + tp) / (tn + fp + fn + tp) + acc = (conf_mat[0] + conf_mat[3]) / np.sum(conf_mat) + # fp / (tn + fp) + far = conf_mat[1] / (conf_mat[0] + conf_mat[1]) + # fn / (fn + tp) + frr = conf_mat[2] / (conf_mat[2] + conf_mat[3]) + + fi.write("Dataset -- {}\n".format(args["dataset"])) + fi.write("BC -- {}\n".format(args["mode"])) + fi.write("RS -- {}\n".format(args["role_dist"])) + fi.write("TN -- {:.6f}\n".format(conf_mat[0])) + fi.write("FP -- {:.6f}\n".format(conf_mat[1])) + fi.write("FN -- {:.6f}\n".format(conf_mat[2])) + fi.write("TP -- {:.6f}\n".format(conf_mat[3])) + fi.write("ACC -- {:.6f}\n".format(acc)) + fi.write("FAR -- {:.6f}\n".format(far)) + fi.write("FRR -- {:.6f}\n".format(frr)) + fi.close() + +#on lfw, I got 5,5(under);4,29(bc bal);7,50(bc unbal) for fp,fn +#https://scikit-learn.org/stable/modules/generated/sklearn.metrics.confusion_matrix.html +# confusion_matrix is a function that computes confusion matrix to evaluate the accuracy of a classification. +# By definition a confusion matrix C is such that C_i,j is equal to the number of observations known to be in group i and predicted to be in group j. +# Thus in binary classification, the count of true negatives is C_0,0, false negatives is C_1,0, true positives is C_1,1 and false positives is C_0,1. 
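For reference, a minimal standalone sketch (not part of the patch itself) of the per-subject one-vs-rest confusion-matrix accumulation and the ACC/FAR/FRR formulas that the closing comments of tps2020_b1.py describe. The toy y_test/y_pred arrays below are made up purely for illustration:

    import numpy as np
    from sklearn.metrics import confusion_matrix

    y_test = np.array([1, 1, 2, 2, 3, 3])   # ground-truth subject ids (toy data)
    y_pred = np.array([1, 2, 2, 2, 3, 1])   # classifier predictions (toy data)

    conf_mat = np.zeros(4)                   # accumulated [tn, fp, fn, tp]
    for subject_id in np.unique(y_test):
        # binarize: "is this sample subject_id?" vs "is it anyone else?"
        y_test_subject = (y_test == subject_id).astype(int)
        y_pred_subject = (y_pred == subject_id).astype(int)
        # confusion_matrix(...).ravel() yields (tn, fp, fn, tp) for binary labels
        conf_mat += confusion_matrix(
            y_test_subject, y_pred_subject, labels=[0, 1]
        ).ravel()

    tn, fp, fn, tp = conf_mat
    acc = (tn + tp) / conf_mat.sum()         # (tn + tp) / (tn + fp + fn + tp)
    far = fp / (tn + fp)                     # false acceptance rate
    frr = fn / (fn + tp)                     # false rejection rate
    print(f"ACC={acc:.4f} FAR={far:.4f} FRR={frr:.4f}")
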
diff --git a/src/identification.py b/notes/tyler/identification.py similarity index 100% rename from src/identification.py rename to notes/tyler/identification.py diff --git a/src/identification_model.py b/notes/tyler/identification_model.py similarity index 100% rename from src/identification_model.py rename to notes/tyler/identification_model.py diff --git a/src/verification.py b/notes/tyler/verification.py similarity index 100% rename from src/verification.py rename to notes/tyler/verification.py diff --git a/src/verification_leakage.py b/notes/tyler/verification_leakage.py similarity index 100% rename from src/verification_leakage.py rename to notes/tyler/verification_leakage.py diff --git a/src/verification_model.py b/notes/tyler/verification_model.py similarity index 100% rename from src/verification_model.py rename to notes/tyler/verification_model.py diff --git a/src/visualize.py b/notes/tyler/visualize.py similarity index 100% rename from src/visualize.py rename to notes/tyler/visualize.py diff --git a/requirements.txt b/requirements.txt index cd4335064..a42286793 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,6 +19,7 @@ idna==2.6 imageio==2.6.1 imbalanced-learn==0.6.1 imblearn==0.0 +insightface===0.1.5 isort==4.3.21 joblib==0.14.1 Keras-Applications==1.0.8 @@ -28,7 +29,7 @@ lazy-object-proxy==1.4.3 Markdown==3.1.1 matplotlib==3.1.2 mccabe==0.6.1 -mxnet==1.5.0 +mxnet==1.6.0 networkx==2.4 numpy==1.16.4 oauthlib==3.1.0 @@ -46,7 +47,7 @@ pyparsing==2.4.6 python-dateutil==2.8.1 pytz==2019.3 PyWavelets==1.1.1 -requests==2.20.0 +requests==2.18.4 requests-oauthlib==1.3.0 rsa==4.0 scikit-image==0.16.2 diff --git a/src/biocapsule.py b/src/biocapsule.py index 90934f858..738000dc4 100644 --- a/src/biocapsule.py +++ b/src/biocapsule.py @@ -1,38 +1,56 @@ -import os import numpy as np from scipy.signal import convolve2d -from argparse import ArgumentParser class BioCapsuleGenerator: def __signature_extraction(self, feature): - lvl1 = convolve2d(feature.reshape(32, 16), np.ones( - (5, 5)) / 25., mode="same", boundary="wrap") + lvl1 = convolve2d( + feature.reshape(32, 16), + np.ones((5, 5)) / 25.0, + mode="same", + boundary="wrap", + ) + lvl2 = feature.reshape(32, 16) - lvl1 - signature = np.around(np.average(lvl2, axis=1) * 100.).astype(int) % 9 + + signature = np.around(np.average(lvl2, axis=1) * 100.0).astype(int) % 9 + return signature def __key_generation(self, signature): key = np.empty((0,)) + for sig in signature: np.random.seed(sig) key = np.append(key, np.random.choice(2, 16)) + key = (key * 2) - 1 + return key def biocapsule(self, user_feature, rs_feature): user_signature = self.__signature_extraction(user_feature) user_key = self.__key_generation(user_signature) + rs_signature = self.__signature_extraction(rs_feature) rs_key = self.__key_generation(rs_signature) - return np.multiply(user_feature, rs_key) + np.multiply(rs_feature, user_key) + + bc = np.multiply(user_feature, rs_key) + np.multiply( + rs_feature, user_key + ) + + return bc def biocapsule_batch(self, user_features, rs_feature): rs_signature = self.__signature_extraction(rs_feature) rs_key = self.__key_generation(rs_signature) + for i in range(user_features.shape[0]): user_signature = self.__signature_extraction(user_features[i]) user_key = self.__key_generation(user_signature) - user_features[i] = np.multiply(user_features[i], rs_key) + \ - np.multiply(rs_feature, user_key) + + user_features[i] = np.multiply( + user_features[i], rs_key + ) + np.multiply(rs_feature, user_key) + return user_features diff --git 
a/src/face.py b/src/face.py index 87886622b..97687df17 100644 --- a/src/face.py +++ b/src/face.py @@ -18,13 +18,13 @@ class FaceNet: """ - def __init__(self, gpu: int = -1): + def __init__(self, gpu: int = -1, detector: str = "mtcnn"): """Initialize FaceNet model object. Provide an int corresponding to a GPU id to use for models. If -1 is given CPU is used rather than GPU. """ - self.__model = FaceNetModel(gpu) + self.__model = FaceNetModel(gpu, detector) def preprocess(self, face_img: np.ndarray) -> np.ndarray: """Preprocess facial image for FaceNet feature extraction @@ -66,6 +66,9 @@ def extract(self, face_img: np.ndarray, align: bool = True) -> np.ndarray: face_img = self.preprocess(face_img) if face_img.shape != (160, 160, 3): + if len(face_img.shape) == 2: + face_img = cv2.cvtColor(face_img, cv2.COLOR_GRAY2BGR) + face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB) face_img = cv2.resize(face_img, (160, 160)) @@ -73,8 +76,8 @@ def extract(self, face_img: np.ndarray, align: bool = True) -> np.ndarray: class ArcFace: - def __init__(self, gpu: int = -1): - self.__model = ArcFaceModel(gpu) + def __init__(self, gpu: int = -1, detector: str = "mtcnn"): + self.__model = ArcFaceModel(gpu, detector) def preprocess(self, face_img: np.ndarray) -> np.ndarray: """Preprocess facial image for ArcFace feature extraction @@ -115,6 +118,9 @@ def extract(self, face_img: np.ndarray, align: bool = True) -> np.ndarray: face_img = self.preprocess(face_img) if face_img.shape != (3, 112, 112): + if len(face_img.shape) == 2: + face_img = cv2.cvtColor(face_img, cv2.COLOR_GRAY2BGR) + face_img = cv2.resize(face_img, (112, 112)) face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB) face_img = np.rollaxis(face_img, 2, 0) @@ -123,34 +129,36 @@ def extract(self, face_img: np.ndarray, align: bool = True) -> np.ndarray: def extract_dataset( - dataset: str, extractor: str = "arcface", gpu: int = -1 -) -> np.ndarray: + dataset: str, + method: str = "arcface", + detector: str = "mtcnn", + flipped: bool = True, + gpu: int = -1, +): """Extract feature vectors of each image within a dataset. - Return array conatining all extracted features. + Save array conatining all extracted features to disk. Parameters ---------- dataset: str Dataset to extract features from. Examples would be gtdb or lfw - extractor: str = "arcface" + method: str = "arcface" Model to use for feature extraction. Currently supported options are arcface/facenet + detector: str = "mtcnn" + Model to use for facial preprocessing. Currently supported options are + mtcnn/retinaface + flipped: bool = True + Flag denoting if flipped features should be extracted gpu: int = -1 GPU id to use for feature extraction and preprocessing models. If -1 is given, CPU is used rather than GPU - Returns - ------- - np.ndarray - Array of features corresponding each image from a dataset. Subject ids - are appended to end of feature vectors. 
Resulting output will be of - shape (number of dataset images)x513 - """ - if extractor == "arcface": - face = ArcFace(gpu) + if method == "arcface": + face = ArcFace(gpu, detector) else: - face = FaceNet(gpu) + face = FaceNet(gpu, detector) dataset_path = f"images/{dataset}" @@ -161,27 +169,53 @@ def extract_dataset( os.listdir(dataset_path), key=lambda subject: subject.lower() ) - image_cnt = 0 + img_cnt = 0 for subject_id, subject in enumerate(subjects): - progress_bar(f"{dataset} {extractor}", (image_cnt + 1) / file_cnt) + progress_bar(f"{dataset} {method}", (img_cnt + 1) / file_cnt) for image in os.listdir(f"{dataset_path}/{subject}"): - image = cv2.imread(f"{dataset_path}/{subject}/{image}") + img = cv2.imread(f"{dataset_path}/{subject}/{image}") + + feature = face.extract(img) + features[img_cnt, :] = np.append(feature, subject_id + 1) + + img_cnt += 1 + + np.savez_compressed( + f"data/{dataset}_{method}_{detector}_feat.npz", features + ) + + if flipped: + flipped_features = np.zeros((file_cnt, 513)) + + img_cnt = 0 + for subject_id, subject in enumerate(subjects): + progress_bar( + f"{dataset} {method} flipped", (img_cnt + 1) / file_cnt + ) - feature = face.extract(image) - features[image_cnt, :] = np.append(feature, subject_id + 1) + for image in os.listdir(f"{dataset_path}/{subject}"): + img = cv2.imread(f"{dataset_path}/{subject}/{image}") + img = cv2.flip(img, 1) - image_cnt += 1 + flipped_feature = face.extract(img) + flipped_features[img_cnt, :] = np.append( + flipped_feature, subject_id + 1 + ) - return features + img_cnt += 1 + + np.savez_compressed( + f"data/{dataset}_{method}_{detector}_flip_feat.npz", features + ) if __name__ == "__main__": """ - facenet = FaceNet() + facenet = FaceNet(-1, "retinaface") - img_1 = cv2.imread("src/face_models/examples/tom1.jpg")) - img_2 = cv2.imread("src/face_models/examples/adrien.jpg")) + img_1 = cv2.imread("src/face_models/examples/tom1.jpg") + img_2 = cv2.imread("src/face_models/examples/adrien.jpg") feat_1 = facenet.extract(img_1) feat_2 = facenet.extract(img_2) @@ -194,7 +228,7 @@ def extract_dataset( cv2.waitKey(0) cv2.destroyAllWindows() - arcface = ArcFace() + arcface = ArcFace(-1, "mtcnn") feat_1 = arcface.extract(img_1) feat_2 = arcface.extract(img_2) @@ -204,13 +238,11 @@ def extract_dataset( cv2.imshow("before", img_2) img = arcface.preprocess(img_2) cv2.imshow( - "after", - cv2.cvtColor(np.rollaxis(img, 0, 3), cv2.COLOR_RGB2BGR) + "after", cv2.cvtColor(np.rollaxis(img, 0, 3), cv2.COLOR_RGB2BGR) ) cv2.waitKey(0) cv2.destroyAllWindows() """ - parser = ArgumentParser() parser.add_argument( "-d", @@ -225,6 +257,21 @@ def extract_dataset( choices=["arcface", "facenet"], help="method to use in feature extraction", ) + parser.add_argument( + "-det", + "--detector", + required=True, + choices=["mtcnn", "retinaface"], + help="method to use in facial preprocessing", + ) + parser.add_argument( + "-f", + "--flipped", + required=False, + action="store_true", + default=False, + help="extract features for flipped versions of images", + ) parser.add_argument( "-gpu", "--gpu", @@ -235,11 +282,10 @@ def extract_dataset( ) args = vars(parser.parse_args()) - features = extract_dataset(args["dataset"], args["method"], args["gpu"]) - - np.savez_compressed( - os.path.join( - "data/{}_{}_feat.npz".format(args["dataset"], args["method"]) - ), - features, + features = extract_dataset( + args["dataset"], + args["method"], + args["detector"], + args["flipped"], + args["gpu"], ) diff --git a/src/face_models/__init__.py b/src/face_models/__init__.py 
index 89fea8f42..339f6f9e7 100644 --- a/src/face_models/__init__.py +++ b/src/face_models/__init__.py @@ -1 +1 @@ -from face_models.face_model import ArcFaceModel, FaceNetModel +from .face_model import ArcFaceModel, FaceNetModel diff --git a/src/face_models/face_model.py b/src/face_models/face_model.py index 0bc69b045..f255d5521 100644 --- a/src/face_models/face_model.py +++ b/src/face_models/face_model.py @@ -1,76 +1,135 @@ from __future__ import absolute_import, division, print_function -import os - import cv2 import mxnet as mx import numpy as np import tensorflow as tf +from insightface.model_zoo.face_detection import FaceDetector from sklearn.preprocessing import normalize -from face_models import facenet -from face_models.face_preprocess import preprocess -from face_models.mtcnn_detector import MtcnnDetector +from face_models.facenet import load_facenet_model +from face_models.mtcnn import MtcnnDetector +from face_models.utils import align, get_center_face -__all__ = ["ArcFaceModel", "FaceNetModel"] +__all__ = ["ArcFaceModel", "FaceNetModel", "MtcnnModel", "RetinaFaceModel"] -class FaceNetModel: - """FaceNet: A Unified Embedding for Face Recognition and Clustering - https://arxiv.org/abs/1503.03832 +class RetinaFaceModel: + """RetinaFace: Single-stage Dense Face Localisation in the Wild + https://arxiv.org/abs/1905.00641 - Uses https://github.com/davidsandberg/facenet implementation + Uses https://github.com/deepinsight/insightface implementation """ def __init__(self, gpu: int): - if gpu == -1: - ctx = mx.cpu() - else: - ctx = mx.gpu(gpu) + self.model_path = "src/face_models/models/retinaface/R50-0000.params" + self.__model = self.__load_model(gpu) - self.model_path = "src/face_models/model-ir-v1/20180402-114759.pb" + def __load_model(self, gpu: int): + """Load pretrained RetinaFace model from: + src/face_models/models/retinaface/R50-0000.params - self.__load_model() + Model originally downloaded from: + https://drive.google.com/file/d/1wm-6K688HQEx_H90UdAIuKv-NAsKBu85/view - self.__detector = MtcnnDetector( - model_dir="src/face_models/model-mtcnn", - ctx=ctx, - accurate_landmark=True, - ) + """ + model = FaceDetector(self.model_path, rac="net3") + model.prepare(gpu) + return model - def __load_model(self): - """Load pretrained FaceNet model from: - src/face_models/model-ir-v1/20180402-114759.pb + def get_input(self, face_img: np.ndarray) -> np.ndarray: + """Preprocess facial image for feature extraction using RetinaFace model. + RetinaFace detects facial bounding boxes and facial landmarks to get + face region-of-interest and perform facial alignment. - Model originally downlaoded from: - https://drive.google.com/file/d/1EXPBSXwTaqrSC0OhUdXNmKSh9qJUQ55-/view + Parameters + ---------- + face_img: np.ndarray + Face image of any width/height with BGR colorspace channels - FaceNet model with Inception-ResNet-v1 backbone, pretrained on - VGGFace2, achieves 0.9965 on LFW. 
+ Returns + ------- + np.ndarray + Aligned facial image of shape 112x112x3 with RGB colorspace """ - self.__sess = tf.compat.v1.Session() + # Get face bounding box and landmark detections + # from MTCNN model + ret = self.__model.detect(face_img) - facenet.load_model(self.model_path) + # If no detections were made, return input image + if ret is None: + face_img = cv2.resize(face_img, (112, 112)) + face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB) + return face_img - self.__images_placeholder = ( - tf.compat.v1.get_default_graph().get_tensor_by_name("input:0") - ) - self.__embeddings = ( - tf.compat.v1.get_default_graph().get_tensor_by_name("embeddings:0") - ) - self.__phase_train_placeholder = ( - tf.compat.v1.get_default_graph().get_tensor_by_name( - "phase_train:0" + # Otherwise, the MTCNN model detected facial bounding boxes + # and landmarks + bbox, points = ret + + # Make sure detections are not empty lists + if bbox.shape[0] == 0: + face_img = cv2.resize(face_img, (112, 112)) + face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB) + return face_img + + # If multiple faces were detected, we will use the centermost face + if bbox.shape[0] > 1: + img_center = np.array( + [face_img.shape[0] / 2, face_img.shape[1] / 2] ) + bbox, points = get_center_face(bbox, points, img_center) + + else: + bbox = bbox[0] + points = points[0] + + # Format bounding box and landmarks arrays + bbox = bbox[0:4] + points = points.reshape((2, 5)).T + + # Perform facial alignment using MTCNN bounding box and facial + # landmarks to get 112x112x3 facial image + aligned_img = align(face_img, bbox=bbox) + # Convert facial image from BGR to RGB colorspace + aligned_img = cv2.cvtColor(aligned_img, cv2.COLOR_BGR2RGB) + + return aligned_img + + +class MtcnnModel: + """Joint Face Detection and Alignment using Multi-task + Cascaded Convolutional Networks + https://arxiv.org/abs/1604.02878 + + Uses https://github.com/davidsandberg/facenet implementation + + """ + + def __init__(self, gpu: int): + if gpu == -1: + ctx = mx.cpu() + else: + ctx = mx.gpu(gpu) + + self.model_path = "src/face_models/models/mtcnn" + self.__model = self.__load_model(ctx) + + def __load_model(self, ctx: int): + """Load pretrained MTCNN model from: + src/face_models/models/det[1-4]* + + """ + model = MtcnnDetector( + model_dir=self.model_path, ctx=ctx, accurate_landmark=True, ) + return model def get_input(self, face_img: np.ndarray) -> np.ndarray: - """Preprocess facial image for FaceNet feature extraction - using a MTCNN preprocessing model. MTCNN detects facial bounding - boxes and facial landmarks to get face region-of-interest and perform - facial alignment. + """Preprocess facial image for feature extraction using MTCNN model. + MTCNN detects facial bounding boxes and facial landmarks to get face + region-of-interest and perform facial alignment. 
Parameters ---------- @@ -80,15 +139,17 @@ def get_input(self, face_img: np.ndarray) -> np.ndarray: Returns ------- np.ndarray - Aligned facial image of shape 160x160x3 with RGB colorspace + Aligned facial image of shape 112x112x3 with RGB colorspace """ # Get face bounding box and landmark detections # from MTCNN model - ret = self.__detector.detect_face(face_img, det_type=0) + ret = self.__model.detect_face(face_img) # If no detections were made, return input image if ret is None: + face_img = cv2.resize(face_img, (112, 112)) + face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB) return face_img # Otherwise, the MTCNN model detected facial bounding boxes @@ -97,6 +158,8 @@ def get_input(self, face_img: np.ndarray) -> np.ndarray: # Make sure detections are not empty lists if bbox.shape[0] == 0: + face_img = cv2.resize(face_img, (112, 112)) + face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB) return face_img # If multiple faces were detected, we will use the centermost face @@ -104,7 +167,11 @@ def get_input(self, face_img: np.ndarray) -> np.ndarray: img_center = np.array( [face_img.shape[0] / 2, face_img.shape[1] / 2] ) - bbox, points = _get_center_face(bbox, points, img_center) + bbox, points = get_center_face(bbox, points, img_center) + + else: + bbox = bbox[0] + points = points[0] # Format bounding box and landmarks arrays bbox = bbox[0:4] @@ -112,9 +179,74 @@ def get_input(self, face_img: np.ndarray) -> np.ndarray: # Perform facial alignment using MTCNN bounding box and facial # landmarks to get 112x112x3 facial image - aligned_img = preprocess(face_img, bbox, points, image_size="112,112") + aligned_img = align(face_img, points=points) # Convert facial image from BGR to RGB colorspace aligned_img = cv2.cvtColor(aligned_img, cv2.COLOR_BGR2RGB) + + return aligned_img + + +class FaceNetModel: + """FaceNet: A Unified Embedding for Face Recognition and Clustering + https://arxiv.org/abs/1503.03832 + + Uses https://github.com/davidsandberg/facenet implementation + + """ + + def __init__(self, gpu: int, detector: str): + self.model_path = "src/face_models/models/facenet/20180402-114759.pb" + self.__load_model() + + if detector == "mtcnn": + self.__detector = MtcnnModel(gpu) + else: + self.__detector = RetinaFaceModel(gpu) + + def __load_model(self): + """Load pretrained FaceNet model from: + src/face_models/models/facenet/20180402-114759.pb + + Model originally downloaded from: + https://drive.google.com/file/d/1EXPBSXwTaqrSC0OhUdXNmKSh9qJUQ55-/view + + FaceNet model with Inception-ResNet-v1 backbone, pretrained on + VGGFace2, achieves 0.9965 on LFW. + + """ + self.__sess = tf.compat.v1.Session() + + load_facenet_model(self.model_path) + + self.__images_placeholder = tf.compat.v1.get_default_graph().get_tensor_by_name( + "input:0" + ) + self.__embeddings = tf.compat.v1.get_default_graph().get_tensor_by_name( + "embeddings:0" + ) + self.__phase_train_placeholder = tf.compat.v1.get_default_graph().get_tensor_by_name( + "phase_train:0" + ) + + def get_input(self, face_img: np.ndarray) -> np.ndarray: + """Preprocess facial image for FaceNet feature extraction + using a MTCNN preprocessing model. MTCNN detects facial bounding + boxes and facial landmarks to get face region-of-interest and perform + facial alignment. 
+ + Parameters + ---------- + face_img: np.ndarray + Face image of any width/height with BGR colorspace channels + + Returns + ------- + np.ndarray + Aligned facial image of shape 160x160x3 with RGB colorspace + + """ + aligned_img = self.__detector.get_input(face_img) + # Resize facial image to 160x160x3 aligned_img = cv2.resize(aligned_img, (160, 160)) @@ -134,7 +266,7 @@ def get_feature(self, aligned_img: np.ndarray) -> np.ndarray: Extracted FaceNet feature vector of shape 512x1 """ - aligned_img = _prewhiten(aligned_img) + aligned_img = self.__prewhiten(aligned_img) if len(aligned_img.shape) == 3: aligned_img = np.expand_dims(aligned_img, axis=0) @@ -148,6 +280,18 @@ def get_feature(self, aligned_img: np.ndarray) -> np.ndarray: return embedding[0] + @staticmethod + def __prewhiten(x: np.ndarray): + """Normalization step for image prior to feature extraction. + Used in FaceNet pipeline. + + """ + mean = np.mean(x) + std = np.std(x) + std_adj = np.maximum(std, 1.0 / np.sqrt(x.size)) + y = np.multiply(np.subtract(x, mean), 1.0 / std_adj) + return y + class ArcFaceModel: """ArcFace: Additive Angular Margin Loss for Deep Face Recognition @@ -157,27 +301,26 @@ class ArcFaceModel: """ - def __init__(self, gpu: int): + def __init__(self, gpu: int, detector: str): if gpu == -1: ctx = mx.cpu() else: ctx = mx.gpu(gpu) - self.model_path = "src/face_models/model-r100-ii/model" - + self.model_path = "src/face_models/models/arcface/model" self.__model = self.__load_model(ctx) - self.__detector = MtcnnDetector( - model_dir=os.path.join(os.path.dirname(__file__), "model-mtcnn"), - ctx=ctx, - accurate_landmark=True, - ) + + if detector == "mtcnn": + self.__detector = MtcnnModel(gpu) + else: + self.__detector = RetinaFaceModel(gpu) def __load_model(self, ctx: int): """Load pretrained ArcFace model from: - src/face_models/model-r100-ii/model-symbol.json - src/face_models/model-r100-ii/model-0000.params + src/face_models/models/arcface/model-symbol.json + src/face_models/models/arcface/model-0000.params - Model originally downlaoded from: + Model originally downloaded from: https://drive.google.com/file/d/1Hc5zUfBATaXUgcU2haUNa7dcaZSw95h2/view ArcFace model with ResNet100 backbonem pretrained on MS1MV2, @@ -213,38 +356,8 @@ def get_input(self, face_img: np.ndarray) -> np.ndarray: Aligned facial image of shape 3x112x112 with RGB colorspace """ - # Get face bounding box and landmark detections - # from MTCNN model - ret = self.__detector.detect_face(face_img, det_type=0) - - # If no detections were made, return input image - if ret is None: - return face_img - - # Otherwise, the MTCNN model detected facial bounding boxes - # and landmarks - bbox, points = ret - - # Make sure detections are not empty lists - if bbox.shape[0] == 0: - return face_img - - # If multiple faces were detected, we will use the centermost face - if bbox.shape[0] > 1: - img_center = np.array( - [face_img.shape[0] / 2, face_img.shape[1] / 2] - ) - bbox, points = _get_center_face(bbox, points, img_center) - - # Format bounding box and landmarks arrays - bbox = bbox[0:4] - points = points.reshape((2, 5)).T + aligned_img = self.__detector.get_input(face_img) - # Perform facial alignment using MTCNN bounding box and facial - # landmarks to get 112x112x3 facial image - aligned_img = preprocess(face_img, bbox, points, image_size="112,112") - # Convert facial image from BGR to RGB colorspace - aligned_img = cv2.cvtColor(aligned_img, cv2.COLOR_BGR2RGB) # Format image array to have RGB color channels along first dimension aligned_img = 
np.transpose(aligned_img, (2, 0, 1)) @@ -276,66 +389,3 @@ def get_feature(self, aligned_img: np.ndarray) -> np.ndarray: embedding = normalize(embedding).flatten() return embedding - - -def _prewhiten(x: np.ndarray): - """Normalization step for image prior to feature extraction. - Used in FaceNet pipeline. - - """ - mean = np.mean(x) - std = np.std(x) - std_adj = np.maximum(std, 1.0 / np.sqrt(x.size)) - y = np.multiply(np.subtract(x, mean), 1.0 / std_adj) - return y - - -def _get_center_face( - bbox: np.ndarray, points: np.ndarray, img_center: np.ndarray -): - """Using face bounding boxes, facial landmark points and - image center point, find the centermost detected face - and return its corresponding bounding box and facial - landmarks. - - """ - dists = [] - for i in range(bbox.shape[0]): - face_rect = np.array( - [[bbox[i, 0], bbox[i, 1]], [bbox[i, 2], bbox[i, 3]]] - ) - dists.append(_rect_point_dist(face_rect, img_center)) - - idx = dists.index(min(dists)) - - bbox = bbox[idx, :] - points = points[idx, :] - - return bbox, points - - -def _rect_point_dist(bbox: np.ndarray, point: np.ndarray) -> float: - """Get distance between a bounding box and a point. - - Parameters - ---------- - bbox: np.ndarray - Bounding box array containing top-left and bottom-right - coordinates [(x1, y1), (x2, x2)] - point: np.ndarray - Point coordinates [(x1, y1)] - - Returns - ------- - float: - Distance between bbox and point - - """ - bbox_center = (bbox[0] + bbox[1]) / 2 - - bbox_height = bbox[1, 0] - bbox[0, 0] - bbox_width = bbox[1, 1] - bbox[0, 1] - - dx = max(np.abs(point[1] - bbox_center[1]) - bbox_width / 2, 0) - dy = max(np.abs(point[0] - bbox_center[0]) - bbox_height / 2, 0) - return dx * dx + dy * dy diff --git a/src/face_models/face_preprocess.py b/src/face_models/face_preprocess.py deleted file mode 100644 index d365dccf4..000000000 --- a/src/face_models/face_preprocess.py +++ /dev/null @@ -1,95 +0,0 @@ -import cv2 -import numpy as np -from skimage import transform as trans - - -def parse_lst_line(line): - vec = line.strip().split("\t") - assert len(vec) >= 3 - aligned = int(vec[0]) - image_path = vec[1] - label = int(vec[2]) - bbox = None - landmark = None - if len(vec) > 3: - bbox = np.zeros((4,), dtype=np.int32) - for i in xrange(3, 7): - bbox[i - 3] = int(vec[i]) - landmark = None - if len(vec) > 7: - _l = [] - for i in xrange(7, 17): - _l.append(float(vec[i])) - landmark = np.array(_l).reshape((2, 5)).T - - return image_path, label, bbox, landmark, aligned - - -def read_image(img_path, **kwargs): - mode = kwargs.get("mode", "rgb") - layout = kwargs.get("layout", "HWC") - if mode == "gray": - img = cv2.imread(img_path, cv2.CV_LOAD_IMAGE_GRAYSCALE) - else: - img = cv2.imread(img_path, cv2.CV_LOAD_IMAGE_COLOR) - if mode == "rgb": - img = img[..., ::-1] - if layout == "CHW": - img = np.transpose(img, (2, 0, 1)) - return img - - -def preprocess(img, bbox=None, landmark=None, **kwargs): - if isinstance(img, str): - img = read_image(img, **kwargs) - M = None - image_size = [] - str_image_size = kwargs.get("image_size", "") - if len(str_image_size) > 0: - image_size = [int(x) for x in str_image_size.split(",")] - if len(image_size) == 1: - image_size = [image_size[0], image_size[0]] - assert len(image_size) == 2 - assert image_size[0] == 112 - assert image_size[0] == 112 or image_size[1] == 96 - if landmark is not None: - assert len(image_size) == 2 - src = np.array([[30.2946, 51.6963], - [65.5318, 51.5014], - [48.0252, 71.7366], - [33.5493, 92.3655], - [62.7299, 92.2041]], - dtype=np.float32) - if 
image_size[1] == 112: - src[:, 0] += 8.0 - dst = landmark.astype(np.float32) - - tform = trans.SimilarityTransform() - tform.estimate(dst, src) - M = tform.params[0:2, :] - - if M is None: - if bbox is None: - det = np.zeros(4, dtype=np.int32) - det[0] = int(img.shape[1] * 0.0625) - det[1] = int(img.shape[0] * 0.0625) - det[2] = img.shape[1] - det[0] - det[3] = img.shape[0] - det[1] - else: - det = bbox - margin = kwargs.get("margin", 44) - bb = np.zeros(4, dtype=np.int32) - bb[0] = np.maximum(det[0] - margin / 2, 0) - bb[1] = np.maximum(det[1] - margin / 2, 0) - bb[2] = np.minimum(det[2] + margin / 2, img.shape[1]) - bb[3] = np.minimum(det[3] + margin / 2, img.shape[0]) - ret = img[bb[1]:bb[3], bb[0]:bb[2], :] - if len(image_size) > 0: - ret = cv2.resize(ret, (image_size[1], image_size[0])) - return ret - - else: - assert len(image_size) == 2 - warped = cv2.warpAffine( - img, M, (image_size[1], image_size[0]), borderValue=0.0) - return warped diff --git a/src/face_models/face_setup.sh b/src/face_models/face_setup.sh index 2d6b4276d..296bb6eaf 100644 --- a/src/face_models/face_setup.sh +++ b/src/face_models/face_setup.sh @@ -1,19 +1,31 @@ #!/bin/bash +mkdir models + echo 'Downloading and extracting arcface model!' wget https://cs.iupui.edu/~xzou/Bio-Capsule-Research-2022/model-r100-ii.tar.gz tar -xvzf model-r100-ii.tar.gz rm model-r100-ii.tar.gz +mv model-r100-ii models/arcface echo 'Downloading and extracting facenet model!' wget https://cs.iupui.edu/~xzou/Bio-Capsule-Research-2022/model-ir-v1.tar.gz tar -xvzf model-ir-v1.tar.gz rm model-ir-v1.tar.gz +mv model-ir-v1 models/facenet echo 'Downloading and extracting mtcnn model!' wget https://cs.iupui.edu/~xzou/Bio-Capsule-Research-2022/model-mtcnn.tar.gz tar -xvzf model-mtcnn.tar.gz rm model-mtcnn.tar.gz +mv model-mtcnn models/mtcnn + +echo 'Downloading and extracting retinaface model!' + +wget https://cs.iupui.edu/~xzou/Bio-Capsule-Research-2022/retinaface.tar.gz +tar -xvzf retinaface.tar.gz +rm retinaface.tar.gz +mv retinaface models/retinaface diff --git a/src/face_models/facenet/__init__.py b/src/face_models/facenet/__init__.py new file mode 100644 index 000000000..d25138067 --- /dev/null +++ b/src/face_models/facenet/__init__.py @@ -0,0 +1 @@ +from .facenet import load_model as load_facenet_model diff --git a/src/face_models/facenet.py b/src/face_models/facenet/facenet.py similarity index 68% rename from src/face_models/facenet.py rename to src/face_models/facenet/facenet.py index 0ee13ba3a..08944b7c4 100644 --- a/src/face_models/facenet.py +++ b/src/face_models/facenet/facenet.py @@ -23,23 +23,21 @@ # SOFTWARE. 
# pylint: disable=missing-docstring -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function +from __future__ import absolute_import, division, print_function +import math import os -from subprocess import Popen, PIPE -import tensorflow as tf -import numpy as np -from scipy import misc -from sklearn.model_selection import KFold -from scipy import interpolate -from tensorflow.python.training import training import random import re -from tensorflow.python.platform import gfile -import math +from subprocess import PIPE, Popen + +import numpy as np +import tensorflow as tf +from scipy import interpolate, misc from six import iteritems +from sklearn.model_selection import KFold +from tensorflow.python.platform import gfile +from tensorflow.python.training import training def triplet_loss(anchor, positive, negative, alpha): @@ -53,7 +51,7 @@ def triplet_loss(anchor, positive, negative, alpha): Returns: the triplet loss according to the FaceNet paper as a float tensor. """ - with tf.variable_scope('triplet_loss'): + with tf.variable_scope("triplet_loss"): pos_dist = tf.reduce_sum(tf.square(tf.subtract(anchor, positive)), 1) neg_dist = tf.reduce_sum(tf.square(tf.subtract(anchor, negative)), 1) @@ -65,11 +63,16 @@ def triplet_loss(anchor, positive, negative, alpha): def center_loss(features, label, alfa, nrof_classes): """Center loss based on the paper "A Discriminative Feature Learning Approach for Deep Face Recognition" - (http://ydwen.github.io/papers/WenECCV16.pdf) + (http://ydwen.github.io/papers/WenECCV16.pdf) """ nrof_features = features.get_shape()[1] - centers = tf.get_variable('centers', [nrof_classes, nrof_features], - dtype=tf.float32, initializer=tf.constant_initializer(0), trainable=False) + centers = tf.get_variable( + "centers", + [nrof_classes, nrof_features], + dtype=tf.float32, + initializer=tf.constant_initializer(0), + trainable=False, + ) label = tf.reshape(label, [-1]) centers_batch = tf.gather(centers, label) diff = (1 - alfa) * (centers_batch - features) @@ -97,7 +100,7 @@ def shuffle_examples(image_paths, labels): def random_rotate_image(image): angle = np.random.uniform(low=-10.0, high=10.0) - return misc.imrotate(image, angle, 'bicubic') + return misc.imrotate(image, angle, "bicubic") # 1: Random rotate 2: Random crop 4: Random flip 8: Fixed image standardization 16: Flip @@ -108,7 +111,9 @@ def random_rotate_image(image): FLIP = 16 -def create_input_pipeline(input_queue, image_size, nrof_preprocess_threads, batch_size_placeholder): +def create_input_pipeline( + input_queue, image_size, nrof_preprocess_threads, batch_size_placeholder +): images_and_labels_list = [] for _ in range(nrof_preprocess_threads): filenames, label, control = input_queue.dequeue() @@ -116,33 +121,46 @@ def create_input_pipeline(input_queue, image_size, nrof_preprocess_threads, batc for filename in tf.unstack(filenames): file_contents = tf.read_file(filename) image = tf.image.decode_image(file_contents, 3) - image = tf.cond(get_control_flag(control[0], RANDOM_ROTATE), - lambda: tf.py_func(random_rotate_image, [ - image], tf.uint8), - lambda: tf.identity(image)) - image = tf.cond(get_control_flag(control[0], RANDOM_CROP), - lambda: tf.random_crop(image, image_size + (3,)), - lambda: tf.image.resize_image_with_crop_or_pad(image, image_size[0], image_size[1])) - image = tf.cond(get_control_flag(control[0], RANDOM_FLIP), - lambda: tf.image.random_flip_left_right(image), - lambda: tf.identity(image)) - image = tf.cond(get_control_flag(control[0], 
FIXED_STANDARDIZATION), - lambda: (tf.cast(image, tf.float32) - - 127.5) / 128.0, - lambda: tf.image.per_image_standardization(image)) - image = tf.cond(get_control_flag(control[0], FLIP), - lambda: tf.image.flip_left_right(image), - lambda: tf.identity(image)) + image = tf.cond( + get_control_flag(control[0], RANDOM_ROTATE), + lambda: tf.py_func(random_rotate_image, [image], tf.uint8), + lambda: tf.identity(image), + ) + image = tf.cond( + get_control_flag(control[0], RANDOM_CROP), + lambda: tf.random_crop(image, image_size + (3,)), + lambda: tf.image.resize_image_with_crop_or_pad( + image, image_size[0], image_size[1] + ), + ) + image = tf.cond( + get_control_flag(control[0], RANDOM_FLIP), + lambda: tf.image.random_flip_left_right(image), + lambda: tf.identity(image), + ) + image = tf.cond( + get_control_flag(control[0], FIXED_STANDARDIZATION), + lambda: (tf.cast(image, tf.float32) - 127.5) / 128.0, + lambda: tf.image.per_image_standardization(image), + ) + image = tf.cond( + get_control_flag(control[0], FLIP), + lambda: tf.image.flip_left_right(image), + lambda: tf.identity(image), + ) # pylint: disable=no-member image.set_shape(image_size + (3,)) images.append(image) images_and_labels_list.append([images, label]) image_batch, label_batch = tf.train.batch_join( - images_and_labels_list, batch_size=batch_size_placeholder, - shapes=[image_size + (3,), ()], enqueue_many=True, + images_and_labels_list, + batch_size=batch_size_placeholder, + shapes=[image_size + (3,), ()], + enqueue_many=True, capacity=4 * nrof_preprocess_threads * 100, - allow_smaller_final_batch=True) + allow_smaller_final_batch=True, + ) return image_batch, label_batch @@ -163,8 +181,8 @@ def _add_loss_summaries(total_loss): loss_averages_op: op for generating moving averages of losses. """ # Compute the moving average of all individual losses and the total loss. - loss_averages = tf.train.ExponentialMovingAverage(0.9, name='avg') - losses = tf.get_collection('losses') + loss_averages = tf.train.ExponentialMovingAverage(0.9, name="avg") + losses = tf.get_collection("losses") loss_averages_op = loss_averages.apply(losses + [total_loss]) # Attach a scalar summmary to all individual losses and the total loss; do the @@ -172,34 +190,46 @@ def _add_loss_summaries(total_loss): for l in losses + [total_loss]: # Name each loss as '(raw)' and name the moving average version of the loss # as the original loss name. - tf.summary.scalar(l.op.name + ' (raw)', l) + tf.summary.scalar(l.op.name + " (raw)", l) tf.summary.scalar(l.op.name, loss_averages.average(l)) return loss_averages_op -def train(total_loss, global_step, optimizer, learning_rate, moving_average_decay, update_gradient_vars, log_histograms=True): +def train( + total_loss, + global_step, + optimizer, + learning_rate, + moving_average_decay, + update_gradient_vars, + log_histograms=True, +): # Generate moving averages of all losses and associated summaries. loss_averages_op = _add_loss_summaries(total_loss) # Compute gradients. 
with tf.control_dependencies([loss_averages_op]): - if optimizer == 'ADAGRAD': + if optimizer == "ADAGRAD": opt = tf.train.AdagradOptimizer(learning_rate) - elif optimizer == 'ADADELTA': + elif optimizer == "ADADELTA": opt = tf.train.AdadeltaOptimizer( - learning_rate, rho=0.9, epsilon=1e-6) - elif optimizer == 'ADAM': + learning_rate, rho=0.9, epsilon=1e-6 + ) + elif optimizer == "ADAM": opt = tf.train.AdamOptimizer( - learning_rate, beta1=0.9, beta2=0.999, epsilon=0.1) - elif optimizer == 'RMSPROP': + learning_rate, beta1=0.9, beta2=0.999, epsilon=0.1 + ) + elif optimizer == "RMSPROP": opt = tf.train.RMSPropOptimizer( - learning_rate, decay=0.9, momentum=0.9, epsilon=1.0) - elif optimizer == 'MOM': + learning_rate, decay=0.9, momentum=0.9, epsilon=1.0 + ) + elif optimizer == "MOM": opt = tf.train.MomentumOptimizer( - learning_rate, 0.9, use_nesterov=True) + learning_rate, 0.9, use_nesterov=True + ) else: - raise ValueError('Invalid optimization algorithm') + raise ValueError("Invalid optimization algorithm") grads = opt.compute_gradients(total_loss, update_gradient_vars) @@ -215,15 +245,16 @@ def train(total_loss, global_step, optimizer, learning_rate, moving_average_deca if log_histograms: for grad, var in grads: if grad is not None: - tf.summary.histogram(var.op.name + '/gradients', grad) + tf.summary.histogram(var.op.name + "/gradients", grad) # Track the moving averages of all trainable variables. variable_averages = tf.train.ExponentialMovingAverage( - moving_average_decay, global_step) + moving_average_decay, global_step + ) variables_averages_op = variable_averages.apply(tf.trainable_variables()) with tf.control_dependencies([apply_gradient_op, variables_averages_op]): - train_op = tf.no_op(name='train') + train_op = tf.no_op(name="train") return train_op @@ -242,12 +273,17 @@ def crop(image, random_crop, image_size): sz2 = int(image_size // 2) if random_crop: diff = sz1 - sz2 - (h, v) = (np.random.randint(-diff, diff + 1), - np.random.randint(-diff, diff + 1)) + (h, v) = ( + np.random.randint(-diff, diff + 1), + np.random.randint(-diff, diff + 1), + ) else: (h, v) = (0, 0) - image = image[(sz1 - sz2 + v):(sz1 + sz2 + v), - (sz1 - sz2 + h):(sz1 + sz2 + h), :] + image = image[ + (sz1 - sz2 + v) : (sz1 + sz2 + v), + (sz1 - sz2 + h) : (sz1 + sz2 + h), + :, + ] return image @@ -264,7 +300,9 @@ def to_rgb(img): return ret -def load_data(image_paths, do_random_crop, do_random_flip, image_size, do_prewhiten=True): +def load_data( + image_paths, do_random_crop, do_random_flip, image_size, do_prewhiten=True +): nrof_samples = len(image_paths) images = np.zeros((nrof_samples, image_size, image_size, 3)) for i in range(nrof_samples): @@ -283,10 +321,10 @@ def get_label_batch(label_data, batch_size, batch_index): nrof_examples = np.size(label_data, 0) j = batch_index * batch_size % nrof_examples if j + batch_size <= nrof_examples: - batch = label_data[j:j + batch_size] + batch = label_data[j : j + batch_size] else: x1 = label_data[j:nrof_examples] - x2 = label_data[0:nrof_examples - j] + x2 = label_data[0 : nrof_examples - j] batch = np.vstack([x1, x2]) batch_int = batch.astype(np.int64) return batch_int @@ -296,10 +334,10 @@ def get_batch(image_data, batch_size, batch_index): nrof_examples = np.size(image_data, 0) j = batch_index * batch_size % nrof_examples if j + batch_size <= nrof_examples: - batch = image_data[j:j + batch_size, :, :, :] + batch = image_data[j : j + batch_size, :, :, :] else: x1 = image_data[j:nrof_examples, :, :, :] - x2 = image_data[0:nrof_examples - j, :, :, :] + x2 = 
image_data[0 : nrof_examples - j, :, :, :] batch = np.vstack([x1, x2]) batch_float = batch.astype(np.float32) return batch_float @@ -315,13 +353,13 @@ def get_triplet_batch(triplets, batch_index, batch_size): def get_learning_rate_from_file(filename, epoch): - with open(filename, 'r') as f: + with open(filename, "r") as f: for line in f.readlines(): - line = line.split('#', 1)[0] + line = line.split("#", 1)[0] if line: - par = line.strip().split(':') + par = line.strip().split(":") e = int(par[0]) - if par[1] == '-': + if par[1] == "-": lr = -1 else: lr = float(par[1]) @@ -331,7 +369,7 @@ def get_learning_rate_from_file(filename, epoch): return learning_rate -class ImageClass(): +class ImageClass: "Stores the paths to images for a given class" def __init__(self, name, image_paths): @@ -339,7 +377,7 @@ def __init__(self, name, image_paths): self.image_paths = image_paths def __str__(self): - return self.name + ', ' + str(len(self.image_paths)) + ' images' + return self.name + ", " + str(len(self.image_paths)) + " images" def __len__(self): return len(self.image_paths) @@ -348,8 +386,11 @@ def __len__(self): def get_dataset(path, has_class_directories=True): dataset = [] path_exp = os.path.expanduser(path) - classes = [path for path in os.listdir( - path_exp) if os.path.isdir(os.path.join(path_exp, path))] + classes = [ + path + for path in os.listdir(path_exp) + if os.path.isdir(os.path.join(path_exp, path)) + ] classes.sort() nrof_classes = len(classes) for i in range(nrof_classes): @@ -370,14 +411,14 @@ def get_image_paths(facedir): def split_dataset(dataset, split_ratio, min_nrof_images_per_class, mode): - if mode == 'SPLIT_CLASSES': + if mode == "SPLIT_CLASSES": nrof_classes = len(dataset) class_indices = np.arange(nrof_classes) np.random.shuffle(class_indices) split = int(round(nrof_classes * (1 - split_ratio))) train_set = [dataset[i] for i in class_indices[0:split]] test_set = [dataset[i] for i in class_indices[split:-1]] - elif mode == 'SPLIT_IMAGES': + elif mode == "SPLIT_IMAGES": train_set = [] test_set = [] for cls in dataset: @@ -387,7 +428,10 @@ def split_dataset(dataset, split_ratio, min_nrof_images_per_class, mode): split = int(math.floor(nrof_images_in_class * (1 - split_ratio))) if split == nrof_images_in_class: split = nrof_images_in_class - 1 - if split >= min_nrof_images_per_class and nrof_images_in_class - split >= 1: + if ( + split >= min_nrof_images_per_class + and nrof_images_in_class - split >= 1 + ): train_set.append(ImageClass(cls.name, paths[:split])) test_set.append(ImageClass(cls.name, paths[split:])) else: @@ -399,44 +443,49 @@ def load_model(model, input_map=None): # Check if the model is a model directory (containing a metagraph and a checkpoint file) # or if it is a protobuf file with a frozen graph model_exp = os.path.expanduser(model) - if (os.path.isfile(model_exp)): - print('Model filename: %s' % model_exp) - with tf.io.gfile.GFile(model_exp, 'rb') as f: + if os.path.isfile(model_exp): + print("Model filename: %s" % model_exp) + with tf.io.gfile.GFile(model_exp, "rb") as f: graph_def = tf.compat.v1.GraphDef() graph_def.ParseFromString(f.read()) - tf.import_graph_def(graph_def, input_map=input_map, name='') + tf.import_graph_def(graph_def, input_map=input_map, name="") else: - print('Model directory: %s' % model_exp) + print("Model directory: %s" % model_exp) meta_file, ckpt_file = get_model_filenames(model_exp) - print('Metagraph file: %s' % meta_file) - print('Checkpoint file: %s' % ckpt_file) + print("Metagraph file: %s" % meta_file) + 
print("Checkpoint file: %s" % ckpt_file) - saver = tf.train.import_meta_graph(os.path.join( - model_exp, meta_file), input_map=input_map) - saver.restore(tf.get_default_session(), - os.path.join(model_exp, ckpt_file)) + saver = tf.train.import_meta_graph( + os.path.join(model_exp, meta_file), input_map=input_map + ) + saver.restore( + tf.get_default_session(), os.path.join(model_exp, ckpt_file) + ) def get_model_filenames(model_dir): files = os.listdir(model_dir) - meta_files = [s for s in files if s.endswith('.meta')] + meta_files = [s for s in files if s.endswith(".meta")] if len(meta_files) == 0: raise ValueError( - 'No meta file found in the model directory (%s)' % model_dir) + "No meta file found in the model directory (%s)" % model_dir + ) elif len(meta_files) > 1: raise ValueError( - 'There should not be more than one meta file in the model directory (%s)' % model_dir) + "There should not be more than one meta file in the model directory (%s)" + % model_dir + ) meta_file = meta_files[0] ckpt = tf.train.get_checkpoint_state(model_dir) if ckpt and ckpt.model_checkpoint_path: ckpt_file = os.path.basename(ckpt.model_checkpoint_path) return meta_file, ckpt_file - meta_files = [s for s in files if '.ckpt' in s] + meta_files = [s for s in files if ".ckpt" in s] max_step = -1 for f in files: - step_str = re.match(r'(^model-[\w\- ]+.ckpt-(\d+))', f) + step_str = re.match(r"(^model-[\w\- ]+.ckpt-(\d+))", f) if step_str is not None and len(step_str.groups()) >= 2: step = int(step_str.groups()[1]) if step > max_step: @@ -453,19 +502,28 @@ def distance(embeddings1, embeddings2, distance_metric=0): elif distance_metric == 1: # Distance based on cosine similarity dot = np.sum(np.multiply(embeddings1, embeddings2), axis=1) - norm = np.linalg.norm(embeddings1, axis=1) * \ - np.linalg.norm(embeddings2, axis=1) + norm = np.linalg.norm(embeddings1, axis=1) * np.linalg.norm( + embeddings2, axis=1 + ) similarity = dot / norm dist = np.arccos(similarity) / math.pi else: - raise 'Undefined distance metric %d' % distance_metric + raise "Undefined distance metric %d" % distance_metric return dist -def calculate_roc(thresholds, embeddings1, embeddings2, actual_issame, nrof_folds=10, distance_metric=0, subtract_mean=False): - assert(embeddings1.shape[0] == embeddings2.shape[0]) - assert(embeddings1.shape[1] == embeddings2.shape[1]) +def calculate_roc( + thresholds, + embeddings1, + embeddings2, + actual_issame, + nrof_folds=10, + distance_metric=0, + subtract_mean=False, +): + assert embeddings1.shape[0] == embeddings2.shape[0] + assert embeddings1.shape[1] == embeddings2.shape[1] nrof_pairs = min(len(actual_issame), embeddings1.shape[0]) nrof_thresholds = len(thresholds) k_fold = KFold(n_splits=nrof_folds, shuffle=False) @@ -478,24 +536,38 @@ def calculate_roc(thresholds, embeddings1, embeddings2, actual_issame, nrof_fold for fold_idx, (train_set, test_set) in enumerate(k_fold.split(indices)): if subtract_mean: - mean = np.mean(np.concatenate( - [embeddings1[train_set], embeddings2[train_set]]), axis=0) + mean = np.mean( + np.concatenate( + [embeddings1[train_set], embeddings2[train_set]] + ), + axis=0, + ) else: mean = 0.0 - dist = distance(embeddings1 - mean, embeddings2 - - mean, distance_metric) + dist = distance( + embeddings1 - mean, embeddings2 - mean, distance_metric + ) # Find the best threshold for the fold acc_train = np.zeros((nrof_thresholds)) for threshold_idx, threshold in enumerate(thresholds): _, _, acc_train[threshold_idx] = calculate_accuracy( - threshold, dist[train_set], 
actual_issame[train_set]) + threshold, dist[train_set], actual_issame[train_set] + ) best_threshold_index = np.argmax(acc_train) for threshold_idx, threshold in enumerate(thresholds): - tprs[fold_idx, threshold_idx], fprs[fold_idx, threshold_idx], _ = calculate_accuracy( - threshold, dist[test_set], actual_issame[test_set]) + ( + tprs[fold_idx, threshold_idx], + fprs[fold_idx, threshold_idx], + _, + ) = calculate_accuracy( + threshold, dist[test_set], actual_issame[test_set] + ) _, _, accuracy[fold_idx] = calculate_accuracy( - thresholds[best_threshold_index], dist[test_set], actual_issame[test_set]) + thresholds[best_threshold_index], + dist[test_set], + actual_issame[test_set], + ) tpr = np.mean(tprs, 0) fpr = np.mean(fprs, 0) @@ -506,8 +578,11 @@ def calculate_accuracy(threshold, dist, actual_issame): predict_issame = np.less(dist, threshold) tp = np.sum(np.logical_and(predict_issame, actual_issame)) fp = np.sum(np.logical_and(predict_issame, np.logical_not(actual_issame))) - tn = np.sum(np.logical_and(np.logical_not( - predict_issame), np.logical_not(actual_issame))) + tn = np.sum( + np.logical_and( + np.logical_not(predict_issame), np.logical_not(actual_issame) + ) + ) fn = np.sum(np.logical_and(np.logical_not(predict_issame), actual_issame)) tpr = 0 if (tp + fn == 0) else float(tp) / float(tp + fn) @@ -516,9 +591,18 @@ def calculate_accuracy(threshold, dist, actual_issame): return tpr, fpr, acc -def calculate_val(thresholds, embeddings1, embeddings2, actual_issame, far_target, nrof_folds=10, distance_metric=0, subtract_mean=False): - assert(embeddings1.shape[0] == embeddings2.shape[0]) - assert(embeddings1.shape[1] == embeddings2.shape[1]) +def calculate_val( + thresholds, + embeddings1, + embeddings2, + actual_issame, + far_target, + nrof_folds=10, + distance_metric=0, + subtract_mean=False, +): + assert embeddings1.shape[0] == embeddings2.shape[0] + assert embeddings1.shape[1] == embeddings2.shape[1] nrof_pairs = min(len(actual_issame), embeddings1.shape[0]) nrof_thresholds = len(thresholds) k_fold = KFold(n_splits=nrof_folds, shuffle=False) @@ -530,26 +614,33 @@ def calculate_val(thresholds, embeddings1, embeddings2, actual_issame, far_targe for fold_idx, (train_set, test_set) in enumerate(k_fold.split(indices)): if subtract_mean: - mean = np.mean(np.concatenate( - [embeddings1[train_set], embeddings2[train_set]]), axis=0) + mean = np.mean( + np.concatenate( + [embeddings1[train_set], embeddings2[train_set]] + ), + axis=0, + ) else: mean = 0.0 - dist = distance(embeddings1 - mean, embeddings2 - - mean, distance_metric) + dist = distance( + embeddings1 - mean, embeddings2 - mean, distance_metric + ) # Find the threshold that gives FAR = far_target far_train = np.zeros(nrof_thresholds) for threshold_idx, threshold in enumerate(thresholds): _, far_train[threshold_idx] = calculate_val_far( - threshold, dist[train_set], actual_issame[train_set]) + threshold, dist[train_set], actual_issame[train_set] + ) if np.max(far_train) >= far_target: - f = interpolate.interp1d(far_train, thresholds, kind='slinear') + f = interpolate.interp1d(far_train, thresholds, kind="slinear") threshold = f(far_target) else: threshold = 0.0 val[fold_idx], far[fold_idx] = calculate_val_far( - threshold, dist[test_set], actual_issame[test_set]) + threshold, dist[test_set], actual_issame[test_set] + ) val_mean = np.mean(val) far_mean = np.mean(far) @@ -560,8 +651,9 @@ def calculate_val(thresholds, embeddings1, embeddings2, actual_issame, far_targe def calculate_val_far(threshold, dist, actual_issame): 
predict_issame = np.less(dist, threshold) true_accept = np.sum(np.logical_and(predict_issame, actual_issame)) - false_accept = np.sum(np.logical_and( - predict_issame, np.logical_not(actual_issame))) + false_accept = np.sum( + np.logical_and(predict_issame, np.logical_not(actual_issame)) + ) n_same = np.sum(actual_issame) n_diff = np.sum(np.logical_not(actual_issame)) val = float(true_accept) / float(n_same) @@ -572,30 +664,31 @@ def calculate_val_far(threshold, dist, actual_issame): def store_revision_info(src_path, output_dir, arg_string): try: # Get git hash - cmd = ['git', 'rev-parse', 'HEAD'] + cmd = ["git", "rev-parse", "HEAD"] gitproc = Popen(cmd, stdout=PIPE, cwd=src_path) (stdout, _) = gitproc.communicate() git_hash = stdout.strip() except OSError as e: - git_hash = ' '.join(cmd) + ': ' + e.strerror + git_hash = " ".join(cmd) + ": " + e.strerror try: # Get local changes - cmd = ['git', 'diff', 'HEAD'] + cmd = ["git", "diff", "HEAD"] gitproc = Popen(cmd, stdout=PIPE, cwd=src_path) (stdout, _) = gitproc.communicate() git_diff = stdout.strip() except OSError as e: - git_diff = ' '.join(cmd) + ': ' + e.strerror + git_diff = " ".join(cmd) + ": " + e.strerror # Store a text file in the log directory - rev_info_filename = os.path.join(output_dir, 'revision_info.txt') + rev_info_filename = os.path.join(output_dir, "revision_info.txt") with open(rev_info_filename, "w") as text_file: - text_file.write('arguments: %s\n--------------------\n' % arg_string) - text_file.write('tensorflow version: %s\n--------------------\n' % - tf.__version__) # @UndefinedVariable - text_file.write('git hash: %s\n--------------------\n' % git_hash) - text_file.write('%s' % git_diff) + text_file.write("arguments: %s\n--------------------\n" % arg_string) + text_file.write( + "tensorflow version: %s\n--------------------\n" % tf.__version__ + ) # @UndefinedVariable + text_file.write("git hash: %s\n--------------------\n" % git_hash) + text_file.write("%s" % git_diff) def list_variables(filename): @@ -609,8 +702,10 @@ def put_images_on_grid(images, shape=(16, 8)): nrof_images = images.shape[0] img_size = images.shape[1] bw = 3 - img = np.zeros((shape[1] * (img_size + bw) + bw, - shape[0] * (img_size + bw) + bw, 3), np.float32) + img = np.zeros( + (shape[1] * (img_size + bw) + bw, shape[0] * (img_size + bw) + bw, 3), + np.float32, + ) for i in range(shape[1]): x_start = i * (img_size + bw) + bw for j in range(shape[0]): @@ -618,14 +713,15 @@ def put_images_on_grid(images, shape=(16, 8)): if img_index >= nrof_images: break y_start = j * (img_size + bw) + bw - img[x_start:x_start + img_size, y_start:y_start + - img_size, :] = images[img_index, :, :, :] + img[ + x_start : x_start + img_size, y_start : y_start + img_size, : + ] = images[img_index, :, :, :] if img_index >= nrof_images: break return img def write_arguments_to_file(args, filename): - with open(filename, 'w') as f: + with open(filename, "w") as f: for key, value in iteritems(vars(args)): - f.write('%s: %s\n' % (key, str(value))) + f.write("%s: %s\n" % (key, str(value))) diff --git a/src/face_models/mtcnn/__init__.py b/src/face_models/mtcnn/__init__.py new file mode 100644 index 000000000..62343eb62 --- /dev/null +++ b/src/face_models/mtcnn/__init__.py @@ -0,0 +1 @@ +from .mtcnn_detector import MtcnnDetector diff --git a/src/face_models/helper.py b/src/face_models/mtcnn/helper.py similarity index 87% rename from src/face_models/helper.py rename to src/face_models/mtcnn/helper.py index e1b6a9eff..015954650 100644 --- a/src/face_models/helper.py +++ 
b/src/face_models/mtcnn/helper.py @@ -59,8 +59,10 @@ def nms(boxes, overlap_threshold, mode="Union"): overlap = inter / (area[i] + area[idxs[:last]] - inter) # delete all indexes from the index list that have - idxs = np.delete(idxs, np.concatenate(([last], - np.where(overlap > overlap_threshold)[0]))) + idxs = np.delete( + idxs, + np.concatenate(([last], np.where(overlap > overlap_threshold)[0])), + ) return pick @@ -119,14 +121,16 @@ def generate_bbox(map, reg, scale, threshold): reg = np.array([dx1, dy1, dx2, dy2]) score = map[t_index[0], t_index[1]] - boundingbox = np.vstack([np.round((stride * t_index[1] + 1) / scale), - np.round((stride * t_index[0] + 1) / scale), - np.round( - (stride * t_index[1] + 1 + cellsize) / scale), - np.round( - (stride * t_index[0] + 1 + cellsize) / scale), - score, - reg]) + boundingbox = np.vstack( + [ + np.round((stride * t_index[1] + 1) / scale), + np.round((stride * t_index[0] + 1) / scale), + np.round((stride * t_index[1] + 1 + cellsize) / scale), + np.round((stride * t_index[0] + 1 + cellsize) / scale), + score, + reg, + ] + ) return boundingbox.T diff --git a/src/face_models/mtcnn_detector.py b/src/face_models/mtcnn/mtcnn_detector.py similarity index 73% rename from src/face_models/mtcnn_detector.py rename to src/face_models/mtcnn/mtcnn_detector.py index f740c6646..2bf2a6443 100644 --- a/src/face_models/mtcnn_detector.py +++ b/src/face_models/mtcnn/mtcnn_detector.py @@ -1,49 +1,57 @@ +import math import os +from itertools import repeat + import cv2 -import math import mxnet as mx import numpy as np -from itertools import repeat + try: from itertools import izip except ImportError: izip = zip -from face_models.helper import nms, adjust_input, generate_bbox, detect_first_stage_warpper +from face_models.mtcnn.helper import ( + adjust_input, + detect_first_stage_warpper, + nms, +) class MtcnnDetector(object): """ - Joint Face Detection and Alignment using Multi-task Cascaded Convolutional Neural Networks - see https://github.com/kpzhang93/MTCNN_face_detection_alignment - this is a mxnet version + Joint Face Detection and Alignment using Multi-task Cascaded Convolutional Neural Networks + see https://github.com/kpzhang93/MTCNN_face_detection_alignment + this is a mxnet version """ - def __init__(self, - model_dir=".", - minsize=20, - threshold=[0.6, 0.7, 0.8], - factor=0.709, - num_worker=1, - accurate_landmark=False, - ctx=mx.cpu()): + def __init__( + self, + model_dir=".", + minsize=20, + threshold=[0.6, 0.7, 0.8], + factor=0.709, + num_worker=1, + accurate_landmark=False, + ctx=mx.cpu(), + ): """ - Initialize the detector - - Parameters: - ---------- - model_dir : string - path for the models - minsize : float number - minimal face to detect - threshold : float number - detect threshold for 3 stages - factor: float number - scale factor for image pyramid - num_worker: int number - number of processes we use for first stage - accurate_landmark: bool - use accurate landmark localization or not + Initialize the detector + + Parameters: + ---------- + model_dir : string + path for the models + minsize : float number + minimal face to detect + threshold : float number + detect threshold for 3 stages + factor: float number + scale factor for image pyramid + num_worker: int number + number of processes we use for first stage + accurate_landmark: bool + use accurate landmark localization or not """ self.num_worker = num_worker @@ -141,11 +149,13 @@ def pad(self, bboxes, w, h): height and width of the bbox """ - tmpw, tmph = bboxes[:, 2] - bboxes[:, 0] + \ - 
1, bboxes[:, 3] - bboxes[:, 1] + 1 + tmpw, tmph = ( + bboxes[:, 2] - bboxes[:, 0] + 1, + bboxes[:, 3] - bboxes[:, 1] + 1, + ) num_box = bboxes.shape[0] - dx, dy = np.zeros((num_box, )), np.zeros((num_box, )) + dx, dy = np.zeros((num_box,)), np.zeros((num_box,)) edx, edy = tmpw.copy() - 1, tmph.copy() - 1 x, y, ex, ey = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3] @@ -179,10 +189,12 @@ def slice_index(self, number): number: int number number """ + def chunks(l, n): """Yield successive n-sized chunks from l.""" for i in range(0, len(l), n): - yield l[i:i + n] + yield l[i : i + n] + num_list = range(number) return list(chunks(num_list, self.num_worker)) @@ -190,19 +202,22 @@ def detect_face_limited(self, img, det_type=2): height, width, _ = img.shape if det_type >= 2: total_boxes = np.array( - [[0.0, 0.0, img.shape[1], img.shape[0], 0.9]], dtype=np.float32) + [[0.0, 0.0, img.shape[1], img.shape[0], 0.9]], dtype=np.float32 + ) num_box = total_boxes.shape[0] # pad the bbox [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad( - total_boxes, width, height) + total_boxes, width, height + ) # (3, 24, 24) is the input shape for RNet input_buf = np.zeros((num_box, 3, 24, 24), dtype=np.float32) for i in range(num_box): tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8) - tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, - :] = img[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :] + tmp[dy[i] : edy[i] + 1, dx[i] : edx[i] + 1, :] = img[ + y[i] : ey[i] + 1, x[i] : ex[i] + 1, : + ] input_buf[i, :, :, :] = adjust_input(cv2.resize(tmp, (24, 24))) output = self.RNet.predict(input_buf) @@ -225,17 +240,20 @@ def detect_face_limited(self, img, det_type=2): total_boxes[:, 0:4] = np.round(total_boxes[:, 0:4]) else: total_boxes = np.array( - [[0.0, 0.0, img.shape[1], img.shape[0], 0.9]], dtype=np.float32) + [[0.0, 0.0, img.shape[1], img.shape[0], 0.9]], dtype=np.float32 + ) num_box = total_boxes.shape[0] [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad( - total_boxes, width, height) + total_boxes, width, height + ) # (3, 48, 48) is the input shape for ONet input_buf = np.zeros((num_box, 3, 48, 48), dtype=np.float32) for i in range(num_box): tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.float32) - tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, - :] = img[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :] + tmp[dy[i] : edy[i] + 1, dx[i] : edx[i] + 1, :] = img[ + y[i] : ey[i] + 1, x[i] : ex[i] + 1, : + ] input_buf[i, :, :, :] = adjust_input(cv2.resize(tmp, (48, 48))) output = self.ONet.predict(input_buf) @@ -254,10 +272,14 @@ def detect_face_limited(self, img, det_type=2): # compute landmark points bbw = total_boxes[:, 2] - total_boxes[:, 0] + 1 bbh = total_boxes[:, 3] - total_boxes[:, 1] + 1 - points[:, 0:5] = np.expand_dims( - total_boxes[:, 0], 1) + np.expand_dims(bbw, 1) * points[:, 0:5] - points[:, 5:10] = np.expand_dims( - total_boxes[:, 1], 1) + np.expand_dims(bbh, 1) * points[:, 5:10] + points[:, 0:5] = ( + np.expand_dims(total_boxes[:, 0], 1) + + np.expand_dims(bbw, 1) * points[:, 0:5] + ) + points[:, 5:10] = ( + np.expand_dims(total_boxes[:, 1], 1) + + np.expand_dims(bbh, 1) * points[:, 5:10] + ) # nms total_boxes = self.calibrate_box(total_boxes, reg) @@ -271,7 +293,9 @@ def detect_face_limited(self, img, det_type=2): # extended stage num_box = total_boxes.shape[0] patchw = np.maximum( - total_boxes[:, 2] - total_boxes[:, 0] + 1, total_boxes[:, 3] - total_boxes[:, 1] + 1) + total_boxes[:, 2] - total_boxes[:, 0] + 1, + total_boxes[:, 3] - total_boxes[:, 1] + 1, + ) patchw = np.round(patchw * 0.25) # make it even @@ -281,15 
+305,19 @@ def detect_face_limited(self, img, det_type=2): for i in range(5): x, y = points[:, i], points[:, i + 5] x, y = np.round(x - 0.5 * patchw), np.round(y - 0.5 * patchw) - [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(np.vstack([x, y, x + patchw - 1, y + patchw - 1]).T, - width, - height) + [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad( + np.vstack([x, y, x + patchw - 1, y + patchw - 1]).T, + width, + height, + ) for j in range(num_box): tmpim = np.zeros((tmpw[j], tmpw[j], 3), dtype=np.float32) - tmpim[dy[j]:edy[j] + 1, dx[j]:edx[j] + 1, - :] = img[y[j]:ey[j] + 1, x[j]:ex[j] + 1, :] - input_buf[j, i * 3:i * 3 + 3, :, - :] = adjust_input(cv2.resize(tmpim, (24, 24))) + tmpim[dy[j] : edy[j] + 1, dx[j] : edx[j] + 1, :] = img[ + y[j] : ey[j] + 1, x[j] : ex[j] + 1, : + ] + input_buf[j, i * 3 : i * 3 + 3, :, :] = adjust_input( + cv2.resize(tmpim, (24, 24)) + ) output = self.LNet.predict(input_buf) @@ -301,10 +329,14 @@ def detect_face_limited(self, img, det_type=2): tmp_index = np.where(np.abs(output[k] - 0.5) > 0.35) output[k][tmp_index[0]] = 0.5 - pointx[:, k] = np.round( - points[:, k] - 0.5 * patchw) + output[k][:, 0] * patchw - pointy[:, k] = np.round( - points[:, k + 5] - 0.5 * patchw) + output[k][:, 1] * patchw + pointx[:, k] = ( + np.round(points[:, k] - 0.5 * patchw) + + output[k][:, 0] * patchw + ) + pointy[:, k] = ( + np.round(points[:, k + 5] - 0.5 * patchw) + + output[k][:, 1] * patchw + ) points = np.hstack([pointx, pointy]) points = points.astype(np.int32) @@ -349,7 +381,7 @@ def detect_face(self, img, det_type=0): minl *= m factor_count = 0 while minl > MIN_DET_SIZE: - scales.append(m * self.factor**factor_count) + scales.append(m * self.factor ** factor_count) minl *= self.factor factor_count += 1 @@ -357,8 +389,15 @@ def detect_face(self, img, det_type=0): sliced_index = self.slice_index(len(scales)) total_boxes = [] for batch in sliced_index: - local_boxes = map(detect_first_stage_warpper, - izip(repeat(img), self.PNets[:len(batch)], [scales[i] for i in batch], repeat(self.threshold[0]))) + local_boxes = map( + detect_first_stage_warpper, + izip( + repeat(img), + self.PNets[: len(batch)], + [scales[i] for i in batch], + repeat(self.threshold[0]), + ), + ) total_boxes.extend(local_boxes) # remove the Nones @@ -380,36 +419,39 @@ def detect_face(self, img, det_type=0): bbh = total_boxes[:, 3] - total_boxes[:, 1] + 1 # refine the bboxes - total_boxes = np.vstack([total_boxes[:, 0] + total_boxes[:, 5] * bbw, - total_boxes[:, 1] + - total_boxes[:, 6] * bbh, - total_boxes[:, 2] + - total_boxes[:, 7] * bbw, - total_boxes[:, 3] + - total_boxes[:, 8] * bbh, - total_boxes[:, 4] - ]) + total_boxes = np.vstack( + [ + total_boxes[:, 0] + total_boxes[:, 5] * bbw, + total_boxes[:, 1] + total_boxes[:, 6] * bbh, + total_boxes[:, 2] + total_boxes[:, 7] * bbw, + total_boxes[:, 3] + total_boxes[:, 8] * bbh, + total_boxes[:, 4], + ] + ) total_boxes = total_boxes.T total_boxes = self.convert_to_square(total_boxes) total_boxes[:, 0:4] = np.round(total_boxes[:, 0:4]) else: total_boxes = np.array( - [[0.0, 0.0, img.shape[1], img.shape[0], 0.9]], dtype=np.float32) + [[0.0, 0.0, img.shape[1], img.shape[0], 0.9]], dtype=np.float32 + ) # second stage num_box = total_boxes.shape[0] # pad the bbox [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad( - total_boxes, width, height) + total_boxes, width, height + ) # (3, 24, 24) is the input shape for RNet input_buf = np.zeros((num_box, 3, 24, 24), dtype=np.float32) for i in range(num_box): tmp = np.zeros((tmph[i], tmpw[i], 3), 
dtype=np.uint8) - tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, - :] = img[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :] + tmp[dy[i] : edy[i] + 1, dx[i] : edx[i] + 1, :] = img[ + y[i] : ey[i] + 1, x[i] : ex[i] + 1, : + ] input_buf[i, :, :, :] = adjust_input(cv2.resize(tmp, (24, 24))) output = self.RNet.predict(input_buf) @@ -438,14 +480,16 @@ def detect_face(self, img, det_type=0): # pad the bbox [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad( - total_boxes, width, height) + total_boxes, width, height + ) # (3, 48, 48) is the input shape for ONet input_buf = np.zeros((num_box, 3, 48, 48), dtype=np.float32) for i in range(num_box): tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.float32) - tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, - :] = img[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :] + tmp[dy[i] : edy[i] + 1, dx[i] : edx[i] + 1, :] = img[ + y[i] : ey[i] + 1, x[i] : ex[i] + 1, : + ] input_buf[i, :, :, :] = adjust_input(cv2.resize(tmp, (48, 48))) output = self.ONet.predict(input_buf) @@ -464,10 +508,14 @@ def detect_face(self, img, det_type=0): # compute landmark points bbw = total_boxes[:, 2] - total_boxes[:, 0] + 1 bbh = total_boxes[:, 3] - total_boxes[:, 1] + 1 - points[:, 0:5] = np.expand_dims( - total_boxes[:, 0], 1) + np.expand_dims(bbw, 1) * points[:, 0:5] - points[:, 5:10] = np.expand_dims( - total_boxes[:, 1], 1) + np.expand_dims(bbh, 1) * points[:, 5:10] + points[:, 0:5] = ( + np.expand_dims(total_boxes[:, 0], 1) + + np.expand_dims(bbw, 1) * points[:, 0:5] + ) + points[:, 5:10] = ( + np.expand_dims(total_boxes[:, 1], 1) + + np.expand_dims(bbh, 1) * points[:, 5:10] + ) # nms total_boxes = self.calibrate_box(total_boxes, reg) @@ -483,7 +531,9 @@ def detect_face(self, img, det_type=0): ############################################# num_box = total_boxes.shape[0] patchw = np.maximum( - total_boxes[:, 2] - total_boxes[:, 0] + 1, total_boxes[:, 3] - total_boxes[:, 1] + 1) + total_boxes[:, 2] - total_boxes[:, 0] + 1, + total_boxes[:, 3] - total_boxes[:, 1] + 1, + ) patchw = np.round(patchw * 0.25) # make it even @@ -493,15 +543,19 @@ def detect_face(self, img, det_type=0): for i in range(5): x, y = points[:, i], points[:, i + 5] x, y = np.round(x - 0.5 * patchw), np.round(y - 0.5 * patchw) - [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(np.vstack([x, y, x + patchw - 1, y + patchw - 1]).T, - width, - height) + [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad( + np.vstack([x, y, x + patchw - 1, y + patchw - 1]).T, + width, + height, + ) for j in range(num_box): tmpim = np.zeros((tmpw[j], tmpw[j], 3), dtype=np.float32) - tmpim[dy[j]:edy[j] + 1, dx[j]:edx[j] + 1, - :] = img[y[j]:ey[j] + 1, x[j]:ex[j] + 1, :] - input_buf[j, i * 3:i * 3 + 3, :, - :] = adjust_input(cv2.resize(tmpim, (24, 24))) + tmpim[dy[j] : edy[j] + 1, dx[j] : edx[j] + 1, :] = img[ + y[j] : ey[j] + 1, x[j] : ex[j] + 1, : + ] + input_buf[j, i * 3 : i * 3 + 3, :, :] = adjust_input( + cv2.resize(tmpim, (24, 24)) + ) output = self.LNet.predict(input_buf) @@ -513,10 +567,14 @@ def detect_face(self, img, det_type=0): tmp_index = np.where(np.abs(output[k] - 0.5) > 0.35) output[k][tmp_index[0]] = 0.5 - pointx[:, k] = np.round( - points[:, k] - 0.5 * patchw) + output[k][:, 0] * patchw - pointy[:, k] = np.round( - points[:, k + 5] - 0.5 * patchw) + output[k][:, 1] * patchw + pointx[:, k] = ( + np.round(points[:, k] - 0.5 * patchw) + + output[k][:, 0] * patchw + ) + pointy[:, k] = ( + np.round(points[:, k + 5] - 0.5 * patchw) + + output[k][:, 1] * patchw + ) points = np.hstack([pointx, pointy]) points = points.astype(np.int32) @@ -555,7 
+613,10 @@ def find_tfrom_between_shapes(self, from_shape, to_shape): tran_m: tran_b: """ - assert from_shape.shape[0] == to_shape.shape[0] and from_shape.shape[0] % 2 == 0 + assert ( + from_shape.shape[0] == to_shape.shape[0] + and from_shape.shape[0] % 2 == 0 + ) sigma_from = 0.0 sigma_to = 0.0 @@ -572,8 +633,9 @@ def find_tfrom_between_shapes(self, from_shape, to_shape): sigma_from += temp_dis * temp_dis temp_dis = np.linalg.norm(to_shape_points[i] - mean_to) sigma_to += temp_dis * temp_dis - cov += (to_shape_points[i].transpose() - - mean_to.transpose()) * (from_shape_points[i] - mean_from) + cov += (to_shape_points[i].transpose() - mean_to.transpose()) * ( + from_shape_points[i] - mean_from + ) sigma_from = sigma_from / to_shape_points.shape[0] sigma_to = sigma_to / to_shape_points.shape[0] @@ -625,19 +687,35 @@ def extract_image_chips(self, img, points, desired_size=256, padding=0): else: padding = 0 # average positions of face points - mean_face_shape_x = [0.224152, 0.75610125, - 0.490127, 0.254149, 0.726104] - mean_face_shape_y = [0.2119465, 0.2119465, - 0.628106, 0.780233, 0.780233] + mean_face_shape_x = [ + 0.224152, + 0.75610125, + 0.490127, + 0.254149, + 0.726104, + ] + mean_face_shape_y = [ + 0.2119465, + 0.2119465, + 0.628106, + 0.780233, + 0.780233, + ] from_points = [] to_points = [] for i in range(len(shape) / 2): - x = (padding + mean_face_shape_x[i]) / \ - (2 * padding + 1) * desired_size - y = (padding + mean_face_shape_y[i]) / \ - (2 * padding + 1) * desired_size + x = ( + (padding + mean_face_shape_x[i]) + / (2 * padding + 1) + * desired_size + ) + y = ( + (padding + mean_face_shape_y[i]) + / (2 * padding + 1) + * desired_size + ) to_points.append([x, y]) from_points.append([shape[2 * i], shape[2 * i + 1]]) @@ -652,11 +730,14 @@ def extract_image_chips(self, img, points, desired_size=256, padding=0): probe_vec = tran_m * probe_vec scale = np.linalg.norm(probe_vec) - angle = 180.0 / math.pi * \ - math.atan2(probe_vec[1, 0], probe_vec[0, 0]) - - from_center = [(shape[0] + shape[2]) / 2.0, - (shape[1] + shape[3]) / 2.0] + angle = ( + 180.0 / math.pi * math.atan2(probe_vec[1, 0], probe_vec[0, 0]) + ) + + from_center = [ + (shape[0] + shape[2]) / 2.0, + (shape[1] + shape[3]) / 2.0, + ] to_center = [0, 0] to_center[1] = desired_size * 0.4 to_center[0] = desired_size * 0.5 @@ -665,7 +746,8 @@ def extract_image_chips(self, img, points, desired_size=256, padding=0): ey = to_center[1] - from_center[1] rot_mat = cv2.getRotationMatrix2D( - (from_center[0], from_center[1]), -1 * angle, scale) + (from_center[0], from_center[1]), -1 * angle, scale + ) rot_mat[0][2] += ex rot_mat[1][2] += ey diff --git a/src/face_models/utils.py b/src/face_models/utils.py new file mode 100644 index 000000000..9d95b744c --- /dev/null +++ b/src/face_models/utils.py @@ -0,0 +1,111 @@ +from typing import Tuple + +import cv2 +import numpy as np +from skimage import transform + + +def get_center_face( + bbox: np.ndarray, points: np.ndarray, img_center: np.ndarray +): + """Using face bounding boxes, facial landmark points and + image center point, find the centermost detected face + and return its corresponding bounding box and facial + landmarks. 
+ + """ + dists = [] + for i in range(bbox.shape[0]): + face_rect = np.array( + [[bbox[i, 0], bbox[i, 1]], [bbox[i, 2], bbox[i, 3]]] + ) + dists.append(_rect_point_dist(face_rect, img_center)) + + idx = dists.index(min(dists)) + + bbox = bbox[idx, :] + points = points[idx, :] + + return bbox, points + + +def _rect_point_dist(bbox: np.ndarray, point: np.ndarray) -> float: + """Get distance between a bounding box and a point. + + Parameters + ---------- + bbox: np.ndarray + Bounding box array containing top-left and bottom-right + coordinates [(x1, y1), (x2, x2)] + point: np.ndarray + Point coordinates [(x1, y1)] + + Returns + ------- + float: + Distance between bbox and point + + """ + bbox_center = (bbox[0] + bbox[1]) / 2 + + bbox_height = bbox[1, 0] - bbox[0, 0] + bbox_width = bbox[1, 1] - bbox[0, 1] + + dx = max(np.abs(point[1] - bbox_center[1]) - bbox_width / 2, 0) + dy = max(np.abs(point[0] - bbox_center[0]) - bbox_height / 2, 0) + return dx * dx + dy * dy + + +def align( + face_img: np.ndarray, + bbox: np.ndarray = None, + points: np.ndarray = None, + image_size: Tuple[int, int] = (112, 112), + margin: int = 44, +): + assert len(image_size) == 2 + assert image_size[0] == 112 + assert image_size[0] == 112 or image_size[1] == 96 + + if points is not None: + src = np.array( + [ + [30.2946, 51.6963], + [65.5318, 51.5014], + [48.0252, 71.7366], + [33.5493, 92.3655], + [62.7299, 92.2041], + ], + dtype=np.float32, + ) + + if image_size[1] == 112: + src[:, 0] += 8.0 + + dst = points.astype(np.float32) + + tform = transform.SimilarityTransform() + tform.estimate(dst, src) + + warped = cv2.warpAffine( + face_img, + tform.params[0:2, :], + (image_size[1], image_size[0]), + borderValue=0.0, + ) + + return warped + + else: + bb = np.zeros(4, dtype=np.int32) + + bb[0] = np.maximum(bbox[0] - margin / 2, 0) + bb[1] = np.maximum(bbox[1] - margin / 2, 0) + bb[2] = np.minimum(bbox[2] + margin / 2, face_img.shape[1]) + bb[3] = np.minimum(bbox[3] + margin / 2, face_img.shape[0]) + + ret = face_img[bb[1] : bb[3], bb[0] : bb[2], :] + + if len(image_size) > 0: + ret = cv2.resize(ret, (image_size[1], image_size[0])) + return ret diff --git a/src/tps2020.py b/src/tps2020.py index 035bb79dc..1899acf74 100644 --- a/src/tps2020.py +++ b/src/tps2020.py @@ -1,5 +1,4 @@ import datetime -from inspect import stack import os from argparse import ArgumentParser @@ -132,82 +131,66 @@ def get_bcs(features: np.ndarray, rs_features: np.ndarray) -> np.ndarray: return bcs -if __name__ == "__main__": - parser = ArgumentParser() - parser.add_argument( - "-d", - "--dataset", - required=True, - choices=["lfw", "gtdb"], - help="dataset to use in experiment", - ) - parser.add_argument( - "-m", - "--mode", - required=True, - choices=["under", "bc"], - help="feature mode to use in experiment", - ) - parser.add_argument( - "-r", - "--role_dist", - required=False, - choices=["bal", "unbal"], - default="unbal", - help="role distribution to use in experiment", - ) - parser.add_argument( - "-gpu", - "--gpu", - required=False, - type=int, - default=-1, - help="gpu to use in feature extraction", - ) - args = vars(parser.parse_args()) - - if args["mode"] == "under": - fi = open("results/tps2020_{}_under.txt".format(args["dataset"]), "w") +def experiment( + dataset: str, mode: str, role_dist: str, flipped: bool, gpu: int +): + if mode == "under": + fi = open(f"results/tps2020_{dataset}_under.txt", "w") else: - fi = open( - "results/tps2020_{}_bc_{}.txt".format( - args["dataset"], args["role_dist"] - ), - "w", - ) + fi = 
open(f"results/tps2020_{dataset}_bc_{role_dist}.txt", "w") print(f"Computing Features: {datetime.datetime.now()}") # If already extracted features, use the precomputed features - if args["dataset"] == "lfw" and os.path.exists( - "data/lfw_arcface_feat.npz" + if not flipped and os.path.exists( + f"data/{dataset}_arcface_mtcnn_feat.npz" ): - features = np.load("data/lfw_arcface_feat.npz")["arr_0"] - elif args["dataset"] == "gtdb" and os.path.exists( - "data/gtdb_arcface_feat.npz" + features = np.load(f"data/{dataset}_arcface_mtcnn_feat.npz")["arr_0"] + + elif ( + flipped + and os.path.exists(f"data/{dataset}_arcface_mtcnn_feat.npz") + and os.path.exists(f"data/{dataset}_arcface_mtcnn_flip_feat.npz") ): - features = np.load("data/gtdb_arcface_feat.npz")["arr_0"] + features = np.load(f"data/{dataset}_arcface_mtcnn_feat.npz")["arr_0"] + flipped_features = np.load( + f"data/{dataset}_arcface_mtcnn_flip_feat.npz" + )["arr_0"] + else: - features = extract_dataset(args["dataset"], "arcface", args["gpu"]) + extract_dataset(dataset, "arcface", "mtcnn", flipped, gpu) + + features = np.load(f"data/{dataset}_arcface_mtcnn_feat.npz")["arr_0"] + + if flipped: + flipped_features = np.load( + f"data/{dataset}_arcface_mtcnn_flip_feat.npz" + )["arr_0"] print(f"Done Computing Features {datetime.datetime.now()}") # Remove all subjects with less than 5 images from LFW dataset - if args["dataset"] == "lfw": + if dataset == "lfw": features = filter_lfw(features) - # Comput BioCapsules for features using Reference Subjects - if args["mode"] == "bc": + if flipped: + flipped_features = filter_lfw(flipped_features) + + # Compute BioCapsules for features using Reference Subjects + if mode == "bc": print(f"Computing BCs: {datetime.datetime.now()}") rs_features = get_rs_features() - rs_map = rs_rbac(len(np.unique(features[:, -1])), args["role_dist"]) + rs_map = rs_rbac(len(np.unique(features[:, -1])), role_dist) cnts = np.unique(rs_map, return_counts=True)[1] for i, cnt in enumerate(cnts): fi.write(f"Role {i + 1} -- {cnt} Subjects\n") bcs = get_bcs(features, rs_features) + if flipped: + flipped_bcs = get_bcs(flipped_features, rs_features) + print(f"Done Computing BCs: {datetime.datetime.now()}") # TP, FP, FN, FP @@ -220,7 +203,7 @@ def get_bcs(features: np.ndarray, rs_features: np.ndarray) -> np.ndarray: ): print(f"Fold {k} : {datetime.datetime.now()}") - if args["mode"] == "under": + if mode == "under": X_train, y_train = ( features[:, :-1][train_index], features[:, -1][train_index], @@ -230,6 +213,15 @@ def get_bcs(features: np.ndarray, rs_features: np.ndarray) -> np.ndarray: features[:, -1][test_index], ) + if flipped: + X_train_flip, y_train_flip = ( + flipped_features[:, :-1][train_index], + flipped_features[:, -1][train_index], + ) + + X_train = np.vstack([X_train, X_train_flip]) + y_train = np.hstack([y_train, y_train_flip]) + clf = LogisticRegression( class_weight="balanced", random_state=42 ).fit(X_train, y_train) @@ -237,7 +229,7 @@ def get_bcs(features: np.ndarray, rs_features: np.ndarray) -> np.ndarray: y_pred = clf.predict(X_test) # Aggregate prediction results with respect to each subject - for subject_id in y_test: + for subject_id in np.unique(y_test): y_test_subject = (y_test == subject_id).astype(int) y_pred_subject = (y_pred == subject_id).astype(int) conf_mat += confusion_matrix( @@ -246,8 +238,7 @@ def get_bcs(features: np.ndarray, rs_features: np.ndarray) -> np.ndarray: else: train_mask = np.zeros( - (train_index.shape[0] + test_index.shape[0]), - dtype=bool, + (train_index.shape[0] + 
test_index.shape[0]), dtype=bool, ) train_mask[train_index] = True train_mask = np.concatenate( @@ -255,8 +246,7 @@ def get_bcs(features: np.ndarray, rs_features: np.ndarray) -> np.ndarray: ) test_mask = np.zeros( - (train_index.shape[0] + test_index.shape[0]), - dtype=bool, + (train_index.shape[0] + test_index.shape[0]), dtype=bool, ) test_mask[test_index] = True test_mask = np.concatenate( @@ -274,32 +264,50 @@ def get_bcs(features: np.ndarray, rs_features: np.ndarray) -> np.ndarray: bcs[:, -1][test_mask], ) - for subject_id in np.unique(bcs[:, -2]): + if flipped: + X_train_flip, y_train_subject_flip, y_train_rs_flip = ( + flipped_bcs[:, :-2][train_mask], + flipped_bcs[:, -2][train_mask], + flipped_bcs[:, -1][train_mask], + ) + + X_train = np.vstack([X_train, X_train_flip]) + y_train_subject = np.hstack( + [y_train_subject, y_train_subject_flip] + ) + y_train_rs = np.hstack([y_train_rs, y_train_rs_flip]) + + for subject_id in np.unique(y_test_subject): rs_id = float(rs_map[int(subject_id) - 1] + 1) - y_train = np.zeros(X_train.shape[0]) - y_train[ + y_train_binary = np.zeros(X_train.shape[0]) + y_train_binary[ np.logical_and( - y_train_subject == subject_id, - y_train_rs == rs_id, + y_train_subject == subject_id, y_train_rs == rs_id, ) ] = 1 - y_test = np.zeros(X_test.shape[0]) - y_test[ + y_test_binary = np.zeros(X_test.shape[0]) + y_test_binary[ np.logical_and( - y_test_subject == subject_id, - y_test_rs == rs_id, + y_test_subject == subject_id, y_test_rs == rs_id, ) ] = 1 + # We assume the RS is automatically specified when + # a username is given during an authentication attempt + X_train_bianry = X_train[y_train_rs == rs_id] + y_train_binary = y_train_binary[y_train_rs == rs_id] + X_test_bianry = X_test[y_test_rs == rs_id] + y_test_binary = y_test_binary[y_test_rs == rs_id] + clf = LogisticRegression( class_weight="balanced", random_state=42 - ).fit(X_train, y_train) + ).fit(X_train_bianry, y_train_binary) - y_pred = clf.predict(X_test) + y_pred = clf.predict(X_test_bianry) - conf_mat += confusion_matrix(y_test, y_pred).ravel() + conf_mat += confusion_matrix(y_test_binary, y_pred).ravel() print(f"Finished KFold Experiment: {datetime.datetime.now()}") @@ -310,14 +318,65 @@ def get_bcs(features: np.ndarray, rs_features: np.ndarray) -> np.ndarray: # fn / (fn + tp) frr = conf_mat[2] / (conf_mat[2] + conf_mat[3]) - fi.write("Dataset -- {}\n".format(args["dataset"])) - fi.write("BC -- {}\n".format(args["mode"])) - fi.write("RS -- {}\n".format(args["role_dist"])) - fi.write("TN -- {}\n".format(conf_mat[0])) - fi.write("TP -- {}\n".format(conf_mat[3])) - fi.write("FP -- {}\n".format(conf_mat[1])) - fi.write("FN -- {}\n".format(conf_mat[2])) - fi.write("ACC -- {:.6f}\n".format(acc)) - fi.write("FAR -- {:.6f}\n".format(far)) - fi.write("FRR -- {:.6f}\n".format(frr)) + fi.write(f"Dataset -- {dataset}\n") + fi.write(f"BC -- {mode}\n") + fi.write(f"RS -- {role_dist}\n") + fi.write(f"TN -- {conf_mat[0]}\n") + fi.write(f"TP -- {conf_mat[3]}\n") + fi.write(f"FP -- {conf_mat[1]}\n") + fi.write(f"FN -- {conf_mat[2]}\n") + fi.write(f"ACC -- {acc:.6f}\n") + fi.write(f"FAR -- {far:.6f}\n") + fi.write(f"FRR -- {frr:.6f}\n") fi.close() + + +if __name__ == "__main__": + parser = ArgumentParser() + parser.add_argument( + "-d", + "--dataset", + required=True, + choices=["lfw", "gtdb"], + help="dataset to use in experiment", + ) + parser.add_argument( + "-m", + "--mode", + required=True, + choices=["under", "bc"], + help="feature mode to use in experiment", + ) + parser.add_argument( + "-r", + "--role_dist", 
+ required=False, + choices=["bal", "unbal"], + default="unbal", + help="role distribution to use in experiment", + ) + parser.add_argument( + "-f", + "--flipped", + required=False, + action="store_true", + default=False, + help="use flipped features in experiment", + ) + parser.add_argument( + "-gpu", + "--gpu", + required=False, + type=int, + default=-1, + help="gpu to use in feature extraction", + ) + args = vars(parser.parse_args()) + + experiment( + args["dataset"], + args["mode"], + args["role_dist"], + args["flipped"], + args["gpu"], + ) diff --git a/src/utils.py b/src/utils.py index dbd5d556c..e51d791ce 100644 --- a/src/utils.py +++ b/src/utils.py @@ -7,8 +7,13 @@ def progress_bar(text, percent, barLen=20): - print(text + " -- [{:<{}}] {:.0f}%".format("=" * - int(barLen * percent), barLen, percent * 100), end="\r") + print( + text + + " -- [{:<{}}] {:.0f}%".format( + "=" * int(barLen * percent), barLen, percent * 100 + ), + end="\r", + ) if percent == 1: print("\n") @@ -30,63 +35,69 @@ def walk(path): def bc_lfw(mode, rs_cnt): bc_gen = BioCapsuleGenerator() lfw = get_lfw(mode) - X_rs = np.load(os.path.join(os.path.abspath(""), "data", - "rs_{}_feat.npz".format(mode)))["arr_0"] + X_rs = np.load(f"data/rs_{mode}_feat.npz")["arr_0"] + for fold in range(10): for i in range(rs_cnt): - print("BC+LFW Train -- Fold {} -- RS Count {}/{}".format(fold, i + 1, rs_cnt)) - lfw["train_{}".format(fold)][:, :-1] = bc_gen.biocapsule_batch( - lfw["train_{}".format(fold)][:, :-1], X_rs[i, :-1]) + print(f"BC+LFW Train -- Fold {fold} -- RS Count {i + 1}/{rs_cnt}") + lfw[f"train_{fold}"][:, :-1] = bc_gen.biocapsule_batch( + lfw[f"train_{fold}"][:, :-1], X_rs[i, :-1] + ) for i in range(rs_cnt): - print("BC+LFW Test -- Fold {} -- RS Count {}/{}".format(fold, i + 1, rs_cnt)) - lfw["test_{}".format(fold)][:, 0, :-1] = bc_gen.biocapsule_batch( - lfw["test_{}".format(fold)][:, 0, :-1], X_rs[i, :-1]) - lfw["test_{}".format(fold)][:, 1, :-1] = bc_gen.biocapsule_batch( - lfw["test_{}".format(fold)][:, 1, :-1], X_rs[i, :-1]) + print(f"BC+LFW Test -- Fold {fold} -- RS Count {i + 1}/{rs_cnt}") + lfw[f"test_{fold}"][:, 0, :-1] = bc_gen.biocapsule_batch( + lfw[f"test_{fold}"][:, 0, :-1], X_rs[i, :-1] + ) + lfw[f"test_{fold}"][:, 1, :-1] = bc_gen.biocapsule_batch( + lfw[f"test_{fold}"][:, 1, :-1], X_rs[i, :-1] + ) + return lfw def get_lfw(mode): people = [] - with open(os.path.join(os.path.abspath(""), "images", "people.txt"), "r") as people_file: + with open("images/people.txt", "r") as people_file: people_list = list(csv.reader(people_file, delimiter="\t")) - assert(len(people_list[2:603]) == 601) + assert len(people_list[2:603]) == 601 people.append(people_list[2:603]) - assert(len(people_list[604:1159]) == 555) + assert len(people_list[604:1159]) == 555 people.append(people_list[604:1159]) - assert(len(people_list[1160:1712]) == 552) + assert len(people_list[1160:1712]) == 552 people.append(people_list[1160:1712]) - assert(len(people_list[1713:2273]) == 560) + assert len(people_list[1713:2273]) == 560 people.append(people_list[1713:2273]) - assert(len(people_list[2274:2841]) == 567) + assert len(people_list[2274:2841]) == 567 people.append(people_list[2274:2841]) - assert(len(people_list[2842:3369]) == 527) + assert len(people_list[2842:3369]) == 527 people.append(people_list[2842:3369]) - assert(len(people_list[3370:3967]) == 597) + assert len(people_list[3370:3967]) == 597 people.append(people_list[3370:3967]) - assert(len(people_list[3968:4569]) == 601) + assert len(people_list[3968:4569]) == 601 
people.append(people_list[3968:4569]) - assert(len(people_list[4570:5150]) == 580) + assert len(people_list[4570:5150]) == 580 people.append(people_list[4570:5150]) - assert(len(people_list[5151:]) == 609) + assert len(people_list[5151:]) == 609 people.append(people_list[5151:]) pairs = [] - with open(os.path.join(os.path.abspath(""), "images", "pairs.txt"), "r") as pairs_file: + with open("images/pairs.txt", "r") as pairs_file: pairs_list = list(csv.reader(pairs_file, delimiter="\t")) for i in range(10): idx = i * 600 + 1 - pairs.append(pairs_list[idx: idx + 600]) - assert (len(pairs[i]) == 600) - - features = np.load(os.path.join(os.path.abspath( - ""), "data", "lfw_{}_feat.npz".format(mode)))["arr_0"] - - subjects = os.listdir( - os.path.join(os.path.abspath(""), "images", "lfw")) - subjects = [x for _, x in sorted( - zip([subject.lower() for subject in subjects], subjects))] + pairs.append(pairs_list[idx : idx + 600]) + assert len(pairs[i]) == 600 + + features = np.load(f"data/lfw_{mode}_feat.npz")["arr_0"] + + subjects = os.listdir(os.path.join(os.path.abspath(""), "images", "lfw")) + subjects = [ + x + for _, x in sorted( + zip([subject.lower() for subject in subjects], subjects) + ) + ] subject = {} for s_id, s in enumerate(subjects): subject[s] = s_id + 1 @@ -97,39 +108,37 @@ def get_lfw(mode): train_cnt = np.sum([int(s[-1]) for s in train]) test = pairs[i] - lfw["train_{}".format(i)] = np.zeros((train_cnt, 513)) - lfw["test_{}".format(i)] = np.zeros((600, 2, 513)) + lfw[f"train_{i}"] = np.zeros((train_cnt, 513)) + lfw[f"test_{i}"] = np.zeros((600, 2, 513)) train_idx = 0 for s in train: s_id = subject[s[0]] s_features = features[features[:, -1] == s_id] - assert (s_features.shape[0] == int(s[1])) + assert s_features.shape[0] == int(s[1]) for j in range(s_features.shape[0]): - lfw["train_{}".format(i)][train_idx] = s_features[j] + lfw[f"train_{i}"][train_idx] = s_features[j] train_idx += 1 - assert (train_idx == train_cnt) + assert train_idx == train_cnt for test_idx, s in enumerate(test): if len(s) == 3: s_id = subject[s[0]] s_features = features[features[:, -1] == s_id] - lfw["test_{}".format(i)][test_idx, - 0] = s_features[int(s[1]) - 1] - lfw["test_{}".format(i)][test_idx, - 1] = s_features[int(s[2]) - 1] + lfw[f"test_{i}"][test_idx, 0] = s_features[int(s[1]) - 1] + lfw[f"test_{i}"][test_idx, 1] = s_features[int(s[2]) - 1] + else: s_id_1 = subject[s[0]] s_features = features[features[:, -1] == s_id_1] - lfw["test_{}".format(i)][test_idx, - 0] = s_features[int(s[1]) - 1] + lfw[f"test_{i}"][test_idx, 0] = s_features[int(s[1]) - 1] + s_id_2 = subject[s[2]] s_features = features[features[:, -1] == s_id_2] - lfw["test_{}".format(i)][test_idx, - 1] = s_features[int(s[3]) - 1] + lfw[f"test_{i}"][test_idx, 1] = s_features[int(s[3]) - 1] - assert (test_idx == 599) + assert test_idx == 599 return lfw
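
Note on the schedule file read by get_learning_rate_from_file in facenet.py: it is plain text with one "epoch: rate" pair per line, "#" starting a comment, and "-" as the rate meaning stop (the function returns -1). A minimal sketch of the format and an equivalent reader; the file name and the reader function are illustrative, not part of the patch:

    # write a tiny example schedule (file name is illustrative only)
    schedule = """# epoch: learning rate
    0:   0.05
    100: 0.005
    200: 0.0005
    300: -
    """
    with open("lr_schedule_example.txt", "w") as f:
        f.write(schedule)

    def lr_for_epoch(filename, epoch):
        # return the most recent rate whose epoch is <= the requested epoch
        learning_rate = None
        with open(filename) as f:
            for line in f:
                line = line.split("#", 1)[0].strip()
                if not line:
                    continue
                e, lr = line.split(":")
                lr = -1 if lr.strip() == "-" else float(lr)
                if int(e) <= epoch:
                    learning_rate = lr
                else:
                    break
        return learning_rate

    print(lr_for_epoch("lr_schedule_example.txt", 150))  # -> 0.005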
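Note on the verification helpers in facenet.py: with distance_metric=1 the distance is arccos of the cosine similarity divided by pi, and calculate_accuracy simply thresholds that distance to count TP/FP/TN/FN. A tiny NumPy check with toy embeddings and an arbitrary 0.5 threshold (neither value comes from the patch):

    import numpy as np

    emb1 = np.array([[1.0, 0.0], [1.0, 0.0]])   # toy, already L2-normalised
    emb2 = np.array([[1.0, 0.0], [0.0, 1.0]])
    actual_issame = np.array([True, False])

    dot = np.sum(emb1 * emb2, axis=1)
    norm = np.linalg.norm(emb1, axis=1) * np.linalg.norm(emb2, axis=1)
    dist = np.arccos(dot / norm) / np.pi          # 0.0 for identical, 0.5 for orthogonal

    predict_issame = dist < 0.5                   # arbitrary threshold for this demo
    tp = np.sum(predict_issame & actual_issame)
    fp = np.sum(predict_issame & ~actual_issame)
    tn = np.sum(~predict_issame & ~actual_issame)
    fn = np.sum(~predict_issame & actual_issame)
    tpr = 0.0 if tp + fn == 0 else tp / (tp + fn)
    fpr = 0.0 if fp + tn == 0 else fp / (fp + tn)
    print(tpr, fpr, (tp + tn) / dist.size)        # 1.0 0.0 1.0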
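Note on the image pyramid in MtcnnDetector.detect_face: the reformatted loop keeps scaling by factor until the shorter side, pre-scaled by 12/minsize, drops below the 12-pixel PNet input. A standalone sketch of the scale computation, assuming a 480x640 frame and the detector's default minsize/factor:

    MIN_DET_SIZE = 12
    height, width = 480, 640            # assumed frame size
    minsize, factor = 20, 0.709         # MtcnnDetector defaults

    m = float(MIN_DET_SIZE) / minsize
    minl = min(height, width) * m
    scales = []
    factor_count = 0
    while minl > MIN_DET_SIZE:
        scales.append(m * factor ** factor_count)
        minl *= factor
        factor_count += 1
    print(len(scales), [round(s, 3) for s in scales[:3]])   # 10 [0.6, 0.425, 0.302]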
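Note on get_center_face in the new src/face_models/utils.py: it keeps the detection whose box is nearest to the image centre, using a squared point-to-box distance that is zero when the centre lies inside the box. The same idea in isolation, with made-up boxes in conventional (x1, y1, x2, y2) order rather than the helper's internal layout:

    import numpy as np

    def dist2_to_box(box, px, py):
        # squared distance from point (px, py) to an axis-aligned box (x1, y1, x2, y2)
        x1, y1, x2, y2 = box
        dx = max(x1 - px, 0.0, px - x2)
        dy = max(y1 - py, 0.0, py - y2)
        return dx * dx + dy * dy

    boxes = np.array([[10, 10, 60, 70], [100, 90, 180, 200]], dtype=float)
    center_x, center_y = 125.0, 125.0   # centre of an assumed 250x250 image
    dists = [dist2_to_box(b, center_x, center_y) for b in boxes]
    print(int(np.argmin(dists)))        # 1 -> the box containing the centre wins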
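Note on align() in the new src/face_models/utils.py: when landmarks are available it estimates a similarity transform onto ArcFace's fixed five-point template and warps; otherwise it falls back to a margin crop. A self-contained sketch of the landmark branch, using a random stand-in image and invented landmark coordinates rather than real detector output:

    import cv2
    import numpy as np
    from skimage import transform

    # five-point template for a 112x96 crop; align() shifts x by 8 for a 112x112 output
    src = np.array(
        [[30.2946, 51.6963], [65.5318, 51.5014], [48.0252, 71.7366],
         [33.5493, 92.3655], [62.7299, 92.2041]], dtype=np.float32)
    src[:, 0] += 8.0

    img = np.random.randint(0, 255, (250, 250, 3), dtype=np.uint8)   # stand-in image
    landmarks = np.array(                                            # invented (x, y) points
        [[90.0, 100.0], [160.0, 102.0], [125.0, 140.0],
         [98.0, 175.0], [155.0, 173.0]], dtype=np.float32)

    tform = transform.SimilarityTransform()
    tform.estimate(landmarks, src)                     # map detected points onto the template
    aligned = cv2.warpAffine(img, tform.params[0:2, :], (112, 112), borderValue=0.0)
    print(aligned.shape)                               # (112, 112, 3)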
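Note on the metrics written by the refactored experiment() in src/tps2020.py: conf_mat accumulates sklearn's ravel order (tn, fp, fn, tp), so FAR = fp / (fp + tn) and FRR = fn / (fn + tp), matching the two expressions in the patch. A small worked example with toy genuine/impostor labels:

    import numpy as np
    from sklearn.metrics import confusion_matrix

    y_true = np.array([0, 0, 0, 1, 1, 1, 1, 0])   # toy genuine (1) / impostor (0) labels
    y_pred = np.array([0, 1, 0, 1, 1, 0, 1, 0])
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

    acc = (tp + tn) / (tn + fp + fn + tp)
    far = fp / (fp + tn)    # impostor attempts wrongly accepted
    frr = fn / (fn + tp)    # genuine attempts wrongly rejected
    print(acc, far, frr)    # 0.75 0.25 0.25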