Playing with regression clustering in Python
Recently, I had to analyze some data in which dense areas have clear boundaries. In addition, I had to describe these boundaries by equations. Although I could use any clustering algorithm to identify the dense areas, such as GMM or K-Means, these wouldn't give me the boundaries in the required form.
At this point, I found a nice algorithm called Regression Clustering. Before describing the goal of this algorithm, let's recap what linear regression is. In linear regression, we have a basis of functions B = {f_1, …, f_b}, a set of pairs {(x_j, y_j)}_{j=1}^{n}, and the goal is to find c ∈ ℝ^b such that ∀j ∈ [n]: ∑_{i=1}^{b} c_i⋅f_i(x_j) ≈ y_j.
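To make this concrete, here is a minimal sketch of fitting one such basis with plain NumPy least squares (the basis, data, and names here are made up for illustration):

import numpy as np

basis = [lambda x: x**2, lambda x: x, lambda x: 1]     # B = {x², x, 1}
x = np.linspace(-5, 5, 50)
y = 2 * x**2 - 3 * x + 1 + np.random.randn(50) * 0.1   # noisy samples

# Design matrix: row j holds f_1(x_j), ..., f_b(x_j).
X = np.array([[f(xj) for f in basis] for xj in x])

# With noise, exact equality is unattainable, so we solve
# for c in the least-squares sense.
c, *_ = np.linalg.lstsq(X, y, rcond=None)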
This model performs well when the data points follow one trend, for example:
However, in many situations our data doesn't follow one trend but several. In these situations we may consider fitting multiple regressions:
Here comes Regression Clustering to the rescue! In this problem's formulation, instead of one basis of functions we have d bases B_1, …, B_d, each of which should fit a part of our data points.
My first thought was: "There must be a nice Python implementation for this!" However, I couldn't find one (maybe I missed it?), so I searched for a good paper describing it and started implementing it.
The rest of this post will discuss the implementation of the KHM-Linear-Regression algorithm [1].
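Before diving into code, it helps to state what KHM optimizes. In K-Harmonic Means, the hard minimum over clusters is replaced by a harmonic mean: writing d_{j,k} = |y_j − f_k(x_j)| for the distance between point j and the k'th regression function, the performance function being minimized is (in my paraphrase of the paper's notation)

perf = (1/n)⋅∑_{j=1}^{n} K / ∑_{k=1}^{K} 1/d_{j,k}^{p}

The calc_loss method shown later computes this quantity (with the raw distances d_{j,k} rather than d_{j,k}^p), together with the optional point weights.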
I created a model class called KHM. Its constructor takes the function bases, and lets the user choose whether to use NumPy or CuPy.
import itertools  # needed by create_X below


class KHM:
    def __init__(self, function_basis, lib='numpy', p=2):
        """
        :param function_basis: the basis functions for each cluster
        :param lib: 'numpy' for CPU, 'cupy' for CUDA
        :param p: degree of the residual norm
        """
        self.function_basis = function_basis
        self.K = len(function_basis)
        self.p = p
        self.coeff = None
        self.loss = float("inf")
        if lib == 'numpy':
            import numpy as np
        else:
            import cupy as np
        self.lib = np
Next, I define the fit method. It takes the x, y pairs, as well as the maximum number of iterations to run, the number of random restarts (trials), optional per-point weights, a convergence threshold eps, and the verbosity parameters.
def fit(self,
        x,
        y,
        max_iterations=100,
        verbose=0,
        print_interval=2,
        trials=1,
        eps=1e-3,
        weights=None):
    """
    :param x: the data points
    :param y: the target values
    :param max_iterations: maximum number of iterations per trial
    :param verbose: 0 - silent
                    1 - print every print_interval iterations
                    2 - print only the first and last iterations
    :param print_interval: iterations between prints when verbose == 1
    :param trials: number of random restarts; the best result is kept
    :param eps: stop a trial once the loss improves by less than eps
    :param weights: optional per-point weights
    """
    x = self.lib.array(x)
    y = self.lib.array(y)
    W = self.lib.eye(len(y)) if weights is None else self.lib.diag(weights)
    X = self.create_X(x)
    if verbose == 0:
        verbose_print = lambda **a: None
    if verbose == 1 or verbose == 2:
        def verbose_print(trial, iteration, loss):
            if iteration % print_interval == 0 or iteration == 1:
                print(f"Trial {trial} : Iteration {iteration:{len(str(max_iterations))}d} / {max_iterations} loss : {loss}")
        if verbose == 2:
            print_interval = max_iterations
    for trial_num in range(1, trials + 1):
        coeff = self.step1()
        loss = float('inf')
        for r in range(1, max_iterations + 1):
            d = self.step2(x, y, coeff)
            new_loss = self.calc_loss(d, W)
            verbose_print(iteration=r, loss=new_loss, trial=trial_num)
            if abs(new_loss - loss) < eps:
                loss = new_loss
                break
            loss = new_loss
            coeff = [self.LinReg_KHM(d=d, k=k, X=X[k], y=y, p=self.p, W=W) for k in range(self.K)]
        # Keep the best trial seen so far.
        if loss < self.loss:
            self.loss = loss
            self.coeff = coeff
    return self.loss, self.coeff
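One caveat: fit calls a step2 method that isn't shown in this post. Here is a minimal sketch of what it presumably does, assuming it builds the n×K matrix of residual distances d[j, k] = |y_j − f_k(x_j)| consumed by calc_loss and LinReg_KHM, using the calc_kth_function defined further below:

def step2(self, x, y, coeff):
    # Assumed implementation: distance of each point to each
    # regression function, d[j, k] = |y_j - f_k(x_j)|.
    d = self.lib.stack(
        [self.lib.abs(y.flatten() - self.calc_kth_function(k, x, coeff))
         for k in range(self.K)], axis=1)
    # Guard against division by zero in the harmonic mean.
    return self.lib.maximum(d, 1e-12)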
The actual fitting is done by the method LinReg_KHM:
def LinReg_KHM(self, d, k, X, y, p, W, q=2):
    denominator = (((d[:, k]) ** (p + q)) * (1 / (d ** p)).sum(axis=1) ** 2).reshape(-1, 1)
    W = self.lib.sqrt(W) / denominator
    return self.lib.linalg.inv(X.T @ W @ X) @ X.T @ W.T @ (y.reshape(-1, 1))

The evaluation of the k'th function on a set of points is done by the method `calc_kth_function`:

def calc_kth_function(self, k, x, coeff=None):
    if coeff is None:
        coeff = self.coeff
    res = []
    for xi in x:
        res.append(self.lib.array([f(xi) for f in self.function_basis[k]]))
    res = self.lib.array(res)
    return (res @ coeff[k].reshape(-1, 1)).flatten()
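As a hypothetical usage example (assuming a fitted model, as in the demos below), this is how one cluster's fitted curve can be evaluated, e.g. for plotting:

xs = np.linspace(-50, 50, 200).reshape(-1, 1)
y0 = model.calc_kth_function(k=0, x=xs)   # predictions of cluster 0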
The coefficients are initialized randomly in step1:
def step1(self):
    return [self.lib.random.randn(len(base)) for base in self.function_basis]
For logging purposes, the loss is calculated in calc_loss:
def calc_loss(self, d, W):
    return (self.K / (1 / d).sum(axis=1) @ W.T).sum() / d.shape[0]
X[k] is a matrix whose i'th row contains the evaluation of the k'th basis at the point x_i. It is built by the method create_X:
def create_X(self, x):
    return [self.lib.array([f(x_i) for x_i, f in itertools.product(x, self.function_basis[k])]).reshape(
        (-1, len(self.function_basis[k]))) for k in range(self.K)]
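To illustrate with a made-up mini example, for the quadratic basis used in the demos and two points:

basis = [lambda x: x[0]**2, lambda x: x[0], lambda x: 1]
model = KHM(function_basis=[basis])
model.create_X([[1], [2]])[0]
# -> array([[1, 1, 1],
#           [4, 2, 1]])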
DEMOS!!! Now I'll show some tests I ran to check whether my implementation works.
In the first test, I generate data corresponding to 3 identical bases: {x²,x,1}
import numpy as np

coeff1 = [0.2, -0.5, 5]
coeff2 = [-0.2, -10, -5]
coeff3 = [0, 4, 0]

basis1 = [lambda x: x[0]**2, lambda x: x[0], lambda x: 1]
basis2 = [lambda x: x[0]**2, lambda x: x[0], lambda x: 1]
basis3 = [lambda x: x[0]**2, lambda x: x[0], lambda x: 1]

x1 = np.linspace(-50, 50, 100).reshape(-1, 1)
x2 = np.linspace(-50, 50, 100).reshape(-1, 1)
x3 = np.linspace(-50, 50, 100).reshape(-1, 1)

y1 = np.array([calc_f(basis=basis1, coeff=coeff1, x=x_i) + np.random.randn()*20 for x_i in x1])
y2 = np.array([calc_f(basis=basis2, coeff=coeff2, x=x_i) + np.random.randn()*20 for x_i in x2])
y3 = np.array([calc_f(basis=basis3, coeff=coeff3, x=x_i) + np.random.randn()*20 for x_i in x3])

x = np.concatenate([x1, x2, x3])
y = np.concatenate([y1, y2, y3])
The function calc_f is used for calculating the value of a function given its basis and coefficients:

def calc_f(basis, coeff, x):
    return np.array(coeff).T @ np.array([f(x) for f in basis])
Running the model is done by:
model = KHM(function_basis=[basis1, basis2, basis3])
model.fit(x=x, y=y, max_iterations=10, verbose=1)
And after a second…
The model was able to successfully fit 3 functions with the corresponding bases to the generated data, even with the added noise.
The next test is done for functions with 2 independent variables, corresponding to the following settings:
coeff1 = [1, 1]
coeff2 = [-1, 1, -0.5, 500]
basis1 = [lambda x: x[0] ** 2, lambda x: x[1] ** 2]
basis2 = [lambda x: x[0] ** 2, lambda x: x[1], lambda x: x[0] * x[1], lambda x: 1]
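The data generation for this test isn't shown in the post; here is a sketch mirroring the first demo (the sampling ranges, sizes, and noise scale below are my guesses):

x1 = np.random.uniform(-50, 50, size=(500, 2))
x2 = np.random.uniform(-50, 50, size=(500, 2))
y1 = np.array([calc_f(basis=basis1, coeff=coeff1, x=x_i) + np.random.randn() * 20 for x_i in x1])
y2 = np.array([calc_f(basis=basis2, coeff=coeff2, x=x_i) + np.random.randn() * 20 for x_i in x2])
x = np.concatenate([x1, x2])
y = np.concatenate([y1, y2])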
Running the model is done in the same fashion as before. This time I generate 3D graphs:
The model was able to accurately recover the right coefficients (approximately, of course).