linux-rootkit

Feature-rich interactive rootkit that targets Linux kernel 4.19, accompanied by a dynamic kernel memory analysis GDB plugin for in vivo introspection (e.g. using QEMU)
git clone git://git.deurzen.net/linux-rootkit
Log | Files | Refs

commit c33ad18c9557150e5d738feed4f6a13a522a31b9
parent f46b361d1c6e9c3a0be30fd746b8693251a1987b
Author: Tizian Leonhardt <tizianleonhardt@web.de>
Date:   Sat, 23 Jan 2021 22:50:10 +0100

Switch to code dictionary (very slow!)

Diffstat:
Mmem_forensics/memcheck-gdb.py | 100+++++++++++++++++++++++++++++--------------------------------------------------
1 file changed, 37 insertions(+), 63 deletions(-)

diff --git a/mem_forensics/memcheck-gdb.py b/mem_forensics/memcheck-gdb.py @@ -589,7 +589,10 @@ class RkCheckFunctions(gdb.Command): symbols = None headers = None - #Key: symbol, value: list of ranges for exclude bytes + #Key: symbol, value: tuple (size, code bytes from ELF file) + code_dict = {} + + #Key: symbol, value: list of ranges for exclude bytes (relative to function entry!) altinstr_dict = {} paravirt_dict = {} @@ -618,87 +621,53 @@ class RkCheckFunctions(gdb.Command): self.s = self.f.get_section_by_name(".symtab") print("this might take a while") - print("exits silently when no tampering has been detected") + print("populating dictionaries...", end='', flush=True) + self.fill_code_dict() self.fill_altinstr_dict() self.fill_paravirt_dict() + print(" done!") + def fill_code_dict(self): for symbol in self.s.iter_symbols(): if symbol.entry["st_info"]["type"] == "STT_FUNC": name = symbol.name size = symbol.entry["st_size"] - value = symbol.entry["st_value"] else: continue - + if name is None or ".cold." in name or ".part." in name or ".constprop." in name: continue + + addr = self.get_v_addr(name) + if addr is None: + continue - self.compare_function(name, size, value) - - def compare_function(self, name, size, value): - addr = self.get_v_addr(name) - - if addr is None: - return None - - # read in live bytes from start address of the function + 5B (to offset the call to __fentry__) - live_bytes = gdb.execute(f"xbfunc {addr} {size}", to_string=True).split() - objdump = subprocess.check_output(f"objdump -z --disassemble={name} {file_g}", shell=True).split(b"\n")[:-1] - - start = None - for i, s in enumerate(objdump): - if name in s.decode(sys.stdout.encoding): - start = i - break - - objdump = objdump[start+1:] - - end = None - for i, s in enumerate(objdump): - if b'' == s: - end = i - break - - if end is not None: - objdump = objdump[:end] - - # exclude_objdump = [] - # for i, s in enumerate(objdump): - # bytes_start = s.decode(sys.stdout.encoding).find(':') - # if b"<" in s and b">" in s or b"0x" in s or b"ffffff" in s[bytes_start:]: - # exclude_objdump.append(i) + objdump = subprocess.check_output(f"objdump -z --disassemble={name} {file_g}", shell=True).split(b"\n")[:-1] - # exclude_live_bytes = [] - # for i in exclude_objdump: - # bytes = objdump[i].split(b"\t")[1] - # bytes = bytes.strip().split(b" ") - # for j in range(len(bytes)): - # exclude_live_bytes.append(i + j) + start = None + for i, s in enumerate(objdump): + if name in s.decode(sys.stdout.encoding): + start = i + break - # objdump = [elf_byte for i, elf_byte in enumerate(objdump) if i not in exclude_objdump] - objdump = [line.split(b"\t") for line in objdump] + objdump = objdump[start+1:] - # live_bytes = [live_byte for i, live_byte in enumerate(live_bytes) if i not in exclude_live_bytes] - live_bytes = "".join(live_bytes) + end = None + for i, s in enumerate(objdump): + if b'' == s: + end = i + break - elf_bytes = [line[1].decode(sys.stdout.encoding).strip().replace(' ', '') for line in objdump] - elf_bytes = "".join(elf_bytes) + if end is not None: + objdump = objdump[:end] - int3_chain = ''.join('c' * len(live_bytes)) - if live_bytes == int3_chain: - return None + objdump = [line.split(b"\t") for line in objdump] - if live_bytes != elf_bytes: - print("x", end='', flush=True) - else: - print("o", end='', flush=True) + elf_bytes = [line[1].decode(sys.stdout.encoding).strip().replace(' ', '') for line in objdump] + elf_bytes = "".join(elf_bytes) - def get_v_addr(self, symbol): - try: - return gdb.execute(f"x {symbol}", to_string=True).split(" ")[0] - except: - return None + self.code_dict[name] = (size, elf_bytes) def fill_altinstr_dict(self): global file_g @@ -740,7 +709,6 @@ class RkCheckFunctions(gdb.Command): i = i + alt_instr_sz - def fill_paravirt_dict(self): global file_g global v_off_g @@ -780,4 +748,10 @@ class RkCheckFunctions(gdb.Command): i = i + paravirt_patch_site_sz + def get_v_addr(self, symbol): + try: + return gdb.execute(f"x {symbol}", to_string=True).split(" ")[0] + except: + return None + RkCheckFunctions()