Boofuzz 分析

Boofuzz 源码分析

最近又有需要用到fuzz服务器的需求,找来找去还是觉得boofuzz是最靠谱的,以前就做过boofuzz源码的一些分析,这次就正好整理一下

Boofuzz样例

以一个tftp Fuzzer为例,大概看一下Boofuzz的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from boofuzz import *
import time

def main():
session = Session(sleep_time=1,
target=Target(connection=SocketConnection("127.0.0.1",69,proto="udp")))
s_initialize("write")
s_static("\\x00\\x02")
s_string("filename")
s_static("\\x00")
s_static("netascii")
s_static("\\x00")

session.connect(s_get('write'))
session.fuzz()

if __name__ == '__main__':
main()

在源码中可以看到SocketConnection会在未来版本中移除,现在应该使用BaseSocketConnection

1
2
3
4
5
warnings.warn(
"SocketConnection is deprecated and will be removed in a future version of Boofuzz. "
"Use the classes derived from BaseSocketConnection instead.",
FutureWarning,
)

Fuzzer的创建包含三层,Session -> Target -> Connection

Connection

顾名思义,Connection对象是网络连接的代表,Boofuzz支持各种基于socket的连接,从源码部分就可以看到,常用的还是TCP和UDP,这里还支持网络协议栈中2层和3层的原生Socket,不过我还没用过。

image-20220114172637119

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def SocketConnection(
host,
port=None,
proto="tcp",
bind=None,
send_timeout=5.0,
recv_timeout=5.0,
ethernet_proto=None,
l2_dst=b"\xFF" * 6,
udp_broadcast=False,
server=False,
sslcontext=None,
server_hostname=None,
):


warnings.warn(
"SocketConnection is deprecated and will be removed in a future version of Boofuzz. "
"Use the classes derived from BaseSocketConnection instead.",
FutureWarning,
)
if proto not in _PROTOCOLS:
raise exception.SullyRuntimeError("INVALID PROTOCOL SPECIFIED: %s" % proto)

if proto in _PROTOCOLS_PORT_REQUIRED and port is None:
raise ValueError("__init__() argument port required for protocol {0}".format(proto))

if proto == "udp":
return udp_socket_connection.UDPSocketConnection(
host, port, send_timeout, recv_timeout, server, bind, udp_broadcast
)
elif proto == "tcp":
return tcp_socket_connection.TCPSocketConnection(host, port, send_timeout, recv_timeout, server)
elif proto == "ssl":
return ssl_socket_connection.SSLSocketConnection(
host, port, send_timeout, recv_timeout, server, sslcontext, server_hostname
)
elif proto == "raw-l2":
return raw_l2_socket_connection.RawL2SocketConnection(host, send_timeout, recv_timeout)
elif proto == "raw-l3":
if ethernet_proto is None:
ethernet_proto = raw_l3_socket_connection.ETH_P_IP

return raw_l3_socket_connection.RawL3SocketConnection(host, send_timeout, recv_timeout, ethernet_proto, l2_dst)

在SocketConnection函数中根据我们传入的proto参数来调用响应Connection类的构造函数

以TCPSocketConnection为例,其继承自BaseSocketConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class TCPSocketConnection(base_socket_connection.BaseSocketConnection):
"""BaseSocketConnection implementation for use with TCP Sockets.

.. versionadded:: 0.2.0

Args:
host (str): Hostname or IP adress of target system.
port (int): Port of target service.
send_timeout (float): Seconds to wait for send before timing out. Default 5.0.
recv_timeout (float): Seconds to wait for recv before timing out. Default 5.0.
server (bool): Set to True to enable server side fuzzing.

"""

def __init__(self, host, port, send_timeout=5.0, recv_timeout=5.0, server=False):
super(TCPSocketConnection, self).__init__(send_timeout, recv_timeout)

self.host = host
self.port = port
self.server = server
self._serverSock = None

BaseSocketConnection是一个继承了itarget_connection.ITargetConnection接口的抽象基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class BaseSocketConnection(with_metaclass(abc.ABCMeta, itarget_connection.ITargetConnection)):
"""This class serves as a base for a number of Connections over sockets.

.. versionadded:: 0.2.0

Args:
send_timeout (float): Seconds to wait for send before timing out. Default 5.0.
recv_timeout (float): Seconds to wait for recv before timing out. Default 5.0.
"""

def __init__(self, send_timeout, recv_timeout):
self._send_timeout = send_timeout
self._recv_timeout = recv_timeout

self._sock = None

def close(self):
"""
Close connection to the target.

Returns:
None
"""
self._sock.close()

@abc.abstractmethod
def open(self):
"""
Opens connection to the target. Make sure to call close!

Returns:
None
"""
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, _seconds_to_sockopt_format(self._send_timeout))
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, _seconds_to_sockopt_format(self._recv_timeout))

python元类编程

抽象类和接口都是面向对象里面的概念,抽象类是指一类不可直接实例化,只可被继承的类,接口则定义了继承接口的类必须实现的方法,python是没有这两个概念相关的关键字的,在python中,抽象类是以抽象基类的方式实现的(abstract base classes(ABC))

ABC中提供了@abstractmethod装饰器来指定抽象方法,下面代码中定义了一个抽象类C,并且定义了三个抽象方法,D类则是继承抽象类C然后实现了他的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import abc

