diff --git a/Project.toml b/Project.toml
index d15f9dc..b066b40 100644
--- a/Project.toml
+++ b/Project.toml
@@ -1,7 +1,7 @@
 name = "DPMMSubClusters"
 uuid = "2841fd70-8698-11e9-176d-6dfa142d2ee7"
 authors = ["Or Dinari "]
-version = "0.1.5"
+version = "0.1.6"
 
 [deps]
 CatViews = "81a5f4ea-a946-549a-aa7e-2a7f63a27d31"
@@ -19,12 +19,12 @@ StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
 
 [compat]
 CatViews = "1"
 Clustering = "0.13.3"
-DistributedArrays = "0.6"
+DistributedArrays = "0, 1"
 Distributions = "0.21.3"
-JLD2 = "0.1.3"
-NPZ = "0.4.0"
-SpecialFunctions = "0.7"
-StatsBase = "0.32"
+JLD2 = "0, 1"
+NPZ = "0, 1"
+SpecialFunctions = "0, 1"
+StatsBase = "0, 1"
 julia = "1"
 
 [extras]
diff --git a/src/dp-parallel-sampling.jl b/src/dp-parallel-sampling.jl
index 42defd6..4838eeb 100644
--- a/src/dp-parallel-sampling.jl
+++ b/src/dp-parallel-sampling.jl
@@ -18,7 +18,8 @@ function init_model()
     end
     total_dim = size(data,2)
     model_hyperparams = model_hyper_params(hyper_params,α,total_dim)
-    labels = distribute(rand(1:initial_clusters,(size(data,2))))
+
+    labels = distribute(rand(1:initial_clusters,(size(data,2))) .+ ((outlier_mod > 0) ? 1 : 0))
     labels_subcluster = distribute(rand(1:2,(size(data,2))))
     group = local_group(model_hyperparams,data,labels,labels_subcluster,local_cluster[],Float32[])
     return dp_parallel_sampling(model_hyperparams,group)
@@ -45,7 +46,7 @@ function init_model_from_data(all_data)
     total_dim = size(data,2)
     model_hyperparams = model_hyper_params(hyper_params,α,total_dim)
 
-    labels = distribute(rand(1:initial_clusters,(size(data,2))))
+    labels = distribute(rand(1:initial_clusters,(size(data,2))) .+ ((outlier_mod > 0) ? 1 : 0))
     labels_subcluster = distribute(rand(1:2,(size(data,2))))
     group = local_group(model_hyperparams,data,labels,labels_subcluster,local_cluster[],Float32[])
     return dp_parallel_sampling(model_hyperparams,group)
@@ -59,6 +60,9 @@ Initialize the first clusters in the model, according to the number defined by i
 Mutates the model.
 """
 function init_first_clusters!(dp_model::dp_parallel_sampling, initial_cluster_count::Int64)
+    if outlier_mod > 0
+        push!(dp_model.group.local_clusters, create_outlier_local_cluster(dp_model.group,outlier_hyper_params))
+    end
     for i=1:initial_cluster_count
         push!(dp_model.group.local_clusters, create_first_local_cluster(dp_model.group))
     end
@@ -70,15 +74,18 @@ end
 
 """
     dp_parallel(all_data::AbstractArray{Float32,2},
-                local_hyper_params::distribution_hyper_params,
-                α_param::Float32,
-                iters::Int64 = 100,
-                init_clusters::Int64 = 1,
-                seed = nothing,
-                verbose = true,
-                save_model = false,
-                burnout = 15,
-                gt = nothing)
+        local_hyper_params::distribution_hyper_params,
+        α_param::Float32,
+        iters::Int64 = 100,
+        init_clusters::Int64 = 1,
+        seed = nothing,
+        verbose = true,
+        save_model = false,
+        burnout = 15,
+        gt = nothing,
+        max_clusters = Inf,
+        outlier_weight = 0,
+        outlier_params = nothing)
 
 Run the model.
 # Args and Kwargs
@@ -92,6 +99,9 @@ Run the model.
 - `save_model` will save a checkpoint every 25 iterations.
 - `burnout` how long to wait after creating a cluster, and allowing it to split/merge
 - `gt` Ground truth, when supplied, will perform NMI and VI analysis on every iteration.
+ - `max_clusters` limit the number of clusters
+ - `outlier_weight` constant weight of an extra non-splitting component
+ - `outlier_params` hyperparams for an extra non-splitting component
 
 # Return values
 dp_model, iter_count , nmi_score_history, liklihood_history, cluster_count_history
@@ -111,7 +121,9 @@ function dp_parallel(all_data::AbstractArray{Float32,2},
         save_model = false,
         burnout = 15,
         gt = nothing,
-        max_clusters = Inf)
+        max_clusters = Inf,
+        outlier_weight = 0,
+        outlier_params = nothing)
     global iterations = iters
     global random_seed = seed
     global hyper_params = local_hyper_params
@@ -121,6 +133,8 @@ function dp_parallel(all_data::AbstractArray{Float32,2},
     global should_save_model = save_model
     global burnout_period = burnout
     global max_num_of_clusters = max_clusters
+    global outlier_mod = outlier_weight
+    global outlier_hyper_params = outlier_params
     dp_model = init_model_from_data(all_data)
     global leader_dict = get_node_leaders_dict()
     init_first_clusters!(dp_model, initial_clusters)
@@ -135,7 +149,7 @@ end
 
 """
     fit(all_data::AbstractArray{Float32,2},local_hyper_params::distribution_hyper_params,α_param::Float32;
-        iters::Int64 = 100, init_clusters::Int64 = 1,seed = nothing, verbose = true, save_model = false, burnout = 20, gt = nothing)
+        iters::Int64 = 100, init_clusters::Int64 = 1,seed = nothing, verbose = true, save_model = false, burnout = 20, gt = nothing, max_clusters = Inf, outlier_weight = 0, outlier_params = nothing)
 
 Run the model (basic mode).
 # Args and Kwargs
@@ -149,6 +163,9 @@ Run the model (basic mode).
 - `save_model` will save a checkpoint every 25 iterations.
 - `burnout` how long to wait after creating a cluster, and allowing it to split/merge
 - `gt` Ground truth, when supplied, will perform NMI and VI analysis on every iteration.
+ - `max_clusters` limit the number of clusters
+ - `outlier_weight` constant weight of an extra non-splitting component
+ - `outlier_params` hyperparams for an extra non-splitting component
 
 # Return Values
 - `labels` Labels assignments
@@ -158,6 +175,7 @@ Run the model (basic mode).
 - `nmi_score_history` NMI score per iteration (if gt suppled)
 - `likelihood_history` Log likelihood per iteration.
 - `cluster_count_history` Cluster counts per iteration.
+ - `sub_labels` Sub-cluster labels assignments
 
 # Example:
 ```julia
@@ -185,14 +203,15 @@ julia> unique(ret_values[1])
 ```
 """
 function fit(all_data::AbstractArray{Float32,2},local_hyper_params::distribution_hyper_params,α_param::Float32;
-        iters::Int64 = 100, init_clusters::Int64 = 1,seed = nothing, verbose = true, save_model = false, burnout = 20, gt = nothing, max_clusters = Inf)
-    dp_model,iter_count , nmi_score_history, liklihood_history, cluster_count_history = dp_parallel(all_data, local_hyper_params,α_param,iters,init_clusters,seed,verbose,save_model,burnout,gt, max_clusters)
-    return Array(dp_model.group.labels), [x.cluster_params.cluster_params.distribution for x in dp_model.group.local_clusters], dp_model.group.weights,iter_count , nmi_score_history, liklihood_history, cluster_count_history
+        iters::Int64 = 100, init_clusters::Int64 = 1,seed = nothing, verbose = true, save_model = false, burnout = 20, gt = nothing, max_clusters = Inf, outlier_weight = 0, outlier_params = nothing)
+    dp_model,iter_count , nmi_score_history, liklihood_history, cluster_count_history = dp_parallel(all_data, local_hyper_params,α_param,iters,init_clusters,seed,verbose, save_model,burnout,gt, max_clusters, outlier_weight, outlier_params)
+    return Array(dp_model.group.labels), [x.cluster_params.cluster_params.distribution for x in dp_model.group.local_clusters], dp_model.group.weights,iter_count , nmi_score_history, liklihood_history, cluster_count_history,Array(dp_model.group.labels_subcluster)
 end
 
 """
     fit(all_data::AbstractArray{Float32,2},α_param::Float32;
-        iters::Int64 = 100, init_clusters::Int64 = 1,seed = nothing, verbose = true, save_model = false, burnout = 20, gt = nothing)
+        iters::Int64 = 100, init_clusters::Int64 = 1,seed = nothing, verbose = true, save_model = false,burnout = 20, gt = nothing, max_clusters = Inf, outlier_weight = 0, outlier_params = nothing)
+
 Run the model (basic mode) with default `NIW` prior.
 # Args and Kwargs
@@ -205,6 +224,8 @@ Run the model (basic mode) with default `NIW` prior.
 - `save_model` will save a checkpoint every 25 iterations.
 - `burnout` how long to wait after creating a cluster, and allowing it to split/merge
 - `gt` Ground truth, when supplied, will perform NMI and VI analysis on every iteration.
+ - `outlier_weight` constant weight of an extra non-splitting component
+ - `outlier_params` hyperparams for an extra non-splitting component
 
 # Return Values
 - `labels` Labels assignments
@@ -214,6 +235,7 @@ Run the model (basic mode) with default `NIW` prior.
 - `nmi_score_history` NMI score per iteration (if gt suppled)
 - `likelihood_history` Log likelihood per iteration.
 - `cluster_count_history` Cluster counts per iteration.
+ - `sub_labels` Sub-cluster labels assignments
 
 # Example:
 ```julia
@@ -235,29 +257,29 @@ julia> unique(ret_values[1])
 ```
 """
 function fit(all_data::AbstractArray{Float32,2},α_param::Float32;
-        iters::Int64 = 100, init_clusters::Int64 = 1,seed = nothing, verbose = true, save_model = false,burnout = 20, gt = nothing, max_clusters = Inf)
+        iters::Int64 = 100, init_clusters::Int64 = 1,seed = nothing, verbose = true, save_model = false,burnout = 20, gt = nothing, max_clusters = Inf, outlier_weight = 0, outlier_params = nothing)
     data_dim = size(all_data,1)
     cov_mat = Matrix{Float32}(I, data_dim, data_dim)
     local_hyper_params = niw_hyperparams(1,zeros(Float32,data_dim),data_dim+3,cov_mat)
-    dp_model,iter_count , nmi_score_history, liklihood_history, cluster_count_history = dp_parallel(all_data, local_hyper_params,α_param,iters,init_clusters,seed,verbose,save_model,burnout,gt, max_clusters)
-    return Array(dp_model.group.labels), [x.cluster_params.cluster_params.distribution for x in dp_model.group.local_clusters], dp_model.group.weights,iter_count , nmi_score_history, liklihood_history, cluster_count_history
+    dp_model,iter_count , nmi_score_history, liklihood_history, cluster_count_history = dp_parallel(all_data, local_hyper_params,α_param,iters,init_clusters, seed,verbose,save_model,burnout,gt, max_clusters,outlier_weight, outlier_params)
+    return Array(dp_model.group.labels), [x.cluster_params.cluster_params.distribution for x in dp_model.group.local_clusters], dp_model.group.weights,iter_count , nmi_score_history, liklihood_history, cluster_count_history, Array(dp_model.group.labels_subcluster)
 end
 
 fit(all_data::AbstractArray, α_param;
         iters = 100, init_clusters = 1, seed = nothing, verbose = true,
-        save_model = false,burnout = 20, gt = nothing, max_clusters = Inf) =
+        save_model = false,burnout = 20, gt = nothing, max_clusters = Inf, outlier_weight = 0, outlier_params = nothing) =
     fit(Float32.(all_data),Float32(α_param),iters = Int64(iters),
         init_clusters=Int64(init_clusters), seed = seed, verbose = verbose,
-        save_model = save_model, burnout = burnout, gt = gt, max_clusters = max_clusters)
+        save_model = save_model, burnout = burnout, gt = gt, max_clusters = max_clusters, outlier_weight = outlier_weight, outlier_params = outlier_params)
 
 fit(all_data::AbstractArray,local_hyper_params::distribution_hyper_params,α_param;
         iters = 100, init_clusters::Number = 1, seed = nothing, verbose = true,
-        save_model = false,burnout = 20, gt = nothing, max_clusters = Inf) =
+        save_model = false,burnout = 20, gt = nothing, max_clusters = Inf, outlier_weight = 0, outlier_params = nothing) =
     fit(Float32.(all_data),local_hyper_params,Float32(α_param),iters = Int64(iters),
         init_clusters=Int64(init_clusters), seed = seed, verbose = verbose,
-        save_model = save_model, burnout = burnout, gt = gt, max_clusters = max_clusters)
+        save_model = save_model, burnout = burnout, gt = gt, max_clusters = max_clusters, outlier_weight = outlier_weight, outlier_params = outlier_params)
@@ -316,6 +338,7 @@ function run_model(dp_model, first_iter, model_params="none", prev_time = 0)
     for i=first_iter:iterations
         # plot_group(dp_model.group)
+        final = false
         no_more_splits = false
         if i >= iterations - argmax_sample_stop #We assume the cluters k has been setteled by now, and a low probability random split can do dmg
@@ -438,3 +461,58 @@ end
 function set_parr_worker(labels,cluster_count)
     global glob_parr = zeros(Float32,size(localpart(labels),1),cluster_count)
 end
+
+"""
+    cluster_statistics(points,labels, clusters)
+
+Provide average statistics of probability and log-likelihood for the given points, labels and clusters
+
+# Args and Kwargs
+ - `points` a `DxN` array containing the data
+ - `labels` labels of the points
+ - `clusters` vector of cluster distributions
+
+
+# Return values
+avg_ll, avg_prob
+ - `avg_ll` per-cluster average log-likelihood of its points
+ - `avg_prob` per-cluster average (normalized) probability of its points
+
+
+# Example:
+```julia
+julia> avg_ll, avg_prob = cluster_statistics(points, labels, clusters)
+```
+"""
+function cluster_statistics(points,labels, clusters)
+    parr = zeros(Float32,length(labels), length(clusters))
+    tic = time()
+    for (k,cluster) in enumerate(clusters)
+        log_likelihood!(reshape((@view parr[:,k]),:,1), points,cluster)
+    end
+    log_likelihood_array = copy(parr)
+    log_likelihood_array[isnan.(log_likelihood_array)] .= -Inf #Numerical errors aren't fun
+    max_log_prob_arr = maximum(log_likelihood_array, dims = 2)
+    log_likelihood_array .-= max_log_prob_arr
+    map!(exp,log_likelihood_array,log_likelihood_array)
+    # println("lsample log cat2" * string(log_likelihood_array))
+    sum_prob_arr = sum(log_likelihood_array, dims =[2])
+    log_likelihood_array ./= sum_prob_arr
+    avg_ll = zeros(length(clusters))
+    avg_prob = zeros(length(clusters))
+    for i=1:length(clusters)
+        avg_ll[i] = sum(parr[labels .== i,i]) / sum(labels .== i)
+        avg_prob[i] = sum(log_likelihood_array[labels .== i,i]) / sum(labels .== i)
+    end
+    return avg_ll, avg_prob
+end
diff --git a/src/global_params.jl b/src/global_params.jl
index e67d917..ffba8c9 100644
--- a/src/global_params.jl
+++ b/src/global_params.jl
@@ -24,6 +24,13 @@ hyper_params = niw_hyperparams(1.0,
     Matrix{Float32}(I, 2, 2)*1.0)
 
+outlier_mod = 0.05 #Weight of the outlier component
+outlier_hyper_params = niw_hyperparams(1.0,
+    zeros(Float32,2),
+    5,
+    Matrix{Float32}(I, 2, 2)*1.0)
+
+
 #Saving specifics:
 
 enable_saving = true
diff --git a/src/local_clusters_actions.jl b/src/local_clusters_actions.jl
index d378558..648d1f2 100644
--- a/src/local_clusters_actions.jl
+++ b/src/local_clusters_actions.jl
@@ -20,6 +20,28 @@ function create_first_local_cluster(group::local_group)
 end
 
+function create_outlier_local_cluster(group::local_group,outlier_params)
+    suff = create_sufficient_statistics(outlier_params,outlier_params, Array(group.points))
+    post = calc_posterior(outlier_params,suff)
+    dist = sample_distribution(post)
+    cp = cluster_parameters(outlier_params, dist, suff, post)
+    cpl = deepcopy(cp)
+    cpr = deepcopy(cp)
+    splittable = splittable_cluster_params(cp,cpl,cpr,[0.5,0.5], false,ones(burnout_period+5)*-Inf)
+    cp.suff_statistics.N = size(group.points,2)
+    cpl.suff_statistics.N = sum(group.labels_subcluster .== 1)
+    cpr.suff_statistics.N = sum(group.labels_subcluster .== 2)
+    cluster = local_cluster(splittable, group.model_hyperparams.total_dim,
+        cp.suff_statistics.N,
+        cpl.suff_statistics.N,
+        cpr.suff_statistics.N)
+    # @sync for i in (nworkers()== 0 ? procs() : workers())
+    #     @spawnat i split_first_cluster_worker!(group)
+    # end
+    return cluster
+end
+
+
 function sample_sub_clusters!(group::local_group)
     for i in (nworkers()== 0 ? procs() : workers())
         @spawnat i sample_sub_clusters_worker!(group.points, group.labels, group.labels_subcluster)
@@ -62,15 +84,16 @@ function create_subclusters_labels!(labels::AbstractArray{Int64,1},
 end
 
-function sample_labels!(group::local_group, final::Bool)
-    sample_labels!(group.labels, group.points, final)
+function sample_labels!(group::local_group, final::Bool, no_more_splits::Bool)
+    sample_labels!(group.labels, group.points, final, no_more_splits)
 end
 
 function sample_labels!(labels::AbstractArray{Int64,1},
         points::AbstractArray{Float32,2},
-        final::Bool)
+        final::Bool,
+        no_more_splits::Bool)
     for i in (nworkers()== 0 ? procs() : workers())
-        @spawnat i sample_labels_worker!(labels,points,final)
+        @spawnat i sample_labels_worker!(labels,points,final, no_more_splits)
     end
 end
 
@@ -90,7 +113,8 @@ end
 function sample_labels_worker!(labels::AbstractArray{Int64,1},
         points::AbstractArray{Float32,2},
-        final::Bool)
+        final::Bool,
+        no_more_splits::Bool)
     indices = localindices(points)[2]
     lbls = localpart(labels)
     pts = localpart(points)
@@ -98,6 +122,10 @@ function sample_labels_worker!(labels::AbstractArray{Int64,1},
     parr = zeros(Float32,length(indices), length(clusters_vector))
     tic = time()
     for (k,cluster) in enumerate(clusters_vector)
+        # if no_more_splits == false && k==1
+        #     parr[:,k] .= -Inf
+        #     continue
+        # end
         log_likelihood!(reshape((@view parr[:,k]),:,1), pts,cluster.cluster_dist)
     end
     # println("Time: "* string(time()-tic) * " size:" *string(size(pts)))
@@ -232,6 +260,9 @@ function update_suff_stats_posterior!(group::local_group,indices = nothing, use_
         end
     end
     for (index,v) in enumerate(indices)
+        # if outlier_mod > 0 && v == 1
+        #     continue
+        # end
         if length(suff_stats_vectors[index]) == 0
             continue
         end
@@ -339,6 +370,9 @@ end
 function check_and_split!(group::local_group, final::Bool)
     split_arr= zeros(Float32,length(group.local_clusters))
     for (index,cluster) in enumerate(group.local_clusters)
+        if outlier_mod > 0 && index == 1
+            continue
+        end
         if cluster.cluster_params.splittable == true &&
                 cluster.cluster_params.cluster_params.suff_statistics.N > 1
             should_split_local!((@view split_arr[index,:]), cluster.cluster_params, group.model_hyperparams.α,final)
@@ -372,6 +406,9 @@ function check_and_merge!(group::local_group, final::Bool)
     indices = Vector{Int64}()
     new_indices = Vector{Int64}()
     for i=1:length(group.local_clusters)
+        if outlier_mod > 0 && i == 1
+            continue
+        end
         for j=i+1:length(group.local_clusters)
             if (group.local_clusters[i].cluster_params.splittable == true &&
                     group.local_clusters[j].cluster_params.splittable == true &&
@@ -404,12 +441,18 @@ function sample_clusters!(group::local_group, first::Bool)
         cluster_params_futures[i] = sample_cluster_params(cluster.cluster_params, group.model_hyperparams.α,first)
     end
     for (i,cluster) in enumerate(group.local_clusters)
+        if outlier_mod > 0 && i == 1
+            continue
+        end
         cluster.cluster_params = fetch(cluster_params_futures[i])
         cluster.points_count = cluster.cluster_params.cluster_params.suff_statistics.N
         push!(points_count, cluster.points_count)
     end
     push!(points_count, group.model_hyperparams.α)
-    group.weights = rand(Dirichlet(Float64.(points_count)))[1:end-1]
+    group.weights = rand(Dirichlet(Float64.(points_count)))[1:end-1] .* (1 - outlier_mod)
+    if outlier_mod > 0
+        group.weights = vcat([outlier_mod],group.weights)
+    end
 end
 
 function create_thin_cluster_params(cluster::local_cluster)
@@ -436,7 +479,7 @@ function remove_empty_clusters!(group::local_group)
     pts_count = Vector{Int64}()
     for (index,cluster) in enumerate(group.local_clusters)
         push!(pts_count, cluster.points_count)
-        if cluster.points_count > 0
+        if cluster.points_count > 0 || (outlier_mod > 0 && index == 1) || (outlier_mod > 0 && index == 2 && length(group.local_clusters) == 2)
             push!(new_vec,cluster)
         end
     end
@@ -527,7 +570,7 @@ function group_step(group::local_group, no_more_splits::Bool, final::Bool,first::Bool)
     sample_clusters!(group,false)
     broadcast_cluster_params([create_thin_cluster_params(x) for x in group.local_clusters],group.weights)
-    sample_labels!(group, (hard_clustering ? true : final))
+    sample_labels!(group, (hard_clustering ? true : final), no_more_splits)
     sample_sub_clusters!(group)
     update_suff_stats_posterior!(group)
     reset_bad_clusters!(group)
diff --git a/src/utils.jl b/src/utils.jl
index e8f154c..326a0c1 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -18,6 +18,7 @@ end
 # #Note that we expect the log_likelihood_array to be in rows (samples) x columns (clusters) , this is due to making it more efficent that way.
 function sample_log_cat_array!(labels::AbstractArray{Int64,1}, log_likelihood_array::AbstractArray{Float32,2})
     # println("lsample log cat" * string(log_likelihood_array))
+    log_likelihood_array[isnan.(log_likelihood_array)] .= -Inf #Numerical errors aren't fun
     max_log_prob_arr = maximum(log_likelihood_array, dims = 2)
     log_likelihood_array .-= max_log_prob_arr
     map!(exp,log_likelihood_array,log_likelihood_array)
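
For reference, a minimal usage sketch of the new `outlier_weight` / `outlier_params` kwargs and the extra trailing `sub_labels` return value introduced by this diff. The data matrix, α value, and prior values below are illustrative placeholders; only the `DxN` layout and the `niw_hyperparams(κ, μ, ν, ψ)` argument order follow the code above.

```julia
using DPMMSubClusters
using LinearAlgebra: I

# Toy data in the DxN layout the docstrings expect (D=2, N=1000).
data = rand(Float32, 2, 1000)

# Prior for the extra non-splitting (outlier) component, mirroring the
# outlier_hyper_params default added to src/global_params.jl above.
outlier_prior = niw_hyperparams(1.0, zeros(Float32, 2), 5, Matrix{Float32}(I, 2, 2))

ret_values = fit(data, 10.0,
                 iters = 100,
                 outlier_weight = 0.05,      # constant weight of the outlier component
                 outlier_params = outlier_prior)

labels     = ret_values[1]  # with outlier_weight > 0, label 1 is the outlier component
clusters   = ret_values[2]  # vector of per-cluster distributions
sub_labels = ret_values[8]  # sub-cluster assignments, the new trailing return value
```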
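Similarly, a sketch of the new `cluster_statistics` helper, continuing from the variables above; it takes the points, their labels, and a vector of cluster distributions such as the one `fit` returns.

```julia
# Per-cluster averages: avg_ll[i] is the mean log-likelihood of the points
# labeled i, and avg_prob[i] their mean normalized probability under cluster i.
avg_ll, avg_prob = cluster_statistics(data, labels, clusters)
```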
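The weight update in `sample_clusters!` above also has a simple closed form: the outlier component keeps a constant share `w0`, and the usual Dirichlet draw over cluster counts and α is scaled by `1 - w0`. A standalone sketch of the same arithmetic (the helper name is hypothetical):

```julia
using Distributions: Dirichlet

# Mirrors the group.weights construction in sample_clusters! after this diff.
function weights_sketch(counts::Vector{Float64}, α::Float64, w0::Float64)
    ws = rand(Dirichlet(vcat(counts, α)))[1:end-1] .* (1 - w0)
    return w0 > 0 ? vcat([w0], ws) : ws
end

weights_sketch([120.0, 80.0], 10.0, 0.05)  # ≈ [0.05, w1, w2]; w1 + w2 < 0.95 since the α mass is dropped
```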