Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] robynpy modeling component #1226

Merged
merged 29 commits into from
Mar 17, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ab12f7d
fix lambda_seq and lambda calculation
alxlyj Feb 25, 2025
aa78891
fix for column pollution
alxlyj Feb 25, 2025
9fbe1b3
temporary draft for modeling approach for lambda calculation
alxlyj Feb 26, 2025
d7bf48f
test nevergrad changes
alxlyj Feb 26, 2025
7056786
tested glmnet and sklearn
alxlyj Mar 4, 2025
14c1566
remove redundant function
alxlyj Mar 4, 2025
58aac6c
refactor functions
alxlyj Mar 4, 2025
a95e87f
complete debugging step3
alxlyj Mar 5, 2025
2a21348
reconcile step6 hyper sampling
alxlyj Mar 5, 2025
9e26b4b
validated step6 nevergrad sampling
alxlyj Mar 5, 2025
79ead59
reconcile run_transformations
alxlyj Mar 5, 2025
93e6b5e
update transformations cols
alxlyj Mar 5, 2025
c130f1c
ensure proper saturation for transformation
alxlyj Mar 5, 2025
1be740c
remove all non essential loggers
alxlyj Mar 5, 2025
7017463
changes for validating data splits - step8
alxlyj Mar 6, 2025
5d5d720
complete validation for lambda_seq - step10
alxlyj Mar 7, 2025
0d0b46c
addressed adjusted r2
alxlyj Mar 7, 2025
065e34e
addressed rssd and model_decomp
alxlyj Mar 11, 2025
2602ce4
enforce required cols in x_decomp_agg and decomp_spend_dist
alxlyj Mar 11, 2025
563a5f0
resolve logging errors
alxlyj Mar 12, 2025
74c2c57
update fallback for rpy2 modeling
alxlyj Mar 17, 2025
54cd5a1
updated dependencies
alxlyj Mar 17, 2025
4912776
add documentation for logging
alxlyj Mar 17, 2025
8d81d75
update notebooks
alxlyj Mar 17, 2025
cd6c874
prepopulate notebook
alxlyj Mar 17, 2025
a290f1c
remove unused notebooks
alxlyj Mar 17, 2025
67f9d4e
update notebooks
alxlyj Mar 17, 2025
fd66ed0
remove unused functions
alxlyj Mar 17, 2025
385cce9
remove unused code
alxlyj Mar 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/src/robyn/common/config/logging.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ qualname=
class=FileHandler
level=DEBUG
formatter=simple
args=('/tmp/robynpy/logs/robynpy_%(asctime)s.log', 'w')
args=('%(logfilename)s', 'w')

[handler_console]
class=StreamHandler
Expand Down
29 changes: 29 additions & 0 deletions python/src/robyn/data/entities/mmmdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def __init__(
all_media: Optional[List[str]] = None,
day_interval: Optional[int] = 7,
interval_type: Optional[str] = "week",
refresh_steps: Optional[int] = None,
refresh_counter: Optional[int] = 0,
refresh_added_start: Optional[datetime] = None,
) -> None:
self.dep_var: Optional[str] = dep_var
self.dep_var_type: DependentVarType = dep_var_type
Expand All @@ -82,6 +85,9 @@ def __init__(
)
self.day_interval: Optional[int] = day_interval
self.interval_type: Optional[str] = interval_type
self.refresh_steps = refresh_steps
self.refresh_counter = refresh_counter
self.refresh_added_start = refresh_added_start

def __str__(self) -> str:
return f"""
Expand All @@ -105,6 +111,9 @@ def __str__(self) -> str:
all_media: {self.all_media}
day_interval: {self.day_interval}
interval_type: {self.interval_type}
refresh_steps: {self.refresh_steps}
refresh_counter: {self.refresh_counter}
refresh_added_start: {self.refresh_added_start}
"""

def update(self, **kwargs: Any) -> None:
Expand Down Expand Up @@ -245,6 +254,9 @@ def calculate_rolling_window_indices(self) -> None:
f"Adjusted window_end to the closest date in the data: {closest_end_date}"
)

# Calculate refresh_added_start - matching R's behavior
self.mmmdata_spec.refresh_added_start = self.mmmdata_spec.window_start

# Calculate rolling window length
if window_start is not None and window_end is not None:
self.mmmdata_spec.rolling_window_length = (
Expand All @@ -253,6 +265,23 @@ def calculate_rolling_window_indices(self) -> None:
+ 1
)