class C(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def a(self): pass

@classmethod
@abc.abstractmethod
def clsa(cls): pass

@staticmethod
@abc.abstractmethod
def stca(): pass

class D(C):
@property
def a(self):
print('property: a')

@classmethod
def clsa(cls):
print('classmethod clsa')

@staticmethod
def stca():
print('staticmethod stca')

d = D()
d.a
# property: a
d.clsa()
# classmethod clsa
d.stca()
# staticmethod stca

这里BaseSocketConnection的定义中用到了with_metaclass来创建这个类

Python Metaclass : Understanding the ‘with_metaclass()’ - Stack Overflow

这里引入with_metaclass是为了兼容python2和python3,在我的python3.8上with_metaclass如下

1
2
3
4
5
6
7
8
9
def with_metaclass(meta, *bases):
class metaclass(meta):
__call__ = type.__call__
__init__ = type.__init__
def __new__(cls, name, this_bases, d):
if this_bases is None:
return type.__new__(cls, name, (), d)
return meta(name, bases, d)
return metaclass('temporary_class', None, {})

根据BaseSocketConnection传入的参数,这里meta是ABCmeta,bases是ITargetConnection,这里是定义了一个临时元类metaclass继承自ABCmeta,并重写了其new方法,这样下面return时,就会依次调用其new和init方法来新建一个对象,而元类创建的是一个类,因此结果是一个基类为bases的抽象类,然后BaseSocketConnection就继承自它,不过这里this_bases的含义就很迷,我自己调试的话,这里会因为this_bases等于空而走到第一个return上,这样就不会继承bases,不过后来我添加打印代码后,确实是继承了ITargetConnection的。。迷惑

image-20220114193202669

总之这里BaseSocketConnection继承了ITargetConnection的接口,定义了一些基本方法,open send recv close以及关于延时的变量等

image-20220114193339463

TCPSocketConnection

来看看具体的TCPSocket的实现,

open

1
2
3
def open(self):
self._open_socket()
self._connect_socket()

open_socket函数

就是创建一个socket

1
2
3
4
5
def _open_socket(self):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# call superclass to set timeout sockopt
super(TCPSocketConnection, self).open()

connect_socket

这里可以看到boofuzz支持server被连接的模式,只会接受一个连接,如果是多连接的场景,需要自己修改对应逻辑,主动连接同理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def _connect_socket(self):
#server模式
if self.server:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self._sock.bind((self.host, self.port))
except socket.error as e:
if e.errno == errno.EADDRINUSE:
raise exception.BoofuzzOutOfAvailableSockets()
else:
raise

self._serverSock = self._sock
try:
#只会接受一个连接
self._serverSock.listen(1)
self._sock, addr = self._serverSock.accept()
except socket.error as e:
# When connection timeout expires, tear down the server socket so we can re-open it again after
# restarting the target.
self.close()
if e.errno in [errno.EAGAIN]:
raise exception.BoofuzzTargetConnectionFailedError(str(e))
else:
raise
#主动连接
else:
try:
self._sock.connect((self.host, self.port))
except socket.error as e:
if e.errno == errno.EADDRINUSE:
raise exception.BoofuzzOutOfAvailableSockets()
elif e.errno in [errno.ECONNREFUSED, errno.EINPROGRESS, errno.ETIMEDOUT]:
raise exception.BoofuzzTargetConnectionFailedError(str(e))
else:
raise

send

就是socketsend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def send(self, data):
num_sent = 0

try:
num_sent = self._sock.send(data)
except socket.error as e:
if e.errno == errno.ECONNABORTED:
raise_(
exception.BoofuzzTargetConnectionAborted(socket_errno=e.errno, socket_errmsg=e.strerror),
None,
sys.exc_info()[2],
)
elif e.errno in [errno.ECONNRESET, errno.ENETRESET, errno.ETIMEDOUT, errno.EPIPE]:
raise_(exception.BoofuzzTargetConnectionReset(), None, sys.exc_info()[2])
else:
raise

return num_sent

recv

这里可以设定最大接受字节数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def recv(self, max_bytes):
data = b""

try:
data = self._sock.recv(max_bytes)
except socket.timeout:
data = b""
except socket.error as e:
if e.errno == errno.ECONNABORTED:
raise_(
exception.BoofuzzTargetConnectionAborted(socket_errno=e.errno, socket_errmsg=e.strerror),
None,
sys.exc_info()[2],
)
elif (e.errno == errno.ECONNRESET) or (e.errno == errno.ENETRESET) or (e.errno == errno.ETIMEDOUT):
raise_(exception.BoofuzzTargetConnectionReset(), None, sys.exc_info()[2])
elif e.errno == errno.EWOULDBLOCK: # timeout condition if using SO_RCVTIMEO or SO_SNDTIMEO
data = b""
else:
raise

return data

可以看出Connection这层就已经实现了连接的建立以及数据的收发相关的功能

Target

1
2
3
4
5
class Target(object):
"""Target descriptor container.

Takes an ITargetConnection and wraps send/recv with appropriate
FuzzDataLogger calls.

Target注释部分也说了,Target对象主要是在Connection的接口上包上log,可以看到send函数,除了加的repeater功能,基本上就是加了圈log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def open(self):
"""
Opens connection to the target. Make sure to call close!

:return: None
"""
self._fuzz_data_logger.log_info("Opening target connection ({0})...".format(self._target_connection.info))
self._target_connection.open()
self._fuzz_data_logger.log_info("Connection opened.")


def send(self, data):
num_sent = 0
if self._fuzz_data_logger is not None:
repeat = ""
if self.repeater is not None:
repeat = ", " + self.repeater.log_message()

self._fuzz_data_logger.log_info("Sending {0} bytes{1}...".format(len(data), repeat))

if self.repeater is not None:
self.repeater.start()
while self.repeater.repeat():
num_sent = self._target_connection.send(data=data)
self.repeater.reset()
else:
num_sent = self._target_connection.send(data=data)

if self._fuzz_data_logger is not None:
self._fuzz_data_logger.log_send(data[:num_sent])

另外Target中还有一些Monitor的初始化工作,后面再说,Target中提供了设置Logger的接口函数

1
2
3
4
5
6
7
8
9
10
def set_fuzz_data_logger(self, fuzz_data_logger):
"""
Set this object's fuzz data logger -- for sent and received fuzz data.

:param fuzz_data_logger: New logger.
:type fuzz_data_logger: ifuzz_logger.IFuzzLogger

:return: None
"""
self._fuzz_data_logger = fuzz_data_logger

在session中会调用这个函数来添加logger,默认是FuzzLoggerText

Session Logger部分

Session对象可以看成整个fuzzer的后端对象,其参数基本上就是涉及到fuzz控制的各种粒度,其函数基本上就是fuzz过程了,fuzz过程后面再分析,这里先看下上面余留的logger的部分,

1
2
3
4
5
6
7
8
9
class Session(pgraph.Graph):
def __init__(args):
if fuzz_loggers is None:
fuzz_loggers = []
if self.console_gui and os.name != "nt":
fuzz_loggers.append(fuzz_logger_curses.FuzzLoggerCurses(web_port=self.web_port))
self._keep_web_open = False
else:
fuzz_loggers = [fuzz_logger_text.FuzzLoggerText()]

如果fuzz_loggers没指定的话,这里就设置成FuzzLoggerText,而FuzzLoggerText默认会设置为标注输出,因此就形成了打印到终端,所以如果我们想输出到文件,就可以set自己new的FuzzLoggerText,并设置其file_handle为文件句柄

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class FuzzLoggerText(ifuzz_logger_backend.IFuzzLoggerBackend):
def __init__(self, file_handle=sys.stdout, bytes_to_str=DEFAULT_HEX_TO_STR):
"""
:type file_handle: io.BinaryIO
:param file_handle: Open file handle for logging. Defaults to sys.stdout.

:type bytes_to_str: function
:param bytes_to_str: Function that converts sent/received bytes data to string for logging.
"""
self._file_handle = file_handle
self._format_raw_bytes = bytes_to_str
def _print_log_msg(self, msg_type, msg=None, data=None):
print(
helpers.format_log_msg(msg_type=msg_type, description=msg, data=data, indent_size=self.INDENT_SIZE),
file=self._file_handle,
)

数据格式

boofuzz是基于格式的,因此在开始fuzz前需要先定义目标数据格式,boofuzz有两种数据定义的方式Static Protocol Definition(old)和 Protocol Definition(new),这两种数据定义的方式只是接口不同,其内部存储的格式是类似的,而且每种基本都够用了,所以这里只分析下Static Protocol Definition

Static Protocol Definition — boofuzz 0.4.0 documentation

Protocol Definition — boofuzz 0.4.0 documentation

数据分成三个层次,Request是发出的message,Blocks来组成message,Primitives是组成block的元素

其组成图如下

Request.stack
string1
bytes1
block1
string2
block2
string3
bytes2

s_initialize

s_initialize会创建一个request,我们需要提供一个name来标识这个request,新建的Request会被加到REQUESTS中,并设置为当前操作的Request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def s_initialize(name):
"""
Initialize a new block request. All blocks / primitives generated after this call apply to the named request.
Use s_switch() to jump between factories.

:type name: str
:param name: Name of request
"""
if name in blocks.REQUESTS:
raise exception.SullyRuntimeError("blocks.REQUESTS ALREADY EXISTS: %s" % name)
#REQUESTS是全局字典,这里向里面添加个Request
blocks.REQUESTS[name] = Request(name)
#同时设置为当前操作的Request
blocks.CURRENT = blocks.REQUESTS[name]

Request的初始化函数

Request.init

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#对图初始化,新建一个root节点
self.root = pgraph.Node()
self.root.label = "__ROOT_NODE__"
self.root.name = self.root.label
self.last_recv = None
self.last_send = None

self.add_node(self.root)
#把传进来的target加到target数组中
if target is not None:

def apply_options(monitor):
monitor.set_options(crash_filename=self._crash_filename)

return

target.monitor_alive.append(apply_options)

try:
self.add_target(target)
except exception.BoofuzzRpcError as e:
self._fuzz_data_logger.log_error(str(e))
raise

s_get s_switch

网络协议一般是各种Request的状态转移图,Boofuzz也支持建立这种图,我们可以再次调用s_initialize来创建一个新的Request,通过s_get可以在不同的Request直接切换,从而改变当前操作的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def s_get(name=None):
if not name:
return blocks.CURRENT

# ensure this gotten request is the new current.
s_switch(name)

if name not in blocks.REQUESTS:
raise exception.SullyRuntimeError("blocks.REQUESTS NOT FOUND: %s" % name)

return blocks.REQUESTS[name]

def s_switch(name):
"""
Change the current request to the one specified by "name".

:type name: str
:param name: Name of request
"""

if name not in blocks.REQUESTS:
raise exception.SullyRuntimeError("blocks.REQUESTS NOT FOUND: %s" % name)
#将name的Request设置为CURRENT
blocks.CURRENT = blocks.REQUESTS[name]

connect

connect是连边,即在两个Node(Request)上连边,只填一个参数的话,就是默认把提供的参数node练到root上,node就是Request对象,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def connect(self, src, dst=None, callback=None):
# if only a source was provided, then make it the destination and set the source to the root node.
if dst is None: #dst不指定就是 root
dst = src
src = self.root

# if source or destination is a name, resolve the actual node.
if isinstance(src, six.string_types):
src = self.find_node("name", src)

if isinstance(dst, six.string_types):
dst = self.find_node("name", dst)

# if source or destination is not in the graph, add it.
if src != self.root and self.find_node("name", src.name) is None:
self.add_node(src)

if self.find_node("name", dst.name) is None:
self.add_node(dst)

# create an edge between the two nodes and add it to the graph.
edge = Connection(src.id, dst.id, callback) #建边
self.add_edge(edge)

return edge

def add_node(self, node):
"""
Add a pgraph node to the graph. We overload this routine to automatically generate and assign an ID whenever a
node is added.

Args:
node (pgraph.Node): Node to add to session graph
"""

node.number = len(self.nodes)
node.id = len(self.nodes)

if node.id not in self.nodes:
self.nodes[node.id] = node

return self

这个Connection 就是继承自最朴素的Edge (边),只不过其提供了一个callback参数,这个会在状态转移的时候调用,因此可以添加一些自定义的功能

1
2
3
4
5
class Connection(pgraph.Edge):
def __init__(self, src, dst, callback=None):
super(Connection, self).__init__(src, dst)

self.callback = callback

状态图案例

image-20220116121630857

创建完Request之后,接下来就是向里面添加Primitives,根据数据类型划分出多个添加函数,首先看看string类型的函数s_string

函数中新建String对象后,通过Request的push函数填充到request中

s_string

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def s_string(value="", size=None, padding=b"\x00", encoding="ascii", fuzzable=True, max_len=None, name=None):
# support old interface where default was -1 instead of None
if size == -1:
size = None
if max_len == -1:
max_len = None

blocks.CURRENT.push(
String(
name=name,
default_value=value,
size=size,
padding=padding,
encoding=encoding,
max_len=max_len,
fuzzable=fuzzable,
)
)

Request.push

1.首先给传进来的item也就是Primitive添加一些环境信息:

​ (1)context_path: 调用_generate_context_path产生的字符串,_generate_context_path是将block_stack中的字符串全部拼接在一起产生路径字符串,用于标记item的位置

​ (2)设置item的request为当前request

2.将item的qualified_name加入到names map中,用于监测重复插入

3.如果当前request还没有block,block_stack就为空,就将item插入到整体的stack中,如果block_stack不为空,就相当于现在还在组建block,就把item插入到栈顶的block中

4.如果item是block,会先把block插入到stack中,然后插入到block_stack中作为当前的open_block,接下来的primitive都会插入到block里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def push(self, item):
"""
Push an item into the block structure. If no block is open, the item goes onto the request stack. otherwise,
the item goes onto the last open blocks stack.

What this method does:
1. Sets context_path for each pushed FuzzableWrapper.
2. Sets request for each FuzzableWrapper
3. Checks for duplicate qualified_name items
4. Adds item to self.names map (based on qualified_name)
5. Adds the item to self.stack, or to the stack of the currently opened block.

Also: Manages block_stack, mostly an implementation detail to help static protocol definition

@type item: BasePrimitive | Block | Request | Size | Repeat
@param item: Some primitive/block/request/etc.
"""
item.context_path = self._generate_context_path(self.block_stack)
item.request = self
# ensure the name doesn't already exist.
if item.qualified_name in list(self.names):
raise exception.SullyRuntimeError("BLOCK NAME ALREADY EXISTS: %s" % item.qualified_name)

self.names[item.qualified_name] = item

# if there are no open blocks, the item gets pushed onto the request stack.
# otherwise, the pushed item goes onto the stack of the last opened block.
if not self.block_stack:
self.stack.append(item)
else:
self.block_stack[-1].push(item)

# add the opened block to the block stack.
if isinstance(item, Block) or isinstance(item, Aligned): # TODO generic check here
self.block_stack.append(item)

def _generate_context_path(self, block_stack):
context_path = ".".join(x.name for x in block_stack) # TODO put in method
context_path = ".".join(filter(None, (self.name, context_path)))
return context_path

block

根据doc,有两种插入block的方式with 和startend模式

startend方式

s_block_start 初始化一个block,并将其push

s_block_close 关闭这个block,说明数据已经填充完毕了

1
2
3
4
5
if s_block_start("header"):
s_static("\x00\x01")
if s_block_start("body"):
...
s_block_end()

s_block_start

1
2
3
4
5
def s_block_start(name=None, *args, **kwargs):
block = Block(name=name, *args, **kwargs)
blocks.CURRENT.push(block)

return block

s_block_end

1
2
def s_block_end(name=None):
blocks.CURRENT.pop()

with方式

1
2
3
with s_block("header"):
s_static("\x00\x01")
if s_block_start("body"):

with方式能用是因为s_block中调用的是s_block_start插入block,但是返回的是个ScopedBlock对象,这个对象注册了exit方法

当with范围结束时,就会调用s_block_end方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def s_block(name=None, group=None, encoder=None, dep=None, dep_value=None, dep_values=None, dep_compare="=="):

class ScopedBlock(object):
def __init__(self, block):
self.block = block

def __enter__(self):
"""
Setup before entering the "with" statement body
"""
return self.block

def __exit__(self, type, value, traceback):
"""
Cleanup after executing the "with" statement body
"""
# Automagically close the block when exiting the "with" statement
s_block_end()

block = s_block_start(
name,
request=blocks.CURRENT,
group=group,
encoder=encoder,
dep=dep,
dep_value=dep_value,
dep_values=dep_values,
dep_compare=dep_compare,
)

return ScopedBlock(block)

start fuzz

fuzz

fuzz开始于fuzz函数,传入一个request的name的话就会只fuzz这个request,不传就会按建立的图去遍历着fuzz

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def fuzz(self, name=None, max_depth=None):
"""Fuzz the entire protocol tree.
Iterates through and fuzzes all fuzz cases, skipping according to
self.skip and restarting based on self.restart_interval.
If you want the web server to be available, your program must persist
after calling this method. helpers.pause_for_signal() is
available to this end.
Args:
name (str): Pass in a Request name to fuzz only a single request message. Pass in a test cas
only a single test case.
max_depth (int): Maximum combinatorial depth; set to 1 for "simple" fuzzing.
Returns:
None
"""
self.total_mutant_index = 0
self.total_num_mutations = self.num_mutations(max_depth=max_depth)
if name is None or name == "":
self._main_fuzz_loop(self._generate_mutations_indefinitely(max_depth=max_depth))
else:
self.fuzz_by_name(name=name)

_main_fuzz_loop

1.首先会开启一个boofuzz的可视化web server

2.调用_start_target启动target,一般测试服务器的时候,是我们手动启动目标服务器,所以用不到这个,但是配合ProcMonitor我们可以设置自启动目标(Windows平台)

3.记录fuzz开始时间

4.开始fuzz大循环,每次循环调用_fuzz_current_case进行fuzz

5.num_cases_actually_fuzzed+1,如果_index_end参数不为空且total_mutant_index>=_index_end的话就结束fuzz

6.记录fuzz结束时间

这里还有个选项是_reuse_target_connection,重用连接,开启这个选项后,整个大循环中只会在这里open一次连接,如果不开这个选项,每次fuzz都会重新open一次连接

1
2
3
4
5
6
7
8
9
10
11
12
def _main_fuzz_loop(self, fuzz_case_iterator):
"""Execute main fuzz logic; takes an iterator of test cases.
Preconditions: `self.total_mutant_index` and `self.total_num_mutations` are set properly.
Args:
fuzz_case_iterator (Iterable): An iterator that walks through fuzz cases and yields MutationContext objec
See _iterate_single_node() for details.
Returns:
None
"""
#这里好像是开启一个boofuzz的可视化web端口
if self.web_port is not None:
self.server_init()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
try:
self._start_target(self.targets[0])
if self._reuse_target_connection:
self.targets[0].open()
self.num_cases_actually_fuzzed = 0
#记录fuzz开始时间
self.start_time = time.time()
for mutation_context in fuzz_case_iterator:
if self.total_mutant_index < self._index_start:
continue
# Check restart interval
if (
self.num_cases_actually_fuzzed
and self.restart_interval
and self.num_cases_actually_fuzzed % self.restart_interval == 0
):
self._fuzz_data_logger.open_test_step("restart interval of %d reached" % self.restart_interval)
self._restart_target(self.targets[0])
#这里开始fuzz这次
self._fuzz_current_case(mutation_context)
#这里是记录实际进行fuzz的次数
self.num_cases_actually_fuzzed += 1
if self._index_end is not None and self.total_mutant_index >= self._index_end:
break
if self._reuse_target_connection:
self.targets[0].close()
if self._keep_web_open and self.web_port is not None:
self.end_time = time.time()
print(
"\nFuzzing session completed. Keeping webinterface up on localhost:{}".format(self.web_port),
"\nPress ENTER to close webinterface",
)
input()

_start_target

内部调用monitor的start_target来启动目标,目标启动后,调用monitor的post_start_target回调函数

1
2
3
4
5
6
7
8
9
def _start_target(self, target):
started = False
for monitor in target.monitors:
if monitor.start_target():
started = True
break
if started:
for monitor in target.monitors:
monitor.post_start_target(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)

_fuzz_current_case

1.打印一些信息

2.调用_open_connection_keep_trying打开连接,在这里可以实现自定义的网络状态monitor

3.调用_pre_send函数,这里会调用monitor中的pre_send回调函数(Session处填的pre_send_callback会复制到CallbackMonitor的on_pre_send中,这里pre_send就会调用它们)具体可以看后面单独对CallbackMonitor的分析

4.调用edge的callback函数,产生callback数据

5.调用transmit_fuzz进行测试数据的收发

6.调用_check_for_passively_detected_failures函数检查是否发生了crash

根据设置的sleep_time参数暂停

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def _fuzz_current_case(self, mutation_context):
"""
Fuzzes the current test case. Current test case is controlled by
fuzz_case_iterator().
Args:
mutation_context (MutationContext): Current mutation context.
"""
target = self.targets[0]
self._pause_if_pause_flag_is_set()
test_case_name = self._test_case_name(mutation_context)
self.current_test_case_name = test_case_name
self._fuzz_data_logger.open_test_case(
"{0}: {1}".format(self.total_mutant_index, test_case_name),
name=test_case_name,
index=self.total_mutant_index,
num_mutations=self.total_num_mutations,
current_index=self.mutant_index,
current_num_mutations=self.fuzz_node.get_num_mutations(),
)
if self.total_num_mutations is not None:
self._fuzz_data_logger.log_info(
"Type: {0}. Case {1} of {2} overall.".format(
type(self.fuzz_node.mutant).__name__,
self.total_mutant_index,
self.total_num_mutations,
)
)
else:
self._fuzz_data_logger.log_info(
"Type: {0}".format(
type(self.fuzz_node.mutant).__name__,
)
)
try:
#打开连接
self._open_connection_keep_trying(target)
#_pre_send被调用
self._pre_send(target)
#这里是正常发送message_path最后一条路径前面的路径数据,这里就可以看出boofuzz是按node Fuzz的
for e in mutation_context.message_path[:-1]:
prev_node = self.nodes[e.src]
node = self.nodes[e.dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
callback_data = self._callback_current_node(node=node, edge=e, test_case_context=protocol_session)
self._fuzz_data_logger.open_test_step("Transmit Prep Node '{0}'".format(node.name))
self.transmit_normal(target, node, e, callback_data=callback_data, mutation_context=mutation_context)
prev_node = self.nodes[mutation_context.message_path[-1].src]
node = self.nodes[mutation_context.message_path[-1].dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
#这里会调用callback,同时返回一个callback数据
callback_data = self._callback_current_node(
node=self.fuzz_node, edge=mutation_context.message_path[-1], test_case_context=protocol_session
)
self._fuzz_data_logger.open_test_step("Fuzzing Node '{0}'".format(self.fuzz_node.name))
#进行实际的数据发送
self.transmit_fuzz(
target,
self.fuzz_node,
mutation_context.message_path[-1],
callback_data=callback_data,
mutation_context=mutation_context,
)
#检查是否发生了crash
self._check_for_passively_detected_failures(target=target)
if not self._reuse_target_connection:
target.close()
#这里也提供了接口来睡眠
if self.sleep_time > 0:
self._fuzz_data_logger.open_test_step("Sleep between tests.")
self._sleep(self.sleep_time)

_open_connection_keep_trying

在不开启_reuse_target_connection的情况下调用target的open函数,代码中已经实现了自定义的网络状态monitor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def _open_connection_keep_trying(self, target):
"""Open connection and if it fails, keep retrying.
Args:
target (Target): Target to open.
"""
#只有不开_reuse_target_connection的时候才会open
if not self._reuse_target_connection:
out_of_available_sockets_count = 0
unable_to_connect_count = 0
initial_time = time.time()
while True:
try:
#内部就是调用target的open函数,前面已经分析了
target.open()
break # break if no exception
except exception.BoofuzzTargetConnectionFailedError:
if self.restart_threshold and unable_to_connect_count >= self.restart_threshold:
self._fuzz_data_logger.log_info(
"Unable to reconnect to target: Reached threshold of {0} retries. Ending fuzzing.".format(
self.restart_threshold
)
)
#自添加代码,实现网络状态的Monitor
with open(self.crash_filename + "_" + str(self.num_cases_actually_fuzzed),"wb") as fp:
fp.write((self.current_test_case_name+"\n").encode())
fp.write(self.last_send)
pass
raise
elif self.restart_timeout and time.time() >= initial_time + self.restart_timeout:
self._fuzz_data_logger.log_info(
"Unable to reconnect to target: Reached restart timeout of {0}s. Ending fuzzing.".format(
self.restart_timeout
)
)
raise
else:
self._fuzz_data_logger.log_info(constants.WARN_CONN_FAILED_TERMINAL)
self._restart_target(target)
unable_to_connect_count += 1
except exception.BoofuzzOutOfAvailableSockets:
out_of_available_sockets_count += 1
if out_of_available_sockets_count == 50:
raise exception.BoofuzzError("There are no available sockets. Ending fuzzing.")
self._fuzz_data_logger.log_info("There are no available sockets. Waiting for another 5 seconds.")
time.sleep(5)

_pre_send

依次调用target的monitor中的回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def _pre_send(self, target):
"""
Execute custom methods to run prior to each fuzz request. The order of events is as follows::
pre_send() - req - callback ... req - callback - post_send()
When fuzzing RPC for example, register this method to establish the RPC bind.
Args:
target (session.target): Target we are sending data to
"""
for monitor in target.monitors:
try:
self._fuzz_data_logger.open_test_step("Monitor {}.pre_send()".format(str(monitor)))
monitor.pre_send(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)
except Exception:
self._fuzz_data_logger.log_error(
constants.ERR_CALLBACK_FUNC.format(func_name="{}.pre_send()".format(str(monitor)))
+ traceback.format_exc()
)

_callback_current_node

调用当前边edge的callback函数,并返回callback数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _callback_current_node(self, node, edge, test_case_context):
"""Execute callback preceding current node.

Args:
test_case_context (ProtocolSession): Context for test case-scoped data.
node (pgraph.node.node (Node), optional): Current Request/Node
edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.

Returns:
bytes: Data rendered by current node if any; otherwise None.
"""
data = None

# if the edge has a callback, process it. the callback has the option to render the node, modify it and return.
#调用edge的callback函数,并返回callback数据
if edge.callback:
self._fuzz_data_logger.open_test_step("Callback function '{0}'".format(edge.callback.__name__))
data = edge.callback(
self.targets[0],
self._fuzz_data_logger,
session=self,
node=node,
edge=edge,
test_case_context=test_case_context,
)

return data

_check_for_passively_detected_failures

依次调用monitor的post_send函数来获取是否发生了crash,如果发生了crash就继续调用get_crash_synopsis函数来获取crash概要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def _check_for_passively_detected_failures(self, target, failure_already_detected=False):
"""Check for and log passively detected failures. Return True if any found.

Args:
target (Target): Target to be checked for failures.
failure_already_detected (bool): If a failure was already detected.

Returns:
bool: True if failures were found. False otherwise.
"""
has_crashed = False
if len(target.monitors) > 0:
self._fuzz_data_logger.open_test_step("Contact target monitors")
# So, we need to run through the array two times. First, we check
# if any of the monitors reported a failure and
# if so, we need to
# gather a crash synopsis from them. We don't know whether
# a monitor can provide a crash synopsis, but in any case, we'll
# check. In the second run, we try to get crash synopsis from the
# monitors that did not detect a crash as supplemental information.
finished_monitors = []
#依次调用monitor的post_send函数
for monitor in target.monitors:
if not monitor.post_send(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self):
has_crashed = True
self._fuzz_data_logger.log_fail(
"{0} detected crash on test case #{1}: {2}".format(
str(monitor), self.total_mutant_index, monitor.get_crash_synopsis()
)
)
finished_monitors.append(monitor)

if not has_crashed and not failure_already_detected:
self._fuzz_data_logger.log_pass("No crash detected.")
else:
for monitor in set(target.monitors) - set(finished_monitors):

synopsis = monitor.get_crash_synopsis()
if len(synopsis) > 0:
self._fuzz_data_logger.log_fail(
"{0} provided additional information for crash on #{1}: {2}".format(
str(monitor), self.total_mutant_index, monitor.get_crash_synopsis()
)
)
return has_crashed

transmit_fuzz

进行实际的数据收发

1.判断是否传入了callback数据,如果有callback数据就使用callback数据,否则调用render来产生变异数据

2.发送数据,并将发送的数据保存在last_send中

3.接受数据,并将接受的数据保存在last_recv中

last_send和last_recv都非常重要,last_send可以在监测crash时dump出来作为crash样本,last_recv则可以在边回调中决定状态机的走向,以及产生callback数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def transmit_fuzz(self, sock, node, edge, callback_data, mutation_context):
"""Render and transmit a fuzzed node, process callbacks accordingly.

Args:
sock (Target, optional): Socket-like object on which to transmit node
node (pgraph.node.node (Node), optional): Request/Node to transmit
edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.
callback_data (bytes): Data from previous callback.
mutation_context (MutationContext): Current mutation context.
"""
#这里就可以看到,边的callback是先于数据发送的,如果callback返回了自定义数据,那这里就会直接拿callback返回的数据发送
#如果callback返回空数据,这里就会正常调用变异的数据渲染,然后发送变异数据
if callback_data:
data = callback_data
else:
data = self.fuzz_node.render(mutation_context)

try: # send
#这里发送变异数据,同时将发送的数据保存在last_send里面
self.targets[0].send(data)
self.last_send = data
except exception.BoofuzzTargetConnectionReset:
if self._ignore_connection_issues_when_sending_fuzz_data:
self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
else:
raise BoofuzzFailure(message=constants.ERR_CONN_RESET)
except exception.BoofuzzTargetConnectionAborted as e:
msg = constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
if self._ignore_connection_issues_when_sending_fuzz_data:
self._fuzz_data_logger.log_info(msg)
else:
raise BoofuzzFailure(msg)
except exception.BoofuzzSSLError as e:
if self._ignore_connection_ssl_errors:
self._fuzz_data_logger.log_info(str(e))
else:
raise BoofuzzFailure(str(e))

received = b""
try: # recv
if self._receive_data_after_fuzz:
received = self.targets[0].recv()
except exception.BoofuzzTargetConnectionReset:
if self._check_data_received_each_request:
raise BoofuzzFailure(message=constants.ERR_CONN_RESET)
else:
self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
except exception.BoofuzzTargetConnectionAborted as e:
msg = constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
if self._check_data_received_each_request:
raise BoofuzzFailure(msg)
else:
self._fuzz_data_logger.log_info(msg)
pass
except exception.BoofuzzSSLError as e:
if self._ignore_connection_ssl_errors:
self._fuzz_data_logger.log_info(str(e))
else:
self._fuzz_data_logger.log_fail(str(e))
raise BoofuzzFailure(str(e))
#这里会将这次接受到的数据保存在last_recv里面
self.last_recv = received

crash dump

上面只介绍到监测crash而没说哪里dump crash,实际上crash的dump在各个Monitor中(在_fuzz_current_case函数中是没有的)

以Procmon的DebugThread为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def post_send(self):
"""
This routine is called after the fuzzer transmits a test case and returns the status of the target.

Returns:
bool: True if the target is still active, False otherwise.
"""
if self.is_alive():
return True
else:
with open(self.process_monitor.crash_filename, "a") as rec_file:
rec_file.write(self.process_monitor.last_synopsis)

if self.process_monitor.coredump_dir is not None:
dest = os.path.join(self.process_monitor.coredump_dir, str(self.process_monitor.test_number))
src = _get_coredump_path()

if src is not None:
self.log("moving core dump %s -> %s" % (src, dest))
os.rename(src, dest)
return False

到此为止,数据的收发流程基本就了解了,剩下需要看下数据是怎么从request中产生并变异的

数据变异

_generate_mutations_indefinitely这里会产生一个iterator,迭代产生mutation_context

1
2
if name is None or name == "":
self._main_fuzz_loop(self._generate_mutations_indefinitely(max_depth=max_depth))

_generate_mutations_indefinitely

这里max_path默认传进来是个none

调用_generate_n_mutations来产生mutation_context

depth是在一次fuzz_case中,产生几个变异体,depth为1,那就是一次就变异一个primitive

_generate_mutations_indefinitely

1
2
3
4
5
6
7
8
9
10
11
12
def _generate_mutations_indefinitely(self, max_depth=None, path=None):
"""Yield MutationContext with n mutations per message over all messages, with n increasing indefinitely."""
# indefinitely 无限期的
depth = 1
while max_depth is None or depth <= max_depth:
valid_case_found_at_this_depth = False
for m in self._generate_n_mutations(depth=depth, path=path):
valid_case_found_at_this_depth = True
yield m
if not valid_case_found_at_this_depth:
break
depth += 1

_generate_n_mutations

这里会先得到path再从path里得到要fuzz的node

1
2
3
4
5
6
7
8
9
10
def _generate_n_mutations(self, depth, path):
"""Yield MutationContext with n mutations per message over all messages."""
#调试此处的yield
# for path in self._iterate_protocol_message_paths(path=path):
# print(self._message_path_to_str(path))
#先得到要fuzz的path,再从中获取要fuzz的node
for path in self._iterate_protocol_message_paths(path=path):
#_generate_n_mutations_for_path这个函数会根据上面取得的path构建MutationContext
for m in self._generate_n_mutations_for_path(path, depth=depth):
yield m

_iterate_protocol_message_paths

先检查下是否有target 以及从root发出的边

如果指定了path,就直接返回指定的path,但是默认都是空

调用_iterate_protocol_message_paths_recursive遍历path

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def _iterate_protocol_message_paths(self, path=None):
"""
Iterates over protocol and yields a path (list of Connection) leading to a given message).
#如果指定了path的集合,就返回这个指定的边的集合,否则就遍历整个协议中所有的边的可能性
Args:
path (list of Connection): Provide a specific path to yield only that specific path.

