""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import logging import os import subprocess import psutil from defence360agent.utils import ( atomic_rewrite, is_systemd_boot, check_exit_code, check_run, retry_on, run, ) from defence360agent.utils.kwconfig import KWConfig logger = logging.getLogger(__name__) class PureFTPBaseConfig(KWConfig): SEARCH_PATTERN = r"^\s*?{}\s+(.*?)\s*?$" WRITE_PATTERN = "{} {}" DEFAULT_FILENAME = "/etc/pure-ftpd.conf" async def uploadscript_enable(): if not is_systemd_boot(): # for CentOS/CloudLinux 6 await run( ["start", "imunify360-pure"], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) atomic_rewrite("/etc/init/imunify360-pure.override", "", backup=False) else: await check_run(["systemctl", "enable", "imunify360-pure"]) await check_run(["systemctl", "start", "imunify360-pure"]) async def uploadscript_disable(): if not is_systemd_boot(): # for CentOS/CloudLinux 6 await run( ["stop", "imunify360-pure"], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) atomic_rewrite( "/etc/init/imunify360-pure.override", "start on manual\n", backup=False, ) else: await check_run(["systemctl", "stop", "imunify360-pure"]) await check_run(["systemctl", "disable", "imunify360-pure"]) async def uploadscript_status(): if not is_systemd_boot(): # for CentOS/CloudLinux 6 _, out, _ = await run(["status", "imunify360-pure"]) return "start/running" in out.decode() else: rc, _, _ = await run(["systemctl", "status", "imunify360-pure"]) return rc == 0 async def restart(): if not is_systemd_boot(): # for CentOS/CloudLinux 6 await check_exit_code(["/etc/init.d/pure-ftpd", "restart"]) else: try: await check_run(["systemctl", "restart", "pure-ftpd"]) except Exception as err: if err.args and err.args[0] == 5: logger.warning( "%s\nService pure-ftpd is probably deactivated.", repr(err) ) else: raise @retry_on(FileNotFoundError, max_tries=10) def thirdparty_uploadscript(): imunify_scripts = [ "/opt/alt/python35/share/imunify360/scripts/pure_scan.sh", "/opt/alt/python35/share/imunify360/scripts/pureftpd-on-upload", "/usr/share/imunify360/scripts/pureftpd-on-upload", ] it = psutil.process_iter(attrs=["exe", "cmdline"]) while True: try: proc = next(it) except StopIteration: break except FileNotFoundError as e: logger.warning("File not found during process iter: %s", e) raise else: if proc.info["exe"] == "/usr/sbin/pure-uploadscript" and all( proc.info["cmdline"] and script not in proc.info["cmdline"] for script in imunify_scripts ): return " ".join(proc.info["cmdline"]) return None # not found def detect(): return os.path.isfile("/usr/sbin/pure-ftpd") and os.access( "/usr/sbin/pure-ftpd", os.X_OK ) def get_pureftpd_configs(hosting_panel): configs = [PureFTPBaseConfig] if ( hosting_panel.is_installed() and hosting_panel.pure_ftp_conf_cls is not None ): configs.append(hosting_panel.pure_ftp_conf_cls) return configs async def enable_scan_in_config(hosting_panel): configs = get_pureftpd_configs(hosting_panel) for cls in configs: try: cls("CallUploadScript").set("yes") except OSError as e: logger.warning("Error when accessing pureftp config: %s", e) # this is required to update pure-ftpd config await restart() def scan_in_config_enabled(hosting_panel): configs = get_pureftpd_configs(hosting_panel) for cls in configs: try: value = cls("CallUploadScript").get() if value is None or value.strip() == "no": return False except OSError as e: logger.warning("Error when accessing pureftp config: %s", e) return False return True async def disable_purescan(hosting_panel): if thirdparty_uploadscript() is None: configs = get_pureftpd_configs(hosting_panel) for cls in configs: try: cls("CallUploadScript").set("no") except OSError as e: logger.warning("Error when accessing pureftp config: %s", e) # this is required to update pure-ftpd config await restart() await uploadscript_disable()