"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import logging
from peewee import Model, SqliteDatabase
from defence360agent.feature_management.constants import FULL
from imav.migration_utils.other import skip_for_im360
from imav.migration_utils.revisium import (
find_revisium_db,
get_all_domains,
get_permission_status,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Permission name passed to get_permission_status() for every domain.
CLEANUP_ALLOWED_PERM_NAME = "ra_cleanup_allowed"
# peewee SQLite handle constructed with None instead of a database path.
# NOTE(review): nothing visible in this file references `ra` -- confirm
# whether it is still needed.
ra = SqliteDatabase(None)
class db:
    """Namespace for the ORM models used by this migration.

    Models are not defined here; they are looked up in ``migrator.orm``
    by :meth:`init`, which must be called before any model is accessed.
    """

    # Annotation only: the attribute does not exist until init() assigns it.
    FeatureManagementPermissions: Model

    @classmethod
    def init(cls, migrator):
        """Bind ``FeatureManagementPermissions`` from ``migrator.orm``.

        :param migrator: object whose ``orm`` mapping contains the
            ``"feature_management_permissions"`` model.
        """
        orm_models = migrator.orm
        cls.FeatureManagementPermissions = orm_models[
            "feature_management_permissions"
        ]
def get_imav_permission_map() -> dict:
    """
    Build a mapping of domain owner -> cleanup permission status.

    Example of the returned value:
    {"user123": False, "user345": True, ..}

    Every domain returned by get_all_domains() is inspected; the status
    comes from get_permission_status() for CLEANUP_ALLOWED_PERM_NAME.
    When one owner has several domains, the status of the domain iterated
    last is the one that is kept.

    Failures are logged rather than raised: if the domain list cannot be
    obtained an empty dict is returned, and a domain whose permission
    lookup fails is left out of the result.
    """
    try:
        domains = get_all_domains()
    except Exception as exc:
        logger.warning("Failed to get Plesk domains: %r", exc)
        return {}

    permissions = {}
    for info in domains.values():
        owner = info.get("owner")
        if owner is None:
            # a domain without an owner is an alias and has no files
            # of its own, so there is nothing to record for it
            continue
        try:
            allowed = get_permission_status(
                info["id"], CLEANUP_ALLOWED_PERM_NAME
            )
        except Exception as exc:
            logger.warning(
                "Failed to get permission status for domain: %s Error: %s",
                str(info["name"]),
                str(exc),
            )
        else:
            permissions[owner] = allowed
    return permissions
@skip_for_im360
def migrate(migrator, database, fake=False, **kwargs):
    """
    Carry legacy ImunifyAV cleanup permissions over to feature management.

    For every user returned by get_imav_permission_map() a
    ``feature_management_permissions`` row is fetched or created; when the
    user's legacy cleanup permission is truthy, its ``av`` field is set to
    FULL and the row is saved.

    Nothing is done when ``fake`` is true, when find_revisium_db() returns
    None, or when the permission map is empty or cannot be obtained.
    A failure for one user is logged and the remaining users are still
    processed.

    :param migrator: migrator object; its ``orm`` mapping supplies the model
    :param database: not used by this migration
    :param fake: when true, return immediately without touching anything
    """
    if fake:
        return

    if find_revisium_db() is None:
        logger.info("No legacy ImunifyAV database found. Skipping...")
        return

    try:
        cleanup_perms = get_imav_permission_map()
    except Exception as exc:
        logger.warning(
            "Cannot obtain permissions map for migration, Error: %s",
            str(exc),
        )
        return

    if not cleanup_perms:
        logger.info("No users to migrate AV cleanup permissions")
        return

    db.init(migrator)
    permissions_model = db.FeatureManagementPermissions
    for username, cleanup_allowed in cleanup_perms.items():
        try:
            record, _created = permissions_model.get_or_create(user=username)
            if cleanup_allowed:
                record.av = FULL
                record.save()
        except Exception as exc:
            # best effort: one broken user must not stop the others
            logger.warning(
                "Failed to update feature permissions for user: %s, Error: %s",
                username,
                str(exc),
            )
@skip_for_im360
def rollback(migrator, database, fake=False, **kwargs):
    """Do nothing: this migration defines no rollback actions."""
    return None