satumaimo_10の備忘録

個人的なメモ中心

PythonでQEMUとシリアル通信を行う方法の考察

こちらの記事の続きで、QEMUの起動/停止やQEMUモニタの操作と合わせてQEMUとのシリアル通信もPythonで自動化したいと考えています。

そこで、PythonQEMUとシリアル通信を行う方法を考えた結果をメモします。

QEMUからのシリアル通信デバイスの種類

公式ドキュメントによると、シリアル通信のデバイスとして概ね以下の種類が挙げられます。

これらの方法のうち、

  • ptyは割り当てられる仮想端末がQEMUを実行する度に異なる

  • /dev/XXXはroot権限が必要

  • stdioはQEMUモニタで使用する予定

といった理由から、telnetでシリアル通信を行う方法を採用したいと思います。

QEMU側の起動コマンド

qemu-system-aarch64 -cpu cortex-a57 -machine virt -kernel out.img -montor stdio -serial telnet::60000,server,nowait -nographic

telnetの使用するポートはプライベートなものを指定します。

Python側のコード

#!/usr/bin/python3

import telnetlib

host = "localhost"
port = "60000"

tn = telnetlib.Telnet(host=host, port=port)

tn.write(b"foo\r\n")

print(tn.read_until(b"ok\n", timeout=1).decode('ascii'), end="")

数行でtelnetを通してシリアル通信を行うプログラムが書けました。

なお、telnetの出力を読み出す関数としてread_until()以外にもread_some()read_eager()read_lazy()などがありましたが、

  • read_some()は読み出せる文字の数が不安定

  • read_eager()read_lazy()は何故か出力を読み出せなかった(環境の問題?)

といった理由で(少なくとも自分の環境では)使えないので、今回の場合はタイムアウトを指定しつつread_until()を使うのが適切だと思われます。

参考サイト

QEMU Emulator User Documentation

qemu: pyhtonでゲストとシリアル通信 - φ(・・*)ゞ ウーン カーネルとか弄ったりのメモ

telnetlib --- Telnet クライアント — Python 3.10.4 ドキュメント

Pythonでtelnet接続してみる | BTY備忘録

x86_64環境上でのARM64組み込みバイナリのクロスコンパイルに関するメモ

Interface 2022年7月号第3部第3章の内容を(今更)実践したので内容をメモします。

なお、誌面ではmacOSを用いて仮想マイコンの実験を行っていますが、本記事ではUbuntu 22.04 LTSを用いて実験します。

ARMツールチェーンのインストール

仮想マイコン上で動作するバイナリを開発する為に、ツールチェーンを公式サイトからダウンロードします。

$ curl -OL https://developer.arm.com/-/media/Files/downloads/gnu/11.2-2022.02/binrel/gcc-arm-11.2-2022.02-aarch64-arm-none-linux-gnueabihf.tar.xz
$ tar Jxfv gcc-arm-11.2-2022.02-x86_64-aarch64-none-elf.tar.xz

/path/to/gcc-arm-xx-x86_64-aarch64-none-elf/binにパスを通し、任意の場所でコマンドが使えるようにして、インストールを終了します。

補足1

  • none: ベアメタル(OSを持たないプラットフォーム)上で動作するバイナリの生成を意味する

OS上で動作するバイナリであれば、標準Cライブラリを使用でき、実行時にはインタプリタによって仮想メモリ上にバイナリが配置されますが、 ベアメタル上で動作するバイナリは、標準Cライブラリを使用できず、実行前に物理メモリ(ROM)上にバイナリが直接配置されます。

補足2

Ubuntuの公式レポジトリで提供されているパッケージ(gcc-arm-none-eabi)では、64bitARMを対象としたプログラムはコンパイルできないようなので、注意が必要です。

ARM64プログラムのクロスコンパイル

下記コマンドでプログラムを生成します。 (引用の範囲を逸脱するため、プログラムのソースコードは省略)

$ aarch64-none-elf-gcc -g -c -static start.S -o start.o
$ aarch64-none-elf-gcc -g -c -static main.c -o main.o
$ aarch64-none-elf-ld -Ttext 0x40080000 -o out.elf start.o main.o
$ aarch64-none-elf-objcopy -O binary out.elf out.img

