Modern programming languages and multi-core CPUs make multithreading efficient. Multithreading can improve the performance and responsiveness of an application, but threads are difficult to work with because shared data and timing dependencies make a program much harder to reason about. One way to organise threads so that they co-operate without tripping over each other is to use a messaging mechanism to communicate between them.
Why use messages?
Messages provide a good model for communication between independent processes because we humans use them all the time. We naturally coordinate and co-operate by sending messages to each other. The messages can be synchronous (like a conversation), asynchronous (like an email or a letter) or broadcast (like a radio programme). The messaging paradigm is easy for us to imagine and understand. In particular, it provides a natural way for us to think about how things interact. We can easily imagine a process that is responsible for a particular action, which is started when it receives a message. The message may contain information needed for the action. When the action is complete, the process can report the result by sending another message. Such a process is simple and independent: all it has to do is wait for the arrival of a message, carry out a task, and send a message saying it has finished. What could be simpler?
What is a message bus?
A message bus provides a means of message transmission in which the sender and receiver are not directly connected. For example, Ethernet is a message bus – all senders put all their messages on the bus and, locally at least, all receivers receive every message.
When messages are sent on a bus, there needs to be a way for the receiver(s) to select the messages they need to process. There are various ways to do this, but for the message bus implemented in this article we allow a sender to label a message with a sender role, a subject and a message type. These may appear to be quite arbitrary properties, but they fit in with the way in which the bus is used and provide a straightforward way for receivers to filter messages so that they process only those messages that are relevant.
This form of messaging is usually referred to as a Publish and Subscribe model.
How our bus will work
These are the essential characteristics of our message bus:
- The message bus operates within a single application, to send messages between independent worker threads.
- Any worker thread in the application can access the message bus.
- Any worker thread may send and receive messages using the bus.
- Messages are broadcast, so every receiver that is listening will get every message.
- The bus does not store messages so a receiver will not get any messages that were sent before it connects to the bus.
- The thread that sends a message is separated from the thread(s) that receive it, so sending and receiving are always asynchronous.
- A receiver can set a filter to select only relevant messages for delivery – subscribing to a subset of the messages sent on the bus.
- Worker threads that send and receive messages are not held up by other worker threads when they do so. We want our senders and receivers to carry on with their tasks without having to wait for messages to be delivered and processed by other threads.
Classes of the message bus
These are the classes which make up the bus:
cBus
- The base class of the bus and all the other classes. This class is never instantiated directly, but holds class (Shared) variables and methods that provide some core functions of the bus.
cBusLink
- A component that provides the mechanism for delivering messages to receivers.
cThread
- A component that provides and controls a thread for use within the sender and receiver classes.
cSender
- The class that is used by senders to put messages onto the message bus. Each worker process that sends messages uses a cSender object.
cReceiver
- The class that is used by worker processes to subscribe and take delivery of messages from the bus.
cFilter
- Class used to apply subscription filters to incoming messages within a cReceiver.
cMessage
- Objects of this class are sent and received. In our system the message content is a string, but the class could be extended through inheritance to provide richer content.
cBus and cBusLink – the core of the message bus
cBus is the base class for all the other classes in the implementation. cBus is used as an abstract class: it is never itself instantiated. Its key class member is oBusLink, a shared instance of cBusLink. oBusLink is protected, which means it is accessible only to derived (child) classes of cBus.
cBus and cBusLink, which are central to the whole message bus, are very simple (see Listing 1).
Listing 1 – cBus and cBusLink classes
    Public Class cBus

        '// ///////////////////////////////////////
        '// The BusLink class is used only as a means of
        '// propagating publication of a message from
        '// senders to receivers.
        Protected Class cBusLink

            '// Event published with new message
            Public Event NewMessage(ByVal oMessage As cMessage)

            '// Event published when bus is stopped
            Public Event StopBus()

            '// Flag to indicate that the bus has been
            '// stopped. Provides orderly shutdown
            Private bStopped As Boolean = False

            '// Method to publish a message
            Public Sub PublishMessage(ByVal oMessage As cMessage)
                If bStopped Then Exit Sub
                RaiseEvent NewMessage(oMessage)
            End Sub

            '// Method to stop the bus, for orderly shutdown
            Public Sub StopBusNow()
                bStopped = True
                RaiseEvent StopBus()
            End Sub

        End Class

        '// Global shared single instance of cBusLink
        '// used to send messages to all receivers
        Protected Shared oBusLink As New cBusLink

        '// Global shared flag indicating the bus has
        '// been stopped
        Protected Shared bStopped As Boolean = False

        '// Read-only view of the stopped flag. The other
        '// listings refer to it as BusStopped.
        Protected Shared ReadOnly Property BusStopped() As Boolean
            Get
                Return bStopped
            End Get
        End Property

        '// ///////////////////////////////////////
        '// ID generator is used by other classes to
        '// generate unique sequence numbers
        Protected Class cIDGenerator
            Private _ID As Long = 0

            Public Function NextID() As Long
                '// Interlocked keeps the IDs unique when several
                '// threads ask for an ID at the same time
                Return System.Threading.Interlocked.Increment(_ID)
            End Function
        End Class

        '// ////////////////////////////////////
        '// Public method to stop the bus before
        '// closedown. Ensures orderly closedown.
        Public Shared Sub StopBusNow()
            bStopped = True
            oBusLink.StopBusNow()
        End Sub

    End Class
The class cBusLink is at the core of the message bus and is responsible for delivering messages to every recipient through the NewMessage event. As we shall see later, every cReceiver object holds a reference to a single shared cBusLink object and they all subscribe to its NewMessage event. When this event is fired, every cReceiver object is given a reference to the new message.
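The broadcast mechanism can be tried out in isolation. The following is a minimal sketch, not the article's classes: DemoLink and DemoReceiver are hypothetical stand-ins for cBusLink and cReceiver, and the message is a plain string. It shows only that one shared event source delivers each published message to every subscriber.

```vbnet
Imports System
Imports System.Collections.Generic

Module BroadcastSketch

    '// Hypothetical stand-in for cBusLink: a single event that is
    '// raised once for each published message.
    Public Class DemoLink
        Public Event NewMessage(ByVal sText As String)

        Public Sub PublishMessage(ByVal sText As String)
            RaiseEvent NewMessage(sText)
        End Sub
    End Class

    '// Hypothetical stand-in for cReceiver: subscribes to the link's
    '// event and records every message it is given.
    Public Class DemoReceiver
        Public ReadOnly Received As New List(Of String)

        '// WithEvents lets the Handles clause below hook the event
        Private WithEvents _Link As DemoLink

        Public Sub New(ByVal oLink As DemoLink)
            _Link = oLink
        End Sub

        Private Sub OnNewMessage(ByVal sText As String) Handles _Link.NewMessage
            Received.Add(sText)
        End Sub
    End Class

    Sub Main()
        Dim oLink As New DemoLink
        Dim oFirst As New DemoReceiver(oLink)
        Dim oSecond As New DemoReceiver(oLink)

        '// One publication reaches both receivers
        oLink.PublishMessage("hello")

        Console.WriteLine("{0} {1}", oFirst.Received.Count, oSecond.Received.Count)
    End Sub

End Module
```

In this sketch the handlers run on the publishing thread, one after the other. That is why the real receiver only queues the message in its handler and leaves processing to another thread.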
cMessage
Objects of the cMessage class carry the message data from sender to recipient. In our implementation, the class has only a single string payload (see Listing 2), but you can implement sub-types of cMessage with additional properties and methods for more sophisticated communication between senders and receivers.
Listing 2 – cMessage class
    Public Class cMessage
        Inherits cBus

        '// /////////////////////////////////
        '// This generator allocates a unique
        '// message id to each message
        Private Shared _oMsgID As New cIDGenerator

        '// Properties of the message, accessible to derived
        '// classes
        Protected _SenderRole As String = ""
        Protected _SenderRef As String = ""
        Protected _Subject As String = ""
        Protected _Type As String = ""
        Protected _Content As String = ""

        '// Message ID is private, it cannot be changed,
        '// even by derived classes
        Private _MsgID As Long

        '// /////////////////////////////
        '// Default constructor used only for
        '// derived classes
        Protected Sub New()
            _MsgID = _oMsgID.NextID
        End Sub

        '// /////////////////////////////
        '// Public constructor requires key message
        '// properties to be supplied. The message
        '// cannot be modified thereafter.
        Public Sub New(ByVal Sender As String, _
                       ByVal Subject As String, _
                       ByVal Type As String, _
                       Optional ByVal Content As String = "")
            _SenderRole = Sender
            _Subject = Subject
            _Type = Type
            _Content = Content
            _MsgID = _oMsgID.NextID
        End Sub

        '// /////////////////////////////////////////////////
        '// Property accessors - all read-only so values
        '// cannot be changed by any recipient.
        Public ReadOnly Property SenderRole() As String
            Get
                Return _SenderRole
            End Get
        End Property

        Public ReadOnly Property Subject() As String
            Get
                Return _Subject
            End Get
        End Property

        Public ReadOnly Property Type() As String
            Get
                Return _Type
            End Get
        End Property

        Public ReadOnly Property MsgID() As Long
            Get
                Return _MsgID
            End Get
        End Property

        Public ReadOnly Property Content() As String
            Get
                Return _Content
            End Get
        End Property

    End Class
This class implementation is mostly straightforward, but some aspects are worth looking at more closely:
- The class inherits cBus to gain access to the protected class cIDGenerator, which is declared in the base class.
- All the variables that store property values, except for _MsgID, are declared Protected so that they can be accessed within a child class. _MsgID is declared Private so its value cannot be changed by a child class.
cSender
cSender and its counterpart cReceiver do all the hard work. cSender is the class used by a worker thread to add messages to the bus. Before we look under the hood, let’s examine the public members of the class that a sending process will use.
Using the cSender class
First, a worker process that wants to send messages must create an instance of cSender, providing the sender’s role as a parameter. The role allows for the possibility that there might be multiple worker threads performing the same role within the application. A recipient can filter messages based on the role of the sender, but does not need to know that there is more than one sender acting in that role.
    :
    Dim oSender As New cSender("clock")
    :
Once instantiated, the cSender object can be used to send messages on the bus:
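    :
    '// Constructor arguments as declared in Listing 2:
    '// sender role, subject, type, content
    Dim oMsg As New cMessage(oSender.Role, "hourchange", "time", "10>11")
    oSender.SendMessage(oMsg)
    :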
In this case, the message has the type "time", the subject "hourchange" and the content "10>11".
Under the hood of the cSender class
The cSender implementation uses a queue to separate the sender process from the bus. When the worker thread sends a message, it is written to the injector queue, from where it is picked up by a separate injector thread and published through the bus link.
The injector runs on a separate thread, so that placing a message on the bus does not hold up the worker process. The injector thread is provided by a cThread object which runs only when messages are waiting in the injector queue. cThread is described in more detail below.
The implementation of the cSender class is shown in Listing 3.
Listing 3 – cSender Class
    Public Class cSender
        Inherits cBus

        '// //////////////////////////////////////////
        '// Queue of messages waiting to be injected
        '// into the message bus. Each sender has its
        '// own private injector queue
        Private _oMsgQ As New System.Collections.Generic.Queue(Of cMessage)

        '// /////////////////////////////////////////
        '// Reference to the global BusLink instance, used
        '// only to pick up the StopBus event published
        '// by the bus when stopped.
        Private WithEvents oMyBusLink As cBusLink

        '// /////////////////////////////////////////
        '// Event to inform owner the bus has stopped
        Public Event Stopped()

        '// Sender role, used to identify the sender and
        '// provide the key for filtering messages
        '// at the receiver.
        Private _Role As String

        Public ReadOnly Property Role() As String
            Get
                Return _Role
            End Get
        End Property

    #Region "Construct and destruct"

        '// //////////////////////////////////////////
        '// Constructor with role (mandatory)
        Public Sub New(ByVal sRole As String)
            _Role = sRole
            '// Set the reference to the buslink to the
            '// shared instance of the single buslink. We
            '// need this reference to pick up the stop event
            oMyBusLink = oBusLink
        End Sub

        '// //////////////////////////////////////////////
        '// This method is called when the bus is closed down
        Private Sub oBusLink_StopBus() Handles oMyBusLink.StopBus
            SyncLock _oMsgQ
                RaiseEvent Stopped()
            End SyncLock
        End Sub

    #End Region

    #Region "Sending messages"

        '// /////////////////////////////////////////
        '// Method used by worker thread to place a
        '// new default cMessage object on the injector
        '// queue.
        Public Function SendNewMessage(ByVal Type As String, _
                                       ByVal Subj As String, _
                                       Optional ByVal Ref As String = "", _
                                       Optional ByVal Content As String = "") As cMessage
            If BusStopped Then Return Nothing
            '// The public cMessage constructor (Listing 2) takes
            '// role, subject, type and content. It has no parameter
            '// for Ref, so Ref is not passed on here.
            Dim oM As New cMessage(_Role, Subj, Type, Content)
            SendMessage(oM)
            Return oM
        End Function

        '// //////////////////////////////////////////
        '// Method used by worker thread to place message
        '// object on the injector queue.
        Public Sub SendMessage(ByVal pMessage As cMessage)
            If BusStopped Then Exit Sub
            '// We do not allow Nothing to be sent
            If pMessage Is Nothing Then
                '// Do nothing
                '// We could throw an error here
            Else
                SyncLock _oMsgQ
                    _oMsgQ.Enqueue(pMessage)
                    '// Start the thread only if
                    '// one message on the queue.
                    If _oMsgQ.Count = 1 Then
                        _oInjectorThread.Start()
                    End If
                End SyncLock
            End If
        End Sub

        '// ////////////////////////////////////////
        '// Holds up the caller thread until all the messages
        '// have been injected into the bus
        Public Sub Flush()
            Do Until _oMsgQ.Count = 0
                '// Once the bus has stopped the injector no longer
                '// drains the queue, so stop waiting
                If BusStopped Then Exit Do
                Threading.Thread.Sleep(2)
            Loop
        End Sub

    #End Region

    #Region "Message Injector"

        '// //////////////////////////////////////////
        '// Functions run by the thread for injecting messages
        '// into the bus. The thread runs only when at
        '// least one message is waiting in the injector queue.
        Private WithEvents _oInjectorThread As New cThread

        '// //////////////////////////////////////////
        '// Injector Thread fires Run event to place
        '// the queued messages on the bus
        Private Sub _oInjectorThread_Run() Handles _oInjectorThread.Run
            InjectMessagesNow()
        End Sub

        '// ///////////////////////////////////////////
        '// When the injector thread runs, this function
        '// is called to push all the queued messages into
        '// the bus.
        Private Sub InjectMessagesNow()
            Dim oM As cMessage
            '// Loop until all messages in the
            '// queue have been injected into the
            '// bus.
            Do
                '// Check if stopped flag was set while
                '// going round loop.
                If BusStopped Then Exit Sub

                '// Get the next message off the
                '// injector queue
                SyncLock _oMsgQ
                    If _oMsgQ.Count > 0 Then
                        oM = _oMsgQ.Dequeue()
                    Else
                        oM = Nothing
                    End If
                    '// Release the lock so that the worker
                    '// process can add new messages to
                    '// the queue while we are publishing
                    '// this message on the bus
                End SyncLock

                If oM Is Nothing Then
                    '// Queue is empty, so finish the
                    '// loop
                    Exit Do
                End If

                '// Now we have got the message, we can
                '// send it using the single global
                '// cBusLink which is instantiated in the
                '// base class cBus.
                SyncLock oBusLink
                    oBusLink.PublishMessage(oM)
                End SyncLock
            Loop
        End Sub

    #End Region

        Protected Overrides Sub Finalize()
            '// Close down the injector thread
            _oInjectorThread.StopThread()
            MyBase.Finalize()
        End Sub

    End Class
The method SendMessage is used by a worker process to place messages on the injector queue. The queue class is not thread-safe, so SyncLock is used to protect the queue from simultaneous use by another thread. The injector thread is started only when a message is added to an empty queue, and this fires the event cThread.Run.
The private method _oInjectorThread_Run handles the injector thread’s Run event. It calls InjectMessagesNow, which takes all the waiting messages from the injector queue and places them in turn on the bus by using the BusLink’s PublishMessage method. When the method exits, the thread is blocked within cThread until another message is placed on the empty queue. If a message is added to the injector queue while an earlier message is being sent on the bus, it will be included in the sending loop without needing the Run event to fire again.
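The hand-off between the worker thread and the injector thread can be sketched on its own. The code below is an illustration under simplifying assumptions, not the cSender implementation: DemoInjector, Send, StopAndWait and Published are hypothetical names, messages are plain strings, "publishing" just appends to a list, and the signal is set on every send rather than only when the queue was empty.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading

Module InjectorSketch

    '// Hypothetical stand-in for the queue and thread inside cSender
    Public Class DemoInjector
        Private ReadOnly _Queue As New Queue(Of String)
        Private ReadOnly _Signal As New AutoResetEvent(False)
        Private ReadOnly _Worker As Thread
        Private _Stopping As Boolean = False   '// guarded by SyncLock _Queue

        '// Everything the injector thread has "published", in order
        Public ReadOnly Published As New List(Of String)

        Public Sub New()
            _Worker = New Thread(AddressOf DrainLoop)
            _Worker.IsBackground = True
            _Worker.Start()
        End Sub

        '// Called by the sending thread. It only queues the message
        '// and signals the injector, so it never waits for delivery.
        Public Sub Send(ByVal sText As String)
            SyncLock _Queue
                _Queue.Enqueue(sText)
            End SyncLock
            _Signal.Set()
        End Sub

        '// Lets the injector finish the queued messages, then waits
        '// for the injector thread to end.
        Public Sub StopAndWait()
            SyncLock _Queue
                _Stopping = True
            End SyncLock
            _Signal.Set()
            _Worker.Join()
        End Sub

        Private Sub DrainLoop()
            Do
                '// Block until a sender (or StopAndWait) signals
                _Signal.WaitOne()
                Do
                    Dim sNext As String = Nothing
                    Dim bGot As Boolean = False
                    Dim bStop As Boolean = False
                    SyncLock _Queue
                        If _Queue.Count > 0 Then
                            sNext = _Queue.Dequeue()
                            bGot = True
                        Else
                            bStop = _Stopping
                        End If
                    End SyncLock
                    If Not bGot Then
                        If bStop Then Exit Sub
                        Exit Do
                    End If
                    '// Publish outside the lock so that Send is not
                    '// held up while this message is being handled
                    Published.Add(sNext)
                Loop
            Loop
        End Sub
    End Class

    Sub Main()
        Dim oInjector As New DemoInjector
        oInjector.Send("one")
        oInjector.Send("two")
        oInjector.Send("three")
        oInjector.StopAndWait()
        Console.WriteLine(String.Join(",", oInjector.Published.ToArray()))
    End Sub

End Module
```

The stop flag is read under the same lock as the queue, and only when the queue is found empty, so messages queued before StopAndWait are always published before the thread ends.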
cReceiver
Objects of this class are used by worker processes to receive messages from the bus.
The process that creates the cReceiver object can choose to set filters so that only relevant messages are delivered. More detail on filtering is given below.
When the receiver object connects to the bus, it sets its own private member variable _BusLinkRef to refer to the shared member oBusLink. _BusLinkRef is declared WithEvents so that the NewMessage event of the cBusLink can be handled.
The thread that owns the receiver can set a cFilter object on the receiver. Then every message received through the NewMessage event is checked against the filter and, if it passes, it is added to the receiver’s incoming message queue, waiting to be delivered. The filter can be changed during the run.
Messages are delivered and processed in one of three ways:
- The worker thread calls GetNextMessage to return the next message from the queue. If there are no messages waiting, the method returns Nothing.
- The worker thread calls DeliverWaitingMessages to deliver all queued messages through the MessageReceived event. The events are raised on the worker thread.
- The creator/owner calls the StartAsync method to request that the receiver object provides a separate worker thread to raise the MessageReceived event when new messages arrive. The event is raised on a thread provided by a cThread object within cReceiver.
Using GetNextMessage or DeliverWaitingMessages means that the receiver worker thread must set up its own processing loop, for example by using a timer to repeat the loop. This is appropriate when, for example, the thread needs to interact with the GUI: a Timer component on a form could drive the loop on the GUI thread.
In contrast, using StartAsync means that the cReceiver object will create its own internal worker thread that raises the MessageReceived event.
Listing 4 – cReceiver class
    Public Class cReceiver
        Inherits cBus

        '// //////////////////////////////////////
        '// Id generator for all cReceiver objects
        Private Shared _oRecId As New cIDGenerator

        '// //////////////////////////////////////
        '// Event used to deliver a message to the
        '// message handler function
        Public Event MessageReceived(ByVal oMessage As cMessage)

        '// //////////////////////////////////////
        '// Event used to indicate the bus has stopped,
        '// used to ensure orderly shutdown of the bus
        Public Event Stopped()

        Public ReadOnly Property IsStopped() As Boolean
            Get
                Return BusStopped
            End Get
        End Property

        '// //////////////////////////////////////
        '// Message queue holding the messages
        '// waiting to be delivered
        Private _MQueue As New System.Collections.Generic.Queue(Of cMessage)

        '// ///////////////////////////////////////////
        '// Filter set by the recipient to select
        '// messages. Filter can be by specific role(s),
        '// subject(s) or type(s) or using more specialised
        '// filters. Filters can be changed at any time. The
        '// default no filter allows all messages through.
        Public Filter As cFilter = Nothing

        '// //////////////////////////////////////////
        '// Reference to the single global buslink
        '// so that the receiver can pick up published
        '// messages from the bus
        Private WithEvents _BusLinkRef As cBusLink

        '// Flag to indicate that this object has been
        '// finalised and is closing.
        Private _Closing As Boolean = False
        Private _RaiseStopEvent As Boolean = False

        '// /////////////////////////////////////////
        '// Unique identifier of this receiver object
        Private _ID As Long

        '// /////////////////////////////////////////
        '// Counts of number of messages received
        '// and delivered
        Private _BCount As Long = 0 ' Messages from the Bus
        Private _RCount As Long = 0 ' Messages received onto the queue
        Private _DCount As Long = 0 ' Messages delivered to the worker

        '// //////////////////////////////////
        '// Constructor
        Public Sub New()
            _ID = _oRecId.NextID
        End Sub

        '// ///////////////////////////////////
        '// Establishes connection to the bus so that
        '// message delivery can start
        Public Sub Connect()
            '// /////////////////////////////////////////
            '// Set the buslink variable to refer to the
            '// shared buslink so that it delivers
            '// messages through the event handler
            _BusLinkRef = oBusLink
            '// NOTE: oBusLink is a direct reference to
            '// the protected shared class member.
        End Sub

        '// ////////////////////////////////////////
        '// Breaks the connection with the bus
        '// so that messages are no longer
        '// received.
        Public Sub Disconnect()
            _BusLinkRef = Nothing
        End Sub

        '// /////////////////////////////////
        '// Accessor methods for the readonly
        '// properties
        Public ReadOnly Property BCount() As Long
            '// Bus message count
            Get
                Return _BCount
            End Get
        End Property

        Public ReadOnly Property RCount() As Long
            '// Received message count
            Get
                Return _RCount
            End Get
        End Property

        Public ReadOnly Property DCount() As Long
            '// Delivered message count
            Get
                Return _DCount
            End Get
        End Property

        Public ReadOnly Property QCount() As Long
            '// Queued (waiting) message count
            Get
                If _MQueue IsNot Nothing Then
                    Return _MQueue.Count
                Else
                    Return 0
                End If
            End Get
        End Property

        Public ReadOnly Property ID() As Long
            '// Unique ID number of this receiver
            Get
                Return _ID
            End Get
        End Property

        Public Function MessagesWaiting() As Boolean
            '// Helper function returns true if there
            '// are messages waiting
            Return QCount > 0
        End Function

    #Region "Message arrival"

        '// //////////////////////////////////
        '// This method handles the new message
        '// event from the bus. The message is
        '// queued for delivery.
        Private Sub oBusLink_NewMessage( _
                ByVal oMessage As cMessage _
                ) Handles _BusLinkRef.NewMessage
            '// Discard message if closing, or the bus has stopped
            If _Closing Then Exit Sub
            If BusStopped Then Exit Sub
            _BCount += 1
            '// ////////////////////////////
            '// Check against the filter.
            '// The message must be included by the filter
            '// otherwise it will not be delivered.
            '// Take a local copy because the owner may
            '// change the filter at any time.
            Dim oFilter As cFilter = Filter
            If oFilter Is Nothing OrElse oFilter.bInclude(oMessage) Then
                '// ///////////////////////////////
                '// New message has passed the filter, so
                '// add it to the message queue waiting
                '// for delivery to the worker process.
                AddToQueue(oMessage)
            End If
        End Sub

        '// ////////////////////////////////
        '// Method used to add messages
        '// to the message queue when they arrive
        '// from the message bus.
        Private Sub AddToQueue(ByVal oMessage As cMessage)
            '// ////////////////////////////////////////////
            '// Check if the queue exists - if not, then
            '// exit without adding a message.
            If _MQueue Is Nothing Then Exit Sub
            '// ////////////////////////////////////////////
            '// Check if closing or stopped, if so exit
            If BusStopped Then Exit Sub
            If _Closing Then Exit Sub

            Dim bStartDelivery As Boolean
            '// ////////////////////////////////////////////
            '// SyncLock the queue to guarantee exclusive
            '// access, then add the message
            SyncLock _MQueue
                _RCount += 1
                _MQueue.Enqueue(oMessage)
                '// ////////////////////////////////////////////////
                '// We start the delivery thread if async AND
                '// this is the first message in the queue
                bStartDelivery = _AsyncMode And _MQueue.Count = 1
            End SyncLock

            '// //////////////////////////////
            '// Check if we need to start the delivery thread
            '// which we do only in async mode and if this is
            '// the first message in the queue
            If bStartDelivery Then
                _DeliveryThread.Start()
            End If
        End Sub

    #End Region

    #Region "Message delivery"

        '// ////////////////////////////////
        '//
        '// Message delivery can be made in these
        '// ways:
        '// * Asynchronously on a provided thread
        '//   - call StartAsync to enable this
        '//   - messages are delivered through MessageReceived event
        '//
        '// * By a call from the worker thread
        '//   - use GetNextMessage to retrieve the message
        '//
        '// GetNextMessage returns the next
        '// message as the function result.
        '// It returns Nothing if
        '// there is no message in the queue
        '//
        '// ////////////////////////////////

        '// Delivery thread is used with async delivery only
        Private WithEvents _DeliveryThread As cThread = Nothing
        Private _AsyncMode As Boolean = False

        '////////////////////////////////////
        '// Starts asynchronous delivery through the MessageReceived event.
        '// Called by the creator/owner to initiate a new thread delivering
        '// messages from this receiver.
        Public Sub StartAsync()
            '// Do nothing if closing, stopped or already in async mode.
            If _Closing Then Exit Sub
            If BusStopped Then Exit Sub
            If _AsyncMode Then Exit Sub
            _AsyncMode = True
            '// Create and start the delivery thread.
            If _DeliveryThread Is Nothing Then _DeliveryThread = New cThread
            _DeliveryThread.Start()
        End Sub

        '// ///////////////////////////////////////////////
        '// Picks up the next message from the queue
        '// if any and returns it. Returns Nothing
        '// if there is no message.
        Public Function GetNextMessage() As cMessage
            '// Do not return anything if closing or stopped
            If _Closing Then Return Nothing
            If BusStopped Then Return Nothing
            Dim oM As cMessage
            '// Lock the queue and get the next message
            SyncLock _MQueue
                If _MQueue.Count > 0 Then
                    oM = _MQueue.Dequeue
                    _DCount += 1
                Else
                    oM = Nothing
                End If
            End SyncLock
            '// Return the message (if any)
            Return oM
        End Function

        '// ///////////////////////////////////////////////
        '// This event handler is called when the thread runs
        '// - only when messages are waiting to be delivered in
        '// async mode
        Private Sub _DeliveryThread_Run() Handles _DeliveryThread.Run
            DeliverWaitingMessages()
        End Sub

        '// ///////////////////////////////////////////////
        '// Delivers all the messages in the incoming
        '// message queue using the MessageReceived event
        Public Sub DeliverWaitingMessages()
            '// Raise the stop event if the bus has been stopped
            If BusStopped Then
                '// Inform the delivery thread
                If _RaiseStopEvent Then
                    RaiseEvent Stopped()
                    _RaiseStopEvent = False
                End If
                Exit Sub
            End If
            '// Do nothing if closing
            If _Closing Then Exit Sub
            '// The queue may be nothing, so simply
            '// exit and try again on the cycle
            If _MQueue Is Nothing Then Exit Sub

            Dim oM As cMessage
            '// Retrieve all the messages and deliver them
            '// using the message received event.
            Do
                '// Lock the queue before dequeuing the message
                SyncLock _MQueue
                    If _MQueue.Count > 0 Then
                        oM = _MQueue.Dequeue
                    Else
                        oM = Nothing
                    End If
                End SyncLock
                '// ///////
                '// After releasing the lock we
                '// can deliver the message.
                If oM IsNot Nothing Then
                    _DCount += 1
                    RaiseEvent MessageReceived(oM)
                End If
                '// If the queue was not empty then loop back for the
                '// next message
            Loop Until oM Is Nothing
        End Sub

    #End Region

    #Region "Stats Report"

        '////////////////////////////////////////////////
        '// This sub simply publishes a message of
        '// stats about this receiver.
        Public Sub StatsReport()
            If BusStopped Then Exit Sub
            Dim sRpt As String
            sRpt = "Report from Receiver #" & Me.ID
            sRpt &= "|BUS=" & _BCount
            sRpt &= "|REC=" & _RCount
            sRpt &= "|DEL=" & _DCount
            sRpt &= "|Q=" & _MQueue.Count
            sRpt &= "|Closing=" & _Closing
            Dim s As New cSender("Receiver#" & ID)
            '// Pass the report by name so that it is sent as the
            '// message content and not as the optional Ref argument
            s.SendNewMessage("STATS", "STATS", Content:=sRpt)
            s.Flush()
            s = Nothing
        End Sub

    #End Region

        '// ///////////////////////////////////
        '// Handler for the stopbus event. Do
        '// not deliver any more messages once the
        '// bus has been stopped.
        Private Sub oBusLinkRef_StopBus() Handles _BusLinkRef.StopBus
            _Closing = True
            '_DeliveryTimer = Nothing
            _AsyncMode = False
            _RaiseStopEvent = True
        End Sub

        '// ////////////////////////////////////
        '// Finalise to tidy up resources when being disposed
        Protected Overrides Sub Finalize()
            '// The delivery thread exists only if StartAsync was called
            If _DeliveryThread IsNot Nothing Then _DeliveryThread.StopThread()
            _Closing = True
            _AsyncMode = False
            _MQueue = Nothing
            MyBase.Finalize()
        End Sub

    End Class
cThread
The cThread class provides a thread and the control methods needed to block and release the thread as required.
By default, the thread is blocked. The class provides a method, Start, which unblocks the thread. The thread immediately raises the Run event to carry out the processing required, and then blocks again until the Start method is called again, when it repeats the Run event.
In our message bus, cThread is used in cSender to inject messages onto the bus, and in cReceiver to deliver messages when operating in async mode. In both of these classes the Run event handler picks messages off a queue until it is empty, then exits. It is quite likely that new messages are added to the queue while the handler is running, and these are picked up in the handler loop. Eventually the queue is empty and, if Start has not been called again in the meantime, the thread blocks until it is.
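One property of the auto-reset signal used in Listing 5 explains why the Run handlers must drain the whole queue rather than handle one message per Run. Calls to Set do not accumulate: however many times the signal is set before the thread waits, only one wait is released. The short sketch below shows this with the standard EventWaitHandle class on a single thread. It uses a zero timeout so that the second wait returns immediately instead of blocking.

```vbnet
Imports System
Imports System.Threading

Module SignalSketch

    Sub Main()
        Dim oSignal As New EventWaitHandle(False, EventResetMode.AutoReset)

        '// Two "Start" requests arrive before the thread waits
        oSignal.Set()
        oSignal.Set()

        '// The first wait consumes the signal and resets it
        Dim bFirst As Boolean = oSignal.WaitOne(0)
        '// The second wait finds the signal already reset
        Dim bSecond As Boolean = oSignal.WaitOne(0)

        Console.WriteLine("{0} {1}", bFirst, bSecond)   '// prints "True False"
    End Sub

End Module
```

So two Start calls made in quick succession may lead to a single Run event. Because each Run handler loops until its queue is empty, no message is missed.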
The implementation of the class is shown in Listing 5.
Listing 5 – cThread class
    Imports System.Threading

    Public Class cThread
        Inherits cBus

        Private WithEvents _BusLinkRef As cBusLink = oBusLink
        Private Shared iThreadCount As Long = 0

        '// Event fired to execute the thread's
        '// assigned processes.
        Public Event Run()

        '// Thread object provides the thread
        Private _Thread As New Thread(AddressOf RunThread)

        '// Signal object to block the thread
        '// when there are no messages to be delivered
        Private _Signal As New EventWaitHandle(False, EventResetMode.AutoReset)

        '// Flag to indicate thread has been stopped
        Private bThreadStopped As Boolean = False

        '// Start the thread on creation of the object
        Public Sub New()
            _Thread.Start()
        End Sub

        '// Start called by owner to
        '// unblock this thread.
        Public Sub Start()
            If _Thread.ThreadState = ThreadState.Unstarted Then _Thread.Start()
            SyncLock Me
                _Signal.Set()
            End SyncLock
        End Sub

        '// Stop called by owner to close
        '// down thread
        Public Sub StopThread()
            bThreadStopped = True
            _Signal.Set()
        End Sub

        '// Method executed by the thread. This is
        '// a repeated loop until the bus is stopped
        Private Sub RunThread()
            Do
                '// The signal blocks the thread until
                '// it is released by the Start method
                _Signal.WaitOne()
                If bThreadStopped Then
                    Exit Sub
                End If
                '// Raise the thread event that will
                '// do the work.
                RaiseEvent Run()
            Loop
        End Sub

        Private Sub _BusLinkRef_StopBus() Handles _BusLinkRef.StopBus
            StopThread()
        End Sub

    End Class
cFilter
cFilter objects are used by cReceiver to apply filtering to incoming messages. The base cFilter class is declared MustInherit, so it cannot be instantiated. Messages are filtered only by defining a child class that applies some filtering logic. This is how it works:
- The base class, cFilter, defines a Protected MustOverride method bMatches, which takes a cMessage object as a parameter. In a child class this method is overridden to implement specific filtering logic.
- cFilter defines a Public method, bInclude, which takes a message object as a parameter and returns true if the message is to be included, and false if not. This is the method used by cReceiver to check if a message passes the filter. Apart from testing its own bMatches value, this method also contains the logic to check other cFilter objects that have been attached in And / Or collections.
- Four further methods, And_, And_Not, Or_ and Or_Not, provide the means to add other filter objects to the And/Or collections of this filter.
Using the And_, Or_ and related methods makes it easy to build compound logical tests from basic filter components. For example, if I have two filter objects FilterA and FilterB, they can be combined as FilterA.Or_(FilterB), or FilterA.And_(FilterB). It is also possible to combine several chains of filters. For example, FilterA.And_(FilterB.Or_Not(FilterC)) implements the filter condition A and (B or not C).
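To make the composition rule concrete, here is a small self-contained sketch. It is an assumption about how such chaining could work, not the article's cFilter code: DemoFilter and DemoContains are hypothetical stand-ins that filter plain strings, only And_ and Or_ are shown, and both are assumed to return the filter they were called on so that calls can be nested.

```vbnet
Imports System
Imports System.Collections.Generic

Module FilterSketch

    '// Hypothetical stand-in for cFilter, working on plain strings
    Public MustInherit Class DemoFilter
        Private ReadOnly _Ands As New List(Of DemoFilter)
        Private ReadOnly _Ors As New List(Of DemoFilter)

        '// Child classes supply the basic test
        Protected MustOverride Function bMatches(ByVal sText As String) As Boolean

        '// Included if (this filter AND every AND filter) matches,
        '// or if any OR filter matches
        Public Function bInclude(ByVal sText As String) As Boolean
            Dim bResult As Boolean = bMatches(sText)
            If bResult Then
                For Each oAnd As DemoFilter In _Ands
                    If Not oAnd.bInclude(sText) Then
                        bResult = False
                        Exit For
                    End If
                Next
            End If
            If bResult Then Return True

            For Each oOr As DemoFilter In _Ors
                If oOr.bInclude(sText) Then Return True
            Next
            Return False
        End Function

        '// Assumed behaviour: attach the other filter and return
        '// this filter so that calls can be chained
        Public Function And_(ByVal oOther As DemoFilter) As DemoFilter
            _Ands.Add(oOther)
            Return Me
        End Function

        Public Function Or_(ByVal oOther As DemoFilter) As DemoFilter
            _Ors.Add(oOther)
            Return Me
        End Function
    End Class

    '// Basic filter: matches text containing a given fragment
    Public Class DemoContains
        Inherits DemoFilter

        Private ReadOnly _Fragment As String

        Public Sub New(ByVal sFragment As String)
            _Fragment = sFragment
        End Sub

        Protected Overrides Function bMatches(ByVal sText As String) As Boolean
            Return sText.Contains(_Fragment)
        End Function
    End Class

    Sub Main()
        Dim oA As New DemoContains("a")
        Dim oB As New DemoContains("b")
        Dim oC As New DemoContains("c")

        '// Condition: A and (B or C)
        Dim oFilter As DemoFilter = oA.And_(oB.Or_(oC))

        Console.WriteLine(oFilter.bInclude("ab"))   '// True
        Console.WriteLine(oFilter.bInclude("ac"))   '// True
        Console.WriteLine(oFilter.bInclude("a"))    '// False
        Console.WriteLine(oFilter.bInclude("bc"))   '// False
    End Sub

End Module
```

Note that the nested filter is tested through bInclude rather than bMatches, so that its own attached filters take part in the result.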
Actual filtering classes implemented
Various specialised classes of cFilter are implemented to provide filtering on sender role, message type and subject. These include, for example, cRoleEquals, cTypeEquals and cSubjectEquals. As their names suggest, these filters check that the key fields of the message match a given string.
A worker process that uses cReceiver can apply filters to the incoming message simply by setting the Filter property of the receiver:
    :
    Dim oReceiver As New cReceiver
    oReceiver.Filter = New cRoleEquals("monitor")
    :
Inside cFilter and its derived classes
The base cFilter class defines the protected MustOverride method bMatches. The derived classes override bMatches, providing the appropriate code to determine the match. For example, in the case of the cSubjectContains class, the overriding bMatches method is:
    :
    '// Must be Protected to match the access level of the
    '// MustOverride declaration in cFilter
    Protected Overrides Function bMatches(ByVal oMessage As cMessage) As Boolean
        Return oMessage.Subject.Contains(FilterString)
    End Function
    :
If you need a more specialised filtering mechanism in your application, it is easy to define a derived class of cFilter that implements whatever logic you need in bMatches.
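As an illustration of such a derived class, here is a hedged sketch of a filter that matches the message subject against a regular expression. The class name cSubjectMatchesRegex is hypothetical and is not part of the article's code; the sketch assumes the cFilter and cMessage classes from Listing 6 are available and that Subject is a String.

```vbnet
Imports System.Text.RegularExpressions

'// Hypothetical example, not part of the article's code:
'// passes messages whose Subject matches a regular expression.
Public Class cSubjectMatchesRegex
    Inherits cFilter

    Private ReadOnly oPattern As Regex

    Public Sub New(ByVal sPattern As String)
        oPattern = New Regex(sPattern)
    End Sub

    Protected Overrides Function bMatches( _
            ByVal oMessage As cMessage) As Boolean
        '// Treat a missing subject as a non-match instead of throwing
        If oMessage.Subject Is Nothing Then Return False
        Return oPattern.IsMatch(oMessage.Subject)
    End Function
End Class
```

Such a filter could then be used alone, for example oReceiver.Filter = New cSubjectMatchesRegex("^mouse"), or combined with the other filters through And_ and Or_ like any other cFilter.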
Listing 6 – cFilter class and derived classes
'// The filter base class is used to implement
'// message filtering on incoming messages
'// at each receiver. Filters can be grouped in
'// AND and OR groups - the message is
'// included if it matches all filters in the
'// AND group or any filter in the OR group.
Public MustInherit Class cFilter
    Inherits cBus

    '// A collection of filters which this filter must AND
    '// with to allow the message through
    Private oAnds As New System.Collections.Generic.List(Of cFilter)

    '// A collection of filters which this filter must OR
    '// with to allow the message through
    Private oOrs As New System.Collections.Generic.List(Of cFilter)

    '// Check if the message is included by this filter
    Public Function bInclude(ByVal oMessage As cMessage) As Boolean
        Dim bResult As Boolean

        '// First, test this filter alone
        bResult = bMatches(oMessage)

        Dim oFF As cFilter

        '// If this filter matches, then check all the
        '// ANDs to see if they also match
        If bResult Then
            For Each oFF In oAnds
                '// bInclude (not bMatches) is called here so that
                '// an attached filter's own ANDs and ORs are
                '// honoured, as in A.And_(B.Or_Not(C))
                bResult = oFF.bInclude(oMessage)

                '// As soon as we find the first failure to
                '// match we know the result is a non-match
                '// for this filter and all its ANDs
                If Not bResult Then Exit For
            Next oFF
        End If

        '// If all the ANDS were true, then the whole result
        '// is true regardless of the OR result.
        If bResult Then Return True

        '// The ANDs did not match, so now
        '// we find if any one OR matches, and if so
        '// the result is true
        For Each oFF In oOrs
            bResult = oFF.bInclude(oMessage)
            If bResult Then Return True
        Next oFF

        '// No match on any of the ORS, so
        '// the message does not match this filter
        Return False
    End Function

    '// ///////////////////////////////////
    '// This method must be overridden in child
    '// classes to implement the matching test.
    Protected MustOverride Function bMatches( _
        ByVal omessage As cMessage) As Boolean

    '// ///////////////////////////////////
    '// These methods add a given filter to the
    '// ANDs or ORs collections to build filtering
    '// logic.
    Public Function And_(ByVal oFilter As cFilter) As cFilter
        oAnds.Add(oFilter)
        Return Me
    End Function

    Public Function Or_(ByVal ofilter As cFilter) As cFilter
        oOrs.Add(ofilter)
        Return Me
    End Function

    Public Function Or_Not(ByVal ofilter As cFilter) As cFilter
        oOrs.Add(Not_(ofilter))
        Return Me
    End Function

    Public Function And_Not(ByVal oFilter As cFilter) As cFilter
        oAnds.Add(Not_(oFilter))
        Return Me
    End Function
    '//
    '// ///////////////////////////////////////

    '// ///////////////////////////////////////
    '// Class and function to provide negation
    '// of a filter condition
    Private Class cNot
        Inherits cFilter

        Private oNotFilter As cFilter

        Public Sub New(ByVal oFilter As cFilter)
            oNotFilter = oFilter
        End Sub

        Protected Overrides Function bMatches( _
                ByVal omessage As cMessage) As Boolean
            '// Negate the full result of the wrapped filter,
            '// including any ANDs and ORs attached to it
            Return Not oNotFilter.bInclude(omessage)
        End Function
    End Class

    Private Function Not_(ByVal oFilter As cFilter) As cFilter
        Return New cNot(oFilter)
    End Function
    '//
    '// /////////////////////////////////////////////
End Class

#Region "Filter implementations"
'// /////////////////////////////////////////
'// Derived specialised classes for implementing
'// different specific filters.
Public Class cTypeContains
    Inherits cFilter

    Public FilterString As String

    Public Sub New(ByVal sFilter As String)
        FilterString = sFilter
    End Sub

    Protected Overrides Function bMatches( _
            ByVal oMessage As cMessage) As Boolean
        Return oMessage.Type.Contains(FilterString)
    End Function
End Class

Public Class cTypeEquals
    Inherits cFilter

    Public FilterString As String

    Public Sub New(ByVal sFilter As String)
        FilterString = sFilter
    End Sub

    Protected Overrides Function bMatches( _
            ByVal oMessage As cMessage) As Boolean
        Return oMessage.Type = FilterString
    End Function
End Class

Public Class cRoleContains
    Inherits cFilter

    Public FilterString As String

    Public Sub New(ByVal sFilter As String)
        FilterString = sFilter
    End Sub

    Protected Overrides Function bMatches( _
            ByVal oMessage As cMessage) As Boolean
        Return oMessage.SenderRole.Contains(FilterString)
    End Function
End Class

Public Class cRoleEquals
    Inherits cFilter

    Public FilterString As String

    Public Sub New(ByVal sFilter As String)
        FilterString = sFilter
    End Sub

    Protected Overrides Function bMatches( _
            ByVal oMessage As cMessage) As Boolean
        Return oMessage.SenderRole = FilterString
    End Function
End Class

Public Class cSubjectContains
    Inherits cFilter

    Public FilterString As String

    Public Sub New(ByVal sFilter As String)
        FilterString = sFilter
    End Sub

    Protected Overrides Function bMatches( _
            ByVal oMessage As cMessage) As Boolean
        Return oMessage.Subject.Contains(FilterString)
    End Function
End Class

Public Class cSubjectEquals
    Inherits cFilter

    Public FilterString As String

    Public Sub New(ByVal sFilter As String)
        FilterString = sFilter
    End Sub

    Protected Overrides Function bMatches( _
            ByVal oMessage As cMessage) As Boolean
        Return oMessage.Subject = FilterString
    End Function
End Class

'// Matches only when role, type and subject all equal
'// the values set on the filter
Public Class cRoleTypeSubjectFilter
    Inherits cFilter

    Public sRole As String = ""
    Public sType As String = ""
    Public sSubject As String = ""

    Protected Overrides Function bMatches( _
            ByVal oMessage As cMessage) As Boolean
        Return oMessage.Type = sType _
            AndAlso oMessage.SenderRole = sRole _
            AndAlso oMessage.Subject = sSubject
    End Function
End Class
'//
'// ///////////////////////////////////////////////
#End Region
A demo application
The demo application included in the zip file is a simple Windows Forms application whose components communicate with each other via the MessageBus:
- A main control form, which provides buttons for opening the other form types
- A mouse tracker form, which monitors mouse movements over the form and sends mouse movement messages on the bus
- A clock object, which sends a time message whenever the time ticks past a tenth of a second, a second, a minute or an hour
- A mouse follower form, which receives mouse movement messages from the bus and positions a red box on the form at the position indicated by each message. This form also receives clock messages from the bus and displays the time sent out by the clock object
- A message sender form, which can generate bus messages of different types at a frequency set by the user
- A message receiver form, which lists the messages it receives, optionally filtered on attributes set by the user
The user can open as many sender forms, receiver forms and mouse follower forms as they wish, and can set the message types to be sent and received. Each of the forms operates independently of the others.