Source code for esedbrc.schema_extractor
#!/usr/bin/env python3
"""ESE database schema extractor."""
import logging
import os
import pyesedb
from artifacts import definitions as artifacts_definitions
from artifacts import reader as artifacts_reader
from artifacts import registry as artifacts_registry
from dfimagetools import definitions as dfimagetools_definitions
from dfimagetools import file_entry_lister
from esedbrc import resources
from esedbrc import yaml_definitions_file
[docs]
class EseDbSchemaExtractor:
"""ESE database schema extractor."""
_DATABASE_DEFINITIONS_FILE = os.path.join(
os.path.dirname(__file__), "data", "known_databases.yaml"
)
_MINIMUM_FILE_SIZE = 16
[docs]
def __init__(self, artifact_definitions, mediator=None):
"""Initializes a ESE database file schema extractor.
Args:
artifact_definitions (str): path to a single artifact definitions
YAML file or a directory of definitions YAML files.
mediator (Optional[dfvfs.VolumeScannerMediator]): a volume scanner
mediator.
"""
super().__init__()
self._artifacts_registry = artifacts_registry.ArtifactDefinitionsRegistry()
self._known_database_definitions = {}
self._mediator = mediator
if artifact_definitions:
reader = artifacts_reader.YamlArtifactsReader()
if os.path.isdir(artifact_definitions):
self._artifacts_registry.ReadFromDirectory(reader, artifact_definitions)
elif os.path.isfile(artifact_definitions):
self._artifacts_registry.ReadFromFile(reader, artifact_definitions)
definitions_file = yaml_definitions_file.YAMLDatabaseDefinitionsFile()
for database_definition in definitions_file.ReadFromFile(
self._DATABASE_DEFINITIONS_FILE
):
artifact_definition = self._artifacts_registry.GetDefinitionByName(
database_definition.artifact_definition
)
if not artifact_definition:
logging.warning(
(
f"Unknown artifact definition: "
f"{database_definition.artifact_definition:s}"
)
)
else:
self._known_database_definitions[
database_definition.database_identifier
] = artifact_definition
def _CheckSignature(self, file_object):
"""Checks the signature of a given database file-like object.
Args:
file_object (dfvfs.FileIO): file-like object of the database.
Returns:
bool: True if the signature matches that of a ESE database, False
otherwise.
"""
if not file_object:
return False
file_object.seek(4, os.SEEK_SET)
file_data = file_object.read(4)
return file_data == b"\xef\xcd\xab\x89"
def _FormatSchemaAsYAML(self, schema):
"""Formats a schema into YAML.
Args:
schema (list[EseTableDefinition]): schema as unique table definitions or
None if the schema could not be retrieved.
Returns:
str: schema formatted as YAML.
Raises:
RuntimeError: if a query could not be parsed.
"""
lines = ["# esedb-kb database schema."]
for table_definition in sorted(
schema, key=lambda table_definition: table_definition.name
):
lines.extend(["---", f"table: {table_definition.name:s}", "columns:"])
for column_definition in table_definition.column_definitions:
# TODO: convert type to human readable string.
lines.extend(
[
f"- name: {column_definition.name:s}",
f" value_type: {column_definition.type:d}",
]
)
lines.append("")
return "\n".join(lines)
def _GetDatabaseIdentifier(self, path_segments):
"""Determines the database identifier.
Args:
path_segments (list[str]): path segments.
Returns:
str: database identifier or None if the type could not be determined.
"""
# TODO: make comparison more efficient.
for (
database_identifier,
artifact_definition,
) in self._known_database_definitions.items():
for source in artifact_definition.sources:
if source.type_indicator in (
artifacts_definitions.TYPE_INDICATOR_DIRECTORY,
artifacts_definitions.TYPE_INDICATOR_FILE,
artifacts_definitions.TYPE_INDICATOR_PATH,
):
for source_path in set(source.paths):
source_path_segments = source_path.split(source.separator)
if not source_path_segments[0]:
source_path_segments = source_path_segments[1:]
# TODO: add support for parameters.
last_index = len(source_path_segments)
for index in range(1, last_index + 1):
source_path_segment = source_path_segments[-index]
if not source_path_segment or len(source_path_segment) < 2:
continue
if (
source_path_segment[0] == "%"
and source_path_segment[-1] == "%"
):
source_path_segments = source_path_segments[
-index + 1 :
]
break
if len(source_path_segments) > len(path_segments):
continue
is_match = True
last_index = min(len(source_path_segments), len(path_segments))
for index in range(1, last_index + 1):
source_path_segment = source_path_segments[-index]
# TODO: improve handling of *
if "*" in source_path_segment:
continue
path_segment = path_segments[-index].lower()
source_path_segment = source_path_segment.lower()
is_match = path_segment == source_path_segment
if not is_match:
break
if is_match:
return database_identifier
return None
def _GetDatabaseSchema(self, database_path):
"""Retrieves schema from given database.
Args:
database_path (str): file path to database.
Returns:
list[EseTableDefinition]: schema as unique table definitions or None if
the schema could not be retrieved.
"""
with open(database_path, "rb") as file_object:
return self._GetDatabaseSchemaFromFileObject(file_object)
def _GetDatabaseSchemaFromFileObject(self, file_object):
"""Retrieves schema from given database file-like object.
Args:
file_object (dfvfs.FileIO): file-like object of the database.
Returns:
list[EseTableDefinition]: schema as unique table definitions or None if
the schema could not be retrieved.
"""
esedb_file = pyesedb.file()
esedb_file.open_file_object(file_object)
try:
table_definitions = []
for esedb_table in iter(esedb_file.tables):
table_definition = resources.EseTableDefinition(
esedb_table.name, esedb_table.template_name
)
for esedb_column in esedb_table.columns:
table_definition.AddColumnDefinition(
esedb_column.identifier, esedb_column.name, esedb_column.type
)
table_definitions.append(table_definition)
unique_table_definitions = []
for table_definition in table_definitions:
table_columns = [
definition.CopyToDict()
for definition in table_definition.column_definitions
]
is_unique_table = True
for compare_table_definition in unique_table_definitions:
compare_table_columns = [
definition.CopyToDict()
for definition in compare_table_definition.column_definitions
]
if table_columns == compare_table_columns:
compare_table_definition.aliases.append(table_definition.name)
is_unique_table = False
if is_unique_table:
# TODO: generalize name of unique tables e.g. change
# AppCacheEntryEx_9 into AppCacheEntryEx_# or AppCacheEntryEx_1
unique_table_definitions.append(table_definition)
finally:
esedb_file.close()
# TODO: move schema into object.
return unique_table_definitions
[docs]
def GetDisplayPath(self, path_segments, data_stream_name=None):
"""Retrieves a path to display.
Args:
path_segments (list[str]): path segments of the full path of the file
entry.
data_stream_name (Optional[str]): name of the data stream.
Returns:
str: path to display.
"""
display_path = ""
path_segments = [
segment.translate(
dfimagetools_definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE
)
for segment in path_segments
]
display_path = "".join([display_path, "/".join(path_segments)])
if data_stream_name:
data_stream_name = data_stream_name.translate(
dfimagetools_definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE
)
display_path = ":".join([display_path, data_stream_name])
return display_path or "/"
[docs]
def ExtractSchemas(self, path, options=None):
"""Extracts database schemas from the path.
Args:
path (str): path of a ESE database file or storage media image containing
ESE database files.
options (Optional[dfvfs.VolumeScannerOptions]): volume scanner options. If
None the default volume scanner options are used, which are defined in
the dfVFS VolumeScannerOptions class.
Yields:
tuple[str, dict[str, str]]: known database type identifier or the name of
the ESE database file if not known and schema.
"""
entry_lister = file_entry_lister.FileEntryLister(mediator=self._mediator)
base_path_specs = entry_lister.GetBasePathSpecs(path, options=options)
if not base_path_specs:
logging.warning(
f"Unable to determine base path specifications from: {path:s}"
)
else:
for file_entry, path_segments in entry_lister.ListFileEntries(
base_path_specs
):
if not file_entry.IsFile() or file_entry.size < self._MINIMUM_FILE_SIZE:
continue
file_object = file_entry.GetFileObject()
if not self._CheckSignature(file_object):
continue
display_path = self.GetDisplayPath(path_segments)
# logging.info(
# f'Extracting schema from database file: {display_path:s}')
database_schema = self._GetDatabaseSchemaFromFileObject(file_object)
if database_schema is None:
logging.warning(
(
f"Unable to determine schema from database file: "
f"{display_path:s}"
)
)
continue
# TODO: improve support to determine identifier for single database
# file.
database_identifier = self._GetDatabaseIdentifier(path_segments)
if not database_identifier:
logging.warning(
(
f"Unable to determine known database identifier of file: "
f"{display_path:s}"
)
)
database_identifier = path_segments[-1]
yield database_identifier, database_schema
[docs]
def FormatSchema(self, schema, output_format):
"""Formats a schema into a word-wrapped string.
Args:
schema (dict[str, str]): schema as an SQL query per table name.
output_format (str): output format.
Returns:
str: formatted schema.
Raises:
RuntimeError: if a query could not be parsed.
"""
if output_format == "yaml":
return self._FormatSchemaAsYAML(schema)
raise RuntimeError(f"Unsupported output format: {output_format:s}")