Saturday, October 15, 2016

Spring MVC like processing of AMQP messages

Requirement


At K2, when we were building a microservices-based solution, we decided to use AMQP messages instead of REST over HTTP (the reasons are out of scope for this post). To keep it developer friendly, the most important thing was to have a mechanism in place to intercept and process messages in the most non-intrusive, transparent and developer-familiar way.

What we wanted was to process AMQP messages in much the same way HTTP requests are processed using spring-webmvc annotations such as @RequestMapping, @RequestParam etc. But instead of an HTTP request, the source object would be an AMQP message. The AMQP request message will have two headers, for example -
method="POST"
url="/api/myobjects/{someParam}"
and the payload may optionally contain data in JSON format. As you may have noticed, this is nothing but an HTTP REST API mapped onto AMQP messages. We wanted to be able to write a Spring Controller like handler, for example -
@Controller
public class MyObjectHandler {

  @RequestMapping(value="/api/myobjects/{someParam}", method="POST")
  public MyObject createMyObject(@Payload MyObject myObj, @PathParam String someParam) {
    //... some processing
    return myObj;
  }
  // ...more handlers
}

The Alternatives

We looked at the spring-amqp/rabbitmq annotations and also the Spring Integration annotations. They are close to what we wanted, but would not allow routing to handler methods based on header values, especially the REST url. Spring Integration would also have led to extremely complex XML configuration.
We posted a question on Stack Overflow to seek some expert advice. The quick response from Gary Russell (Lead developer - Spring Integration and Spring AMQP) acted as the starting point for the final solution we built.

End Solution's overview from its User's perspective

Before we get to the actual solution, let us see what a message handler would look like to a developer who uses it; this will give you a clear idea of what we are trying to do. Let's call the handlers MessageProcessors (the equivalent of a Spring Controller). CustomerService is an example MessageProcessor whose handler methods are invoked based on the url and method headers in the incoming AMQP message.

@Service
@MessageProcessor
public class CustomerService {
  // ...
  @MessageMapping(value = "/api/customer/{customerId}", method = MessageMethod.GET)
  public Customer getCustomer(@UrlPathParam int customerId) {
    return repo.findOne(customerId);
  }
       
  @MessageMapping(value = "/api/customer/{customerId}/credentials", method = MessageMethod.POST)
  public Customer addCustomer(@Header String status, @Headers Map headers, Customer customer) {
    //... some custom headers processing
    return repo.save(customer);
  }
       
  @MessageMapping(value = "/api/customer/customprocess", method = MessageMethod.POST)
  public Customer processMyMessage(Message<?> message) {
    //... process the raw message in some custom way
    return customer;
  }
}
Note: Not all URLs in above example are REST compliant, they just clarify purpose and use of the MessageProcessor.

Input Headers For Routing


As you can see in the example above, handlers are mapped to incoming messages based on two headers in the message. These headers are mandatory in every incoming message.
  • url : REST style url which can also have path parameters. e.g. /api/tenant/{tenantId}
  • method : HTTP method. Supported types: GET, POST, PUT, DELETE
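
To make the url header matching concrete, here is a minimal, dependency-free sketch of how a template such as /api/tenant/{tenantId} can be matched against an incoming url and its path variables extracted. The actual solution delegates this work to Spring's AntPathMatcher; the class UrlTemplateSketch and its match method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: a hand-rolled stand-in for the template matching done by AntPathMatcher. */
final class UrlTemplateSketch {

  private static final Pattern VARIABLE = Pattern.compile("\\{([^/}]+)\\}");

  /** Returns the path variables if the url matches the template, or null if it does not. */
  static Map<String, String> match(String template, String url) {
    int queryStart = url.indexOf('?');
    if (queryStart != -1) {
      url = url.substring(0, queryStart); // the query string plays no part in matching
    }
    List<String> names = new ArrayList<>();
    StringBuilder regex = new StringBuilder();
    Matcher variable = VARIABLE.matcher(template);
    int last = 0;
    while (variable.find()) {
      regex.append(Pattern.quote(template.substring(last, variable.start())));
      regex.append("([^/]+)"); // a variable matches exactly one path segment
      names.add(variable.group(1));
      last = variable.end();
    }
    regex.append(Pattern.quote(template.substring(last)));

    Matcher m = Pattern.compile(regex.toString()).matcher(url);
    if (!m.matches()) {
      return null;
    }
    Map<String, String> variables = new LinkedHashMap<>();
    for (int i = 0; i < names.size(); i++) {
      variables.put(names.get(i), m.group(i + 1));
    }
    return variables;
  }

  public static void main(String[] args) {
    System.out.println(match("/api/tenant/{tenantId}", "/api/tenant/42?verbose=true"));
  }
}
```

The extracted values are plain strings at this point; converting them to the handler's parameter type (for example int) is a separate step.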

Supported Annotations

  • @MessageProcessor A custom defined annotation. The message handler classes must be annotated with @MessageProcessor along with a Spring stereotype annotation such as @Component, @Service, @Controller etc
  • @MessageMapping A custom defined annotation. Maps a method to an incoming message based on the two headers mentioned above - url and method. These two headers are matched with the value and method attributes respectively, as shown in the example code above

Supported Handler Method Parameters

  • @UrlPathParam A custom defined annotation. Method arguments annotated with this annotation will be resolved against the variables in the url path
  • @Header Method arguments annotated with this annotation will be resolved against incoming message headers
  • @Headers Method arguments annotated with this annotation must be assignable to java.util.Map and give access to all headers.
  • Parameter of type Message will be automatically resolved to the received Message without need of any annotation
  • Parameter of type MessageResponse (A custom defined class) will be automatically resolved to an instance of MessageResponse supplied by the framework. This can be used by the handlers to add custom headers in the response if required. The default success headers (i.e. statusCode=200, status="OK", message="OK") can also be overridden by adding them in headers in MessageResponse.
  • A non-annotated argument that is not of the type Message is considered to be the payload. You can make that explicit by annotating the parameter with @Payload. If the incoming message has a header 'contentType' with the value 'application/json', the payload is considered to be a JSON string and is transformed into the argument object
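
The rules above boil down to an ordered chain of argument resolvers in which the first resolver that supports a parameter wins and the payload resolver acts as the catch-all at the end. The following is a minimal sketch of that idea in plain Java, without Spring. Everything in it (ResolverChainSketch, HeaderValue, SimpleMessage, ArgumentResolver) is a hypothetical stand-in, not one of the Spring classes used by the real solution.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Illustrative only: all names here are hypothetical stand-ins. */
final class ResolverChainSketch {

  /** Stand-in for @Header; the header name is given explicitly. */
  @Target(ElementType.PARAMETER)
  @Retention(RetentionPolicy.RUNTIME)
  @interface HeaderValue {
    String value();
  }

  /** Stand-in for an incoming message: headers plus payload. */
  static final class SimpleMessage {
    final Map<String, Object> headers;
    final Object payload;

    SimpleMessage(Map<String, Object> headers, Object payload) {
      this.headers = headers;
      this.payload = payload;
    }
  }

  interface ArgumentResolver {
    boolean supports(Parameter parameter);
    Object resolve(Parameter parameter, SimpleMessage message);
  }

  /** Order matters: the payload resolver accepts anything, so it must come last. */
  static final List<ArgumentResolver> RESOLVERS = Arrays.<ArgumentResolver>asList(
      new ArgumentResolver() { // annotated header
        public boolean supports(Parameter p) { return p.isAnnotationPresent(HeaderValue.class); }
        public Object resolve(Parameter p, SimpleMessage m) {
          return m.headers.get(p.getAnnotation(HeaderValue.class).value());
        }
      },
      new ArgumentResolver() { // the raw message itself
        public boolean supports(Parameter p) { return p.getType().equals(SimpleMessage.class); }
        public Object resolve(Parameter p, SimpleMessage m) { return m; }
      },
      new ArgumentResolver() { // fallback: anything else is treated as the payload
        public boolean supports(Parameter p) { return true; }
        public Object resolve(Parameter p, SimpleMessage m) { return m.payload; }
      });

  static Object[] resolveArguments(Method method, SimpleMessage message) {
    Parameter[] parameters = method.getParameters();
    Object[] args = new Object[parameters.length];
    for (int i = 0; i < parameters.length; i++) {
      for (ArgumentResolver resolver : RESOLVERS) {
        if (resolver.supports(parameters[i])) {
          args[i] = resolver.resolve(parameters[i], message);
          break; // first matching resolver wins
        }
      }
    }
    return args;
  }

  /** Example handler with one parameter of each kind. */
  static String handle(@HeaderValue("status") String status, SimpleMessage raw, String payload) {
    return status + "|" + raw.headers.size() + "|" + payload;
  }

  /** Resolves the arguments for handle(...) and invokes it reflectively. */
  static String dispatch(SimpleMessage message) {
    try {
      Method handler = ResolverChainSketch.class.getDeclaredMethod(
          "handle", String.class, SimpleMessage.class, String.class);
      return (String) handler.invoke(null, resolveArguments(handler, message));
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> headers = Collections.<String, Object>singletonMap("status", "ACTIVE");
    System.out.println(dispatch(new SimpleMessage(headers, "{\"id\":1}")));
  }
}
```

Putting the catch-all resolver last is the design choice that makes the @Payload annotation optional: any parameter no earlier resolver claims is assumed to be the payload.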

Response Processing

The response is sent to the queue as defined by the 'amqp_replyTo' header in the incoming message
  • If the return type of message handler method is void, no processing will be done.
  • If the return type is of type java.lang.String or primitive type (e.g. int, float) or primitive wrapper (e.g. Integer, Float) it will be returned as response message payload as is.
  • In all other cases, the response object will be converted to its json representation and the json string will be sent as a part of response message.
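
As a rough illustration of these three rules, here is a small sketch of the decision logic in plain Java. The names ReturnValueSketch and toResponsePayload are hypothetical, and the JSON conversion is passed in as a function so that the example has no library dependency (the real solution uses Jackson's ObjectMapper for that step). Returning an empty string for a null value is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

/** Illustrative only: ReturnValueSketch and toResponsePayload are hypothetical names. */
final class ReturnValueSketch {

  /** Types that are sent back as the payload without any conversion. */
  private static final Set<Class<?>> PASS_THROUGH = new HashSet<Class<?>>(Arrays.<Class<?>>asList(
      String.class, Boolean.class, Character.class, Byte.class, Short.class,
      Integer.class, Long.class, Float.class, Double.class));

  /**
   * @param declaredReturnType return type from the handler method signature
   * @param value the value the handler actually returned
   * @param toJson stand-in for the JSON serializer
   * @return null when no response should be sent, otherwise the response payload
   */
  static Object toResponsePayload(Class<?> declaredReturnType, Object value,
      Function<Object, String> toJson) {
    if (void.class.equals(declaredReturnType)) {
      return null; // void handler: no response is sent
    }
    if (value == null) {
      return ""; // assumption of this sketch: a null result becomes an empty payload
    }
    if (PASS_THROUGH.contains(value.getClass())) {
      return value; // String or primitive wrapper: returned as is
    }
    return toJson.apply(value); // everything else: JSON representation
  }

  public static void main(String[] args) {
    Function<Object, String> fakeJson = v -> "{\"value\":\"" + v + "\"}";
    System.out.println(toResponsePayload(int.class, 42, fakeJson));
    System.out.println(toResponsePayload(Object.class, Arrays.asList(1, 2), fakeJson));
  }
}
```

Note that a primitive return value arrives boxed when a method is invoked reflectively, which is why checking for the wrapper classes is enough.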

Response Status and Error Handling

The response statuses are similar to HTTP status codes. See StatusCode
  • If the handler handles the incoming message with no errors/exceptions, the default success headers are added to the response message, i.e. statusCode=200, status="OK", message="OK". Handlers can override these statuses by adding them to MessageResponse as mentioned above
  • If the handler throws a ServiceException the response status headers will be added based on those specified in the ServiceException thrown. The message payload can also be added in such cases by specifying it via ServiceException.setPayload()
  • If the handler throws any exception other than ServiceException, the response message will have the status headers statusCode=500, status="INTERNAL_SERVER_ERROR", message=< String message as received in the exception >. The payload will be a map containing error data.
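
The three cases above can be sketched as a single function from the handler outcome to the status headers. This is only an illustration: the ServiceException below is a hypothetical, simplified shape with public fields, and plain ints and strings are used in place of the StatusCode enum.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a simplified mapping from handler outcome to response status headers. */
final class ErrorHeadersSketch {

  /** Hypothetical shape of ServiceException: it carries its own status information. */
  static class ServiceException extends RuntimeException {
    final int statusCode;
    final String status;

    ServiceException(int statusCode, String status, String message) {
      super(message);
      this.statusCode = statusCode;
      this.status = status;
    }
  }

  /** @param error the exception thrown by the handler, or null if it completed normally */
  static Map<String, Object> statusHeaders(Throwable error) {
    Map<String, Object> headers = new LinkedHashMap<>();
    if (error == null) {
      put(headers, 200, "OK", "OK"); // default success headers
    } else if (error instanceof ServiceException) {
      ServiceException se = (ServiceException) error;
      put(headers, se.statusCode, se.status, se.getMessage()); // status chosen by the handler
    } else {
      put(headers, 500, "INTERNAL_SERVER_ERROR", error.getMessage()); // anything unexpected
    }
    return headers;
  }

  private static void put(Map<String, Object> headers, int code, String status, String message) {
    headers.put("statusCode", code);
    headers.put("status", status);
    headers.put("message", message);
  }

  public static void main(String[] args) {
    System.out.println(statusHeaders(null));
    System.out.println(statusHeaders(new ServiceException(404, "NOT_FOUND", "No such customer")));
    System.out.println(statusHeaders(new IllegalStateException("boom")));
  }
}
```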

The Solution


The solution re-uses many spring classes apart from some custom implementations. Below I will be showing vital components of the solution. 


Important Classes


MessageHandler

The most important class is the MessageHandler. MessageHandler itself is a Spring bean that, on initialization, scans all beans in the context, finds the ones annotated with @MessageProcessor and builds metadata for all their methods that are annotated with @MessageMapping.

This class is also responsible for receiving AMQP messages using @RabbitListener. Once it receives an AMQP message, it looks up the metadata for an appropriate @MessageMapping method of a @MessageProcessor and invokes it. Of course, you must have an appropriate RabbitConnectionConfiguration in your Spring context; it is skipped here for brevity.
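
Before the full listing, the discovery step can be pictured with a stripped-down sketch that uses only plain Java reflection. It assumes the beans are handed in directly rather than looked up from a Spring ApplicationContext, and it declares simplified copies of the annotations and the MessageMethod enum inside a hypothetical HandlerRegistrySketch class so that it is self-contained.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: builds the url -> (method -> handler) map without Spring. */
final class HandlerRegistrySketch {

  enum MessageMethod { GET, POST, PUT, DELETE }

  @Target(ElementType.TYPE)
  @Retention(RetentionPolicy.RUNTIME)
  @interface MessageProcessor {
  }

  @Target(ElementType.METHOD)
  @Retention(RetentionPolicy.RUNTIME)
  @interface MessageMapping {
    String value();
    MessageMethod method();
  }

  /** Scans the given beans and registers every @MessageMapping method of a @MessageProcessor. */
  static Map<String, Map<MessageMethod, Method>> scan(Object... beans) {
    Map<String, Map<MessageMethod, Method>> handlers = new HashMap<>();
    for (Object bean : beans) {
      Class<?> type = bean.getClass();
      if (!type.isAnnotationPresent(MessageProcessor.class)) {
        continue; // not a message processor, ignore
      }
      for (Method method : type.getDeclaredMethods()) {
        MessageMapping mapping = method.getAnnotation(MessageMapping.class);
        if (mapping != null) {
          handlers
              .computeIfAbsent(mapping.value(),
                  url -> new EnumMap<MessageMethod, Method>(MessageMethod.class))
              .put(mapping.method(), method);
        }
      }
    }
    return handlers;
  }

  /** Example processor used only to demonstrate the scan. */
  @MessageProcessor
  static class CustomerService {
    @MessageMapping(value = "/api/customer/{customerId}", method = MessageMethod.GET)
    public String getCustomer() {
      return "customer";
    }
  }

  public static void main(String[] args) {
    System.out.println(scan(new CustomerService(), "not a processor").keySet());
  }
}
```

The outer map is keyed by the url pattern and the inner map by the method, which is the same shape as the handlers field in the real class.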

The MessageHandler class looks as below :
/* Import statements are listed at the end of this code snippet */

@Component
public class MessageHandler {
  
  public static final String RESP_HEADER_KEY_STATUS_CODE = "statusCode";
  public static final String RESP_HEADER_KEY_STATUS = "status";
  public static final String RESP_HEADER_KEY_MESSAGE = "message";
  
  public static final String HEADER_KEY_METHOD = "method";
  public static final String HEADER_KEY_URL = "url";

  @Autowired
  private ApplicationContext context;
  
  private Set<String> supportedMethods;
  
  /**
   * Map of all discovered handlers. It is maintained as a Map of Map. The outer map has the discovered 
   * url as key. The inner Map contains discovered handler methods against the HTTP method (GET/POST/PUT/DELETE)
   */
  private Map<String/*url*/, Map<MessageMethod, HandlerMethod>> handlers = new HashMap<>();
  
  private PathMatcher pathMatcher = new AntPathMatcher();
  private ObjectMapper objectMapper = new ObjectMapper(); //For json conversions
  private HandlerMethodArgumentResolverComposite resolvers = new HandlerMethodArgumentResolverComposite();
  private ConversionService conversionService = new DefaultFormattingConversionService();
  
  private MessageConverter messageConverter;
  
  private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
  
  @PostConstruct
  public void init() {
    logger.info("Initializing MessageHandler");
    initConverters();
    initArgumentResolvers();
    
    for (String beanName : BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, Object.class)) {
      
      //For every bean annotated with @MessageProcessor, detect the handler methods
      if (context.findAnnotationOnBean(beanName, MessageProcessor.class) != null) {
        logger.info("Found message processor: {}", beanName);
        detectHandlerMethods(beanName);
      }
    }
    supportedMethods = Arrays.asList(MessageMethod.values()).stream().map(method -> method.name()).collect(Collectors.toSet());
    logger.info("Mapped handler methods : {}", handlers);
    logger.info("Ready to process incoming messages");
  }
  
  /**
   * Processes incoming message by finding and invoking appropriate message handler.
   * @param message incoming message. Required headers are <tt>url</tt> and <tt>method</tt>
   * @return message processing result object
   * @throws Exception if any error processing message
   */
  @SuppressWarnings("rawtypes")
  @SendTo
  @RabbitListener(bindings = 
    {@QueueBinding(
      value = @Queue(value = "your.queue.here", durable = "true", autoDelete = "false"),
      exchange = @Exchange(value = "your.exchange.here", durable = "true", autoDelete = "false"),
      key = "your.key.here"), 
    }
  )
  public Message handleMessage(Message<?> message) throws Exception {
    if (logger.isDebugEnabled()) {
      logger.debug("Received message: {}", message);
      message.getHeaders().forEach((k, v) -> logger.debug("Header: "+ k + "="+ v));
    }
    
    String url = (String)message.getHeaders().get(HEADER_KEY_URL);
    String strMethod = (String)message.getHeaders().get(HEADER_KEY_METHOD);
    if (url == null || strMethod == null) {
      String errMsg = "Required headers '" + HEADER_KEY_URL +"' and/or '" + HEADER_KEY_METHOD + "' not found in incoming message";
      return generateErrorResponse(StatusCode.BAD_REQUEST, StatusCode.BAD_REQUEST.name(), errMsg, 
          getErrorMap(new ServiceException(errMsg), null), message);
    }
    
    int indexOfQuestionMark = url.indexOf("?"); //Ignore the '?' and query string further if present. Not needed for matching.
    if (indexOfQuestionMark != -1) {
      url = url.substring(0, indexOfQuestionMark);
    }
    if (!supportedMethods.contains(strMethod)) {
      String errMsg = "Unsupported Method : " + strMethod;
      return generateErrorResponse(StatusCode.BAD_REQUEST, StatusCode.BAD_REQUEST.name(), errMsg, 
          getErrorMap(new ServiceException(errMsg), null), message);
    }
    
    MessageMethod method = MessageMethod.valueOf(strMethod);
    logger.debug("Finding handler for method={} and url={}", method, url);
    HandlerInvocationDetails handlerDetails = lookupMatchingHandler(url, method);
    
    if (handlerDetails == null || handlerDetails.handlerMethod == null) {
      String errMsg = "No handler found for url: " + url + " & method: " + strMethod;
      return generateErrorResponse(StatusCode.NOT_FOUND, StatusCode.NOT_FOUND.name(), errMsg, 
          getErrorMap(new ServiceException(errMsg), null), message);
    }
    MessageWrapper<?> messageWrapper = new MessageWrapper<>(message);
    
    if (handlerDetails.uriTemplateVariables != null) { //Put the resolved URL template variables
      messageWrapper.getData().put(UrlPathVariableMethodArgumentResolver.URI_TEMPLATE_VARIABLES_KEY, handlerDetails.uriTemplateVariables);
    }
    
    logger.debug("Found Handler: {}", handlerDetails.handlerMethod);

    InvocableHandlerMethod invocable = new InvocableHandlerMethod(handlerDetails.handlerMethod);
    invocable.setMessageMethodArgumentResolvers(resolvers);
    
    try {
      Object returnValue = invocable.invoke(messageWrapper);
      MethodParameter returnType = handlerDetails.handlerMethod.getReturnType();
      
      Message returnMsg = processReturnValue(returnValue, returnType, messageWrapper);
      logger.debug("Return value type={} val={}", (returnValue == null ? null :  returnValue.getClass().getName()), returnValue);
      return returnMsg;
    } catch (ServiceException ex) {
      logger.error("Error while processing message {}", messageWrapper, ex);
      return generateErrorResponse(ex.getStatusCode(), ex.getStatus(), ex.getMessage(), getErrorMap(ex, ex.getPayload()), message);
    } catch (Throwable th) { //Any other Exception than ServiceException
      logger.error("Error while processing message {}", messageWrapper, th);
      return generateErrorResponse(StatusCode.INTERNAL_SERVER_ERROR, StatusCode.INTERNAL_SERVER_ERROR.name(), th.getMessage(), getErrorMap(th, null), message);
    }

  }

  /**
   * Initialize all the method argument resolvers. Resolvers map the handler method arguments to correct values from 
   * incoming Message
   */
  private void initArgumentResolvers() {
    logger.info("Initializing argument resolvers");
    ConfigurableBeanFactory beanFactory =
        (ClassUtils.isAssignableValue(ConfigurableApplicationContext.class, context)) ?
            ((ConfigurableApplicationContext) context).getBeanFactory() : null;
            
    List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
    argumentResolvers.add(new HeadersMethodArgumentResolver());
    argumentResolvers.add(new HeaderMethodArgumentResolver(conversionService, beanFactory));
    argumentResolvers.add(new MessageMethodArgumentResolver());
    argumentResolvers.add(new UrlPathVariableMethodArgumentResolver(conversionService));
    argumentResolvers.add(new MessageResponseArgumentResolver());
    argumentResolvers.add(new PayloadArgumentResolver(this.messageConverter, new NoOpValidator())); //Must be the last resolver. @Payload annotation is optional
    resolvers.addResolvers(argumentResolvers);
  }

  /**
   * Initialize all the message Payload converters. These are used by PayloadArgumentResolver which is last resolver in the 
   * argumentResolvers list.
   */
  private void initConverters() {
    Collection<MessageConverter> converters = new ArrayList<>();
    converters.add(new StringMessageConverter());
    converters.add(new ByteArrayMessageConverter());
    converters.add(new JsonMessageConverter());
    this.messageConverter = new CompositeMessageConverter(converters);
  }

  /**
   * Detects all the handler methods defined by @MessageMapping annotations
   * @param beanName name of the bean with @MessageProcessor annotation
   */
  private void detectHandlerMethods(String beanName) {
    Class<?> handlerType = context.getType(beanName);
    Class<?> userType = ClassUtils.getUserClass(handlerType); //To take care of CGLIB generated classes
    Object handlerBean = context.getBean(beanName);
    
    HandlerMethodSelector.selectMethods(userType, (method) -> {
      MessageMapping annotation = AnnotationUtils.findAnnotation(method, MessageMapping.class);
      if (annotation != null) {
        addHandler(handlerBean, method);
      }
      return annotation != null; //Ignored
    });
  }

  /**
   * Add the identified handler in the handlers map
   * @param handlerBean the handler bean with @MessageProcessor annotation
   * @param method the handler method with @MessageMapping annotation
   */
  private void addHandler(Object handlerBean, Method method) {
    MessageMapping annotation = method.getAnnotation(MessageMapping.class);
    HandlerMethod handlerMethod = new HandlerMethod(handlerBean, method);
    
    Map<MessageMethod, HandlerMethod> methodwiseHandlers = getMethodwiseHandlers(annotation);
    HandlerMethod oldMapping = methodwiseHandlers.put(annotation.method(), handlerMethod);
    
    if (oldMapping != null) {
      logger.warn("WARN: Duplicate handler detected! Ignoring older. Old={}, New={}", oldMapping, handlerMethod);
    }
  }

  /**
   * Get all the handlers already found for 'annotation.value()' defined url. Creates and returns an empty one if not already there.
   * @param annotation the @MessageMapping annotation found on the method
   */
  private Map<MessageMethod, HandlerMethod> getMethodwiseHandlers(MessageMapping annotation) {
    Map<MessageMethod, HandlerMethod> urlHandlers = handlers.get(annotation.value());
    if (urlHandlers == null) {
      urlHandlers = new HashMap<>();
      handlers.put(annotation.value(), urlHandlers);
    }
    return urlHandlers;
  }
  
  /**
   * Process the result obtained by invoking the matched message handler
   * @param returnValue the value to be returned
   * @param returnType type of return value as in the message handler method signature
   * @param message the incoming message
   * @return if return type is void, returns null. If the return type is primitive or primitive wrapper or String, returns as is. 
   * Otherwise the returned object is converted to json represented String and returned
   * @throws JsonProcessingException in case of failure transforming response object to json
   */
  @SuppressWarnings("rawtypes")
  private Message processReturnValue(Object returnValue, MethodParameter returnType, MessageWrapper<?> message) throws JsonProcessingException {
    
    if (void.class.equals(returnType.getParameterType())) {
      return null; // No response will be sent
    }
    
    if (Message.class.isAssignableFrom(returnType.getParameterType())) { //If of type Message, return as is.
      // Do not manipulate message
      return (Message)returnValue;
    }
    
    return MessageBuilder
        .withPayload(getTransformedPayload(returnValue))
        .copyHeaders(message.getMessageResponse().getHeaders())
        .copyHeadersIfAbsent(message.getHeaders()) //Copy all headers from incoming message to outgoing.
        .copyHeaders(getMissingResponseStatusHeadersIfAny(message.getMessageResponse().getHeaders()))
        .build();
  }

  private Map<String, ?> getMissingResponseStatusHeadersIfAny(Map<String, ?> msgResponseHeaders) {
    Map<String, Object> successHeaders = new HashMap<>();
    if (!msgResponseHeaders.containsKey(RESP_HEADER_KEY_STATUS_CODE)) {
      successHeaders.put(RESP_HEADER_KEY_STATUS_CODE, StatusCode.OK.getCode());
    }
    if (!msgResponseHeaders.containsKey(RESP_HEADER_KEY_STATUS)) {
      successHeaders.put(RESP_HEADER_KEY_STATUS, StatusCode.OK.name());
    }
    if (!msgResponseHeaders.containsKey(RESP_HEADER_KEY_MESSAGE)) {
      successHeaders.put(RESP_HEADER_KEY_MESSAGE, StatusCode.OK.name());
    }
    return successHeaders;
  }

  /**
   * Looks up from the handlers map, a handler that matches a given url and method (GET/PUT etc)
   * @param lookupUrl the url as received in the incoming message header 'url'
   * @param lookupMethod the method as received in the incoming message header 'method'
   * @return all details required for invoking the matched handler
   */
  private HandlerInvocationDetails lookupMatchingHandler(String lookupUrl, MessageMethod lookupMethod) {
    
    Map<MessageMethod, HandlerMethod> methodwiseHandlers = handlers.get(lookupUrl);
    if (methodwiseHandlers != null) { //Exact match
      logger.debug("Found exact match for request url [{}]. Available method handlers:{}", lookupUrl, methodwiseHandlers);
      return new HandlerInvocationDetails(methodwiseHandlers.get(lookupMethod));
    }

    //Non-exact matches
    List<String> matchingPatterns = new ArrayList<>();
    for (String registeredPattern : handlers.keySet()) {
      if (pathMatcher.match(registeredPattern, lookupUrl)) {
        matchingPatterns.add(registeredPattern);
      }
    }
    
    String bestPatternMatch = null;
    Comparator<String> patternComparator = pathMatcher.getPatternComparator(lookupUrl);
    if (!matchingPatterns.isEmpty()) {
      Collections.sort(matchingPatterns, patternComparator);
      if (logger.isDebugEnabled()) {
        logger.debug("Matching patterns for request url [{}] are {}", lookupUrl, matchingPatterns);
      }
      bestPatternMatch = matchingPatterns.get(0);
    }
    
    logger.debug("bestPatternMatch={}", bestPatternMatch);
    
    if (bestPatternMatch != null) {
      methodwiseHandlers = handlers.get(bestPatternMatch);
      String pathWithinMapping = pathMatcher.extractPathWithinPattern(bestPatternMatch, lookupUrl);
      logger.debug("pathWithinMapping={}", pathWithinMapping);

      // There might be multiple 'best patterns', let's make sure we have the correct URI template variables
      // for all of them
      Map<String, Object> uriTemplateVariables = new LinkedHashMap<>();
      for (String matchingPattern : matchingPatterns) {
        if (patternComparator.compare(bestPatternMatch, matchingPattern) == 0) {
          Map<String, String> vars = pathMatcher.extractUriTemplateVariables(matchingPattern, lookupUrl);
          uriTemplateVariables.putAll(vars);
        }
      }
      logger.debug("uriTemplateVariables={}", uriTemplateVariables);
      if (logger.isDebugEnabled()) {
        logger.debug("Resolved URI Template variables for request [{}] are {}", lookupUrl, uriTemplateVariables);
      }
      return new HandlerInvocationDetails(methodwiseHandlers.get(lookupMethod), uriTemplateVariables);
    }
    return null;
  }
  
  /**
   * Generate an error response.
   * @param statusCode HTTP equivalent status code
   * @param status short status code. may be used by the receiver for comparison etc.
   * @param strMessage human understandable error message, could be longer and descriptive, may include stack trace etc.
   * @param payload optional payload.
   * @param message the incoming message. Headers from this will also be copied to the response
   * @return Error message with input information. The message contains below headers to indicate error response
   * "statusCode", "status", "message". All headers from input message will also be copied to this.
   * @throws JsonProcessingException 
   */
  @SuppressWarnings("rawtypes")
  private Message generateErrorResponse(StatusCode statusCode, String status, String strMessage, 
      Object payload, Message<?> message) throws JsonProcessingException {
    Map<String, Object> headers = new HashMap<>();
    message.getHeaders().forEach((name, value) -> {
      headers.put(name, value);
    });
    //Overwrite these if already existed.
    headers.put(RESP_HEADER_KEY_STATUS_CODE, (statusCode != null ? statusCode : StatusCode.INTERNAL_SERVER_ERROR).getCode());
    headers.put(RESP_HEADER_KEY_STATUS, status);
    headers.put(RESP_HEADER_KEY_MESSAGE, strMessage);
    
    Message msg = MessageBuilder.withPayload(payload == null ? "" : getTransformedPayload(payload))
        .copyHeaders(headers)
        .build();
    return msg;
  }
  
  private Object getTransformedPayload(Object payload) throws JsonProcessingException {
    if (payload == null) {
      return "";
    }
    if (payload.getClass().isPrimitive() 
        || ClassUtils.isPrimitiveWrapper(payload.getClass()) 
        || String.class.equals(payload.getClass())) { 
      //return as is
      return payload;
    }
    return objectMapper.writeValueAsString(payload);
  }

  private Map<String, Object> getErrorMap(Throwable th, Object payload) {
    Map<String, Object> errorMap = new LinkedHashMap<>();
    errorMap.put("error", true);
    errorMap.put("type", th.getClass().getName());
    errorMap.put("message", th.getMessage());
    errorMap.put("payload", payload);
    errorMap.put("stackTrace", th.getStackTrace());
    errorMap.put("cause", th.getCause() == null ? null : getErrorMap(th.getCause(), null));
    return errorMap;
  }

  /**
   * Everything required for handler invocation is accumulated here
   */
  private class HandlerInvocationDetails {
    
    private HandlerMethod handlerMethod;
    
    private Map<String, Object> uriTemplateVariables;
    
    public HandlerInvocationDetails(HandlerMethod handlerMethod) {
      this(handlerMethod, new LinkedHashMap<String, Object>());
    }

    public HandlerInvocationDetails(HandlerMethod handlerMethod, Map<String, Object> uriTemplateVariables) {
      this.handlerMethod = handlerMethod;
      this.uriTemplateVariables = uriTemplateVariables;
    }

  }
  
  /**
   * Pass-through validator that does nothing
   */
  private static final class NoOpValidator implements Validator {

    @Override
    public boolean supports(Class<?> clazz) {
      return false;
    }

    @Override
    public void validate(Object target, Errors errors) {
    }
  }

}

/* Imports used in this class -
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.ConversionService;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.handler.HandlerMethod;
import org.springframework.messaging.handler.HandlerMethodSelector;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.ClassUtils;
import org.springframework.util.PathMatcher;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
*/

Listed below are some of the non-trivial classes used by the MessageHandler above -

JsonMessageConverter

A MessageConverter that supports MIME type "application/json" and converts an incoming JSON payload to the target object. Conversion in the other direction is not supported by this class.
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.MimeType;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonMessageConverter extends AbstractMessageConverter {

  private final Charset defaultCharset;
  
  private final ObjectMapper objectMapper;

  public JsonMessageConverter() {
    this(Charset.forName("UTF-8"));
  }

  public JsonMessageConverter(Charset defaultCharset) {
    super(new MimeType("application", "json", defaultCharset));
    this.defaultCharset = defaultCharset;
    objectMapper = new ObjectMapper();
  }

  @Override
  protected boolean supports(Class<?> clazz) {
    return true;
  }

  @Override
  public Object convertFromInternal(Message<?> message, Class<?> targetClass) {
    Charset charset = getContentTypeCharset(getMimeType(message.getHeaders()));
    Object payload = message.getPayload();
    String strJson = (payload instanceof String) ? (String) payload : new String((byte[]) payload, charset);
    try {
      return objectMapper.readValue(strJson, targetClass);
    } catch (Exception e) {
      throw new MessageProcessingException(e);
    }
  }

  @Override
  public Object convertToInternal(Object payload, MessageHeaders headers) {
    throw new UnsupportedOperationException();
  }

  private Charset getContentTypeCharset(MimeType mimeType) {
    if (mimeType != null && mimeType.getCharSet() != null) {
      return mimeType.getCharSet();
    } else {
      return this.defaultCharset;
    }
  }
}

UrlPathVariableMethodArgumentResolver

Resolves method parameters annotated with @UrlPathParam
import org.springframework.core.MethodParameter;
import org.springframework.core.convert.ConversionService;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.handler.annotation.ValueConstants;
import org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver;

public class UrlPathVariableMethodArgumentResolver extends AbstractNamedValueMethodArgumentResolver {

  public static final String URI_TEMPLATE_VARIABLES_KEY = ".uriTemplateVariables";

  public UrlPathVariableMethodArgumentResolver(ConversionService cs) {
    super(cs, null);
  }

  @Override
  public boolean supportsParameter(MethodParameter parameter) {
    return parameter.hasParameterAnnotation(UrlPathParam.class);
  }

  @Override
  protected NamedValueInfo createNamedValueInfo(MethodParameter parameter) {
    UrlPathParam annotation = parameter.getParameterAnnotation(UrlPathParam.class);
    return new UrlPathVariableNamedValueInfo(annotation, parameter.getParameterName());
  }

  @Override
  protected Object resolveArgumentInternal(MethodParameter parameter, Message<?> message, String name)
      throws Exception {

    MessageWrapper<?> messageWrapper = (MessageWrapper<?>) message;
    @SuppressWarnings("unchecked")
    Map<String, Object> vars = (Map<String, Object>) messageWrapper.getData().get(URI_TEMPLATE_VARIABLES_KEY);

    return (vars != null) ? vars.get(name) : null;
  }

  @Override
  protected void handleMissingValue(String name, MethodParameter parameter, Message<?> message) {
    throw new MessageHandlingException(message, "Missing path template variable '" + name +
        "' for method parameter type [" + parameter.getParameterType() + "]");
  }

  private static class UrlPathVariableNamedValueInfo extends NamedValueInfo {
    private UrlPathVariableNamedValueInfo(UrlPathParam annotation, String paramName) {
      super(paramName, true, ValueConstants.DEFAULT_NONE);
    }
  }
}

MessageResponseArgumentResolver

Resolves method parameters of type MessageResponse
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;

public class MessageResponseArgumentResolver implements HandlerMethodArgumentResolver {

  @Override
  public boolean supportsParameter(MethodParameter parameter) {
    return parameter.getParameterType().equals(MessageResponse.class);
  }

  @Override
  public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
    
    MessageWrapper<?> messageWrapper = (MessageWrapper<?>) message;
    return messageWrapper.getMessageResponse();
  }
}

MessageResponse

The response object used by @MessageProcessor methods when they want to add headers to the AMQP response message in addition to returning a response object. It also allows suppressing the AMQP response message by setting the sendResponse flag to false (the default value is true); that flag is not part of the simplified listing below.
public class MessageResponse {
  
  private Map<String, Object> headers;

  public MessageResponse() {
    headers = new HashMap<>();
  }

  public void addHeader(String name, Object value) {
    headers.put(name, value);
  }
  
  public Object getHeader(String name) {
    return headers.get(name);
  }

  public Map<String, Object> getHeaders() {
    return Collections.unmodifiableMap(headers);
  }
}

MessageWrapper

A Message-compatible wrapper around a Message that allows attaching additional data
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/**
 * @param <T> Message payload type
 */
public class MessageWrapper<T> implements Message<T> {
  
  private Message<T> message;
  private Map<String, Object> data = new LinkedHashMap<>();
  private MessageResponse response = new MessageResponse();
  
  public MessageWrapper(Message<T> message) {
    this.message = message;
  }

  @Override
  public T getPayload() {
    return message.getPayload();
  }

  @Override
  public MessageHeaders getHeaders() {
    return message.getHeaders();
  }
  
  public Map<String, Object> getData() {
    return data;
  }
  
  public MessageResponse getMessageResponse() {
    return response;
  }
}

Custom defined annotations


MessageProcessor
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageProcessor {
}

MessageMapping
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageMapping {
  
  /**
   * URL value with support for path parameters
   */
  String value();

  /**
   * Method - GET/PUT/POST/DELETE
   */
  MessageMethod method();
}

UrlPathParam
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface UrlPathParam {
}
* Some obvious imports are excluded from code snippets above for brevity. Some obvious custom Enum definitions are also excluded.

Thanks for reading till the end :) Your views, suggestions are welcome. Also please share if you have done it/plan to do it in any better way!