PATH:
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clcagefslib
#!/opt/cloudlinux/venv/bin/python3 -sbb
# -*- coding: utf-8 -*-
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
"""Website (per-domain) isolation management.

Covers three layers:

* the server-wide switch and the ``allow_all`` / ``deny_all`` user mode,
  both represented by marker files/directories from ``admin_config``;
* per-user allow/deny handling on top of that mode;
* enabling, disabling and regenerating isolation for individual domains.
"""
import functools
import logging
import os
import shutil
import subprocess
import tempfile
from collections import defaultdict
from pathlib import Path

from clcommon.clcagefs import setup_mount_dir_cagefs, CAGEFSCTL_TOOL
from clcommon.cpapi import cpusers, is_panel_feature_supported
from clcommon.cpapi import docroot as get_domain_docroot
from clcommon.const import Feature
from clcommon.cpapi.cpapiexceptions import NoDomain
from lve_utils.pylve_wrapper import PyLve

from .fs import user_exists
from .exceptions import UserNotFoundError
from .webisolation import admin_config, config, jail_utils
from .webisolation.config import DOCROOTS_ISOLATED_BASE
from .webisolation.jail_config_builder import write_jail_mounts_config
from .webisolation.php import reload_processes_with_docroots
from .webisolation.service import start_monitoring_service, stop_monitoring_service
from .webisolation.triggers import trigger_xray_ini_regeneration, trigger_ssa_ini_regeneration


def is_website_isolation_allowed_server_wide():
    """Return True when the global website-isolation marker file exists."""
    return os.path.isfile(admin_config.WEBSITE_ISOLATION_MARKER)


def is_website_isolation_feature_available():
    """Return True when the "feature available" marker file exists."""
    return os.path.isfile(admin_config.WEBSITE_ISOLATION_AVAILABLE_MARKER)


def get_isolation_user_mode() -> str | None:
    """Return the current user mode for website isolation.

    Note that this is not a pure query: when both mode directories exist,
    the allowed directory is removed to repair the state.

    Returns:
        ``"allow_all"``  – all users allowed, denied dir lists exceptions.
        ``"deny_all"``   – no users allowed, allowed dir lists exceptions.
        ``None``         – not initialised yet.
    """
    has_denied = os.path.isdir(admin_config.ISOLATION_DENIED_DIR)
    has_allowed = os.path.isdir(admin_config.ISOLATION_ALLOWED_DIR)

    if has_denied and has_allowed:
        # Error state – both dirs present. Treat as allow_all and clean up.
        logging.warning(
            "Both site-isolation.users.allowed and site-isolation.users.denied "
            "directories exist. Removing allowed directory, treating as allow_all mode."
        )
        shutil.rmtree(admin_config.ISOLATION_ALLOWED_DIR, ignore_errors=True)
        return "allow_all"

    if has_denied:
        return "allow_all"
    if has_allowed:
        return "deny_all"
    return None


def is_website_isolation_allowed_for_user(user: str) -> bool:
    """Check whether *user* is allowed to use website isolation.

    Combines the global marker with the two-mode user model:

    * **allow_all** – allowed unless the user is in the denied directory.
    * **deny_all**  – denied unless the user is in the allowed directory.
    """
    if not os.path.isfile(admin_config.WEBSITE_ISOLATION_MARKER):
        return False

    mode = get_isolation_user_mode()
    if mode == "allow_all":
        return not admin_config.user_in_dir(admin_config.ISOLATION_DENIED_DIR, user)
    if mode == "deny_all":
        return admin_config.user_in_dir(admin_config.ISOLATION_ALLOWED_DIR, user)
    return False  # not initialised


def _ensure_isolation_mount_and_marker():
    """Set up mount directories and the global marker if not already done."""
    if not os.path.isfile(admin_config.WEBSITE_ISOLATION_MARKER):
        setup_mount_dir_cagefs(
            str(DOCROOTS_ISOLATED_BASE),
            prefix="*",
            remount_cagefs=True,
            remount_in_background=False,
        )
        marker_path = Path(admin_config.WEBSITE_ISOLATION_MARKER)
        marker_path.parent.mkdir(parents=True, exist_ok=True)
        marker_path.touch()
        # Remount kills redis processes, restart clwpos_monitoring service if it's running
        subprocess.run(
            ["/usr/bin/systemctl", "try-restart", "clwpos_monitoring.service"],
            capture_output=True,
            text=True
        )


