Ask to join (#4)

This adds a new callback `joinRequest` and a new API function `Discord_Respond` to reply to it.
This commit is contained in:
Chris Marsh 2017-10-12 13:06:55 -07:00 committed by GitHub
parent 25b6f1dcde
commit 39ff0bf3e4
7 changed files with 191 additions and 48 deletions

View file

@ -15,6 +15,21 @@ static const char* APPLICATION_ID = "345229890980937739";
static int FrustrationLevel = 0; static int FrustrationLevel = 0;
static int64_t StartTime; static int64_t StartTime;
static int prompt(char* line, size_t size)
{
int res;
char* nl;
printf("\n> ");
fflush(stdout);
res = fgets(line, (int)size, stdin) ? 1 : 0;
line[size - 1] = 0;
nl = strchr(line, '\n');
if (nl) {
*nl = 0;
}
return res;
}
static void updateDiscordPresence() static void updateDiscordPresence()
{ {
char buffer[256]; char buffer[256];
@ -62,19 +77,37 @@ static void handleDiscordSpectate(const char* secret)
printf("\nDiscord: spectate (%s)\n", secret); printf("\nDiscord: spectate (%s)\n", secret);
} }
static int prompt(char* line, size_t size) static void handleDiscordJoinRequest(const DiscordJoinRequest* request)
{ {
int res; int response = -1;
char* nl; char yn[4];
printf("\n> "); printf("\nDiscord: join request from %s - %s - %s\n",
fflush(stdout); request->username,
res = fgets(line, (int)size, stdin) ? 1 : 0; request->avatarUrl,
line[size - 1] = 0; request->userId);
nl = strchr(line, '\n'); do {
if (nl) { printf("Accept? (y/n)");
*nl = 0; if (!prompt(yn, sizeof(yn))) {
break;
}
if (!yn[0]) {
continue;
}
if (yn[0] == 'y') {
response = DISCORD_REPLY_YES;
break;
}
if (yn[0] == 'n') {
response = DISCORD_REPLY_NO;
break;
}
} while (1);
if (response != -1) {
Discord_Respond(request->userId, response);
} }
return res;
} }
static void discordInit() static void discordInit()
@ -86,6 +119,7 @@ static void discordInit()
handlers.errored = handleDiscordError; handlers.errored = handleDiscordError;
handlers.joinGame = handleDiscordJoin; handlers.joinGame = handleDiscordJoin;
handlers.spectateGame = handleDiscordSpectate; handlers.spectateGame = handleDiscordSpectate;
handlers.joinRequest = handleDiscordJoinRequest;
Discord_Initialize(APPLICATION_ID, &handlers, 1, NULL); Discord_Initialize(APPLICATION_ID, &handlers, 1, NULL);
} }

View file

@ -41,14 +41,25 @@ typedef struct DiscordRichPresence {
int8_t instance; int8_t instance;
} DiscordRichPresence; } DiscordRichPresence;
typedef struct DiscordJoinRequest {
char userId[24];
char username[48];
char avatarUrl[128];
} DiscordJoinRequest;
typedef struct DiscordEventHandlers { typedef struct DiscordEventHandlers {
void (*ready)(); void (*ready)();
void (*disconnected)(int errorCode, const char* message); void (*disconnected)(int errorCode, const char* message);
void (*errored)(int errorCode, const char* message); void (*errored)(int errorCode, const char* message);
void (*joinGame)(const char* joinSecret); void (*joinGame)(const char* joinSecret);
void (*spectateGame)(const char* spectateSecret); void (*spectateGame)(const char* spectateSecret);
void (*joinRequest)(const DiscordJoinRequest* request);
} DiscordEventHandlers; } DiscordEventHandlers;
#define DISCORD_REPLY_NO 0
#define DISCORD_REPLY_YES 1
#define DISCORD_REPLY_IGNORE 2
DISCORD_EXPORT void Discord_Initialize(const char* applicationId, DISCORD_EXPORT void Discord_Initialize(const char* applicationId,
DiscordEventHandlers* handlers, DiscordEventHandlers* handlers,
int autoRegister, int autoRegister,
@ -65,6 +76,8 @@ DISCORD_EXPORT void Discord_UpdateConnection();
DISCORD_EXPORT void Discord_UpdatePresence(const DiscordRichPresence* presence); DISCORD_EXPORT void Discord_UpdatePresence(const DiscordRichPresence* presence);
DISCORD_EXPORT void Discord_Respond(const char* userid, /* DISCORD_REPLY_ */ int reply);
#ifdef __cplusplus #ifdef __cplusplus
} /* extern "C" */ } /* extern "C" */
#endif #endif

