"""Grid-search BERTopic hyper-parameters and score each fitted model.

Every parameter combination in ``param_grid`` is fitted on the same
pre-computed sentence embeddings and scored with an embedding-based
coherence / diversity / inter-topic-distance blend. The running history is
checkpointed to ``history.json`` after every successful fit.
"""

import json
import traceback

import numpy as np
import pandas as pd
from hdbscan import HDBSCAN
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import pairwise_distances
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import ParameterGrid
from umap import UMAP

from bertopic import BERTopic
from bertopic.representation import KeyBERTInspired
from bertopic.vectorizers import ClassTfidfTransformer

param_grid = {
    "nr_topics": [45, 50, 55],
    "min_topic_size": [30, 40, 50],
    "n_gram_max": [3],
    "min_document_frequency": [1, 2],
    "n_neighbors": [15],
    "n_components": [2],
    "min_dist": [0.1],
    "top_n_words": [10],
}


def _round_metric(value, ndigits=4):
    """Return *value* as a built-in float rounded to *ndigits* decimals."""
    # Numeric rounding instead of slicing ``str(value)``: string slicing
    # silently drops the exponent of values such as ``1.2345678e-05``.
    return round(float(value), ndigits)


def _mean_off_diagonal(matrix):
    """Return the mean of the entries strictly above the diagonal of *matrix*.

    For a symmetric matrix this equals the mean over all off-diagonal cells.
    Returns 0.0 when the matrix has fewer than two rows (no pairs to compare).
    """
    upper = matrix[np.triu_indices_from(matrix, k=1)]
    return float(upper.mean()) if upper.size else 0.0


def calculate_metrics(topic_model, embedder, top_n_words=5):
    """Score a fitted topic model using word embeddings.

    Args:
        topic_model: Fitted model exposing ``get_topics()``, a mapping of
            topic id to a list of ``(word, score)`` pairs.
        embedder: Object whose ``encode(list_of_str)`` returns a 2-D array
            with one embedding row per input string.
        top_n_words: Number of leading words per topic used for scoring.

    Returns:
        Dict with ``coherence`` (mean pairwise cosine similarity between a
        topic's words, averaged over topics), ``diversity`` (share of unique
        words among all topic words), ``inter_topic_distance`` (mean pairwise
        cosine distance between topic centroids) and ``combined_score``
        (0.6 * coherence + 0.2 * diversity + 0.2 * distance), each rounded to
        four decimals.

    Raises:
        ValueError: If the model has no topics besides the outlier topic.
    """
    # Take the ids from the model instead of assuming ``0..n-1`` plus an
    # outlier row: when no document is an outlier there is no ``-1`` topic
    # and a ``len(info) - 1`` range would drop the last real topic.
    topic_words = [
        [word for word, _ in word_scores][:top_n_words]
        for topic_id, word_scores in sorted(topic_model.get_topics().items())
        if topic_id != -1
    ]
    if not topic_words:
        raise ValueError("Topic model has no non-outlier topics to score.")

    # Encode each topic's words once; coherence and centroids both reuse it.
    word_embeddings = [embedder.encode(words) for words in topic_words]

    # Coherence: average similarity over distinct word pairs only, so the
    # self-similarity diagonal does not dilute the mean.
    coherence_scores = [
        _mean_off_diagonal(cosine_similarity(embedding))
        for embedding in word_embeddings
    ]
    overall_coherence = np.mean(coherence_scores)

    # Diversity
    all_topic_words = [word for words in topic_words for word in words]
    diversity = len(set(all_topic_words)) / len(all_topic_words)

    # Inter-topic distance (0.0 when there is only one topic to compare)
    topic_embeddings = np.asarray(
        [np.mean(embedding, axis=0) for embedding in word_embeddings]
    )
    topic_distance = pairwise_distances(topic_embeddings, metric="cosine")
    avg_distance = _mean_off_diagonal(topic_distance)

    res = {
        "coherence": _round_metric(overall_coherence),
        "diversity": _round_metric(diversity),
        "inter_topic_distance": _round_metric(avg_distance),
        "combined_score": _round_metric(
            0.6 * overall_coherence + 0.2 * diversity + 0.2 * avg_distance
        ),
    }
    print(res)
    return res


