linux-rootkit

Feature-rich interactive rootkit that targets Linux kernel 4.19, accompanied by a dynamic kernel memory analysis GDB plugin for in vivo introspection (e.g. using QEMU)
git clone git://git.deurzen.net/linux-rootkit
Log | Files | Refs

commit bb457eca9fd33e3fe2f7d660339327e93a327527
parent 4f0cb9bcd4a4358a31c8aacc605fc771dd2a86bb
Author: deurzen <m.deurzen@tum.de>
Date:   Thu, 21 Jan 2021 23:33:11 +0100

adds initial live to ELF byte comparison code

Diffstat:
Mmem_forensics/memcheck-gdb.py | 72++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
1 file changed, 58 insertions(+), 14 deletions(-)

diff --git a/mem_forensics/memcheck-gdb.py b/mem_forensics/memcheck-gdb.py @@ -588,7 +588,6 @@ class RkCheckFunctions(gdb.Command): f = None s = None - d = None symbols = None headers = None @@ -617,14 +616,17 @@ class RkCheckFunctions(gdb.Command): self.f = elffile.ELFFile(open(file_g, "rb")) self.s = self.f.get_section_by_name(".symtab") + print("this might take a while") + print("exits silently when no tampering has been detected") + for symbol in self.s.iter_symbols(): if symbol.entry["st_info"]["type"] == "STT_FUNC": - # just to test - if symbol.name == "ksys_getdents64" or symbol.name == "__x64_sys_getdents64": - for segment in self.f.iter_segments(): - # if segment.header.p_type == "PT_LOAD": - self.d = segment.data() - self.compare_function(symbol.name, symbol.entry["st_size"], symbol.entry["st_value"]) + name = symbol.name + size = symbol.entry["st_size"] + value = symbol.entry["st_value"] + + if name is not None: + self.compare_function(name, size, value) # TODO: compare `size` number of bytes starting from `value` in ELF # with `size` number of bytes starting from address of symbol @@ -633,16 +635,58 @@ class RkCheckFunctions(gdb.Command): # malicious code is defined on running machine? def compare_function(self, name, size, value): addr = self.get_v_addr(name) + + if addr is None: + print(f"could not retrieve virtual address address for symbol `{name}`") + return None + # read in live bytes from start address of the function + 5B (to offset the call to __fentry__) - live_bytes = gdb.execute(f"xbfunc {hex(int(addr, 16) + 5)} {size - 5}", to_string=True).split() + live_bytes = gdb.execute(f"xbfunc {addr} {size}", to_string=True).split() + objdump = subprocess.check_output(f"objdump --insn-width 1 -z --disassemble={name} {file_g}", shell=True).split(b"\n")[:-1] + + start = None + for i, s in enumerate(objdump): + if name in s.decode(sys.stdout.encoding): + start = i + break + + objdump = objdump[start+1:] + + end = None + for i, s in enumerate(objdump): + if b'' == s: + end = i + break + + if end is not None: + objdump = objdump[:end] + + # callq takes up 5 bytes, remove those from live and ELF bytes to compare (temp fix) + exclude = [] + for i, s in enumerate(objdump): + if b"\tcallq" in s: + exclude.append(i) + exclude.append(i + 1) + exclude.append(i + 2) + exclude.append(i + 3) + exclude.append(i + 4) + + live_bytes = [live_byte for i, live_byte in enumerate(live_bytes) if i not in exclude] live_bytes = "".join(live_bytes) - live_bytes = bytes.fromhex(live_bytes) - print(live_bytes) - name = str.encode(name) - print(name) - index = self.d.find(name) - print(index) + objdump = [elf_byte for i, elf_byte in enumerate(objdump) if i not in exclude] + objdump = [line.split(b"\t") for line in objdump] + + elf_bytes = [line[1].decode(sys.stdout.encoding).strip() for line in objdump] + elf_bytes = "".join(elf_bytes) + + int3_chain = ''.join([c * len(live_bytes) for c in "c"]) + if live_bytes == int3_chain: + return None + + if live_bytes != elf_bytes: + print(f"function `{name} compromised, live bytes not equal to ELF bytes") + print(f"expected: {elf_bytes}, live: {live_bytes}") def get_v_addr(self, symbol): try: