File: //opt/cloudlinux/venv/lib64/python3.11/site-packages/websiteisolation/id_registry.py
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2026 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
"""
Thin Python wrapper around the _lvdmap C extension (backed by liblve).
Public interface is unchanged — callers (lveapi.py, lvectllib.py) continue
to import and use these functions exactly as before.
"""
import pwd
import logging
import os
from .exceptions import LvdError
log = logging.getLogger(__name__)
LVD_IDS_DIR = '/etc/container/lvd_ids'
try:
import _lvdmap
except ImportError:
_lvdmap = None
def _require_lvdmap():
if _lvdmap is None:
raise LvdError("_lvdmap C extension is not installed")
def registry_path_by_username(username):
"""Return the registry file path for *username*, or None if the
user does not exist or has no registry file."""
try:
uid = pwd.getpwnam(username).pw_uid
except KeyError:
return None
path = os.path.join(LVD_IDS_DIR, str(uid))
if not os.path.exists(path):
return None
return path
def assign_domain_id(uid, docroot):
"""Assign a domain ID for a docroot. Returns existing ID if already
assigned, otherwise allocates the next sequential ID."""
if os.geteuid() != 0:
raise LvdError("domain ID assignment requires root")
_require_lvdmap()
domain_id = _lvdmap.assign(uid, docroot)
log.info("assigned domain_id %d to docroot '%s' for uid %d",
domain_id, docroot, uid)
return domain_id
def get_domain_id(uid, docroot):
"""Look up domain ID by docroot. Returns None if not assigned."""
if _lvdmap is None:
return None
r = _lvdmap.lookup(uid, docroot)
return r if r != 0 else None
def get_all_entries(uid):
"""Return dict of docroot -> domain_id for a user."""
if _lvdmap is None:
return {}
return _lvdmap.get_all_entries(uid)
def get_all_domain_ids():
"""Return the set of every assigned domain LVE ID across all users."""
if _lvdmap is None:
return set()
return _lvdmap.get_all_domain_ids()
def reassign_docroot(uid, old_docroot, new_docroot):
"""Move a domain mapping from *old_docroot* to *new_docroot*.
Uses remove + assign via _lvdmap; the domain_id will be newly
allocated (the C API does not support specifying a target ID).
"""
if os.geteuid() != 0:
raise LvdError("domain ID reassignment requires root")
_require_lvdmap()
old_id = _lvdmap.remove(uid, old_docroot)
if old_id is None:
log.warning("reassign_docroot: old_docroot '%s' not in registry "
"for uid %d — nothing to reassign", old_docroot, uid)
return None
new_id = _lvdmap.assign(uid, new_docroot)
log.info("reassigned docroot from '%s' (id=%d) to '%s' (id=%d) "
"for uid %d", old_docroot, old_id, new_docroot, new_id, uid)
return new_id
def remove_domain_id(uid, docroot):
"""Remove a docroot from the registry. Returns the old ID or None."""
if os.geteuid() != 0:
raise LvdError("domain ID removal requires root")
_require_lvdmap()
old_id = _lvdmap.remove(uid, docroot)
if old_id is not None:
log.info("removed domain_id %d for docroot '%s' uid %d",
old_id, docroot, uid)
return old_id
def remove_all_entries(uid):
"""Remove all domain IDs for a user. Returns list of (docroot, id)."""
if os.geteuid() != 0:
raise LvdError("domain ID removal requires root")
_require_lvdmap()
removed = _lvdmap.remove_all(uid)
if removed:
log.info("removed all domain IDs for uid %d (%d entries)",
uid, len(removed))
return removed
def create_empty_registry(uid):
"""Mark *uid* as domain-isolated by touching its per-user file.
The file acts as a marker so that ``find_all_lve_ids_with_config()``
(which lists ``LVD_IDS_DIR``) can detect the user before any domains
are assigned. The C library's ``lvd_map_assign()`` will later
atomically replace this empty file with a proper hash-table via
rename(2), so the marker never interferes with real data.
"""
if os.geteuid() != 0:
raise LvdError("domain ID assignment requires root")
os.makedirs(LVD_IDS_DIR, mode=0o711, exist_ok=True)
try:
st = os.stat(LVD_IDS_DIR)
if st.st_mode & 0o777 != 0o711:
os.chmod(LVD_IDS_DIR, 0o711)
except OSError:
pass
marker = os.path.join(LVD_IDS_DIR, str(uid))
if not os.path.exists(marker):
fd = os.open(marker, os.O_CREAT | os.O_WRONLY, 0o600)
os.close(fd)