Playing with regression clustering in Python
Recently, I had to anaylze some data in which dense areas have clear boundaries. In addition, I had to define these boundaries by equations. Altought I could use any clustering algorithm for identifying the dense areas, such as GMM or K-Means, these wouldn’t provide me the boundaries in the required format.
At this point, I found a nice algorithm called Regression-Clustering. Before describing the goal of this algorithm, lets recap what linear regression is. In linear regression, we have a basis of functions B={f_1,…,f_b}, a set of pairs {(x,y)}ni=1 and the goal is to find c∈Rb so that ∀j∈[n]:∑_{i=1}^{b}{c_{i}⋅f_{i}(x_{j})=y_{j}}.
This model performs well when the data points follow one trend, for example:
However, there are many situations when our data doesn’t follow one trend, but many. In this situtations we may consider adapting multiple regressions:
Here comes Regression clustering to the rescue! In the formulation of this problem, instead of one basis of functions we have d bases: B_1,…,B_d, each one should fit a part of our data points.
The first thought was: “There must be a nice python implementation for this!”. However, I couldn’t find one (maybe I missed it?). Thus I searched for a good paper describing it and and started implementing it.
The rest of this post will discuss the implementation of the KHM-Linear-Regression algorithm [1].
I created a model class called KHM. The attributes of this class takes the function bases. I let the user choose whether to use Numpy or Cupy.
class KHM:
def __init__(self, function_basis, lib='numpy', p=2):
:param function_basis: The bases functions for each cluster
:param lib: 'numpy' for CPU, 'cupy' for CUDA
:param p: degree of the residual norm """
self.function_basis = function_basis
self.K = len(function_basis)
self.p = p
self.coeff = None
self.loss = float("inf")
if lib == 'numpy':
import numpy as np
import cupy as np
self.lib = np
Next, I define the fit method. This method takes the x,y pairs, as well as the number of iterations to execute, described weights, eps, and the verbosity parameter.
def fit(self,
:param x:
:param y:
:param num_iterations:
:param verbose: 0 - silent
1 - one line per iteration
2 - print only start and end iterations
x = self.lib.array(x)
y = self.lib.array(y)
W = self.lib.eye(len(y)) if weights is None else self.lib.diag(weights)
X = self.create_X(x)
if verbose == 0:
verbose_print = lambda **a: None
if verbose == 1 or verbose == 2:
def verbose_print(trial, iteration, loss):
if iteration % print_interval == 0 or iteration == 0:
print(f"Trial {trial} : Iteration {iteration:{len(str(max_iterations))}d} / {max_iterations} loss : {loss}")
if verbose == 2:
print_interval = max_iterations - 1
coeff = self.step1()
loss = float('inf')
for r in range(1, max_iterations + 1):
d = self.step2(x, y, coeff)
new_loss = self.calc_loss(d, W)
verbose_print(iteration=r, loss=new_loss, trial=trial_num)
if abs(new_loss - loss) < eps:
loss = new_loss
coeff = [self.LinReg_KHM(d=d, k=k, X=X[k], y=y, p=self.p, W=W) for k in range(self.K)]
self.coeff = coeff
return cur_loss, cur_coeff
The actual fitting is done by the method LinReg_KHM:
def LinReg_KHM(self, d, k, X, y, p, W, q=2):
denominator = (((d[:, k]) ** (p + q)) * (1 / (d ** p)).sum(axis=1) ** 2).reshape(-1, 1)
W = self.lib.sqrt(W) / denominator
return self.lib.linalg.inv(X.T @ W @ X) @ X.T @ W.T @ (y.reshape(-1, 1))The evaluation of the k'th function on a set of points is done by the method `calc_kth_function`:def calc_kth_function(self, k, x, coeff=None):
if coeff is None:
coeff = self.coeff
res = []
for xi in x:
res.append(self.lib.array([f(xi) for f in self.function_basis[k]]))
res = self.lib.array(res)
return (res @ coeff[k].reshape(-1, 1)).flatten()
The initialization of the coefficients:
def step1(self):
return [self.lib.random.randn(len(base)) for base in self.function_basis]
For logging purposes, the loss is calculated in calc_loss:
def calc_loss(self, d, W):
return (self.K / (1 / d).sum(axis=1) @ W.T).sum() / d.shape[0]
The matrix X[k] is a matrix that in the i’th row it contains the evaluation of the k’th base on point x_i. This is done in the method create_X:
def create_X(self, x):
return [self.lib.array([f(x_i) for x_i, f in itertools.product(x, self.function_basis[k])]).reshape(
(-1, len(self.function_basis[k]))) for k in range(self.K)]
DEMOS!!! Now I’ll show you some test I ran in order to check if my implementation works.
In the first test, I generate data corresponding to 3 identical bases: {x²,x,1}
coeff1 = [0.2, -0.5, 5]
coeff2 = [-0.2, -10, -5]
coeff3 = [0, 4, 0]basis1 = [lambda x: x[0]**2, lambda x: x[0], lambda x: 1]
basis2 = [lambda x: x[0]**2, lambda x: x[0], lambda x: 1]
basis3 = [lambda x: x[0]**2, lambda x: x[0], lambda x: 1]x1 = np.linspace(-50, 50, 100).reshape(-1, 1)
x2 = np.linspace(-50, 50, 100).reshape(-1, 1)
x3 = np.linspace(-50, 50, 100).reshape(-1, 1)y1 = np.array([calc_f(basis=basis1, coeff=coeff1, x=x_i) + np.random.randn()*20 for x_i in x1])
y2 = np.array([calc_f(basis=basis2, coeff=coeff2, x=x_i) + np.random.randn()*20 for x_i in x2])
y3 = np.array([calc_f(basis=basis3, coeff=coeff3, x=x_i) + np.random.randn()*20 for x_i in x3])x = np.concatenate([x1, x2, x3])
y = np.concatenate([y1, y2, y3])
The function calc_f
is used for calculating the value of a function given its base and coefficients:
def calc_f(basis, coeff, x):
return np.array(coeff).T @ np.array([f(x) for f in basis])
Running the model is done by:
model = KHM(function_basis=[basis1, basis2, basis3]), y=y, max_iterations=10, verbose=1)
And after a second…
The model was able to successfully fit 3 functions with the corresponding bases to the generated data, even with the added noise.
The next test is done for functions with 2 independent variables, corresponding to the following settings:
coeff1 = [1, 1]
coeff2 = [-1, 1, -0.5, 500]
basis1 = [lambda x: x[0] ** 2, lambda x: x[1] ** 2]
basis2 = [lambda x: x[0] ** 2, lambda x: x[1], lambda x: x[0] * x[1], lambda x: 1]
Running the model is done in the same fashion as before. This time I generate 3D graphs:
The model was able to accurately restore the right coefficients (approximately, ofcourse).