"""Move each panel user's rapid-scan DB directory to the panel-provided path.

The legacy location is ``<parent of home>/.rapid-scan-db/<home dir name>``;
the new location is whatever ``HostingPanel().get_rapid_scan_db_dir(home)``
returns.  ``migrate`` moves legacy -> new, ``rollback`` moves new -> legacy.
"""
import asyncio
import logging
import pathlib
import shutil

from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils import importer

logger = logging.getLogger(__name__)

# ``importer.get`` is called with ``default=None``, so this name may be None
# when the module cannot be provided; callers must check before calling it.
panel_users = importer.get(
    module="imav.malwarelib.utils.user_list", name="panel_users", default=None
)


def _load_users():
    """Return the users produced by ``panel_users()``, or ``[]`` if it is unavailable."""
    if panel_users is None:
        # Without the user list there is nothing to iterate over; previously
        # this case crashed with "TypeError: 'NoneType' object is not callable".
        logger.info("panel_users is unavailable; no rapid-scan DBs to move")
        return []
    # NOTE(review): the loop is left installed and open, exactly as before,
    # because code outside this file may rely on it being the current loop.
    # Consider closing it once that has been confirmed to be safe.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop.run_until_complete(panel_users())


def _rapid_scan_db_paths(user):
    """Return ``(old_path, new_path)`` for *user*, or ``None`` to skip the user.

    A user is skipped when the panel lookup raises ``OSError``, when the panel
    returns ``None``, or when both locations are already equal.
    """
    home = pathlib.Path(user["home"])
    old_path = str(home.parent / ".rapid-scan-db" / home.name)
    try:
        new_path = hosting_panel.HostingPanel().get_rapid_scan_db_dir(
            user["home"]
        )
    except OSError as exc:
        logger.warning(
            "Cannot resolve rapid-scan DB dir for %s: %s", user["home"], exc
        )
        return None
    if new_path is None or old_path == new_path:
        return None
    return old_path, new_path


def _move_best_effort(src, dst):
    """Move *src* to *dst*, logging (never raising) on ``OSError``."""
    try:
        shutil.move(src, dst)
    except FileNotFoundError as exc:
        # Presumably the common case: this user has no database at *src*.
        logger.debug("Nothing moved from %s to %s: %s", src, dst, exc)
    except OSError as exc:
        logger.warning("Failed to move %s to %s: %s", src, dst, exc)


def _move_all(*, reverse):
    """Move every user's DB dir; legacy -> new, or new -> legacy if *reverse*."""
    for user in _load_users():
        paths = _rapid_scan_db_paths(user)
        if paths is None:
            continue
        old_path, new_path = paths
        if reverse:
            _move_best_effort(new_path, old_path)
        else:
            _move_best_effort(old_path, new_path)


def migrate(migrator, database, fake=False, **kwargs):
    """Move rapid-scan DB directories from the legacy path to the panel path.

    :param migrator: migration helper supplied by the runner (unused here)
    :param database: database handle supplied by the runner (unused here)
    :param fake: when true, do nothing (no filesystem changes)
    """
    if fake:
        return
    _move_all(reverse=False)


def rollback(migrator, database, fake=False, **kwargs):
    """Move rapid-scan DB directories from the panel path back to the legacy path.

    :param migrator: migration helper supplied by the runner (unused here)
    :param database: database handle supplied by the runner (unused here)
    :param fake: when true, do nothing (no filesystem changes)
    """
    # Honour ``fake`` the same way ``migrate`` does; previously a fake
    # rollback still moved directories on disk.
    if fake:
        return
    _move_all(reverse=True)