def _build_topic_model(params, embedder):
    """Assemble an unfitted BERTopic pipeline for one parameter combination."""
    ctfidf_model = ClassTfidfTransformer(reduce_frequent_words=True)
    vectorizer_model = CountVectorizer(
        stop_words="english",
        min_df=params["min_document_frequency"],
        ngram_range=(1, params["n_gram_max"]),
    )
    representation_model = KeyBERTInspired()
    umap_model = UMAP(
        n_neighbors=params["n_neighbors"],
        n_components=params["n_components"],
        min_dist=params["min_dist"],
        metric="cosine",
        low_memory=True,
        random_state=42,
    )
    hdbscan_model = HDBSCAN(
        min_cluster_size=params["min_topic_size"],
        metric="euclidean",
        cluster_selection_method="eom",
        gen_min_span_tree=True,
        prediction_data=True,
    )
    return BERTopic(
        # Hand over the already-loaded encoder so the sentence-transformer
        # is not loaded again for every parameter combination.
        embedding_model=embedder,
        ctfidf_model=ctfidf_model,
        vectorizer_model=vectorizer_model,
        umap_model=umap_model,
        hdbscan_model=hdbscan_model,
        representation_model=representation_model,
        verbose=True,
        calculate_probabilities=True,
        language="english",
        top_n_words=params["top_n_words"],
        nr_topics=params["nr_topics"],
    )


def auto_tune_bertopic(texts, embedding_model, param_grid):
    """Fit BERTopic for every combination in *param_grid* and keep the best.

    Args:
        texts: List of documents to model.
        embedding_model: Name (or path) of the sentence-transformers model
            used both to embed *texts* and to score topics.
        param_grid: Mapping of parameter name to candidate values, expanded
            with ``sklearn.model_selection.ParameterGrid``.

    Returns:
        Tuple ``(best_model, best_params, best_score, history)``.
        ``best_model`` and ``best_params`` are ``None`` and ``best_score`` is
        ``-1`` when no combination could be fitted. ``history`` lists
        ``{"params": ..., "metrics": ...}`` for every successful fit.

    Side effects:
        Rewrites ``history.json`` in the working directory after each
        successful fit and prints progress to stdout.
    """
    best_score = -1
    best_params = None
    best_model = None
    history = []

    print("Starting auto-tuning of BERTopic...")
    print(f"Number of reviews: {len(texts)}")

    print("Running embedding model...")
    embedder = SentenceTransformer(embedding_model)
    # Embed the documents passed in, not a module-level global, so the
    # function works for any caller and always matches ``texts``.
    embeddings = embedder.encode(texts, show_progress_bar=True)

    # Convert param_grid to list for sampling
    print("Generating parameter combinations...")
    param_list = list(ParameterGrid(param_grid))
    print(f"Total parameter combinations: {len(param_list)}")

    for params in param_list:
        # Best-effort search: a failing combination is reported and skipped
        # so one bad setting does not abort the whole grid.
        try:
            print(f"Testing params: {params}")
            model = _build_topic_model(params, embedder)
            model.fit_transform(texts, embeddings)

            metrics = calculate_metrics(model, embedder)
            history.append({"params": params, "metrics": metrics})

            # Checkpoint after every fit so partial results survive a crash.
            with open("history.json", "w", encoding="utf-8") as f:
                json.dump(history, f, indent=2)

            if metrics["combined_score"] > best_score:
                best_score = metrics["combined_score"]
                best_params = params
                best_model = model
        except Exception as e:
            print(f"Failed with params {params}: {str(e)}")
            traceback.print_exc()
            continue

    return best_model, best_params, best_score, history


# Real newlines and literal backslash-n sequences are both replaced by spaces.
SPECIAL_CHARS = ["\n", "\\n"]
# Reviews with fewer whitespace-separated tokens than this are dropped.
MIN_REVIEW_WORDS = 5

reviews = pd.read_csv("data.tab", sep="\t").review.to_list()

for schar in SPECIAL_CHARS:
    reviews = [
        review.replace(schar, " ") if isinstance(review, str) else review
        for review in reviews
    ]

# Non-string cells (e.g. NaN) stringify to a single token and are removed here.
reviews = [review for review in reviews if len(str(review).split()) >= MIN_REVIEW_WORDS]

print(auto_tune_bertopic(reviews, "all-MiniLM-L6-v2", param_grid))