PROXY_COMMANDS_PATH = "/etc/cagefs/proxy.commands"
CAGEFSCTL_USER_PROXY_ENTRY = "CAGEFSCTL_USER:noproceed:nolve=root:/usr/sbin/cagefsctl-user"
CAGEFSCTL_USER_BINARIES = [
    "/usr/sbin/cagefsctl-user",
]

LVD_PROXY_ENTRIES = {
    "LVD_REGISTRY_HELPER": "/usr/share/lve-utils/lvd-registry-helper",
    "LVD_LIMITS_HELPER": "/usr/share/lve-utils/lvd-limits-helper",
}


def _add_proxy_entries(content, entries):
    """Append missing proxy entries to *content*, return (new_content, added).

    Each entry in *entries* is a ``{KEY: path}`` dict.  An entry is added
    only when its KEY is not yet present AND the binary exists on disk.

    Presence is a plain substring test on *content*, so a KEY mentioned
    anywhere in the file (including a commented-out line) suppresses the
    addition.

    Returns the (possibly updated) content string and a bool indicating
    whether any entries were added.
    """
    added = False
    for key, binary in entries.items():
        if key in content:
            continue
        if not os.path.exists(binary):
            continue
        # Make sure the new entry starts on its own line.
        if content and not content.endswith("\n"):
            content += "\n"
        content += f"{key}={binary}\n"
        added = True
    return content, added


def _remove_proxy_entries(content, entries):
    """Remove proxy entries whose binaries no longer exist on disk.

    Only lines that start with ``KEY=`` are dropped.

    Returns the (possibly updated) content string and a bool indicating
    whether any entries were actually removed.
    """
    removed = False
    for key, binary in entries.items():
        if key not in content:
            continue
        if os.path.exists(binary):
            continue
        prefix = f"{key}="
        lines = content.splitlines(keepends=True)
        kept = [line for line in lines if not line.startswith(prefix)]
        # The substring pre-check above may match text that is not a
        # ``KEY=`` line; report a removal only when a line was dropped so
        # the caller does not rewrite the file and re-run cagefsctl for
        # nothing.
        if len(kept) != len(lines):
            content = "".join(kept)
            removed = True
    return content, removed