Yields:
list of Connection: List of edges along the path to the current one being fuzzed.

Raises:
exception.SulleyRuntimeError: If no requests defined or no targets specified
"""
# we can't fuzz if we don't have at least one target and one request.
if not self.targets:
raise exception.SullyRuntimeError("No targets specified in session")

if not self.edges_from(self.root.id):
raise exception.SullyRuntimeError("No requests specified in session")

if path is not None:
yield path
else:
for x in self._iterate_protocol_message_paths_recursive(this_node=self.root, path=[]):
yield x

_iterate_protocol_message_paths_recursive

这里遍历path的方法使用的是DFS,只不过用yield实现的,看着有些别扭,最终会产生从root出发的所有路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def _iterate_protocol_message_paths_recursive(self, this_node, path):
"""Recursive helper for _iterate_protocol.
#迭代的去取该协议中的msg的path
#这里应该是会返回所有路径
Args:
this_node (node.Node): Current node that is being fuzzed.
path (list of Connection): List of edges along the path to the current one being fuzzed.

Yields:
list of Connection: List of edges along the path to the current one being fuzzed.
"""
# step through every edge from the current node.
for edge in self.edges_from(this_node.id):
# keep track of the path as we fuzz through it, don't count the root node.
# we keep track of edges as opposed to nodes because if there is more then one path through a set of
# given nodes we don't want any ambiguity.
path.append(edge)

message_path = self._message_path_to_str(path)
logging.debug("fuzzing: {0}".format(message_path))
self.fuzz_node = self.nodes[path[-1].dst]

yield path

# recursively fuzz the remainder of the nodes in the session graph.
for x in self._iterate_protocol_message_paths_recursive(self.fuzz_node, path):
yield x

# finished with the last node on the path, pop it off the path stack.
if path:
path.pop()

知道path如何产生的之后再回去看MutationContext是怎么产生的,_generate_n_mutations_for_path函数对传进来的path产生mutation_context,mutation_context就是代表这次case的变异体上下文,depth是标识一个fuzz_case使用几个变异体,默认为1

那么MutationContext的mutations就只有一个qualified_name

_generate_n_mutations_for_path

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _generate_n_mutations_for_path(self, path, depth):
"""Yield MutationContext with n mutations for a specific message.

