Core Java

Using a reactive stream as a data source for Drools

A few months ago we started redesigning the Drools lowest level executable model and making it accessible to end user with a Java 8 API. To demonstrate the flexibility of this approach I tried to integrate it with a reactive stream and in particular to use this stream as a data source for Drools.

To show how this works I created a simple temperature server that provides a RxJava Observable emitting every second the temperature for a given town and terminates after 5 seconds. There is also a second factory method that allows to merge more of these Observables in order to have a single Observable that emits the temperature for more than one town at the same time.

public class TempServer {
    public static Observable<TempInfo> getFeed(String town) {
        return Observable.create(subscriber ->
                                         Observable.interval(1, TimeUnit.SECONDS)
                                                   .subscribe(i -> {
                                                       if (i > 5) subscriber.onCompleted();
                                                       try {
                                                           subscriber.onNext(TempInfo.fetch(town));
                                                       } catch (Exception e) {
                                                           subscriber.onError(e);
                                                       }
                                                   }));
    }

    public static Observable<TempInfo> getFeeds(String... towns) {
        return Observable.merge(Arrays.stream(towns)
                                      .map(TempServer::getFeed)
                                      .collect(toList()));
    }
}

where the TempInfo.fetch method just returns a random temperature between -20 and 50 degrees

public TempInfo(String town, int temp) {
    this.town = town;
    this.temp = temp;
}

public static TempInfo fetch(String town) {
    return new TempInfo(town, random.nextInt(70) - 20);
}

Using an improved version of the Java 8 DSL presented in the former article I defined the following 2 rules:

Variable<TempInfo> temp = any( TempInfo.class );
Variable<Person> person = any( Person.class );

Rule r1 = rule("low temp")
        .view(
                subscribe(temp, "tempFeed"),
                expr(temp, t -> t.getTemp() < 0),
                input(person, "persons"),
                expr(person, temp, (p, t) -> p.getTown().equals(t.getTown()))
             )
        .then(on(person, temp)
                      .execute((p, t) -> System.out.println(p.getName() + " is freezing in " + p.getTown() + " - temp is " + t.getTemp())));

Rule r2 = rule("high temp")
        .view(
                subscribe(temp, "tempFeed"),
                expr(temp, t -> t.getTemp() > 30),
                input(person, "persons"),
                expr(person, temp, (p, t) -> p.getTown().equals(t.getTown()))
             )
        .then(on(person, temp)
                      .execute((p, t) -> System.out.println(p.getName() + " is sweating in " + p.getTown() + " - temp is " + t.getTemp())));

Here I’m using 2 different kinds of data sources: a passive one that can be considered a mere store of facts:

DataStore persons = storeOf(new Person("Mark", 37, "London"),
                            new Person("Edson", 35, "Toronto"),
                            new Person("Mario", 40, "Milano"));

that can be bound to a specific Drools KieSession with

bindDataSource(ksession, "persons", persons);

and a reactive one taken from the TempServer implemented above

Observable<TempInfo> tempFeed = TempServer.getFeeds( "Milano", "London", "Toronto" );

that can also be bound to the same KieSession in a similar way

bindRxObservable( ksession, "tempFeed", tempFeed );

Having done this you can fire those 2 rules and obtain an output like the following:

Mark is freezing in London - temp is -9
Edson is sweating in Toronto - temp is 42
Mario is sweating in Milano - temp is 42
Mario is sweating in Milano - temp is 49
Mark is freezing in London - temp is -17
Edson is sweating in Toronto - temp is 40
Edson is sweating in Toronto - temp is 47
Mario is freezing in Milano - temp is -14
Mark is freezing in London - temp is -8
Mark is freezing in London - temp is -17
  • The complete test case to run this example is available here.

Geoffrey De Smet

Geoffrey De Smet (Red Hat) is the lead and founder of OptaPlanner. Before joining Red Hat in 2010, he was formerly employed as a Java consultant, an A.I. researcher and an enterprise application project lead. He has contributed to many open source projects (such as drools, jbpm, pressgang, spring-richclient, several maven plugins, weld, arquillian, ...). Since he started OptaPlanner in 2006, he’s been passionately addicted to planning optimization.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

1 Comment
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
David Karnok
David Karnok
8 years ago

The getFeed() can be implemented with standard operators and there is no need to fiddle with create() and forwarding.

return Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.map(i -> TempInfo.fetch(town));

Back to top button