Get the first big classifier to work
Khemarato Bhikkhu committed Jan 29, 2024
1 parent 6f091ab commit 4741138
Showing 1 changed file with 209 additions and 6 deletions.
215 changes: 209 additions & 6 deletions scripts/auto_sort_unreads.py
@@ -1,6 +1,7 @@
#!/bin/python3

import argparse
from math import sqrt
from typing import Any
from collections.abc import Mapping
from pathlib import Path
@@ -14,9 +15,15 @@
from numpy.typing import ArrayLike
import gensim.downloader
from nltk.stem.snowball import SnowballStemmer
from scipy import sparse
from sklearn.feature_extraction.text import TfidfTransformer, CountVectorizer
from sklearn.pipeline import Pipeline
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.utils.validation import check_X_y, check_is_fitted, check_array
from sklearn.utils.multiclass import unique_labels
from tqdm import tqdm
from tqdm.contrib.concurrent import process_map as tqdm_process_map
from unidecode import unidecode
@@ -231,6 +238,7 @@ def get_all_trainable_files_in_folders(trainable_folders, verbose=False) -> list
    else:
        trainable_folders = list(trainable_folders.values())
    wrapper = lambda a: a
    print("Getting all trainable files from drive...")
    if not verbose:
        wrapper = tqdm
    for folderid in wrapper(trainable_folders):
@@ -301,17 +309,22 @@ def _get_trainable_files_in_folder(folderid, verbose):
print(f" Had to load {len(shortcutIds)} targets individually")
return ret + gdrive.batch_get_files_by_id(list(shortcutIds), FILE_FIELDS)

def gfiles_organized_by_tags(trainable_folders=None, trainable_files=None):
def get_folder_to_tag_mapping(trainable_folders=None):
    if not trainable_folders:
        trainable_folders = get_all_trainable_drive_folders()
    if not trainable_files:
        trainable_files = get_all_trainable_files_in_folders(trainable_folders)
    ret = {tag: [] for tag in trainable_folders.keys()}
    folder_to_tag = {
    return {
        folder: tag
        for tag in trainable_folders.keys()
        for folder in trainable_folders[tag]
    }

def gfiles_organized_by_tags(trainable_folders=None, trainable_files=None):
    if not trainable_folders:
        trainable_folders = get_all_trainable_drive_folders()
    if not trainable_files:
        trainable_files = get_all_trainable_files_in_folders(trainable_folders)
    ret = {tag: [] for tag in trainable_folders.keys()}
    folder_to_tag = get_folder_to_tag_mapping(trainable_folders=trainable_folders)
    for file in trainable_files:
        ret[folder_to_tag[file['parent']]].append(file)
    return ret
@@ -444,14 +457,204 @@ def save_all_pdf_texts(parallelism=6, sample_size=None, min_size=0, max_size=150
def new_vectorizer():
    return CountVectorizer(
        lowercase=False,  # already lower()ed by the normalization step above
        stop_words=STEMMED_STOP_WORDS,  # filter by stems (any added since normalizing)
        stop_words=[w for w in STEMMED_STOP_WORDS if w],  # filter by stems (any added since normalizing)
        min_df=15,
    )

class DataPoint:
    def __init__(
        self,
        title=None,
        content=None,
        tag=None,
        confidence=1.0,
    ) -> None:
        self.title = title or ''
        self.content = content or ''
        self.tag = tag
        self.confidence = confidence
    def get_normalized_title(self):
        return self.title
    def get_normalized_content(self):
        if not self.content:
            return ''
        # Repeat the title so its words are weighted more heavily than the body text
        return self.content + 4 * (' ' + self.title)
    def get_tag(self):
        return self.tag
    def get_confidence(self):
        return self.confidence
    def set_title_vector(self, title_vector):
        self.title_vector = title_vector
    def set_content_vector(self, content_vector):
        self.content_vector = content_vector
    def get_title_vector(self):
        return self.title_vector
    def get_content_vector(self):
        return self.content_vector

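# A minimal usage sketch of DataPoint (the title, content, and tag values below are
# hypothetical): get_normalized_content() appends the title four times so title words
# carry extra weight when the text is later counted.
def _demo_datapoint():
    dp = DataPoint(
        title="satipatthana sutta",
        content="the four foundations of mindfulness",
        tag="meditation",
    )
    # -> "the four foundations of mindfulness satipatthana sutta satipatthana sutta ..."
    return dp.get_normalized_content()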
class DataSource:
    def get_all_datapoints(self) -> list[DataPoint]:
        raise NotImplementedError()

class GoogleDriveFilesDataSource(DataSource):
    def __init__(self, unread_confidence=0.2) -> None:
        super().__init__()
        self.unread_confidence = unread_confidence

    def get_all_datapoints(self) -> list[DataPoint]:
        folders = get_all_trainable_drive_folders()
        tag_for_folder = get_folder_to_tag_mapping(trainable_folders=folders)
        all_files = get_all_trainable_files_in_folders(folders)
        all_the_data = []
        print("Building Google Drive DataPoints...")
        for file in tqdm(all_files):
            content = ''
            tag = tag_for_folder[file['parent']]
            fp = NORMALIZED_TEXT_FOLDER.joinpath(f"{file['id']}.txt")
            if fp.exists():
                content = fp.read_text()
            title = normalize_text(file['name'])
            if title or content:
                all_the_data.append(DataPoint(
                    title=title,
                    content=content,
                    tag=tag,
                    # Files in the tag's first folder get full confidence; files in the
                    # other folders get the lower unread confidence
                    confidence=1.0 if folders[tag][0] == file['parent'] else self.unread_confidence,
                ))
        return all_the_data

"""BEGIN SKLEARN CODE"""

class RemoveSparseFeatures(BaseEstimator, TransformerMixin):
    def __init__(self, k=15):
        self.k = k

    def fit(self, X, y=None):
        self.num_features_in = X.shape[1]
        # Keep only the columns (features) that are non-zero in at least k rows (documents).
        # X is expected to be a scipy sparse matrix, so the column-wise sum comes back as a
        # 1 x n_features matrix and np.where's second element gives the column indices.
        self.sparse_mask = np.where(np.sum(X != 0, axis=0) >= self.k)[1]
        self.num_features_out = self.sparse_mask.shape[0]
        return self

    def transform(self, X):
        if hasattr(self, 'sparse_mask'):
            return X[:, self.sparse_mask]
        else:
            raise ValueError("The transformer has not been fitted yet.")

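# A minimal sketch of what RemoveSparseFeatures does (the toy matrix below is hypothetical):
# only the columns that are non-zero in at least k rows survive the transform.
def _demo_remove_sparse_features():
    docs = sparse.csr_matrix([
        [1, 0, 2],
        [1, 0, 2],
        [1, 3, 0],
    ])
    # Column 0 appears in 3 docs and column 2 in 2 docs, so with k=2 both are kept;
    # column 1 appears in only 1 doc and is dropped.
    filtered = RemoveSparseFeatures(k=2).fit_transform(docs)
    return filtered.shape  # (3, 2)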

class OBUNodeClassifier(BaseEstimator, ClassifierMixin):
    """My custom sklearn classifier for making one-step predictions"""
    def __init__(self, classifierclass=LogisticRegression) -> None:
        super().__init__()
        self.classifierclass = classifierclass

    def fit(self, X, y, sample_weight=None):
        X, y = check_X_y(X, y, accept_sparse=True)
        self.classes_ = unique_labels(y)
        self.pipeline_ = Pipeline(steps=[
            ('filter_rare_words', RemoveSparseFeatures()),
            ('tfidf', TfidfTransformer()),
            ('classifier', self.classifierclass())
        ])
        self.pipeline_.fit(X, y, classifier__sample_weight=sample_weight)
        return self

    def predict(self, X):
        check_is_fitted(self)
        # accept_sparse=True so the sparse matrices produced by the vectorizer pass validation
        X = check_array(X, accept_sparse=True)
        return self.pipeline_.predict(X)

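# A minimal sketch of using OBUNodeClassifier on its own, outside OBUTopicClassifier
# (the texts and tag names below are hypothetical). RemoveSparseFeatures keeps only terms
# appearing in at least 15 documents, so the toy corpus is repeated to clear that threshold.
def _demo_node_classifier():
    texts = [
        "sutta sutta nikaya", "nikaya sutta pali",
        "meditation jhana breath", "breath meditation mindfulness",
    ] * 10
    labels = ["canon", "canon", "practice", "practice"] * 10
    counts = CountVectorizer().fit_transform(texts)
    clf = OBUNodeClassifier().fit(counts, labels)
    return clf.predict(counts[:2])  # should predict "canon" for both rows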
class OBUTopicClassifier:
    """Class for bridging my world and the sklearn world"""
    def __init__(
        self,
        data_sources: list[DataSource],
        min_points: int = 50,  # tags with fewer than this many data points will be filtered out
    ) -> None:
        self.data_sources = data_sources
        self.min_points = min_points

    def train(self):
        """The big main function"""
        self.load_data()
        self.count_words()
        self.drive_map = get_drive_folder_heirarchy()
        # TODO: Train an OBUNodeClassifier for each tag
        self.classifiers_ = {
            'root': self.train_node('root'),
        }

    def load_data(self):
        self.all_the_data_ = []
        for datasource in self.data_sources:
            self.all_the_data_.extend(datasource.get_all_datapoints())

    def count_words(self):
        print("Counting all the words across the entire dataset...")
        self.vectorizer_ = new_vectorizer()
        # Interleave titles and contents so a single fit_transform call vectorizes both:
        # even rows are titles, odd rows are contents.
        X_raw = []
        for datapoint in self.all_the_data_:
            X_raw.append(datapoint.get_normalized_title())
            X_raw.append(datapoint.get_normalized_content())
        X_raw = self.vectorizer_.fit_transform(X_raw)
        for i in range(0, X_raw.shape[0], 2):
            t_v, c_v = X_raw[i:i+2]
            self.all_the_data_[i // 2].set_title_vector(t_v)
            self.all_the_data_[i // 2].set_content_vector(c_v)

    def _training_data_from_datapoints(
        self,
        data_points,
    ) -> tuple[list, list]:
        """Returns (X, w) from a list of DataPoints"""
        x = []
        w = []
        for datapoint in data_points:
            title = datapoint.get_title_vector()
            content = datapoint.get_content_vector()
            confidence = datapoint.get_confidence()
            # Weight each sample by its confidence, scaled by the square root of how many
            # distinct terms it contains, so longer texts count for a bit more.
            if title.nnz > 0:
                x.append(title)
                w.append(confidence * sqrt(title.nnz))
            if content.nnz > 0:
                x.append(content)
                w.append(confidence * sqrt(content.nnz))
        return (x, w)

    def train_node(self, tag: str) -> OBUNodeClassifier:
        # Points tagged with this node or one of its ancestors get labeled with the node's own tag
        relevant_tags = set([tag] + self.drive_map[tag]['ancestors'])
        X, w = self._training_data_from_datapoints(
            (dp for dp in self.all_the_data_ if dp.get_tag() in relevant_tags)
        )
        y = [tag] * len(w)
        for child in self.drive_map[tag]['children']:
            relevant_tags = set([child] + self.drive_map[child]['descendants'])
            child_X, child_w = self._training_data_from_datapoints(
                (dp for dp in self.all_the_data_ if dp.get_tag() in relevant_tags)
            )
            X.extend(child_X)
            w.extend(child_w)
            if len(child_w) >= self.min_points:
                y.extend([child] * len(child_w))
            else:
                # Too few examples to learn this child as its own class; fold it into the parent tag
                y.extend([tag] * len(child_w))
        node_classifier = OBUNodeClassifier()
        X = sparse.vstack(X)
        return node_classifier.fit(X, y, sample_weight=w)

    def predict(self, X, normalized=False) -> ArrayLike:
        """Given (normalized?) strings, predict the topics"""
        if not normalized:
            X = list(map(normalize_text, X))
        X = self.vectorizer_.transform(X)
        return self.classifiers_['root'].predict(X)

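# A minimal end-to-end sketch (the titles passed to predict() are hypothetical): after
# train() has built the root classifier, predict() takes raw strings, normalizes them,
# and returns one tag per string.
def _demo_topic_classifier():
    classifier = OBUTopicClassifier(data_sources=[GoogleDriveFilesDataSource()])
    classifier.train()  # requires Google Drive access and the cached normalized texts
    return classifier.predict([
        "The Majjhima Nikaya: Middle Length Discourses",
        "Mindfulness of Breathing in Daily Life",
    ])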
if __name__ == "__main__":
    argument_parser = argparse.ArgumentParser(
        prog="python3 auto_sort_unreads.py",
    )
    argument_parser.parse_args()
    # TODO
    OBUTopicClassifier(data_sources=[GoogleDriveFilesDataSource()]).train()
