import os, sys
from typing import Union, List, Optional
import logging
import numpy as np
import sklearn.cluster as skcluster
logging.basicConfig(
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=os.environ.get("LOGLEVEL", "INFO").upper(),
stream=sys.stdout,
)
logger = logging.getLogger(__name__)
[docs]class Cluster:
def __init__(
self,
cvs: Union[np.ndarray, List],
threshold: float,
angular_mask: Optional[Union[np.ndarray, List]] = None,
weights: Optional[Union[np.ndarray, List]] = None,
max_search_step: int = 500,
max_selection: int = 1000
):
if angular_mask is None:
angular_mask = np.zeros(shape=(cvs[1],))
if weights is None:
weights = np.ones(shape=(cvs[1],))
self.angular_mask = angular_mask
self.weights = weights
self.max_search_step = max_search_step
self.threshold = threshold
self.cvs = cvs
self.enlarge_coeff = 1.05
self.reduce_coeff = 0.95
self.cls_sel = None
self.max_selection = max_selection
[docs] def make_threshold(self, numb_cluster_lower, numb_cluster_upper):
current_iter = 0
logger.info(f"set numb_cluster_upper to {numb_cluster_upper}")
logger.info(f"set numb_cluster_lower to {numb_cluster_lower}")
assert numb_cluster_lower < numb_cluster_upper, f"expect numb_cluster_upper > numb_cluster_lower, "
"got {numb_cluster_upper} < {numb_cluster_lower}"
while current_iter < self.max_search_step:
logger.info(f"making threshold attempt {current_iter}")
cls_sel = sel_from_cluster(
self.cvs, self.threshold, angular_mask=self.angular_mask,
weights=self.weights, max_selection=self.max_selection)
test_numb_cluster = len(set(cls_sel))
if test_numb_cluster < numb_cluster_lower:
self.threshold = self.threshold * self.reduce_coeff
elif test_numb_cluster > numb_cluster_upper:
self.threshold = self.threshold * self.enlarge_coeff
else:
break
logger.info(f"set threshold to {self.threshold}, get {test_numb_cluster} clusters.")
current_iter += 1
self.cls_sel = cls_sel
return self.threshold
[docs] def get_cluster_selection(self):
if self.cls_sel is None:
self.cls_sel = sel_from_cluster(
self.cvs, self.threshold, angular_mask=self.angular_mask,
weights=self.weights, max_selection=self.max_selection)
return self.cls_sel
[docs]def cv_dist(cv1, cv2, angular_mask, weights):
diff = cv1 - cv2
angular_mask = np.array(angular_mask)
angular_boolean = (angular_mask == 1)
angular_diff = diff[angular_boolean]
angular_diff[angular_diff < -np.pi] += 2 * np.pi
angular_diff[angular_diff > np.pi] -= 2 * np.pi
diff[angular_boolean] = angular_diff
return np.linalg.norm(diff * weights)
[docs]def mk_dist(cv, angular_mask, weights):
nframe = cv.shape[0]
dist = np.zeros([nframe, nframe])
for ii in range(nframe):
for jj in range(ii+1, nframe):
dist[ii][jj] = cv_dist(cv[ii], cv[jj], angular_mask, weights)
dist[jj][ii] = dist[ii][jj]
return dist
[docs]def mk_cluster(dist, distance_threshold):
logger.info("clustering ...")
cluster = skcluster.AgglomerativeClustering(n_clusters=None,
linkage='average',
affinity='precomputed',
distance_threshold=distance_threshold)
cluster.fit(dist)
return cluster.labels_
[docs]def sel_from_cluster(cvs, threshold, angular_mask=None, weights=None, max_selection=1000):
if len(cvs) <= 1:
return cvs
weights = np.array(weights)
dist = mk_dist(cvs, angular_mask, weights)
labels = mk_cluster(dist, threshold)
# make cluster map
_cls_map = []
for _ in range(len(set(labels))):
_cls_map.append([])
for idx, label in enumerate(labels):
_cls_map[label].append(idx)
cls_map = []
for clust in _cls_map:
cls_map.append((clust, len(clust)))
cls_map = sorted(cls_map, key=lambda x: x[1], reverse=True)
# randomly select from clusters
cls_sel = []
np.random.seed(seed=None)
for cluster, _ in cls_map:
_ret = np.random.choice(cluster, 1)
cls_sel.append(_ret[0])
if len(cls_sel) > max_selection:
cls_sel = cls_sel[:max_selection]
logger.info("selection number is beyond max selection, adjust to the max number.")
return np.array(cls_sel, dtype=int)