Args:
path (list of Connection): Nodes (Requests) along the path to the current one being fuzzed.
depth (int): Yield sets of depth mutations.

Yields:
MutationContext: A MutationContext containing one mutation.
"""
for mutations in self._generate_n_mutations_for_path_recursive(path, depth=depth):
if not self._mutations_contain_duplicate(mutations):
self.total_mutant_index += 1
yield MutationContext(message_path=path, mutations={n.qualified_name: n for n in mutations})

_generate_n_mutations_for_path_recursive

调用_generate_n_mutations_for_path_recursive 产生mutation集合

mutaions由两个部分组成,一个是_generate_mutations_for_request函数产生的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _generate_n_mutations_for_path_recursive(self, path, depth, skip_elements=None):
if skip_elements is None:
skip_elements = set()
#这里depth怎么也是1
if depth == 0:
yield []
return
new_skip = skip_elements.copy()
#调试yield
# for mutations in self._generate_mutations_for_request(path=path, skip_elements=skip_elements):
# print(mutations)
for mutations in self._generate_mutations_for_request(path=path, skip_elements=skip_elements):
new_skip.update(m.qualified_name for m in mutations)
for ms in self._generate_n_mutations_for_path_recursive(path, depth=depth - 1, skip_elements=new_skip):
yield mutations + ms

_generate_mutations_for_request

设置fuzz_node为当前路径上的最后一个node,之后调用fuzz_node的get_mutations,fuzz_node是一个request对象,所以这里调用的是request的get_mutations方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def _generate_mutations_for_request(self, path, skip_elements=None):
"""Yield each mutation for a specific message (the last message in path).

Args:
path (list of Connection): Nodes (Requests) along the path to the current one being fuzzed.
path (iter of str): Qualified names of elements to skip while fuzzing.

Yields:
Mutation: Mutation object describing a single mutation.
"""
if skip_elements is None:
skip_elements = []
#这里设置fuzz_node为当前fuzz路径的dst
self.fuzz_node = self.nodes[path[-1].dst]
self.mutant_index = 0
#调试yield
#value_list = list(self.fuzz_node.get_mutations(skip_elements=skip_elements))
#这里会对node里的item枚举产生mutation
for mutations in self.fuzz_node.get_mutations(skip_elements=skip_elements):
#记录整体的已经发生的变异次数
self.mutant_index += 1
yield mutations

if self._skip_current_node_after_current_test_case:
self._skip_current_node_after_current_test_case = False
break
elif self._skip_current_element_after_current_test_case:
self.fuzz_node.mutant.stop_mutations()
self._skip_current_element_after_current_test_case = False
continue

request.get_mutations

Request继承自FuzzableBlock,mutations是FuzzableBlock的方法

1
2
def get_mutations(self, default_value=None, skip_elements=None):
return self.mutations(default_value=default_value, skip_elements=skip_elements)

FuzzableBlock.mutations

遍历当前request的stack中的item,也就插入的block和primitive,再调用他们的get_mutations函数得到mutation

primitives都继承自fuzzable,所以这里调用的是fuzzable的get_mutations

1
2
3
4
5
6
7
8
9
10
def mutations(self, default_value, skip_elements=None):
if skip_elements is None:
skip_elements = []
#遍历stack中的item
for item in self.stack:
if item.qualified_name in skip_elements:
continue
self.request.mutant = item
for mutation in item.get_mutations():
yield mutation

fuzzable.get_mutations

这个函数就是对当前item进行变异,并将变异的值传到生成的Mutation里面

Mutation的构造这里就能看到,是由一个值value,一个所属item的qualified_name,以及变异计数index组成的

这里终止变异用的是_halt_mutations标志,而stop_mutations函数是提供的接口,其内部就是设置_halt_mutations为true

itertools.chain的功能就是合并列表,所以这里值得来源就为self.mutations(self.original_value()),self._fuzz_values

其中_fuzz_values在Fuzzable的init函数中是可以作为构造参数传入的,但是String(继承自Fuzzble)的构造函数里并没有这个参数,所以就没找到接口设置这个值(除了手动赋值),所以这里总是空列表

下面会以String为例,分析了其mutations函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def get_mutations(self):
"""Iterate mutations. Used by boofuzz framework.

