diff --git a/nimare/annotate/gclda.py b/nimare/annotate/gclda.py
index 4668f8b6b..91fc0865a 100755
--- a/nimare/annotate/gclda.py
+++ b/nimare/annotate/gclda.py
@@ -1,24 +1,234 @@
 """Topic modeling with generalized correspondence latent Dirichlet allocation."""
+import itertools
 import logging
+import math
 import os.path as op
 
 import nibabel as nib
 import numpy as np
 import pandas as pd
+from joblib import Parallel, delayed
 from nilearn._utils import load_niimg
 from scipy.stats import multivariate_normal
 
 from nimare.base import NiMAREBase
-from nimare.utils import get_template
+from nimare.utils import _check_ncores, get_template
 
 LGR = logging.getLogger(__name__)
 
 
+def _update_word_topic_assignments(
+    word_idx,
+    word_doc_idx,
+    word_topic_idx,
+    seeds,
+    nz_word_topic,
+    nz_sum_topic,
+    ny_doc_topic,
+    voc_len,
+    beta,
+    gamma,
+):
+    """Update wtoken_topic_idx (z) indicator variables assigning words->topics.
+
+    Parameters
+    ----------
+    word_idx : (WT) :obj:`list`
+        Word tokens.
+    word_doc_idx : (WT) :obj:`list`
+        Document index.
+    word_topic_idx : (WT) :obj:`numpy.ndarray`
+        Topic assignment for word tokens.
+    seeds : (WT) :obj:`numpy.ndarray`
+        Array of seeds for multi-core reproducibility of default_rng() instance.
+    nz_word_topic : (W x T) :obj:`numpy.ndarray`
+        Number of word-tokens assigned to each topic (T) per word-type (W).
+    nz_sum_topic : (1 x T) :obj:`numpy.ndarray`
+        Total number of word-tokens assigned to each topic (T) (across all docs).
+    ny_doc_topic : (D x T) :obj:`numpy.ndarray`
+        Number of peak-tokens assigned to each topic (T) per document (D).
+    voc_len : :obj:`int`
+        Total number of word-tokens in the vocabulary.
+    beta : :obj:`float`
+        Prior count on word-types for each topic.
+    gamma : :obj:`float`
+        Prior count added to y-counts when sampling z assignments.
+
+    Returns
+    -------
+    sampled_topics : :obj:`list`
+        Topics sampled (z).
+ """ + sampled_topics = [] + # Loop over all word tokens + for i_wtoken, word in enumerate(word_idx): + # --- Seed random number generator for reproducibility + rng = np.random.default_rng(seeds[i_wtoken]) + + # Find document in which word token (not just word) appears + doc = word_doc_idx[i_wtoken] + # current topic assignment for word token w_i + topic = word_topic_idx[i_wtoken] + + # Decrement count-matrices to remove current wtoken_topic_idx + # because wtoken will be reassigned to a new topic + nz_topic = nz_word_topic[word, :] + beta + nz_topic[topic] -= 1 + sum_topic_counts = nz_sum_topic + (beta * voc_len) + sum_topic_counts[0, topic] -= 1 + + # Get sampling distribution: + # p(z_i|z,d,w) ~ p(w|t) * p(t|d) + # ~ p_w_t * p_topic_g_doc + p_word_g_topic = nz_topic / sum_topic_counts + p_topic_g_doc = ny_doc_topic[doc, :] + gamma + probs = p_word_g_topic * p_topic_g_doc # The unnormalized sampling distribution + + # Sample a z_i assignment for the current word-token from the sampling distribution + probs = probs.reshape(-1) / np.sum(probs) # Normalize the sampling distribution + # Numpy returns a binary [1 x T] vector with a '1' in the index of sampled topic + # and zeros everywhere else + assigned_topic_vec = rng.multinomial(1, probs) + # Extract selected topic from vector + topic = np.where(assigned_topic_vec)[0][0] + + sampled_topics.append(topic) + + return sampled_topics + + +def _update_peak_assignments( + doc_idx, + regions, + topics, + peak_probs, + seeds, + ny_region_topic, + ny_doc_topic, + nz_doc_topic, + n_regions, + n_topics, + alpha, + delta, + gamma, +): + """Update y / r indicator variables assigning peaks->topics/subregions. + + Parameters + ---------- + doc_idx : (P) :obj:`list` + Document indices. + regions : (P) :obj:`numpy.ndarray` + Region indices. + topics : (P) :obj:`numpy.ndarray` + Topic indices. + peak_probs : (P x T x R) :obj:`numpy.ndarray` + Matrix of probabilities of all peaks given all topic/subregion spatial distributions. + seeds : (P) :obj:`numpy.ndarray` + Array of seeds for multi-core reproducibility of default_rng() instance. + ny_region_topic : (R x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to subregion (region) per topic (T). + ny_doc_topic : (D x T) :obj:`numpy.ndarray` + Number of peak-tokens assigned to document (D) per topic (T). + nz_doc_topic : (D x T) :obj:`numpy.ndarray` + Number of word-tokens assigned to each topic (T) per document (D). + n_regions : :obj:`int` + Total number of regions. + n_topics : :obj:`int` + Total number of topics. + alpha : :obj:`float` + Prior count on topics for each document. + delta : :obj:`float` + Prior count on subregions for each topic. + gamma : :obj:`float` + Prior count added to y-counts when sampling z assignments. + + Returns + ------- + sampled_regions : :obj:`list` + Subregions sampled (r). + sampled_topics : :obj:`list` + Topics sampled (y). 
+ """ + sampled_regions = [] + sampled_topics = [] + for i_ptoken, doc in enumerate(doc_idx): + # --- Seed random number generator for reproducibility + rng = np.random.default_rng(seeds[i_ptoken]) + + topic = topics[i_ptoken] + region = regions[i_ptoken] + + # Retrieve the probability of generating current x from all + # subregions: [R x T] array of probs + p_x_subregions = (peak_probs[i_ptoken, :, :]).transpose() + + # Compute the probability of region given topic p(r|t) + # Counts of subregions per topic + prior: p(r|t) + # Normalize the columns such that each topic's distribution over subregions sums to 1 + p_region_g_topic = ny_region_topic + delta + p_region_g_topic[region, topic] -= 1 # Decrement count in Subregion x Topic count matrix + p_region_g_topic = p_region_g_topic / np.sum(p_region_g_topic, axis=0, keepdims=True) + + # Compute the probability of topic given doc p(t|d) + # Counts of topics per document + prior: p(t|d) + # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows + p_topic_g_doc = ny_doc_topic[doc, :] + alpha + p_topic_g_doc[topic] -= 1 # Decrement count in Document (doc) x Topic count matrix + p_topic_g_doc = np.array([p_topic_g_doc] * n_regions) + + # Compute the probability of region given doc: p(r|d) ~ p(r|t) * p(t|d) + p_region_g_doc = p_topic_g_doc * p_region_g_topic + + # Compute the multinomial probability: p(z|y) + # The multinomial from which z is sampled is proportional to number + # of y assigned to each topic, plus constant gamma + # Compute the proportional probabilities in log-space + # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows + doc_y_counts = ny_doc_topic[doc, :] + gamma + doc_y_counts[topic] -= 1 # Decrement count in Document (doc) x Topic count matrix + logp = nz_doc_topic[doc, :] * np.log((doc_y_counts + 1) / (doc_y_counts)) + p_peak_g_topic = np.exp(logp - np.max(logp)) + p_peak_g_topic = np.array([p_peak_g_topic] * n_regions) + + # Get the full sampling distribution: + # [R x T] array containing the proportional probability of all y/r combinations + # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from + # Normalize the sampling distribution + probs_pdf = p_x_subregions * p_region_g_doc * p_peak_g_topic + probs_pdf = np.reshape(probs_pdf, (n_regions * n_topics)) + probs_pdf = probs_pdf / np.sum(probs_pdf) + + # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) + # from the sampling distribution + # Returns a binary [1 x R*T] vector with a '1' in location that was sampled + # and zeros everywhere else + assignment_vec = rng.multinomial(1, probs_pdf) + + # Reshape 1D back to [R x T] 2D + assignment_arr = np.reshape(assignment_vec, (n_regions, n_topics)) + + # Transform the linear index of the sampled element into the + # subregion/topic (r/y) assignment indices + assignment_idx = np.where(assignment_arr) + region = assignment_idx[0][0] # Subregion sampled (r) + topic = assignment_idx[1][0] # Topic sampled (y) + + sampled_regions.append(region) + sampled_topics.append(topic) + + return sampled_regions, sampled_topics + + class GCLDAModel(NiMAREBase): """Generate a generalized correspondence latent Dirichlet allocation (GCLDA) topic model. This model was originally described in :footcite:t:`rubin2017decoding`. + .. versionchanged:: 0.0.13 + + * New parameter: `n_cores`. Number of cores to use for parallelization. + .. versionchanged:: 0.0.8 * [ENH] Support symmetric GC-LDA topics with more than two subregions. 
@@ -63,6 +273,10 @@ class GCLDAModel(NiMAREBase):
         requires n_regions = 2. The default is False.
     seed_init : :obj:`int`, optional
         Initial value of random seed. The default is 1.
+    n_cores : :obj:`int`, optional
+        Number of cores to use for parallelization.
+        If <=0, defaults to using all available cores.
+        Default is 1.
 
     Attributes
     ----------
@@ -101,11 +315,14 @@ def __init__(
         dobs=25,
         roi_size=50.0,
         seed_init=1,
+        n_cores=1,
     ):
         LGR.info("Constructing/Initializing GCLDA Model")
         count_df = count_df.copy()
         coordinates_df = coordinates_df.copy()
 
+        self.n_cores = _check_ncores(n_cores)
+
         # Check IDs from DataFrames
         count_df.index = count_df.index.astype(str)
         count_df["id"] = count_df.index
@@ -405,14 +622,124 @@ def _update(self, loglikely_freq=1):
            is 1 (log-likelihood is updated every iteration).
         """
         self.iter += 1  # Update total iteration count
 
-        LGR.debug(f"Iter {self.iter:04d}: Sampling z")
         self.seed += 1
-        self._update_word_topic_assignments(self.seed)  # Update z-assignments
+        np.random.seed(self.seed)  # Seed random number generator
+
+        old_wt_idx = self.topics["wtoken_topic_idx"]
+
+        n_wtoken = len(self.data["wtoken_word_idx"])
+        chunk_size = math.ceil(n_wtoken / self.n_cores)
+        seeds = np.random.randint(1e8, size=n_wtoken)  # Array of seeds for multicore reproducibility
+        chunks = [
+            {
+                "word_idx": self.data["wtoken_word_idx"][chunk_i : chunk_i + chunk_size],
+                "word_doc_idx": self.data["wtoken_doc_idx"][chunk_i : chunk_i + chunk_size],
+                "word_topic_idx": self.topics["wtoken_topic_idx"][chunk_i : chunk_i + chunk_size],
+                "seeds": seeds[chunk_i : chunk_i + chunk_size],
+            }
+            for chunk_i in range(0, n_wtoken, chunk_size)
+        ]
+
+        results = Parallel(n_jobs=self.n_cores, max_nbytes=None)(
+            delayed(_update_word_topic_assignments)(
+                **chunk,
+                nz_word_topic=self.topics["n_word_tokens_word_by_topic"],
+                nz_sum_topic=self.topics["total_n_word_tokens_by_topic"],
+                ny_doc_topic=self.topics["n_peak_tokens_doc_by_topic"],
+                voc_len=len(self.vocabulary),
+                beta=self.params["beta"],
+                gamma=self.params["gamma"],
+            )
+            for chunk in chunks
+        )  # Update z-assignments
+        sampled_wt_idx = list(itertools.chain(*results))
+
+        # Decrement count-matrices to remove current wtoken_topic_idx
+        # because wtoken will be reassigned to a new topic
+        np.subtract.at(
+            self.topics["n_word_tokens_word_by_topic"],
+            (self.data["wtoken_word_idx"], old_wt_idx),
+            1,
+        )
+        np.subtract.at(self.topics["total_n_word_tokens_by_topic"], (0, old_wt_idx), 1)
+        np.subtract.at(
+            self.topics["n_word_tokens_doc_by_topic"],
+            (self.data["wtoken_doc_idx"], old_wt_idx),
+            1,
+        )
+        # Update the indices and the count matrices using the sampled z assignment
+        np.add.at(
+            self.topics["n_word_tokens_word_by_topic"],
+            (self.data["wtoken_word_idx"], sampled_wt_idx),
+            1,
+        )
+        np.add.at(self.topics["total_n_word_tokens_by_topic"], (0, sampled_wt_idx), 1)
+        np.add.at(
+            self.topics["n_word_tokens_doc_by_topic"],
+            (self.data["wtoken_doc_idx"], sampled_wt_idx),
+            1,
+        )
+        # Update w_i topic-assignment
+        self.topics["wtoken_topic_idx"] = np.array(sampled_wt_idx)
 
         LGR.debug(f"Iter {self.iter:04d}: Sampling y|r")
         self.seed += 1
-        self._update_peak_assignments(self.seed)  # Update y-assignments
+        np.random.seed(self.seed)  # Seed random number generator
+
+        old_regions, old_topics = self.topics["peak_region_idx"], self.topics["peak_topic_idx"]
+        peak_probs = self._get_peak_probs(self)  # Retrieve p(x|r,y) for all subregions
+
+        # When the size of X is large and the task is fast, Parallel()(delayed(f)(x) for x in X)
+        # is very slow. Split data into smaller chunks to speed up Parallel().
+        n_ptoken = len(self.data["ptoken_doc_idx"])
+        chunk_size = math.ceil(n_ptoken / self.n_cores)
+        seeds = np.random.randint(1e8, size=n_ptoken)  # Array of seeds for multicore reproducibility
+        chunks = [
+            {
+                "doc_idx": self.data["ptoken_doc_idx"][chunk_i : chunk_i + chunk_size],
+                "regions": old_regions[chunk_i : chunk_i + chunk_size],
+                "topics": old_topics[chunk_i : chunk_i + chunk_size],
+                "peak_probs": peak_probs[chunk_i : chunk_i + chunk_size, :, :],
+                "seeds": seeds[chunk_i : chunk_i + chunk_size],
+            }
+            for chunk_i in range(0, n_ptoken, chunk_size)
+        ]
+
+        results = Parallel(n_jobs=self.n_cores, max_nbytes=None)(
+            delayed(_update_peak_assignments)(
+                **chunk,
+                ny_region_topic=self.topics["n_peak_tokens_region_by_topic"],
+                ny_doc_topic=self.topics["n_peak_tokens_doc_by_topic"],
+                nz_doc_topic=self.topics["n_word_tokens_doc_by_topic"],
+                n_regions=self.params["n_regions"],
+                n_topics=self.params["n_topics"],
+                alpha=self.params["alpha"],
+                delta=self.params["delta"],
+                gamma=self.params["gamma"],
+            )
+            for chunk in chunks
+        )  # Update y-assignments
+
+        regions, topics = zip(*results)
+        regions = list(itertools.chain(*regions))
+        topics = list(itertools.chain(*topics))
+
+        # Decrement count-matrices to remove previous self.data["ptoken_doc_idx"]
+        np.subtract.at(self.topics["n_peak_tokens_region_by_topic"], (old_regions, old_topics), 1)
+        np.subtract.at(
+            self.topics["n_peak_tokens_doc_by_topic"], (self.data["ptoken_doc_idx"], old_topics), 1
+        )
+
+        # Update the indices and the count matrices using the regions and topics assignments
+        np.add.at(self.topics["n_peak_tokens_region_by_topic"], (regions, topics), 1)
+        np.add.at(
+            self.topics["n_peak_tokens_doc_by_topic"], (self.data["ptoken_doc_idx"], topics), 1
+        )
+
+        # Update y->topic and y->subregion assignment
+        self.topics["peak_topic_idx"] = np.array(topics)
+        self.topics["peak_region_idx"] = np.array(regions)
 
         LGR.debug(f"Iter {self.iter:04d}: Updating spatial params")
         self._update_regions()  # Update gaussian estimates for all subregions
@@ -430,161 +757,6 @@ def _update(self, loglikely_freq=1):
                 f"tot = {self.loglikelihood['total'][-1]:10.1f}"
             )
 
-    def _update_word_topic_assignments(self, randseed):
-        """Update wtoken_topic_idx (z) indicator variables assigning words->topics.
-
-        Parameters
-        ----------
-        randseed : :obj:`int`
-            Random seed for this iteration.
- """ - # --- Seed random number generator - np.random.seed(randseed) - - # Loop over all word tokens - for i_wtoken, word in enumerate(self.data["wtoken_word_idx"]): - # Find document in which word token (not just word) appears - doc = self.data["wtoken_doc_idx"][i_wtoken] - # current topic assignment for word token w_i - topic = self.topics["wtoken_topic_idx"][i_wtoken] - - # Decrement count-matrices to remove current wtoken_topic_idx - # because wtoken will be reassigned to a new topic - self.topics["n_word_tokens_word_by_topic"][word, topic] -= 1 - self.topics["total_n_word_tokens_by_topic"][0, topic] -= 1 - self.topics["n_word_tokens_doc_by_topic"][doc, topic] -= 1 - - # Get sampling distribution: - # p(z_i|z,d,w) ~ p(w|t) * p(t|d) - # ~ p_w_t * p_topic_g_doc - p_word_g_topic = ( - self.topics["n_word_tokens_word_by_topic"][word, :] + self.params["beta"] - ) / ( - self.topics["total_n_word_tokens_by_topic"] - + (self.params["beta"] * len(self.vocabulary)) - ) - p_topic_g_doc = ( - self.topics["n_peak_tokens_doc_by_topic"][doc, :] + self.params["gamma"] - ) - probs = p_word_g_topic * p_topic_g_doc # The unnormalized sampling distribution - - # Sample a z_i assignment for the current word-token from the sampling distribution - probs = np.squeeze(probs) / np.sum(probs) # Normalize the sampling distribution - # Numpy returns a binary [1 x T] vector with a '1' in the index of sampled topic - # and zeros everywhere else - assigned_topic_vec = np.random.multinomial(1, probs) - # Extract selected topic from vector - topic = np.where(assigned_topic_vec)[0][0] - - # Update the indices and the count matrices using the sampled z assignment - self.topics["wtoken_topic_idx"][i_wtoken] = topic # Update w_i topic-assignment - self.topics["n_word_tokens_word_by_topic"][word, topic] += 1 - self.topics["total_n_word_tokens_by_topic"][0, topic] += 1 - self.topics["n_word_tokens_doc_by_topic"][doc, topic] += 1 - - def _update_peak_assignments(self, randseed): - """Update y / r indicator variables assigning peaks->topics/subregions. - - Parameters - ---------- - randseed : :obj:`int` - Random seed for this iteration. 
- """ - # Seed random number generator - np.random.seed(randseed) - - # Retrieve p(x|r,y) for all subregions - peak_probs = self._get_peak_probs(self) - - # Iterate over all peaks x, and sample a new y and r assignment for each - for i_ptoken, doc in enumerate(self.data["ptoken_doc_idx"]): - topic = self.topics["peak_topic_idx"][i_ptoken] - region = self.topics["peak_region_idx"][i_ptoken] - - # Decrement count-matrices to remove current ptoken_topic_idx - # because ptoken will be reassigned to a new topic - self.topics["n_peak_tokens_region_by_topic"][region, topic] -= 1 - self.topics["n_peak_tokens_doc_by_topic"][doc, topic] -= 1 - - # Retrieve the probability of generating current x from all - # subregions: [R x T] array of probs - p_x_subregions = (peak_probs[i_ptoken, :, :]).transpose() - - # Compute the probabilities of all subregions given doc - # p(r|d) ~ p(r|t) * p(t|d) - # Counts of subregions per topic + prior: p(r|t) - p_region_g_topic = self.topics["n_peak_tokens_region_by_topic"] + self.params["delta"] - - # Normalize the columns such that each topic's distribution over subregions sums to 1 - p_region_g_topic = p_region_g_topic / np.sum(p_region_g_topic, axis=0, keepdims=True) - - # Counts of topics per document + prior: p(t|d) - p_topic_g_doc = ( - self.topics["n_peak_tokens_doc_by_topic"][doc, :] + self.params["alpha"] - ) - - # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - # Makes it the same shape as p_region_g_topic - p_topic_g_doc = np.array([p_topic_g_doc] * self.params["n_regions"]) - - # Compute p(subregion | document): p(r|d) ~ p(r|t) * p(t|d) - # [R x T] array of probs - p_region_g_doc = p_topic_g_doc * p_region_g_topic - - # Compute the multinomial probability: p(z|y) - # Need the current vector of all z and y assignments for current doc - # The multinomial from which z is sampled is proportional to number - # of y assigned to each topic, plus constant gamma - # Compute the proportional probabilities in log-space - logp = self.topics["n_word_tokens_doc_by_topic"][doc, :] * np.log( - (self.topics["n_peak_tokens_doc_by_topic"][doc, :] + self.params["gamma"] + 1) - / (self.topics["n_peak_tokens_doc_by_topic"][doc, :] + self.params["gamma"]) - ) - # Add a constant before exponentiating to avoid any underflow issues - p_peak_g_topic = np.exp(logp - np.max(logp)) - - # Reshape from (ntopics,) to (nregions, ntopics) with duplicated rows - p_peak_g_topic = np.array([p_peak_g_topic] * self.params["n_regions"]) - - # Get the full sampling distribution: - # [R x T] array containing the proportional probability of all y/r combinations - probs_pdf = p_x_subregions * p_region_g_doc * p_peak_g_topic - - # Convert from a [R x T] matrix into a [R*T x 1] array we can sample from - probs_pdf = np.reshape(probs_pdf, (self.params["n_regions"] * self.params["n_topics"])) - - # Normalize the sampling distribution - probs_pdf = probs_pdf / np.sum(probs_pdf) - - # Sample a single element (corresponding to a y_i and c_i assignment for the ptoken) - # from the sampling distribution - # Returns a binary [1 x R*T] vector with a '1' in location that was sampled - # and zeros everywhere else - assignment_vec = np.random.multinomial(1, probs_pdf) - - # Reshape 1D back to [R x T] 2D - assignment_arr = np.reshape( - assignment_vec, - (self.params["n_regions"], self.params["n_topics"]), - ) - # Transform the linear index of the sampled element into the - # subregion/topic (r/y) assignment indices - assignment_idx = np.where(assignment_arr) - # Subregion sampled (r) - region 
= assignment_idx[0][0] - # Topic sampled (y) - topic = assignment_idx[1][0] - - # Update the indices and the count matrices using the sampled y/r assignments - # Increment count in Subregion x Topic count matrix - self.topics["n_peak_tokens_region_by_topic"][region, topic] += 1 - # Increment count in Document x Topic count matrix - self.topics["n_peak_tokens_doc_by_topic"][doc, topic] += 1 - # Update y->topic assignment - self.topics["peak_topic_idx"][i_ptoken] = topic - # Update y->subregion assignment - self.topics["peak_region_idx"][i_ptoken] = region - def _update_regions(self): """Update spatial distribution parameters (Gaussians params for all subregions). @@ -643,6 +815,10 @@ def _update_regions(self): ) / (n_obs1 + n_obs2) weighted_mean_otherdims = np.mean(all_xyz[:, 1:], axis=0) + if (n_obs1 == 0) and (n_obs2 == 0): + weighted_mean_dim1 = np.nan_to_num(weighted_mean_dim1) + weighted_mean_otherdims = np.nan_to_num(weighted_mean_otherdims) + # Store weighted mean estimates mu1 = np.zeros([1, self.data["ptoken_coords"].shape[1]]) mu2 = np.zeros([1, self.data["ptoken_coords"].shape[1]])
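
Usage sketch (not part of the patch): the constructor arguments below come from this diff; the fit(n_iters=..., loglikely_freq=...) call and the input file names are assumptions about the surrounding NiMARE API and example data, not shown in the patch.

    import pandas as pd

    from nimare.annotate.gclda import GCLDAModel

    # Hypothetical input tables: a document-by-term count matrix and a peak-coordinate
    # table, both keyed by study ID (the constructor casts these IDs to str).
    count_df = pd.read_csv("counts.tsv", sep="\t", index_col="id")
    coordinates_df = pd.read_csv("coordinates.tsv", sep="\t")

    # n_cores controls how many joblib workers the Gibbs-sampling updates use;
    # per _check_ncores, values <= 0 fall back to all available cores.
    model = GCLDAModel(
        count_df,
        coordinates_df,
        n_topics=50,
        n_regions=2,
        seed_init=1,
        n_cores=4,
    )

    # Assumed training call (not shown in this diff). Each iteration splits the word
    # tokens and peak tokens into n_cores chunks, dispatches them to
    # Parallel(n_jobs=n_cores), and seeds one default_rng per token, which is intended
    # to keep results independent of the number of workers.
    model.fit(n_iters=1000, loglikely_freq=10)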