| | """
|
| | User authentication models and validation for OpenManus
|
| | Mobile number + password based authentication system
|
| | """
|
| |
|
| | import hashlib
|
| | import re
|
| | import secrets
|
| | from datetime import datetime, timedelta
|
| | from typing import Optional
|
| | from dataclasses import dataclass
|
| | from pydantic import BaseModel, validator
|
| |
|
| |
|
| | class UserSignupRequest(BaseModel):
|
| | """User signup request model"""
|
| |
|
| | full_name: str
|
| | mobile_number: str
|
| | password: str
|
| | confirm_password: str
|
| |
|
| | @validator("full_name")
|
| | def validate_full_name(cls, v):
|
| | if not v or len(v.strip()) < 2:
|
| | raise ValueError("Full name must be at least 2 characters long")
|
| | if len(v.strip()) > 100:
|
| | raise ValueError("Full name must be less than 100 characters")
|
| | return v.strip()
|
| |
|
| | @validator("mobile_number")
|
| | def validate_mobile_number(cls, v):
|
| |
|
| | digits_only = re.sub(r"\D", "", v)
|
| |
|
| |
|
| | if len(digits_only) < 10 or len(digits_only) > 15:
|
| | raise ValueError("Mobile number must be between 10-15 digits")
|
| |
|
| |
|
| | if not re.match(r"^(\+?[1-9]\d{9,14})$", digits_only):
|
| | raise ValueError("Invalid mobile number format")
|
| |
|
| | return digits_only
|
| |
|
| | @validator("password")
|
| | def validate_password(cls, v):
|
| | if len(v) < 8:
|
| | raise ValueError("Password must be at least 8 characters long")
|
| | if len(v) > 128:
|
| | raise ValueError("Password must be less than 128 characters")
|
| |
|
| |
|
| | if not re.search(r"[A-Z]", v):
|
| | raise ValueError("Password must contain at least one uppercase letter")
|
| | if not re.search(r"[a-z]", v):
|
| | raise ValueError("Password must contain at least one lowercase letter")
|
| | if not re.search(r"\d", v):
|
| | raise ValueError("Password must contain at least one digit")
|
| |
|
| | return v
|
| |
|
| | @validator("confirm_password")
|
| | def validate_confirm_password(cls, v, values):
|
| | if "password" in values and v != values["password"]:
|
| | raise ValueError("Passwords do not match")
|
| | return v
|
| |
|
| |
|
| | class UserLoginRequest(BaseModel):
|
| | """User login request model"""
|
| |
|
| | mobile_number: str
|
| | password: str
|
| |
|
| | @validator("mobile_number")
|
| | def validate_mobile_number(cls, v):
|
| |
|
| | digits_only = re.sub(r"\D", "", v)
|
| |
|
| | if len(digits_only) < 10 or len(digits_only) > 15:
|
| | raise ValueError("Invalid mobile number")
|
| |
|
| | return digits_only
|
| |
|
| |
|
| | @dataclass
|
| | class User:
|
| | """User model"""
|
| |
|
| | id: str
|
| | mobile_number: str
|
| | full_name: str
|
| | password_hash: str
|
| | avatar_url: Optional[str] = None
|
| | preferences: Optional[str] = None
|
| | is_active: bool = True
|
| | created_at: Optional[datetime] = None
|
| | updated_at: Optional[datetime] = None
|
| |
|
| |
|
| | @dataclass
|
| | class UserSession:
|
| | """User session model"""
|
| |
|
| | session_id: str
|
| | user_id: str
|
| | mobile_number: str
|
| | full_name: str
|
| | created_at: datetime
|
| | expires_at: datetime
|
| |
|
| | @property
|
| | def is_valid(self) -> bool:
|
| | """Check if session is still valid"""
|
| | return datetime.utcnow() < self.expires_at
|
| |
|
| |
|
| | class UserAuth:
|
| | """User authentication utilities"""
|
| |
|
| | @staticmethod
|
| | def hash_password(password: str) -> str:
|
| | """Hash password using SHA-256 with salt"""
|
| | salt = secrets.token_hex(32)
|
| | password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
|
| | return f"{salt}:{password_hash}"
|
| |
|
| | @staticmethod
|
| | def verify_password(password: str, password_hash: str) -> bool:
|
| | """Verify password against stored hash"""
|
| | try:
|
| | salt, stored_hash = password_hash.split(":")
|
| | password_hash_check = hashlib.sha256((password + salt).encode()).hexdigest()
|
| | return password_hash_check == stored_hash
|
| | except ValueError:
|
| | return False
|
| |
|
| | @staticmethod
|
| | def generate_session_id() -> str:
|
| | """Generate secure session ID"""
|
| | return secrets.token_urlsafe(32)
|
| |
|
| | @staticmethod
|
| | def generate_user_id() -> str:
|
| | """Generate unique user ID"""
|
| | return f"user_{secrets.token_hex(16)}"
|
| |
|
| | @staticmethod
|
| | def format_mobile_number(mobile_number: str) -> str:
|
| | """Format mobile number for consistent storage"""
|
| |
|
| | digits_only = re.sub(r"\D", "", mobile_number)
|
| |
|
| |
|
| | if not digits_only.startswith("+"):
|
| |
|
| | if len(digits_only) == 10:
|
| | digits_only = f"1{digits_only}"
|
| |
|
| | return f"+{digits_only}"
|
| |
|
| | @staticmethod
|
| | def create_session(user: User, duration_hours: int = 24) -> UserSession:
|
| | """Create a new user session"""
|
| | session_id = UserAuth.generate_session_id()
|
| | created_at = datetime.utcnow()
|
| | expires_at = created_at + timedelta(hours=duration_hours)
|
| |
|
| | return UserSession(
|
| | session_id=session_id,
|
| | user_id=user.id,
|
| | mobile_number=user.mobile_number,
|
| | full_name=user.full_name,
|
| | created_at=created_at,
|
| | expires_at=expires_at,
|
| | )
|
| |
|
| |
|
| |
|
| | class AuthResponse(BaseModel):
|
| | """Authentication response model"""
|
| |
|
| | success: bool
|
| | message: str
|
| | session_id: Optional[str] = None
|
| | user_id: Optional[str] = None
|
| | full_name: Optional[str] = None
|
| |
|
| |
|
| | class UserProfile(BaseModel):
|
| | """User profile response model"""
|
| |
|
| | user_id: str
|
| | full_name: str
|
| | mobile_number: str
|
| | avatar_url: Optional[str] = None
|
| | created_at: Optional[str] = None
|
| |
|
| | @staticmethod
|
| | def mask_mobile_number(mobile_number: str) -> str:
|
| | """Mask mobile number for security (show only last 4 digits)"""
|
| | if len(mobile_number) <= 4:
|
| | return "*" * len(mobile_number)
|
| | return "*" * (len(mobile_number) - 4) + mobile_number[-4:]
|
| |
|