def_start_target(self, target):
started = Falsefor monitor in target.monitors:
if monitor.start_target():
started = Truebreakif started:
for monitor in target.monitors:
monitor.post_start_target(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)
def_generate_mutations_indefinitely(self, max_depth=None, path=None):
"""Yield MutationContext with n mutations per message over all messages, with n increasing indefinitely."""
depth = 1while max_depth isNoneor depth <= max_depth:
valid_case_found_at_this_depth = Falsefor m in self._generate_n_mutations(depth=depth, path=path):
valid_case_found_at_this_depth = Trueyield m
ifnot valid_case_found_at_this_depth:
break
depth += 1
def_generate_n_mutations(self, depth, path):
"""Yield MutationContext with n mutations per message over all messages."""for path in self._iterate_protocol_message_paths(path=path):
for m in self._generate_n_mutations_for_path(path, depth=depth):
yield m
def_iterate_protocol_message_paths(self, path=None):
""" Iterates over protocol and yields a path (list of Connection) leading to a given message). Args: path (list of Connection): Provide a specific path to yield only that specific path. Yields: list of Connection: List of edges along the path to the current one being fuzzed. Raises: exception.SulleyRuntimeError: If no requests defined or no targets specified """# we can't fuzz if we don't have at least one target and one request.ifnot self.targets:
raise exception.SullyRuntimeError("No targets specified in session")
ifnot self.edges_from(self.root.id):
raise exception.SullyRuntimeError("No requests specified in session")
if path isnotNone:
yield path
else:
for x in self._iterate_protocol_message_paths_recursive(this_node=self.root, path=[]):
yield x
def_iterate_protocol_message_paths_recursive(self, this_node, path):
"""Recursive helper for _iterate_protocol. Args: this_node (node.Node): Current node that is being fuzzed. path (list of Connection): List of edges along the path to the current one being fuzzed. Yields: list of Connection: List of edges along the path to the current one being fuzzed. """# step through every edge from the current node.for edge in self.edges_from(this_node.id):
# keep track of the path as we fuzz through it, don't count the root node.# we keep track of edges as opposed to nodes because if there is more then one path through a set of# given nodes we don't want any ambiguity.
path.append(edge)
message_path = self._message_path_to_str(path)
logging.debug("fuzzing: {0}".format(message_path))
self.fuzz_node = self.nodes[path[-1].dst]
yield path
# recursively fuzz the remainder of the nodes in the session graph.for x in self._iterate_protocol_message_paths_recursive(self.fuzz_node, path):
yield x
# finished with the last node on the path, pop it off the path stack.if path:
path.pop()
def edges_from(self, edge_id):
    """Enumerate the edges that leave the specified node.

    Args:
        edge_id: Identifier of the node to enumerate edges from. Despite the
            parameter name, it is compared against each edge's ``src``.

    Returns:
        list: Every value of ``self.edges`` whose ``src`` equals ``edge_id``,
            in the iteration order of ``self.edges``.
    """
    outgoing = []
    for candidate in self.edges.values():
        if candidate.src == edge_id:
            outgoing.append(candidate)
    return outgoing
def_generate_mutations_indefinitely(self, max_depth=None, path=None):
"""Yield MutationContext with n mutations per message over all messages, with n increasing indefinitely."""
depth = 1while max_depth isNoneor depth <= max_depth:
valid_case_found_at_this_depth = Falsefor m in self._generate_n_mutations(depth=depth, path=path):
valid_case_found_at_this_depth = Trueyield m
ifnot valid_case_found_at_this_depth:
break
depth += 1
if self._keep_web_open and self.web_port isnotNone:
self.end_time = time.time()
print(
"\nFuzzing session completed. Keeping webinterface up on {}:{}".format(
self.web_address, self.web_port
),
"\nPress ENTER to close webinterface",
)
input()
def fuzz(self, name=None, max_depth=None):
    """Run a fuzzing session over everything, or over one named item.

    With no ``name`` (``None`` or the empty string) the main fuzz loop is
    driven by ``_generate_mutations_indefinitely(max_depth=max_depth)``.
    Otherwise the call is delegated to ``self.fuzz_by_name(name=name)``; in
    that case ``max_depth`` is not used.

    Args:
        name (str): Passed to ``fuzz_by_name`` when non-empty.
        max_depth (int): Depth limit for the unnamed case; ``None`` means
            unbounded.
    """
    fuzz_everything = name is None or name == ""
    if not fuzz_everything:
        self.fuzz_by_name(name=name)
        return

    case_iterator = self._generate_mutations_indefinitely(max_depth=max_depth)
    self._main_fuzz_loop(case_iterator)
def_main_fuzz_loop(self, fuzz_case_iterator):
"""Execute main fuzz logic; takes an iterator of test cases. Preconditions: `self.total_mutant_index` and `self.total_num_mutations` are set properly. Args: fuzz_case_iterator (Iterable): An iterator that walks through fuzz cases and yields MutationContext objects. See _iterate_single_node() for details. Returns: None """
self.server_init()
try:
self._start_target(self.targets[0])
if self._reuse_target_connection:
self.targets[0].open()
self.num_cases_actually_fuzzed = 0
self.start_time = time.time()
for mutation_context in fuzz_case_iterator:
if self.total_mutant_index < self._index_start:
continue# Check restart intervalif (
self.num_cases_actually_fuzzed
and self.restart_interval
and self.num_cases_actually_fuzzed % self.restart_interval == 0
):
self._fuzz_data_logger.open_test_step("restart interval of %d reached" % self.restart_interval)
self._restart_target(self.targets[0])
self._fuzz_current_case(mutation_context)
self.num_cases_actually_fuzzed += 1if self._index_end isnotNoneand self.total_mutant_index >= self._index_end:
breakif self._reuse_target_connection:
self.targets[0].close()
if self._keep_web_open and self.web_port isnotNone:
self.end_time = time.time()
print(
"\nFuzzing session completed. Keeping webinterface up on {}:{}".format(
self.web_address, self.web_port
),
"\nPress ENTER to close webinterface",
)
input()
exceptKeyboardInterrupt:
# TODO: should wait for the end of the ongoing test case, and stop gracefully netmon and procmon
self.export_file()
self._fuzz_data_logger.log_error("SIGINT received ... exiting")
raiseexcept exception.BoofuzzRestartFailedError:
self._fuzz_data_logger.log_error("Restarting the target failed, exiting.")
self.export_file()
raiseexcept exception.BoofuzzTargetConnectionFailedError:
# exception should have already been handled but rethrown in order to escape test runpassexceptException:
self._fuzz_data_logger.log_error("Unexpected exception! {0}".format(traceback.format_exc()))
self.export_file()
raisefinally:
self._fuzz_data_logger.close_test()