これらのコマンドについて(自分の分かる範囲で)解説します。

  • aarch64-none-elf-gcc -g -c -static start.S -o start.oaarch64-none-elf-gcc -g -c -static main.c -o main.o

ソースコードアセンブリをオブジェクトファイルへアセンブルするコマンドです。

-staticオプションは、Man page曰く共有ライブラリとのリンクを抑制するとのことです。

直観的には、「OSの無い環境では動的リンカも存在しないので、共有(動的)ライブラリを使わないようにする必要がある」と理解できるのですが、 一方で、「そもそも(少なくとも今回の場合は)共有ライブラリを使用していないので、-staticオプションは不要なのでは」という疑問があります。

実際に、-staticオプションがある場合と無い場合のout.elfファイルにおけるreadelfの出力をdiffしてみたのですが、目立った違いはありませんでした。 (せいぜいファイル名の違いによって.strtabセクションの内容が違っていた程度)

結論としては、少なくとも今回の場合は-staticオプションは不要ではないかと思われるのですが、とりあえず-staticオプションを付けたまま話を続けます。

  • aarch64-none-elf-ld -Ttext 0x40080000 -o out.elf start.o main.o

オブジェクトファイルstart.omain.oをリンクするコマンドです。

-Ttext 0x40080000は、Man Page曰く出力ファイルにおける.textセクションの絶対アドレスを指定するオプションです。

Interfaceの記事曰く、

QEMUではプログラムが0x40080000に配置される

そうなので、プログラムの本体である.textセクションがQEMUにて最初に読み込まれるよう調整するためのオプションだと思われます。

  • aarch64-none-elf-objcopy -O binary out.elf out.img

out.elfout.imgへ変換する処理をしています。 (なぜelfファイルからimgファイルへの変換が必要なのか、そもそもimgファイルが何なのかは現状よく分かってないです…。)

QEMUでバイナリを実行

$ qemu-system-aarch64 -cpu cortex-a57 -machine virt -kernel out.img -monitor stdio

上記コマンドでHello worldが出力されたコンソールが現れれば、誌面の実験の再現には成功です。

参考サイト

Key difference between GCC arm-none-eabi and arm-eabi - Arm Development Studio forum - Support forums - Arm Community

環境

Ubuntu 22.04 LTS

boofuzz v0.4.1に関する個人的なメモ

boofuzzがターゲットの異常を検出しない場合がある

ターゲットが、本来であればバッファオーバーフローを起こすはずのファズデータを受け取っても、boofuzzがターゲットの異常を検出しない場合があった。

session.pyの1768行time.sleep(0.5)を挿入すると、正しくバッファオーバーフローを検出するようになった。

boofuzzは、 ファズデータを送信した直後にターゲットのプロセスが存在するかを判断している。

そのため、ファズデータによってターゲットが強制終了する前に、誤ってターゲットが正常稼働しているとboofuzzが判断し、 その後にターゲットが強制終了している可能性がある。

ファジングのターゲット監視に関する考察

プロセスの監視はターゲットが異常終了した場合の原因特定や、ファジングの自動化には役に立つ。

しかし、プロセスが強制終了するまでに若干の時間がある場合や、プロセスがフリーズする場合を考慮すると、 ターゲットの死活監視はネットワーク越しに行った方が、ターゲットの異常を検知する方法としては適切に思える。

例えば、こちらの論文では、 常にレスポンスが返ってくると想定されるメッセージ(heartbeat message)の送信によって、 ネットワーク越しに死活監視を行っている。

【boofuzz v0.4.1】ターゲットの死活監視を伴ったファジング

boofuzz v0.4.1を用いてファジングを行います。

ターゲットには、VulnServer-Linuxを使用します。

スクリプトの作成

サンプルスクリプトや下記参考サイトを参照し、以下のmyscript.pyを作成しました。

#!/usr/bin/env python3

from boofuzz import *
import datetime
import os

