This post expands on the previous post in the series by decoupling the components and event handlers and allowing the execution environment to be easily redefined by replacing small sections of the code, without having to change anything else. It also lays the groundwork for a thread safe parallel execution environment.
First, we include the standard C++ library classes which we will be suing.
#include <map> #include <vector> #include <deque> #include <iostream>
This next block of code defines a typedef for a mutex and a class for creating atomic blocks of code through RAII. This isn’t yet used, but in the next post, when we introduce threads and parallel execution, this will be modified to actually refer to a mutex and actually lock and unlock the mutex, allowing the rest of the code to be made threadsafe without any other changes (besides implementing the threads).
// Replace this with a Mutex or lock datatype.
typedef int Lock;
// Use RAII to lock and unlock a mutex.
class Atomic {
private:
Lock lock;
public:
Atomic (Lock& l)
: lock(l) {
// Lock the lock.
}
~Atomic () {
// Unlock the lock.
}
};
The event base class is unchanged from the previous version.
// Base class for event data type.
class Event {
public:
Event (unsigned int t) : type(t) {}
virtual ~Event () {}
const unsigned int type;
};
The component base class is almost the same as in the previous version. In fact, its simpler, as it no longer stores a map of event ids to components. It does still contain the methods for registering to receive events and to send events, but these are merely convenience functions which delegate the actual work to the execution manager.
// Base class for a component.
class Component {
protected:
// Register for all events of type "type".
void registerForEvent (const unsigned type);
// Send an event.
void send (const Event* const event);
public:
// Event handler, called when an event, which is being listened for, is received.
virtual void eventReceived(const Event* const event)=0;
};
The following is an abstract class defining the execution manager. The job of the execution manager is to map events to the desired components and schedule the event handlers. This is an abstract class so that the actual execution model can be defined in a subclass. This lets us swap in different execution models later without having to change much code.
// Abstract class used to manage execution of components.
class Manager {
private:
static Manager* manager;
public:
// Provide access to the stored manager.
static void set (Manager* man) {manager = man;}
static Manager* const get () {return manager;}
virtual ~Manager () {}
// Register for all events of type "type".
virtual void registerForEvent (Component* component, const unsigned type)=0;
// Send an event.
virtual void send (const Event* const event)=0;
// Execute component system.
virtual void exec ()=0;
};
Manager* Manager::manager = 0;
We now implement the component methods by delegaing the work to the execution manager.
void Component::registerForEvent (const unsigned int type) {
// Delegate to manager.
Manager::get()->registerForEvent(this, type);
}
void Component::send (const Event* const event) {
// Delegate to manager.
Manager::get()->send(event);
}
The following code defines a sample event and component and remains largely unchanged from the previous version. It does, however, add another event type, to demonstrate how events may be sent from inside the event handler and it removes the quit event, which is not needed.
// Keep a list of event types.
enum EventIdentifiers {
PRINT_HELLO = 0,
PRINT_MESSAGE,
SEND_MESSAGE
};
// A sample event.
class MessageEvent : public Event {
public:
MessageEvent () : Event(PRINT_MESSAGE) {}
virtual ~MessageEvent () {}
std::string message;
};
// A sample component.
class HelloPrinter : public Component {
public:
HelloPrinter () {
registerForEvent(PRINT_HELLO);
registerForEvent(PRINT_MESSAGE);
registerForEvent(SEND_MESSAGE);
}
virtual ~HelloPrinter () {}
// Sample event handler.
void eventReceived(const Event* const event) {
if (event->type == PRINT_HELLO) {
std::cout << "Hello!\n";
}
else if (event->type == PRINT_MESSAGE) {
std::cout << "Message: " << static_cast<const MessageEvent*>(event)->message << "\n";
}
else if (event->type == SEND_MESSAGE) {
// Demonstrate sending events from inside the event handler.
sendMessage("Send message event sent a message");
}
}
// Convenience functions for sending events.
void sendHello () {
Event* event = new Event(PRINT_HELLO);
send(event);
}
void sendMessage (const std::string& message) {
MessageEvent* event = new MessageEvent;
event->message = message;
send(event);
}
void sendEvent(unsigned int id) {
Event* event = new Event(id);
send(event);
}
};
The reference counting class is a utility class which we use to allow us to reference count events, so that if an event is sent to multiple components, we do not delete it until all components have had a chance to process the event. This means that an event can be allocated, sent and forgotton about, as the reference counter will take care of notifying the manager of when the event should be deleted.
// Class to keep a thread safe reference count for a pointer.
template <class T>
class ReferenceCounter {
private:
const T* const item;
unsigned int count;
Lock lock;
public:
ReferenceCounter (const T* const pointer)
: item(pointer), count(0)
{
}
// Delete the pointer when the reference counter is destroyed.
~ReferenceCounter () {
delete item;
}
// Provide access to the internal lock, in case access to addRef() needs to be synchronised.
Lock& getLock () {return lock;}
// Provide access to the stored pointer - addRef() must have been called once for each time this is used and release() should be called once after each call - if release() returns true, then this pointer is now invalid and the ReferenceCounter should be destroyed.
const T* const get () {return item;}
// Increase the reference count.
ReferenceCounter<T>* addRef () {
++count;
return this;
}
// Decrease the reference count and return true is the reference counter is ready to be destroyed.
bool release () {
Atomic atom(lock);
return --count <= 0;
}
};
We now implement the execution manager. This execution manager will process the events sequentially, inthe order that they were sent and the components will process them in the order that they registered for the events. The execution manager stores a queue of pair – an event and component to process this event – which ill be processed in turn. Events are added to the back and taken from the front for FIFO processing. Also, a map of event types and components is stored, so that we can keep track of which component is itnerested in which event.
// Sample execution manager used to execute components in a synchronous fashion.
class SynchronousExecutionManager : public Manager {
private:
Lock queueLock;
// Queue of events needing to be processed.
std::deque<std::pair<Component*, ReferenceCounter<Event>* > > eventQueue;
Lock mapLock;
// Map of event types to components registered for those events.
std::map<unsigned int, std::vector<Component*> > eventMap;
public:
SynchronousExecutionManager () {}
virtual ~SynchronousExecutionManager () {}
// Register for all events of type "type".
void registerForEvent (Component* component, const unsigned type);
// Send an event.
void send (const Event* const event);
// Execute component system.
void exec ();
};
Registering for an event is a simple matter of adding the component to the list of components registered for a given event type. This is done in an atomic block, so that the event map cannot be modified concurrently.
void SynchronousExecutionManager::registerForEvent (Component* component, const unsigned type) {
Atomic atom(mapLock);
eventMap[type].push_back(component);
}
The send method simply ensures the event is reference counted and then adds itself to the queue for each component that is registered to receive that event. Again, this method locks the shared mutexes so that the queue and map cannot be in an inconsistent state.
void SynchronousExecutionManager::send (const Event* const event) {
Atomic atom1(queueLock);
Atomic atom2(mapLock);
// Create a new reference counter for the event.
ReferenceCounter<Event>* ref = new ReferenceCounter<Event>(event);
// Get list of components which should receive this event.
std::vector<Component*>& components = eventMap[event->type];
// Add the event to the queue, once for each component - add to the reference count for each one.
for (std::vector<Component*>::iterator i = components.begin(); i != components.end(); i++) {
eventQueue.push_back(std::pair<Component*, ReferenceCounter<Event>* >(*i, ref->addRef()));
}
}
This method handles actually calling the event handler for each event. It does this by taking the next event off the queue, in a thread safe manner, and then calling the event handler. Finally, the reference count for the event is decremented and if needs be, the event is deleted. This ensures that events do not leak memory.
void SynchronousExecutionManager::exec () {
ReferenceCounter<Event>* ref;
Component* component;
// As long as there are events to be processed, keep processing.
while (!eventQueue.empty()) {
{
Atomic atom(queueLock);
// Get the next event/component pair from the queue.
std::pair<Component*, ReferenceCounter<Event>* > next = eventQueue.front();
eventQueue.pop_front();
ref = next.second;
component = next.first;
}
// Handle the event.
component->eventReceived(ref->get());
// Release reference count for the event just processed.
if (ref->release()) {
// Nobody else is holding any more references to this or will be touching this in any way
delete ref;
}
}
}
Finally, a test program to test the event system.
// Sample run
int main (int argc, char** argv) {
// Create the execution manager.
Manager::set(new SynchronousExecutionManager());
// Seed the system with some events.
HelloPrinter a, b, c;
a.sendHello();
b.sendMessage("Hi from b");
c.sendEvent(SEND_MESSAGE); // Sent to all 3, which each send another event to all 3, causing 9 string messages to be sent.
// Run the event system.
Manager* manager = Manager::get();
manager->exec();
// Clean up.
delete manager;
}
Hopefully this helped you understand how the components from the previous system can be decoupled in a way that the execution environment can be easily controlled without requiring code modification. In the next part, we will implement a new execution manager, which changes the exec() method to actually process the event handlers in a thread pool, for parallel execution.