Home » Java » Core Java » Using a reactive stream as a data source for Drools

About Geoffrey De Smet

Geoffrey De Smet
Geoffrey De Smet (Red Hat) is the lead and founder of OptaPlanner. Before joining Red Hat in 2010, he was formerly employed as a Java consultant, an A.I. researcher and an enterprise application project lead. He has contributed to many open source projects (such as drools, jbpm, pressgang, spring-richclient, several maven plugins, weld, arquillian, ...). Since he started OptaPlanner in 2006, he’s been passionately addicted to planning optimization.

Using a reactive stream as a data source for Drools

A few months ago we started redesigning the Drools lowest level executable model and making it accessible to end user with a Java 8 API. To demonstrate the flexibility of this approach I tried to integrate it with a reactive stream and in particular to use this stream as a data source for Drools.

To show how this works I created a simple temperature server that provides a RxJava Observable emitting every second the temperature for a given town and terminates after 5 seconds. There is also a second factory method that allows to merge more of these Observables in order to have a single Observable that emits the temperature for more than one town at the same time.

public class TempServer {
    public static Observable<TempInfo> getFeed(String town) {
        return Observable.create(subscriber ->
                                         Observable.interval(1, TimeUnit.SECONDS)
                                                   .subscribe(i -> {
                                                       if (i > 5) subscriber.onCompleted();
                                                       try {
                                                           subscriber.onNext(TempInfo.fetch(town));
                                                       } catch (Exception e) {
                                                           subscriber.onError(e);
                                                       }
                                                   }));
    }

    public static Observable<TempInfo> getFeeds(String... towns) {
        return Observable.merge(Arrays.stream(towns)
                                      .map(TempServer::getFeed)
                                      .collect(toList()));
    }
}

where the TempInfo.fetch method just returns a random temperature between -20 and 50 degrees

public TempInfo(String town, int temp) {
    this.town = town;
    this.temp = temp;
}

public static TempInfo fetch(String town) {
    return new TempInfo(town, random.nextInt(70) - 20);
}

Using an improved version of the Java 8 DSL presented in the former article I defined the following 2 rules:

Variable<TempInfo> temp = any( TempInfo.class );
Variable<Person> person = any( Person.class );

Rule r1 = rule("low temp")
        .view(
                subscribe(temp, "tempFeed"),
                expr(temp, t -> t.getTemp() < 0),
                input(person, "persons"),
                expr(person, temp, (p, t) -> p.getTown().equals(t.getTown()))
             )
        .then(on(person, temp)
                      .execute((p, t) -> System.out.println(p.getName() + " is freezing in " + p.getTown() + " - temp is " + t.getTemp())));

Rule r2 = rule("high temp")
        .view(
                subscribe(temp, "tempFeed"),
                expr(temp, t -> t.getTemp() > 30),
                input(person, "persons"),
                expr(person, temp, (p, t) -> p.getTown().equals(t.getTown()))
             )
        .then(on(person, temp)
                      .execute((p, t) -> System.out.println(p.getName() + " is sweating in " + p.getTown() + " - temp is " + t.getTemp())));

Here I’m using 2 different kinds of data sources: a passive one that can be considered a mere store of facts:

DataStore persons = storeOf(new Person("Mark", 37, "London"),
                            new Person("Edson", 35, "Toronto"),
                            new Person("Mario", 40, "Milano"));

that can be bound to a specific Drools KieSession with

bindDataSource(ksession, "persons", persons);

and a reactive one taken from the TempServer implemented above

Observable<TempInfo> tempFeed = TempServer.getFeeds( "Milano", "London", "Toronto" );

that can also be bound to the same KieSession in a similar way

bindRxObservable( ksession, "tempFeed", tempFeed );

Having done this you can fire those 2 rules and obtain an output like the following:

Mark is freezing in London - temp is -9
Edson is sweating in Toronto - temp is 42
Mario is sweating in Milano - temp is 42
Mario is sweating in Milano - temp is 49
Mark is freezing in London - temp is -17
Edson is sweating in Toronto - temp is 40
Edson is sweating in Toronto - temp is 47
Mario is freezing in Milano - temp is -14
Mark is freezing in London - temp is -8
Mark is freezing in London - temp is -17
  • The complete test case to run this example is available here.
(0 rating, 0 votes)
You need to be a registered member to rate this.
1 Comment Views Tweet it!
Do you want to know how to develop your skillset to become a Java Rockstar?
Subscribe to our newsletter to start Rocking right now!
To get you started we give you our best selling eBooks for FREE!
1. JPA Mini Book
2. JVM Troubleshooting Guide
3. JUnit Tutorial for Unit Testing
4. Java Annotations Tutorial
5. Java Interview Questions
6. Spring Interview Questions
7. Android UI Design
and many more ....
I agree to the Terms and Privacy Policy

1
Leave a Reply

avatar
1 Comment threads
0 Thread replies
0 Followers
 
Most reacted comment
Hottest comment thread
1 Comment authors
David Karnok Recent comment authors

This site uses Akismet to reduce spam. Learn how your comment data is processed.

  Subscribe  
newest oldest most voted
Notify of
David Karnok
Guest
David Karnok

The getFeed() can be implemented with standard operators and there is no need to fiddle with create() and forwarding.

return Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.map(i -> TempInfo.fetch(town));