def main():

    port = 8080
    host = '127.0.0.1'

    start_commands = ['/path/to/VulnServer-Linux/vuln']
    stop_commands = ['ps aux | grep \'vuln\' | grep -v grep | awk \'{ print "kill -9", $2}\' | sudo sh']

    now = datetime.datetime.now()
    os.makedirs("fuzzing_logs", exist_ok=True)
    csv_log_name = './fuzzing_logs/' + now.strftime('%Y%m%d_%H%M%S') + '_fuzzing.csv'
    csv_log = open(csv_log_name, 'w')
    my_logger = [FuzzLoggerCsv(file_handle=csv_log)]

    procmon = ProcessMonitor(host, 26002)
    procmon.set_options(
        start_commands = [start_commands], 
        stop_commands = [stop_commands]
    )

    session = Session(
        target = Target(
            connection = TCPSocketConnection(host, port),
            monitors = [procmon]
        ),
        fuzz_loggers = my_logger
    )

    define_proto(session=session)

    session.fuzz()

def define_proto(session):
    # disable Black formatting to keep custom indentation
    # fmt: off
    req = Request("Client-Message", children=(
        Group(name="Time", values=["HELP", "TIME"], fuzzable=False),
        Delim(name="space", default_value=" ", fuzzable=False),
        String(name="Exploit", default_value="FUZZ"),
        Static(name="CRLF", default_value="\r\n")
    ))
    # fmt: on

    session.connect(req)

if __name__ == "__main__":
    main()

issueに記載のある通り、 procmon.set_options()にてproc_nameへの設定を行うとファジング処理がハングアップするので、注意が必要です。

ファジングの実行

(ターミナル1)
python3 process_monitor.py

(ターミナル2)
python3 myscript.py

上記コマンドの実行によってファジングが実行されます。

boofuzzではProcessMonitorによってターゲットのプロセスを監視しています。 boofuzzはProcessMonitorを利用して、以下の処理の繰り返しで未知の脆弱性を発見します。

  1. ターゲットにファズデータを一度送信
  2. ターゲットの生存確認

上記のようにターゲットの死活監視を伴ったファジングにより、再現性のある脆弱性を発見できます。

更に、ファジング中に停止したターゲットは、前述のProcessMonitorによって再起動されるため、 boofuzzは自動的に複数の脆弱性を発見できます。

実行結果

ファジングの実行結果はWebUIで確認できます。

ブラウザでlocalhost:26000へ接続すると、以下のような画面になります。

boofuzz_WebUI

WebUIから、ターゲットにバッファオーバーフロー脆弱性があることを確認できました。

環境

参考サイト

boofuzz: Network Protocol Fuzzing for Humans — boofuzz 0.4.1 documentation

Boofuzzで始めるネットワークプロトコルファジング - BinaryGenes

Boofuzz - A helpful guide (OSCE - CTP) - Zero Aptitude

jtpereyda/boofuzz - Gitter

Sulley: Fuzzing Framework | Fuzzing Frameworks | InformIT

boofuzzコード読み(3)

前回に引き続き、boofuzzのコード読みをします。

前回までは静的にソースコードを読んでいましたが、あまり理解が進まなかったので、 今回からはvscodeデバッグ機能を用いて順に処理を追っていきます。

簡易httpサーバの起動

boofuzzを実際に動かす都合上、ターゲットのhttpサーバをあらかじめ起動しておきます。

適当なディレクトリを用意し、以下の構成にします。

py_httpserver
├ server.py
└ index.html

server.pyのコードは以下の通りです。

from http.server import SimpleHTTPRequestHandler, HTTPServer

server = HTTPServer(('', 80), SimpleHTTPRequestHandler)
server.serve_forever()

server.pyを実行すれば、一応のhttpサーバが起動します。(実行にはsudo権限が必要です。)

fuzz()のデバッグ

fuzz()の処理をデバッグしていきます。

_main_fuzz_loop()(1)

self._start_target(self.targets[0])

_start_target()を改めて見ます。

_start_target()

def _start_target(self, target):
    started = False
    for monitor in target.monitors:
        if monitor.start_target():
            started = True
            break
    if started:
        for monitor in target.monitors:
            monitor.post_start_target(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)

前々回の記事では target.monitorsには空のリストであると誤った説明をしましたが、 実際には、リストにCallbackMonitor型のインスタンスが1つ格納されていました。

従って、for文は1回実行されます。

monitor._start_target()は、CallbackMonitor型のスーパークラスであるBaseMonitorのメソッドであり、Falseを返すのみの関数です。 そのため、monitor._start_target()では特に処理は行われませんでした。

_main_fuzz_loop()(2)

for mutation_context in fuzz_case_iterator:

この行では、ジェネレータ関数_generate_mutations_indefinitely()イテレータとして呼ばれます。

_generate_mutations_indefinitely()

def _generate_mutations_indefinitely(self, max_depth=None, path=None):
    """Yield MutationContext with n mutations per message over all messages, with n increasing indefinitely."""
    depth = 1
    while max_depth is None or depth <= max_depth:
        valid_case_found_at_this_depth = False
        for m in self._generate_n_mutations(depth=depth, path=path):
            valid_case_found_at_this_depth = True
            yield m
        if not valid_case_found_at_this_depth:
            break
        depth += 1

max_depthがNoneなので、while文が実行されます。

depth=1, path=Noneの引数を取って、ジェネレータ関数_generate_n_mutations()が実行されます。

_generate_n_mutations()

def _generate_n_mutations(self, depth, path):
    """Yield MutationContext with n mutations per message over all messages."""
    for path in self._iterate_protocol_message_paths(path=path):
        for m in self._generate_n_mutations_for_path(path, depth=depth):
            yield m

path=Noneの引数を取って、_iterate_protocol_message_paths()が実行されます。

_iterate_protocol_message_paths()

def _iterate_protocol_message_paths(self, path=None):
    """
    Iterates over protocol and yields a path (list of Connection) leading to a given message).
     Args:
        path (list of Connection): Provide a specific path to yield only that specific path.
     Yields:
        list of Connection: List of edges along the path to the current one being fuzzed.
     Raises:
        exception.SulleyRuntimeError: If no requests defined or no targets specified
    """
    # we can't fuzz if we don't have at least one target and one request.
    if not self.targets:
        raise exception.SullyRuntimeError("No targets specified in session")
     if not self.edges_from(self.root.id):
        raise exception.SullyRuntimeError("No requests specified in session")
     if path is not None:
        yield path
    else:
        for x in self._iterate_protocol_message_paths_recursive(this_node=self.root, path=[]):
            yield x

path=Noneなので、_iterate_protocol_message_paths_recursive(this_node=self.root, path=[])イテレータとして実行されます。 なお、self.rootはクラスpgraph.Node()インスタンスです。

_iterate_protocol_message_paths_recursive()

def _iterate_protocol_message_paths_recursive(self, this_node, path):
    """Recursive helper for _iterate_protocol.
     Args:
        this_node (node.Node): Current node that is being fuzzed.
        path (list of Connection): List of edges along the path to the current one being fuzzed.
     Yields:
        list of Connection: List of edges along the path to the current one being fuzzed.
    """
    # step through every edge from the current node.
    for edge in self.edges_from(this_node.id):
        # keep track of the path as we fuzz through it, don't count the root node.
        # we keep track of edges as opposed to nodes because if there is more then one path through a set of
        # given nodes we don't want any ambiguity.
        path.append(edge)
        message_path = self._message_path_to_str(path)
        logging.debug("fuzzing: {0}".format(message_path))
        self.fuzz_node = self.nodes[path[-1].dst]
         yield path
        # recursively fuzz the remainder of the nodes in the session graph.
        for x in self._iterate_protocol_message_paths_recursive(self.fuzz_node, path):
            yield x
    # finished with the last node on the path, pop it off the path stack.
    if path:
        path.pop()

後述しますが、edges_from()関数はConnectionクラスのインスタンスを含むリストを返す関数です。

edges_from()

def edges_from(self, edge_id):
    """
    Enumerate the edges from the specified node.

    @type  edge_id: Mixed
    @param edge_id: Identifier of node to enumerate edges from

    @rtype:  list
    @return: List of edges from the specified node
    """

    return [edge_value for edge_value in list(self.edges.values()) if edge_value.src == edge_id]

