in src/filtration/filtration_processor.h [76:146]
void collect(PacketInfo& info)
{
// TODO: this code must be generalized with RPCFiltrator class
uint32_t hdr_len{0};
auto msg = reinterpret_cast<const MessageHeader*>(info.data);
switch(msg->type())
{
case MsgType::CALL:
{
auto call = static_cast<const CallHeader*>(msg);
if(RPCValidator::check(call))
{
if(NFS3Validator::check(call))
{
uint32_t proc{call->proc()};
if(ProcEnumNFS3::WRITE == proc) // truncate NFSv3 WRITE call message to NFSv3-RW-limit
{
hdr_len = (nfs3_rw_hdr_max < info.dlen ? nfs3_rw_hdr_max : info.dlen);
}
else
{
if(ProcEnumNFS3::READ == proc)
nfs3_read_match.insert(call->xid());
hdr_len = info.dlen;
}
}
else if(NFS4Validator::check(call))
{
hdr_len = info.dlen; // fully collect NFSv4 messages
}
else
{
return;
}
}
else
{
return;
}
}
break;
case MsgType::REPLY:
{
auto reply = static_cast<const ReplyHeader*>(msg);
if(RPCValidator::check(reply))
{
// Truncate NFSv3 READ reply message to NFSv3-RW-limit
//* Collect fully if reply received before matching call
if(nfs3_read_match.erase(reply->xid()) > 0)
{
hdr_len = (nfs3_rw_hdr_max < info.dlen ? nfs3_rw_hdr_max : info.dlen);
}
else
hdr_len = info.dlen;
}
else // isn't RPC reply, stream is corrupt
{
return;
}
}
break;
default:
return;
}
collection.allocate();
collection.push(info, hdr_len);
collection.complete(info);
}