1306 lines
48 KiB
Python
1306 lines
48 KiB
Python
"""Repository."""
|
|
from __future__ import annotations
|
|
|
|
from asyncio import sleep
|
|
from datetime import datetime
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import tempfile
|
|
from typing import TYPE_CHECKING, Any
|
|
import zipfile
|
|
|
|
from aiogithubapi import (
|
|
AIOGitHubAPIException,
|
|
AIOGitHubAPINotModifiedException,
|
|
GitHubReleaseModel,
|
|
)
|
|
from aiogithubapi.const import BASE_API_URL
|
|
from aiogithubapi.objects.repository import AIOGitHubAPIRepository
|
|
import attr
|
|
from homeassistant.helpers import device_registry as dr, issue_registry as ir
|
|
|
|
from ..const import DOMAIN
|
|
from ..enums import ConfigurationType, HacsDispatchEvent, RepositoryFile
|
|
from ..exceptions import (
|
|
HacsException,
|
|
HacsNotModifiedException,
|
|
HacsRepositoryArchivedException,
|
|
HacsRepositoryExistException,
|
|
)
|
|
from ..utils.backup import Backup, BackupNetDaemon
|
|
from ..utils.decode import decode_content
|
|
from ..utils.decorator import concurrent
|
|
from ..utils.filters import filter_content_return_one_of_type
|
|
from ..utils.json import json_loads
|
|
from ..utils.logger import LOGGER
|
|
from ..utils.path import is_safe
|
|
from ..utils.queue_manager import QueueManager
|
|
from ..utils.store import async_remove_store
|
|
from ..utils.template import render_template
|
|
from ..utils.validate import Validate
|
|
from ..utils.version import (
|
|
version_left_higher_or_equal_then_right,
|
|
version_left_higher_then_right,
|
|
)
|
|
from ..utils.workarounds import DOMAIN_OVERRIDES
|
|
|
|
if TYPE_CHECKING:
|
|
from ..base import HacsBase
|
|
|
|
|
|
TOPIC_FILTER = (
|
|
"add-on",
|
|
"addon",
|
|
"app",
|
|
"appdaemon-apps",
|
|
"appdaemon",
|
|
"custom-card",
|
|
"custom-cards",
|
|
"custom-component",
|
|
"custom-components",
|
|
"customcomponents",
|
|
"hacktoberfest",
|
|
"hacs-default",
|
|
"hacs-integration",
|
|
"hacs-repository",
|
|
"hacs",
|
|
"hass",
|
|
"hassio",
|
|
"home-assistant-custom",
|
|
"home-assistant-frontend",
|
|
"home-assistant-hacs",
|
|
"home-assistant-sensor",
|
|
"home-assistant",
|
|
"home-automation",
|
|
"homeassistant-components",
|
|
"homeassistant-integration",
|
|
"homeassistant-sensor",
|
|
"homeassistant",
|
|
"homeautomation",
|
|
"integration",
|
|
"lovelace-ui",
|
|
"lovelace",
|
|
"media-player",
|
|
"mediaplayer",
|
|
"netdaemon",
|
|
"plugin",
|
|
"python_script",
|
|
"python-script",
|
|
"python",
|
|
"sensor",
|
|
"smart-home",
|
|
"smarthome",
|
|
"template",
|
|
"templates",
|
|
"theme",
|
|
"themes",
|
|
)
|
|
|
|
|
|
REPOSITORY_KEYS_TO_EXPORT = (
|
|
# Keys can not be removed from this list until v3
|
|
# If keys are added, the action need to be re-run with force
|
|
("description", ""),
|
|
("downloads", 0),
|
|
("domain", None),
|
|
("etag_releases", None),
|
|
("etag_repository", None),
|
|
("full_name", ""),
|
|
("last_commit", None),
|
|
("last_updated", 0),
|
|
("last_version", None),
|
|
("manifest_name", None),
|
|
("open_issues", 0),
|
|
("stargazers_count", 0),
|
|
("topics", []),
|
|
)
|
|
|
|
HACS_MANIFEST_KEYS_TO_EXPORT = (
|
|
# Keys can not be removed from this list until v3
|
|
# If keys are added, the action need to be re-run with force
|
|
("country", []),
|
|
("name", None),
|
|
)
|
|
|
|
|
|
class FileInformation:
|
|
"""FileInformation."""
|
|
|
|
def __init__(self, url, path, name):
|
|
self.download_url = url
|
|
self.path = path
|
|
self.name = name
|
|
|
|
|
|
@attr.s(auto_attribs=True)
|
|
class RepositoryData:
|
|
"""RepositoryData class."""
|
|
|
|
archived: bool = False
|
|
authors: list[str] = []
|
|
category: str = ""
|
|
config_flow: bool = False
|
|
default_branch: str = None
|
|
description: str = ""
|
|
domain: str = None
|
|
downloads: int = 0
|
|
etag_repository: str = None
|
|
etag_releases: str = None
|
|
file_name: str = ""
|
|
first_install: bool = False
|
|
full_name: str = ""
|
|
hide: bool = False
|
|
has_issues: bool = True
|
|
id: int = 0
|
|
installed_commit: str = None
|
|
installed_version: str = None
|
|
installed: bool = False
|
|
last_commit: str = None
|
|
last_fetched: datetime = None
|
|
last_updated: str = 0
|
|
last_version: str = None
|
|
manifest_name: str = None
|
|
new: bool = True
|
|
open_issues: int = 0
|
|
published_tags: list[str] = []
|
|
releases: bool = False
|
|
selected_tag: str = None
|
|
show_beta: bool = False
|
|
stargazers_count: int = 0
|
|
topics: list[str] = []
|
|
|
|
@property
|
|
def name(self):
|
|
"""Return the name."""
|
|
if self.category in ["integration", "netdaemon"]:
|
|
return self.domain
|
|
return self.full_name.split("/")[-1]
|
|
|
|
def to_json(self):
|
|
"""Export to json."""
|
|
return attr.asdict(self, filter=lambda attr, value: attr.name != "last_fetched")
|
|
|
|
@staticmethod
|
|
def create_from_dict(source: dict, action: bool = False) -> RepositoryData:
|
|
"""Set attributes from dicts."""
|
|
data = RepositoryData()
|
|
data.update_data(source, action)
|
|
return data
|
|
|
|
def update_data(self, data: dict, action: bool = False) -> None:
|
|
"""Update data of the repository."""
|
|
for key, value in data.items():
|
|
if key not in self.__dict__:
|
|
continue
|
|
|
|
if key == "last_fetched" and isinstance(value, float):
|
|
setattr(self, key, datetime.fromtimestamp(value))
|
|
elif key == "id":
|
|
setattr(self, key, str(value))
|
|
elif key == "country":
|
|
if isinstance(value, str):
|
|
setattr(self, key, [value])
|
|
else:
|
|
setattr(self, key, value)
|
|
elif key == "topics" and not action:
|
|
setattr(self, key, [topic for topic in value if topic not in TOPIC_FILTER])
|
|
|
|
else:
|
|
setattr(self, key, value)
|
|
|
|
|
|
@attr.s(auto_attribs=True)
|
|
class HacsManifest:
|
|
"""HacsManifest class."""
|
|
|
|
content_in_root: bool = False
|
|
country: list[str] = []
|
|
filename: str = None
|
|
hacs: str = None # Minimum HACS version
|
|
hide_default_branch: bool = False
|
|
homeassistant: str = None # Minimum Home Assistant version
|
|
manifest: dict = {}
|
|
name: str = None
|
|
persistent_directory: str = None
|
|
render_readme: bool = False
|
|
zip_release: bool = False
|
|
|
|
def to_dict(self):
|
|
"""Export to json."""
|
|
return attr.asdict(self)
|
|
|
|
@staticmethod
|
|
def from_dict(manifest: dict):
|
|
"""Set attributes from dicts."""
|
|
if manifest is None:
|
|
raise HacsException("Missing manifest data")
|
|
|
|
manifest_data = HacsManifest()
|
|
manifest_data.manifest = {
|
|
k: v
|
|
for k, v in manifest.items()
|
|
if k in manifest_data.__dict__ and v != manifest_data.__getattribute__(k)
|
|
}
|
|
|
|
for key, value in manifest_data.manifest.items():
|
|
if key == "country" and isinstance(value, str):
|
|
setattr(manifest_data, key, [value])
|
|
elif key in manifest_data.__dict__:
|
|
setattr(manifest_data, key, value)
|
|
return manifest_data
|
|
|
|
def update_data(self, data: dict) -> None:
|
|
"""Update the manifest data."""
|
|
for key, value in data.items():
|
|
if key not in self.__dict__:
|
|
continue
|
|
|
|
if key == "country":
|
|
if isinstance(value, str):
|
|
setattr(self, key, [value])
|
|
else:
|
|
setattr(self, key, value)
|
|
else:
|
|
setattr(self, key, value)
|
|
|
|
|
|
class RepositoryReleases:
|
|
"""RepositoyReleases."""
|
|
|
|
last_release = None
|
|
last_release_object = None
|
|
published_tags = []
|
|
objects: list[GitHubReleaseModel] = []
|
|
releases = False
|
|
downloads = None
|
|
|
|
|
|
class RepositoryPath:
|
|
"""RepositoryPath."""
|
|
|
|
local: str | None = None
|
|
remote: str | None = None
|
|
|
|
|
|
class RepositoryContent:
|
|
"""RepositoryContent."""
|
|
|
|
path: RepositoryPath | None = None
|
|
files = []
|
|
objects = []
|
|
single = False
|
|
|
|
|
|
class HacsRepository:
|
|
"""HacsRepository."""
|
|
|
|
def __init__(self, hacs: HacsBase) -> None:
|
|
"""Set up HacsRepository."""
|
|
self.hacs = hacs
|
|
self.additional_info = ""
|
|
self.data = RepositoryData()
|
|
self.content = RepositoryContent()
|
|
self.content.path = RepositoryPath()
|
|
self.repository_object: AIOGitHubAPIRepository | None = None
|
|
self.updated_info = False
|
|
self.state = None
|
|
self.force_branch = False
|
|
self.integration_manifest = {}
|
|
self.repository_manifest = HacsManifest.from_dict({})
|
|
self.validate = Validate()
|
|
self.releases = RepositoryReleases()
|
|
self.pending_restart = False
|
|
self.tree = []
|
|
self.treefiles = []
|
|
self.ref = None
|
|
self.logger = LOGGER
|
|
|
|
def __str__(self) -> str:
|
|
"""Return a string representation of the repository."""
|
|
return self.string
|
|
|
|
@property
|
|
def string(self) -> str:
|
|
"""Return a string representation of the repository."""
|
|
return f"<{self.data.category.title()} {self.data.full_name}>"
|
|
|
|
@property
|
|
def display_name(self) -> str:
|
|
"""Return display name."""
|
|
if self.repository_manifest.name is not None:
|
|
return self.repository_manifest.name
|
|
|
|
if self.data.category == "integration":
|
|
if self.data.manifest_name is not None:
|
|
return self.data.manifest_name
|
|
if "name" in self.integration_manifest:
|
|
return self.integration_manifest["name"]
|
|
|
|
return self.data.full_name.split("/")[-1].replace("-", " ").replace("_", " ").title()
|
|
|
|
@property
|
|
def ignored_by_country_configuration(self) -> bool:
|
|
"""Return True if hidden by country."""
|
|
if self.data.installed:
|
|
return False
|
|
configuration = self.hacs.configuration.country.lower()
|
|
if configuration == "all":
|
|
return False
|
|
|
|
manifest = [entry.lower() for entry in self.repository_manifest.country or []]
|
|
if not manifest:
|
|
return False
|
|
return configuration not in manifest
|
|
|
|
@property
|
|
def display_status(self) -> str:
|
|
"""Return display_status."""
|
|
if self.data.new:
|
|
status = "new"
|
|
elif self.pending_restart:
|
|
status = "pending-restart"
|
|
elif self.pending_update:
|
|
status = "pending-upgrade"
|
|
elif self.data.installed:
|
|
status = "installed"
|
|
else:
|
|
status = "default"
|
|
return status
|
|
|
|
@property
|
|
def display_installed_version(self) -> str:
|
|
"""Return display_authors"""
|
|
if self.data.installed_version is not None:
|
|
installed = self.data.installed_version
|
|
else:
|
|
if self.data.installed_commit is not None:
|
|
installed = self.data.installed_commit
|
|
else:
|
|
installed = ""
|
|
return str(installed)
|
|
|
|
@property
|
|
def display_available_version(self) -> str:
|
|
"""Return display_authors"""
|
|
if self.data.last_version is not None:
|
|
available = self.data.last_version
|
|
else:
|
|
if self.data.last_commit is not None:
|
|
available = self.data.last_commit
|
|
else:
|
|
available = ""
|
|
return str(available)
|
|
|
|
@property
|
|
def display_version_or_commit(self) -> str:
|
|
"""Does the repositoriy use releases or commits?"""
|
|
if self.data.releases:
|
|
version_or_commit = "version"
|
|
else:
|
|
version_or_commit = "commit"
|
|
return version_or_commit
|
|
|
|
@property
|
|
def pending_update(self) -> bool:
|
|
"""Return True if pending update."""
|
|
if not self.can_download:
|
|
return False
|
|
if self.data.installed:
|
|
if self.data.selected_tag is not None:
|
|
if self.data.selected_tag == self.data.default_branch:
|
|
if self.data.installed_commit != self.data.last_commit:
|
|
return True
|
|
return False
|
|
if self.display_version_or_commit == "version":
|
|
if (
|
|
result := version_left_higher_then_right(
|
|
self.display_available_version,
|
|
self.display_installed_version,
|
|
)
|
|
) is not None:
|
|
return result
|
|
if self.display_installed_version != self.display_available_version:
|
|
return True
|
|
|
|
return False
|
|
|
|
@property
|
|
def can_download(self) -> bool:
|
|
"""Return True if we can download."""
|
|
if self.repository_manifest.homeassistant is not None:
|
|
if self.data.releases:
|
|
if not version_left_higher_or_equal_then_right(
|
|
self.hacs.core.ha_version.string,
|
|
self.repository_manifest.homeassistant,
|
|
):
|
|
return False
|
|
return True
|
|
|
|
@property
|
|
def localpath(self) -> str | None:
|
|
"""Return localpath."""
|
|
return None
|
|
|
|
@property
|
|
def should_try_releases(self) -> bool:
|
|
"""Return a boolean indicating whether to download releases or not."""
|
|
if self.repository_manifest.zip_release:
|
|
if self.repository_manifest.filename.endswith(".zip"):
|
|
if self.ref != self.data.default_branch:
|
|
return True
|
|
if self.ref == self.data.default_branch:
|
|
return False
|
|
if self.data.category not in ["plugin", "theme"]:
|
|
return False
|
|
if not self.data.releases:
|
|
return False
|
|
return True
|
|
|
|
async def validate_repository(self) -> None:
|
|
"""Validate."""
|
|
|
|
@concurrent(concurrenttasks=10, backoff_time=5)
|
|
async def update_repository(self, ignore_issues=False, force=False) -> None:
|
|
"""Update the repository"""
|
|
|
|
async def common_validate(self, ignore_issues: bool = False) -> None:
|
|
"""Common validation steps of the repository."""
|
|
self.validate.errors.clear()
|
|
|
|
# Make sure the repository exist.
|
|
self.logger.debug("%s Checking repository.", self.string)
|
|
await self.common_update_data(ignore_issues=ignore_issues)
|
|
|
|
# Get the content of hacs.json
|
|
if RepositoryFile.HACS_JSON in [x.filename for x in self.tree]:
|
|
if manifest := await self.async_get_hacs_json():
|
|
self.repository_manifest = HacsManifest.from_dict(manifest)
|
|
self.data.update_data(
|
|
self.repository_manifest.to_dict(),
|
|
action=self.hacs.system.action,
|
|
)
|
|
|
|
async def common_registration(self) -> None:
|
|
"""Common registration steps of the repository."""
|
|
# Attach repository
|
|
if self.repository_object is None:
|
|
try:
|
|
self.repository_object, etag = await self.async_get_legacy_repository_object(
|
|
etag=None if self.data.installed else self.data.etag_repository,
|
|
)
|
|
self.data.update_data(
|
|
self.repository_object.attributes,
|
|
action=self.hacs.system.action,
|
|
)
|
|
self.data.etag_repository = etag
|
|
except HacsNotModifiedException:
|
|
self.logger.debug("%s Did not update, content was not modified", self.string)
|
|
return
|
|
|
|
if self.repository_object:
|
|
self.data.last_updated = self.repository_object.attributes.get("pushed_at", 0)
|
|
self.data.last_fetched = datetime.utcnow()
|
|
|
|
# Set topics
|
|
self.data.topics = self.data.topics
|
|
|
|
# Set description
|
|
self.data.description = self.data.description
|
|
|
|
@concurrent(concurrenttasks=10, backoff_time=5)
|
|
async def common_update(self, ignore_issues=False, force=False, skip_releases=False) -> bool:
|
|
"""Common information update steps of the repository."""
|
|
self.logger.debug("%s Getting repository information", self.string)
|
|
|
|
# Attach repository
|
|
current_etag = self.data.etag_repository
|
|
try:
|
|
await self.common_update_data(
|
|
ignore_issues=ignore_issues,
|
|
force=force,
|
|
skip_releases=skip_releases,
|
|
)
|
|
except HacsRepositoryExistException:
|
|
self.data.full_name = self.hacs.common.renamed_repositories[self.data.full_name]
|
|
await self.common_update_data(ignore_issues=ignore_issues, force=force)
|
|
|
|
except HacsException:
|
|
if not ignore_issues and not force:
|
|
return False
|
|
|
|
if not self.data.installed and (current_etag == self.data.etag_repository) and not force:
|
|
self.logger.debug("%s Did not update, content was not modified", self.string)
|
|
return False
|
|
|
|
# Update last updated
|
|
if self.repository_object:
|
|
self.data.last_updated = self.repository_object.attributes.get("pushed_at", 0)
|
|
|
|
# Update last available commit
|
|
await self.repository_object.set_last_commit()
|
|
self.data.last_commit = self.repository_object.last_commit
|
|
|
|
# Get the content of hacs.json
|
|
if RepositoryFile.HACS_JSON in [x.filename for x in self.tree]:
|
|
if manifest := await self.async_get_hacs_json():
|
|
self.repository_manifest = HacsManifest.from_dict(manifest)
|
|
self.data.update_data(
|
|
self.repository_manifest.to_dict(),
|
|
action=self.hacs.system.action,
|
|
)
|
|
|
|
# Update "info.md"
|
|
self.additional_info = await self.async_get_info_file_contents()
|
|
|
|
# Set last fetch attribute
|
|
self.data.last_fetched = datetime.utcnow()
|
|
|
|
return True
|
|
|
|
async def download_zip_files(self, validate) -> None:
|
|
"""Download ZIP archive from repository release."""
|
|
try:
|
|
contents = None
|
|
target_ref = self.ref.split("/")[1]
|
|
|
|
for release in self.releases.objects:
|
|
self.logger.debug(
|
|
"%s ref: %s --- tag: %s", self.string, target_ref, release.tag_name
|
|
)
|
|
if release.tag_name == target_ref:
|
|
contents = release.assets
|
|
break
|
|
|
|
if not contents:
|
|
validate.errors.append(f"No assets found for release '{self.ref}'")
|
|
return
|
|
|
|
download_queue = QueueManager(hass=self.hacs.hass)
|
|
|
|
for content in contents or []:
|
|
download_queue.add(self.async_download_zip_file(content, validate))
|
|
|
|
await download_queue.execute()
|
|
except BaseException: # lgtm [py/catch-base-exception] pylint: disable=broad-except
|
|
validate.errors.append("Download was not completed")
|
|
|
|
async def async_download_zip_file(self, content, validate) -> None:
|
|
"""Download ZIP archive from repository release."""
|
|
try:
|
|
filecontent = await self.hacs.async_download_file(content.browser_download_url)
|
|
|
|
if filecontent is None:
|
|
validate.errors.append(f"[{content.name}] was not downloaded")
|
|
return
|
|
|
|
temp_dir = await self.hacs.hass.async_add_executor_job(tempfile.mkdtemp)
|
|
temp_file = f"{temp_dir}/{self.repository_manifest.filename}"
|
|
|
|
result = await self.hacs.async_save_file(temp_file, filecontent)
|
|
with zipfile.ZipFile(temp_file, "r") as zip_file:
|
|
zip_file.extractall(self.content.path.local)
|
|
|
|
def cleanup_temp_dir():
|
|
"""Cleanup temp_dir."""
|
|
if os.path.exists(temp_dir):
|
|
self.logger.debug("%s Cleaning up %s", self.string, temp_dir)
|
|
shutil.rmtree(temp_dir)
|
|
|
|
if result:
|
|
self.logger.info("%s Download of %s completed", self.string, content.name)
|
|
await self.hacs.hass.async_add_executor_job(cleanup_temp_dir)
|
|
return
|
|
|
|
validate.errors.append(f"[{content.name}] was not downloaded")
|
|
except BaseException: # lgtm [py/catch-base-exception] pylint: disable=broad-except
|
|
validate.errors.append("Download was not completed")
|
|
|
|
async def download_content(self) -> None:
|
|
"""Download the content of a directory."""
|
|
if self.hacs.configuration.experimental:
|
|
if (
|
|
not self.repository_manifest.zip_release
|
|
and not self.data.file_name
|
|
and self.content.path.remote is not None
|
|
):
|
|
self.logger.info("%s Trying experimental download", self.string)
|
|
try:
|
|
await self.download_repository_zip()
|
|
return
|
|
except HacsException as exception:
|
|
self.logger.exception(exception)
|
|
|
|
contents = self.gather_files_to_download()
|
|
if self.repository_manifest.filename:
|
|
self.logger.debug("%s %s", self.string, self.repository_manifest.filename)
|
|
if not contents:
|
|
raise HacsException("No content to download")
|
|
|
|
download_queue = QueueManager(hass=self.hacs.hass)
|
|
|
|
for content in contents:
|
|
if self.repository_manifest.content_in_root and self.repository_manifest.filename:
|
|
if content.name != self.repository_manifest.filename:
|
|
continue
|
|
download_queue.add(self.dowload_repository_content(content))
|
|
|
|
await download_queue.execute()
|
|
|
|
async def download_repository_zip(self):
|
|
"""Download the zip archive of the repository."""
|
|
ref = f"{self.ref}".replace("tags/", "")
|
|
|
|
if not ref:
|
|
raise HacsException("Missing required elements.")
|
|
|
|
url = f"{BASE_API_URL}/repos/{self.data.full_name}/zipball/{ref}"
|
|
|
|
filecontent = await self.hacs.async_download_file(
|
|
url,
|
|
headers={
|
|
"Authorization": f"token {self.hacs.configuration.token}",
|
|
"User-Agent": f"HACS/{self.hacs.version}",
|
|
},
|
|
)
|
|
if filecontent is None:
|
|
raise HacsException(f"[{self}] Failed to download zipball")
|
|
|
|
temp_dir = await self.hacs.hass.async_add_executor_job(tempfile.mkdtemp)
|
|
temp_file = f"{temp_dir}/{self.repository_manifest.filename}"
|
|
result = await self.hacs.async_save_file(temp_file, filecontent)
|
|
if not result:
|
|
raise HacsException("Could not save ZIP file")
|
|
|
|
with zipfile.ZipFile(temp_file, "r") as zip_file:
|
|
extractable = []
|
|
for path in zip_file.filelist:
|
|
filename = "/".join(path.filename.split("/")[1:])
|
|
if (
|
|
filename.startswith(self.content.path.remote)
|
|
and filename != self.content.path.remote
|
|
):
|
|
path.filename = filename.replace(self.content.path.remote, "")
|
|
extractable.append(path)
|
|
|
|
zip_file.extractall(self.content.path.local, extractable)
|
|
|
|
def cleanup_temp_dir():
|
|
"""Cleanup temp_dir."""
|
|
if os.path.exists(temp_dir):
|
|
self.logger.debug("%s Cleaning up %s", self.string, temp_dir)
|
|
shutil.rmtree(temp_dir)
|
|
|
|
await self.hacs.hass.async_add_executor_job(cleanup_temp_dir)
|
|
self.logger.info("%s Content was extracted to %s", self.string, self.content.path.local)
|
|
|
|
async def async_get_hacs_json(self, ref: str = None) -> dict[str, Any] | None:
|
|
"""Get the content of the hacs.json file."""
|
|
try:
|
|
response = await self.hacs.async_github_api_method(
|
|
method=self.hacs.githubapi.repos.contents.get,
|
|
raise_exception=False,
|
|
repository=self.data.full_name,
|
|
path=RepositoryFile.HACS_JSON,
|
|
**{"params": {"ref": ref or self.version_to_download()}},
|
|
)
|
|
if response:
|
|
return json_loads(decode_content(response.data.content))
|
|
except BaseException: # lgtm [py/catch-base-exception] pylint: disable=broad-except
|
|
pass
|
|
|
|
async def async_get_info_file_contents(self) -> str:
|
|
"""Get the content of the info.md file."""
|
|
|
|
def _info_file_variants() -> tuple[str, ...]:
|
|
name: str = (
|
|
"readme"
|
|
if self.repository_manifest.render_readme or self.hacs.configuration.experimental
|
|
else "info"
|
|
)
|
|
return (
|
|
f"{name.upper()}.md",
|
|
f"{name}.md",
|
|
f"{name}.MD",
|
|
f"{name.upper()}.MD",
|
|
name.upper(),
|
|
name,
|
|
)
|
|
|
|
info_files = [filename for filename in _info_file_variants() if filename in self.treefiles]
|
|
|
|
if not info_files:
|
|
return ""
|
|
|
|
try:
|
|
response = await self.hacs.async_github_api_method(
|
|
method=self.hacs.githubapi.repos.contents.get,
|
|
raise_exception=False,
|
|
repository=self.data.full_name,
|
|
path=info_files[0],
|
|
)
|
|
if response:
|
|
return render_template(
|
|
self.hacs,
|
|
decode_content(response.data.content)
|
|
.replace("<svg", "<disabled")
|
|
.replace("</svg", "</disabled"),
|
|
self,
|
|
)
|
|
except BaseException as exc: # lgtm [py/catch-base-exception] pylint: disable=broad-except
|
|
self.logger.error("%s %s", self.string, exc)
|
|
|
|
return ""
|
|
|
|
def remove(self) -> None:
|
|
"""Run remove tasks."""
|
|
if self.hacs.repositories.is_registered(repository_id=str(self.data.id)):
|
|
self.logger.info("%s Starting removal", self.string)
|
|
self.hacs.repositories.unregister(self)
|
|
|
|
async def uninstall(self) -> None:
|
|
"""Run uninstall tasks."""
|
|
self.logger.info("%s Removing", self.string)
|
|
if not await self.remove_local_directory():
|
|
raise HacsException("Could not uninstall")
|
|
self.data.installed = False
|
|
if self.data.category == "integration":
|
|
if self.data.config_flow:
|
|
await self.reload_custom_components()
|
|
else:
|
|
self.pending_restart = True
|
|
elif self.data.category == "theme":
|
|
try:
|
|
await self.hacs.hass.services.async_call("frontend", "reload_themes", {})
|
|
except BaseException: # lgtm [py/catch-base-exception] pylint: disable=broad-except
|
|
pass
|
|
elif self.data.category == "template":
|
|
await self.hacs.hass.services.async_call("homeassistant", "reload_custom_templates", {})
|
|
|
|
await async_remove_store(self.hacs.hass, f"hacs/{self.data.id}.hacs")
|
|
|
|
self.data.installed_version = None
|
|
self.data.installed_commit = None
|
|
self.hacs.async_dispatch(
|
|
HacsDispatchEvent.REPOSITORY,
|
|
{
|
|
"id": 1337,
|
|
"action": "uninstall",
|
|
"repository": self.data.full_name,
|
|
"repository_id": self.data.id,
|
|
},
|
|
)
|
|
|
|
await self.async_remove_entity_device()
|
|
ir.async_delete_issue(self.hacs.hass, DOMAIN, f"removed_{self.data.id}")
|
|
|
|
async def remove_local_directory(self) -> None:
|
|
"""Check the local directory."""
|
|
|
|
try:
|
|
if self.data.category == "python_script":
|
|
local_path = f"{self.content.path.local}/{self.data.name}.py"
|
|
elif self.data.category == "template":
|
|
local_path = f"{self.content.path.local}/{self.data.file_name}"
|
|
elif self.data.category == "theme":
|
|
path = (
|
|
f"{self.hacs.core.config_path}/"
|
|
f"{self.hacs.configuration.theme_path}/"
|
|
f"{self.data.name}.yaml"
|
|
)
|
|
if os.path.exists(path):
|
|
os.remove(path)
|
|
local_path = self.content.path.local
|
|
elif self.data.category == "integration":
|
|
if not self.data.domain:
|
|
if domain := DOMAIN_OVERRIDES.get(self.data.full_name):
|
|
self.data.domain = domain
|
|
self.content.path.local = self.localpath
|
|
else:
|
|
self.logger.error("%s Missing domain", self.string)
|
|
return False
|
|
local_path = self.content.path.local
|
|
else:
|
|
local_path = self.content.path.local
|
|
|
|
if os.path.exists(local_path):
|
|
if not is_safe(self.hacs, local_path):
|
|
self.logger.error("%s Path %s is blocked from removal", self.string, local_path)
|
|
return False
|
|
self.logger.debug("%s Removing %s", self.string, local_path)
|
|
|
|
if self.data.category in ["python_script", "template"]:
|
|
os.remove(local_path)
|
|
else:
|
|
shutil.rmtree(local_path)
|
|
|
|
while os.path.exists(local_path):
|
|
await sleep(1)
|
|
else:
|
|
self.logger.debug(
|
|
"%s Presumed local content path %s does not exist", self.string, local_path
|
|
)
|
|
|
|
except (
|
|
BaseException # lgtm [py/catch-base-exception] pylint: disable=broad-except
|
|
) as exception:
|
|
self.logger.debug("%s Removing %s failed with %s", self.string, local_path, exception)
|
|
return False
|
|
return True
|
|
|
|
async def async_pre_registration(self) -> None:
|
|
"""Run pre registration steps."""
|
|
|
|
@concurrent(concurrenttasks=10)
|
|
async def async_registration(self, ref=None) -> None:
|
|
"""Run registration steps."""
|
|
await self.async_pre_registration()
|
|
|
|
if ref is not None:
|
|
self.data.selected_tag = ref
|
|
self.ref = ref
|
|
self.force_branch = True
|
|
|
|
if not await self.validate_repository():
|
|
return False
|
|
|
|
# Run common registration steps.
|
|
await self.common_registration()
|
|
|
|
# Set correct local path
|
|
self.content.path.local = self.localpath
|
|
|
|
# Run local post registration steps.
|
|
await self.async_post_registration()
|
|
|
|
async def async_post_registration(self) -> None:
|
|
"""Run post registration steps."""
|
|
if not self.hacs.system.action:
|
|
return
|
|
await self.hacs.validation.async_run_repository_checks(self)
|
|
|
|
async def async_pre_install(self) -> None:
|
|
"""Run pre install steps."""
|
|
|
|
async def _async_pre_install(self) -> None:
|
|
"""Run pre install steps."""
|
|
self.logger.info("%s Running pre installation steps", self.string)
|
|
await self.async_pre_install()
|
|
self.logger.info("%s Pre installation steps completed", self.string)
|
|
|
|
async def async_install(self) -> None:
|
|
"""Run install steps."""
|
|
await self._async_pre_install()
|
|
self.hacs.async_dispatch(
|
|
HacsDispatchEvent.REPOSITORY_DOWNLOAD_PROGRESS,
|
|
{"repository": self.data.full_name, "progress": 30},
|
|
)
|
|
self.logger.info("%s Running installation steps", self.string)
|
|
await self.async_install_repository()
|
|
self.hacs.async_dispatch(
|
|
HacsDispatchEvent.REPOSITORY_DOWNLOAD_PROGRESS,
|
|
{"repository": self.data.full_name, "progress": 90},
|
|
)
|
|
self.logger.info("%s Installation steps completed", self.string)
|
|
await self._async_post_install()
|
|
self.hacs.async_dispatch(
|
|
HacsDispatchEvent.REPOSITORY_DOWNLOAD_PROGRESS,
|
|
{"repository": self.data.full_name, "progress": False},
|
|
)
|
|
|
|
async def async_post_installation(self) -> None:
|
|
"""Run post install steps."""
|
|
|
|
async def _async_post_install(self) -> None:
|
|
"""Run post install steps."""
|
|
self.logger.info("%s Running post installation steps", self.string)
|
|
await self.async_post_installation()
|
|
self.data.new = False
|
|
self.hacs.async_dispatch(
|
|
HacsDispatchEvent.REPOSITORY,
|
|
{
|
|
"id": 1337,
|
|
"action": "install",
|
|
"repository": self.data.full_name,
|
|
"repository_id": self.data.id,
|
|
},
|
|
)
|
|
self.logger.info("%s Post installation steps completed", self.string)
|
|
|
|
async def async_install_repository(self) -> None:
|
|
"""Common installation steps of the repository."""
|
|
persistent_directory = None
|
|
await self.update_repository(force=True)
|
|
if self.content.path.local is None:
|
|
raise HacsException("repository.content.path.local is None")
|
|
self.validate.errors.clear()
|
|
|
|
if not self.can_download:
|
|
raise HacsException("The version of Home Assistant is not compatible with this version")
|
|
|
|
version = self.version_to_download()
|
|
if version == self.data.default_branch:
|
|
self.ref = version
|
|
else:
|
|
self.ref = f"tags/{version}"
|
|
|
|
self.hacs.async_dispatch(
|
|
HacsDispatchEvent.REPOSITORY_DOWNLOAD_PROGRESS,
|
|
{"repository": self.data.full_name, "progress": 40},
|
|
)
|
|
|
|
if self.data.installed and self.data.category == "netdaemon":
|
|
persistent_directory = BackupNetDaemon(hacs=self.hacs, repository=self)
|
|
await self.hacs.hass.async_add_executor_job(persistent_directory.create)
|
|
|
|
elif self.repository_manifest.persistent_directory:
|
|
if os.path.exists(
|
|
f"{self.content.path.local}/{self.repository_manifest.persistent_directory}"
|
|
):
|
|
persistent_directory = Backup(
|
|
hacs=self.hacs,
|
|
local_path=f"{self.content.path.local}/{self.repository_manifest.persistent_directory}",
|
|
backup_path=tempfile.gettempdir() + "/hacs_persistent_directory/",
|
|
)
|
|
await self.hacs.hass.async_add_executor_job(persistent_directory.create)
|
|
|
|
if self.data.installed and not self.content.single:
|
|
backup = Backup(hacs=self.hacs, local_path=self.content.path.local)
|
|
await self.hacs.hass.async_add_executor_job(backup.create)
|
|
|
|
self.hacs.log.debug("%s Local path is set to %s", self.string, self.content.path.local)
|
|
self.hacs.log.debug("%s Remote path is set to %s", self.string, self.content.path.remote)
|
|
|
|
self.hacs.async_dispatch(
|
|
HacsDispatchEvent.REPOSITORY_DOWNLOAD_PROGRESS,
|
|
{"repository": self.data.full_name, "progress": 50},
|
|
)
|
|
|
|
if self.repository_manifest.zip_release and version != self.data.default_branch:
|
|
await self.download_zip_files(self.validate)
|
|
else:
|
|
await self.download_content()
|
|
|
|
self.hacs.async_dispatch(
|
|
HacsDispatchEvent.REPOSITORY_DOWNLOAD_PROGRESS,
|
|
{"repository": self.data.full_name, "progress": 70},
|
|
)
|
|
|
|
if self.validate.errors:
|
|
for error in self.validate.errors:
|
|
self.logger.error("%s %s", self.string, error)
|
|
if self.data.installed and not self.content.single:
|
|
await self.hacs.hass.async_add_executor_job(backup.restore)
|
|
await self.hacs.hass.async_add_executor_job(backup.cleanup)
|
|
raise HacsException("Could not download, see log for details")
|
|
|
|
self.hacs.async_dispatch(
|
|
HacsDispatchEvent.REPOSITORY_DOWNLOAD_PROGRESS,
|
|
{"repository": self.data.full_name, "progress": 80},
|
|
)
|
|
|
|
if self.data.installed and not self.content.single:
|
|
await self.hacs.hass.async_add_executor_job(backup.cleanup)
|
|
|
|
if persistent_directory is not None:
|
|
await self.hacs.hass.async_add_executor_job(persistent_directory.restore)
|
|
await self.hacs.hass.async_add_executor_job(persistent_directory.cleanup)
|
|
|
|
if self.validate.success:
|
|
self.data.installed = True
|
|
self.data.installed_commit = self.data.last_commit
|
|
|
|
if version == self.data.default_branch:
|
|
self.data.installed_version = None
|
|
else:
|
|
self.data.installed_version = version
|
|
|
|
async def async_get_legacy_repository_object(
|
|
self,
|
|
etag: str | None = None,
|
|
) -> tuple[AIOGitHubAPIRepository, Any | None]:
|
|
"""Return a repository object."""
|
|
try:
|
|
repository = await self.hacs.github.get_repo(self.data.full_name, etag)
|
|
return repository, self.hacs.github.client.last_response.etag
|
|
except AIOGitHubAPINotModifiedException as exception:
|
|
raise HacsNotModifiedException(exception) from exception
|
|
except (ValueError, AIOGitHubAPIException, Exception) as exception:
|
|
raise HacsException(exception) from exception
|
|
|
|
def update_filenames(self) -> None:
|
|
"""Get the filename to target."""
|
|
|
|
async def get_tree(self, ref: str):
|
|
"""Return the repository tree."""
|
|
if self.repository_object is None:
|
|
raise HacsException("No repository_object")
|
|
try:
|
|
tree = await self.repository_object.get_tree(ref)
|
|
return tree
|
|
except (ValueError, AIOGitHubAPIException) as exception:
|
|
raise HacsException(exception) from exception
|
|
|
|
async def get_releases(self, prerelease=False, returnlimit=5) -> list[GitHubReleaseModel]:
|
|
"""Return the repository releases."""
|
|
response = await self.hacs.async_github_api_method(
|
|
method=self.hacs.githubapi.repos.releases.list,
|
|
repository=self.data.full_name,
|
|
)
|
|
releases = []
|
|
for release in response.data or []:
|
|
if len(releases) == returnlimit:
|
|
break
|
|
if release.draft or (release.prerelease and not prerelease):
|
|
continue
|
|
releases.append(release)
|
|
return releases
|
|
|
|
async def common_update_data(
|
|
self,
|
|
ignore_issues: bool = False,
|
|
force: bool = False,
|
|
retry=False,
|
|
skip_releases=False,
|
|
) -> None:
|
|
"""Common update data."""
|
|
releases = []
|
|
try:
|
|
repository_object, etag = await self.async_get_legacy_repository_object(
|
|
etag=None if force or self.data.installed else self.data.etag_repository,
|
|
)
|
|
self.repository_object = repository_object
|
|
if self.data.full_name.lower() != repository_object.full_name.lower():
|
|
self.hacs.common.renamed_repositories[
|
|
self.data.full_name
|
|
] = repository_object.full_name
|
|
if not self.hacs.system.generator:
|
|
raise HacsRepositoryExistException
|
|
self.logger.error(
|
|
"%s Repository has been renamed - %s", self.string, repository_object.full_name
|
|
)
|
|
self.data.update_data(
|
|
repository_object.attributes,
|
|
action=self.hacs.system.action,
|
|
)
|
|
self.data.etag_repository = etag
|
|
except HacsNotModifiedException:
|
|
return
|
|
except HacsRepositoryExistException:
|
|
raise HacsRepositoryExistException from None
|
|
except (AIOGitHubAPIException, HacsException) as exception:
|
|
if not self.hacs.status.startup:
|
|
self.logger.error("%s %s", self.string, exception)
|
|
if not ignore_issues:
|
|
self.validate.errors.append("Repository does not exist.")
|
|
raise HacsException(exception) from exception
|
|
|
|
# Make sure the repository is not archived.
|
|
if self.data.archived and not ignore_issues:
|
|
self.validate.errors.append("Repository is archived.")
|
|
if self.data.full_name not in self.hacs.common.archived_repositories:
|
|
self.hacs.common.archived_repositories.add(self.data.full_name)
|
|
raise HacsRepositoryArchivedException(f"{self} Repository is archived.")
|
|
|
|
# Make sure the repository is not in the blacklist.
|
|
if self.hacs.repositories.is_removed(self.data.full_name):
|
|
removed = self.hacs.repositories.removed_repository(self.data.full_name)
|
|
if removed.removal_type != "remove" and not ignore_issues:
|
|
self.validate.errors.append("Repository has been requested to be removed.")
|
|
raise HacsException(f"{self} Repository has been requested to be removed.")
|
|
|
|
# Get releases.
|
|
if not skip_releases:
|
|
try:
|
|
releases = await self.get_releases(
|
|
prerelease=self.data.show_beta,
|
|
returnlimit=self.hacs.configuration.release_limit,
|
|
)
|
|
if releases:
|
|
self.data.releases = True
|
|
self.releases.objects = releases
|
|
self.data.published_tags = [x.tag_name for x in self.releases.objects]
|
|
self.data.last_version = next(iter(self.data.published_tags))
|
|
|
|
except HacsException:
|
|
self.data.releases = False
|
|
|
|
if not self.force_branch:
|
|
self.ref = self.version_to_download()
|
|
if self.data.releases:
|
|
for release in self.releases.objects or []:
|
|
if release.tag_name == self.ref:
|
|
if assets := release.assets:
|
|
downloads = next(iter(assets)).download_count
|
|
self.data.downloads = downloads
|
|
elif self.hacs.system.generator and self.repository_object:
|
|
await self.repository_object.set_last_commit()
|
|
self.data.last_commit = self.repository_object.last_commit
|
|
|
|
self.hacs.log.debug(
|
|
"%s Running checks against %s", self.string, self.ref.replace("tags/", "")
|
|
)
|
|
|
|
try:
|
|
self.tree = await self.get_tree(self.ref)
|
|
if not self.tree:
|
|
raise HacsException("No files in tree")
|
|
self.treefiles = []
|
|
for treefile in self.tree:
|
|
self.treefiles.append(treefile.full_path)
|
|
except (AIOGitHubAPIException, HacsException) as exception:
|
|
if (
|
|
not retry
|
|
and self.ref is not None
|
|
and str(exception).startswith("GitHub returned 404")
|
|
):
|
|
# Handle tags/branches being deleted.
|
|
self.data.selected_tag = None
|
|
self.ref = self.version_to_download()
|
|
self.logger.warning(
|
|
"%s Selected version/branch %s has been removed, falling back to default",
|
|
self.string,
|
|
self.ref,
|
|
)
|
|
return await self.common_update_data(ignore_issues, force, True)
|
|
if not self.hacs.status.startup and not ignore_issues:
|
|
self.logger.error("%s %s", self.string, exception)
|
|
if not ignore_issues:
|
|
raise HacsException(exception) from None
|
|
|
|
def gather_files_to_download(self) -> list[FileInformation]:
|
|
"""Return a list of file objects to be downloaded."""
|
|
files = []
|
|
tree = self.tree
|
|
ref = f"{self.ref}".replace("tags/", "")
|
|
releaseobjects = self.releases.objects
|
|
category = self.data.category
|
|
remotelocation = self.content.path.remote
|
|
|
|
if self.should_try_releases:
|
|
for release in releaseobjects or []:
|
|
if ref == release.tag_name:
|
|
for asset in release.assets or []:
|
|
files.append(
|
|
FileInformation(asset.browser_download_url, asset.name, asset.name)
|
|
)
|
|
if files:
|
|
return files
|
|
|
|
if self.content.single:
|
|
for treefile in tree:
|
|
if treefile.filename == self.data.file_name:
|
|
files.append(
|
|
FileInformation(
|
|
treefile.download_url, treefile.full_path, treefile.filename
|
|
)
|
|
)
|
|
return files
|
|
|
|
if category == "plugin":
|
|
for treefile in tree:
|
|
if treefile.path in ["", "dist"]:
|
|
if remotelocation == "dist" and not treefile.filename.startswith("dist"):
|
|
continue
|
|
if not remotelocation:
|
|
if not treefile.filename.endswith(".js"):
|
|
continue
|
|
if treefile.path != "":
|
|
continue
|
|
if not treefile.is_directory:
|
|
files.append(
|
|
FileInformation(
|
|
treefile.download_url, treefile.full_path, treefile.filename
|
|
)
|
|
)
|
|
if files:
|
|
return files
|
|
|
|
if self.repository_manifest.content_in_root:
|
|
if not self.repository_manifest.filename:
|
|
if category == "theme":
|
|
tree = filter_content_return_one_of_type(self.tree, "", "yaml", "full_path")
|
|
|
|
for path in tree:
|
|
if path.is_directory:
|
|
continue
|
|
if path.full_path.startswith(self.content.path.remote):
|
|
files.append(FileInformation(path.download_url, path.full_path, path.filename))
|
|
return files
|
|
|
|
@concurrent(concurrenttasks=10)
|
|
async def dowload_repository_content(self, content: FileInformation) -> None:
|
|
"""Download content."""
|
|
try:
|
|
self.logger.debug("%s Downloading %s", self.string, content.name)
|
|
|
|
filecontent = await self.hacs.async_download_file(content.download_url)
|
|
|
|
if filecontent is None:
|
|
self.validate.errors.append(f"[{content.name}] was not downloaded.")
|
|
return
|
|
|
|
# Save the content of the file.
|
|
if self.content.single or content.path is None:
|
|
local_directory = self.content.path.local
|
|
|
|
else:
|
|
_content_path = content.path
|
|
if not self.repository_manifest.content_in_root:
|
|
_content_path = _content_path.replace(f"{self.content.path.remote}", "")
|
|
|
|
local_directory = f"{self.content.path.local}/{_content_path}"
|
|
local_directory = local_directory.split("/")
|
|
del local_directory[-1]
|
|
local_directory = "/".join(local_directory)
|
|
|
|
# Check local directory
|
|
pathlib.Path(local_directory).mkdir(parents=True, exist_ok=True)
|
|
|
|
local_file_path = (f"{local_directory}/{content.name}").replace("//", "/")
|
|
|
|
result = await self.hacs.async_save_file(local_file_path, filecontent)
|
|
if result:
|
|
self.logger.info("%s Download of %s completed", self.string, content.name)
|
|
return
|
|
self.validate.errors.append(f"[{content.name}] was not downloaded.")
|
|
|
|
except (
|
|
BaseException # lgtm [py/catch-base-exception] pylint: disable=broad-except
|
|
) as exception:
|
|
self.validate.errors.append(f"Download was not completed [{exception}]")
|
|
|
|
async def async_remove_entity_device(self) -> None:
|
|
"""Remove the entity device."""
|
|
if (
|
|
self.hacs.configuration == ConfigurationType.YAML
|
|
or not self.hacs.configuration.experimental
|
|
):
|
|
return
|
|
|
|
device_registry: dr.DeviceRegistry = dr.async_get(hass=self.hacs.hass)
|
|
device = device_registry.async_get_device(identifiers={(DOMAIN, str(self.data.id))})
|
|
|
|
if device is None:
|
|
return
|
|
|
|
device_registry.async_remove_device(device_id=device.id)
|
|
|
|
def version_to_download(self) -> str:
|
|
"""Determine which version to download."""
|
|
if self.data.last_version is not None:
|
|
if self.data.selected_tag is not None:
|
|
if self.data.selected_tag == self.data.last_version:
|
|
self.data.selected_tag = None
|
|
return self.data.last_version
|
|
return self.data.selected_tag
|
|
return self.data.last_version
|
|
|
|
if self.data.selected_tag is not None:
|
|
if self.data.selected_tag == self.data.default_branch:
|
|
return self.data.default_branch
|
|
if self.data.selected_tag in self.data.published_tags:
|
|
return self.data.selected_tag
|
|
|
|
return self.data.default_branch or "main"
|