View file

@ -13,6 +13,7 @@ set(BASE_RPC_SRC
serialization.cpp serialization.cpp
connection.h connection.h
backoff.h backoff.h
msg_queue.h
) )
if (${BUILD_DYNAMIC_LIB}) if (${BUILD_DYNAMIC_LIB})

View file

@ -4,6 +4,7 @@
#include "discord_register.h" #include "discord_register.h"
#include "rpc_connection.h" #include "rpc_connection.h"
#include "serialization.h" #include "serialization.h"
#include "msg_queue.h"
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
@ -16,6 +17,7 @@
constexpr size_t MaxMessageSize{16 * 1024}; constexpr size_t MaxMessageSize{16 * 1024};
constexpr size_t MessageQueueSize{8}; constexpr size_t MessageQueueSize{8};
constexpr size_t JoinQueueSize{8};
struct QueuedMessage { struct QueuedMessage {
size_t length; size_t length;
@ -45,10 +47,9 @@ static int LastDisconnectErrorCode{0};
static char LastDisconnectErrorMessage[256]; static char LastDisconnectErrorMessage[256];
static std::mutex PresenceMutex; static std::mutex PresenceMutex;
static QueuedMessage QueuedPresence{}; static QueuedMessage QueuedPresence{};
static QueuedMessage SendQueue[MessageQueueSize]{}; MsgQueue<QueuedMessage, MessageQueueSize> SendQueue;
static std::atomic_uint SendQueueNextAdd{0}; MsgQueue<DiscordJoinRequest, MessageQueueSize> JoinAskQueue;
static std::atomic_uint SendQueueNextSend{0};
static std::atomic_uint SendQueuePendingSends{0};
// We want to auto connect, and retry on failure, but not as fast as possible. This does expoential // We want to auto connect, and retry on failure, but not as fast as possible. This does expoential
// backoff from 0.5 seconds to 1 minute // backoff from 0.5 seconds to 1 minute
static Backoff ReconnectTimeMs(500, 60 * 1000); static Backoff ReconnectTimeMs(500, 60 * 1000);
@ -69,30 +70,6 @@ static void UpdateReconnectTime()
std::chrono::duration<int64_t, std::milli>{ReconnectTimeMs.nextDelay()}; std::chrono::duration<int64_t, std::milli>{ReconnectTimeMs.nextDelay()};
} }
static QueuedMessage* SendQueueGetNextAddMessage()
{
// if we are not connected, let's not batch up stale messages for later
if (!Connection || !Connection->IsOpen()) {
return nullptr;
}
// if we are falling behind, bail
if (SendQueuePendingSends.load() >= MessageQueueSize) {
return nullptr;
}
auto index = (SendQueueNextAdd++) % MessageQueueSize;
return &SendQueue[index];
}
static QueuedMessage* SendQueueGetNextSendMessage()
{
auto index = (SendQueueNextSend++) % MessageQueueSize;
return &SendQueue[index];
}
static void SendQueueCommitMessage()
{
SendQueuePendingSends++;
}
DISCORD_EXPORT void Discord_UpdateConnection() DISCORD_EXPORT void Discord_UpdateConnection()
{ {
if (!Connection) { if (!Connection) {
@ -134,7 +111,7 @@ DISCORD_EXPORT void Discord_UpdateConnection()
continue; continue;
} }
if (strcmp(evtName, "GAME_JOIN") == 0) { if (strcmp(evtName, "ACTIVITY_JOIN") == 0) {
auto data = GetObjMember(&message, "data"); auto data = GetObjMember(&message, "data");
auto secret = GetStrMember(data, "secret"); auto secret = GetStrMember(data, "secret");
if (secret) { if (secret) {
@ -142,7 +119,7 @@ DISCORD_EXPORT void Discord_UpdateConnection()
WasJoinGame.store(true); WasJoinGame.store(true);
} }
} }
else if (strcmp(evtName, "GAME_SPECTATE") == 0) { else if (strcmp(evtName, "ACTIVITY_SPECTATE") == 0) {
auto data = GetObjMember(&message, "data"); auto data = GetObjMember(&message, "data");
auto secret = GetStrMember(data, "secret"); auto secret = GetStrMember(data, "secret");
if (secret) { if (secret) {
@ -150,6 +127,25 @@ DISCORD_EXPORT void Discord_UpdateConnection()
WasSpectateGame.store(true); WasSpectateGame.store(true);
} }
} }
else if (strcmp(evtName, "ACTIVITY_JOIN_REQUEST") == 0) {
auto data = GetObjMember(&message, "data");
auto user = GetObjMember(data, "user");
auto userId = GetStrMember(user, "id");
auto username = GetStrMember(user, "username");
auto avatarUrl = GetStrMember(user, "avatar");
auto joinReq = JoinAskQueue.GetNextAddMessage();
if (userId && username && joinReq) {
StringCopy(joinReq->userId, userId);
StringCopy(joinReq->username, username);
if (avatarUrl) {
StringCopy(joinReq->avatarUrl, avatarUrl);
}
else {
joinReq->avatarUrl[0] = 0;
}
JoinAskQueue.CommitAdd();
}
}
} }
} }
@ -168,10 +164,10 @@ DISCORD_EXPORT void Discord_UpdateConnection()
} }
} }
while (SendQueuePendingSends.load()) { while (SendQueue.HavePendingSends()) {
auto qmessage = SendQueueGetNextSendMessage(); auto qmessage = SendQueue.GetNextSendMessage();
Connection->Write(qmessage->buffer, qmessage->length); Connection->Write(qmessage->buffer, qmessage->length);
--SendQueuePendingSends; SendQueue.CommitSend();
} }
} }
} }
@ -199,11 +195,11 @@ void SignalIOActivity()
bool RegisterForEvent(const char* evtName) bool RegisterForEvent(const char* evtName)
{ {
auto qmessage = SendQueueGetNextAddMessage(); auto qmessage = SendQueue.GetNextAddMessage();
if (qmessage) { if (qmessage) {
qmessage->length = qmessage->length =
JsonWriteSubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName); JsonWriteSubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName);
SendQueueCommitMessage(); SendQueue.CommitAdd();
SignalIOActivity(); SignalIOActivity();
return true; return true;
} }
@ -243,11 +239,15 @@ DISCORD_EXPORT void Discord_Initialize(const char* applicationId,
ReconnectTimeMs.reset(); ReconnectTimeMs.reset();
if (Handlers.joinGame) { if (Handlers.joinGame) {
RegisterForEvent("GAME_JOIN"); RegisterForEvent("ACTIVITY_JOIN");
} }
if (Handlers.spectateGame) { if (Handlers.spectateGame) {
RegisterForEvent("GAME_SPECTATE"); RegisterForEvent("ACTIVITY_SPECTATE");
}
if (Handlers.joinRequest) {
RegisterForEvent("ACTIVITY_JOIN_REQUEST");
} }
}; };
Connection->onDisconnect = [](int err, const char* message) { Connection->onDisconnect = [](int err, const char* message) {
@ -290,6 +290,21 @@ DISCORD_EXPORT void Discord_UpdatePresence(const DiscordRichPresence* presence)
SignalIOActivity(); SignalIOActivity();
} }
DISCORD_EXPORT void Discord_Respond(const char* userId, /* DISCORD_REPLY_ */ int reply)
{
// if we are not connected, let's not batch up stale messages for later
if (!Connection || !Connection->IsOpen()) {
return;
}
auto qmessage = SendQueue.GetNextAddMessage();
if (qmessage) {
qmessage->length =
JsonWriteJoinReply(qmessage->buffer, sizeof(qmessage->buffer), userId, reply, Nonce++);
SendQueue.CommitAdd();
SignalIOActivity();
}
}
DISCORD_EXPORT void Discord_RunCallbacks() DISCORD_EXPORT void Discord_RunCallbacks()
{ {
// Note on some weirdness: internally we might connect, get other signals, disconnect any number // Note on some weirdness: internally we might connect, get other signals, disconnect any number
@ -326,6 +341,19 @@ DISCORD_EXPORT void Discord_RunCallbacks()
Handlers.spectateGame(SpectateGameSecret); Handlers.spectateGame(SpectateGameSecret);
} }
// Right now this batches up any requests and sends them all in a burst; I could imagine a world
// where the implementer would rather sequentially accept/reject each one before the next invite
// is sent. I left it this way because I could also imagine wanting to process these all and
// maybe show them in one common dialog and/or start fetching the avatars in parallel, and if
// not it should be trivial for the implementer to make a queue themselves.
while (JoinAskQueue.HavePendingSends()) {
auto req = JoinAskQueue.GetNextSendMessage();
if (Handlers.joinRequest) {
Handlers.joinRequest(req);
}
JoinAskQueue.CommitSend();
}
if (!isConnected) { if (!isConnected) {
// if we are not connected, disconnect message last // if we are not connected, disconnect message last
if (wasDisconnected && Handlers.disconnected) { if (wasDisconnected && Handlers.disconnected) {

36
src/msg_queue.h Normal file
View file

@ -0,0 +1,36 @@
#pragma once
#include <atomic>
// A simple queue. No locks, but only works with a single thread as producer and a single thread as
// a consumer. Mutex up as needed.
template <typename ElementType, size_t QueueSize>
class MsgQueue {
ElementType queue_[QueueSize]{};
std::atomic_uint nextAdd_{0};
std::atomic_uint nextSend_{0};
std::atomic_uint pendingSends_{0};
public:
MsgQueue(){};
ElementType* GetNextAddMessage()
{
// if we are falling behind, bail
if (pendingSends_.load() >= QueueSize) {
return nullptr;
}
auto index = (nextAdd_++) % QueueSize;
return &queue_[index];
}
void CommitAdd() { ++pendingSends_; }
bool HavePendingSends() const { return pendingSends_.load() != 0; }
ElementType* GetNextSendMessage()
{
auto index = (nextSend_++) % QueueSize;
return &queue_[index];
}
void CommitSend() { --pendingSends_; }
};

View file

@ -197,3 +197,32 @@ size_t JsonWriteSubscribeCommand(char* dest, size_t maxLen, int nonce, const cha
return writer.Size(); return writer.Size();
} }
size_t JsonWriteJoinReply(char* dest, size_t maxLen, const char* userId, int reply, int nonce)
{
JsonWriter writer(dest, maxLen);
{
WriteObject obj(writer);
WriteKey(writer, "cmd");
if (reply == DISCORD_REPLY_YES) {
writer.String("SEND_ACTIVITY_JOIN_INVITE");
}
else {
writer.String("CLOSE_ACTIVITY_JOIN_REQUEST");
}
WriteKey(writer, "args");
{
WriteObject args(writer);
WriteKey(writer, "user_id");
writer.String(userId);
}
JsonWriteNonce(writer, nonce);
}
return writer.Size();
}

View file

@ -33,6 +33,8 @@ size_t JsonWriteRichPresenceObj(char* dest,
const DiscordRichPresence* presence); const DiscordRichPresence* presence);
size_t JsonWriteSubscribeCommand(char* dest, size_t maxLen, int nonce, const char* evtName); size_t JsonWriteSubscribeCommand(char* dest, size_t maxLen, int nonce, const char* evtName);
size_t JsonWriteJoinReply(char* dest, size_t maxLen, const char* userId, int reply, int nonce);
// I want to use as few allocations as I can get away with, and to do that with RapidJson, you need // I want to use as few allocations as I can get away with, and to do that with RapidJson, you need
// to supply some of your own allocators for stuff rather than use the defaults // to supply some of your own allocators for stuff rather than use the defaults