#
# Copyright 2021-2022 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
"""Helpers for snap commands."""
import contextlib
import json
import logging
import pathlib
import shlex
import subprocess
import urllib.parse
from typing import Any, Dict, Iterator, List, Optional
import requests
import requests_unixsocket # type: ignore
from craft_providers import Executor
from craft_providers.bases.instance_config import InstanceConfiguration
from craft_providers.errors import ProviderError, details_from_called_process_error
from craft_providers.util import snap_cmd, temp_paths
logger = logging.getLogger(__name__)
# Possible installation sources for a snap. Two plain string constants are
# used instead of an enum because the values are persisted with JSON.
SNAP_SRC_HOST = "host"  # recorded by inject_from_host()
SNAP_SRC_STORE = "store"  # recorded by install_from_store()
class SnapInstallationError(ProviderError):
    """Raised when installing or injecting a snap fails unexpectedly."""
def _download_host_snap(
    *, snap_name: str, output: pathlib.Path, chunk_size: int = 64 * 1024
) -> None:
    """Download the current host snap using snapd's APIs.

    The response body is streamed to ``output`` in pieces of ``chunk_size``
    bytes so the snap is never held in memory in its entirety.

    :param snap_name: Name of the snap installed on the host.
    :param output: Path of the file the snap is written to.
    :param chunk_size: Number of bytes requested from the response at a time.

    :raises SnapInstallationError: if snapd cannot be reached, replies with
        an HTTP error status, or the transfer fails part-way through.
    """
    quoted_name = urllib.parse.quote(snap_name, safe="")
    url = f"http+unix://%2Frun%2Fsnapd.socket/v2/snaps/{quoted_name}/file"

    try:
        # stream=True defers reading the body until iter_content() is used;
        # without it the complete snap is loaded into memory up front.
        resp = requests_unixsocket.get(url, stream=True)
    except requests.exceptions.ConnectionError as error:
        raise SnapInstallationError(
            brief="Unable to connect to snapd service."
        ) from error

    # a streamed response keeps its connection until it is closed
    with contextlib.closing(resp):
        try:
            resp.raise_for_status()
        except requests.exceptions.HTTPError as error:
            raise SnapInstallationError(
                brief=f"Unable to download snap {snap_name!r} from snapd."
            ) from error

        try:
            with output.open("wb") as stream:
                for chunk in resp.iter_content(chunk_size):
                    stream.write(chunk)
        except requests.exceptions.RequestException as error:
            # with streaming, transfer errors surface while iterating
            raise SnapInstallationError(
                brief=f"Unable to download snap {snap_name!r} from snapd."
            ) from error
