Observer pattern applied at remote level

Every experienced swing developer faced the “referential data handling” issue.
What’s the big deal in handling referential data ?
Well, acquiring them is sure not an issue.
Referential data, in the way software understands it (elsewhere it is just some data that represent the reference be it very dynamic or not) vary rarely. That characteristic makes them ideal candidates to caching. The most trivial (thus most used) caching mechanism is “loading at startup”. This solution is simple yet very incomplete because “vary rarely” doesn’t mean “never vary”. We have to get notified of changes (new referential data, updated ones and deleted ones).

To build a more elaborate solution we can :
– either often ask the server and get the information
– or wait for server notification.

The first solution is simple but results in unnecessary traffic thus won’t scale.

The second listens to changes and react consequently … Reminds me of a pattern (maybe this one).
How do we remotely listen to changes ? The key is brokers. The JEE implementation of broker architecture is JMS.
One have to understand brokers architectures before implementing anything related to enterprise messaging and routing.
Brokers are infrastructure components that handle 2 conversation topologies :
– queues : messages posted to queues are consumed by exactly one consumer (the first one to read them)
– topic : messages posted to topics are consumed by all subscribers
Simple yet precise definitions here

A server has many clients and all the clients has to get notified of referential changes so we’ll go for topics.

The broker is started within spring configuration :

<amq:broker useJmx='true'>

<amq:persistenceFactory>
<amq:amqPersistenceAdapterFactory dataDirectory="${basedir}/target" />
</amq:persistenceFactory>

<amq:transportConnectors>
<amq:transportConnector uri="${activemq.broker.url}" />
</amq:transportConnectors>

</amq:broker>

The server-side holds a publisher component which publishes to a destination : the topic.
Here is the topic configuration

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq.broker.url}" />
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="pubSubDomain" value="true" />
<property name="defaultDestinationName" value="${activemq.referential.changes.topic}" />
</bean>

And the publisher’s code


/**
 * @author louis.gueye@gmail.com
 * 
 */
@Component(ReferentialChangesPublisher.BEAN_ID)
public class ReferentialChangesPublisherImpl implements
		ReferentialChangesPublisher {

	@Autowired
	private JmsTemplate jmsTemplate;

	/**
	 * @see org.diveintojee.poc.remote.observer.pattern.domain.services.ReferentialChangesPublisher#addEntity(org.diveintojee.poc.remote.observer.pattern.domain.Entity)
	 */
	@Override
	public void addEntity(final Entity entity) {

		final ReferentialChangesMessage message = buildCreateMessage(entity);

		jmsTemplate.send(new MessageCreator() {

			@Override
			public Message createMessage(final Session session)
					throws JMSException {

				ObjectMessage msg = session.createObjectMessage();
				msg.setObject(message);
				return msg;

			}

		});

	}

	/**
	 * @param persistable
	 * @return
	 */
	protected ReferentialChangesMessage buildCreateMessage(
			final PersistableEntity persistable) {
		return new ReferentialChangesMessage(DataModificationOperation.CREATE,
				persistable.getId(), persistable.getClass());
	}

}

This publisher is invoked by some server-side code. In the example, it is invoked when adding a new entity

On client side we subscribe to the topic and listen to events.

Here is the configuration of the listener

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq.broker.url}" />
</bean>

<bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
<property name="physicalName"
value="${activemq.referential.changes.topic}?consumer.retroactive=true&amp;consumer.prefetchSize=10" />
</bean>

<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="destination" />
<property name="pubSubDomain" value="true" />
<property name="messageListener" ref="referentialChangesSubscriber" />
<property name="recoveryInterval" value="60000" />
</bean>

And the code of the subscriber :

/**
 * @author louis.gueye@gmail.com
 * 
 */
@Component(value = ReferentialChangesSubscriber.BEAN_ID)
public class ReferentialChangesSubscriberImpl implements
		ReferentialChangesSubscriber {

	@Autowired
	@Qualifier(RootWindowController.BEAN_ID)
	RootWindowController rootWindowController;

	private List<ReferentialChangesMessage> messages;

	/**
	 * @see org.diveintojee.poc.remote.observer.pattern.domain.services.ReferentialChangesSubscriber#getLastMessage()
	 */
	@Override
	public ReferentialChangesMessage getLastMessage() {

		if (CollectionUtils.isEmpty(getMessages()))
			return null;

		return getMessages().get(getMessages().size() - 1);

	}

	/**
	 * @return
	 */
	private List<ReferentialChangesMessage> getMessages() {
		return messages;
	}

	/**
	 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
	 */
	@Override
	public void onMessage(final Message message) {

		if (getMessages() == null)
			setMessages(new ArrayList<ReferentialChangesMessage>());

		try {

			getMessages().add(
					(ReferentialChangesMessage) ((ObjectMessage) message)
							.getObject());

			rootWindowController.updateStatusBarText(getLastMessage());

		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	/**
	 * @param messages
	 */
	private void setMessages(final List<ReferentialChangesMessage> messages) {
		this.messages = messages;
	}

}

The test was a bit hard to write. I did not write it exactly the way I imagined it. I got a

javax.management.InstanceAlreadyExistsException

mainly due to issues in stopping activemq.
The process doesn’t destroy neatly, it hangs some time, creating side effects when another test tries to launch a broker.
So I added the broker related tests to an existing one which is a bad design because it ties a tests one to another which creates side-effects. The test isolation is broken but I did not find another way.

// Omitted for brevity
	@Autowired
	@Qualifier(ReferentialService.BEAN_ID)
	private ReferentialService referentialService;

	// Subscriber
	@Autowired
	@Qualifier(ReferentialChangesSubscriber.BEAN_ID)
	private ReferentialChangesSubscriber referentialChangesSubscriber;

	@Test
	public final void addEntityWillNotifySubscribers() {
		Entity entity = new Entity();
		String label = "test entity";
		entity.setLabel(label);
		Long id = referentialService.addEntity(entity);

		ReferentialChangesMessage referentialChangesMessage = referentialChangesSubscriber
				.getLastMessage();
		Assert.assertNotNull(referentialChangesMessage);
		Assert.assertEquals(Entity.class,
				referentialChangesMessage.getEntityClass());
		Assert.assertEquals(DataModificationOperation.CREATE,
				referentialChangesMessage.getOperation());
		Assert.assertEquals(id, referentialChangesMessage.getId());

	}

To go further :
– handle broker communication errors
– externalize strings
– add slf4j
– implement a full referential

The source code is available on my Github repository here
Feel free to comment and/or share.

Louis.

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s