Apache Mesos : Writing your own distributed frameworks

In the previous post, we saw what Mesos is, how it is useful, and how to get started with it. In this post, we shall see how to write your own framework on Mesos. (In Mesos, a framework is any application that runs on it.) This post walks through a framework called “mesos-pinspider”, which fetches a user's profile information and board information from that user's Pinterest page.

Mesos Framework

In general, a Mesos framework has three basic components.

  • Driver, which submits the tasks to the framework
  • Scheduler, which registers with the master to be offered resources, takes the tasks, and runs them on executors
  • Executor, a process that is launched on slave nodes to run the framework’s tasks
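
As a rough mental model of how these three pieces cooperate, the following toy sketch wires them together in a single process. Everything in it (ToyFramework, ToyScheduler, ToyExecutor, and the idea of integer "slots") is a hypothetical stand-in invented for illustration. It is not the Mesos API, and real offers carry resources such as CPU and memory rather than slot counts.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ToyFramework {
    /** Stand-in for an executor: runs a single task and returns its result. */
    interface ToyExecutor {
        String run(String task);
    }

    /** Stand-in for a scheduler: queues tasks and places them when capacity is offered. */
    static final class ToyScheduler {
        private final Queue<String> pending = new ArrayDeque<>();
        private final ToyExecutor executor;

        ToyScheduler(ToyExecutor executor) {
            this.executor = executor;
        }

        void submit(String task) {
            pending.add(task);
        }

        boolean hasPending() {
            return !pending.isEmpty();
        }

        /** Runs at most `slots` pending tasks on the executor and returns their results. */
        List<String> resourceOffer(int slots) {
            List<String> results = new ArrayList<>();
            for (int i = 0; i < slots && !pending.isEmpty(); i++) {
                results.add(executor.run(pending.remove()));
            }
            return results;
        }
    }

    /** Stand-in for the driver: submits the tasks, then plays the master by making offers. */
    static List<String> drive(List<String> tasks, int slotsPerOffer) {
        if (slotsPerOffer <= 0) {
            throw new IllegalArgumentException("slotsPerOffer must be positive");
        }
        ToyScheduler scheduler = new ToyScheduler(task -> "done:" + task);
        for (String task : tasks) {
            scheduler.submit(task);
        }
        List<String> results = new ArrayList<>();
        while (scheduler.hasPending()) {
            results.addAll(scheduler.resourceOffer(slotsPerOffer));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(drive(List.of("a", "b", "c"), 2)); // prints [done:a, done:b, done:c]
    }
}
```

In the real framework the master, not the driver, decides when offers arrive, and the executor runs in a separate process on a slave node.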

Pinspider Framework Example

You may check the code here on GitHub. Let’s break it down into PinDriver, PinScheduler and PinUserProfileExecutor.


Driver Implementation

The driver component of the framework is PinDriver.

  • Create Executor Info

    Describe the executor using the Builder pattern; Mesos uses Google Protocol Buffers for data interchange. Here, we need to set the executor ID and the command, which is a shell command executed via ‘/bin/sh -c value’. Any URIs specified are fetched before the command is executed. The name is set by setName(). The source is set by setSource(); it is an identifier-style string that frameworks use to track the source of an executor, which is useful when different executor IDs can be related semantically.

    Protos.ExecutorInfo userProfileExecutorInfo = Protos.ExecutorInfo.newBuilder()
    	.setExecutorId(Protos.ExecutorID.newBuilder().setValue("PinUserProfileExecutor"))
    	.setCommand(commandInfoUserProfile)
    	.setName("PinUserProfileExecutor Java")
    	.setSource("java")
    	.build();
  • Create Framework Info

    Describe the framework information. The user field determines the Unix user that an executor or task is launched as. If the user field is set to an empty string, Mesos automatically sets it to the current user. The amount of time (in seconds) that the master waits for the scheduler to fail over before removing the framework is specified by setFailoverTimeout(). The name of the framework is set by setName().

    Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder()
    	.setFailoverTimeout(120000)
    	.setUser("")
    	.setName("Pinspider Framework");
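
Because the failover_timeout field of FrameworkInfo is expressed in seconds, it can help to sanity-check the value passed to setFailoverTimeout(). The small sketch below uses only java.time to show how long 120000 seconds actually is. The FailoverTimeoutCheck class and its describe() method are illustrative names, not part of Mesos or mesos-pinspider.

```java
import java.time.Duration;

final class FailoverTimeoutCheck {
    /** Formats a timeout given in seconds as whole hours and remaining minutes. */
    static String describe(long timeoutSeconds) {
        Duration d = Duration.ofSeconds(timeoutSeconds);
        long hours = d.toHours();
        long minutes = d.minusHours(hours).toMinutes();
        return hours + "h " + minutes + "m";
    }

    public static void main(String[] args) {
        System.out.println(describe(120000)); // prints 33h 20m
    }
}
```

If a two-minute window was intended, the value to pass would be 120 rather than 120000.
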
  • Instantiate Scheduler

    Instantiate the scheduler with the number of tasks to be submitted for the executors to run.

    Scheduler scheduler = args.length == 1 ?
    	new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo) :
    	new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo, Integer.parseInt(args[1]), args[2]);

    Note: Two ExecutorInfo objects are used for demonstration, one for fetching user profile information and the other for user board information. This explanation covers only one of them, userProfileExecutorInfo.
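
The ternary above assumes that the command line has either exactly one argument or at least three; with exactly two, reading args[2] would throw ArrayIndexOutOfBoundsException. A defensive variant might validate the arguments first. In this sketch the DriverArgs class and its field names are hypothetical, and the defaults (5 tasks and the TechCrunch Pinterest URL) are the ones that appear in the shorter PinScheduler constructor shown in the Scheduler Implementation section.

```java
final class DriverArgs {
    final String master;
    final int totalTasks;
    final String url;

    private DriverArgs(String master, int totalTasks, String url) {
        this.master = master;
        this.totalTasks = totalTasks;
        this.url = url;
    }

    /** Accepts "<master>" or "<master> <totalTasks> <url>" and rejects anything else. */
    static DriverArgs parse(String[] args) {
        if (args.length == 1) {
            return new DriverArgs(args[0], 5, "http://www.pinterest.com/techcrunch");
        }
        if (args.length == 3) {
            // Integer.parseInt throws NumberFormatException on non-numeric input
            int totalTasks = Integer.parseInt(args[1]);
            if (totalTasks <= 0) {
                throw new IllegalArgumentException("totalTasks must be positive");
            }
            return new DriverArgs(args[0], totalTasks, args[2]);
        }
        throw new IllegalArgumentException("usage: <master> [<totalTasks> <url>]");
    }
}
```

The parsed values could then be passed to the PinScheduler and MesosSchedulerDriver constructors in place of the raw args array.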

  • Start the Mesos scheduler driver

    MesosSchedulerDriver is an implementation of SchedulerDriver, an abstract interface that connects a scheduler to Mesos. It manages the life cycle of the scheduler (start, stop, and wait for tasks to finish) and interacts with Mesos (launch tasks, kill tasks, etc.).

    MesosSchedulerDriver schedulerDriver =
    	new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0]);
    // run() blocks until the driver is stopped or aborted
    int status = schedulerDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
    // make sure the driver is stopped before the process exits
    schedulerDriver.stop();

Executor Implementation

The Executor component of the framework is PinUserProfileExecutor.

Executor is a callback interface that is implemented by frameworks’ executors. In our implementation, let us concentrate on launchTask().

@Override public void launchTask(final ExecutorDriver executorDriver,
final Protos.TaskInfo taskInfo) {

  • Set the task status by setting the ID and the state with a builder pattern.
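    Protos.TaskStatus taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId())
    	.setState(Protos.TaskState.TASK_RUNNING).build(); // initial state assumed to be TASK_RUNNING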
  • Send the status update to the framework scheduler with executorDriver.sendStatusUpdate(taskStatus). The driver retries as necessary until an acknowledgement has been received or the executor is terminated, in which case a TASK_LOST status update will be sent.
  • Get the data from the task and run your logic.
    try {
    	message = ("userprofile :" + getUserProfileInfo(url)).getBytes();
    } catch (IOException e) {
    	LOGGER.error("Error parsing the Pinterest URL :" + e.getMessage());
    }
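  • Send the message to the framework scheduler with executorDriver.sendFrameworkMessage(message).
  • Mark the state of the task as finished and send the status update to the framework scheduler.
    taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId())
    	.setState(Protos.TaskState.TASK_FINISHED).build();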
  • Add a main() method that creates an instance of MesosExecutorDriver and runs it.
    int status = mesosExecutorDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
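
Put together, the steps above follow a fixed order: report that the task is running, do the work, send the result as a framework message, then report a terminal state. The sketch below mimics that order with a hypothetical RecordingDriver that merely records calls; it stands in for ExecutorDriver so that the example runs without a Mesos cluster. Reporting TASK_FAILED when the work throws is a design choice of this sketch, not something the original executor is shown to do.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class LaunchTaskFlow {
    /** Hypothetical stand-in for ExecutorDriver: records what would be sent to the scheduler. */
    static final class RecordingDriver {
        final List<String> events = new ArrayList<>();

        void sendStatusUpdate(String state) {
            events.add("status:" + state);
        }

        void sendFrameworkMessage(byte[] data) {
            events.add("message:" + new String(data, StandardCharsets.UTF_8));
        }
    }

    /** Mirrors the order of calls in launchTask(): RUNNING, work, message, terminal state. */
    static void launchTask(RecordingDriver driver, Callable<String> work) {
        driver.sendStatusUpdate("TASK_RUNNING");
        try {
            String result = work.call();
            driver.sendFrameworkMessage(("userprofile :" + result).getBytes(StandardCharsets.UTF_8));
            driver.sendStatusUpdate("TASK_FINISHED");
        } catch (Exception e) {
            // Assumption of this sketch: a failed fetch is reported instead of being marked finished
            driver.sendStatusUpdate("TASK_FAILED");
        }
    }

    public static void main(String[] args) {
        RecordingDriver driver = new RecordingDriver();
        launchTask(driver, () -> "example profile");
        // prints [status:TASK_RUNNING, message:userprofile :example profile, status:TASK_FINISHED]
        System.out.println(driver.events);
    }
}
```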

Scheduler Implementation

The Scheduler component of the framework is PinScheduler.

Scheduler is a callback interface to be implemented by frameworks’ schedulers. In our implementation, let us concentrate on resourceOffers(), statusUpdate() and frameworkMessage().

  • Constructor: construct with the executor information and the number of tasks to launch.
    public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor, Protos.ExecutorInfo pinUserBoardExecutor) {
    	// default to 5 tasks and the TechCrunch Pinterest page
    	this(pinUserProfileExecutor, pinUserBoardExecutor, 5, "http://www.pinterest.com/techcrunch");
    }
    public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor, Protos.ExecutorInfo pinUserBoardExecutor, int totalTasks, String url) {
    	this.pinUserProfileExecutor = pinUserProfileExecutor;
    	this.pinUserBoardExecutor = pinUserBoardExecutor;
    	this.totalTasks = totalTasks;
    	this.crawlQueue = Collections.synchronizedList(new ArrayList<String>());
    }
  • Resource Offers
    • A resource offer describes available resources such as CPU and memory. From the list of offers, read the scalar value of each resource. The resources that each task requires are specified when the task info is built.
      for (Protos.Offer offer : list) {
      	List<Protos.TaskInfo> taskInfoList = new ArrayList<Protos.TaskInfo>();
      	double offerCpus = 0;
      	double offerMem = 0;
      	// add up the CPU and memory contained in this offer
      	for (Protos.Resource resource : offer.getResourcesList()) {
      		if (resource.getName().equals("cpus")) {
      			offerCpus += resource.getScalar().getValue();
      		} else if (resource.getName().equals("mem")) {
      			offerMem += resource.getScalar().getValue();
      		}
      	}
      	LOGGER.info("Received Offer : " + offer.getId().getValue() +
      		" with cpus = " + offerCpus + " and mem = " + offerMem);
      	// the remaining steps run inside this loop, once per offer
    • Create task ID.
      Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
    • Create task info by setting task ID, adding resources, setting data and setting executor.
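      Protos.TaskInfo pinUserProfileTaskInfo = Protos.TaskInfo.newBuilder().setName("task " + taskID.getValue())
      	// remaining builder calls: set the task ID, add resources, set the data, set the executor, then build()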
    • Launch the tasks through the SchedulerDriver.
      schedulerDriver.launchTasks(offer.getId(), taskInfoList);
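
The comparison between what an offer contains and what each task needs comes down to simple arithmetic. The sketch below uses plain map entries in place of Protos.Resource so that it runs on its own. The OfferMath class and the per-task requirements (1 CPU and 128 MB) are assumptions made for illustration, not values taken from mesos-pinspider.

```java
import java.util.List;
import java.util.Map;

final class OfferMath {
    /** Sums every scalar resource with the given name; a name may appear more than once in an offer. */
    static double sum(List<Map.Entry<String, Double>> resources, String name) {
        double total = 0;
        for (Map.Entry<String, Double> resource : resources) {
            if (resource.getKey().equals(name)) {
                total += resource.getValue();
            }
        }
        return total;
    }

    /** Number of tasks that fit in the offer when each task needs cpusPerTask CPUs and memPerTask MB. */
    static int tasksThatFit(List<Map.Entry<String, Double>> resources, double cpusPerTask, double memPerTask) {
        if (cpusPerTask <= 0 || memPerTask <= 0) {
            throw new IllegalArgumentException("per-task requirements must be positive");
        }
        double byCpu = Math.floor(sum(resources, "cpus") / cpusPerTask);
        double byMem = Math.floor(sum(resources, "mem") / memPerTask);
        return (int) Math.min(byCpu, byMem);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> offer = List.of(
                Map.entry("cpus", 2.0), Map.entry("mem", 512.0), Map.entry("cpus", 1.5));
        System.out.println(tasksThatFit(offer, 1.0, 128.0)); // prints 3
    }
}
```

A scheduler could use such a count to decide how many TaskInfo objects to add to taskInfoList before calling launchTasks().
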
  • Status update

    This is invoked when the status of a task has changed, for example when a slave is lost and so the task is lost, or when a task finishes and an executor sends a status update saying so.

    @Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
    • Stop the SchedulerDriver if tasks are finished
      if (taskStatus.getState() == Protos.TaskState.TASK_FINISHED) {
      	LOGGER.info("Finished tasks : " + finishedTasks);
      	if (finishedTasks == totalTasks) {
      		schedulerDriver.stop(); // all tasks are done
      	}
      }
    • Abort the SchedulerDriver if the tasks are killed, lost or failed
      if (taskStatus.getState() == Protos.TaskState.TASK_FAILED
      || taskStatus.getState() == Protos.TaskState.TASK_KILLED
      || taskStatus.getState() == Protos.TaskState.TASK_LOST) {
      	LOGGER.error("Aborting because the task " + taskStatus.getTaskId().getValue() +
      		" is in unexpected state : " + taskStatus.getState().getValueDescriptor().getName() +
      		" with reason : " + taskStatus.getReason().getValueDescriptor().getName()
      	+ " from source : " + taskStatus.getSource().getValueDescriptor().getName() + " with message : " + taskStatus.getMessage());
      	schedulerDriver.abort();
      }
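
The two checks above amount to a small decision table: keep going, stop once every task has finished, or abort on a failure. The sketch below expresses that table with hypothetical StatusTracker, State and Action types that stand in for Protos.TaskState and the driver calls, so that it can run without Mesos. Unlike the excerpt above, it shows the finished-task counter being incremented explicitly.

```java
final class StatusTracker {
    /** Stand-in for the subset of Protos.TaskState values discussed here. */
    enum State { TASK_RUNNING, TASK_FINISHED, TASK_FAILED, TASK_KILLED, TASK_LOST }

    /** What the scheduler would do next: nothing, driver.stop(), or driver.abort(). */
    enum Action { CONTINUE, STOP, ABORT }

    private final int totalTasks;
    private int finishedTasks = 0;

    StatusTracker(int totalTasks) {
        this.totalTasks = totalTasks;
    }

    Action onStatusUpdate(State state) {
        switch (state) {
            case TASK_FINISHED:
                finishedTasks++;
                return finishedTasks == totalTasks ? Action.STOP : Action.CONTINUE;
            case TASK_FAILED:
            case TASK_KILLED:
            case TASK_LOST:
                return Action.ABORT;
            default:
                return Action.CONTINUE;
        }
    }

    public static void main(String[] args) {
        StatusTracker tracker = new StatusTracker(2);
        System.out.println(tracker.onStatusUpdate(State.TASK_FINISHED)); // prints CONTINUE
        System.out.println(tracker.onStatusUpdate(State.TASK_FINISHED)); // prints STOP
    }
}
```
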
  • Framework Message

    This is invoked when an executor sends a message.

    • Handle your message
      @Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
      Protos.SlaveID slaveID, byte[] bytes) {
      	String data = new String(bytes);
      	LOGGER.info("User Profile Information : " + data);
      }
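
One detail worth noting: getBytes() on the executor side and new String(bytes) on the scheduler side both use the JVM's default charset, which on Java versions before 18 depends on the platform and may differ between machines. A hedged alternative is to name the charset explicitly on both ends. MessageCodec below is a hypothetical helper, not part of mesos-pinspider.

```java
import java.nio.charset.StandardCharsets;

final class MessageCodec {
    /** Executor side: turn the message into bytes with an explicit charset. */
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    /** Scheduler side: decode the bytes with the same charset. */
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "userprofile :Zo\u00eb";
        System.out.println(decode(encode(original)).equals(original)); // prints true
    }
}
```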

The complete code is available here, with instructions to run it and sample output.

Swathi V

Loves Art and Technology! Would like to blog and share.. Involved in Apache Hadoop and its ecosystem. Eager to be a part of Big Data Revolution.