Mock Streams
Concept-conforming test doubles for the partial-I/O concepts in Streams. Use them to drive protocol code without real network I/O, with optional chunking to exercise partial-transfer paths.
read_stream
read_stream implements the ReadStream concept. Test code stages bytes
via provide(), then the system under test (or the test body) calls
read_some() to consume them. The attached fuse injects errors at
every read call, exercising the caller’s error-handling paths. Because
fuse copies share state (see
Shared State Across Copies),
constructing read_stream rs(f) by value still ties rs to the same
fail-point machinery as f.
#include <boost/capy/test/read_stream.hpp>
#include <boost/capy/test/fuse.hpp>
#include <boost/capy/buffers/make_buffer.hpp>
#include <boost/capy/task.hpp>
using namespace boost::capy;
using namespace boost::capy::test;
void test_read_stream()
{
fuse f;
read_stream rs(f);
rs.provide("Hello, ");
rs.provide("World!");
auto r = f.armed([&](fuse&) -> task<void> {
char buf[32];
auto [ec, n] = co_await rs.read_some(
mutable_buffer(buf, sizeof(buf)));
if(ec)
co_return;
BOOST_TEST(std::string_view(buf, n) == "Hello, World!");
});
BOOST_TEST(r.success);
}
Chunked Delivery
Passing a max_read_size to the constructor limits how many bytes
read_some returns per call. Use this to simulate a network that
delivers data in small pieces and verify your protocol code loops
correctly on partial reads.
// At most 4 bytes per read_some call
fuse f;
read_stream rs(f, 4);
rs.provide("Hello, World!");
auto r = f.armed([&](fuse&) -> task<void> {
char buf[32];
auto [ec, n] = co_await rs.read_some(
mutable_buffer(buf, sizeof(buf)));
if(ec)
co_return;
BOOST_TEST(n == 4); // "Hell"
});
BOOST_TEST(r.success);
EOF Behavior
When all provided data has been consumed, read_some returns
cond::eof with a byte count of zero. The stream does not
suspend; the result is available immediately.
fuse f;
read_stream rs(f);
rs.provide("hi");
auto r = f.inert([&](fuse&) -> task<void> {
char buf[8];
// First read: consumes "hi"
auto [ec, n] = co_await rs.read_some(
mutable_buffer(buf, sizeof(buf)));
BOOST_TEST(!ec);
BOOST_TEST(std::string_view(buf, n) == "hi");
// Second read: EOF
auto [ec2, n2] = co_await rs.read_some(
mutable_buffer(buf, sizeof(buf)));
BOOST_TEST(ec2 == cond::eof);
BOOST_TEST(n2 == 0);
});
BOOST_TEST(r.success);
| Member | Description |
|---|---|
|
Construct with an optional shared |
|
Append bytes to the internal buffer for subsequent reads. Multiple calls accumulate data. |
|
Partial read. Returns up to |
|
Return the number of bytes remaining to be read. |
|
Clear all data and reset the read position. |
write_stream
write_stream implements the WriteStream concept. The system under
test calls write_some() and the test inspects what was written via
data(). Test code may also call expect() to register the data it
anticipates; any mismatch between written bytes and that prefix causes
write_some() to return error::test_failure directly. The fuse is a
separate concern used only for error injection. Because fuse copies
share state (see
Shared State Across Copies),
constructing write_stream ws(f) by value still ties ws to the same
fail-point machinery as f.
#include <boost/capy/test/write_stream.hpp>
#include <boost/capy/test/fuse.hpp>
#include <boost/capy/buffers/make_buffer.hpp>
#include <boost/capy/task.hpp>
using namespace boost::capy;
using namespace boost::capy::test;
void test_write_stream()
{
fuse f;
write_stream ws(f);
auto r = f.armed([&](fuse&) -> task<void> {
auto [ec, n] = co_await ws.write_some(
const_buffer("Hello", 5));
if(ec)
co_return;
BOOST_TEST(ws.data() == "Hello");
});
BOOST_TEST(r.success);
}
Chunked Writes
Passing a max_write_size to the constructor limits how many bytes
write_some accepts per call, simulating a slow consumer. Use this
to verify that your code loops correctly until all data is transferred.
fuse f;
write_stream ws(f, 4); // accept at most 4 bytes per call
auto r = f.inert([&](fuse&) -> task<void> {
auto [ec, n] = co_await ws.write_some(
const_buffer("Hello", 5));
BOOST_TEST(!ec);
BOOST_TEST(n == 4); // only "Hell" was accepted
});
BOOST_TEST(r.success);
Expected Data Verification
Call expect() before or after writes to assert that the written data
matches a prefix. Matched bytes are consumed from both sides. If written
data does not match the expected prefix, the next write_some call
returns error::test_failure.
fuse f;
write_stream ws(f);
ws.expect("Hello World");
auto r = f.inert([&](fuse&) -> task<void> {
// Writing matching data succeeds
auto [ec, n] = co_await ws.write_some(
const_buffer("Hello World", 11));
BOOST_TEST(!ec);
});
BOOST_TEST(r.success);
| Member | Description |
|---|---|
|
Construct with an optional shared |
|
Partial write. Appends up to |
|
Return bytes written but not yet matched by |
|
Return the number of bytes written. |
|
Register expected data and immediately check any already-written bytes. Returns an error if existing data does not match. |
stream
stream is a connected bidirectional test double. Create a pair with
make_stream_pair(f). Bytes written to one end become readable on the
other. If read_some is called on an end with no buffered data, the
calling coroutine suspends until the peer calls write_some. This
makes stream useful for testing client/server code without real
sockets.
Both stream ends satisfy ReadStream and WriteStream.
#include <boost/capy/test/stream.hpp>
#include <boost/capy/test/fuse.hpp>
#include <boost/capy/buffers/make_buffer.hpp>
#include <boost/capy/task.hpp>
using namespace boost::capy;
using namespace boost::capy::test;
void test_stream_pair()
{
fuse f;
auto [a, b] = make_stream_pair(f);
auto r = f.armed([&](fuse&) -> task<void> {
auto [ec, n] = co_await a.write_some(
const_buffer("hello", 5));
if(ec)
co_return;
char buf[32];
auto [ec2, n2] = co_await b.read_some(
mutable_buffer(buf, sizeof(buf)));
if(ec2)
co_return;
BOOST_TEST(std::string_view(buf, n2) == "hello");
});
BOOST_TEST(r.success);
}
Connected Semantics
Data written to a goes into b’s incoming buffer, and vice versa.
`write_some completes immediately and posts any suspended peer reader
before returning. If b.read_some() is called when a has not yet
written anything, the coroutine suspends; it resumes the moment a
calls write_some.
The provide() member is a shortcut that injects bytes directly into
the peer’s incoming buffer, bypassing the fuse. Use it during test
setup when you want to pre-populate data without going through an
operation under test.
EOF and Cross-End Closure
Calling close() on one end signals EOF to the peer. The peer drains
any buffered data first; once the buffer is empty, subsequent
read_some calls on the peer return cond::eof. The peer may still
call write_some after receiving EOF.
When the fuse injects an error during read_some or write_some, the
pair is automatically closed: the calling end returns the injected
error, any suspended reader on the other end is resumed with
cond::eof, and all subsequent operations on both ends return
cond::eof.
Thread Safety
Single-threaded only. Both ends of the pair must be accessed from the same thread. Concurrent access from multiple threads or multiple concurrent coroutines is undefined behavior.
| Function / Member | Description |
|---|---|
|
Create a connected pair sharing the supplied fuse. |
|
Partial read from the peer’s outgoing data. Suspends if no data is
available. Returns |
|
Partial write into the peer’s incoming buffer. Resumes a suspended
peer reader if any. Returns |
|
Signal EOF to the peer’s reads. Buffered data is drained first. Writes from the peer are unaffected. |
|
Limit bytes returned per |
|
Inject bytes into this stream for reading, bypassing the fuse.
Resumes a suspended |
|
Read exactly |
|
Return a view of the unread bytes buffered in this stream. |
Putting It Together
The following snippet tests a function that reads a single line
terminated by '\n' from a ReadStream. The fuse.armed() loop
runs the coroutine repeatedly, failing at every read_some call in
turn, then reruns in exception mode. Each injected failure exercises
a different error-handling branch inside read_line.
#include <boost/capy/buffers/make_buffer.hpp>
#include <boost/capy/concept/read_stream.hpp>
#include <boost/capy/task.hpp>
#include <boost/capy/test/fuse.hpp>
#include <boost/capy/test/read_stream.hpp>
using namespace boost::capy;
using namespace boost::capy::test;
// Function under test: read until '\n' or EOF
template<ReadStream S>
task<std::pair<std::error_code, std::string>>
read_line(S& stream)
{
std::string line;
char ch;
for(;;)
{
auto [ec, n] = co_await stream.read_some(
mutable_buffer(&ch, 1));
if(ec)
co_return {ec, std::move(line)};
if(ch == '\n')
break;
line += ch;
}
co_return {std::error_code{}, std::move(line)};
}
void test_read_line()
{
fuse f;
auto r = f.armed([&](fuse&) -> task<void> {
read_stream rs(f);
rs.provide("hello\n");
auto [ec, line] = co_await read_line(rs);
if(ec)
co_return; // fuse injected an error; exit gracefully
BOOST_TEST(line == "hello");
});
BOOST_TEST(r.success);
}
Reference
| Header | Contents |
|---|---|
|
Mock ReadStream with controllable partial reads. |
|
Mock WriteStream with controllable partial writes and expectations. |
|
Connected bidirectional pair for client/server tests. |
Continue to Mock Sources and Sinks.