This is the final part of my writedown on Volatility KASLR adventures.

After figuring out my memory analysis problems with dumps from Linux systems with kernel 4.9 I wanted to implement the KASLR detection in Volatility. As stated before my Ruby is quite fluent but my Python skills are more on advanced beginner level, thus I wanted to refrain from messing up with the Linux overlay in the Volatility core too much and rather implement this as a plugin.

Preparing Volatility core infrastructure for physical KASLR shifts

However a few changes in the Linux overlay were required nonetheless, since it only supported one shift parameter and I needed two, one for the virtual and one for the physical shift:

# Introduce the phys_shift parameter for Linux profiles
+ self.phys_shift
...
# Use that value for DTB finding
-  init_task_addr = tbl["init_task"][0][0] + shift_address
-  dtb_sym_addr   = tbl[sym][0][0] + shift_address
+  init_task_addr = tbl["init_task"][0][0] + shift_address + phys_shift
+  dtb_sym_addr   = tbl[sym][0][0] + shift_address + phys_shift
...
# And for address space validation
-  check = init_task_addr - shift
+  check = init_task_addr - shift + self.obj_vm.profile.phys_shift

Moreover if I want to enable the user to supply a physical shift to Volatility on the command line I need to introduce a corresponding option, which is done in volatilty/plugins/linux/common.py:

@staticmethod
def register_options(config):
  config.add_option("PHYSICALSHIFT", type = 'int', default = 0, help = "Linux KASLR physical shift address")

Authoring a Volatility plugin

After this has been accomplished, let’s try and write a plugin. I started out with the boilerplate code that is required for all plugins. To get an idea of what is required we should have a glance at the steps that are taken when you run Volatility:

  1. Volatility parses the command line arguments
  2. Volatility looks at the supplied profile and determines e.g. operating system
  3. Since we are looking at a Linux profile Volatility applies the Linux overlay
  4. That overlay parses the System.map and dwarf file of our profile
  5. Then it sets up a runtime profile in which various properties are set, partially derived from the profile, partially some Linux inherent stuff
  6. Volatility tries to find the DTB
  7. If it found a legit DTB it will try to validate address space and profile
  8. The plugin supplied on the command line can now run
  9. That plugin needs to have a calculate method that gathers the relevant data and either a render_* or unified_output method for displaying the results

To avoid reinventing the wheel we include some Volatility classes in our plugin and inherit from AbstractLinuxIntelCommand:

# Skeleton for the plugin
import volatility.obj as obj
import volatility.utils as utils
import volatility.plugins.linux.common as linux_common

class linux_kaslr_shift(linux_common.AbstractLinuxIntelCommand):
    """Automatically detect KASLR physical/virtual shifts and alternate DTBs"""

    def calculate(self):
      yield "Skeleton"

    def render_text(self, outfd, data):
      self.table_header(outfd, [("Column:", 16)])
      self.table_row(outfd, data)

Now that we have the basic building blocks in place we need to figure out a control flow for the calculate method.

The steps the linux_kaslr_shift plugin will take are

  1. Load the profile, so that I can easily lookup symbols and do virtual to physical translation
  2. Find possible kernel DTB candidates
  3. For each candidate build the PTE tree reachable from the DTB
  4. Find all PAs in that tree that point to the physical location of init_task
  5. Calculate the indices for the different paging levels that are traversed in order to reach the PTE that point to the PA from the previous step
  6. Calculate the virtual shift by subtracting the expected indices that would be used without KASLR with those that are observed in our sample
  7. Subtract the virtual shift from the total shift observed to determine the physical shift
  8. Yield each DTB, virtual shift and physical shift, so these can be consumed by render_text

I already experienced problems during the first step, since when instantiating the profile Volatility will try to validate profile and address space which at this point fails due to physical and virtual shift being unknown. Thus I decided to monkey patch the validation code to always tell Volatility that the address space is valid:

# Include more classes
import volatility.plugins.overlays.linux.linux as linux_overlay
# Patched validation function
def generate_suggestions_valid_as(self):
    yield True

# Apply it only in my plugin:
class linux_kaslr_shift(linux_common.AbstractLinuxIntelCommand):

    def calculate(self):
        ...
        linux_overlay.VolatilityLinuxIntelValidAS.generate_suggestions = \
            generate_suggestions_valid_as

Afterwards symbol and struct look up works fine.

Helper methods

Before delving into the core code of the plugin I want to highlight some helper functions I needed. I explained some of them in the previous post. Here is the rest:

def find_key(d, value):
    """Searches for a leaf in a nested dictionary and returns the path taken"""
    for k, v in d.items():
        if isinstance(v, dict):
            p = find_key(v, value)
            if p:
                return [k] + p
        elif value.match(v):
            return [k]