Yields:
list of Mutation: Mutations

"""
try:
if not self.fuzzable:
return
index = 0
#value_list = list(itertools.chain(self.mutations(self.original_value()), self._fuzz_values))

for value in itertools.chain(self.mutations(self.original_value()), self._fuzz_values):
#这里是我自己添加的,当mutate进行200次后,就停止对这个item的mutate,以加速path的遍历,要不然会一直卡在对这一个item的遍历上
if index>200:
self.stop_mutations()
if self._halt_mutations:
self._halt_mutations = False
return
if isinstance(value, list):
yield value
elif isinstance(value, Mutation):
yield [value]
else:
yield [Mutation(value=value, qualified_name=self.qualified_name, index=index)]
index += 1
finally:
self._halt_mutations = False # in case stop_mutations is called when mutations were exhausted anyway

stop_mutations

1
2
3
4
5
6
7
8
9
def stop_mutations(self):
"""Stop yielding mutations on the currently running :py:meth:`mutations` call.

Used by boofuzz to stop fuzzing an element when it's already caused several failures.

Returns:
NoneType: None
"""
self._halt_mutations = True

String.mutation

这里进一步可以看到变异值有三个来源:

_fuzz_library 可以理解为预置的容易产生crash的字典

_yield_variable_mutations(default_value) 对default_value进行重叠以产生变异值

_yield_long_strings(self.long_string_seeds) 带点随机性的变异,(随机替换字符为终结符)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def mutations(self, default_value):
"""
Mutate the primitive by stepping through the fuzz library extended with the "this" library, return False on
completion.

