


Spring RestTemplate with a linked resource

Spring Data REST is an awesome project that provides mechanisms to expose the resources underlying a Spring Data based repository as REST resources.

Exposing a service with a linked resource

Consider two simple JPA-based entities, Course and Teacher:

    @Entity
    @Table(name = "teachers")
    public class Teacher {
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO)
        @Column(name = "id")
        private Long id;

        @Size(min = 2, max = 50)
        @Column(name = "name")
        private String name;

        @Column(name = "department")
        @Size(min = 2, max = 50)
        private String department;
        ...
    }

    @Entity
    @Table(name = "courses")
    public class Course {
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO)
        @Column(name = "id")
        private Long id;

        @Size(min = 1, max = 10)
        @Column(name = "coursecode")
        private String courseCode;

        @Size(min = 1, max = 50)
        @Column(name = "coursename")
        private String courseName;

        @ManyToOne
        @JoinColumn(name = "teacher_id")
        private Teacher teacher;
        ...
    }

Essentially, the relation is many-to-one: each course has one teacher, and a teacher can be referenced by many courses.

Now, all it takes to expose these entities as REST resources is adding a @RepositoryRestResource annotation to their JPA-based Spring Data repositories, first for the "Teacher" resource:

    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.data.rest.core.annotation.RepositoryRestResource;
    import univ.domain.Teacher;

    @RepositoryRestResource
    public interface TeacherRepo extends JpaRepository<Teacher, Long> {
    }

and for exposing the Course resource:

    @RepositoryRestResource
    public interface CourseRepo extends JpaRepository<Course, Long> {
    }

With this done, and assuming a few teachers and a few courses are already in the datastore, a GET on courses would yield a response of the following type:

    {
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/api/courses{?page,size,sort}",
          "templated" : true
        }
      },
      "_embedded" : {
        "courses" : [ {
          "courseCode" : "Course1",
          "courseName" : "Course Name 1",
          "version" : 0,
          "_links" : {
            "self" : { "href" : "http://localhost:8080/api/courses/1" },
            "teacher" : { "href" : "http://localhost:8080/api/courses/1/teacher" }
          }
        }, {
          "courseCode" : "Course2",
          "courseName" : "Course Name 2",
          "version" : 0,
          "_links" : {
            "self" : { "href" : "http://localhost:8080/api/courses/2" },
            "teacher" : { "href" : "http://localhost:8080/api/courses/2/teacher" }
          }
        } ]
      },
      "page" : {
        "size" : 20,
        "totalElements" : 2,
        "totalPages" : 1,
        "number" : 0
      }
    }

and a specific course looks like this:

    {
      "courseCode" : "Course1",
      "courseName" : "Course Name 1",
      "version" : 0,
      "_links" : {
        "self" : { "href" : "http://localhost:8080/api/courses/1" },
        "teacher" : { "href" : "http://localhost:8080/api/courses/1/teacher" }
      }
    }

If you are wondering what "_links" and "_embedded" are: Spring Data REST uses Hypertext Application Language (HAL for short) to represent the links, say the one between a course and a teacher.

HAL Based REST service – Using RestTemplate

Given this HAL-based REST service, the question I had in mind was how to write a client for it. I am sure there are better ways of doing this, but what follows worked for me, and I welcome any cleaner ways of writing the client.
First, I modified the RestTemplate to register a custom JSON converter that understands HAL-based links:

    public RestTemplate getRestTemplateWithHalMessageConverter() {
        RestTemplate restTemplate = new RestTemplate();
        List<HttpMessageConverter<?>> existingConverters = restTemplate.getMessageConverters();
        List<HttpMessageConverter<?>> newConverters = new ArrayList<>();
        // converters are tried in order, so the HAL converter goes first
        newConverters.add(getHalMessageConverter());
        newConverters.addAll(existingConverters);
        restTemplate.setMessageConverters(newConverters);
        return restTemplate;
    }

    private HttpMessageConverter<?> getHalMessageConverter() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new Jackson2HalModule());
        MappingJackson2HttpMessageConverter halConverter =
                new TypeConstrainedMappingJackson2HttpMessageConverter(ResourceSupport.class);
        // HAL_JSON is statically imported from Spring HATEOAS' MediaTypes
        halConverter.setSupportedMediaTypes(Arrays.asList(HAL_JSON));
        halConverter.setObjectMapper(objectMapper);
        return halConverter;
    }

The Jackson2HalModule is provided by the Spring HATEOAS project and understands the HAL representation.

Given this shiny new RestTemplate, let us first create a Teacher entity:

    Teacher teacher1 = new Teacher();
    teacher1.setName("Teacher 1");
    teacher1.setDepartment("Department 1");
    URI teacher1Uri = testRestTemplate.postForLocation("http://localhost:8080/api/teachers", teacher1);

Note that when the entity is created, the response has HTTP status code 201 (Created) and a Location header pointing to the URI of the newly created resource. RestTemplate's postForLocation method posts the entity and returns this Location header in a single call. So now we have a teacher1Uri representing the newly created teacher.
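Under the hood, postForLocation is a small HTTP exchange: POST the body, expect 201 Created, and read the Location header. The following is a minimal JDK-only sketch of that exchange, not Spring's implementation. The class name PostForLocationSketch, the fake /api/teachers endpoint and the returned /api/teachers/1 path are all hypothetical; the fake server only exists so the sketch can run without a Spring Data REST application.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;

class PostForLocationSketch {

    /** POSTs a JSON body and returns the Location header of a 201 Created response. */
    static URI postForLocation(String url, String json) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/json");
            try (OutputStream out = conn.getOutputStream()) {
                out.write(json.getBytes(StandardCharsets.UTF_8));
            }
            int status = conn.getResponseCode();
            String location = conn.getHeaderField("Location");
            if (status != HttpURLConnection.HTTP_CREATED || location == null) {
                throw new IOException("Expected 201 with a Location header, got " + status);
            }
            return URI.create(location);
        } finally {
            conn.disconnect();
        }
    }

    /** Hypothetical stand-in for POST /api/teachers: always answers 201 with a Location header. */
    static HttpServer startFakeTeacherEndpoint() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/api/teachers", exchange -> {
            try (InputStream in = exchange.getRequestBody()) {
                byte[] buffer = new byte[1024];
                while (in.read(buffer) != -1) {
                    // drain the request body; the fake endpoint ignores its content
                }
            }
            int port = exchange.getLocalAddress().getPort();
            exchange.getResponseHeaders().add("Location",
                    "http://127.0.0.1:" + port + "/api/teachers/1");
            exchange.sendResponseHeaders(201, -1); // -1 means no response body
            exchange.close();
        });
        server.start();
        return server;
    }
}
```

The point of the sketch is that the new resource's identity comes back in a header rather than in the body, which is why a plain postForObject would not give you the teacher URI.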
Given this teacher URI, let us now retrieve the teacher. The raw JSON for the teacher resource looks like the following:

    {
      "name" : "Teacher 1",
      "department" : "Department 1",
      "version" : 0,
      "_links" : {
        "self" : { "href" : "http://localhost:8080/api/teachers/1" }
      }
    }

and to retrieve it using RestTemplate:

    ResponseEntity<Resource<Teacher>> teacherResponseEntity =
            testRestTemplate.exchange("http://localhost:8080/api/teachers/1", HttpMethod.GET, null,
                    new ParameterizedTypeReference<Resource<Teacher>>() { });

    Resource<Teacher> teacherResource = teacherResponseEntity.getBody();

    Link teacherLink = teacherResource.getLink("self");
    String teacherUri = teacherLink.getHref();

    Teacher teacher = teacherResource.getContent();

Jackson2HalModule is what unpacks the links this cleanly and gives us the Teacher entity itself. I have explained ParameterizedTypeReference in a previous post.

Now to a trickier part: creating a Course. Creating a course is tricky because it has a relation to the Teacher, and representing this relation in the request is not that straightforward. A raw POST to create the course would look like this:

    {
      "courseCode" : "Course1",
      "courseName" : "Course Name 1",
      "version" : 0,
      "teacher" : "http://localhost:8080/api/teachers/1"
    }

Note how the reference to the teacher is a URI: this is how Spring Data REST expects an associated resource to be referenced in POSTed content. Now, let us produce this form through RestTemplate.
First, create a Course:

    Course course1 = new Course();
    course1.setCourseCode("Course1");
    course1.setCourseName("Course Name 1");

At this point, it is easier to provide the teacher link by working with a JSON tree representation and adding the teacher URI as the "teacher" field:

    ObjectMapper objectMapper = getObjectMapperWithHalModule();
    ObjectNode jsonNodeCourse1 = (ObjectNode) objectMapper.valueToTree(course1);
    jsonNodeCourse1.put("teacher", teacher1Uri.getPath());

Posting this should create the course with the linked teacher:

    URI course1Uri = testRestTemplate.postForLocation(coursesUri, jsonNodeCourse1);

and to retrieve this newly created Course:

    ResponseEntity<Resource<Course>> courseResponseEntity =
            testRestTemplate.exchange(course1Uri, HttpMethod.GET, null,
                    new ParameterizedTypeReference<Resource<Course>>() { });

    Resource<Course> courseResource = courseResponseEntity.getBody();
    Link teacherLinkThroughCourse = courseResource.getLink("teacher");

This concludes how to use RestTemplate to create and retrieve a linked resource; alternate ideas are welcome. If you are interested in exploring this further, the entire sample, including the test, is available in a GitHub repository.

Reference: Spring RestTemplate with a linked resource from our JCG partner Biju Kunjummen at the all and sundry blog....

Spark: Write to CSV file

A couple of weeks ago I wrote about how I'd been using Spark to explore a City of Chicago Crime data set, and having worked out how many of each crime had been committed, I wanted to write that to a CSV file.

Spark provides a saveAsTextFile function which allows us to save RDDs, so I refactored my code into the following format to allow me to use it:

    import java.io.File
    import au.com.bytecode.opencsv.CSVParser
    import org.apache.hadoop.fs.FileUtil
    import org.apache.spark.rdd.RDD
    import org.apache.spark.SparkContext._

    // The header row only appears at the start of the first partition.
    // Iterator.drop returns a new iterator, so that iterator is what we return.
    def dropHeader(data: RDD[String]): RDD[String] = {
      data.mapPartitionsWithIndex((idx, lines) => {
        if (idx == 0) lines.drop(1) else lines
      })
    }

    // https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
    val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"

    val crimeData = sc.textFile(crimeFile).cache()
    val withoutHeader: RDD[String] = dropHeader(crimeData)

    val file = "/tmp/primaryTypes.csv"
    FileUtil.fullyDelete(new File(file))

    val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
      val parser = new CSVParser(',')
      lines.map(line => {
        val columns = parser.parseLine(line)
        (columns(5), 1)   // column 5 holds the crime type
      })
    })

    val counts = partitions.
      reduceByKey {case (x,y) => x + y}.
      sortBy {case (key, value) => -value}.
      map { case (key, value) => Array(key, value).mkString(",") }

    counts.saveAsTextFile(file)

If we run that code from the Spark shell, we end up with a folder called /tmp/primaryTypes.csv containing multiple part files:

    $ ls -lah /tmp/primaryTypes.csv/
    total 496
    drwxr-xr-x 66 markneedham wheel 2.2K 30 Nov 07:17 .
    drwxrwxrwt 80 root wheel 2.7K 30 Nov 07:16 ..
    -rw-r--r-- 1 markneedham wheel 8B 30 Nov 07:16 ._SUCCESS.crc
    -rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00000.crc
    -rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00001.crc
    -rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00002.crc
    -rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00003.crc
    ...
    -rwxrwxrwx 1 markneedham wheel 0B 30 Nov 07:16 _SUCCESS
    -rwxrwxrwx 1 markneedham wheel 28B 30 Nov 07:16 part-00000
    -rwxrwxrwx 1 markneedham wheel 17B 30 Nov 07:16 part-00001
    -rwxrwxrwx 1 markneedham wheel 23B 30 Nov 07:16 part-00002
    -rwxrwxrwx 1 markneedham wheel 16B 30 Nov 07:16 part-00003
    ...

If we look at some of those part files, we can see that it's written the crime types and counts as expected:

    $ cat /tmp/primaryTypes.csv/part-00000
    THEFT,859197
    BATTERY,757530

    $ cat /tmp/primaryTypes.csv/part-00003
    BURGLARY,257310

This is fine if we're going to pass those CSV files into another Hadoop-based job, but I actually want a single CSV file, so it's not quite what I want.

One way to achieve this is to force everything to be calculated on one partition, which means we only get one part file generated:

    val counts = partitions.repartition(1).
      reduceByKey {case (x,y) => x + y}.
      sortBy {case (key, value) => -value}.
      map { case (key, value) => Array(key, value).mkString(",") }

    counts.saveAsTextFile(file)

part-00000 now looks like this:

    $ cat !$
    cat /tmp/primaryTypes.csv/part-00000
    THEFT,859197
    BATTERY,757530
    NARCOTICS,489528
    CRIMINAL DAMAGE,488209
    BURGLARY,257310
    OTHER OFFENSE,253964
    ASSAULT,247386
    MOTOR VEHICLE THEFT,197404
    ROBBERY,157706
    DECEPTIVE PRACTICE,137538
    CRIMINAL TRESPASS,124974
    PROSTITUTION,47245
    WEAPONS VIOLATION,40361
    PUBLIC PEACE VIOLATION,31585
    OFFENSE INVOLVING CHILDREN,26524
    CRIM SEXUAL ASSAULT,14788
    SEX OFFENSE,14283
    GAMBLING,10632
    LIQUOR LAW VIOLATION,8847
    ARSON,6443
    INTERFERE WITH PUBLIC OFFICER,5178
    HOMICIDE,4846
    KIDNAPPING,3585
    INTERFERENCE WITH PUBLIC OFFICER,3147
    INTIMIDATION,2471
    STALKING,1985
    OFFENSES INVOLVING CHILDREN,355
    OBSCENITY,219
    PUBLIC INDECENCY,86
    OTHER NARCOTIC VIOLATION,80
    NON-CRIMINAL,12
    RITUALISM,12
    OTHER OFFENSE ,6
    NON - CRIMINAL,2
    NON-CRIMINAL (SUBJECT SPECIFIED),2

This works, but it's quite a bit slower than when we were doing the aggregation across partitions, so it's not ideal.
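To see what the Spark pipeline above computes (drop the header, take column 5, reduceByKey, sortBy descending, format as "key,value"), here is a single-machine analogue with plain Java streams. It is only an analogy, not Spark code: CrimeTypeCounts and countByColumn are hypothetical names, and parseLine is a minimal quote-aware splitter standing in for opencsv's CSVParser rather than a complete CSV parser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CrimeTypeCounts {

    /** Minimal CSV splitter: handles double-quoted fields (with commas) and "" escapes. */
    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"' && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"'); // escaped quote inside a quoted field
                    i++;
                } else if (c == '"') {
                    inQuotes = false;
                } else {
                    current.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    /** Skips the header, counts values of `column`, returns "value,count" lines, largest first. */
    static List<String> countByColumn(List<String> lines, int column) {
        Map<String, Long> counts = lines.stream()
                .skip(1)                                          // like dropHeader
                .map(line -> parseLine(line).get(column))         // like (columns(5), 1)
                .collect(Collectors.groupingBy(type -> type, Collectors.counting())); // like reduceByKey
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())     // like sortBy(-value)
                .map(e -> e.getKey() + "," + e.getValue())                          // like mkString(",")
                .collect(Collectors.toList());
    }
}
```

On one machine the whole data set fits in a single "partition", which is exactly the trade-off the repartition(1) approach makes inside Spark.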
Instead, what we can do is make use of one of Hadoop's merge functions, which squashes part files together into a single file. First, we add Hadoop as a dependency in our SBT file:

    libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"

Now let's bring our merge function into the Spark shell:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs._

    def merge(srcPath: String, dstPath: String): Unit = {
      val hadoopConfig = new Configuration()
      // the default file system for this configuration (the local one when running locally)
      val hdfs = FileSystem.get(hadoopConfig)
      // false: keep the source directory; null: append nothing after each part file
      FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
    }

And now let's make use of it:

    val file = "/tmp/primaryTypes.csv"
    FileUtil.fullyDelete(new File(file))

    val destinationFile = "/tmp/singlePrimaryTypes.csv"
    FileUtil.fullyDelete(new File(destinationFile))

    val counts = partitions.
      reduceByKey {case (x,y) => x + y}.
      sortBy {case (key, value) => -value}.
      map { case (key, value) => Array(key, value).mkString(",") }

    counts.saveAsTextFile(file)

    merge(file, destinationFile)

And now we've got the best of both worlds:

    $ cat /tmp/singlePrimaryTypes.csv
    THEFT,859197
    BATTERY,757530
    NARCOTICS,489528
    CRIMINAL DAMAGE,488209
    BURGLARY,257310
    OTHER OFFENSE,253964
    ASSAULT,247386
    MOTOR VEHICLE THEFT,197404
    ROBBERY,157706
    DECEPTIVE PRACTICE,137538
    CRIMINAL TRESPASS,124974
    PROSTITUTION,47245
    WEAPONS VIOLATION,40361
    PUBLIC PEACE VIOLATION,31585
    OFFENSE INVOLVING CHILDREN,26524
    CRIM SEXUAL ASSAULT,14788
    SEX OFFENSE,14283
    GAMBLING,10632
    LIQUOR LAW VIOLATION,8847
    ARSON,6443
    INTERFERE WITH PUBLIC OFFICER,5178
    HOMICIDE,4846
    KIDNAPPING,3585
    INTERFERENCE WITH PUBLIC OFFICER,3147
    INTIMIDATION,2471
    STALKING,1985
    OFFENSES INVOLVING CHILDREN,355
    OBSCENITY,219
    PUBLIC INDECENCY,86
    OTHER NARCOTIC VIOLATION,80
    RITUALISM,12
    NON-CRIMINAL,12
    OTHER OFFENSE ,6
    NON - CRIMINAL,2
    NON-CRIMINAL (SUBJECT SPECIFIED),2

The full code is available as a gist if you want to play around with it.

Reference: Spark: Write to CSV file from our JCG partner Mark Needham at the Mark Needham Blog blog....
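What the merge step does is simple enough to sketch with java.nio against a local directory: concatenate the part files in name order into one file, optionally writing a header line once at the top. This is a hypothetical local-filesystem helper, not Hadoop's FileUtil; the name LocalCopyMerge, the "part-*" filter (which skips _SUCCESS and the hidden .crc files) and the explicit sort are choices of this sketch. Sorting matters because sortBy range-partitions its output, so part-00000, part-00001, ... hold consecutive slices of the global order.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class LocalCopyMerge {

    /** Concatenates srcDir/part-* (sorted by name) into dstFile; header may be null. */
    static void merge(Path srcDir, Path dstFile, String header) throws IOException {
        List<Path> parts = new ArrayList<>();
        // "part-*" excludes _SUCCESS and hidden checksum files such as .part-00000.crc
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(srcDir, "part-*")) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    parts.add(p);
                }
            }
        }
        Collections.sort(parts); // part-00000, part-00001, ... keeps the global sort order

        // newOutputStream creates or truncates the destination file
        try (OutputStream out = Files.newOutputStream(dstFile)) {
            if (header != null) {
                out.write((header + "\n").getBytes(StandardCharsets.UTF_8)); // written once, at the top
            }
            for (Path part : parts) {
                Files.copy(part, out);
            }
        }
    }
}
```

Calling merge(dir, out, null) gives the plain single-file result; passing a header string gives a CSV with one header row.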

Spring request-level memoization

Introduction

Memoization is a method-level caching technique for speeding up consecutive invocations. This post will demonstrate how you can achieve request-level repeatable reads for any data source, using Spring AOP only.

Spring Caching

Spring offers a very useful caching abstraction, allowing you to decouple the application logic from the caching implementation details. Spring Caching uses an application-level scope, so for request-only memoization we need to take a DIY approach.

Request-level Caching

A request-level cache entry's life cycle is always bound to the current request scope. Such a cache is very similar to the Hibernate Persistence Context, which offers session-level repeatable reads. Repeatable reads are mandatory for preventing lost updates, even for NoSQL solutions.

Step-by-step implementation

First we are going to define a Memoize marker annotation:

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Memoize {
    }

This annotation is going to explicitly mark all methods that need to be memoized.
To distinguish different method invocations, we are going to encapsulate the method call info into the following object type:

    public class InvocationContext {

        public static final String TEMPLATE = "%s.%s(%s)";

        private final Class<?> targetClass;
        private final String targetMethod;
        private final Object[] args;

        public InvocationContext(Class<?> targetClass, String targetMethod, Object[] args) {
            this.targetClass = targetClass;
            this.targetMethod = targetMethod;
            this.args = args;
        }

        public Class<?> getTargetClass() {
            return targetClass;
        }

        public String getTargetMethod() {
            return targetMethod;
        }

        public Object[] getArgs() {
            return args;
        }

        // reflection-based equality over targetClass, targetMethod and args (arrays compared element-wise)
        @Override
        public boolean equals(Object that) {
            return EqualsBuilder.reflectionEquals(this, that);
        }

        @Override
        public int hashCode() {
            return HashCodeBuilder.reflectionHashCode(this);
        }

        @Override
        public String toString() {
            return String.format(TEMPLATE, targetClass.getName(), targetMethod, Arrays.toString(args));
        }
    }

Few know about the awesomeness of Spring request/session bean scopes. Because we require a request-level memoization scope, we can simplify our design with a Spring request-scoped bean that hides the logic of binding the cache to the current HTTP request:

    @Component
    @Scope(proxyMode = ScopedProxyMode.TARGET_CLASS, value = "request")
    public class RequestScopeCache {

        // sentinel that tells a cache miss apart from a memoized null result
        public static final Object NONE = new Object();

        private final Map<InvocationContext, Object> cache = new HashMap<InvocationContext, Object>();

        public Object get(InvocationContext invocationContext) {
            return cache.containsKey(invocationContext) ? cache.get(invocationContext) : NONE;
        }

        public void put(InvocationContext methodInvocation, Object result) {
            cache.put(methodInvocation, result);
        }
    }

Since a mere annotation means nothing without a runtime processing engine, we must define a Spring Aspect implementing the actual memoization logic:

    @Aspect
    public class MemoizerAspect {

        // assumes SLF4J, matching the {} placeholders below
        private static final Logger LOGGER = LoggerFactory.getLogger(MemoizerAspect.class);

        @Autowired
        private RequestScopeCache requestScopeCache;

        @Around("@annotation(com.vladmihalcea.cache.Memoize)")
        public Object memoize(ProceedingJoinPoint pjp) throws Throwable {
            InvocationContext invocationContext = new InvocationContext(
                    pjp.getSignature().getDeclaringType(),
                    pjp.getSignature().getName(),
                    pjp.getArgs()
            );
            Object result = requestScopeCache.get(invocationContext);
            if (RequestScopeCache.NONE == result) {
                // cache miss: invoke the real method and remember its result for this request
                result = pjp.proceed();
                LOGGER.info("Memoizing result {}, for method invocation: {}", result, invocationContext);
                requestScopeCache.put(invocationContext, result);
            } else {
                LOGGER.info("Using memoized result: {}, for method invocation: {}", result, invocationContext);
            }
            return result;
        }
    }

Testing time

Let's put all this to a test.
For simplicity's sake, we are going to emulate the request-level scope memoization requirements with a Fibonacci number calculator:

    @Component
    public class FibonacciServiceImpl implements FibonacciService {

        private static final Logger LOGGER = LoggerFactory.getLogger(FibonacciServiceImpl.class);

        @Autowired
        private ApplicationContext applicationContext;

        // the Spring proxy of this bean: recursive calls go through it so the aspect can intercept them
        private FibonacciService fibonacciService;

        @PostConstruct
        private void init() {
            fibonacciService = applicationContext.getBean(FibonacciService.class);
        }

        @Memoize
        public int compute(int i) {
            LOGGER.info("Calculate fibonacci for number {}", i);
            if (i == 0 || i == 1)
                return i;
            return fibonacciService.compute(i - 2) + fibonacciService.compute(i - 1);
        }
    }

If we calculate the 10th Fibonacci number, we get the following result:

    Calculate fibonacci for number 10
    Calculate fibonacci for number 8
    Calculate fibonacci for number 6
    Calculate fibonacci for number 4
    Calculate fibonacci for number 2
    Calculate fibonacci for number 0
    Memoizing result 0, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([0])
    Calculate fibonacci for number 1
    Memoizing result 1, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([1])
    Memoizing result 1, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([2])
    Calculate fibonacci for number 3
    Using memoized result: 1, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([1])
    Using memoized result: 1, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([2])
    Memoizing result 2, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([3])
    Memoizing result 3, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([4])
    Calculate fibonacci for number 5
    Using memoized result: 2, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([3])
    Using memoized result: 3, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([4])
    Memoizing result 5, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([5])
    Memoizing result 8, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([6])
    Calculate fibonacci for number 7
    Using memoized result: 5, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([5])
    Using memoized result: 8, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([6])
    Memoizing result 13, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([7])
    Memoizing result 21, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([8])
    Calculate fibonacci for number 9
    Using memoized result: 13, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([7])
    Using memoized result: 21, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([8])
    Memoizing result 34, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([9])
    Memoizing result 55, for method invocation: com.vladmihalcea.cache.FibonacciService.compute([10])

Conclusion

Memoization is a cross-cutting concern, and Spring AOP allows you to decouple the caching details from the actual application logic code.

Code available on GitHub.

Reference: Spring request-level memoization from our JCG partner Vlad Mihalcea at the Vlad Mihalcea's Blog blog....
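The same memoization idea can be tried without Spring. In this plain-Java sketch, a JDK dynamic proxy plays the role of the aspect, a Map handed in by the caller plays the role of the request-scoped cache, and a Key class using Arrays.deepEquals/deepHashCode replaces the reflection-based InvocationContext equality. ProxyMemoizer, Key and memoizing are hypothetical names, and this is not how Spring AOP is implemented. One difference to note: a JDK proxy only sees interface methods, so here @Memoize sits on the interface method.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

final class ProxyMemoizer {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Memoize { }

    /** Cache key: declaring type + method name + arguments, compared element by element. */
    public static final class Key {
        private final Class<?> type;
        private final String method;
        private final Object[] args;

        Key(Class<?> type, String method, Object[] args) {
            this.type = type;
            this.method = method;
            this.args = args == null ? new Object[0] : args.clone();
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key other = (Key) o;
            return type.equals(other.type) && method.equals(other.method)
                    && Arrays.deepEquals(args, other.args);
        }

        @Override public int hashCode() {
            return Objects.hash(type, method, Arrays.deepHashCode(args));
        }
    }

    /** Wraps target so that @Memoize interface methods are answered from requestCache when possible. */
    @SuppressWarnings("unchecked")
    static <T> T memoizing(Class<T> iface, T target, Map<Key, Object> requestCache) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.isAnnotationPresent(Memoize.class)) {
                return invokeTarget(method, target, args);
            }
            Key key = new Key(method.getDeclaringClass(), method.getName(), args);
            // containsKey rather than get() != null, so a memoized null still counts as a hit
            if (requestCache.containsKey(key)) {
                return requestCache.get(key);
            }
            Object result = invokeTarget(method, target, args);
            requestCache.put(key, result);
            return result;
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    private static Object invokeTarget(Method method, Object target, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // rethrow what the target method itself threw
        }
    }

    /** Demo service mirroring the post's FibonacciService. */
    public interface FibonacciService {
        @Memoize
        int compute(int i);
    }

    public static final class FibonacciServiceImpl implements FibonacciService {
        private FibonacciService self = this; // replaced by the proxy, like getBean(...) in the post
        private int computations;             // counts real (non-memoized) invocations

        public void setSelf(FibonacciService self) { this.self = self; }

        public int getComputations() { return computations; }

        @Override public int compute(int i) {
            computations++;
            if (i == 0 || i == 1) return i;
            return self.compute(i - 2) + self.compute(i - 1);
        }
    }
}
```

Creating a fresh Map for each request gives request-level scope: computing the 10th Fibonacci number runs compute only once per distinct argument (11 times), matching the "Calculate fibonacci" lines in the log above. Reusing one Map across requests would turn it into an application-level cache, which is what Spring Caching already offers.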

Spring Data JPA Tutorial: Introduction

Creating repositories that use the Java Persistence API is a cumbersome process that takes a lot of time and requires a lot of boilerplate code. We can eliminate some boilerplate code by following these steps:

- Create an abstract base repository class that provides CRUD operations for entities.
- Create the concrete repository class that extends the abstract base repository class.

The problem with this approach is that we still have to write the code that creates our database queries and invokes them. To make matters worse, we have to do this every time we want to create a new database query. This is a waste of time.

What would you say if I told you that we can create JPA repositories without writing any boilerplate code?

The odds are that you might not believe me, but Spring Data JPA helps us to do just that. The website of the Spring Data JPA project states that:

"Implementing a data access layer of an application has been cumbersome for quite a while. Too much boilerplate code has to be written to execute simple queries as well as perform pagination, and auditing. Spring Data JPA aims to significantly improve the implementation of data access layers by reducing the effort to the amount that's actually needed. As a developer you write your repository interfaces, including custom finder methods, and Spring will provide the implementation automatically."

This blog post provides an introduction to Spring Data JPA. We will learn what Spring Data JPA really is and take a quick look at the Spring Data repository interfaces.

Let's get started.

What Is Spring Data JPA?

Spring Data JPA is not a JPA provider. It is a library/framework that adds an extra layer of abstraction on top of our JPA provider. If we decide to use Spring Data JPA, the repository layer of our application contains three layers, which are described in the following:

- Spring Data JPA provides support for creating JPA repositories by extending the Spring Data repository interfaces.
- Spring Data Commons provides the infrastructure that is shared by the datastore-specific Spring Data projects.
- The JPA provider implements the Java Persistence API.

In other words, our repository layer is a stack: Spring Data JPA sits on top of Spring Data Commons, which in turn sits on top of the JPA provider.

Additional reading: Spring Data JPA versus JPA: What's the difference?

At first it seems that Spring Data JPA makes our application more complicated, and in a way that is true. It does add an additional layer to our repository layer, but at the same time it frees us from writing any boilerplate code. That sounds like a good tradeoff, right?

Introduction to Spring Data Repositories

The power of Spring Data JPA lies in the repository abstraction that is provided by the Spring Data Commons project and extended by the datastore-specific subprojects.

We can use Spring Data JPA without paying any attention to the actual implementation of the repository abstraction, but we have to be familiar with the Spring Data repository interfaces. These interfaces are described in the following.

First, the Spring Data Commons project provides the following interfaces:

- The Repository<T, ID extends Serializable> interface is a marker interface that has two purposes:
  - It captures the type of the managed entity and the type of the entity's id.
  - It helps the Spring container to discover the "concrete" repository interfaces during classpath scanning.
- The CrudRepository<T, ID extends Serializable> interface provides CRUD operations for the managed entity.
- The PagingAndSortingRepository<T, ID extends Serializable> interface declares the methods that are used to sort and paginate entities that are retrieved from the database.
- The QueryDslPredicateExecutor<T> interface is not a "repository interface".
  It declares the methods that are used to retrieve entities from the database by using Querydsl Predicate objects.

Second, the Spring Data JPA project provides the following interfaces:

- The JpaRepository<T, ID extends Serializable> interface is a JPA-specific repository interface that combines the methods declared by the common repository interfaces behind a single interface.
- The JpaSpecificationExecutor<T> interface is not a "repository interface". It declares the methods that are used to retrieve entities from the database by using Specification<T> objects that use the JPA criteria API.

In the repository hierarchy, JpaRepository extends PagingAndSortingRepository, which extends CrudRepository, which in turn extends the Repository marker interface.

That is nice, but how can we use them? That is a fair question. The next parts of this tutorial will answer it, but essentially we have to follow these steps:

- Create a repository interface and extend one of the repository interfaces provided by Spring Data.
- Add custom query methods to the created repository interface (if we need them, that is).
- Inject the repository interface into another component and use the implementation that is provided automatically by Spring.

Let's move on and summarize what we learned from this blog post.

Summary

This blog post has taught us two things:

- Spring Data JPA is not a JPA provider. It simply "hides" the Java Persistence API (and the JPA provider) behind its repository abstraction.
- Spring Data provides multiple repository interfaces that are used for different purposes.

The next part of this tutorial describes how we can get the required dependencies. If you want to know more about Spring Data JPA, you should read my Spring Data JPA Tutorial.

Reference: Spring Data JPA Tutorial: Introduction from our JCG partner Petri Kainulainen at the Petri Kainulainen blog....
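The first purpose of the Repository marker interface, capturing the entity type and the id type, works because the type arguments written in an interface's extends clause survive erasure and can be read back through reflection. The sketch below illustrates that with a hypothetical Repository interface and Teacher class in plain Java; it is not Spring Data's code (Spring Data's own type resolution is more complete), and it only handles repository interfaces that extend Repository directly.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class RepositoryTypeCapture {

    /** Hypothetical marker interface, shaped like Spring Data's Repository<T, ID>. */
    public interface Repository<T, ID> { }

    /** Hypothetical entity. */
    public static class Teacher { }

    /** A "concrete" repository interface: its type arguments carry the domain information. */
    public interface TeacherRepo extends Repository<Teacher, Long> { }

    /** Returns {entityType, idType} as declared on a repository interface. */
    static Type[] domainTypes(Class<?> repositoryInterface) {
        for (Type t : repositoryInterface.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Repository.class) {
                return ((ParameterizedType) t).getActualTypeArguments();
            }
        }
        throw new IllegalArgumentException(repositoryInterface + " does not extend Repository directly");
    }
}
```

This is why an empty interface such as TeacherRepo is enough to tell the framework which entity and id type it manages: the information lives in the declaration itself.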

Can MicroServices Architecture Solve All Your Problems?

IT is one field where you find new things coming every day. These days, developer community websites are flooded with MicroServices and Docker related stuff. Among them, the idea of MicroServices is very exciting and encourages a better way of building software systems. But as with any architectural style, every approach has pros and cons. Before discussing the good and bad sides of the MicroServices approach, let me first say what I understood about MicroServices.

MicroServices architecture encourages building small, focused subsystems which can be integrated into the whole system, preferably using REST.

Now let's discuss various aspects of MicroServices architecture.

The dream of every software architect/developer:

First of all, the idea of MicroServices is not new at all. From the very beginning, our elders suggested writing classes focused on a Single Responsibility and writing methods that do one particular thing and do it well. We were also encouraged to build separate modules which perform functionally related tasks. Then we bundle all these separate modules together and build an application, delegating the appropriate tasks to the respective modules. This is what we have been trying to do for many years. But the idea of MicroServices takes this approach to the next level: you can deploy each module as an individual deployable unit, and each service can communicate with any other service based on some agreed protocol (preferably REST, another trendy cool thing!).

So what are the advantages of MicroServices architecture? There are plenty:

- You will have many small services with manageable codebases that are easy to read and understand.
- You can confidently refactor or rewrite an entire service because there won't be any impact on other services.
- Each microservice can be deployed independently, so adding new features or upgrading any existing software/hardware platform won't affect other services.
- You can easily adopt the next cool technology. If one microservice is very critical and performance is the highest priority, we can write that particular service in Scala in order to leverage multi-core hardware.
- If you are a service provider company, you can sell each service separately, possibly making better money compared to selling the whole monolithic product.
- And the most important factor: the term MicroService is cool!

What is the other side of MicroServices architecture?

As with any approach, MicroServices also has some downsides and associated costs.

"Great power comes with great responsibility." – Uncle Ben

Let us see the challenges of implementing a system using MicroServices architecture.

The idea of MicroServices is very simple but very complex to implement in reality.

In a monolithic system, the communication between various subsystems is mostly direct object communication. But in a MicroServices-based system, in order to communicate with other services you may use REST services, which means additional HTTP call overhead and its inherent issues like network latency, possible communication failures, etc. So we need to consider various aspects while implementing inter-service communication logic, such as retry, fail-over and service-down scenarios.

How good is your DevOps infrastructure?

In order to go with MicroServices architecture, an organization should have a good DevOps team to properly maintain dozens of MicroService applications. Does your organization have a DevOps culture? Or does your organization have the problem of a blame game between Devs and Ops? If your organization doesn't have a good DevOps culture and software/hardware resources, then MicroServices architecture will be much more difficult to adopt.

Are we fixing the actual problem at all?

Now many people are saying MicroServices architecture is better than monolithic architecture. But is monolithic architecture the actual reason why many projects are failing?
Will MicroServices architecture save projects from failing? I guess NO. Think about the reasons your previous projects failed. Did those projects fail because of technology issues or people issues? I have never seen a project fail because of the wrong technology selection or the wrong architectural approach. But I have seen many projects fail just because of problems with people. I feel there are more severe issues than architecture issues which cause projects to fail, such as:

- Having developers without sufficient skills
- Having developers who don't want to learn anything new
- Having developers who don't have the courage to say "NO, we can't do that in that time"
- Having Architects who abandoned coding years ago
- Having Architects who think they know everything and don't need to listen to their developers' pain
- Having Managers who just blame the developers for not meeting the imposed deadlines without ever asking the developers for timelines

These are the real problems that cause project failures. Now, do you think just moving to MicroServices architecture will save IT without fixing these problems?

Continuously innovating new ways of doing things is awesome and is required to move ahead. At the same time, assuming "the next cool methodology/technology will fix all the problems" is also wrong.

So those of you who are just about to jump on the MicroServices boat... THINK. FIX THE REAL PROBLEMS FIRST. You can't fill a bottle which has a hole at its bottom.

Reference: Can MicroServices Architecture Solve All Your Problems? from our JCG partner Siva Reddy at the My Experiments on Technology blog....
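Retry is one of the inter-service communication concerns the MicroServices post lists. Here is a minimal sketch of retry with exponential backoff in plain Java; the class name Retry and its parameters are hypothetical, and a production setup would usually also need timeouts, jitter, a circuit breaker, and care to retry only idempotent calls.

```java
import java.util.concurrent.Callable;

final class Retry {

    private Retry() { }

    /**
     * Calls `call` up to maxAttempts times. After each failed attempt except the last,
     * it sleeps baseDelayMillis, then 2x, 4x, ... before trying again.
     * If every attempt fails, the last exception is rethrown.
     */
    static <T> T withBackoff(Callable<T> call, int maxAttempts, long baseDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(baseDelayMillis << (attempt - 1)); // 1x, 2x, 4x, ...
                }
            }
        }
        throw last; // non-null: the loop ran at least once and every attempt failed
    }
}
```

For example, Retry.withBackoff(() -> callTeacherService(), 3, 100) would try a hypothetical callTeacherService() up to three times, waiting 100 ms and then 200 ms between attempts.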

What I look for in frameworks

In every project the discussion comes up over and over again: should we use framework X? Or Y? Or no framework at all? Even when you limit yourself to the frameworks for web development in the Java space, the choices are so plentiful that nobody can know them all. So I need a quick way to identify which frameworks sound promising to me and which I keep for weekend projects.

Stay away from the new kid on the block.

While it might be fun to play with the coolest, newest thing, I work on projects that have a life cycle of 10-30 years. I wouldn't want to support an application using some library that was cool between March and July in 1996. Therefore I try not to put that kind of burden on others.

Do one thing and do it well.

A bad example of this is Hibernate/JPA. It does (or tries to) provide all of the following:

- mapping between a relational and an object-oriented model
- caching
- change detection
- caching
- query DSL

It is kind of OK for a framework or library to provide multiple services, if you can decide for each service separately whether you want to use it or not. But if it controls too many aspects of your project, the chance that it doesn't do anything well gets huge. And you won't be able to exchange it easily, because now you have to replace half a dozen libraries at once.

Method calls are cool. Annotations are OK. Byte code manipulation is scary. Code generation is a reason to run for the hills.

In this list, only method calls can be abstracted over properly. All the other stuff tends to get in your way. Annotations are kind of harmless, but it is easy to get into situations where you have more annotations than actual code. Byte code manipulation starts to put some serious constraints on what you can do in your code. And code generation additionally slows down your build process.

Keep your fingers off my domain model.

The domain model is really the important part of an application.
I can change the persistence or the ui of an application, but if I have to rework the domain model, everything changes and I’m essential rewriting the application. Also I need all the flexibility the programming language of choice offers to design the domain model. I don’t want to get restricted by some stupid framework that requires default constructors or getters and setters for all fields. Can we handle it? There are many things that sound really awesome, but they require a so different style of coding, that many developers will have a hard time tackling it. And just because I think I can handle it, doesn’t necessarily mean I actually can. So better stay simple and old-fashioned.Reference: What I look for in frameworks from our JCG partner Jens Schauder at the Schauderhaft blog....
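The "method calls" and "keep your fingers off my domain model" points can be made concrete with a small Java sketch. It assumes a hypothetical Customer class and a hypothetical CustomerMapper interface (neither comes from the original post). The domain class has no default constructor, no setters and no annotations; persistence only reaches it through ordinary method calls, which could be reimplemented for a different storage technology.

```java
import java.util.Map;
import java.util.Objects;

public class DomainMappingSketch {

    // Hypothetical domain class: immutable, no default constructor,
    // no setters, no framework annotations.
    static final class Customer {
        private final String name;
        private final int loyaltyPoints;

        Customer(String name, int loyaltyPoints) {
            this.name = Objects.requireNonNull(name);
            if (loyaltyPoints < 0) {
                throw new IllegalArgumentException("loyaltyPoints must be >= 0");
            }
            this.loyaltyPoints = loyaltyPoints;
        }

        String name() { return name; }

        int loyaltyPoints() { return loyaltyPoints; }

        // Domain behaviour lives on the domain object and returns a new instance.
        Customer addPoints(int points) {
            return new Customer(name, loyaltyPoints + points);
        }
    }

    // Persistence is reached only through plain method calls behind an interface.
    interface CustomerMapper {
        Customer fromRow(Map<String, Object> row);

        Map<String, Object> toRow(Customer customer);
    }

    // Hypothetical hand-written mapper for rows represented as maps.
    static final class MapCustomerMapper implements CustomerMapper {
        @Override
        public Customer fromRow(Map<String, Object> row) {
            return new Customer((String) row.get("name"), (Integer) row.get("loyalty_points"));
        }

        @Override
        public Map<String, Object> toRow(Customer c) {
            return Map.of("name", c.name(), "loyalty_points", c.loyaltyPoints());
        }
    }

    public static void main(String[] args) {
        CustomerMapper mapper = new MapCustomerMapper();
        Customer customer = mapper.fromRow(Map.of("name", "Ada", "loyalty_points", 10));
        Map<String, Object> row = mapper.toRow(customer.addPoints(5));
        System.out.println(row.get("name") + " " + row.get("loyalty_points"));
    }
}
```

Under this design, swapping the persistence layer means writing another CustomerMapper; the Customer class itself stays untouched.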

Spark: Write to CSV file with header using saveAsFile

In my last blog post I showed how to write to a single CSV file using Spark and Hadoop, and the next thing I wanted to do was add a header row to the resulting file. Hadoop's FileUtil#copyMerge function does take a String parameter, but it adds this text to the end of each partition file, which isn't quite what we want. However, if we copy that function into our own FileUtil class, we can restructure it to do what we want:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.IOUtils;

    import java.io.IOException;

    public class MyFileUtil {
        public static boolean copyMergeWithHeader(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile,
                                                  boolean deleteSource, Configuration conf, String header) throws IOException {
            dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);

            if (!srcFS.getFileStatus(srcDir).isDir()) {
                return false;
            }

            FSDataOutputStream out = dstFS.create(dstFile);
            try {
                // write the header once, before any of the partition files
                if (header != null) {
                    out.write((header + "\n").getBytes("UTF-8"));
                }

                FileStatus[] contents = srcFS.listStatus(srcDir);
                for (int i = 0; i < contents.length; ++i) {
                    if (!contents[i].isDir()) {
                        FSDataInputStream in = srcFS.open(contents[i].getPath());
                        try {
                            // append this partition's bytes; keep 'out' open for the next one
                            IOUtils.copyBytes(in, out, conf, false);
                        } finally {
                            in.close();
                        }
                    }
                }
            } finally {
                out.close();
            }

            return deleteSource ? srcFS.delete(srcDir, true) : true;
        }

        private static Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite) throws IOException {
            if (dstFS.exists(dst)) {
                FileStatus sdst = dstFS.getFileStatus(dst);
                if (sdst.isDir()) {
                    if (null == srcName) {
                        throw new IOException("Target " + dst + " is a directory");
                    }
                    return checkDest((String) null, dstFS, new Path(dst, srcName), overwrite);
                }
                if (!overwrite) {
                    throw new IOException("Target " + dst + " already exists");
                }
            }
            return dst;
        }
    }

We can then update our merge function to call this instead:

    def merge(srcPath: String, dstPath: String, header: String): Unit = {
      val hadoopConfig = new Configuration()
      val hdfs = FileSystem.get(hadoopConfig)
      MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, header)
    }

We call merge from our code like this:

    merge(file, destinationFile, "type,count")

I wasn't sure how to import my Java-based class into the Spark shell, so I compiled the code into a JAR and submitted it as a job instead:

    $ sbt package
    [info] Loading global plugins from /Users/markneedham/.sbt/0.13/plugins
    [info] Loading project definition from /Users/markneedham/projects/spark-play/playground/project
    [info] Set current project to playground (in build file:/Users/markneedham/projects/spark-play/playground/)
    [info] Compiling 3 Scala sources to /Users/markneedham/projects/spark-play/playground/target/scala-2.10/classes...
    [info] Packaging /Users/markneedham/projects/spark-play/playground/target/scala-2.10/playground_2.10-1.0.jar ...
    [info] Done packaging.
    [success] Total time: 8 s, completed 30-Nov-2014 08:12:26

    $ time ./bin/spark-submit --class "WriteToCsvWithHeader" --master local[4] /path/to/playground/target/scala-2.10/playground_2.10-1.0.jar
    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.propertie
    ...
    14/11/30 08:16:15 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
    14/11/30 08:16:15 INFO SparkContext: Job finished: saveAsTextFile at WriteToCsvWithHeader.scala:49, took 0.589036 s

    real 0m13.061s
    user 0m38.977s
    sys 0m3.393s

And if we look at our destination file:

    $ cat /tmp/singlePrimaryTypes.csv
    type,count
    THEFT,859197
    BATTERY,757530
    NARCOTICS,489528
    CRIMINAL DAMAGE,488209
    BURGLARY,257310
    OTHER OFFENSE,253964
    ASSAULT,247386
    MOTOR VEHICLE THEFT,197404
    ROBBERY,157706
    DECEPTIVE PRACTICE,137538
    CRIMINAL TRESPASS,124974
    PROSTITUTION,47245
    WEAPONS VIOLATION,40361
    PUBLIC PEACE VIOLATION,31585
    OFFENSE INVOLVING CHILDREN,26524
    CRIM SEXUAL ASSAULT,14788
    SEX OFFENSE,14283
    GAMBLING,10632
    LIQUOR LAW VIOLATION,8847
    ARSON,6443
    INTERFERE WITH PUBLIC OFFICER,5178
    HOMICIDE,4846
    KIDNAPPING,3585
    INTERFERENCE WITH PUBLIC OFFICER,3147
    INTIMIDATION,2471
    STALKING,1985
    OFFENSES INVOLVING CHILDREN,355
    OBSCENITY,219
    PUBLIC INDECENCY,86
    OTHER NARCOTIC VIOLATION,80
    RITUALISM,12
    NON-CRIMINAL,12
    OTHER OFFENSE ,6
    NON - CRIMINAL,2
    NON-CRIMINAL (SUBJECT SPECIFIED),2

Happy days! The code is available as a gist if you want to see all the details.
Reference: Spark: Write to CSV file with header using saveAsFile from our JCG partner Mark Needham at the Mark Needham Blog blog....
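To try the "write the header once, then append every partition file" idea without a Hadoop cluster, here is a minimal local-filesystem sketch in plain Java (java.nio only). LocalMergeWithHeader and mergeWithHeader are hypothetical names, not part of Hadoop or Spark. The sketch assumes Spark's usual part-NNNNN output naming, so the glob skips files such as _SUCCESS. It sorts the part files by name explicitly, and unlike checkDest above it silently overwrites an existing destination file.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class LocalMergeWithHeader {

    // Writes `header` once, then appends every part-* file in srcDir, sorted by name.
    public static void mergeWithHeader(Path srcDir, Path dstFile, String header) throws IOException {
        if (!Files.isDirectory(srcDir)) {
            throw new IOException(srcDir + " is not a directory");
        }

        // collect only the partition files (assumption: Spark names them part-00000, part-00001, ...)
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(srcDir, "part-*")) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    parts.add(p);
                }
            }
        }
        parts.sort(null); // natural Path ordering: part-00000 before part-00001

        // default options create or truncate the destination file
        try (OutputStream out = Files.newOutputStream(dstFile)) {
            if (header != null) {
                out.write((header + "\n").getBytes(StandardCharsets.UTF_8));
            }
            for (Path part : parts) {
                Files.copy(part, out);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("spark-out");
        Files.write(dir.resolve("part-00001"), "BATTERY,757530\n".getBytes(StandardCharsets.UTF_8));
        Files.write(dir.resolve("part-00000"), "THEFT,859197\n".getBytes(StandardCharsets.UTF_8));
        Files.write(dir.resolve("_SUCCESS"), new byte[0]);

        Path dst = Files.createTempFile("merged", ".csv");
        mergeWithHeader(dir, dst, "type,count");
        System.out.print(new String(Files.readAllBytes(dst), StandardCharsets.UTF_8));
    }
}
```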

Developing with WSO2

For a few months now I have been working with WSO2 products again. In the upcoming posts I will describe some of the (small) issues I ran into and how to solve them. The first thing I did when setting up my development environment was download the Developer Studio (64-bit version) on my Mac. After unzipping the downloaded zip file you see the Eclipse icon. However, after double-clicking the icon, Eclipse doesn't start… Great way to start with the product! Although this issue is described on their installation page, that isn't exactly the information I read before starting a Java IDE like Eclipse. It turns out that you have to adjust the file permissions for the Studio to work on a Mac: just open a Terminal, browse to the installation directory and modify the permissions. After fixing the permissions, you will see the familiar Eclipse application after double-clicking the icon. Another small issue, one you just have to know how to work around, is with the Management Console that comes with the WSO2 products, for example the Management Console of the WSO2 ESB. When you want to edit the source of a certain item in the ESB, you can do this by going to the source view of that item. If you want to increase the size of the source box, you shouldn't try to drag the right corner; instead, click it once and then resize the box. Now you can see more of the source. Once you know it, it is so easy!
Reference: Developing with WSO2 from our JCG partner Pascal Alma at The Pragmatic Integrator blog....

The New Agile–The beginning

Frequent readers of this blog know I'm a big proponent of agile, with a small "a". I tend to scoff at big new ideas that seem to jump out of a marketing committee. Truth is, sometimes these are good ideas. Sometimes they are first versions of trial and error. When we look at them at their current stage they may seem ridiculous, but if someone gets exposed to them and continues the push, we may get something wonderful. Much like quality, we get the real judgment after the fact, sometimes years later. Therefore, I've decided to put some of my cynicism aside and take a critical look at things that have happened in the agile world since the agile manifesto. This collection of posts, called "The New Agile", is what stood behind my "The New Agile" presentation, in which I presented different topics in the agile world. Some of the information will be accompanied by my observations, and some will just be an encouragement for you to study more if they interest you. There will be recommendations, but mostly around what interests me, which may not be the same for everyone. Before we start, though, we need to look back at how we got here. And, like many others, I begin by going back to the agile manifesto: "We are uncovering better ways of developing software by doing it and helping others do it." Seventeen smart people who went skiing in Snowbird, Utah, in 2001 applied science to software development in one sentence. The idea is that we don't know everything, but we continue to try, learn, fail and try again. The idea is that if more people do it, we'll increase our learning. And the idea is that software development is not a single process but different paths, some of which we haven't even discovered yet. Great ideas. They are still true today, and therefore what we've already learned may not be the best way to develop software. It may seem, for example, that after almost 20 years of TDD we couldn't find anything better, and therefore it is a best practice.
But we may find something better tomorrow. These things didn't start in 2001; that is just when they got a name. If we go back in history, we can see different people learning and applying lean principles before those principles got their name. Take W. Edwards Deming, for example. After WWII, he went to Japan and formalized the basis of agile: the PDCA cycle (Plan, Do, Check and Adjust). It's the basis of continuous learning and improvement. Deming put in place the lean foundation that Toyota would base its manufacturing processes on. It may come as a shock to you, after years of saying "developing software is not like building bridges", that it really is, if you're building the bridges correctly. Ideas like a line worker who can stop the line are the origin of continuous integration. Don't believe me? In a working CI, what happens when the build breaks? Fixing it is the biggest priority. And it's not a managerial decree. It's a culture where the developers stop what they are doing and get the build back on track, and their decision is supported by management, just like when a line worker stops the production line. Quality comes first and the culture supports it. So we had those ideas floating around, but because we didn't have an actual internet until the 90s, ideas were not exchanged at the rate they are today. Communication was still limited. So when people like Ken Schwaber and Jeff Sutherland started working on Scrum, Kent Beck and Ron Jeffries on eXtreme Programming, and Alistair Cockburn on Crystal, they were "just" working with teams. The different names came to support the consultants' marketing efforts, but at its base it was "just" working with teams. They exchanged ideas the old way: papers, books and conferences. Remember that there weren't many conferences about those things then. There were mostly programmer conferences (nobody dared think about conferences for testers then).
There were business management conferences, but they weren't really interested in what the developers were playing with. And then the internet happened. Much like with literacy and then print, the internet gave knowledge ways to explode and reach an exponentially larger audience. Finally, ideas "scaled". They were compared to each other, discussed, confronted and applied: some successfully, some poorly, some deviously. A few years after the agile manifesto, the business world was ready to stop and hear about some development methodology with a name that came from rugby. In most cases, we'd say the rest is history. But this was only the beginning.
Reference: The New Agile–The beginning from our JCG partner Gil Zilberfeld at the Geek Out of Water blog....

ReactiveMongo with Akka, Scala and websockets

I was looking for a simple websocket server for one of my projects to test some stuff with ReactiveMongo. When looking around, though, I couldn't really find a simple basic implementation that didn't include a complete framework. Finally I stumbled upon one of the Typesafe Activator projects: http://typesafe.com/activator/template/akka-spray-websocket. Even though the name implies that spray is required, it actually uses the websocket implementation from https://github.com/TooTallNate/Java-WebSocket, which provides a very simple, easy-to-use basic websocket implementation. So in this article I'll show you how you can set up a very simple websocket server (without requiring additional frameworks), together with Akka and ReactiveMongo. The following screenshot shows what we're aiming for. In this screenshot you can see a simple websocket client that talks to our server. Our server has the following functionality:
- Anything the client sends is echoed back.
- Any input added to a specific (capped) collection in MongoDB is automatically pushed to all the listeners.
You can cut and paste all the code from this article, but it is probably easier to just get the code from git.
You can find it on GitHub here: https://github.com/josdirksen/smartjava/tree/master/ws-akka

Getting started

The first thing we need to do is set up our workspace, so let's start by looking at the sbt configuration:

    organization := "org.smartjava"

    version := "0.1"

    scalaVersion := "2.11.2"

    scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")

    libraryDependencies ++= {
      val akkaV = "2.3.6"
      Seq(
        "com.typesafe.akka" %% "akka-actor" % akkaV,
        "org.java-websocket" % "Java-WebSocket" % "1.3.1-SNAPSHOT",
        "org.reactivemongo" %% "reactivemongo" % ""
      )
    }

    resolvers ++= Seq("Code Envy" at "http://codenvycorp.com/repository/",
                      "Typesafe" at "http://repo.typesafe.com/typesafe/releases/")

Nothing special here: we just specify our dependencies and add some resolvers so that sbt knows where to retrieve the dependencies from. Before we look at the code, let's first look at the directory structure and the files of our project:

    ├── build.sbt
    └── src
        └── main
            ├── resources
            │   ├── application.conf
            │   └── log4j2.xml
            └── scala
                ├── Boot.scala
                ├── DB.scala
                ├── WSActor.scala
                └── WSServer.scala

In the src/main/resources directory we store our configuration files, and in src/main/scala we store all our Scala files. Let's start with the configuration files. For this project we use two. The application.conf file contains our project's configuration and looks like this:

    akka {
      loglevel = "DEBUG"
    }

    mongo {
      db = "scala"
      collection = "rmongo"
      location = "localhost"
    }

    ws-server {
      port = 9999
    }

As you can see, we just define the log level, how to connect to mongo, and on which port we want our websocket server to listen.
We also need a log4j2.xml file, since the ReactiveMongo library uses that one for logging:

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="INFO">
      <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
          <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
      </Appenders>
      <Loggers>
        <Root level="INFO">
          <AppenderRef ref="Console"/>
        </Root>
      </Loggers>
    </Configuration>

So, with the boring stuff out of the way, let's look at the Scala files.

Starting the websocket server and registering the paths

The Boot.scala file looks like this:

    package org.smartjava

    import akka.actor.{Props, ActorSystem}

    /**
     * This class launches the system.
     */
    object Boot extends App {
      // create the actor system
      implicit lazy val system = ActorSystem("ws-system")
      // set up the ReactiveMongo connection
      implicit lazy val db = new DB(Configuration.location, Configuration.dbname)

      // we'll use a simple actor which echoes everything it finds back to the client.
      // (EchoActor.props only takes the DB; the collection name is read inside DB)
      val echo = system.actorOf(EchoActor.props(db), "echo")

      // define the websocket routing and start a websocket listener
      private val wsServer = new WSServer(Configuration.port)
      wsServer.forResource("/echo", Some(echo))
      wsServer.start

      // make sure the actor system and the websocket server are shut down when the
      // application is shut down
      sys.addShutdownHook({ system.shutdown; wsServer.stop })
    }

    // load configuration from external file
    object Configuration {
      import com.typesafe.config.ConfigFactory

      private val config = ConfigFactory.load
      config.checkValid(ConfigFactory.defaultReference)

      val port = config.getInt("ws-server.port")
      val dbname = config.getString("mongo.db")
      val collection = config.getString("mongo.collection")
      val location = config.getString("mongo.location")
    }

In this source file we see two objects. The Configuration object allows us to easily access the configuration elements from the application.conf file, and the Boot object starts our server.
The comments in the code should pretty much explain what is happening, but let me point out the main things:
- We create an Akka actor system and a connection to our MongoDB instance.
- We define an actor which we can register to a specific websocket path.
- Then we create and start the websocket server and register a path to the actor we just created.
- Finally, we register a shutdown hook to clean everything up.
And that's it. Now let's look at the interesting part of the code. Next up is the WSServer.scala file.

Setting up a websocket server

In the WSServer.scala file we define the websocket server.

    package org.smartjava

    import akka.actor.{ActorSystem, ActorRef}
    import java.net.InetSocketAddress
    import org.java_websocket.WebSocket
    import org.java_websocket.framing.CloseFrame
    import org.java_websocket.handshake.ClientHandshake
    import org.java_websocket.server.WebSocketServer
    import scala.collection.mutable.Map
    import akka.event.Logging

    /**
     * The WSServer companion object defines a number of distinct messages sendable by this component
     */
    object WSServer {
      sealed trait WSMessage
      case class Message(ws : WebSocket, msg : String) extends WSMessage
      case class Open(ws : WebSocket, hs : ClientHandshake) extends WSMessage
      case class Close(ws : WebSocket, code : Int, reason : String, external : Boolean) extends WSMessage
      case class Error(ws : WebSocket, ex : Exception) extends WSMessage
    }

    /**
     * Create a websocket server that listens on a specific address.
     *
     * @param port
     */
    class WSServer(val port : Int)(implicit system : ActorSystem, db: DB)
      extends WebSocketServer(new InetSocketAddress(port)) {

      // maps the path to a specific actor.
      private val reactors = Map[String, ActorRef]()
      // set up some logging based on the implicitly passed-in actor system
      private val log = Logging.getLogger(system, this)

      // Call this function to bind an actor to a specific path. All incoming
      // connections to a specific path will be routed to that specific actor.
      final def forResource(descriptor : String, reactor : Option[ActorRef]) {
        log.debug("Registring actor:" + reactor + " to " + descriptor)
        reactor match {
          case Some(actor) => reactors += ((descriptor, actor))
          case None => reactors -= descriptor
        }
      }

      // onMessage is called when a websocket message is received.
      // In this method we check whether we can find a listening
      // actor and forward the call to that.
      final override def onMessage(ws : WebSocket, msg : String) {
        if (null != ws) {
          reactors.get(ws.getResourceDescriptor) match {
            case Some(actor) => actor ! WSServer.Message(ws, msg)
            case None => ws.close(CloseFrame.REFUSE)
          }
        }
      }

      final override def onOpen(ws : WebSocket, hs : ClientHandshake) {
        log.debug("OnOpen called {} :: {}", ws, hs)
        if (null != ws) {
          reactors.get(ws.getResourceDescriptor) match {
            case Some(actor) => actor ! WSServer.Open(ws, hs)
            case None => ws.close(CloseFrame.REFUSE)
          }
        }
      }

      final override def onClose(ws : WebSocket, code : Int, reason : String, external : Boolean) {
        log.debug("Close called {} :: {} :: {} :: {}", ws, code, reason, external)
        if (null != ws) {
          reactors.get(ws.getResourceDescriptor) match {
            case Some(actor) => actor ! WSServer.Close(ws, code, reason, external)
            case None => ws.close(CloseFrame.REFUSE)
          }
        }
      }

      final override def onError(ws : WebSocket, ex : Exception) {
        log.debug("onError called {} :: {}", ws, ex)
        if (null != ws) {
          reactors.get(ws.getResourceDescriptor) match {
            case Some(actor) => actor ! WSServer.Error(ws, ex)
            case None => ws.close(CloseFrame.REFUSE)
          }
        }
      }
    }

A large source file, but not difficult to understand. Let me explain the core concepts:
- We first define a number of messages as case classes. These are the messages that we send to our actors, and they reflect the events our websocket server can receive from a client.
- The WSServer itself extends the WebSocketServer provided by the org.java_websocket library.
- The WSServer defines one additional function, called forResource. With this function we define which actor to call when we receive a message on our websocket server.
- Finally, we override the different on* methods, which are called when a specific event happens on our websocket server.
Now let's look at the echo functionality.

The akka echo actor

The echo actor has two roles in this scenario. First, it responds to incoming messages by sending the same message back. Besides that, it creates a child actor (named ListenActor) that handles the documents received from MongoDB.

    package org.smartjava

    import akka.actor.{Actor, ActorLogging, ActorRef, Props}
    import org.java_websocket.WebSocket
    import org.smartjava.WSServer.{Close, Error, Message, Open}
    import scala.collection.mutable

    object EchoActor {

      // Messages sent specifically by this actor to another actor instance.
      sealed trait EchoMessage

      case class Unregister(ws : WebSocket) extends EchoMessage
      case object Listen extends EchoMessage
      case object StopListening extends EchoMessage

      def props(db: DB): Props = Props(new EchoActor(db))
    }

    /**
     * Actor that handles the websocket request
     */
    class EchoActor(db: DB) extends Actor with ActorLogging {
      import EchoActor._

      val clients = mutable.ListBuffer[WebSocket]()
      val socketActorMapping = mutable.Map[WebSocket, ActorRef]()

      override def receive = {

        // receive the open request
        case Open(ws, hs) => {
          log.debug("Received open request. Start listening for {}", ws)
          clients += ws

          // create the child actor that handles the db listening
          val targetActor = context.actorOf(ListenActor.props(ws, db))

          socketActorMapping(ws) = targetActor
          targetActor ! Listen
        }

        // receive the close request
        case Close(ws, code, reason, ext) => {
          log.debug("Received close request. Unregistering actor for url {}", ws.getResourceDescriptor)

          // send a message to self to unregister the socket
          self ! Unregister(ws)
          // stop the child listener, if one was registered for this socket
          socketActorMapping.remove(ws).foreach(_ ! StopListening)
        }

        // receive an error message
        case Error(ws, ex) => self ! Unregister(ws)

        // receive a text message
        case Message(ws, msg) => {
          log.debug("url {} received msg '{}'", ws.getResourceDescriptor, msg)
          ws.send("You send:" + msg)
        }

        // unregister the websocket listener
        case Unregister(ws) => {
          if (null != ws) {
            log.debug("unregister monitor")
            clients -= ws
          }
        }
      }
    }

The code of this actor should pretty much explain itself. With this actor and the code so far, we've got a simple websocket server that uses an actor to handle messages. Before we look at the ListenActor, which is started from the "Open" message received by the EchoActor, let's quickly look at how we connect to MongoDB from our DB class:

    package org.smartjava

    import play.api.libs.iteratee.{Concurrent, Enumeratee, Iteratee}
    import reactivemongo.api.collections.default.BSONCollection
    import reactivemongo.api._
    import reactivemongo.bson.BSONDocument
    import scala.concurrent.ExecutionContext.Implicits.global

    /**
     * Contains DB related functions.
     */
    class DB(location: String, dbname: String) {

      // get connection to the database
      val db: DefaultDB = createConnection(location, dbname)
      // create an enumerator that we use to broadcast received documents
      val (bcEnumerator, channel) = Concurrent.broadcast[BSONDocument]
      // assign the channel to the mongodb cursor enumerator
      val iteratee = createCursor(getCollection(Configuration.collection))
        .enumerate()
        .apply(Iteratee
          .foreach({ doc: BSONDocument => channel.push(doc) }))

      /**
       * Return a simple collection
       */
      private def getCollection(collection: String): BSONCollection = {
        db(collection)
      }

      /**
       * Create the connection
       */
      private def createConnection(location: String, dbname: String): DefaultDB = {
        // gets an instance of the driver
        // (creates an actor system)
        val driver = new MongoDriver
        val connection = driver.connection(List(location))

        // Gets a reference to the database
        connection(dbname)
      }

      /**
       * Create the cursor
       */
      private def createCursor(collection: BSONCollection): Cursor[BSONDocument] = {
        import reactivemongo.bson._

        val query = BSONDocument(
          "currentDate" -> BSONDocument(
            "$gte" -> BSONDateTime(System.currentTimeMillis())
          ))

        // we enumerate over a capped collection with a tailable cursor
        collection.find(query)
          .options(QueryOpts().tailable.awaitData)
          .cursor[BSONDocument]
      }

      /**
       * Simple function that registers a callback and a predicate on the
       * broadcasting enumerator
       */
      def listenToCollection(f: BSONDocument => Unit,
                             p: BSONDocument => Boolean) = {

        val it = Iteratee.foreach(f)
        val itTransformed = Enumeratee.takeWhile[BSONDocument](p).transform(it)
        bcEnumerator.apply(itTransformed)
      }
    }

Most of this code is fairly standard, but I'd like to point out a couple of things. At the beginning of this class we set up an iteratee like this:

    val db: DefaultDB = createConnection(location, dbname)
    val (bcEnumerator, channel) = Concurrent.broadcast[BSONDocument]
    val iteratee = createCursor(getCollection(Configuration.collection))
      .enumerate()
      .apply(Iteratee
        .foreach({ doc: BSONDocument => channel.push(doc) }))

What we do here is first create a broadcast enumerator using the Concurrent.broadcast function. This enumerator can push elements provided by the channel to multiple consumers (iteratees). Next, we create an iteratee on the enumerator provided by our ReactiveMongo cursor, where we use the just-created channel to pass the documents to any iteratee that is connected to the bcEnumerator.
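The broadcast step can be pictured without Play or ReactiveMongo. The Java sketch below is a simplified, synchronous stand-in for the (bcEnumerator, channel) pair. Broadcast, attach and push are hypothetical names, not the Play iteratee API. Each pushed element goes to every consumer attached at that moment. A consumer attached later only sees later elements, because Concurrent.broadcast does not replay earlier elements either.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class BroadcastSketch {

    // Hypothetical stand-in for the (enumerator, channel) pair returned by Concurrent.broadcast.
    static final class Broadcast<T> {
        // copy-on-write so consumers can be attached while a push is iterating
        private final List<Consumer<T>> consumers = new CopyOnWriteArrayList<>();

        // roughly what bcEnumerator.apply(iteratee) does: attach another consumer
        void attach(Consumer<T> consumer) {
            consumers.add(consumer);
        }

        // roughly what channel.push(doc) does: hand the element to every attached consumer
        void push(T element) {
            for (Consumer<T> c : consumers) {
                c.accept(element);
            }
        }
    }

    public static void main(String[] args) {
        Broadcast<String> broadcast = new Broadcast<>();
        List<String> clientA = new CopyOnWriteArrayList<>();
        List<String> clientB = new CopyOnWriteArrayList<>();
        broadcast.attach(clientA::add);
        broadcast.attach(clientB::add);

        // in the article, the producer is the tailable cursor feeding channel.push(doc)
        broadcast.push("{test: 1}");
        broadcast.push("{test: 2}");

        System.out.println(clientA + " " + clientB);
    }
}
```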
We connect iteratees to the bcEnumerator in the listenToCollection function:

    def listenToCollection(f: BSONDocument => Unit,
                           p: BSONDocument => Boolean) = {

      val it = Iteratee.foreach(f)
      val itTransformed = Enumeratee.takeWhile[BSONDocument](p).transform(it)
      bcEnumerator.apply(itTransformed)
    }

In this function we pass in a function and a predicate. The function is executed whenever a document is added to mongo, and the predicate determines when to stop sending messages to the iteratee: as soon as it returns false for an incoming document, the iteratee is done. The only missing part is the ListenActor.

ListenActor which responds to messages from Mongo

The following code shows the actor responsible for responding to messages from MongoDB. When it receives a Listen message, it registers itself using the listenToCollection function. Whenever a document is passed in from mongo, it sends a message to itself to propagate it further to the websocket.

    import akka.actor.{Actor, ActorLogging, PoisonPill, Props}
    import org.java_websocket.WebSocket
    import reactivemongo.bson.BSONDocument

    object ListenActor {
      case class ReceiveUpdate(msg: String)
      def props(ws: WebSocket, db: DB): Props = Props(new ListenActor(ws, db))
    }

    class ListenActor(ws: WebSocket, db: DB) extends Actor with ActorLogging {
      import EchoActor._
      import ListenActor._

      // read by the predicate on the iteratee's thread, so mark it volatile
      @volatile var predicateResult = true

      override def receive = {
        case Listen => {
          log.info("{} , {}", ws, db)

          // function to call when we receive a message from ReactiveMongo;
          // we pass this to the DB cursor
          val func = (doc: BSONDocument) => {
            self ! ReceiveUpdate(BSONDocument.pretty(doc))
          }

          // the predicate that determines how long we want to retrieve documents;
          // we keep going while predicateResult is true
          val predicate = (d: BSONDocument) => predicateResult
          db.listenToCollection(func, predicate)
        }

        // when we receive an update we just send it over the websocket
        case ReceiveUpdate(msg) => {
          ws.send(msg)
        }

        case StopListening => {
          predicateResult = false

          // and kill ourselves
          self ! PoisonPill
        }
      }
    }

Now that we've done all that, we can run this example.
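The stop mechanism combines Enumeratee.takeWhile with the predicateResult flag. The predicate is only evaluated when the next document arrives, so after StopListening the listener finishes on the next document rather than immediately, and it never resumes. Below is a minimal Java sketch of that behaviour. TakeWhileListener and onDocument are hypothetical names, not the Play iteratee API, and an AtomicBoolean stands in for the actor's flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class TakeWhileSketch {

    // Hypothetical listener: forwards elements while `p` holds; once it fails, it is done for good.
    static final class TakeWhileListener<T> {
        private final Consumer<T> f;
        private final Predicate<T> p;
        private boolean done = false;

        TakeWhileListener(Consumer<T> f, Predicate<T> p) {
            this.f = f;
            this.p = p;
        }

        // Returns false once the listener has finished and should be detached.
        boolean onDocument(T doc) {
            if (done) {
                return false;
            }
            if (!p.test(doc)) {
                done = true; // like takeWhile: the failing element is not passed on
                return false;
            }
            f.accept(doc);
            return true;
        }
    }

    public static void main(String[] args) {
        AtomicBoolean predicateResult = new AtomicBoolean(true); // like ListenActor's flag
        List<String> sent = new ArrayList<>();
        TakeWhileListener<String> listener =
            new TakeWhileListener<>(sent::add, doc -> predicateResult.get());

        listener.onDocument("doc1");   // predicate true: forwarded
        predicateResult.set(false);    // what StopListening does
        listener.onDocument("doc2");   // predicate false: listener finishes, doc2 is dropped
        predicateResult.set(true);
        listener.onDocument("doc3");   // already finished: takeWhile never resumes

        System.out.println(sent);      // prints [doc1]
    }
}
```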
On startup you'll see something like this:

    [DEBUG] [11/22/2014 15:14:33.856] [main] [EventStream(akka://ws-system)] logger log1-Logging$DefaultLogger started
    [DEBUG] [11/22/2014 15:14:33.857] [main] [EventStream(akka://ws-system)] Default Loggers started
    [DEBUG] [11/22/2014 15:14:35.104] [main] [WSServer(akka://ws-system)] Registring actor:Some(Actor[akka://ws-system/user/echo#1509664759]) to /echo
    15:14:35.211 [reactivemongo-akka.actor.default-dispatcher-5] INFO reactivemongo.core.actors.MongoDBSystem - The node set is now available
    15:14:35.214 [reactivemongo-akka.actor.default-dispatcher-5] INFO reactivemongo.core.actors.MongoDBSystem - The primary is now available

Next, when we connect a websocket, we see the following:

    [DEBUG] [11/22/2014 15:15:18.957] [WebSocketWorker-32] [WSServer(akka://ws-system)] OnOpen called org.java_websocket.WebSocketImpl@3161f479 :: org.java_websocket.handshake.HandshakeImpl1Client@6d9a6e19
    [DEBUG] [11/22/2014 15:15:18.965] [ws-system-akka.actor.default-dispatcher-2] [akka://ws-system/user/echo] Received open request. Start listening for WARNING arguments left: 1
    [INFO] [11/22/2014 15:15:18.973] [ws-system-akka.actor.default-dispatcher-5] [akka://ws-system/user/echo/$a] org.java_websocket.WebSocketImpl@3161f479 , org.smartjava.DB@73fd64

Now let's insert a message into the mongo collection, which we created with the following command:

    db.createCollection( "rmongo", { capped: true, size: 100000 } )

And let's insert a message:

    > db.rmongo.insert({"test": 1234567, "currentDate": new Date()})
    WriteResult({ "nInserted" : 1 })

This results in the inserted document being pushed to our websocket client. If you're interested in the source files, look at the following directory on GitHub: https://github.com/josdirksen/smartjava/tree/master/ws-akka
Reference: ReactiveMongo with Akka, Scala and websockets from our JCG partner Jos Dirksen at the Smart Java blog....
Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy | Contact
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.