#include "CNetMessagesManager.h" #include "CEngine.h" #include "boost/crc.hpp" #include namespace { bool SeqLess(std::uint16_t a, std::uint16_t b) { return static_cast(a - b) < 0; //difference limit is 32767 } } CNetMessagesManager::CNetMessagesManager() { Reports.reserve(1024); //TODO hardcoded } void CNetMessagesManager::AddMessage(std::shared_ptr msg, bool reliable) { if(reliable) { ReliableQueue.push_back(msg); } else { UnreliableQueue.push_back(msg); } } void CNetMessagesManager::SendReliable(std::shared_ptr msg) { AddMessage(msg, true); } void CNetMessagesManager::SendUnreliable(std::shared_ptr msg) { AddMessage(msg, false); } void CNetMessagesManager::AddReceivedPacket(std::vector&& buf, CTimePoint _nettime) { CPacket packet; packet.Buffer = std::move(buf); packet.NetTime = _nettime; ReceivedQueue.push_back(std::move(packet)); } void CNetMessagesManager::ProcessPackets() { std::sort(ReceivedQueue.begin(), ReceivedQueue.end(), [](auto& a, auto& b) -> bool { if(a.Buffer.size() < 2 || b.Buffer.size() < 2) { return false; } packetid_t id_a = *reinterpret_cast(&a.Buffer.front()); packetid_t id_b = *reinterpret_cast(&b.Buffer.front()); return SeqLess(id_a, id_b); //return id_a < id_b; }); for(auto& packet : ReceivedQueue) { ProcessSinglePacket(packet); } ReceivedQueue.clear(); } //sv //adding reliable - adding to ReliableQueue (SendReliable) //sending reliable - moving to SentMessages //cl //received reliable - sending Report //sv //received report bool CNetMessagesManager::ProcessSinglePacket(CPacket& packet) { size_t offset = sizeof(CPacketHeader); if(packet.Buffer.size() <= offset) { Log::ErrInstance() << "[DROP] packet is too small\n"; return false; } //drop packet (packet is too small) CPacketHeader* header = reinterpret_cast(&packet.Buffer.front()); //size 12 //offset 8 //12 - 8 = 4 //0 1 2 3 4 5 6 7 8 9 10 11 // x x x x boost::crc_32_type result; result.process_bytes(&packet.Buffer[offset], packet.Buffer.size() - offset); auto packet_crc = result.checksum(); if(packet_crc != header->Checksum) { //TODO failed checksum Log::ErrInstance() << "[DROP] failed checksum\n"; return false; //drop packet (failed checksum) } CBufferWrapper wrapper(packet.Buffer); wrapper.SetSeek(sizeof(CPacketHeader)); std::uint16_t reportsCount = wrapper.Read(); std::vector receivedReports; for(std::uint16_t i = 0; i < reportsCount; i++) { auto report = wrapper.Read(); receivedReports.push_back(report); } std::vector sentMessagesToRemove; for(auto& sentMessage : SentMessages) { auto it = std::find_if(receivedReports.begin(), receivedReports.end(), [&sentMessage](CReliableReport& report) -> bool { return report.ID == sentMessage.second.ID; }); if(it == receivedReports.end()) { sentMessage.second.SkippedPackets++; } else if(it->Checksum == sentMessage.second.Checksum && it->Type == sentMessage.second.Type) { sentMessagesToRemove.push_back(sentMessage.second.ID); } } for(auto id : sentMessagesToRemove) { std::erase_if(SentMessages, [id](auto& kv) -> bool { return kv.second.ID == id; }); } std::uint16_t resentCount = wrapper.Read(); std::uint16_t reliableCount = wrapper.Read(); std::uint16_t unreliableCount = wrapper.Read(); //Log::Instance() << "ProcessSinglePacket " << UserData << " unrealiable " << unreliableCount << Log::Endl; auto& netFactory = CEngine::GetInstance()->NetMessagesFactory; for(std::uint16_t i = 0; i < resentCount; i++) { auto msgId = wrapper.Read(); auto it = std::find_if(ReliablesMessages.begin(), ReliablesMessages.end(), [msgId](auto& msg) -> bool { return msg.ID == msgId; }); if(it != ReliablesMessages.end()) { //TODO message duplicate!! } auto msgType = wrapper.Read(); auto msgTypeStr = netFactory.GetByID(msgType); if(msgTypeStr.empty()) { Log::ErrInstance() << "[DROPRESEND] invalid msg type 1\n"; return false; } //drop packet (invalid msg type) auto msg = netFactory.create>(msgTypeStr); if(!msg) { Log::ErrInstance() << "[DROPRESEND] invalid msg type 2\n"; return false; } //drop packet (invalid msg type) auto msgCrc = msg->Read(wrapper); auto msgSize = msg->GetLastReadSize(); CReceivedReliableMessage reliable; reliable.ID = msgId; reliable.Crc = msgCrc; reliable.Size = msgSize; reliable.Message = std::move(msg); CReliableReport report; report.Checksum = msgCrc; report.ID = msgId; report.Type = msgType; Reports.push_back(std::move(report)); ReliablesMessages.push_back(std::move(reliable)); } for(std::uint16_t i = 0; i < reliableCount; i++) { auto msgId = wrapper.Read(); auto it = std::find_if(ReliablesMessages.begin(), ReliablesMessages.end(), [msgId](auto& msg) -> bool { return msg.ID == msgId; }); if(it != ReliablesMessages.end()) { //TODO message duplicate!! } auto msgType = wrapper.Read(); auto msgTypeStr = netFactory.GetByID(msgType); if(msgTypeStr.empty()) { Log::ErrInstance() << "[DROP] invalid msg type 1\n"; return false; } //drop packet (invalid msg type) auto msg = netFactory.create>(msgTypeStr); if(!msg) { Log::ErrInstance() << "[DROP] invalid msg type 2\n"; return false; } //drop packet (invalid msg type) auto msgCrc = msg->Read(wrapper); auto msgSize = msg->GetLastReadSize(); CReceivedReliableMessage reliable; reliable.ID = msgId; reliable.Crc = msgCrc; reliable.Size = msgSize; reliable.Message = std::move(msg); CReliableReport report; report.Checksum = msgCrc; report.ID = msgId; report.Type = msgType; Reports.push_back(std::move(report)); ReliablesMessages.push_back(std::move(reliable)); } for(std::uint16_t i = 0; i < unreliableCount; i++) { auto msgType = wrapper.Read(); //Log::Instance() << "Reading unreliable" << Log::Endl; auto msgTypeStr = netFactory.GetByID(msgType); if(msgTypeStr.empty()) { break; } //(invalid msg type) -- unreliable, not dropping the packet auto msg = netFactory.create>(msgTypeStr); if(!msg) { break; } //(invalid msg type) -- unreliable, not dropping the packet msg->Read(wrapper); //Log::Instance() << "Unreliable is " << msg->GetType() << Log::Endl; msg->NetTime = packet.NetTime; UnreliablesMessages.push_back(std::move(msg)); } return true; } void CNetMessagesManager::ProcessMessages() { std::sort(ReliablesMessages.begin(), ReliablesMessages.end(), [](auto& a, auto& b) -> bool { return SeqLess(a.ID, b.ID); //return a.ID < b.ID; }); for(auto& msg : UnreliablesMessages) { msg->Process(); } for(auto& msg : ReliablesMessages) { msg.Message->Process(); } ReliablesMessages.clear(); UnreliablesMessages.clear(); } bool CNetMessagesManager::PackSnapshot(std::vector& buf) { auto diff = CEngine::GetInstance()->Time.GetCurrent() - LastPack; if(PackDelay != 0.0 && diff < std::chrono::duration(PackDelay)) { return false; } CBufferWrapper packet(buf); packet.Write(PacketID); packet.Write(0); //placeholder packet.Write(Reports.size()); //TODO: implement (reports_count) auto& factory = CEngine::GetInstance()->NetMessagesFactory; for(auto& report : Reports) //implemented (reports) { packet.Write(report.ID); packet.Write(report.Type); packet.Write(report.Checksum); } Reports.clear(); size_t maxSkipped = 10; //TODO not hardcoded size_t maxResent = 10; //TODO not hardcoded for(auto& kv : SentMessages) { auto& sentMessage = kv.second; if(sentMessage.SkippedPackets >= maxSkipped && sentMessage.Resent >= maxResent) { //TODO disconnect } } std::erase_if(SentMessages, [&maxSkipped, &maxResent](auto& kv) -> bool { auto& sentMessage = kv.second; return sentMessage.SkippedPackets >= maxSkipped && sentMessage.Resent >= maxResent; //TODO disconnect }); std::vector toResend; for(auto& kv : SentMessages) { auto msgId = kv.first; auto& sentMessage = kv.second; if(sentMessage.SkippedPackets >= maxSkipped) { toResend.push_back(&kv.second); //sentMessage.Resent++; } } packet.Write(toResend.size()); //implemented (resend_reliable_count) packet.Write(ReliableQueue.size()); //implemented (reliable_count) packet.Write(UnreliableQueue.size()); for(auto msg : toResend) { msg->SkippedPackets = 0; msg->Resent++; netmsgid_t ID = msg->ID; packet.Write(ID); auto msgType = factory.GetIndex(msg->Message->GetType()); packet.Write(msgType); crc32_t crc = msg->Message->Write(packet); } for(auto& msg : ReliableQueue) //implemented (reliable_data) { netmsgid_t ID = MsgID; MsgID++; packet.Write(ID); auto msgType = factory.GetIndex(msg->GetType()); packet.Write(msgType); crc32_t crc = msg->Write(packet); CSentMessage sent; sent.Checksum = crc; sent.ID = ID; sent.Message = msg; sent.Type = msgType; sent.WriteTime = CEngine::GetInstance()->Time.GetCurrent(); SentMessages.emplace(ID, std::move(sent)); } for(auto& msg : UnreliableQueue) { //Log::Instance() << "Sending unreliable " << msg->GetType() << Log::Endl; auto msgType = factory.GetIndex(msg->GetType()); packet.Write(msgType); msg->Write(packet); } size_t offset = sizeof(CPacketHeader); if(buf.size() <= offset) { Log::ErrInstance() << "[SENDDROP] packet is too small\n"; return false; } //drop packet (packet is too small) CPacketHeader* header = reinterpret_cast(&buf.front()); boost::crc_32_type result; result.process_bytes(&buf[offset], buf.size() - offset); header->Checksum = result.checksum(); PacketID++; ReliableQueue.clear(); UnreliableQueue.clear(); LastPack = CEngine::GetInstance()->Time.GetCurrent(); return true; }