# Source code for craft_providers.multipass.multipass

#
# Copyright 2022 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
#

"""API provider for Multipass.

This implementation interfaces with multipass using the `multipass` command-line
utility.
"""

import io
import json
import locale
import logging
import pathlib
import shlex
import subprocess
import time
from typing import Any, Callable, Dict, List, Optional, Tuple

import pkg_resources

from craft_providers import errors

from .errors import MultipassError

logger = logging.getLogger(__name__)


class Multipass:
    """Wrapper for multipass command.

    Every public method builds a ``multipass`` command line and runs it on the
    host.  Failures of the underlying command are reported as
    :class:`MultipassError`.

    :param multipass_path: Path to multipass command to use.

    :cvar minimum_required_version: Minimum required version for compatibility.
    """

    minimum_required_version = "1.7"

    def __init__(
        self, *, multipass_path: pathlib.Path = pathlib.Path("multipass")
    ) -> None:
        self.multipass_path = multipass_path

    def _run(self, command: List[str], **kwargs) -> subprocess.CompletedProcess:
        """Execute a multipass command.

        It always checks the result (as no errors should pass silently) and
        captures the output (so `multipass` does not pollute the terminal).

        :param command: Arguments to pass to multipass (without the executable).
        :param kwargs: Additional keyword arguments for ``subprocess.run``.

        :returns: Completed process with captured stdout and stderr.

        :raises subprocess.CalledProcessError: If multipass exits non-zero.
        """
        command = [str(self.multipass_path), *command]

        logger.debug("Executing on host: %s", shlex.join(command))

        return subprocess.run(command, check=True, capture_output=True, **kwargs)

    def delete(self, *, instance_name: str, purge: bool = True) -> None:
        """Passthrough for running multipass delete.

        :param instance_name: Name of the instance to delete.
        :param purge: If true, ``--purge`` is appended to the delete command.

        :raises MultipassError: on error.
        """
        command = ["delete", instance_name]
        if purge:
            command.append("--purge")

        try:
            self._run(command)
        except subprocess.CalledProcessError as error:
            raise MultipassError(
                brief=f"Failed to delete VM {instance_name!r}.",
                details=errors.details_from_called_process_error(error),
            ) from error

    def exec(
        self,
        *,
        command: List[str],
        instance_name: str,
        runner: Callable = subprocess.run,
        **kwargs,
    ) -> Any:
        """Execute command in instance_name with specified runner.

        Unlike the other methods, the result is not checked here: error
        handling is left to the runner and the keyword arguments given to it.

        :param command: Command to execute in the instance.
        :param instance_name: Name of instance to execute in.
        :param runner: Execution function to invoke, e.g. subprocess.run or
            Popen.  First argument is finalized command with the attached
            kwargs.
        :param kwargs: Additional kwargs for runner.

        :returns: Runner's instance.
        """
        final_cmd = [str(self.multipass_path), "exec", instance_name, "--", *command]

        quoted_final_cmd = shlex.join(final_cmd)
        logger.debug("Executing on host: %s", quoted_final_cmd)

        return runner(final_cmd, **kwargs)  # pylint: disable=subprocess-run-check

    def info(self, *, instance_name: str) -> Dict[str, Any]:
        """Get information/state for instance.

        :param instance_name: Name of the instance to query.

        :returns: Parsed json data from info command.

        :raises MultipassError: On error.
        """
        command = ["info", instance_name, "--format", "json"]

        try:
            proc = self._run(command, text=True)
        except subprocess.CalledProcessError as error:
            raise MultipassError(
                brief=f"Failed to query info for VM {instance_name!r}.",
                details=errors.details_from_called_process_error(error),
            ) from error

        return json.loads(proc.stdout)

    def is_supported_version(self) -> bool:
        """Check if Multipass version is supported.

        A helper to check if Multipass meets minimum supported version for
        craft-providers.

        :returns: True if installed version is supported.

        :raises MultipassError: If the version cannot be queried or parsed.
        """
        version, _ = self.version()

        return pkg_resources.parse_version(version) >= pkg_resources.parse_version(
            self.minimum_required_version
        )

    def launch(
        self,
        *,
        instance_name: str,
        image: str,
        cpus: Optional[str] = None,
        mem: Optional[str] = None,
        disk: Optional[str] = None,
    ) -> None:
        """Launch multipass VM.

        Resource options left as None are not passed on the command line.

        :param instance_name: The name the launched instance will have.
        :param image: Name of image to create the instance with.
        :param cpus: Amount of virtual CPUs to assign to the launched instance.
        :param mem: Amount of RAM to assign to the launched instance.
        :param disk: Amount of disk space the launched instance will have.

        :raises MultipassError: on error.
        """
        command = ["launch", image, "--name", instance_name]
        if cpus is not None:
            command.extend(["--cpus", cpus])
        if mem is not None:
            command.extend(["--mem", mem])
        if disk is not None:
            command.extend(["--disk", disk])

        try:
            self._run(command)
        except subprocess.CalledProcessError as error:
            raise MultipassError(
                brief=f"Failed to launch VM {instance_name!r}.",
                details=errors.details_from_called_process_error(error),
            ) from error

    def list(self) -> List[str]:
        """List names of VMs.

        :returns: Names of all instances reported by ``multipass list``; an
            empty list if the output has no "list" entry.

        :raises MultipassError: On error.
        """
        command = ["list", "--format", "json"]

        try:
            proc = self._run(command, text=True)
        except subprocess.CalledProcessError as error:
            raise MultipassError(
                brief="Failed to query list of VMs.",
                details=errors.details_from_called_process_error(error),
            ) from error

        data_list = json.loads(proc.stdout).get("list", [])
        return [instance["name"] for instance in data_list]

    def mount(
        self,
        *,
        source: pathlib.Path,
        target: str,
        uid_map: Optional[Dict[str, str]] = None,
        gid_map: Optional[Dict[str, str]] = None,
    ) -> None:
        """Mount host source path to target.

        :param source: Path of local directory to mount.
        :param target: Target mount points, in <name>[:<path>] format, where
            <name> is an instance name, and optional <path> is the mount point.
            If omitted, the mount point will be the same as the source's
            absolute path.
        :param uid_map: A mapping of user IDs for use in the mount of the form
            <host-id> -> <instance-id>.  File and folder ownership will be
            mapped from <host-id> to <instance-id> inside the instance.
        :param gid_map: A mapping of group IDs for use in the mount of the form
            <host-id> -> <instance-id>.  File and folder ownership will be
            mapped from <host-id> to <instance-id> inside the instance.

        :raises MultipassError: On error.
        """
        command = ["mount", str(source), target]

        if uid_map is not None:
            for host_id, instance_id in uid_map.items():
                command.extend(["--uid-map", f"{host_id}:{instance_id}"])

        if gid_map is not None:
            for host_id, instance_id in gid_map.items():
                command.extend(["--gid-map", f"{host_id}:{instance_id}"])

        try:
            self._run(command)
        except subprocess.CalledProcessError as error:
            raise MultipassError(
                brief=f"Failed to mount {str(source)!r} to {target!r}.",
                details=errors.details_from_called_process_error(error),
            ) from error

    def start(self, *, instance_name: str) -> None:
        """Start VM instance.

        :param instance_name: Name of the instance to start.

        :raises MultipassError: on error.
        """
        command = ["start", instance_name]

        try:
            self._run(command)
        except subprocess.CalledProcessError as error:
            raise MultipassError(
                brief=f"Failed to start VM {instance_name!r}.",
                details=errors.details_from_called_process_error(error),
            ) from error

    def stop(self, *, instance_name: str, delay_mins: int = 0) -> None:
        """Stop VM instance.

        :param instance_name: Name of the instance to stop.
        :param delay_mins: Delay shutdown for specified number of minutes.
            A value of 0 omits the ``--time`` option.

        :raises MultipassError: on error.
        """
        command = ["stop", instance_name]

        if delay_mins != 0:
            command.extend(["--time", str(delay_mins)])

        try:
            self._run(command)
        except subprocess.CalledProcessError as error:
            raise MultipassError(
                brief=f"Failed to stop VM {instance_name!r}.",
                details=errors.details_from_called_process_error(error),
            ) from error

    def transfer(self, *, source: str, destination: str) -> None:
        """Transfer from source path to destination path.

        :param source: The source path, prefixed with <name:> for a path
            inside the instance.
        :param destination: The destination path, prefixed with <name:> for a
            path inside the instance.

        :raises MultipassError: On error.
        """
        command = ["transfer", source, destination]

        try:
            self._run(command)
        except subprocess.CalledProcessError as error:
            raise MultipassError(
                brief=f"Failed to transfer {source!r} to {destination!r}.",
                details=errors.details_from_called_process_error(error),
            ) from error

    def transfer_destination_io(
        self, *, source: str, destination: io.BufferedIOBase, chunk_size: int = 4096
    ) -> None:
        """Transfer from source file to destination IO.

        Note that this can't use std{in,out}=open(...) due to LP #1849753.

        :param source: The source path, prefixed with <name:> for a path
            inside the instance.
        :param destination: An IO stream to write to.
        :param chunk_size: Number of bytes to transfer at a time.  Defaults to
            4096.

        :raises MultipassError: On error.
        """
        command = [str(self.multipass_path), "transfer", source, "-"]
        with subprocess.Popen(
            command,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as proc:
            # Should never happen, but mypy/pyright makes noise.
            assert proc.stdout is not None
            assert proc.stderr is not None

            while True:
                data = proc.stdout.read(chunk_size)
                if not data:
                    break

                destination.write(data)

            # Take one read of stderr in case there is anything useful
            # for debugging an error.
            stderr = proc.stderr.read()

        # The return code is only populated once the context manager has
        # waited for the process, so it must be checked after the with block.
        if proc.returncode != 0:
            raise MultipassError(
                brief=f"Failed to transfer file {source!r}.",
                details=errors.details_from_command_error(
                    cmd=command, stderr=stderr, returncode=proc.returncode
                ),
            )

    def transfer_source_io(
        self, *, source: io.BufferedIOBase, destination: str, chunk_size: int = 4096
    ) -> None:
        """Transfer to destination path with source IO.

        Note that this can't use std{in,out}=open(...) due to LP #1849753.

        :param source: An IO stream to read from.
        :param destination: The destination path, prefixed with <name:> for a
            path inside the instance.
        :param chunk_size: Number of bytes to transfer at a time.  Defaults to
            4096.

        :raises MultipassError: On error.
        """
        command = [str(self.multipass_path), "transfer", "-", destination]

        # Set if multipass stops reading before all data was written.
        pipe_broken = False

        with subprocess.Popen(
            command, stdin=subprocess.PIPE, stderr=subprocess.PIPE
        ) as proc:
            # Should never happen, but mypy/pyright makes noise.
            assert proc.stdin is not None
            assert proc.stderr is not None

            try:
                while True:
                    data = source.read(chunk_size)
                    if not data:
                        break

                    proc.stdin.write(data)
            except BrokenPipeError:
                # The process went away early.  Fall through so stderr is
                # still collected and reported as a MultipassError.
                pipe_broken = True

            # Close stdin before reading stderr, otherwise read() will hang
            # because process is waiting for more data.  Closing flushes any
            # buffered data, which can hit the same broken pipe.
            try:
                proc.stdin.close()
            except BrokenPipeError:
                pipe_broken = True

            # Take one read of stderr in case there is anything useful
            # for debugging an error.
            stderr = proc.stderr.read()

        # The return code is only populated once the context manager has
        # waited for the process, so it must be checked after the with block.
        if pipe_broken or proc.returncode != 0:
            raise MultipassError(
                brief=f"Failed to transfer file to destination {destination!r}.",
                details=errors.details_from_command_error(
                    cmd=command, stderr=stderr, returncode=proc.returncode
                ),
            )

    def umount(self, *, mount: str) -> None:
        """Unmount target in VM.

        :param mount: Mount point in <name>[:<path>] format, where <name> are
            instance names, and optional <path> are mount points.  If omitted,
            all mounts will be removed from the named instance.

        :raises MultipassError: On error.
        """
        command = ["umount", mount]

        try:
            self._run(command)
        except subprocess.CalledProcessError as error:
            raise MultipassError(
                brief=f"Failed to unmount {mount!r}.",
                details=errors.details_from_called_process_error(error),
            ) from error

    def wait_until_ready(
        self, *, retry_wait: float = 0.25, timeout: Optional[float] = None
    ) -> Tuple[str, Optional[str]]:
        """Wait until Multipass is ready (upon install/startup).

        Polls :meth:`version` until a multipassd version is reported.

        :param retry_wait: Time to sleep between retries.
        :param timeout: Timeout in seconds.  None waits indefinitely.

        :returns: Tuple of parsed versions (multipass, multipassd).

        :raises MultipassError: If the timeout is reached before multipassd
            reports a version, or if the version check itself fails.
        """
        # Use the monotonic clock so that wall-clock adjustments while waiting
        # cannot shorten or extend the timeout.
        deadline: Optional[float] = None
        if timeout is not None:
            deadline = time.monotonic() + timeout

        while True:
            multipass_version, multipassd_version = self.version()
            if multipassd_version is not None:
                return (multipass_version, multipassd_version)

            if deadline is not None and time.monotonic() >= deadline:
                break

            time.sleep(retry_wait)

        raise MultipassError(
            brief="Timed out waiting for Multipass to become ready.",
        )

    def version(self) -> Tuple[str, Optional[str]]:
        """Get multipass and multipassd versions.

        Any ``+<suffix>`` (e.g. ``+mac``, ``+win``) is stripped from the
        reported versions.

        :returns: Tuple of parsed versions (multipass, multipassd).  multipassd
            may be None if Multipass is not yet ready.

        :raises MultipassError: If the command fails, or its output cannot be
            decoded or parsed.
        """
        try:
            proc = self._run(["version"])
        except subprocess.CalledProcessError as error:
            raise MultipassError(
                brief="Failed to check version.",
                details=errors.details_from_called_process_error(error),
            ) from error

        try:
            output = proc.stdout.decode(encoding=locale.getpreferredencoding())
        except UnicodeDecodeError as error:
            raise MultipassError(
                brief="Failed to check version.",
                details=f"Failed to decode output: {proc.stdout!r}",
            ) from error

        # Expected multipass version output should look like (names and
        # versions are separated by whitespace only):
        #
        # * Scenario 1: multipassd not yet ready
        #
        #   multipass   1.5.0
        #
        # * Scenario 2: typical Linux
        #
        #   multipass   1.5.0
        #   multipassd  1.5.0
        #
        # * Scenario 3: typical Mac
        #
        #   multipass   1.5.0+mac
        #   multipassd  1.5.0+mac
        #
        # * Scenario 4: typical Windows
        #
        #   multipass   1.5.0+win
        #   multipassd  1.5.0+win
        #
        # * Scenario 5: outdated Windows version with notice message
        #   See: https://github.com/canonical/multipass/issues/2020
        #
        #   multipass   1.5.0+win
        #   multipassd  1.5.0+win
        #
        #   SOME NOTICE INFORMATION....
        #
        # After stripping and splitting:
        # - ['multipass', '1.5.0']
        # - ['multipass', '1.5.0', 'multipassd', '1.5.0']
        # - ['multipass', '1.5.0+mac', 'multipassd', '1.5.0+mac']
        # - ['multipass', '1.5.0+win', 'multipassd', '1.5.0+win']
        # - ['multipass', '1.5.0+win', 'multipassd', '1.5.0+win', ...]
        output_split = output.strip().split()
        if len(output_split) < 2 or output_split[0] != "multipass":
            raise MultipassError(
                brief=f"Unable to parse version output: {proc.stdout!r}",
            )

        multipass_version = output_split[1].split("+")[0]

        # Anything after the multipassd entry (e.g. a notice) is ignored.
        if len(output_split) >= 4 and output_split[2] == "multipassd":
            multipassd_version = output_split[3].split("+")[0]
        else:
            multipassd_version = None

        return (multipass_version, multipassd_version)