特定のノードからエッジを返す関数です。(直訳)

self.edgesはkeyが数値、valueがConnection型のインスタンスである辞書型のオブジェクトです。

Connectionpgraph.Edgeのサブクラスです。

このreturn文のワンライナーは、Sessionクラスのインスタンスが持つ辞書型のアトリビュートedgesからvalueであるConnectionクラスのインスタンスを取り出し、そのインスタンスのうちアトリビュートsrcがedge_idと同一であるものをリスト化したものを返す文であるといえます。

この時点では、Sessionインスタンスが持つアトリビュートedgesは1つのみのようなので、返り値も1つのConnectionインスタンスのみが格納されたリストになります。

_iterate_protocol_message_paths_recursive()(2)

path.append(edge)

message_path = self._message_path_to_str(path)

Connectionクラスedgeが空のリストpathにappendされます。

_message_path_to_str()の内容は以下の通りです。

def _message_path_to_str(self, message_path):
    return "->".join([self.nodes[e.dst].name for e in message_path])

返り値として'HTTP-Request'が返ってきます。

logging.debug("fuzzing: {0}".format(message_path))
self.fuzz_node = self.nodes[path[-1].dst]

yield path

現状のコードを読み続ける方針でboofuzzを理解するのに困難を感じてきたため、一旦ここまでにします。

おまけ

雑にデバッグを続けていたところ、boofuzz/primitives/string.pyにfuzzingのシード値を見つけました。

