#
# Copyright 2022 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
"""API provider for Multipass.
This implementation interfaces with multipass using the `multipass` command-line
utility.
"""
import io
import json
import locale
import logging
import pathlib
import shlex
import subprocess
import time
from typing import Any, Callable, Dict, List, Optional, Tuple
import pkg_resources
from craft_providers import errors
from .errors import MultipassError
logger = logging.getLogger(__name__)
[docs]class Multipass:
"""Wrapper for multipass command.
:param multipass_path: Path to multipass command to use.
:cvar minimum_required_version: Minimum required version for compatibility.
"""
minimum_required_version = "1.7"
def __init__(
self, *, multipass_path: pathlib.Path = pathlib.Path("multipass")
) -> None:
self.multipass_path = multipass_path
def _run(self, command: List[str], **kwargs) -> subprocess.CompletedProcess:
"""Execute a multipass command.
It always checks the result (as no errors should pass silently) and captures the
output (so `multipass` does not pollute the terminal).
"""
command = [str(self.multipass_path), *command]
logger.debug("Executing on host: %s", shlex.join(command))
return subprocess.run(command, check=True, capture_output=True, **kwargs)
[docs] def delete(self, *, instance_name: str, purge=True) -> None:
"""Passthrough for running multipass delete.
:param instance_name: The name of the instance_name to delete.
:param purge: Flag to purge the instance_name's image after deleting.
:raises MultipassError: on error.
"""
command = ["delete", instance_name]
if purge:
command.append("--purge")
try:
self._run(command)
except subprocess.CalledProcessError as error:
raise MultipassError(
brief=f"Failed to delete VM {instance_name!r}.",
details=errors.details_from_called_process_error(error),
) from error
[docs] def exec(
self,
*,
command: List[str],
instance_name: str,
runner: Callable = subprocess.run,
**kwargs,
):
"""Execute command in instance_name with specified runner.
:param command: Command to execute in the instance.
:param instance_name: Name of instance to execute in.
:param runner: Execution function to invoke, e.g. subprocess.run or
Popen. First argument is finalized command with the attached
kwargs.
:param kwargs: Additional kwargs for runner.
:returns: Runner's instance.
"""
final_cmd = [str(self.multipass_path), "exec", instance_name, "--", *command]
quoted_final_cmd = shlex.join(final_cmd)
logger.debug("Executing on host: %s", quoted_final_cmd)
return runner(final_cmd, **kwargs) # pylint: disable=subprocess-run-check
[docs] def info(self, *, instance_name: str) -> Dict[str, Any]:
"""Get information/state for instance.
:returns: Parsed json data from info command.
:raises MultipassError: On error.
"""
command = ["info", instance_name, "--format", "json"]
try:
proc = self._run(command, text=True)
except subprocess.CalledProcessError as error:
raise MultipassError(
brief=f"Failed to query info for VM {instance_name!r}.",
details=errors.details_from_called_process_error(error),
) from error
return json.loads(proc.stdout)
[docs] def is_supported_version(self) -> bool:
"""Check if Multipass version is supported.
A helper to check if Multipass meets minimum supported version for
craft-providers.
:returns: True if installed version is supported.
"""
version, _ = self.version()
return pkg_resources.parse_version(version) >= pkg_resources.parse_version(
self.minimum_required_version
)
[docs] def launch(
self,
*,
instance_name: str,
image: str,
cpus: Optional[str] = None,
mem: Optional[str] = None,
disk: Optional[str] = None,
) -> None:
"""Launch multipass VM.
:param instance_name: The name the launched instance will have.
:param image: Name of image to create the instance with.
:param cpus: Amount of virtual CPUs to assign to the launched instance.
:param mem: Amount of RAM to assign to the launched instance.
:param disk: Amount of disk space the launched instance will have.
:raises MultipassError: on error.
"""
command = ["launch", image, "--name", instance_name]
if cpus is not None:
command.extend(["--cpus", cpus])
if mem is not None:
command.extend(["--mem", mem])
if disk is not None:
command.extend(["--disk", disk])
try:
self._run(command)
except subprocess.CalledProcessError as error:
raise MultipassError(
brief=f"Failed to launch VM {instance_name!r}.",
details=errors.details_from_called_process_error(error),
) from error
[docs] def list(self) -> List[str]:
"""List names of VMs.
:returns: Data from stdout if instance exists, else None.
:raises MultipassError: On error.
"""
command = ["list", "--format", "json"]
try:
proc = self._run(command, text=True)
except subprocess.CalledProcessError as error:
raise MultipassError(
brief="Failed to query list of VMs.",
details=errors.details_from_called_process_error(error),
) from error
data_list = json.loads(proc.stdout).get("list", [])
return [instance["name"] for instance in data_list]
[docs] def mount(
self,
*,
source: pathlib.Path,
target: str,
uid_map: Optional[Dict[str, str]] = None,
gid_map: Optional[Dict[str, str]] = None,
) -> None:
"""Mount host source path to target.
:param source: Path of local directory to mount.
:param target: Target mount points, in <name>[:<path>] format, where
<name> is an instance name, and optional <path> is the mount point.
If omitted, the mount point will be the same as the source's
absolute path.
:param uid_map: A mapping of user IDs for use in the mount of the form
<host-id> -> <instance-id>. File and folder ownership will be
mapped from <host-id> to <instance-id> inside the instance.
:param gid_map: A mapping of group IDs for use in the mount of the form
<host-id> -> <instance-id>. File and folder ownership will be
mapped from <host-id> to <instance-id> inside the instance.
"""
command = ["mount", str(source), target]
if uid_map is not None:
for host_id, instance_id in uid_map.items():
command.extend(["--uid-map", f"{host_id}:{instance_id}"])
if gid_map is not None:
for host_id, instance_id in gid_map.items():
command.extend(["--gid-map", f"{host_id}:{instance_id}"])
try:
self._run(command)
except subprocess.CalledProcessError as error:
raise MultipassError(
brief=f"Failed to mount {str(source)!r} to {target!r}.",
details=errors.details_from_called_process_error(error),
) from error
[docs] def start(self, *, instance_name: str) -> None:
"""Start VM instance.
:param instance_name: the name of the instance to start.
:raises MultipassError: on error.
"""
command = ["start", instance_name]
try:
self._run(command)
except subprocess.CalledProcessError as error:
raise MultipassError(
brief=f"Failed to start VM {instance_name!r}.",
details=errors.details_from_called_process_error(error),
) from error
[docs] def stop(self, *, instance_name: str, delay_mins: int = 0) -> None:
"""Stop VM instance.
:param instance_name: the name of the instance_name to stop.
:param delay_mins: Delay shutdown for specified number of minutes.
:raises MultipassError: on error.
"""
command = ["stop", instance_name]
if delay_mins != 0:
command.extend(["--time", str(delay_mins)])
try:
self._run(command)
except subprocess.CalledProcessError as error:
raise MultipassError(
brief=f"Failed to stop VM {instance_name!r}.",
details=errors.details_from_called_process_error(error),
) from error
[docs] def transfer(self, *, source: str, destination: str) -> None:
"""Transfer to destination path with source IO.
:param source: The source path, prefixed with <name:> for a path inside
the instance.
:param destination: The destination path, prefixed with <name:> for a
path inside the instance.
:raises MultipassError: On error.
"""
command = ["transfer", source, destination]
try:
self._run(command)
except subprocess.CalledProcessError as error:
raise MultipassError(
brief=f"Failed to transfer {source!r} to {destination!r}.",
details=errors.details_from_called_process_error(error),
) from error
[docs] def transfer_destination_io(
self, *, source: str, destination: io.BufferedIOBase, chunk_size: int = 4096
) -> None:
"""Transfer from source file to destination IO.
Note that this can't use std{in,out}=open(...) due to LP #1849753.
:param source: The source path, prefixed with <name:> for a path inside
the instance.
:param destination: An IO stream to write to.
:param chunk_size: Number of bytes to transfer at a time. Defaults to
4096.
:raises MultipassError: On error.
"""
command = [str(self.multipass_path), "transfer", source, "-"]
with subprocess.Popen(
command,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as proc:
# Should never happen, but mypy/pyright makes noise.
assert proc.stdout is not None
assert proc.stderr is not None
while True:
data = proc.stdout.read(chunk_size)
if not data:
break
destination.write(data)
# Take one read of stderr in case there is anything useful
# for debugging an error.
stderr = proc.stderr.read()
if proc.returncode != 0:
raise MultipassError(
brief=f"Failed to transfer file {source!r}.",
details=errors.details_from_command_error(
cmd=command, stderr=stderr, returncode=proc.returncode
),
)
[docs] def transfer_source_io(
self, *, source: io.BufferedIOBase, destination: str, chunk_size: int = 4096
) -> None:
"""Transfer to destination path with source IO.
Note that this can't use std{in,out}=open(...) due to LP #1849753.
:param source: An IO stream to read from.
:param destination: The destination path, prefixed with <name:> for a
path inside the instance.
:param chunk_size: Number of bytes to transfer at a time. Defaults to
4096.
:raises MultipassError: On error.
"""
command = [str(self.multipass_path), "transfer", "-", destination]
with subprocess.Popen(
command, stdin=subprocess.PIPE, stderr=subprocess.PIPE
) as proc:
# Should never happen, but mypy/pyright makes noise.
assert proc.stdin is not None
assert proc.stderr is not None
while True:
data = source.read(chunk_size)
if not data:
break
proc.stdin.write(data)
# Close stdin before reading stderr, otherwise read() will hang
# because process is waiting for more data.
proc.stdin.close()
# Take one read of stderr in case there is anything useful
# for debugging an error.
stderr = proc.stderr.read()
if proc.returncode != 0:
raise MultipassError(
brief=f"Failed to transfer file to destination {destination!r}.",
details=errors.details_from_command_error(
cmd=command, stderr=stderr, returncode=proc.returncode
),
)
[docs] def umount(self, *, mount: str) -> None:
"""Unmount target in VM.
:param mount: Mount point in <name>[:<path>] format, where <name> are
instance names, and optional <path> are mount points. If omitted,
all mounts will be removed from the named instance.
:raises MultipassError: On error.
"""
command = ["umount", mount]
try:
self._run(command)
except subprocess.CalledProcessError as error:
raise MultipassError(
brief=f"Failed to unmount {mount!r}.",
details=errors.details_from_called_process_error(error),
) from error
[docs] def wait_until_ready(
self, *, retry_wait: float = 0.25, timeout: Optional[float] = None
) -> Tuple[str, Optional[str]]:
"""Wait until Multipass is ready (upon install/startup).
:param retry_wait: Time to sleep between retries.
:param timeout: Timeout in seconds.
:returns: Tuple of parsed versions (multipass, multipassd). multipassd
may be None if Multipass is not ready and the timeout limit is reached.
"""
if timeout is not None:
deadline: Optional[float] = time.time() + timeout
else:
deadline = None
while True:
multipass_version, multipassd_version = self.version()
if multipassd_version is not None:
return (multipass_version, multipassd_version)
if deadline is not None and time.time() >= deadline:
break
time.sleep(retry_wait)
raise MultipassError(
brief="Timed out waiting for Multipass to become ready.",
)
[docs] def version(self) -> Tuple[str, Optional[str]]:
"""Get multipass and multipassd versions.
:returns: Tuple of parsed versions (multipass, multipassd). multipassd
may be None if Multipass is not yet ready.
"""
try:
proc = self._run(["version"])
except subprocess.CalledProcessError as error:
raise MultipassError(
brief="Failed to check version.",
details=errors.details_from_called_process_error(error),
) from error
try:
output = proc.stdout.decode(encoding=locale.getpreferredencoding())
except UnicodeDecodeError as error:
raise MultipassError(
brief="Failed to check version.",
details=f"Failed to decode output: {proc.stdout!r}",
) from error
# Expected multipass version output should look like:
# * Scenario 1: multipassd not yet ready
#
# multipass: 1.5.0
#
# * Scenario 2: typical Linux
#
# multipass: 1.5.0
# multipassd: 1.5.0
#
# * Scenario 3: typical Mac
#
# multipass: 1.5.0+mac
# multipassd: 1.5.0+mac
#
# * Scenario 4: typical Windows
#
# multipass: 1.5.0+win
# multipassd: 1.5.0+win
#
# * Scenario 5: outdated Windows version with notice message
# See: https://github.com/canonical/multipass/issues/2020
#
# multipass: 1.5.0+win
# multipassd: 1.5.0+win
#
# SOME NOTICE INFORMATION....
#
# After stripping and splitting:
# - ['multipass', '1.5.0']
# - ['multipass', '1.5.0', 'multipassd', '1.5.0']
# - ['multipass', '1.5.0+mac', 'multipassd', '1.5.0+mac']
# - ['multipass', '1.5.0+win', 'multipassd', '1.5.0+win']
# - ['multipass', '1.5.0+win', 'multipassd', '1.5.0+win', ...]
output_split = output.strip().split()
if len(output_split) < 2 or output_split[0] != "multipass":
raise MultipassError(
brief=f"Unable to parse version output: {proc.stdout!r}",
)
multipass_version = output_split[1].split("+")[0]
if len(output_split) >= 4 and output_split[2] == "multipassd":
multipassd_version = output_split[3].split("+")[0]
else:
multipassd_version = None
return (multipass_version, multipassd_version)