def ensure_proxyexec_command():
    """Register the ``cagefsctl-user`` proxyexec alias if not already present.

    Appends the ``CAGEFSCTL_USER`` entry to ``/etc/cagefs/proxy.commands``
    and runs ``cagefsctl --update-list`` to pull the required binaries
    into the CageFS skeleton.  This is a no-op when the entry already exists.

    Also registers/unregisters the LVD helper binaries
    (``lvd-registry-helper``, ``lvd-limits-helper``) depending on whether
    they are present on disk.
    """
    try:
        with open(PROXY_COMMANDS_PATH, "r", encoding="utf-8") as f:
            content = f.read()
    except FileNotFoundError:
        content = ""

    changed = False

    if "CAGEFSCTL_USER" not in content:
        logging.info("Registering cagefsctl-user in %s", PROXY_COMMANDS_PATH)
        if content and not content.endswith("\n"):
            content += "\n"
        content += CAGEFSCTL_USER_PROXY_ENTRY + "\n"
        changed = True

    content, added = _add_proxy_entries(content, LVD_PROXY_ENTRIES)
    if added:
        logging.info("Registering LVD helpers in %s", PROXY_COMMANDS_PATH)
        changed = True

    content, removed = _remove_proxy_entries(content, LVD_PROXY_ENTRIES)
    if removed:
        logging.info("Removing stale LVD helpers from %s", PROXY_COMMANDS_PATH)
        changed = True

    if not changed:
        return

    # Write to a temp file in the same directory and rename it over the
    # target, so readers never observe a partially written file.
    # NOTE(review): mkstemp creates the file with owner-only permissions,
    # so the replaced proxy.commands takes that mode – confirm this is
    # what the consumers of the file expect.
    proxy_dir = os.path.dirname(PROXY_COMMANDS_PATH)
    os.makedirs(proxy_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=proxy_dir, prefix=".proxy.commands.")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(content)
        os.replace(tmp_path, PROXY_COMMANDS_PATH)
    except BaseException:
        # Do not leave the temp file behind on any failure, including
        # KeyboardInterrupt; the original error is re-raised.
        os.unlink(tmp_path)
        raise

    # Both cagefsctl calls are best-effort: output is discarded and a
    # non-zero exit status is ignored (check=False).
    update_list = ("\n".join(CAGEFSCTL_USER_BINARIES) + "\n").encode()
    subprocess.run(
        [CAGEFSCTL_TOOL, "--wait-lock", "--update-list"],
        input=update_list,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=False,
    )
    subprocess.run(
        [CAGEFSCTL_TOOL, "--update-wrappers"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=False,
    )


def toggle_isolation_user_mode() -> str:
    """Flip the isolation user mode without modifying any per-user state.

    Unlike :func:`allow_website_isolation_server_wide` and
    :func:`deny_website_isolation_server_wide`, this function only flips
    the mode indicator directories.  It does **not** clean up existing
    user isolation or alter the per-user exception lists.

    * ``allow_all`` → ``deny_all``
    * ``deny_all``  → ``allow_all``
    * not initialised → ``allow_all``

    Returns:
        The new mode after toggling (``"allow_all"`` or ``"deny_all"``).
    """
    _ensure_isolation_mount_and_marker()

    current = get_isolation_user_mode()
    if current == "allow_all":
        new_mode = "deny_all"
        # Create the new indicator before removing the old one so that at
        # least one mode directory exists at every point in time.
        os.makedirs(admin_config.ISOLATION_ALLOWED_DIR, mode=admin_config.DIR_MODE, exist_ok=True)
        shutil.rmtree(admin_config.ISOLATION_DENIED_DIR, ignore_errors=True)
    else:
        # deny_all or not initialised → allow_all
        new_mode = "allow_all"
        os.makedirs(admin_config.ISOLATION_DENIED_DIR, mode=admin_config.DIR_MODE, exist_ok=True)
        shutil.rmtree(admin_config.ISOLATION_ALLOWED_DIR, ignore_errors=True)

    return new_mode


@functools.lru_cache(maxsize=1)
def _domain_limits_supported():
    """Check if the running kernel supports per-domain LVE limits.

    The result is cached for the lifetime of the process.
    """
    try:
        py = PyLve()
    except (AttributeError, OSError):
        return False
    if not py.initialize():
        return False
    return py.domains_supported()


def _allow_domain_limits(username):
    """Create LVP infrastructure for user via lvectl (idempotent).

    Does nothing when the panel lacks the LVE feature or per-domain
    limits are unsupported.

    Raises:
        subprocess.CalledProcessError: when ``lvectl`` exits non-zero.
    """
    if not is_panel_feature_supported(Feature.LVE):
        return
    if not _domain_limits_supported():
        return
    subprocess.run(['/usr/sbin/lvectl', 'allow-domain-limits', username],
                   check=True, capture_output=True)


def _deny_domain_limits(username):
    """Tear down LVP infrastructure for user via lvectl.

    Does nothing when the panel lacks the LVE feature or per-domain
    limits are unsupported.

    Raises:
        subprocess.CalledProcessError: when ``lvectl`` exits non-zero.
    """
    if not is_panel_feature_supported(Feature.LVE):
        return
    if not _domain_limits_supported():
        return
    subprocess.run(['/usr/sbin/lvectl', 'deny-domain-limits', username],
                   check=True, capture_output=True)


def _register_domain_lve(user, domain):
    """Register domain LVE under user's LVP via lvectl.

    *user* is accepted for symmetry with the callers but is not passed
    to ``lvectl``; only *domain* is.

    Raises:
        subprocess.CalledProcessError: when ``lvectl`` exits non-zero.
    """
    if not is_panel_feature_supported(Feature.LVE):
        return
    if not _domain_limits_supported():
        return
    subprocess.run(
        ['/usr/sbin/lvectl', 'enable-domain-limits', domain],
        check=True, capture_output=True,
    )


def _unregister_domain_lve(user, domain):
    """Unregister domain LVE via lvectl.

    *user* is accepted for symmetry with the callers but is not passed
    to ``lvectl``; only *domain* is.

    Raises:
        subprocess.CalledProcessError: when ``lvectl`` exits non-zero.
    """
    if not is_panel_feature_supported(Feature.LVE):
        return
    if not _domain_limits_supported():
        return
    subprocess.run(
        ['/usr/sbin/lvectl', 'disable-domain-limits', domain],
        check=True, capture_output=True,
    )


def allow_website_isolation_server_wide():
    """Switch to *allow_all* mode – all users are allowed by default."""
    _ensure_isolation_mount_and_marker()
    ensure_proxyexec_command()

    # Create empty denied-users directory → allow_all mode indicator
    os.makedirs(admin_config.ISOLATION_DENIED_DIR, mode=admin_config.DIR_MODE, exist_ok=True)
    # Remove allowed-users directory (belongs to deny_all mode)
    shutil.rmtree(admin_config.ISOLATION_ALLOWED_DIR, ignore_errors=True)

    for username in cpusers():
        if not user_exists(username):
            continue
        try:
            _allow_domain_limits(username)
        except (subprocess.CalledProcessError, OSError):
            # One failing user must not abort the server-wide switch.
            logging.exception(
                "Failed to enable domain limits for user %s, skipping.",
                username,
            )


def deny_website_isolation_server_wide():
    """Switch to *deny_all* mode – no users are allowed by default.

    Disables domain isolation for every user and switches the mode.
    """
    _cleanup_all_users_isolation()

    for username in cpusers():
        if not user_exists(username):
            continue
        try:
            _deny_domain_limits(username)
        except (subprocess.CalledProcessError, OSError):
            # One failing user must not abort the server-wide switch.
            logging.exception(
                "Failed to disable domain limits for user %s, skipping.",
                username,
            )

    _ensure_isolation_mount_and_marker()

    # Create empty allowed-users directory → deny_all mode indicator
    os.makedirs(admin_config.ISOLATION_ALLOWED_DIR, mode=admin_config.DIR_MODE, exist_ok=True)
    # Remove denied-users directory (belongs to allow_all mode)
    shutil.rmtree(admin_config.ISOLATION_DENIED_DIR, ignore_errors=True)


def allow_website_isolation_for_user(username: str):
    """Allow website isolation for *username* (mode-aware).

    * **allow_all** – removes *username* from the denied directory.
    * **deny_all**  – adds *username* to the allowed directory.
    * **not initialised** – sets up infrastructure in *deny_all* mode
      with *username* as the first allowed user.
    """
    _ensure_isolation_mount_and_marker()
    ensure_proxyexec_command()

    mode = get_isolation_user_mode()
    if mode == "allow_all":
        admin_config.remove_user_from_dir(admin_config.ISOLATION_DENIED_DIR, username)
    elif mode == "deny_all":
        admin_config.add_user_to_dir(admin_config.ISOLATION_ALLOWED_DIR, username)
    else:
        # Not initialised → start in deny_all mode with this user allowed
        os.makedirs(admin_config.ISOLATION_ALLOWED_DIR, mode=admin_config.DIR_MODE, exist_ok=True)
        admin_config.add_user_to_dir(admin_config.ISOLATION_ALLOWED_DIR, username)

    _allow_domain_limits(username)


def deny_website_isolation_for_user(username: str):
    """Deny website isolation for *username* (mode-aware).

    * **allow_all** – adds *username* to the denied directory.
    * **deny_all**  – removes *username* from the allowed directory.

    Also disables all domain isolation for the user.  When the mode is
    not initialised, only that cleanup step runs.
    """
    mode = get_isolation_user_mode()
    if mode == "allow_all":
        admin_config.add_user_to_dir(admin_config.ISOLATION_DENIED_DIR, username)
    elif mode == "deny_all":
        admin_config.remove_user_from_dir(admin_config.ISOLATION_ALLOWED_DIR, username)

    # Clean up existing domain isolation for the user
    _cleanup_user_isolation(username)
    _deny_domain_limits(username)


def _cleanup_user_isolation(username: str):
    """Remove all domain isolation state for a single user."""
    if not user_exists(username):
        return

    user_cfg = config.load_user_config(username)
    if not user_cfg.enabled_websites:
        return

    # Resolve docroots before the configuration is wiped; a value of None
    # marks a domain whose docroot could not be determined.
    domain_docroot_map = {
        d: _get_docroot_or_none(d) for d in user_cfg.enabled_websites
    }

    config.save_user_config(username, config=None)
    write_jail_mounts_config(username, user_config=None)

    reload_processes_with_docroots(
        username, filter_by_docroots=list(domain_docroot_map.values())
    )

    for d, docroot in domain_docroot_map.items():
        if docroot is None:
            logging.error(
                "Unable to detect document root for domain %s, "
                "configuration cleanup failed. Contact CloudLinux support "
                "if the error repeats.",
                d,
            )
            continue
        jail_utils.remove_website_token_directory(username, docroot)


def _cleanup_all_users_isolation():
    """Remove domain isolation state for every user that has it."""
    for username in list(users_with_enabled_domain_isolation()):
        try:
            _cleanup_user_isolation(username)
        except Exception:
            # Best effort: keep going so one broken user does not block
            # the cleanup of the others.
            logging.exception(
                "Unable to disable website isolation for user %s, skipping.",
                username,
            )

    # The monitoring service is only needed while somebody is isolated.
    users_left = users_with_enabled_domain_isolation()
    if not users_left:
        stop_monitoring_service()


def _get_docroot_or_none(domain: str):
    """Return the document root of *domain*, or None if it cannot be resolved."""
    try:
        return get_domain_docroot(domain)[0]
    except (NoDomain, IndexError):
        return None


def is_isolation_enabled(user):
    """Return True when isolation is allowed server-wide and *user* has a jail config.

    Returns False when the user is unknown (``UserNotFoundError``).
    """
    if not is_website_isolation_allowed_server_wide():
        return False
    try:
        domains_config_path = jail_utils.get_jail_config_path(user)
    except UserNotFoundError:
        return False
    return os.path.exists(domains_config_path)


def users_with_enabled_domain_isolation() -> dict:
    """Return ``{user: enabled_websites}`` for users with at least one isolated website."""
    users = [u for u in cpusers() if user_exists(u) and is_isolation_enabled(u)]
    user_domain_pairs = {}
    for user in users:
        domains_with_isolation = get_websites_with_enabled_isolation(user)
        if domains_with_isolation:
            user_domain_pairs[user] = domains_with_isolation
    return user_domain_pairs


def get_websites_with_enabled_isolation(user: str):
    """Return the websites with isolation enabled for *user* (empty list if the user is unknown)."""
    if not user_exists(user):
        logging.warning(
            "User %s not found, cannot get websites with enabled isolation", user)
        return []
    return config.load_user_config(user).enabled_websites


def get_docroots_of_isolated_websites() -> dict:
    """
    Returns pairs user: set(docroots) for all users with website isolation enabled

    Used by monitoring service to watch docroots changes
    to load actual list of docroot paths instead of stale storage
    """
    users_with_isolation = users_with_enabled_domain_isolation()
    pairs = defaultdict(set)
    for user, domains in users_with_isolation.items():
        for domain in domains:
            try:
                dr = get_domain_docroot(domain)[0]
            except (NoDomain, IndexError):
                # Domains without a resolvable docroot are skipped.
                continue
            pairs[user].add(dr)
    return pairs


def enable_website_isolation(user, domain):
    """Enable website isolation for *domain* of *user*.

    Logs a warning and returns when *user* does not exist.

    Raises:
        NoDomain: propagated from the docroot lookup for an unknown domain.
        subprocess.CalledProcessError: when the alt-php ini rebuild or the
            domain LVE registration exits non-zero.
    """
    if not user_exists(user):
        logging.warning(
            "User %s not found, cannot enable website isolation", user)
        return

    user_config = config.load_user_config(user)
    if domain not in user_config.enabled_websites:
        user_config.enabled_websites.append(domain)

    # if it crashes just let the command fail with NoDomain
    # exception, it should be a very rare case because
    # we validate input domain name in cagefsctl.py
    document_root = get_domain_docroot(domain)[0]

    # Create website token directory and overlay storage
    jail_utils.create_website_token_directory(user, document_root)
    jail_utils.create_overlay_storage_directory(user, document_root)

    config.save_user_config(user, user_config)

    # regenerate alt-php ini configuration for selector for the specific domain
    # Use the same tool path as the rest of the module instead of relying
    # on PATH resolution.
    subprocess.run(
        [CAGEFSCTL_TOOL, "--rebuild-alt-php-ini", "--domain", domain],
        check=True,
    )

    write_jail_mounts_config(user, user_config)

    # Reuse the docroot resolved above; a second lookup could yield None.
    reload_processes_with_docroots(user, filter_by_docroots=[document_root])

    start_monitoring_service()

    # Trigger xray/ssa ini regeneration for per-domain PHP selector
    trigger_xray_ini_regeneration(user, domain)
    trigger_ssa_ini_regeneration(user)

    _register_domain_lve(user, domain)


def regenerate_isolation_configuration(user):
    """Rewrite the jail mounts config and recreate token/storage dirs for *user*.

    Domains whose docroot cannot be resolved are logged and skipped;
    a failure to recreate directories for one domain is logged and does
    not stop processing of the others.
    """
    if not user_exists(user):
        logging.warning(
            "User %s not found, cannot regenerate website isolation configuration", user)
        return

    user_config = config.load_user_config(user)
    write_jail_mounts_config(user, user_config)

    document_roots = []
    for domain in user_config.enabled_websites:
        document_root = _get_docroot_or_none(domain)
        if document_root is None:
            logging.warning(
                "Unable to find document root for domain %s, "
                "please contact CloudLinux support if the issue persists.",
                domain,
            )
            continue
        document_roots.append(document_root)
        try:
            # recreate tokens and storage e.g. when username changes
            jail_utils.create_website_token_directory(user, document_root)
            jail_utils.create_overlay_storage_directory(user, document_root)
        except Exception as e:
            logging.error("Unable to recreate token/storage for domain=%s, Error=%s", domain, e)
            continue

    reload_processes_with_docroots(user, filter_by_docroots=document_roots)


def disable_website_isolation(user: str, domain: str | None = None):
    """Disable website isolation for one domain, or for all domains of *user*.

    Args:
        user: account name; a warning is logged and nothing happens when
            the user does not exist.
        domain: domain to disable; ``None`` disables every enabled website.
            A domain that is not currently enabled leaves the website list
            unchanged, but the configuration is still re-saved.
    """
    if not user_exists(user):
        logging.warning(
            "User %s not found, cannot disable website isolation", user)
        return

    user_config = config.load_user_config(user)
    reload_docroots = None
    domains_to_unregister = []

    if domain is None:
        domains_to_unregister = list(user_config.enabled_websites)
        reload_docroots = [
            _get_docroot_or_none(website) for website in user_config.enabled_websites
        ]
        user_config.enabled_websites = []
    elif domain in user_config.enabled_websites:
        domains_to_unregister = [domain]
        reload_docroots = [_get_docroot_or_none(domain)]
        user_config.enabled_websites.remove(domain)

    config.save_user_config(user, user_config)
    write_jail_mounts_config(user, user_config)

    if reload_docroots:
        reload_processes_with_docroots(user, filter_by_docroots=reload_docroots)
        for document_root in reload_docroots:
            # None marks a domain whose docroot could not be resolved.
            if document_root is None:
                continue
            jail_utils.remove_website_token_directory(user, document_root)

    for d in domains_to_unregister:
        try:
            _unregister_domain_lve(user, d)
        except (subprocess.CalledProcessError, OSError):
            logging.exception(
                "Failed to unregister domain LVE for %s, skipping.", d)

    # get actual docroots for all users with website isolation enabled
    users_with_isolation = users_with_enabled_domain_isolation()
    if not users_with_isolation:
        stop_monitoring_service()
[+]
..
[+]
__pycache__
[-] io.py
[open]
[+]
selector
[+]
webisolation
[-] domain.py
[open]
[-] const.py
[open]
[-] fs.py
[open]
[-] __init__.py
[open]
[-] cli.py
[open]
[-] exceptions.py
[open]