""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see Utilities to help upload a malicious file.""" import asyncio import json import logging import os from dataclasses import dataclass from typing import Iterable, List from urllib.parse import quote_from_bytes, urljoin import urllib from defence360agent import utils from defence360agent.contracts.config import Core, Malware as Config from defence360agent.contracts.license import LicenseCLN, LicenseError from defence360agent.internals.iaid import ( IAIDTokenError, IndependentAgentIDAPI, ) from imav.contracts.config import MalwareTune logger = logging.getLogger(__name__) if utils.OsReleaseInfo.id_like() & utils.OsReleaseInfo.DEBIAN: _CURL = "/opt/alt/curlssl/usr/bin/curl" else: _CURL = "/opt/alt/curlssl11/usr/bin/curl" _API_BASE_URL = os.environ.get("I360_MRS_API_BASE_URL", Core.API_BASE_URL) _ENDPOINT_UPLOAD = os.environ.get("I360_MRS_ENDPOINT_UPLOAD", "api/v1/upload") _ENDPOINT_CHECK = os.environ.get( "I360_MRS_ENDPOINT_CHECK", "api/v1/check-known-hashes" ) _POST_FILE_TIMEOUT = int( os.environ.get("IMUNIFY360_POST_FILE_TIMEOUT", 60 * 60) # hour ) FALSE_NEGATIVE = "false_negative" FALSE_POSITIVE = "false_positive" UNKNOWN_REASON = "unknown" class ClientError(OSError): """HTTP client error. It is used to hide what specific http client is used by upload_file(). """ class FileTooLargeError(OSError): pass def _token_to_headers(): token = LicenseCLN.get_token() headers = { "I360-Id": token["id"], "I360-Limit": token["limit"], "I360-Status": token["status"], "I360-Token-Expire-Utc": token["token_expire_utc"], "I360-Token-Created-Utc": token["token_created_utc"], "I360-Sign": token["sign"], } headers = {key: str(value) for key, value in headers.items()} return headers async def _post_file(filename, url, headers=None, timeout=None): """ Post *filename* as multipart/form-data to *url* with given HTTP *headers*. Return server response as bytes (http body). Raise TimeoutError on timeout. Raise ConnectionError if failed to connect to host. Raise ClientError on error. """ if headers is None: headers = {} headers_args = [ b"-H%s: %s" % (header.encode("ascii"), value.encode("latin-1")) for header, value in headers.items() ] quoted_full_path = quote_from_bytes(os.fsencode(filename), safe="").encode( "ascii" ) cmd = ( [os.fsencode(_CURL)] + headers_args + [b"--max-time", str(timeout).encode("ascii")] * (timeout is not None) + [ b"--form", # https://curl.haxx.se/docs/knownbugs.html#multipart_formposts_file_name_en b'file=@"%s";filename="%s"' % ( # escape backslash, double quotes os.fsencode(filename) .replace(b"\\", b"\\\\") .replace(b'"', b'\\"'), quoted_full_path, ), b"--fail", # disable progress meter b"--silent", b"--show-error", url.encode("ascii"), ] ) rc, out, err = await utils.run(cmd) if rc != 0: Error = ( ConnectionError if rc == 7 else TimeoutError if rc == 28 else ClientError ) raise Error( ( "Failed to post {filename} to {url}:" " curl: cmd={cmd}, rc={rc}, out={out}, err={err}" ).format(**vars()) ) return out async def upload_file(file: str, upload_reason=UNKNOWN_REASON): """ Upload a file to Malware Response Service. :param file: path to file :param upload_reason: one of 'unknown', 'false_positive', 'false_negative' :return: dict representing json response :raises LicenseError: """ if not LicenseCLN.is_valid(): raise LicenseError( "File uploading to Malware Responce Serivce " "requires a valid license" ) file_size = os.path.getsize(file) if file_size > Config.MAX_MRS_UPLOAD_FILE: raise FileTooLargeError( "File {} is {} bytes, files larger than {} bytes " "are not allowed.".format( file, file_size, Config.MAX_MRS_UPLOAD_FILE ) ) url = urljoin(_API_BASE_URL, _ENDPOINT_UPLOAD) headers = { **_token_to_headers(), "I360-Upload-Reason": upload_reason, } response_body = await _post_file( file, url, headers, timeout=_POST_FILE_TIMEOUT ) result = json.loads(response_body.decode()) logger.info( "Uploaded file %r to the Malware Response Service with reason: %s", file, upload_reason, ) return result async def upload_with_retries(file, upload_reason=UNKNOWN_REASON): """ :raises LicenseError, ClientError, TimeoutError, ConnectionError, """ # exponential backoff: total delay ~6 min (not including the upload time) delays = [0.5, 2.5, 6, 15, 40, 100, 200] max_tries = len(delays) + 1 for i, pause in enumerate(delays, start=1): error = await _try_upload( file, raise_errors=False, upload_reason=upload_reason ) if not error: break if isinstance(error, FileTooLargeError): logger.warning("File %s is too big. Stop retrying to upload", file) break logger.warning( "Attempt %d/%d: failed uploading file %s, reason: %s. Retrying in" " %s seconds", i, max_tries, file, error, pause, ) await asyncio.sleep(pause) else: # exhausted retries, one last attempt, raise error if it fails await _try_upload(file, raise_errors=True, upload_reason=upload_reason) async def _try_upload(file, raise_errors, *, upload_reason=UNKNOWN_REASON): """Return error instead of raising it unless *raise_errors* is true. :raises LicenseError: :raises ClientError, TimeoutError, ConnectionError, FileTooLargeError: if raise_errors is True """ try: await upload_file(file, upload_reason=upload_reason) except ( ClientError, ConnectionError, FileTooLargeError, TimeoutError, ) as e: if raise_errors: raise e return e else: return None @dataclass class HitInfo: file: str hash: str async def check_known_hashes( loop, hashes: Iterable[str], upload_reason=UNKNOWN_REASON ) -> List[str]: hashes = list(hashes) if MalwareTune.NO_CHECK_KNOWN_HASHES: return hashes try: token = await IndependentAgentIDAPI.get_token() except IAIDTokenError: return hashes url = urljoin(_API_BASE_URL, _ENDPOINT_CHECK) headers = { "X-Auth": token, "I360-Upload-Reason": upload_reason, "Content-Type": "application/json", } request = {"hashes": hashes} try: result = await loop.run_in_executor( None, _do_request, urllib.request.Request( url, data=json.dumps(request).encode(), headers=headers, method="POST", ), ) except Exception as e: logger.warning("Failed to check known hashes: %s", e) return hashes return result["unknown_hashes"] def _do_request(request: urllib.request.Request) -> None: with urllib.request.urlopen( request, timeout=Core.DEFAULT_SOCKET_TIMEOUT ) as response: if response.status != 200: raise Exception("status code is {}".format(response.status)) return json.loads(response.read().decode())