Args:
default_value (str): Default value of element.

Yields:
str: Mutations
"""
last_val = None

for val in itertools.chain(
self._fuzz_library,
self._yield_variable_mutations(default_value),
self._yield_long_strings(self.long_string_seeds),
):
current_val = self._adjust_mutation_for_size(val)
if last_val == current_val:
continue
last_val = current_val
yield current_val

_fuzz_library (只贴了一部分)

1
2
3
4
5
6
7
8
9
10
11
12
# store fuzz_library as a class variable to avoid copying the ~70MB structure across each instantiated primitive.
# Has to be sorted to avoid duplicates
_fuzz_library = [
"!@#$%%^#$%#$@#$%$$@#$%^^**(()",
"", # strings ripped from spike (and some others I added)
"$(reboot)",
"$;reboot",
"%00",
"%00/",
"%01%02%03%04%0a%0d%0aADSF",
"%01%02%03@%04%0a%0d%0aADSF",
"%0a reboot %0a",

_yield_variable_mutations

重叠产生变异值

1
2
3
4
5
6
7
8
9
_variable_mutation_multipliers = [2, 10, 100]

def _yield_variable_mutations(self, default_value):
for length in self._variable_mutation_multipliers:
value = default_value * length
if value not in self._fuzz_library:
yield value
if self.max_len is not None and len(value) >= self.max_len:
break

_yield_long_strings

这个函数有两部分,第一部分仍然是采取重叠的策略来产生变异值,只不过seed来自于long_string_seeds

random.sample(list,size)的功能是随机抽样,从list中抽size个 数

第二部分是先按_long_string_lengths中的长度产生一个D*size的字符串,然后再随机将其中的字符替换成/00,这是整个String的变异过程中唯一随机的部分。替换哪些位置的字符是在String初始化时已经按_long_string_lengths中的长度初始化过了(random_indices)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#传进去的sequences
long_string_seeds = ["C","1","<",">","'",'"',"/","\\","?","=","a=","&",".",",","(",")","]","[","%","*","-","+","{","}","\x14","\x00","\xFE",]
_long_string_lengths = [8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 32768, 0xFFFF]
_long_string_deltas = [-2, -1, 0, 1, 2]
_extra_long_string_lengths = [99999, 100000, 500000, 1000000]

def _yield_long_strings(self, sequences):
"""
Given a sequence, yield a number of selectively chosen strings lengths of the given sequence.

@type sequences: list(str)
@param sequences: Sequence to repeat for creation of fuzz strings.
"""
#也是重叠的策略,只不过这里是从long_string_seeds中选出sequence来进行重叠
for sequence in sequences:
#按_long_string_lengths和_long_string_deltas结合产生的长度产生(标准)
for size in [
length + delta
for length, delta in itertools.product(self._long_string_lengths, self._long_string_deltas)
]:
if self.max_len is None or size <= self.max_len:
data = sequence * math.ceil(size / len(sequence))
yield data[:size]
else:
break
#按_extra_long_string_lengths中的长度产生(额外定义)
for size in self._extra_long_string_lengths:
if self.max_len is None or size <= self.max_len:
data = sequence * math.ceil(size / len(sequence))
yield data[:size]
else:
break
#按最大长度产生
if self.max_len is not None:
data = sequence * math.ceil(self.max_len / len(sequence))
yield data

for size in self._long_string_lengths:
if self.max_len is None or size <= self.max_len:
s = "D" * size
for loc in self.random_indices[size]:
yield s[:loc] + "\x00" + s[loc + 1 :] # Replace character at loc with terminator
else:
break

#def init():
self.random_indices = {}


local_random = random.Random(0) # We want constant random numbers to generate reproducible test cases
previous_length = 0
# For every length add a random number of random indices to the random_indices dict. Prevent duplicates by
# adding only indices in between previous_length and current length.
for length in self._long_string_lengths:
self.random_indices[length] = local_random.sample(
range(previous_length, length), local_random.randint(1, self._long_string_lengths[0])
)
previous_length = length

总结数据变异流程

一次fuzz_case所用的变异数据来自于mutation_context,mutation_context由message_path和mutations组成

mutations产生于primitive,对primitive的一次变异产生一个mutation,mutation中包含变异值和所属的primitive的qualified_name

根据传入的depth的数值,mutation_context可以包含多个mutation,只不过默认值depth为1,因此mutation_context一般就包含一个mutation

1
2
3
4
5
6
7
8
9
10
11
#session._main_fuzz_loop()
for mutation_context in fuzz_case_iterator:
if self.total_mutant_index < self._index_start:
continue

#session._generate_n_mutations_for_path()
self.total_mutant_index += 1
yield MutationContext(message_path=path, mutations={n.qualified_name: n for n in mutations})

#Fuzzable.get_mutations()
yield [Mutation(value=value, qualified_name=self.qualified_name, index=index)]

变量total_mutant_index标记产生了多少mutation_context,也就等同于fuzz_case的次数

变量mutant_index标记产生了多少mutant(mutation),在depth为1的情况下,mutant_index就等于total_mutant_index

而mutation的产生则是首先会遍历出状态图的所有path,然后对path中最后一个node中的item进行变异

其他细节

这里再分析下刚才没提到的一些细节,其实整体框架和流程已经分析完了,但是对这些小细节也比较清楚的话,能更好的了解boofuzz

qualified_name

我们创建primitive时一般只给个default_value,这样在Fuzzable里,就会默认赋值个name,格式是类型加计数,例如String1 String2

primitive的context_path是在push的时候赋予的,标记着的是其在request中的位置

最后qualified_name的产生是将context_path和name拼接在一起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#Fuzzable._init()
if self._name is None:
Fuzzable.name_counter += 1
self._name = "{0}{1}".format(type(self).__name__, Fuzzable.name_counter)

#Request.push()
item.context_path = self._generate_context_path(self.block_stack)
def _generate_context_path(self, block_stack):
context_path = ".".join(x.name for x in block_stack) # TODO put in method
context_path = ".".join(filter(None, (self.name, context_path)))
return context_path
#Fuzzable.qualified_name
@property
def qualified_name(self):
"""Dot-delimited name that describes the request name and the path to the element within the request.

Example: "request1.block1.block2.node1"