# If we're in refresh mode, check the data proportions like R does
if self.mmmdata_spec.refresh_counter > 0:
original_periods = self.mmmdata_spec.rolling_window_length
new_periods = len(
self.data[
self.data[self.mmmdata_spec.date_var] > self.mmmdata_spec.window_end
]
)

if new_periods > 0.5 * (original_periods + new_periods):
logger.warning(
f"We recommend re-building a model rather than refreshing this one. "
f"More than 50% of your refresh data ({original_periods + new_periods} "
f"{self.mmmdata_spec.interval_type}s) is new data ({new_periods} "
f"{self.mmmdata_spec.interval_type}s)"
)

def set_default_factor_vars(self) -> None:
"""
Set the default factor variables.
Expand Down
318 changes: 318 additions & 0 deletions python/src/robyn/modeling/ridge/models/ridge_utils.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent, I would prefer we make this object-oriented by defining a class. It's okay if you want to do this in a follow-up PR.

Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
import logging
import numpy as np
from sklearn.linear_model import Ridge
import time


class GlmnetLikeRidge:
    """Ridge regressor that pre-processes its inputs the way R's glmnet does.

    Features are centered only when an intercept is fitted and are scaled by
    their population standard deviation only when ``standardize`` is true.
    After fitting, the coefficients are mapped back to the original feature
    scale so that ``predict`` can be applied to raw, untransformed inputs.

    Args:
        lambda_value: Penalty handed to ``sklearn.linear_model.Ridge`` as
            ``alpha`` without any rescaling.
        fit_intercept: Whether to estimate an intercept.
        standardize: Whether to scale features to unit standard deviation
            before fitting.
    """

    def __init__(self, lambda_value, fit_intercept=True, standardize=True):
        self.logger = logging.getLogger(__name__)
        self.lambda_value = lambda_value  # Use raw lambda value
        self.fit_intercept = fit_intercept
        self.standardize = standardize
        self.feature_means = None
        self.feature_stds = None
        self.y_mean = None
        self.coef_ = None
        self.intercept_ = 0.0

    def mysd(self, y):
        """Return the population standard deviation of ``y`` (divides by n)."""
        return np.sqrt(np.sum((y - np.mean(y)) ** 2) / len(y))

    def fit(self, X, y):
        """Fit the ridge model and store ``coef_`` / ``intercept_``.

        Args:
            X: 2D array-like of shape (n_samples, n_features).
            y: Array-like target with one value per row of ``X``.

        Returns:
            The fitted instance, to allow call chaining.

        Raises:
            ValueError: If ``X`` is not two-dimensional.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError(
                "X must be a 2D array of shape (n_samples, n_features), "
                f"got ndim={X.ndim}"
            )

        debug = self.logger.isEnabledFor(logging.DEBUG)

        # Column standard deviations are needed for scaling and for the debug
        # output; compute them once and only when one of the two wants them.
        raw_stds = None
        if self.standardize or debug:
            raw_stds = np.apply_along_axis(self.mysd, 0, X)

        if debug:
            # Debug prints matching R
            self.logger.debug("Lambda calculation debug:")
            self.logger.debug(f"x_means: {np.mean(np.abs(X))}")
            self.logger.debug(f"x_sds mean: {np.mean(raw_stds)}")

        n_features = X.shape[1]

        # Scaling depends on `standardize` only. Constant columns keep a unit
        # divisor so they do not produce NaNs.
        if self.standardize:
            self.feature_stds = np.where(raw_stds == 0, 1.0, raw_stds)
        else:
            self.feature_stds = np.ones(n_features)

        # Centering depends on `fit_intercept` only. Centering X without
        # fitting an intercept (or skipping it while fitting one) would make
        # the fitted coefficients inconsistent with `predict`, which works on
        # raw X plus `intercept_`.
        if self.fit_intercept:
            self.feature_means = np.mean(X, axis=0)
            self.y_mean = np.mean(y)
        else:
            self.feature_means = np.zeros(n_features)
            self.y_mean = 0.0

        X_scaled = (X - self.feature_means) / self.feature_stds
        y_centered = y - self.y_mean

        if debug:
            self.logger.debug(f"sx mean: {np.mean(np.abs(X_scaled))}")
            self.logger.debug(f"sy mean: {np.mean(np.abs(y_centered))}")
            self.logger.debug(f"lambda: {self.lambda_value}")

        # Fit model using raw lambda (not scaled)
        model = Ridge(
            alpha=self.lambda_value,
            fit_intercept=False,  # We handle centering manually
            solver="cholesky",
        )
        model.fit(X_scaled, y_centered)

        # Transform coefficients back to the original feature scale
        # (feature_stds is all ones when standardize is off).
        self.coef_ = model.coef_ / self.feature_stds

        if self.fit_intercept:
            self.intercept_ = self.y_mean - np.dot(self.feature_means, self.coef_)
        else:
            self.intercept_ = 0.0

        if debug:
            self.logger.debug(
                f"Coefficients range: [{np.min(self.coef_):.6f}, {np.max(self.coef_):.6f}]"
            )
            self.logger.debug(f"Intercept: {self.intercept_:.6f}")

        return self

    def predict(self, X):
        """Return ``X @ coef_ + intercept_`` for raw (untransformed) ``X``.

        Raises:
            ValueError: If the model has not been fitted yet.
        """
        if self.coef_ is None:
            raise ValueError("Model must be fitted before making predictions")

        # Direct prediction using coefficients and intercept
        return np.dot(X, self.coef_) + self.intercept_


