Do need queue here. Writes async now.

This commit is contained in:
Chris Marsh 2017-07-17 15:42:49 -07:00
parent 444e10acaf
commit 52290c488c

View file

@ -10,17 +10,45 @@
#include <condition_variable> #include <condition_variable>
#include <thread> #include <thread>
constexpr size_t MaxMessageSize = 16 * 1024;
constexpr size_t MessageQueueSize = 8;
struct QueuedMessage {
size_t length;
char buffer[MaxMessageSize];
};
static RpcConnection* Connection{nullptr}; static RpcConnection* Connection{nullptr};
static char ApplicationId[64]{}; static char ApplicationId[64]{};
static DiscordEventHandlers Handlers{}; static DiscordEventHandlers Handlers{};
static std::atomic_bool WasJustConnected{false}; static std::atomic_bool WasJustConnected{false};
static std::atomic_bool WasJustDisconnected{false}; static std::atomic_bool WasJustDisconnected{false};
static int LastErrorCode = 0; static int LastErrorCode{0};
static char LastErrorMessage[256]; static char LastErrorMessage[256];
static std::atomic_bool KeepRunning{true}; static std::atomic_bool KeepRunning{true};
static std::mutex WaitForIOMutex; static std::mutex WaitForIOMutex;
static std::condition_variable WaitForIOActivity; static std::condition_variable WaitForIOActivity;
static std::thread IoThread; static std::thread IoThread;
static QueuedMessage SendQueue[MessageQueueSize]{};
static std::atomic_uint SendQueueNextAdd{0};
static std::atomic_uint SendQueueNextSend{0};
static std::atomic_uint SendQueuePendingSends{0};
static QueuedMessage* SendQueueGetNextAddMessage() {
// if we are falling behind, bail
if (SendQueuePendingSends.load() >= MessageQueueSize) {
return nullptr;
}
auto index = (SendQueueNextAdd++) % MessageQueueSize;
return &SendQueue[index];
}
static QueuedMessage* SendQueueGetNextSendMessage() {
auto index = (SendQueueNextSend++) % MessageQueueSize;
return &SendQueue[index];
}
static void SendQueueCommitMessage() {
SendQueuePendingSends++;
}
void Discord_UpdateConnection() void Discord_UpdateConnection()
{ {
@ -34,12 +62,18 @@ void Discord_UpdateConnection()
// todo: do something... // todo: do something...
printf("Hey, I got a message\n"); printf("Hey, I got a message\n");
} }
// writes
while (SendQueuePendingSends.load()) {
auto qmessage = SendQueueGetNextSendMessage();
Connection->Write(qmessage->buffer, qmessage->length);
--SendQueuePendingSends;
}
} }
} }
void DiscordRpcIo() void DiscordRpcIo()
{ {
printf("Discord io thread start\n");
const std::chrono::duration<int64_t, std::milli> maxWait{500LL}; const std::chrono::duration<int64_t, std::milli> maxWait{500LL};
while (KeepRunning.load()) { while (KeepRunning.load()) {
@ -48,8 +82,6 @@ void DiscordRpcIo()
std::unique_lock<std::mutex> lock(WaitForIOMutex); std::unique_lock<std::mutex> lock(WaitForIOMutex);
WaitForIOActivity.wait_for(lock, maxWait); WaitForIOActivity.wait_for(lock, maxWait);
} }
Connection->Close();
printf("Discord io thread stop\n");
} }
void SignalIOActivity() void SignalIOActivity()
@ -94,12 +126,14 @@ extern "C" void Discord_Shutdown()
extern "C" void Discord_UpdatePresence(const DiscordRichPresence* presence) extern "C" void Discord_UpdatePresence(const DiscordRichPresence* presence)
{ {
char jsonBuffer[16 * 1024]; auto qmessage = SendQueueGetNextAddMessage();
char* jsonWrite = jsonBuffer; if (qmessage) {
char* jsonWrite = qmessage->buffer;
JsonWriteRichPresenceObj(jsonWrite, presence); JsonWriteRichPresenceObj(jsonWrite, presence);
size_t length = jsonWrite - jsonBuffer; qmessage->length = jsonWrite - qmessage->buffer;
Connection->Write(jsonBuffer, length); SendQueueCommitMessage();
SignalIOActivity(); SignalIOActivity();
}
} }
extern "C" void Discord_Update() extern "C" void Discord_Update()