Christian Posta

About Christian Posta

Christian is a Principal Consultant at FuseSource specializing in developing enterprise software applications with an emphasis on software integration and messaging.

ActiveMQ Network Connectors

This post is more for me and any ActiveMQ contributors that may be interested in how the Network Connectors work for ActiveMQ. I recently spent some time looking at the code and thought that it would be good to draw up some quick diagrams to help me remember what I learned and help to identify where to debug in the future if there are issues I am researching. If I make a mistake and you’d like to add clarification, please do so in the comments.
First, you set up your network connectors by configuring them in the ActiveMQ configuration file. This configuration is mapped to the corresponding ActiveMQ beans by the xbean library; I have a separate blog post that explains exactly how this is done. To specify network connectors, you add the <networkConnectors/> element to your configuration file and, inside it, a <networkConnector/>, <multicastNetworkConnector/>, or <ldapNetworkConnector/>. Any of these three types can be used to establish a network of brokers, with <networkConnector/> being the most common. Here’s how the three map to Java classes:
<networkConnector/> maps to org.apache.activemq.network.DiscoveryNetworkConnector
<multicastNetworkConnector/> maps to org.apache.activemq.network.MulticastNetworkConnector
<ldapNetworkConnector/> maps to org.apache.activemq.network.LdapNetworkConnector
Each of these inherits from the org.apache.activemq.network.NetworkConnector supertype as depicted in this diagram:
So when you have a configuration like this:
<networkConnector uri="static://(tcp://localhost:61626,tcp://localhost:61627)" />
a new DiscoveryNetworkConnector will be configured, instantiated, and added as a connector to the BrokerService (the main class where many of the ActiveMQ broker details are handled). While the DiscoveryNetworkConnector is being assembled from the configuration, the URI that you specify is used to create a DiscoveryAgent. The discovery agent is in charge of assembling the connection and handling failover events, which are packaged as DiscoveryEvent objects. Which DiscoveryAgent is picked depends on the DiscoveryAgentFactory and the URI specified. In the case of “static”, the SimpleDiscoveryAgent is used. Each URI in the list is treated separately and is assigned its own Transport (more on this in a sec). This means that for each URI you list, a new socket will be established and the broker will attempt to establish a network connector over each of the sockets.
You may be wondering how best to implement failover, then. In the case described above, you will have multiple connections, and if one of those connections is to a slave that isn’t listening, you will see that the connection fails and the discovery agent tries to establish the connection again. This could go on indefinitely, consuming resources. Another approach is to give the static discovery agent just one URI that uses the failover() logic:
<networkConnector uri="static:failover:(tcp://localhost:61626,tcp://localhost:61627)" />
In this case, only one transport will be created, and the failover logic will wrap it and know about both URIs. If one is not available, it won’t keep retrying needlessly. Instead, it will connect to whichever one it can and only reconnect to the other failover URL if the current connection goes down. Note that this approach had a bug in it before ActiveMQ version 5.5.1-fuse-00-06.
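To make the difference between the two URI forms concrete, here is a minimal, self-contained sketch of how the top-level entries of a static: URI could be counted. It is a simplified model and not ActiveMQ's own parser; the class StaticUriSketch and its method topLevelEntries are hypothetical names made up for this illustration. The assumption it encodes is the one described above: every top-level entry gets its own transport, while a comma nested inside failover:(...) does not start a new entry.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model for illustration only; this is NOT ActiveMQ's URI parser.
final class StaticUriSketch {

    /** Returns the top-level entries of a static: discovery URI. */
    static List<String> topLevelEntries(String uri) {
        final String scheme = "static:";
        if (!uri.startsWith(scheme)) {
            throw new IllegalArgumentException("not a static: URI: " + uri);
        }
        String rest = uri.substring(scheme.length());
        if (rest.startsWith("//")) {
            rest = rest.substring(2);
        }
        // Unwrap "(...)" only when the first '(' is closed by the last character.
        if (rest.startsWith("(") && matchingParen(rest) == rest.length() - 1) {
            rest = rest.substring(1, rest.length() - 1);
        }
        // Split on commas that are not nested inside parentheses.
        List<String> entries = new ArrayList<>();
        int depth = 0;
        int start = 0;
        for (int i = 0; i < rest.length(); i++) {
            char c = rest.charAt(i);
            if (c == '(') {
                depth++;
            } else if (c == ')') {
                depth--;
            } else if (c == ',' && depth == 0) {
                entries.add(rest.substring(start, i));
                start = i + 1;
            }
        }
        entries.add(rest.substring(start));
        return entries;
    }

    /** Index of the ')' that closes the '(' at position 0, or -1 if none. */
    private static int matchingParen(String s) {
        int depth = 0;
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '(') {
                depth++;
            } else if (c == ')' && --depth == 0) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Two top-level entries: one transport per entry in this model.
        System.out.println(topLevelEntries(
                "static://(tcp://localhost:61626,tcp://localhost:61627)"));
        // One top-level entry: a single failover transport that knows both URIs.
        System.out.println(topLevelEntries(
                "static:failover:(tcp://localhost:61626,tcp://localhost:61627)"));
    }
}
```

In this model the first URI yields two entries and the second yields one, which matches the behaviour described above: two independent connection attempts versus a single failover-wrapped transport.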
The discovery agent is in charge of creating the bridge, but it delegates that responsibility to a DiscoveryListener. In the example above, the DiscoveryListener interface is implemented by the DiscoveryNetworkConnector, and the work happens in its onServiceAdd() method.
To establish the bridge, a transport is opened for both the local broker (using VM) and the remote broker (using the specified protocol, in this case TCP). Once the local and remote transports are created, the bridge can be assembled in the DiscoveryNetworkConnector.createBridge(…) method. This method uses the Factory pattern again to find which bridge to use.
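The delegation described above (agent discovers a service, listener reacts by pairing a local VM transport with a remote transport) can be modelled with a few lines of plain Java. This is only a sketch of the callback pattern: Listener, StaticAgent, and Connector are hypothetical stand-ins and not the real ActiveMQ types, the "vm://local" name is an assumption, and a string stands in for the bridge object.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the callback flow; none of these types are ActiveMQ classes.
final class DiscoverySketch {

    /** Hypothetical stand-in for the listener callback. */
    interface Listener {
        void onServiceAdd(String remoteUri);
    }

    /** Hypothetical stand-in for a static discovery agent. */
    static final class StaticAgent {
        private final List<String> services;
        private Listener listener;

        StaticAgent(List<String> services) {
            this.services = services;
        }

        void setListener(Listener listener) {
            this.listener = listener;
        }

        /** Reports every configured URI to the listener, one event per URI. */
        void start() {
            for (String service : services) {
                listener.onServiceAdd(service);
            }
        }
    }

    /** Hypothetical stand-in for the connector, which is itself the listener. */
    static final class Connector implements Listener {
        final List<String> bridges = new ArrayList<>();

        @Override
        public void onServiceAdd(String remoteUri) {
            // Pair a local in-VM endpoint with the remote endpoint.
            bridges.add("vm://local -> " + remoteUri);
        }
    }

    public static void main(String[] args) {
        Connector connector = new Connector();
        List<String> uris = new ArrayList<>();
        uris.add("tcp://localhost:61626");
        uris.add("tcp://localhost:61627");
        StaticAgent agent = new StaticAgent(uris);
        agent.setListener(connector);
        agent.start();
        System.out.println(connector.bridges);
    }
}
```

The point of the design is that the agent knows nothing about bridges; it only reports services, which keeps discovery mechanisms (static, multicast, LDAP) interchangeable.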
The possible bridge implementations are shown below:
By default, with conduitSubscriptions=true, the DurableConduitBridge is used. Conduit subscriptions establish a single flow of messages to a remote broker to reduce the duplicates that can occur when remote topics have multiple consumers. This works well by default, but if you want to load balance your messages across all consumers, you will want to set conduit subscriptions to false (see the documentation for conduit subscriptions in FuseSource’s documentation on Fuse Message Broker). When set to false, the DemandForwardingBridge is used. Once the bridge is assembled, it is configured in the NetworkConnector.configureBridge(…) method.
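The bridge choice described above boils down to a single switch on the conduitSubscriptions flag. The following sketch captures just that decision as stated in this post; BridgeChoiceSketch and bridgeFor are hypothetical names, and the method returns the bridge class name as a string rather than constructing a real bridge.

```java
// Illustrative only: returns class names as strings instead of real bridge objects.
final class BridgeChoiceSketch {

    /** Mirrors the selection rule described in the text above. */
    static String bridgeFor(boolean conduitSubscriptions) {
        return conduitSubscriptions
                ? "DurableConduitBridge"     // default: one conduit per remote broker
                : "DemandForwardingBridge";  // one subscription per remote consumer
    }

    public static void main(String[] args) {
        System.out.println(bridgeFor(true));
        System.out.println(bridgeFor(false));
    }
}
```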
Once everything is assembled and configured on the bridge, it’s then started. Once it’s started, it begins sending broker Command objects to the remote broker to identify itself, set up a session, and demand consumer info from it. This is in the DemandForwardingBridgeSupport.startRemoteBridge() super class method as seen from the diagram.
If you’re debugging errors with the network connectors, hopefully this helps identify where those errors may be occurring.
Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use