def _pack_host_snap(*, snap_name: str, output: pathlib.Path) -> None:
    """Pack the current host snap.

    Runs the command built by ``snap_cmd.formulate_pack_command`` on the host.

    :param snap_name: Name of the snap installed on the host.
    :param output: Output path handed to the pack command.

    :raises SnapInstallationError: if the pack command exits with an error.
    """
    command = snap_cmd.formulate_pack_command(snap_name, output)
    logger.debug("Executing command on host: %s", shlex.join(command))

    try:
        subprocess.run(
            command,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError as error:
        # report failures the same way as the other host-side commands
        raise SnapInstallationError(
            brief=f"Failed to pack snap {snap_name!r}.",
            details=details_from_called_process_error(error),
        ) from error
def get_host_snap_info(snap_name: str) -> Dict[str, Any]:
    """Query snapd on the host for information about an installed snap.

    :param snap_name: Name of the snap to look up.

    :returns: The ``result`` member of snapd's JSON response.

    :raises SnapInstallationError: if the snapd socket cannot be reached.
    """
    encoded_name = urllib.parse.quote(snap_name, safe="")

    try:
        response = requests_unixsocket.get(
            f"http+unix://%2Frun%2Fsnapd.socket/v2/snaps/{encoded_name}"
        )
    except requests.exceptions.ConnectionError as error:
        raise SnapInstallationError(
            brief="Unable to connect to snapd service."
        ) from error

    # an HTTP error status propagates to the caller as requests' HTTPError
    response.raise_for_status()

    # TODO: represent snap info in a dataclass
    payload = response.json()
    return payload["result"]
def _get_target_snap_revision_from_snapd(
    snap_name: str, executor: Executor
) -> Optional[str]:
    """Get the revision of the snap on the target.

    The snapd REST API is queried by running ``curl`` against the snapd unix
    socket through the executor.

    :param snap_name: Name of the snap to look up.
    :param executor: Executor for target.

    :returns: Revision of the snap, or None if snapd reports a 404 for it.

    :raises SnapInstallationError: if the query command fails or the response
        from snapd cannot be understood.
    """
    quoted_name = urllib.parse.quote(snap_name, safe="")
    url = f"http://localhost/v2/snaps/{quoted_name}"
    cmd = ["curl", "--silent", "--unix-socket", "/run/snapd.socket", url]

    try:
        proc = executor.execute_run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as error:
        raise SnapInstallationError(
            brief="Unable to get target snap revision.",
            details=details_from_called_process_error(error),
        ) from error

    try:
        result = json.loads(proc.stdout)
    except ValueError as error:
        # covers json.JSONDecodeError and undecodable bytes
        raise SnapInstallationError(
            f"Unknown response from snapd: {proc.stdout!r}"
        ) from error

    status_code = result.get("status-code") if isinstance(result, dict) else None

    if status_code == 404:
        # snap not found
        return None

    if status_code == 200:
        try:
            return result["result"]["revision"]
        except (KeyError, TypeError) as error:
            # a success response without the expected payload shape
            raise SnapInstallationError(
                f"Unknown response from snapd: {result!r}"
            ) from error

    raise SnapInstallationError(f"Unknown response from snapd: {result!r}")
def _get_snap_revision_ensuring_source(
    snap_name: str, source: str, executor: Executor
) -> Optional[str]:
    """Look up the recorded snap revision, enforcing its installation source.

    The instance configuration stored in the target is consulted. When the
    snap was recorded with a different source than ``source``, the snap is
    removed from the target so that it can be installed afresh.

    :param snap_name: Name of the snap to look up.
    :param source: Expected installation source of the snap.
    :param executor: Executor for target.

    :returns: The recorded revision if the snap was installed from ``source``;
        None if there is no record of it or if it had to be removed.

    :raises SnapInstallationError: if removing the snap fails.
    """
    loaded_config = InstanceConfiguration.load(executor=executor)
    recorded_snaps = None if loaded_config is None else loaded_config.snaps
    if recorded_snaps is None:
        return None

    snap_record = recorded_snaps.get(snap_name)
    if snap_record is None:
        # not installed before
        return None

    # 'get' tolerates records saved by previous versions of the lib, which
    # may lack the 'source' key
    recorded_source = snap_record.get("source")
    if recorded_source == source:
        # previously installed from specified source: ok
        return snap_record["revision"]

    # installed from other source: remove it
    logger.debug(
        "Snap %r installed from other source (%s), removing", snap_name, snap_record
    )
    remove_cmd = snap_cmd.formulate_remove_command(snap_name)
    try:
        executor.execute_run(remove_cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as error:
        raise SnapInstallationError(
            brief=f"Failed to inject snap {snap_name!r}.",
            details="unable to remove previously installed snap",
        ) from error

    return None
@contextlib.contextmanager
def _get_host_snap(snap_name: str) -> Iterator[pathlib.Path]:
    """Provide a snap file for a snap installed on the host.

    The snap is first requested from snapd. If that raises
    SnapInstallationError (it may, for example, when the snap was installed
    using `snap try`), the snap is packed on the host instead.

    :param snap_name: Name of the snap installed on the host.

    :yields: Path to the snap file inside a temporary directory that only
        exists for the duration of the context.
    """
    with temp_paths.home_temporary_directory() as work_dir:
        snap_file = work_dir / f"{snap_name}.snap"

        try:
            _download_host_snap(snap_name=snap_name, output=snap_file)
        except SnapInstallationError:
            logger.warning(
                "Failed to fetch snap from snapd, falling back to `snap pack` to recreate"
            )
            _pack_host_snap(snap_name=snap_name, output=snap_file)

        yield snap_file
def _get_assertion(query: List[str]) -> bytes:
    """Fetch a single assertion from snapd on the host.

    :param query: assertion query to pass to `snap known`

    :returns: raw standard output of the command

    :raises SnapInstallationError: if 'snap known' call fails
    """
    known_cmd = snap_cmd.formulate_known_command(query=query)
    logger.debug("Executing command on host: %s", known_cmd)

    try:
        completed = subprocess.run(known_cmd, capture_output=True, check=True)
    except subprocess.CalledProcessError as error:
        raise SnapInstallationError(
            brief="failed to get assertions for snap",
            details=details_from_called_process_error(error),
        ) from error

    return completed.stdout
@contextlib.contextmanager
def _get_assertions_file(
    snap_name: str, snap_id: str, snap_revision: str, snap_publisher_id: str
) -> Iterator[pathlib.Path]:
    """Write the assertions for a snap into a temporary file.

    Four assertions are fetched from the host, in order (account-key,
    snap-declaration, snap-revision and account), and written to the file
    with a newline after each.

    :param snap_name: Name of snap to inject
    :param snap_id: ID of the snap
    :param snap_revision: Revision of the snap
    :param snap_publisher_id: The ID of the snap's publisher's account

    :yields: Path of the temporary assertion file, valid only within the
        context.
    """
    logger.debug("Creating an assert file for snap %r", snap_name)

    account_key_filter = (
        "public-key-sha3-384=BWDEoaqyr25nF5SNCvEv2v"
        "7QnM9QsfCc0PBMYD_i2NGSQ32EF2d4D0hqUel3m8ul"
    )
    queries = (
        ["account-key", account_key_filter],
        ["snap-declaration", f"snap-name={snap_name}"],
        ["snap-revision", f"snap-revision={snap_revision}", f"snap-id={snap_id}"],
        ["account", f"account-id={snap_publisher_id}"],
    )

    with temp_paths.home_temporary_file() as assert_file_path:
        with open(assert_file_path, "wb") as sink:
            for assertion_query in queries:
                assertion = _get_assertion(assertion_query)
                sink.write(assertion)
                sink.write(b"\n")

            # the file is still open while the caller uses it, so make sure
            # everything written so far is visible on disk
            sink.flush()
            yield assert_file_path
def _add_assertions_from_host(executor: Executor, snap_name: str) -> None:
    """Copy a snap's assertions from the host and acknowledge them in target.

    :param executor: Executor for target
    :param snap_name: Name of snap to inject

    :raises SnapInstallationError: if the assertion file cannot be produced
        or pushed, or if the ack command fails in the target.
    """
    assert_destination = pathlib.Path(f"/tmp/{snap_name}.assert")
    host_info = get_host_snap_info(snap_name)

    try:
        assertions = _get_assertions_file(
            snap_name=snap_name,
            snap_id=host_info["id"],
            snap_revision=host_info["revision"],
            snap_publisher_id=host_info["publisher"]["id"],
        )
        with assertions as assert_source:
            executor.push_file(
                source=assert_source,
                destination=assert_destination,
            )
    except ProviderError as error:
        raise SnapInstallationError(
            brief=f"failed to copy assert file for snap {snap_name!r}",
            details="error copying snap assert file into target environment",
        ) from error

    ack_cmd = snap_cmd.formulate_ack_command(snap_assert_path=assert_destination)
    try:
        executor.execute_run(ack_cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as error:
        raise SnapInstallationError(
            brief=f"failed to add assertions for snap {snap_name!r}",
            details=details_from_called_process_error(error),
        ) from error
def inject_from_host(*, executor: Executor, snap_name: str, classic: bool) -> None:
    """Install the host's copy of a snap into the target.

    Nothing is done when the target already records the host's revision,
    installed from the host. Otherwise the snap file is pushed to the target
    and installed from there. Revisions starting with "x" are installed in
    dangerous mode; for every other revision the host's assertions are added
    to the target first.

    :param executor: Executor for target
    :param snap_name: Name of snap to inject
    :param classic: Install in classic mode

    :raises SnapInstallationError: on failure to inject snap
    """
    logger.debug("Installing snap %r from host (classic=%s)", snap_name, classic)

    host_revision = get_host_snap_info(snap_name)["revision"]
    target_revision = _get_snap_revision_ensuring_source(
        snap_name=snap_name,
        source=SNAP_SRC_HOST,
        executor=executor,
    )
    logger.debug("Revisions found: host=%r, target=%r", host_revision, target_revision)

    already_current = target_revision is not None and target_revision == host_revision
    if already_current:
        logger.debug(
            "Skipping snap injection: target is already up-to-date with revision on host"
        )
        return

    target_snap_path = pathlib.Path(f"/tmp/{snap_name}.snap")
    is_dangerous = host_revision.startswith("x")

    if not is_dangerous:
        _add_assertions_from_host(executor=executor, snap_name=snap_name)

    with _get_host_snap(snap_name) as host_snap_path:
        try:
            executor.push_file(
                source=host_snap_path,
                destination=target_snap_path,
            )
        except ProviderError as error:
            raise SnapInstallationError(
                brief=f"failed to copy snap file for snap {snap_name!r}",
                details="error copying snap file into target environment",
            ) from error

    install_cmd = snap_cmd.formulate_local_install_command(
        classic=classic, dangerous=is_dangerous, snap_path=target_snap_path
    )
    try:
        executor.execute_run(install_cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as error:
        raise SnapInstallationError(
            brief=f"failed to install snap {snap_name!r}",
            details=details_from_called_process_error(error),
        ) from error

    # remember what was installed so the next call can skip or replace it
    snap_record = {"revision": host_revision, "source": SNAP_SRC_HOST}
    InstanceConfiguration.update(
        executor=executor,
        data={"snaps": {snap_name: snap_record}},
    )
def install_from_store(
    *, executor: Executor, snap_name: str, channel: str, classic: bool
) -> None:
    """Install or refresh a snap in the target from the store.

    The snap is installed when the target has no record of it coming from the
    store, and refreshed otherwise. The revision reported by snapd afterwards
    is saved in the target's instance configuration.

    :param executor: Executor for target.
    :param snap_name: Name of snap to install.
    :param channel: Channel to install from.
    :param classic: Install in classic mode.

    :raises SnapInstallationError: on unexpected error.
    """
    logger.debug(
        "Installing snap %r from store (channel=%r, classic=%s)",
        snap_name,
        channel,
        classic,
    )

    target_revision = _get_snap_revision_ensuring_source(
        snap_name=snap_name,
        source=SNAP_SRC_STORE,
        executor=executor,
    )
    logger.debug("Revision found in target: %r", target_revision)

    if target_revision is not None:
        # refresh the already installed snap
        store_cmd = snap_cmd.formulate_refresh_command(
            snap_name=snap_name,
            channel=channel,
        )
    else:
        # no snap present in the target environment, just install it
        store_cmd = snap_cmd.formulate_remote_install_command(
            snap_name=snap_name,
            channel=channel,
            classic=classic,
        )

    try:
        executor.execute_run(store_cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as error:
        raise SnapInstallationError(
            brief=f"Failed to install/refresh snap {snap_name!r}.",
            details=details_from_called_process_error(error),
        ) from error

    new_target_revision = _get_target_snap_revision_from_snapd(
        snap_name=snap_name,
        executor=executor,
    )
    logger.debug("Revision after install/refresh: %r", new_target_revision)

    snap_record = {"revision": new_target_revision, "source": SNAP_SRC_STORE}
    InstanceConfiguration.update(
        executor=executor,
        data={"snaps": {snap_name: snap_record}},
    )