mirror of
https://git.eden-emu.dev/archive/citron
synced 2026-03-30 16:08:26 -04:00
revert 47edb00351
revert Merge pull request 'Service: Sockets: Fix busy-waiting CPU starvation and Close/Socket race conditions' (#100) from fix-socket-performance-and-crashes into main Reviewed-on: https://git.citron-emu.org/Citron/Emulator/pulls/100
This commit is contained in:
@@ -34,24 +34,30 @@ void ProxySocket::HandleProxyPacket(const ProxyPacket& packet) {
|
||||
|
||||
const auto my_ip = room_member->GetFakeIpAddress();
|
||||
|
||||
// If the sender (local_endpoint) is OUR IP, ignore it.
|
||||
// We don't want to process our own sent packets.
|
||||
if (packet.local_endpoint.ip == my_ip) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Only accept packets meant for us or actual broadcasts.
|
||||
if (packet.remote_endpoint.ip != my_ip && !packet.broadcast) {
|
||||
return;
|
||||
}
|
||||
|
||||
// PROTOCOL & PORT CHECK
|
||||
if (protocol != packet.protocol || local_endpoint.portno != packet.remote_endpoint.portno || closed) {
|
||||
stats.packets_dropped++;
|
||||
return;
|
||||
}
|
||||
|
||||
// BROADCAST CHECK
|
||||
if (!broadcast && packet.broadcast) {
|
||||
stats.packets_dropped++;
|
||||
return;
|
||||
}
|
||||
|
||||
// DECOMPRESSION & QUEUEING
|
||||
auto decompressed = packet;
|
||||
decompressed.data = Common::Compression::DecompressDataZSTD(packet.data);
|
||||
if (decompressed.data.empty() && !packet.data.empty()) {
|
||||
@@ -59,14 +65,10 @@ void ProxySocket::HandleProxyPacket(const ProxyPacket& packet) {
|
||||
return;
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard guard(packets_mutex);
|
||||
received_packets.push(decompressed);
|
||||
stats.packets_received++;
|
||||
stats.bytes_received += decompressed.data.size();
|
||||
}
|
||||
// Wake up RecvFrom immediately
|
||||
cv_packet_received.notify_all();
|
||||
std::lock_guard guard(packets_mutex);
|
||||
received_packets.push(decompressed);
|
||||
stats.packets_received++;
|
||||
stats.bytes_received += decompressed.data.size();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
@@ -153,65 +155,34 @@ std::pair<s32, Errno> ProxySocket::RecvFrom(int flags, std::span<u8> message, So
|
||||
ASSERT(flags == 0);
|
||||
ASSERT(message.size() < static_cast<size_t>(std::numeric_limits<int>::max()));
|
||||
|
||||
const auto timeout_ms = receive_timeout == 0 ? 5000 : receive_timeout;
|
||||
|
||||
std::unique_lock lock(packets_mutex);
|
||||
|
||||
// If not blocking and no packets, return immediately
|
||||
if (received_packets.empty() && !blocking) {
|
||||
return {-1, Errno::AGAIN};
|
||||
}
|
||||
|
||||
bool signaled = cv_packet_received.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] {
|
||||
return !received_packets.empty() || closed;
|
||||
});
|
||||
|
||||
if (closed) {
|
||||
return {-1, Errno::BADF};
|
||||
}
|
||||
|
||||
if (!signaled) {
|
||||
return {-1, Errno::TIMEDOUT};
|
||||
}
|
||||
|
||||
// Packet is ready, process it while still holding the lock
|
||||
ProxyPacket& packet = received_packets.front();
|
||||
if (addr) {
|
||||
addr->family = Domain::INET;
|
||||
addr->ip = packet.local_endpoint.ip;
|
||||
addr->portno = packet.local_endpoint.portno;
|
||||
}
|
||||
|
||||
bool peek = (flags & FLAG_MSG_PEEK) != 0;
|
||||
std::size_t read_bytes;
|
||||
std::size_t max_length = message.size();
|
||||
|
||||
if (packet.data.size() > max_length) {
|
||||
read_bytes = max_length;
|
||||
std::memcpy(message.data(), packet.data.data(), max_length);
|
||||
|
||||
if (protocol == Protocol::UDP) {
|
||||
if (!peek) {
|
||||
received_packets.pop();
|
||||
}
|
||||
return {-1, Errno::MSGSIZE};
|
||||
} else if (protocol == Protocol::TCP) {
|
||||
if (!peek) {
|
||||
std::vector<u8> numArray;
|
||||
numArray.reserve(packet.data.size() - max_length);
|
||||
std::copy(packet.data.begin() + max_length, packet.data.end(), std::back_inserter(numArray));
|
||||
packet.data = std::move(numArray);
|
||||
// TODO (flTobi): Verify the timeout behavior and break when connection is lost
|
||||
const auto timestamp = std::chrono::steady_clock::now();
|
||||
// When receive_timeout is set to zero, the socket is supposed to wait indefinitely until a
|
||||
// packet arrives. In order to prevent lost packets from hanging the emulation thread, we set
|
||||
// the timeout to 5s instead
|
||||
const auto timeout = receive_timeout == 0 ? 5000 : receive_timeout;
|
||||
while (true) {
|
||||
{
|
||||
std::lock_guard guard(packets_mutex);
|
||||
if (received_packets.size() > 0) {
|
||||
return ReceivePacket(flags, message, addr, message.size());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
read_bytes = packet.data.size();
|
||||
std::memcpy(message.data(), packet.data.data(), read_bytes);
|
||||
if (!peek) {
|
||||
received_packets.pop();
|
||||
|
||||
if (!blocking) {
|
||||
return {-1, Errno::AGAIN};
|
||||
}
|
||||
|
||||
std::this_thread::yield();
|
||||
|
||||
const auto time_diff = std::chrono::steady_clock::now() - timestamp;
|
||||
const auto time_diff_ms =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(time_diff).count();
|
||||
|
||||
if (time_diff_ms > timeout) {
|
||||
return {-1, Errno::TIMEDOUT};
|
||||
}
|
||||
}
|
||||
|
||||
return {static_cast<s32>(read_bytes), Errno::SUCCESS};
|
||||
}
|
||||
|
||||
std::pair<s32, Errno> ProxySocket::ReceivePacket(int flags, std::span<u8> message, SockAddrIn* addr,
|
||||
@@ -336,19 +307,15 @@ std::pair<s32, Errno> ProxySocket::SendTo(u32 flags, std::span<const u8> message
|
||||
}
|
||||
|
||||
Errno ProxySocket::Close() {
|
||||
{
|
||||
std::lock_guard guard(packets_mutex);
|
||||
fd = INVALID_SOCKET;
|
||||
closed = true;
|
||||
std::lock_guard guard(packets_mutex);
|
||||
fd = INVALID_SOCKET;
|
||||
closed = true;
|
||||
|
||||
while (!received_packets.empty()) {
|
||||
received_packets.pop();
|
||||
}
|
||||
// Flush any pending packets so they don't get processed after closure
|
||||
while (!received_packets.empty()) {
|
||||
received_packets.pop();
|
||||
}
|
||||
|
||||
// Wake up any threads stuck in RecvFrom so they can close properly
|
||||
cv_packet_received.notify_all();
|
||||
|
||||
return Errno::SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
// SPDX-FileCopyrightText: Copyright 2022 yuzu Emulator Project
|
||||
// SPDX-FileCopyrightText: Copyright 2026 citron Emulator Project
|
||||
// SPDX-License-Identifier: GPL-2.0-or-later
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <span>
|
||||
#include <vector>
|
||||
#include <queue>
|
||||
@@ -94,7 +92,6 @@ private:
|
||||
Protocol protocol;
|
||||
|
||||
std::mutex packets_mutex;
|
||||
std::condition_variable cv_packet_received;
|
||||
|
||||
RoomNetwork& room_network;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user