def create_ridge_model_sklearn(
    lambda_value, n_samples, fit_intercept=True, standardize=True
):
    """Create a Ridge regression model using scikit-learn.

    Args:
        lambda_value: Regularization parameter (lambda) from glmnet; it is
            passed to scikit-learn's ``Ridge`` as ``alpha`` unchanged.
        n_samples: Number of samples. Accepted for API compatibility with the
            other model factories; it is not used here.
        fit_intercept: Whether to fit the intercept
        standardize: Whether to standardize the input features

    Returns:
        An unfitted ``GlmnetLikeRidge`` instance exposing ``fit``, ``predict``,
        ``coef_`` and ``intercept_``.
    """
    return GlmnetLikeRidge(
        lambda_value, fit_intercept=fit_intercept, standardize=standardize
    )


def create_ridge_model_rpy2(
    lambda_value, n_samples, fit_intercept=True, standardize=True, **kwargs
):
    """Create a Ridge regression model using rpy2 to access glmnet.

    Falls back to ``create_ridge_model_sklearn`` when rpy2 cannot be imported
    or the R package glmnet cannot be loaded; in that case ``**kwargs`` are
    not forwarded.

    Args:
        lambda_value: Regularization parameter
        n_samples: Number of samples (not directly used, but kept for API consistency)
        fit_intercept: Whether to fit the intercept
        standardize: Whether to standardize the input features
        **kwargs: Additional arguments to pass to glmnet. Underscores in the
            keyword names are replaced by dots (``lower_limits`` becomes
            ``lower.limits``); entries whose value is ``None`` are skipped.

    Returns:
        An unfitted model object exposing ``fit``, ``predict``, ``coef_`` and
        ``intercept_``.
    """
    try:
        import rpy2.robjects as ro
        from rpy2.robjects import numpy2ri
        from rpy2.robjects.packages import importr
        from rpy2.robjects.conversion import localconverter

        # Import glmnet only once per Python session
        global glmnet_imported
        if "glmnet_imported" not in globals():
            try:
                importr("glmnet")
                glmnet_imported = True
            except Exception as e:
                logging.warning(f"Failed to import glmnet: {e}")
                logging.warning("Falling back to sklearn implementation")
                return create_ridge_model_sklearn(
                    lambda_value, n_samples, fit_intercept, standardize
                )
    except ImportError:
        logging.warning("rpy2 not available, using sklearn implementation")
        return create_ridge_model_sklearn(
            lambda_value, n_samples, fit_intercept, standardize
        )

    class GlmnetRidgeWrapper:
        """Thin fit/predict wrapper around a single-lambda glmnet ridge fit."""

        def __init__(self):
            self.lambda_value = lambda_value
            self.fit_intercept = fit_intercept
            self.standardize = standardize
            self.kwargs = kwargs
            self.fitted_model = None
            self.coef_ = None
            self.intercept_ = 0.0
            self.logger = logging.getLogger(__name__)

        def fit(self, X, y):
            """Fit glmnet (alpha = 0) in R and pull the coefficients back.

            Args:
                X: Array-like feature matrix.
                y: Array-like target.

            Returns:
                The fitted instance, to allow call chaining.
            """
            # Ensure numpy arrays
            X = np.asarray(X)
            y = np.asarray(y)

            debug = self.logger.isEnabledFor(logging.DEBUG)
            if debug:
                # Only pay for the min/max/mean passes when they are logged.
                self.logger.debug("\n=== Model Fitting Debug ===")
                self.logger.debug(f"Input shapes - X: {X.shape}, y: {y.shape}")
                self.logger.debug(
                    f"X stats - min: {X.min():.6f}, max: {X.max():.6f}, mean: {X.mean():.6f}"
                )
                self.logger.debug(
                    f"y stats - min: {y.min():.6f}, max: {y.max():.6f}, mean: {y.mean():.6f}"
                )
                self.logger.debug(f"lambda_value: {self.lambda_value}")

            fit_intercept_r = "TRUE" if self.fit_intercept else "FALSE"
            standardize_r = "TRUE" if self.standardize else "FALSE"

            optional_params = []

            # Convert Python objects to R
            with localconverter(ro.default_converter + numpy2ri.converter):
                # Pass the data to R environment
                ro.r.assign("X_r", X)
                ro.r.assign("y_r", y)
                ro.r.assign("lambda_value", self.lambda_value)

                # Forward every keyword exactly once. lower_limits and
                # upper_limits go through this same loop; naming them a second
                # time would repeat the argument in the R call.
                for k, v in self.kwargs.items():
                    if v is None:
                        continue
                    k_r = k.replace("_", ".")
                    ro.r.assign(f"{k}_param", v)
                    optional_params.append(f"{k_r} = {k}_param")

                optional_str = "".join(f", {param}" for param in optional_params)

                # Fit the model using direct R code
                r_code = f"""
                # Use global assignment operator to ensure objects persist
                r_model <<- glmnet(
                x = X_r,
                y = y_r,
                family = "gaussian",
                alpha = 0, # 0 for ridge regression
                lambda = lambda_value,
                standardize = {standardize_r},
                intercept = {fit_intercept_r},
                type.measure = "mse"{optional_str}
                )
                coef_values <<- as.numeric(coef(r_model, s = lambda_value))
                """

                # Execute R code for model fitting
                ro.r(r_code)

                # Get the model and coefficients from R
                self.fitted_model = ro.r["r_model"]
                coef_array = np.array(ro.r["coef_values"])

            if debug:
                self.logger.debug("\n=== Coefficient Debug ===")
                self.logger.debug(f"Raw coefficients shape: {coef_array.shape}")
                self.logger.debug(
                    f"Raw coefficients range: [{coef_array.min():.6f}, {coef_array.max():.6f}]"
                )

            # First coefficient is intercept, rest are feature coefficients.
            # The leading entry is dropped either way; it is only kept as the
            # intercept when one was requested.
            self.intercept_ = float(coef_array[0]) if self.fit_intercept else 0.0
            self.coef_ = coef_array[1:]

            if debug:
                self.logger.debug(f"Final intercept: {self.intercept_:.6f}")
                self.logger.debug(
                    f"Final coefficients range: [{self.coef_.min():.6f}, {self.coef_.max():.6f}]"
                )

            return self

        def predict(self, X):
            """Return ``X @ coef_ + intercept_``.

            The prediction is always computed in NumPy from this instance's
            own coefficients. For a gaussian glmnet fit the response is the
            linear predictor, so no round trip to R is needed, and the result
            cannot be affected by another wrapper overwriting the shared R
            variable ``r_model`` or by a stale cached prediction after a refit.

            Raises:
                ValueError: If the model has not been fitted yet.
            """
            if self.coef_ is None:
                raise ValueError("Model must be fitted before making predictions")

            X = np.asarray(X)
            predictions = np.dot(X, self.coef_) + self.intercept_

            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug("\n=== Prediction Input ===")
                self.logger.debug(f"X shape: {X.shape}")
                self.logger.debug(
                    f"X stats - min: {X.min():.6f}, max: {X.max():.6f}, mean: {X.mean():.6f}"
                )
                self.logger.debug(f"Using direct computation")
                self.logger.debug(
                    f"Predictions stats - min: {predictions.min():.6f}, max: {predictions.max():.6f}, mean: {predictions.mean():.6f}"
                )
            return predictions

    return GlmnetRidgeWrapper()
Loading