HEX
Server: LiteSpeed
System: Linux server302.web-hosting.com 4.18.0-513.18.1.lve.el8.x86_64 #1 SMP Thu Feb 22 12:55:50 UTC 2024 x86_64
User: synqowzz (1256)
PHP: 8.1.34
Disabled: NONE
Upload Files
File: //opt/cloudlinux/venv/lib64/python3.11/site-packages/websiteisolation/id_registry.py
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2026 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
Thin Python wrapper around the _lvdmap C extension (backed by liblve).

Public interface is unchanged — callers (lveapi.py, lvectllib.py) continue
to import and use these functions exactly as before.
"""
import pwd
import logging
import os

from .exceptions import LvdError

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Directory holding one registry file per user; each file is named after the
# user's numeric uid (see registry_path_by_username / create_empty_registry).
LVD_IDS_DIR = '/etc/container/lvd_ids'

# The C extension is optional at import time.  When it cannot be imported,
# _lvdmap is None: the read-only helpers below then return empty results and
# the mutating helpers raise LvdError through _require_lvdmap().
try:
    import _lvdmap
except ImportError:
    _lvdmap = None


def _require_lvdmap():
    """Raise LvdError unless the _lvdmap C extension was imported."""
    if _lvdmap is not None:
        return
    raise LvdError("_lvdmap C extension is not installed")


def registry_path_by_username(username):
    """Resolve *username* to its per-user registry file.

    The file lives in ``LVD_IDS_DIR`` and is named after the user's uid.
    Returns its path, or None when the passwd database has no such user
    or the file is not present.
    """
    try:
        entry = pwd.getpwnam(username)
    except KeyError:
        # Unknown account: there is no uid to build a path from.
        return None
    candidate = os.path.join(LVD_IDS_DIR, str(entry.pw_uid))
    return candidate if os.path.exists(candidate) else None


def assign_domain_id(uid, docroot):
    """Assign a domain ID to *docroot* for user *uid* and return it.

    Delegates to ``_lvdmap.assign``, which hands back the existing ID when
    the docroot is already registered and otherwise allocates the next
    sequential one.

    Raises LvdError when the caller is not root or the _lvdmap extension
    is unavailable.
    """
    running_as_root = os.geteuid() == 0
    if not running_as_root:
        raise LvdError("domain ID assignment requires root")
    _require_lvdmap()
    assigned = _lvdmap.assign(uid, docroot)
    log.info("assigned domain_id %d to docroot '%s' for uid %d",
             assigned, docroot, uid)
    return assigned


def get_domain_id(uid, docroot):
    """Return the domain ID registered for *docroot* under *uid*, or None.

    None is returned when the _lvdmap extension is unavailable, or when the
    lookup yields 0, which is treated as "not assigned".
    """
    if _lvdmap is None:
        return None
    found = _lvdmap.lookup(uid, docroot)
    if found != 0:
        return found
    return None


def get_all_entries(uid):
    """Return the docroot -> domain_id mapping for user *uid*.

    An empty dict is returned when the _lvdmap extension is unavailable.
    """
    if _lvdmap is not None:
        return _lvdmap.get_all_entries(uid)
    return {}


def get_all_domain_ids():
    """Return every assigned domain LVE ID, across all users.

    An empty set is returned when the _lvdmap extension is unavailable.
    """
    return set() if _lvdmap is None else _lvdmap.get_all_domain_ids()


def reassign_docroot(uid, old_docroot, new_docroot):
    """Move a domain mapping from *old_docroot* to *new_docroot*.

    Uses remove + assign via _lvdmap; the domain_id will be newly
    allocated (the C API does not support specifying a target ID).

    The two steps are not atomic: when the assign step fails after the
    remove step succeeded, the old mapping is already gone.  That case is
    logged together with the removed ID before the exception is re-raised,
    so the lost mapping is traceable.

    Returns the new domain ID, or None when *old_docroot* was not in the
    registry for *uid* (nothing is assigned in that case).

    Raises LvdError when the caller is not root or the _lvdmap extension
    is unavailable; exceptions raised by _lvdmap itself propagate.
    """
    if os.geteuid() != 0:
        raise LvdError("domain ID reassignment requires root")
    _require_lvdmap()

    old_id = _lvdmap.remove(uid, old_docroot)
    if old_id is None:
        log.warning("reassign_docroot: old_docroot '%s' not in registry "
                    "for uid %d — nothing to reassign", old_docroot, uid)
        return None

    try:
        new_id = _lvdmap.assign(uid, new_docroot)
    except Exception:
        # The old entry has already been removed; record what was lost
        # before letting the caller see the original error.
        log.exception("reassign_docroot: failed to assign docroot '%s' "
                      "for uid %d after removing '%s' (id=%d); "
                      "old mapping is lost",
                      new_docroot, uid, old_docroot, old_id)
        raise
    log.info("reassigned docroot from '%s' (id=%d) to '%s' (id=%d) "
             "for uid %d", old_docroot, old_id, new_docroot, new_id, uid)
    return new_id


def remove_domain_id(uid, docroot):
    """Drop *docroot* from the registry of user *uid*.

    Returns the ID that was removed, or None when ``_lvdmap.remove``
    reports nothing was removed.

    Raises LvdError when the caller is not root or the _lvdmap extension
    is unavailable.
    """
    if os.geteuid() != 0:
        raise LvdError("domain ID removal requires root")
    _require_lvdmap()
    removed_id = _lvdmap.remove(uid, docroot)
    if removed_id is None:
        # Nothing was removed, so there is nothing to log.
        return removed_id
    log.info("removed domain_id %d for docroot '%s' uid %d",
             removed_id, docroot, uid)
    return removed_id


def remove_all_entries(uid):
    """Drop every domain ID registered for user *uid*.

    Returns whatever ``_lvdmap.remove_all`` reports as removed (a list of
    (docroot, id) pairs).  A log line is written only when that result is
    non-empty.

    Raises LvdError when the caller is not root or the _lvdmap extension
    is unavailable.
    """
    if os.geteuid() != 0:
        raise LvdError("domain ID removal requires root")
    _require_lvdmap()
    entries = _lvdmap.remove_all(uid)
    if not entries:
        return entries
    log.info("removed all domain IDs for uid %d (%d entries)",
             uid, len(entries))
    return entries


def create_empty_registry(uid):
    """Mark *uid* as domain-isolated by touching its per-user file.

    The file acts as a marker so that ``find_all_lve_ids_with_config()``
    (which lists ``LVD_IDS_DIR``) can detect the user before any domains
    are assigned.  The C library's ``lvd_map_assign()`` will later
    atomically replace this empty file with a proper hash-table via
    rename(2), so the marker never interferes with real data.

    ``LVD_IDS_DIR`` is created if missing and its permission bits are set
    to 0711 on a best-effort basis.  An existing marker (of any file type)
    is left untouched.

    Raises LvdError when the caller is not root; OSError from creating the
    directory or the marker file propagates.
    """
    if os.geteuid() != 0:
        raise LvdError("domain ID assignment requires root")
    os.makedirs(LVD_IDS_DIR, mode=0o711, exist_ok=True)
    # makedirs() applies the umask to *mode* and leaves a pre-existing
    # directory's mode alone, so the permission bits are enforced here.
    try:
        if os.stat(LVD_IDS_DIR).st_mode & 0o777 != 0o711:
            os.chmod(LVD_IDS_DIR, 0o711)
    except OSError as err:
        # Best effort: a wrong directory mode must not prevent the marker
        # from being created, but it should not go unnoticed either.
        log.warning("could not enforce mode 0711 on '%s': %s",
                    LVD_IDS_DIR, err)
    marker = os.path.join(LVD_IDS_DIR, str(uid))
    try:
        # O_EXCL makes "create only if absent" a single atomic step and
        # refuses to follow a symlink sitting at the marker path.
        fd = os.open(marker, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
    except FileExistsError:
        return
    os.close(fd)