Add a first draft of a message bus
This commit is contained in:
		
							parent
							
								
									712da0e988
								
							
						
					
					
						commit
						b3ca7926b2
					
				| 
						 | 
				
			
			@ -11,9 +11,11 @@
 | 
			
		|||
#include <GLFW/glfw3.h>
 | 
			
		||||
 | 
			
		||||
#include "common/logging.hpp"
 | 
			
		||||
#include "common/messaging/bus.hpp"
 | 
			
		||||
 | 
			
		||||
// Globals
 | 
			
		||||
static Logger logger;
 | 
			
		||||
static MessageBus messager(&logger);
 | 
			
		||||
 | 
			
		||||
// End globals
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -22,6 +24,10 @@ static void error_callback(int error, const char* description)
 | 
			
		|||
   fprintf(stderr, "Error %d: %s\n", error, description);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void debug_messagebus_callback(Message *m) {
 | 
			
		||||
   logger.log(info, "(debug_messagebus_callback) Processed message: '%s'", m->name);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int main(int argc, char** argv)
 | 
			
		||||
{
 | 
			
		||||
    GLFWwindow* window;
 | 
			
		||||
| 
						 | 
				
			
			@ -87,12 +93,19 @@ int main(int argc, char** argv)
 | 
			
		|||
    glBindBuffer(GL_ARRAY_BUFFER, vertexbuffer);
 | 
			
		||||
    glBufferData(GL_ARRAY_BUFFER, sizeof(g_vertex_buffer_data),
 | 
			
		||||
                 g_vertex_buffer_data, GL_STATIC_DRAW);
 | 
			
		||||
    static Console console;
 | 
			
		||||
    static Console console(&messager);
 | 
			
		||||
    bool show_console = false;
 | 
			
		||||
    messager.registerCallback(&debug_messagebus_callback);
 | 
			
		||||
    while(!glfwWindowShouldClose(window)) {
 | 
			
		||||
       glfwPollEvents();
 | 
			
		||||
       // Process input.
 | 
			
		||||
 | 
			
		||||
       // Process messages
 | 
			
		||||
       int messages_treated = messager.processAll();
 | 
			
		||||
       if (messages_treated > 0) {
 | 
			
		||||
          logger.log(info, "Treated %d messages this tick", messages_treated);
 | 
			
		||||
       }
 | 
			
		||||
 | 
			
		||||
       // @TODO Game tick
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,6 +2,8 @@
 | 
			
		|||
#include <ctype.h>          // toupper
 | 
			
		||||
#include <string.h>
 | 
			
		||||
 | 
			
		||||
#include "common/messaging/bus.hpp"
 | 
			
		||||
 | 
			
		||||
struct Console
 | 
			
		||||
{
 | 
			
		||||
   char                  InputBuf[256];
 | 
			
		||||
| 
						 | 
				
			
			@ -12,8 +14,9 @@ struct Console
 | 
			
		|||
   ImGuiTextFilter       Filter;
 | 
			
		||||
   bool                  AutoScroll;
 | 
			
		||||
   bool                  ScrollToBottom;
 | 
			
		||||
   MessageBus*           messager;
 | 
			
		||||
 | 
			
		||||
   Console()
 | 
			
		||||
   Console(MessageBus *messager)
 | 
			
		||||
      {
 | 
			
		||||
         ClearLog();
 | 
			
		||||
         memset(InputBuf, 0, sizeof(InputBuf));
 | 
			
		||||
| 
						 | 
				
			
			@ -26,6 +29,7 @@ struct Console
 | 
			
		|||
         AutoScroll = true;
 | 
			
		||||
         ScrollToBottom = false;
 | 
			
		||||
         AddLog("Welcome to Dear ImGui!");
 | 
			
		||||
         this->messager = messager;
 | 
			
		||||
      }
 | 
			
		||||
   ~Console()
 | 
			
		||||
      {
 | 
			
		||||
| 
						 | 
				
			
			@ -40,7 +44,7 @@ struct Console
 | 
			
		|||
   static char* Strdup(const char *str)                             { size_t len = strlen(str) + 1; void* buf = malloc(len); IM_ASSERT(buf); return (char*)memcpy(buf, (const void*)str, len); }
 | 
			
		||||
   static void  Strtrim(char* str)                                  { char* str_end = str + strlen(str); while (str_end > str && str_end[-1] == ' ') str_end--; *str_end = 0; }
 | 
			
		||||
 | 
			
		||||
   void    ClearLog()
 | 
			
		||||
   void ClearLog()
 | 
			
		||||
      {
 | 
			
		||||
         for (int i = 0; i < Items.Size; i++)
 | 
			
		||||
            free(Items[i]);
 | 
			
		||||
| 
						 | 
				
			
			@ -95,7 +99,7 @@ struct Console
 | 
			
		|||
      AddLog("[debug] Parsed host '%s' [default:%d] and port '%s' [default:%d] from connect command.",
 | 
			
		||||
             host, !gotHost, port, !gotPort);
 | 
			
		||||
      // Raise an event
 | 
			
		||||
      
 | 
			
		||||
      this->messager->add("connect");
 | 
			
		||||
   }
 | 
			
		||||
 | 
			
		||||
   void    AddLog(const char* fmt, ...) IM_FMTARGS(2)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,150 @@
 | 
			
		|||
#ifndef _MESSAGE_BUS_H
 | 
			
		||||
#define _MESSAGE_BUS_H
 | 
			
		||||
 | 
			
		||||
#include <assert.h>
 | 
			
		||||
#include <stdarg.h>
 | 
			
		||||
#include <stdlib.h>
 | 
			
		||||
#include <string.h>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
#include "common/logging.hpp"
 | 
			
		||||
 | 
			
		||||
struct MessageParam {
 | 
			
		||||
   public:
 | 
			
		||||
   int data_size;
 | 
			
		||||
   void *data;
 | 
			
		||||
   MessageParam(void *src, int size) {
 | 
			
		||||
      data_size = size;
 | 
			
		||||
      data = src;
 | 
			
		||||
   }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
struct Message {
 | 
			
		||||
   const char *name;
 | 
			
		||||
   int parameter_count;
 | 
			
		||||
   Message(const char *name) {
 | 
			
		||||
      this->name = name;
 | 
			
		||||
   }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class MessageBus {
 | 
			
		||||
   // arrray of things
 | 
			
		||||
   // This class is responsible for allocating and freeing memory for the
 | 
			
		||||
   // messages and their parameters.
 | 
			
		||||
   private:
 | 
			
		||||
   Message *bufferStart;
 | 
			
		||||
   int bufferSize;
 | 
			
		||||
   Message *occupiedHead;
 | 
			
		||||
   Message *occupiedTail;
 | 
			
		||||
   int count;
 | 
			
		||||
   Logger *logger;
 | 
			
		||||
   std::vector<void (*)(Message*)> callbacks;
 | 
			
		||||
 | 
			
		||||
   public:
 | 
			
		||||
   MessageBus(Logger *l, int initial_size = 100) {
 | 
			
		||||
      this->bufferSize = initial_size;
 | 
			
		||||
      this->bufferStart = (Message *) calloc(initial_size, sizeof(struct Message));
 | 
			
		||||
      this->occupiedHead = NULL; // the furthest "into" the buffer we are
 | 
			
		||||
      this->occupiedTail = NULL; // the closest to the "start" of the buffer the useful bits are
 | 
			
		||||
      this->count = 0;
 | 
			
		||||
      this->logger = l;
 | 
			
		||||
      l->log(info, "Initialized MessageBus with a buffer of %d items (%d bytes), starting at %d",
 | 
			
		||||
             initial_size, initial_size*sizeof(struct Message), this->bufferStart);
 | 
			
		||||
   }
 | 
			
		||||
  ~MessageBus() {
 | 
			
		||||
      free(this->bufferStart);
 | 
			
		||||
   }
 | 
			
		||||
 | 
			
		||||
   void registerCallback(void (*fp)(Message*)) {
 | 
			
		||||
      this->callbacks.push_back(fp);
 | 
			
		||||
   }
 | 
			
		||||
 | 
			
		||||
   void removeCallback(void (*fp)(Message*)) {
 | 
			
		||||
      bool found = false;
 | 
			
		||||
      int i;
 | 
			
		||||
      for(i = 0 ; i < this->callbacks.size(); i++) {
 | 
			
		||||
         if (fp == this->callbacks.at(i)) {
 | 
			
		||||
            found = true;
 | 
			
		||||
            break;
 | 
			
		||||
         }
 | 
			
		||||
      }
 | 
			
		||||
      if (found) {
 | 
			
		||||
         this->callbacks.erase(this->callbacks.begin() + i);
 | 
			
		||||
      }
 | 
			
		||||
   }
 | 
			
		||||
 | 
			
		||||
   void add(const char *name) {
 | 
			
		||||
      Message *new_item;
 | 
			
		||||
      if (occupiedHead == NULL) {
 | 
			
		||||
         new_item = this->bufferStart;
 | 
			
		||||
         this->occupiedHead = this->bufferStart;
 | 
			
		||||
         this->occupiedTail = this->bufferStart;
 | 
			
		||||
         this->count++;
 | 
			
		||||
         this->logger->log(info, "Added new item at start of empty buffer");
 | 
			
		||||
      }
 | 
			
		||||
      else {
 | 
			
		||||
         // Do we have free space?
 | 
			
		||||
         if (count < bufferSize) {
 | 
			
		||||
            new_item = this->occupiedHead + sizeof(struct Message);
 | 
			
		||||
            this->occupiedHead += sizeof(struct Message);
 | 
			
		||||
            this->count++;
 | 
			
		||||
            this->logger->log(info, "Added new item in buffer with free space");
 | 
			
		||||
         }
 | 
			
		||||
         else {
 | 
			
		||||
            // Free up space (realloc, grow if necessary);
 | 
			
		||||
            this->realloc();
 | 
			
		||||
            new_item = this->occupiedHead + sizeof(struct Message);
 | 
			
		||||
            this->occupiedHead += sizeof(struct Message);
 | 
			
		||||
            this->count++;
 | 
			
		||||
            this->logger->log(info, "Add new item after re-allocation");
 | 
			
		||||
         }
 | 
			
		||||
      }
 | 
			
		||||
      new_item->name = name;
 | 
			
		||||
      this->logger->log(info, "%d items in MessageBus buffer", this->count);
 | 
			
		||||
   }
 | 
			
		||||
 | 
			
		||||
   void remove() {
 | 
			
		||||
      this->occupiedTail += sizeof(struct Message);
 | 
			
		||||
      this->count--;
 | 
			
		||||
   }
 | 
			
		||||
 | 
			
		||||
   void realloc() {
 | 
			
		||||
      // How much free space do we have at the start?
 | 
			
		||||
      int start_available = (this->occupiedTail - this->bufferStart) / sizeof(struct Message);
 | 
			
		||||
      // We'll do a full grow if there's less than 25% free
 | 
			
		||||
      int newsize = this->bufferSize;
 | 
			
		||||
      this->logger->log(info, "Re-allocating MessageBus buffer to gain some space");
 | 
			
		||||
      if (start_available < this->bufferSize / 25) {
 | 
			
		||||
         int newsize = this->bufferSize * 2;
 | 
			
		||||
         this->logger->log(info, "Growing message buffer from %d to %d", this->bufferSize, newsize);
 | 
			
		||||
      }
 | 
			
		||||
      Message *newStart = (Message *) calloc(newsize, sizeof(struct Message));
 | 
			
		||||
      int copySize = this->occupiedHead + sizeof(struct Message) - this->occupiedTail;
 | 
			
		||||
      memcpy(newStart, this->occupiedTail, copySize);
 | 
			
		||||
      Message *newHead;
 | 
			
		||||
      newHead = newStart + (this->occupiedHead - this->occupiedTail);
 | 
			
		||||
      this->bufferStart = newStart;
 | 
			
		||||
      this->occupiedTail = newStart;
 | 
			
		||||
      this->occupiedHead = newHead;
 | 
			
		||||
   }
 | 
			
		||||
 | 
			
		||||
   int processAll() {
 | 
			
		||||
      int done = 0;
 | 
			
		||||
      while(this->count > 0) {
 | 
			
		||||
         this->process();
 | 
			
		||||
         done++;
 | 
			
		||||
      }
 | 
			
		||||
      return done;
 | 
			
		||||
   }
 | 
			
		||||
 | 
			
		||||
   void process() {
 | 
			
		||||
      assert(this->count > 0);
 | 
			
		||||
      // Call all the listening entities that care about this message or whatever
 | 
			
		||||
      for (int i = 0 ; i < this->callbacks.size() ; i++) {
 | 
			
		||||
         this->callbacks.at(i)(this->occupiedTail);
 | 
			
		||||
      }
 | 
			
		||||
      this->remove();
 | 
			
		||||
   }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
#endif
 | 
			
		||||
		Loading…
	
		Reference in New Issue