Enterprise Java

ActiveMQ Network Connectors

This post is more for me and any ActiveMQ contributors that may be interested in how the Network Connectors work for ActiveMQ. I recently spent some time looking at the code and thought that it would be good to draw up some quick diagrams to help me remember what I learned and help to identify where to debug in the future if there are issues I am researching. If I make a mistake and you’d like to add clarification, please do so in the comments.
First, you set up your network connectors by configuring them in the ActiveMQ configuration file. This configuration gets mapped to the corresponding ActiveMQ beans using the xbean library for which I have a separate blog post which explains exactly how this is done. To specify network connectors, you add the <networkConnectors/> element to your configuration file and add a <networkConnector/>, <multicastNetworkConnector/>, or <ldapNetworkConnector/>. These three different types of network connectors can be used to establish a network of brokers with <networkConnector/> being most common. Here’s how the three map to Java classes:
<networkConnector/> maps to org.apache.activemq.network.DiscoveryNetworkConnector <multicastNetworkConnector/> maps to org.apache.activemq.network.MulticastNetworkConnector
<ldapNetworkConnector/> maps to org.apache.activemq.network.LdapNetworkConnector
Each of those inherit from the org.apache.activemq.network.NetworkConnector super type as depicted in this diagram: 
So when you have a configuration like this:
<networkConnector uri="static://(tcp://localhost:61626,tcp://localhost:61627)" />
a new DiscoverNetworkConnector will be configured, instantiated, and added as a connector to the BrokerService (which is the main class for where a lot of the ActiveMQ broker details is handled). While the DiscoverNetworkConnector is being assembled from the configuration, the URI that you specify is used to create a DiscoveryAgent. The discover agent is in charge of assembling the connection and handling failover events that are packaged as DiscoverEvents. Determining which DiscoverAgent is picked depends on the DiscoverAgentFactory and the URI specified. In the case of “static”, the SimpleDiscoverAgent is used. Each URI in the list of possible URIs are treated differently and are assigned their own Transport (more on this in a sec). Which means, for each URI you list, a new socket will be established and the broker will attempt to establish a network connector over each of the sockets. You may be wondering how to best implement failover then? In the case described above, you will have multiple connections, and if one of those connections is to a slave that isn’t listening, you will see that the connection fails and the discover agent tries to establish the connection again. This could go on infinitely which consumes resources. Another approach is to use just one URI for the static discover agent that utilizes the failover() logic:
<networkConnector uri="static:failover:(tcp://localhost:61626,tcp://localhost:61627)" />
In this case, only one transport will be created, and the failover logic will wrap it and know about both URIs. If one is not available, it won’t keep retrying needlessly. Instead it will connect to whichever one it can and only reconnect to the failover URL if the current connection goes down. Note this approach had a bug in it before ActiveMQ version 5.5.1.-fuse-00-06.
The discover agent is in charge of creating the bridge, but it delegates that responsibility to a DiscoverListener. In the example from above, the DiscoverListener interface is implemented by the DiscoverNetworkConnector.onServiceAdd() method.
To establish the bridge, a transport is opened up for both the local broker (using VM) and the remote broker (using the specified protocol, in this case TCP). Once the local and remote transports are created, the bridge can be assembled in the DiscoverNetworkConnector.createBridge(…) method. This method uses the Factory pattern again to find which bridge to use.
The possible bridge implementations are shown below:
By default, with conduitSubscriptions=true, the DurableConduitBridge is used. Conduit subscriptions establish a single flow of messages to a remote broker to reduce duplicates that can happen when remote topics have multiple consumers. This works great by default, but if you want to load balance your messages across all consumers, then you will want to set conduit subscriptions to false (see the documentation for conduit subscriptions at FuseSource‘s documentation on Fuse Message Broker). When set to false, the DemandForwardingBridge is used. Once the bridge is assembled, it is configured in the NetworkConnector.configureBridge(…) method.
Once everything is assembled and configured on the bridge, it’s then started. Once it’s started, it begins sending broker Command objects to the remote broker to identify itself, set up a session, and demand consumer info from it. This is in the DemandForwardingBridgeSupport.startRemoteBridge() super class method as seen from the diagram.
If you’re debugging errors with the network connectors, hopefully this helps identify possible locations for where errors can take place.

Christian Posta

Christian is a Principal Consultant at FuseSource specializing in developing enterprise software applications with an emphasis on software integration and messaging. His strengths include helping clients build software using industry best practices, Test Driven Design, ActiveMQ, Apache Camel, ServiceMix, Spring Framework, and most importantly, modeling complex domains so that they can be realized in software. He works primarily using Java and its many frameworks, but his favorite programming language is Python. He's in the midst of learning Scala and hopes to contribute to the Apache Apollo project.
Notify of

This site uses Akismet to reduce spam. Learn how your comment data is processed.

Inline Feedbacks
View all comments
Back to top button