"""
return ".".join(s for s in (self._context_path, self.name) if s != "")

path数据发送

前面已经介绍了变异值数据产生和数据发送,但是实际上数据产生和数据发送间还有一些细节没分析

为了贴合网络协议,boofuzz在发送变异数据前,会先把其path上的正常数据先都发送过去,变异mutation都是在path的最后一个node上

1
2
3
4
5
6
7
8
9
10
11
12
13
#seesion._fuzz_current_case()
#这里是正常发送message_path最后一条路径前面的路径数据,这里就可以看出boofuzz是按node Fuzz的
for e in mutation_context.message_path[:-1]:
prev_node = self.nodes[e.src]
node = self.nodes[e.dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
callback_data = self._callback_current_node(node=node, edge=e, test_case_context=protocol_session)
self._fuzz_data_logger.open_test_step("Transmit Prep Node '{0}'".format(node.name))
self.transmit_normal(target, node, e, callback_data=callback_data,mutation_context=mutation_context)

transmit_normal

如果callback_data不为空就发送callback_data,否则发送render(mutation_context)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def transmit_normal(self, sock, node, edge, callback_data, mutation_context):
"""Render and transmit a non-fuzzed node, process callbacks accordingly.

Args:
sock (Target, optional): Socket-like object on which to transmit node
node (pgraph.node.node (Node), optional): Request/Node to transmit
edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.
callback_data (bytes): Data from previous callback.
mutation_context (MutationContext): active mutation context
"""
if callback_data:
data = callback_data
else:
data = node.render(mutation_context=mutation_context)

try: # send
self.targets[0].send(data)
self.last_send = data
if self._receive_data_after_each_request:
self.last_recv = self.targets[0].recv()

request.render

这个函数流程前面也没分析,

1
2
3
4
5
def render(self, mutation_context=None):
if self.block_stack:
raise exception.SullyRuntimeError("UNCLOSED BLOCK: %s" % self.block_stack[-1].qualified_name)

return self.get_child_data(mutation_context=mutation_context)

request.get_child_data

这个函数遍历request中的item,来拼接出数据,item基本都继承自Fuzzable(除了Block)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def get_child_data(self, mutation_context):
"""Get child or referenced data for this node.

For blocks that reference other data from the message structure (e.g. size, checksum, blocks). See
FuzzableBlock for an example.

Args:
mutation_context (MutationContext): Mutation context.

Returns:
bytes: Child data.
"""
rendered = b""
for item in self.stack:
rendered += item.render(mutation_context=mutation_context)
return rendered

Fuzzable.render

调用get_value获取值

1
2
3
4
5
6
7
8
def render(self, mutation_context=None):
"""Render after applying mutation, if applicable.
:type mutation_context: MutationContext
"""
return self.encode(value=self.get_value(mutation_context=mutation_context),mutation_context=mutation_context)

def encode(self, value, mutation_context):
return value

Fuzzable.get_value

就是如果当前item在mutation_context的qualified_name中,就返回变异值,否则就返回初始值_default_value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def get_value(self, mutation_context=None):
"""Helper method to get the currently applicable value.

This is either the default value, or the active mutation value as dictated by mutation_context.

Args:
mutation_context (MutationContext):

Returns:

"""
if mutation_context is None:
mutation_context = MutationContext()
if self.qualified_name in mutation_context.mutations:
mutation = mutation_context.mutations[self.qualified_name]
if callable(mutation.value):
value = mutation.value(self.original_value(test_case_context=mutation_context.protocol_session))
else:
value = mutation.value
else:
value = self.original_value(test_case_context=mutation_context.protocol_session)

return value

original_value

因为传进来的都是ProtocolSession对象,所以走else分支返回_default_value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def original_value(self, test_case_context=None):
"""Original, non-mutated value of element.

Args:
test_case_context (ProtocolSession): Used to resolve ReferenceValueTestCaseSession type default values.

Returns:
"""
#这个分支不知道什么时候用到
if isinstance(self._default_value, ProtocolSessionReference):
if test_case_context is None:
return self._default_value.default_value
else:
return test_case_context.session_variables[self._default_value.name]
else:
return self._default_value

继续上面数据发送的位置,path上的正常数据发送完之后才会发送变异数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
prev_node = self.nodes[mutation_context.message_path[-1].src]
node = self.nodes[mutation_context.message_path[-1].dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
#这里会调用callback,同时返回一个callback数据
callback_data = self._callback_current_node(
node=self.fuzz_node, edge=mutation_context.message_path[-1], test_case_context=protocol_session
)
self._fuzz_data_logger.open_test_step("Fuzzing Node '{0}'".format(self.fuzz_node.name))
#进行实际的变异数据发送
self.transmit_fuzz(
target,
self.fuzz_node,
mutation_context.message_path[-1],
callback_data=callback_data,
mutation_context=mutation_context,
)

Monitor

boofuzz只提供了三种monitor,

ProcessMonitor大概是和Procman 进行rpc通讯来监控

NetworkMonitor具体用法不太清楚,看doc里说用了wireshark

CallbackMonitor是默认的Monitor,提供回调函数的功能

我们一般需要一个监控连接状态的Monitor,如果连接失败则判定为发生了crash,保存crash样本,前面代码中有我实现的简陋的方案

CallbackMonitor

这个Monitor是以Monitor的形式提供几种callback,在session的init函数中,是把传进来的callback赋值给CallbackMonitor,这个Monitor也是会默认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

if pre_send_callbacks is None:
pre_send_methods = []
else:
pre_send_methods = pre_send_callbacks

if post_test_case_callbacks is None:
post_test_case_methods = []
else:
post_test_case_methods = post_test_case_callbacks

if post_start_target_callbacks is None:
post_start_target_methods = []
else:
post_start_target_methods = post_start_target_callbacks

if restart_callbacks is None:
restart_methods = []
else:
restart_methods = restart_callbacks


self._callback_monitor = CallbackMonitor(
on_pre_send=pre_send_methods,
on_post_send=post_test_case_methods,
on_restart_target=restart_methods,
on_post_start_target=post_start_target_methods,
)


if target is not None:

def apply_options(monitor):
monitor.set_options(crash_filename=self._crash_filename)

return

target.monitor_alive.append(apply_options)

try:
self.add_target(target)
except exception.BoofuzzRpcError as e:
self._fuzz_data_logger.log_error(str(e))
raise

前面回调函数都是用的target的monitor的回调,在session的init中首先设置了_callback_monitor为刚才创建的CallbackMonitor,其给target设置的有些隐蔽,是在add_target中设置的

add_target

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def add_target(self, target):
"""
Add a target to the session. Multiple targets can be added for parallel fuzzing.

Args:
target (Target): Target to add to session
"""

# pass specified target parameters to the PED-RPC server.
target.monitors_alive()
target.set_fuzz_data_logger(fuzz_data_logger=self._fuzz_data_logger)

if self._callback_monitor not in target.monitors:
target.monitors.append(self._callback_monitor)

# add target to internal list.
self.targets.append(target)

pre_send

以CallbackMonitor的pre_send为例,可以看到其遍历on_pre_send函数来调用

1
2
3
4
5
6
7
8
9
10
11
12
13
def pre_send(self, target=None, fuzz_data_logger=None, session=None):
"""This method iterates over all supplied pre send callbacks and executes them.
Their return values are discarded, exceptions are catched and logged, but otherwise
discarded.
"""
try:
for f in self.on_pre_send:
fuzz_data_logger.open_test_step('Pre_Send callback: "{0}"'.format(f.__name__))
f(target=target, fuzz_data_logger=fuzz_data_logger, session=session, sock=target)
except Exception:
fuzz_data_logger.log_error(
constants.ERR_CALLBACK_FUNC.format(func_name="pre_send") + traceback.format_exc()
)

Refs

jtpereyda/boofuzz: A fork and successor of the Sulley Fuzzing Framework (github.com)

Monitors — boofuzz 0.4.0 documentation

Boofuzz - A helpful guide (OSCE - CTP) - Zero Aptitude (archive.org)

IoT设备固件分析之网络协议 fuzz (seebug.org)

Network & Process monitoring - practical examples · Issue #315 · jtpereyda/boofuzz (github.com)