Ridge / Polynomial Regression From Scratch
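
Ridge regression adds an L2 penalty to ordinary least squares. As a reminder of the standard results that the fit method below implements: the closed-form solution is

$$ w_{RR} = (\lambda I + X^\top X)^{-1} X^\top y. $$

With the thin SVD $X = U S V^\top$, the least-squares solution is $w_{LS} = V S^{-1} U^\top y$, and the ridge solution is a shrunken version of it,

$$ w_{RR} = V \left(\lambda S^{-2} + I\right)^{-1} V^\top w_{LS}, $$

with effective degrees of freedom $\mathrm{df}(\lambda) = \sum_i s_i^2 / (s_i^2 + \lambda)$.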

import math
import numpy as np
import pandas as pd
class RidgeRegression():
    """Implementation of ridge regression from scratch.
    """

    def __init__(self) -> None:
        self._lambda = 0.1
        self.X_means = None
        self.X_stds = None

    def fit(self, X, y):
        """Fit the model to training data.

        Args:
            X (np.array): Training data. Columns for features. Rows for training instances.
            y (np.array): Training target column. Must have the same number of rows as X.

        Returns:
            (np.array, float): Ridge regression weights, effective degrees of freedom.
        """
        self.calc_means_and_stds(X)
        X = self.prepare_X(X)
        # n observations and k features
        self.n, self.k = X.shape
        u, s, vh = np.linalg.svd(X, full_matrices=False)
        v = vh.T  # NumPy returns V transposed
        # Effective degrees of freedom df(lambda) = sum_i s_i^2 / (s_i^2 + lambda),
        # computed from the singular value vector to avoid 0/0 warnings.
        degrees_freedom = (s**2 / (s**2 + self._lambda)).sum()
        s = np.diag(s)
        lambda_i = self._lambda * np.identity(self.k)
        # Shrinkage matrix M = (lambda * S^-2 + I)^-1
        m = np.linalg.inv(lambda_i @ np.linalg.inv(s.T @ s) + np.identity(self.k))
        w_ls = v @ np.linalg.inv(s) @ u.T @ y
        w_rr = v @ m @ v.T @ w_ls
        self.weights = w_rr
        return w_rr, degrees_freedom

    def calc_means_and_stds(self, X):
        """Calculate and store the column means and standard deviations.

        These will be used to normalize new data.

        Args:
            X (np.array): Training data
        """
        self.X_means = X.mean(axis=0)
        self.X_stds = X.std(axis=0)
        # Constant columns (e.g. the intercept column of ones) have zero variance:
        # give them mean 0 and std 1 so normalization leaves them untouched.
        self.X_means[self.X_stds == 0] = 0
        self.X_stds[self.X_stds == 0] = 1

    def prepare_X(self, X):
        """Prepare any X data for model usage.

        Subtract the column mean and divide by the column standard deviation.

        Args:
            X (np.array): Input array

        Returns:
            np.array: Normalized data
        """
        if self.X_means is None:
            raise ValueError("Need to call calc_means_and_stds at least once before this function.")
        X = (X - self.X_means) / self.X_stds
        return X

    def predict(self, X_in):
        """Predict y values for new X input

        Args:
            X_in (np.array): New data. Must have the same number of feature columns as the training data.

        Returns:
            np.array: The predicted values
        """
        X_prepared = self.prepare_X(X_in)
        yhat = X_prepared @ self.weights
        return yhat
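
A minimal smoke test of the class on synthetic data (the array names and values here are made up for illustration):

rng = np.random.default_rng(0)
X_demo = np.hstack([rng.normal(size=(50, 3)), np.ones((50, 1))])  # last column: intercept
y_demo = X_demo @ np.array([[1.0], [-2.0], [0.5], [3.0]]) + rng.normal(scale=0.1, size=(50, 1))
model = RidgeRegression()
w, df = model.fit(X_demo, y_demo)
print(w.shape, df)  # (4, 1) weight vector and a scalar effective df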

Polynomial Regression

Polynomial regression can be achieved by reusing the ridge regression code and modifying only the input data. The model is still linear in its parameters, but the design matrix gains columns that are powers of the original features, which lets it fit nonlinear functions. A toy expansion is shown below.
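
For example, assuming a two-row input whose last column is the intercept of ones (the convention used by add_polynomial_columns below), a p = 2 expansion maps the columns [x1, x2, 1] to [x1, x2, x1^2, x2^2, 1]:

X_toy = np.array([[2.0, 3.0, 1.0],
                  [4.0, 5.0, 1.0]])  # last column is the intercept
# After a p = 2 expansion X_toy becomes:
# [[ 2.,  3.,  4.,  9.,  1.],
#  [ 4.,  5., 16., 25.,  1.]]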

class PolynomialRegression(RidgeRegression):

    def __init__(self, p) -> None:
        """Initialization function

        Args:
            p (int): pth order polynomial regression!
        """
        super().__init__()
        self.p = p

    def add_polynomial_columns(self, X, p):
        """Add polynomial columns to the data

        Args:
            X (np.array): First-order training data whose last column is the intercept of ones
            p (int): pth order polynomial. p >= 1

        Returns:
            np.array: X with extra columns for every feature: x, x^2, ..., x^p
        """
        if p < 1:
            raise ValueError("p must be 1 or larger.")
        # Remove the last column of 1s, expand, then add it back. TODO: This shouldn't be hard coded
        ones = X[:, -1:]
        column_groups = [X[:, 0:-1]**n for n in range(1, p+1)]
        column_groups.append(ones)
        X = np.concatenate(column_groups, axis=1)
        return X

    def prepare_X(self, X):
        """Prepare the features for training or prediction.

        See add_polynomial_columns for details

        Args:
            X (np.array): Input feature data

        Returns:
            np.array: Prepared data
        """
        X = self.add_polynomial_columns(X, self.p)
        # During fit the stored statistics were computed on the unexpanded data, so
        # recompute them on the expanded matrix. During predict the stored shapes
        # already match, and the training statistics are reused rather than being
        # recomputed from the test data.
        if self.X_means is None or self.X_means.shape[0] != X.shape[1]:
            self.calc_means_and_stds(X)
        return super().prepare_X(X)
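
A quick end-to-end sketch with the subclass (hypothetical synthetic data, analogous to the earlier smoke test):

rng = np.random.default_rng(1)
X_poly_demo = np.hstack([rng.normal(size=(30, 2)), np.ones((30, 1))])
y_poly_demo = X_poly_demo[:, :1]**2 - X_poly_demo[:, 1:2] + rng.normal(scale=0.1, size=(30, 1))
poly = PolynomialRegression(p=2)
w_poly, df_poly = poly.fit(X_poly_demo, y_poly_demo)
print(w_poly.shape)  # (5, 1): weights for [x1, x2, x1^2, x2^2, 1]
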
base_path = "/Users/trevorgordon/Library/Mobile Documents/com~apple~CloudDocs/Documents/root/Columbia/Fall2021/ELEN4720/Assignments/assignment1/"

X_train_raw = pd.read_csv(base_path + "hw1-data/X_train.csv", header=None)
y_train_raw = pd.read_csv(base_path + "hw1-data/y_train.csv", header=None)
X_test_raw = pd.read_csv(base_path + "hw1-data/X_test.csv", header=None)
y_test_raw = pd.read_csv(base_path + "hw1-data/y_test.csv", header=None)

X_train_using = X_train_raw.copy().to_numpy()
y_train_using = y_train_raw.copy().to_numpy()
X_test = X_test_raw.copy().to_numpy()
y_test = y_test_raw.copy().to_numpy()
rr = RidgeRegression()

data = []
for lammy in range(0, 5000, 1):
    rr._lambda = lammy
    w_rr, degrees_freedom = rr.fit(X_train_using, y_train_using)
    dataline = np.append(w_rr, [lammy, degrees_freedom])
    data.append(dataline)
weight_df = pd.DataFrame(np.array(data).reshape(len(data),9))
weight_df = weight_df.rename({7:"lambda", 8:"df"}, axis=1)
weight_df = weight_df.melt(id_vars=["lambda", "df"])

Study on Degrees of Freedom

By fitting the model over a range of λ values we can see how the weights change: as λ → 0, df(λ) approaches the number of features and the weights approach the least-squares solution, while larger λ shrinks both df(λ) and the weights toward zero.

import chartify
ch = chartify.Chart(blank_labels=True)
ch.plot.scatter(
    data_frame=weight_df,
    x_column='df',
    y_column='value',
    color_column='variable')
ch.set_title(("Effective degrees of freedom df(λ) and the resulting weights" +
            "\nfor 7 predictive features in a ridge regression model"))
ch.axes.set_xaxis_label("df(λ)")
ch.axes.set_yaxis_label("Weights")
ch.show()
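
Next we compute the error on the held-out test set for each λ. The quantity in the loop below is the root-mean-square error

$$ \mathrm{RMSE} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} (\hat{y}_i - y_i)^2 }. $$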
# %% Part C RMS Error
rr = RidgeRegression()

data = []
for lammy in range(0, 50, 1):
    rr._lambda = lammy
    w_rr, degrees_freedom = rr.fit(X_train_using, y_train_using)
    y_pred = rr.predict(X_test)
    rms = math.sqrt(((y_pred - y_test)**2).mean())
    dataline = np.append(w_rr, [lammy, degrees_freedom, rms])
    data.append(dataline)
predicted_df = pd.DataFrame(np.array(data).reshape(len(data),10))
predicted_df = predicted_df.rename({7:"lambda", 8:"df", 9: "rms"}, axis=1)
predicted_df = predicted_df.melt(id_vars=["lambda", "df", "rms"])
# %%
import plotly.express as px
fig = px.scatter(predicted_df, x="lambda", y="rms")
fig.show()
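
If we want the single λ that minimizes the test RMSE, one option (a sketch against the melted predicted_df built above) is:

best = predicted_df.loc[predicted_df["rms"].idxmin()]
print(best["lambda"], best["rms"])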
# %% Part 2 A PolynomialRegression - Data Preparation

X_train_using = X_train_raw.copy().to_numpy()
y_train_using = y_train_raw.copy().to_numpy()
predicted_dfs = []
for p_using in range(1, 4):
    rr = PolynomialRegression(p=p_using)

    data = []
    for lammy in range(1, 100, 1):
        try:
            rr._lambda = lammy
            w_rr, degrees_freedom = rr.fit(X_train_using, y_train_using)
            y_pred = rr.predict(X_test)
            rms = math.sqrt(((y_pred - y_test)**2).mean())
            dataline = np.append(w_rr, [lammy, degrees_freedom, rms])
            data.append(dataline)
        except Exception:
            print(f"Didn't work for lambda {lammy} and polynomial {p_using}")
    predicted_df = pd.DataFrame(np.array(data).reshape(len(data),4+6*p_using))
    predicted_df = predicted_df.rename({1+6*p_using:"lambda", 6*p_using+2:"df", 6*p_using+3: "rms"}, axis=1)
    predicted_df = predicted_df.melt(id_vars=["lambda", "df", "rms"])
    predicted_df["p"] = p_using
    predicted_dfs.append(predicted_df)
df = pd.concat(predicted_dfs)
# %% Part 2 A PolynomialRegression - Plotting
import plotly.express as px
df["p"] = df["p"].astype(str)
fig = px.scatter(df, x="lambda", y="rms", color="p", title="RMS error for polynomial regression of order p")
fig.show()