After some tests, I built the following self-contained example. This code opens a Unix socket. The data consists of consecutive numbers, whose purpose is to verify that each packet is transmitted intact. The server checks that the received data consists of consecutive numbers, where the first number is the index of the currently received message.
#include "data.capnp.c++"
#include <capnp/message.h>
#include <capnp/serialize.h>
#include <kj/io.h>
#include <poll.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <cassert>
#include <iostream>
#include <mutex>
#include <string_view>
#include <thread>
bool shouldStop{};
int listenSocket;
std::mutex outMutex;

template <typename... Args>
void print(std::ostream& os, Args&&... args) {
  std::lock_guard lock(outMutex);
  (os << ... << args) << std::endl;
}

void runServer() {
  listen(listenSocket, 5);
  print(std::cout, "started server, listening to socket");
  unsigned char receivedMessages{};
  struct pollfd list[2]{
    {.fd = listenSocket, .events = POLLIN, .revents = 0},
    {.fd = 0, .events = POLLIN, .revents = 0}
  };
  while (!shouldStop) {
    poll(list, list[1].fd == 0 ? 1 : 2, 1000);
    if (shouldStop) break;
    if ((list[0].revents & POLLIN) == POLLIN) {
      // accept connection request.
      // note: this simplified code only accepts
      // a single connection.
      struct sockaddr_un raddr;
      unsigned int sockLen;
      list[1].fd = accept(listenSocket, (struct sockaddr*)(&raddr), &sockLen);
      print(std::cout, "[server] accepted connection");
    } else if (list[1].fd != 0 and (list[1].revents & POLLIN) == POLLIN) {
      // data sent from client
      int available;
      ioctl(list[1].fd, FIONREAD, &available);
      print(std::cout, "[server] connection from client has ", available, " bytes available. reading…");
      kj::FdInputStream stream(list[1].fd);
      try {
        auto reader = ::capnp::InputStreamMessageReader(stream).getRoot<Payload>();
        assert(reader.hasContent());
        auto content = reader.getContent();
        unsigned char expected{receivedMessages++};
        for (auto item: content) {
          assert(item == expected++);
        }
      } catch (const kj::Exception& ex) {
        print(std::cerr, "[server] exception occurred:");
        print(std::cerr, "  ", ex.getDescription().cStr());
        break;
      }
    }
  }
  close(listenSocket);
}

int main(int argc, char *argv[]) {
  listenSocket = socket(PF_UNIX, SOCK_STREAM, 0);
  struct sockaddr_un addr;
  bzero(&addr, sizeof(sockaddr_un));
  addr.sun_family = AF_UNIX;
  std::string_view path("socket");
  path.copy(addr.sun_path, 104);
  if (bind(listenSocket, (struct sockaddr*)(&addr), sizeof(struct sockaddr_un)) != 0) {
    std::cerr << "failed to bind to socket" << std::endl;
    return 1;
  }
  std::thread server(runServer);
  sleep(1);
  int clientSocket = socket(PF_UNIX, SOCK_STREAM, 0);
  if (connect(clientSocket, (struct sockaddr*)(&addr), sizeof(struct sockaddr_un)) != 0) {
    std::cerr << "could not connect to socket" << std::endl;
    shouldStop = true;
    return 1;
  }
  kj::FdOutputStream stream(clientSocket);
  unsigned char data[1024 * 1024];
  for (std::size_t i = 0; i < 1024 * 1024; ++i) {
    data[i] = i % 256;
  }
  for (std::size_t i = 1024, j = 0; j + i < 1024 * 1024; ++j, i *= 2) {
    capnp::MallocMessageBuilder msgBuilder;
    auto builder = msgBuilder.initRoot<Payload>();
    auto content = builder.initContent(i);
    std::copy(data + j, data + j + i, content.begin());
    auto segments = msgBuilder.getSegmentsForOutput();
    print(std::cout,
          "writing ", segments.size(), " segments of ",
          capnp::computeSerializedSizeInWords(msgBuilder) * sizeof(capnp::word),
          " bytes in sum to socket");
    for (auto& segment: segments) {
      print(std::cout, "  segment: ",
            segment.size() * sizeof(capnp::word));
    }
    try {
      ::capnp::writeMessage(stream, msgBuilder);
    } catch (const kj::Exception& ex) {
      print(std::cerr, "[client] exception in writeMessage:");
      print(std::cerr, "  ", ex.getDescription().cStr());
      shouldStop = true;
      close(clientSocket);
      return 1;
    }
    sleep(1);
  }
  shouldStop = true;
  close(clientSocket);
  return 0;
}
The output looks like this:
The output strongly indicates that, as soon as a message contains multiple segments, it is not received properly. The number of bytes sent and received matches, but the data stream somehow seems to get corrupted. Is there something wrong with my code? Is there a problem in Cap'n Proto?
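For anyone who wants to test the consecutive-number check on its own, without sockets or Cap'n Proto, here is a minimal sketch of the pattern described above. The helper names `makePayload`, `firstMismatch` and `selfCheck` are hypothetical and not part of the example; the sketch only assumes that byte k of message number `index` holds (index + k) mod 256, as in the code above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: builds the payload of message number `index`.
// Byte k holds (index + k) mod 256, mirroring data[j + k] in the example.
std::vector<std::uint8_t> makePayload(std::size_t index, std::size_t size) {
  std::vector<std::uint8_t> payload(size);
  for (std::size_t k = 0; k < size; ++k) {
    payload[k] = static_cast<std::uint8_t>((index + k) % 256);
  }
  return payload;
}

// Hypothetical helper: returns the offset of the first byte that breaks the
// pattern, or payload.size() if every byte matches.
std::size_t firstMismatch(const std::vector<std::uint8_t>& payload,
                          std::size_t index) {
  std::uint8_t expected = static_cast<std::uint8_t>(index % 256);
  for (std::size_t k = 0; k < payload.size(); ++k) {
    if (payload[k] != expected++) return k;  // expected wraps from 255 to 0
  }
  return payload.size();
}

// Usage sketch: a clean payload passes, a single altered byte is located.
void selfCheck() {
  auto payload = makePayload(3, 1024);
  assert(firstMismatch(payload, 3) == payload.size());
  payload[700] = static_cast<std::uint8_t>(payload[700] + 1);
  assert(firstMismatch(payload, 3) == 700);
}
```

Reporting the offset of the first mismatch, rather than only asserting, makes it easier to see whether the corruption starts at a segment boundary.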
This is a red herring.
This is basic socket behavior, not specific to Cap'n Proto. Once you tell Cap'n Proto to start reading the data, assuming the socket is in blocking mode (the default), it should receive all the bytes. I'm not sure why your code is hanging; perhaps there is a problem inside
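To illustrate the socket behavior described here, the following is a rough sketch using only POSIX calls, with no Cap'n Proto involved. The names `readFully`, `writeFully`, `DemoResult` and `runDemo` are made up for this illustration. It assumes a blocking `AF_UNIX` stream socket pair: the `FIONREAD` value only reflects what is buffered at that instant, while looping over blocking `read()` calls still delivers every byte.

```cpp
#include <poll.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <csignal>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: keeps calling read() until `size` bytes arrived,
// the peer closed the connection, or a real error occurred.
std::size_t readFully(int fd, unsigned char* buffer, std::size_t size) {
  std::size_t total = 0;
  while (total < size) {
    ssize_t n = read(fd, buffer + total, size - total);
    if (n > 0) total += static_cast<std::size_t>(n);
    else if (n == 0) break;          // peer closed the connection
    else if (errno != EINTR) break;  // real error; EINTR is retried
  }
  return total;
}

// Hypothetical helper: keeps calling write() until everything was sent.
bool writeFully(int fd, const unsigned char* data, std::size_t size) {
  std::size_t total = 0;
  while (total < size) {
    ssize_t n = write(fd, data + total, size - total);
    if (n > 0) total += static_cast<std::size_t>(n);
    else if (n < 0 && errno == EINTR) continue;
    else return false;
  }
  return true;
}

struct DemoResult {
  int availableAtFirstLook;  // FIONREAD value once the socket became readable
  std::size_t received;      // bytes obtained by looping over read()
};

// Sends `size` bytes from a second thread and reads them back in full.
DemoResult runDemo(std::size_t size) {
  std::signal(SIGPIPE, SIG_IGN);  // a failed write should return an error
  int fds[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return {-1, 0};
  std::thread writer([fd = fds[0], size] {
    std::vector<unsigned char> data(size, 0x42);
    writeFully(fd, data.data(), data.size());
    close(fd);
  });
  struct pollfd pfd{};
  pfd.fd = fds[1];
  pfd.events = POLLIN;
  while (poll(&pfd, 1, -1) < 0 && errno == EINTR) {}
  int available = 0;
  ioctl(fds[1], FIONREAD, &available);  // snapshot of the receive buffer only
  std::vector<unsigned char> buffer(size);
  std::size_t received = readFully(fds[1], buffer.data(), buffer.size());
  assert(received <= size);
  close(fds[1]);
  writer.join();
  return {available, received};
}
```

Calling `runDemo` with a size well above the socket buffer size (a few MiB, say) will typically show an `availableAtFirstLook` smaller than the message, while `received` still equals the full size. The exact `FIONREAD` value depends on the platform and on timing.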
I'm trying to send large messages through a unix socket with Cap'n Proto. On the sender side, I do this:
This prints:
Now on the server side, I do
This prints
So it is missing some of the bytes from the message that was sent. After this output, it hangs in handleData, presumably waiting for the missing data.
If I terminate the client (closing the connection), the server reports
So I'm pretty sure the problem is that I do not receive all the data. But why? I can't imagine Cap'n Proto having some limit on how much data it can transfer.
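When comparing byte counts between sender and receiver, it can help to know how many bytes a message should occupy on the wire. Below is a small sketch that computes this from the segment sizes, assuming the standard unpacked stream framing from the Cap'n Proto encoding documentation: a 4-byte segment count minus one, a 4-byte word count per segment, padding to an 8-byte boundary, then the segments themselves. The function names `segmentTableBytes` and `framedMessageBytes` are made up for this illustration and are not library API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

constexpr std::size_t kWordBytes = 8;  // a Cap'n Proto word is 8 bytes

// Size of the segment table that precedes the segments on the wire:
// 4 bytes for (segment count - 1), 4 bytes per segment size, rounded up
// to a whole word.
std::size_t segmentTableBytes(std::size_t segmentCount) {
  assert(segmentCount > 0);
  std::size_t raw = 4 * (1 + segmentCount);
  return (raw + kWordBytes - 1) / kWordBytes * kWordBytes;
}

// Total bytes for a message whose segments have the given sizes in words.
std::size_t framedMessageBytes(const std::vector<std::uint32_t>& segmentWords) {
  std::size_t total = segmentTableBytes(segmentWords.size());
  for (std::uint32_t words : segmentWords) {
    total += static_cast<std::size_t>(words) * kWordBytes;
  }
  return total;
}
```

For example, `framedMessageBytes({1, 1025})` gives 16 bytes of segment table plus 8208 bytes of segment data. If the number of bytes read so far is smaller than this total, the rest is still in flight or still unread, not lost.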