_fuzz_library = [
    "!@#$%%^#$%#$@#$%$$@#$%^^**(()",
    "",  # strings ripped from spike (and some others I added)
    "$(reboot)",
    "$;reboot",
    "%00",
(中略)

boofuzzは脆弱性を突けそうな文字列を事前に用意し、片っ端から送り込む方法でファジングを行っていると推測できました。(boofuzzに限った話では無いかもしれませんが)

感想と今後の方針

現状、個々の関数が何をやっているのかすら満足に理解できていない状態なので、漠然とコードを読み進めるのは方針として良くないように思えました。

ドキュメントを参照したい所ですが、boofuzzは開発者向けのドキュメントがあまり充実していないので、 今後どうするべきかは悩ましい所です。

個人的には、

  • ファザーはどのようにしてプロトコルを認識しているか
  • ファザーはデータをどのように変異させているか
  • ファザーはファジングの有効性をどのように判断しているか

あたりを知りたいので、 まずは「ファザーはどのようにしてプロトコルを認識しているか」を理解するのを目的として、 http_simple.pysession.connect(req)辺りからコードを読み直したいと思います。

また、boofuzzの主要なクラスはpgraphを継承しているので、グラフを意識してコードを読むともう少し理解できそうな気がします。(下図参照)

boofuzz_class

参考サイト

【Python】VSCode で外部モジュール・外部ライブラリも含めてデバッグする方法

ソースコードを読むための技術(チートシート)

boofuzzコード読み(2)

boofuzzのコード読みを前回の続きから行います。

fuzz()のコード読み

_generate_mutations_indefinitely()

def _generate_mutations_indefinitely(self, max_depth=None, path=None):
    """Yield MutationContext with n mutations per message over all messages, with n increasing indefinitely."""
    depth = 1
    while max_depth is None or depth <= max_depth:
        valid_case_found_at_this_depth = False
        for m in self._generate_n_mutations(depth=depth, path=path):
            valid_case_found_at_this_depth = True
            yield m
        if not valid_case_found_at_this_depth:
            break
        depth += 1

while文になにやら処理が書いてありますが、http_simple.pyではfuzz()関数実行時にmax_depthを指定していない、 つまりmax_depthの初期値はNoneなので、_generate_mutations_indefinitely()は空のイテレータとして動作することになります。

↑完全に誤解です。次回の記事を参照してください。

_main_fuzz_loop()

for mutation_context in fuzz_case_iterator:
    if self.total_mutant_index < self._index_start:
            continue

    # Check restart interval
    if (
            self.num_cases_actually_fuzzed
            and self.restart_interval
            and self.num_cases_actually_fuzzed % self.restart_interval == 0
    ):
            self._fuzz_data_logger.open_test_step("restart interval of %d reached" % self.restart_interval)
            self._restart_target(self.targets[0])

    self._fuzz_current_case(mutation_context)

    self.num_cases_actually_fuzzed += 1

    if self._index_end is not None and self.total_mutant_index >= self._index_end:
            break

http_simple.pyの例ではfuzz_case_iteratorが空のイテレータとなるので、このfor文の処理は実行されません。

if self._reuse_target_connection:
    self.targets[0].close()

_reuse_target_connectionの値はデフォルトでFalseなので、このif文も実行されません。

if self._keep_web_open and self.web_port is not None:
    self.end_time = time.time()
    print(
        "\nFuzzing session completed. Keeping webinterface up on {}:{}".format(
        self.web_address, self.web_port
        ),
        "\nPress ENTER to close webinterface",
    )
    input()

ファジングが終わってしまいました。

ソースコードを読んだ結果が明らかに実行結果と矛盾しているので、より正確にコードを読むために、 今度はデバッグを行いながらコードを読み直したいと思います。

boofuzzコード読み(1)

ファジングの基本的な仕組みを学ぶために、オープンソースであるboofuzzのコード読みをします。

テンプレートを見る

どのようなファジングを行うかユーザが定義するためのPythonスクリプトを、こちらのサイトに倣ってテンプレート (fuzzer template) と呼ぶことにします。

まずは、コードを読む前にテンプレートを見てみます。 リポジトリexamplesディレクトリに複数のテンプレートが用意されているので、ここではhttp_simple.pyを見ます。

def main():
    session = Session(
        target=Target(connection=TCPSocketConnection("127.0.0.1", 80)),
    )

    define_proto(session=session)

    session.fuzz()

def define_proto(session):
    # disable Black formatting to keep custom indentation
    # fmt: off
    req = Request("HTTP-Request", children=(
        Block("Request-Line", children=(
            Group(name="Method", values=["GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS", "TRACE"]),
            Delim(name="space-1", default_value=" "),
            String(name="URI", default_value="/index.html"),
            Delim(name="space-2", default_value=" "),
            String(name="HTTP-Version", default_value="HTTP/1.1"),
            Static(name="CRLF", default_value="\r\n"),
        )),
        Block("Host-Line", children=(
            String(name="Host-Key", default_value="Host:"),
            Delim(name="space", default_value=" "),
            String(name="Host-Value", default_value="example.com"),
            Static(name="CRLF", default_value="\r\n"),
        )),
        Static(name="CRLF", default_value="\r\n"),
    ))
    # fmt: on

    session.connect(req)

(中略)

if __name__ == "__main__":
    main()

上記のテンプレートは、見る限りでは、

  1. localhostの80番ポートをターゲットに設定しつつ、Sessionクラスのインスタンスsessionを定義する
  2. define_proto()関数によって、HTTPプロトコルでファジングを行うよう設定する
  3. session.fuzz()でファジングを実行する

以上の処理を記述していると思われます。

fuzz()のコード読み

実際にファジングを行う処理はsession.fuzz()関数にて行われてそうなので、boofuzz/boofuzz/sessions.pyにて定義されているfuzz()関数からコードを読んでみます。

    def fuzz(self, name=None, max_depth=None):
        if name is None or name == "":
            self._main_fuzz_loop(self._generate_mutations_indefinitely(max_depth=max_depth))
        else:
            self.fuzz_by_name(name=name)

引数nameは、http_simple.pyでは特に設定されていないので、_main_fuzz_loop()関数に着目します。

_main_fuzz_loop()の引数_generate_mutations_indefinitely()は、イテレータを返すジェネレータ関数です。

また、_main_fuzz_loop()は名前からファジングに関する何らかの処理を繰り返す関数であると推測できます。

_main_fuzz_loop()

とりあえず、_main_fuzz_loop()関数を読み進めます。

    def _main_fuzz_loop(self, fuzz_case_iterator):
        """Execute main fuzz logic; takes an iterator of test cases.
        Preconditions: `self.total_mutant_index` and `self.total_num_mutations` are set properly.
        Args:
            fuzz_case_iterator (Iterable): An iterator that walks through fuzz cases and yields MutationContext objects.
                 See _iterate_single_node() for details.
        Returns:
            None
        """
        self.server_init()

        try:
            self._start_target(self.targets[0])

            if self._reuse_target_connection:
                self.targets[0].open()
            self.num_cases_actually_fuzzed = 0
            self.start_time = time.time()
            for mutation_context in fuzz_case_iterator:
                if self.total_mutant_index < self._index_start:
                    continue

                # Check restart interval
                if (
                    self.num_cases_actually_fuzzed
                    and self.restart_interval
                    and self.num_cases_actually_fuzzed % self.restart_interval == 0
                ):
                    self._fuzz_data_logger.open_test_step("restart interval of %d reached" % self.restart_interval)
                    self._restart_target(self.targets[0])

                self._fuzz_current_case(mutation_context)

                self.num_cases_actually_fuzzed += 1

                if self._index_end is not None and self.total_mutant_index >= self._index_end:
                    break

            if self._reuse_target_connection:
                self.targets[0].close()

            if self._keep_web_open and self.web_port is not None:
                self.end_time = time.time()
                print(
                    "\nFuzzing session completed. Keeping webinterface up on {}:{}".format(
                        self.web_address, self.web_port
                    ),
                    "\nPress ENTER to close webinterface",
                )
                input()
        except KeyboardInterrupt:
            # TODO: should wait for the end of the ongoing test case, and stop gracefully netmon and procmon
            self.export_file()
            self._fuzz_data_logger.log_error("SIGINT received ... exiting")
            raise
        except exception.BoofuzzRestartFailedError:
            self._fuzz_data_logger.log_error("Restarting the target failed, exiting.")
            self.export_file()
            raise
        except exception.BoofuzzTargetConnectionFailedError:
            # exception should have already been handled but rethrown in order to escape test run
            pass
        except Exception:
            self._fuzz_data_logger.log_error("Unexpected exception! {0}".format(traceback.format_exc()))
            self.export_file()
            raise
        finally:
            self._fuzz_data_logger.close_test()

この関数のコメントによると、引数fuzz_case_iteratorはファズケースを走査し、MutationContextオブジェクトを生成するイテレータとのことです。現時点ではよく分かりません。

コードが長いので、順番に読んでいきます。

self.server_init()

WebUI関係の関数と思われます。

self._start_target(self.targets[0])

Sessionクラスのリスト型アトリビュートtargetsを引数にとった関数です。

targetsには、Sessionのインスタンス生成時の引数targetが格納されています。 http_simple.pyの例では、Target(connection=TCPSocketConnection("127.0.0.1", 80))が格納されます。

_start_target()はTarget型の引数targetを持ち、関数内でtarget.monitorsイテレータとしたfor文の処理のみが実装されています。

本来の_start_target()関数では監視関連の処理が実行されると思われますが、http_simple.pyの例ではインスタンスtargetの生成時に引数monitorsを指定していない(つまり、target[0].monitorsは空のリストである)ので、何も処理されないと思われます。

if self._reuse_target_connection:
    self.targets[0].open()

self.reuse_target_connectionには、Sessionインスタンス生成時のbool型引数reuse_target_connectionが格納されます。 reuse_target_connectionについては、ソースコードのコメントにて以下のように書かれています。

reuse_target_connection (bool): If True, only use one target connection instead of reconnecting each test case. Default False.

通常boofuzzがファジングを行う際はTCPコネクションの接続と切断を繰り返すので、reuse_target_connectionをTrueにした場合はファジング中のTCPコネクションを確立したままにするという事でしょうか。 いずれにしろ、デフォルトではFalseなのでこのコードもスルーします。

self.num_cases_actually_fuzzed = 0
self.start_time = time.time()

num_cases_actually_fuzzedおよびstart_timeは、この行で定義されるアトリビュートです。用途はまだ分かりません。

_main_fuzz_loop()の以降のコードを理解するには_generate_mutations_indefinitely()について理解する必要がありそうなので、次回は_generate_mutations_indefinitely()から読み進めたいと思います。