This method searches a tree for leafs matching value and returns the indices that were chosen in order to traverse from root to leaf. If you look at the example tree from previous post:

'''
The nested dictionary will look like this:
PML4 ind | PDPT Ind | PD Ind | PT Ind | PA of page
{   279: {    91: {   0: {   0: '0x8000000000000163L',
                             1: '0x8000000000001163L',
                             ...
                         },
                      2: {   15: ....
                         }
             123: ...
         },
    511: { ...
         }
}
'''

find_key(tree, '0x8000000000001163L') would return [279, 91, 0, 1], thus in the PGD the index to reach the next hop of our desired path is 279, in the PUD the index would be 91 and so on.

Then there is the method searching for DTBs:

def find_dtb_candidates(self, aspace, distance, init_task_addr):
        # Returns a dictionary of possible_dtb: shift combinations
        comm_offset = self.profile.get_obj_offset("task_struct", "comm")
        pid_offset = self.profile.get_obj_offset("task_struct", "pid")
        dtb_sym_addr = init_task_addr - distance
        shift = 0xffffffff80000000
        possible_dtbs = {}
        limespace = aspace.base

        scanner = linux_overlay.swapperScan(needles = ["swapper/0\x00\x00\x00\x00\x00\x00"])
        for swapper_offset in scanner.scan(aspace.base):
            swapper_address = swapper_offset - comm_offset

            if limespace.read(swapper_address, 4) != "\x00\x00\x00\x00":
                continue

            if limespace.read(swapper_address + pid_offset, 4) != "\x00\x00\x00\x00":
                continue

            tmp_shift_address = swapper_address - (init_task_addr - shift)

            if tmp_shift_address & 0xfff != 0x000:
                continue

            shift_address = tmp_shift_address
            good_dtb = dtb_sym_addr - shift + shift_address
            debug.debug("DTB: {thisdtb:#x}, Shift: {thisshift:#x}".format(thisdtb = good_dtb, thisshift = shift_address))

            possible_dtbs[good_dtb] = shift_address

        return possible_dtbs

This is basically the original Linux overlay DTB searching code, with the slight modification that it doesn’t only return the first legit DTB but also other legit candidates that might show up in the dump. I considered alternative DTBs since the first one might not be the one matching the running system but rather an artifact before the kernel relocated itself in memory or the DTB of a Linux VM with the same kernel running on the system we acquired the memory image from. In case the plugin yields several hits it’s up to the user to determine the correct one.

def format_index_binary(self, num):
  # For each level of address translation 9 bits are used to
  # determine the index in a map/table/directory
  # If the difference is negative the pte_match is not a
  # valid candidate, thus it will return False so that
  # the parent method can sort out invalid candidates
  if num < 0:
    return False
  else:
    return '{0:09b}'.format(num)

As already apparent by the comment this will convert an integer to a binary string and sort out invalid candidates. For each index of the 4 translation steps we pass it to this method and generate a 36 character long binary string that is the virtual shift represented in 1’s and 0’s.

def pdpte_index(self, vaddr):
    return (vaddr >> 30) & (512 - 1)

This is a helper method for the AMD64 paged memory address space that returns the index of a virtual address in the PUD. Appropriate methods for PGD, PMD and PT indices are already implemented in that address space.

More imports

For some functionality (e.g. debug output, regex matching) in the plugin I needed to include more classes and libraries. This is the final import code:

import volatility.obj as obj
import volatility.utils as utils
import volatility.plugins.linux.common as linux_common
import volatility.plugins.overlays.linux.linux as linux_overlay
import re
import volatility.debug as debug
from operator import attrgetter

The final calculate method

With all of this in place I could finally implement and run the calculate method:

def calculate(self):

  bitmask = 0xFFFFFFFFFF000
  page_marker = 0x8000000000000000

  # Monkeypatch address space check so we can continue
  linux_overlay.VolatilityLinuxIntelValidAS.generate_suggestions = \
      generate_suggestions_valid_as

  # Initalize address space, profile, etc.
  aspace = utils.load_as(self._config)
  linux_common.set_plugin_members(self)
  profile = self.profile
  tbl = profile.sys_map["kernel"]
  shift = profile.shift_address

  if profile.metadata.get('arch').lower() != 'x64':
      debug.error("This plugin only supports Intel x64 profiles")

  init_task_addr = tbl["init_task"][0][0]
  dtb_sym_addr = tbl["init_level4_pgt"][0][0]
  dtb_init_dist = init_task_addr - dtb_sym_addr

  # Monkeypatch amd64 address space
  aspace.get_nonzero_entries = get_nonzero_entries
  aspace.pdpte_index = pdpte_index

  # Retrieve the expected indices for the symbol for all 4
  # translation levels. These will be used to calculate the
  # difference between observed and expected indices in the
  # various tables
  pte_index = aspace.pte_index(init_task_addr)
  pde_index = aspace.pde_index(init_task_addr)
  pdpe_index = aspace.pdpte_index(aspace, init_task_addr)
  pml4e_index = aspace.pml4e_index(init_task_addr)

  expected = [pml4e_index, pdpe_index, pde_index, pte_index]

  # Determine all possible DTBs, the first one discovered might
  # not be the real one
  dtb_candidates = self.find_dtb_candidates(aspace,
                                            dtb_init_dist,
                                            init_task_addr)
  dtbs_and_offsets = {}
  for dtb, shift in dtb_candidates.iteritems():
      # Calculate the offset from DTB to init_task
      # init_task_phys is the physical address of the 'init_task' symbol
      # in the memory dump that is processed
      init_task_phys = dtb + dtb_init_dist
      debug.debug("Discovered DTB: {thisdtb:#x}".format(thisdtb = dtb))

      # Generate a nested dictionary of all pages
      tree = self.build_pte_tree(aspace, dtb)
      pte_to_search_for = hex(init_task_phys + page_marker)
      # Substitute last 3 bytes to '.', since their value may vary
      pte_to_search_for = \
          pte_to_search_for.replace(pte_to_search_for[-4:-1], '...')

      pte_regex = re.compile(pte_to_search_for)
      index_offsets = []

      searching = True
      while searching:
          pte_match = find_key(tree, pte_regex)
          if pte_match:
              index_offsets.append(pte_match)
              del tree[pte_match[0]][pte_match[1]][pte_match[2]][pte_match[3]]
          else:
              # If nothing is found, abort the loop
              searching = False

      dtbs_and_offsets[dtb] = index_offsets

  for dtb, index_offsets in dtbs_and_offsets.iteritems():
      candidates = []
      for pte_match in index_offsets:
          candidate = self.build_virtual_shift(expected, pte_match)
          if candidate:
              candidate = int(candidate, 2)
              yield [dtb, candidate, dtb_candidates[dtb] - candidate]

As you can see in the last line the calculate yields an array comprised of 3 elements for each valid DTB/Virtual shift/physical shift combination. To display the results properly the render_text methods needs to be adapted:

def render_text(self, outfd, data):
    self.table_header(outfd, [("DTB", "[addrpad]"),
                              ("Virtual Shift", "[addrpad]"),
                              ("Physical Shift", "[addrpad]")])

    for shift_combination in data:
        self.table_row(outfd, shift_combination[0],
                       shift_combination[1], shift_combination[2])

Switching to unified output

At the time I was writing the plugin I was constantly looking at Github issues and pull request in the Volatility repository. One of those [issues][unifiedbug] mentioned problems with json output for a plugin. This is when I learned about unified output and decided to also implement this for my plugin. To support this I had to include two more imports:

from volatility.renderers import TreeGrid
from volatility.renderers.basic import Address, Hex

I replaced my render_text method with this:

def unified_output(self, data):
    return TreeGrid([("DTB", Address),
                     ("VirtualShift", Address),
                     ("Physical Shift", Address)],
                    self.generator(data))

def generator(self, data):
    for dtb, virtualshift, physicalshift in data:
        yield(0, [Address(dtb), Address(virtualshift), Address(physicalshift)])

Final thoughts and future of the plugin

After completing the plugin I was pondering if I should submit the plugin to the community plugins repository and if I should wait for the next Volatility plugin contest in order to have a chance to win some Volatility swag. However since Debian Stretch will be released soon and other Linux distributions also start shipping KASLR enabled kernel > 4.8 I thought this should be a core plugin and I wanted people to be able to analyze memory images from systems that run more recent distributions, thus I submitted it as a normal pull request.

I instantly received a response by Andrew Case who seems to be in charge of Linux related stuff in Volatility. Since then some weeks have passed and he reimplemented some of this plugins functionality in the Linux overlay. He found a more elegant way to retrieve the virtual shift by looking up a the files member in init_tasks task_struct. This member should - in theory - point to the virtual address of init_files. Thus by subtracting the VA of init_files in System.map from that value you get the virtual shift without any PTE tree building and index comparison. However the initial version of his KASLR detection did not work with my memory samples, thus I created a second pull request to fix it.

If this gets merged in upstream my plugin almost becomes obsolete before it made it into the core plugin set. However the features it still provides are lookups of more than one possible DTBs and virtual shift determination in case the init_files symbol is not available or the value of init_task->files is corrupted (which however in turn undermines the credibility of the entire memory image).

You can find the complete source code for the plugin here. At the time of this writing the value for physical shift has another meaning in my feature branch than upstream. However I plan to adapt output etc. to upstream as soon as their KASLR detection code is working properly.

Thanks for reading all of this.