Mesos framework example in Python

The example I am presenting here builds on this blog post by James J Porter. James' example runs a shell command that echoes Hello World. Now, let's say we want to do some real computation, and the framework needs to tell the slaves to fetch the binaries or data from somewhere. How do we do that?

The answer lies in the mesos.proto file:

/**
 * Describes a command, executed via: '/bin/sh -c value'. Any URIs specified
 * are fetched before executing the command.  If the executable field for an
 * uri is set, executable file permission is set on the downloaded file.
 * Otherwise, if the downloaded file has a recognized archive extension
 * (currently [compressed] tar and zip) it is extracted into the executor's
 * working directory. This extraction can be disabled by setting `extract` to
 * false. In addition, any environment variables are set before executing
 * the command (so they can be used to "parameterize" your command).
 */
message CommandInfo {
  message URI {
    required string value = 1;
    optional bool executable = 2;
    optional bool extract = 3 [default = true];
  }

Mesos takes care of downloading and extracting the files; all we need to do is state where to get them from. So we compose the task message like this (in Python):


task = new_task(offer)
uri = task.command.uris.add()
uri.value = "path-to-file"
task.command.value = "command to-run-something"
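
The `executable` and `extract` fields from the proto excerpt above can be set in the same way. Here is a minimal sketch, not part of the original framework: `add_uri` is a hypothetical helper, the `Fake*` classes are stand-ins so that the snippet runs without Mesos installed, and the example.com URLs are placeholders. With the real bindings you would pass `task.command` instead of a `FakeCommand`.

```python
class FakeURI(object):
    """Stand-in for a CommandInfo.URI message (assumption, for illustration)."""
    def __init__(self):
        self.value = ""
        self.executable = False
        self.extract = True  # mirrors the default in the proto excerpt


class FakeURIList(list):
    """Stand-in for a repeated protobuf field, which exposes add()."""
    def add(self):
        uri = FakeURI()
        self.append(uri)
        return uri


class FakeCommand(object):
    """Stand-in for task.command."""
    def __init__(self):
        self.uris = FakeURIList()
        self.value = ""


def add_uri(command, value, executable=False, extract=True):
    """Attach one URI to a command and set its fetch flags."""
    uri = command.uris.add()
    uri.value = value
    uri.executable = executable
    uri.extract = extract
    return uri


command = FakeCommand()
# A script that should be marked executable after download
add_uri(command, "http://example.com/run.sh", executable=True)
# An archive that is left to the default extraction behaviour
add_uri(command, "http://example.com/data.tar.gz")
command.value = "./run.sh"
print([(u.value, u.executable, u.extract) for u in command.uris])
```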

What exactly do we put as the URI? Looking at the Mesos code here, it turns out the URI can point to a local file, HDFS, HTTP, FTP, etc.
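
Before handing a URI to Mesos, it can be handy to check which kind it is. The snippet below is only a sketch of such a pre-flight check: `describe_uri` and `KNOWN_SCHEMES` are hypothetical names, and the set lists just the schemes mentioned above, so the real fetcher may well accept others.

```python
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2

# Schemes named in the text above; an empty scheme means a bare local path.
# This list is an assumption for illustration, not the fetcher's actual list.
KNOWN_SCHEMES = {"", "file", "hdfs", "http", "ftp"}


def describe_uri(uri):
    """Return the scheme of a URI, treating a bare path as a local file."""
    scheme = urlparse(uri).scheme
    if scheme not in KNOWN_SCHEMES:
        raise ValueError("unrecognized scheme: {!r}".format(scheme))
    return scheme or "local file"


for uri in ["/opt/jobs/task.py",
            "hdfs://namenode/jobs/task.py",
            "http://172.17.0.13:3000/slave_task.py"]:
    print("{} -> {}".format(uri, describe_uri(uri)))
```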

Based on this newfound knowledge, I added bells and whistles to the hello world framework (actually, I took the hello world part away). The example framework tells the slaves to download a Python script from a web service, use the script to sum up a few numbers, and then send the results back to the web service, which prints them out. When all the tasks are done, the framework stops.

I am going to assume that you already have Mesos set up. Now, the demo.

I have one container running the Mesos master and two containers running the slaves. The master container also runs the web service and the framework.

I am going to open two terminals on the Mesos master. The first terminal runs the Node.js web service:

root@mesos:/example# npm start
> test-service@1.0.0 start /example
> node test-service.js

Example app listening at http://0.0.0.0:3000

On the second terminal I run the framework:

root@mesos:/example# python test_framework.py 172.17.0.13
I0326 06:23:25.496966   506 sched.cpp:139] Version: 0.20.1
I0326 06:23:25.501157   509 sched.cpp:235] New master detected at master@172.17.0.13:5050
I0326 06:23:25.502126   509 sched.cpp:243] No credentials provided. Attempting to register without authentication
I0326 06:23:25.505386   511 sched.cpp:409] Framework registered with 20150326-025122-218108332-5050-234-0015
INFO:root:Registered with framework id: value: "20150326-025122-218108332-5050-234-0015"

172.17.0.13 is the address of the Mesos master.

Very soon, the framework receives resource offers from the slaves:

INFO:root:Recieved resource offers: [u'20150326-025122-218108332-5050-234-55', u'20150326-025122-218108332-5050-234-56']
INFO:root:Launching task 9fce2a69-fe75-4191-8964-2a41e8b9dec3 using offer 20150326-025122-218108332-5050-234-55.
...
All tasks done, stopping framework
I0326 06:23:35.602032   515 sched.cpp:747] Stopping framework '20150326-025122-218108332-5050-234-0015'
root@mesos:/example# 

Switching back to the first terminal, where the web service is running, we can see that the results are already displayed:

root@mesos:/example# npm start

> test-service@1.0.0 start /example
> node test-service.js

Example app listening at http://0.0.0.0:3000
sum of 4 5 6 = 15
sum of 1 2 3 = 6
sum of 7 8 9 = 24

All the code needed to run this demo can be found below. You will need Python, Node.js, and npm. Run npm install to get the dependencies for the web service.

test_framework.py


import logging
import uuid
import time

import sys

from mesos.interface import Scheduler
from mesos.native import MesosSchedulerDriver
from mesos.interface import mesos_pb2

logging.basicConfig(level=logging.INFO)

def new_task(offer):
    task = mesos_pb2.TaskInfo()
    id = uuid.uuid4()
    task.task_id.value = str(id)
    task.slave_id.value = offer.slave_id.value
    task.name = "task {}".format(str(id))

    cpus = task.resources.add()
    cpus.name = "cpus"
    cpus.type = mesos_pb2.Value.SCALAR
    cpus.scalar.value = 1

    mem = task.resources.add()
    mem.name = "mem"
    mem.type = mesos_pb2.Value.SCALAR
    mem.scalar.value = 1

    return task


class HelloWorldScheduler(Scheduler):
    def __init__(self):
        #Each task will sum up one set of numbers and send the result to the web service
        self.numbers = ["1 2 3", "4 5 6", "7 8 9"]
        self.counter = 0
        self.finished_tasks = 0
        self.master_ip = sys.argv[1]
        #In this demo the web service runs on the same host as the master
        self.webservice_ip = self.master_ip

    def registered(self, driver, framework_id, master_info):
        logging.info("Registered with framework id: {}".format(framework_id))

    def resourceOffers(self, driver, offers):
        logging.info("Recieved resource offers: {}".format([o.id.value for o in offers]))
        for offer in offers:
            if self.counter < len(self.numbers):
                task = new_task(offer)
                #The slave is going to download a python script from our nodejs web service
                uri = task.command.uris.add()
                uri.value = "http://{ip}:3000/slave_task.py".format(ip=self.master_ip)
                #And then it will use the script to sum up the numbers
                task.command.value = "python slave_task.py {ip} {params}".format(ip=self.webservice_ip, params=self.numbers[self.counter])
                time.sleep(2)
                logging.info("Launching task {task} "
                             "using offer {offer}.".format(task=task.task_id.value,
                                                           offer=offer.id.value))
                tasks = [task]
                driver.launchTasks(offer.id, tasks)
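                self.counter += 1
            else:
                #No work left: hand the offer back so other frameworks can use it
                driver.declineOffer(offer.id)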

    def statusUpdate(self, driver, update):
        print("Task %s is in state %d" % (update.task_id.value, update.state))
        if update.state == mesos_pb2.TASK_FINISHED:
            self.finished_tasks += 1
            #Stop the framework once every set of numbers has been summed
            if self.finished_tasks == len(self.numbers):
                print("All tasks done, stopping framework")
                driver.stop()

if __name__ == '__main__':
    # make us a framework
    framework = mesos_pb2.FrameworkInfo()
    framework.user = ""  # Have Mesos fill in the current user.
    framework.name = "hello-world"
    driver = MesosSchedulerDriver(
        HelloWorldScheduler(),
        framework,
        sys.argv[1] + ":5050"  # master address from the command line, default master port
    )
    driver.run()
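
One thing to watch in statusUpdate: only TASK_FINISHED is counted, so if a task fails or is lost, the framework never stops. Below is a minimal sketch of counting every terminal state instead. `TaskTracker` is a hypothetical helper, and the states are plain strings here so that the snippet runs without the Mesos bindings; inside the scheduler one would compare against `mesos_pb2.TASK_FAILED`, `mesos_pb2.TASK_KILLED`, and `mesos_pb2.TASK_LOST` instead.

```python
# States after which a task will send no further progress (strings as stand-ins)
TERMINAL_STATES = {"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST"}


class TaskTracker(object):
    """Counts finished and failed tasks until all expected tasks have ended."""

    def __init__(self, expected):
        self.expected = expected
        self.finished = 0
        self.failed = 0

    def record(self, state):
        """Record one status update; return True once every task has ended."""
        if state in TERMINAL_STATES:
            if state == "TASK_FINISHED":
                self.finished += 1
            else:
                self.failed += 1
        return self.finished + self.failed == self.expected


tracker = TaskTracker(expected=3)
updates = ["TASK_RUNNING", "TASK_FINISHED", "TASK_FAILED",
           "TASK_RUNNING", "TASK_FINISHED"]
done = [tracker.record(state) for state in updates]
print(done)
print("finished={} failed={}".format(tracker.finished, tracker.failed))
```

In the scheduler, the point where `record` returns True would be the place to call `driver.stop()`. The sketch assumes each task reports a terminal state exactly once.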

slave_task.py


import sys
import urllib
import urllib2

# argv[1] is the web service IP; the remaining arguments are the numbers to sum
numbers = sys.argv[2:]
query_args = {'numbers': " ".join(numbers), 'sum': sum(int(num) for num in numbers)}
encoded_args = urllib.urlencode(query_args)
# Report the result to the web service and print its reply
print urllib2.urlopen(urllib2.Request(
    'http://{ip}:3000/results?{params}'.format(ip=sys.argv[1], params=encoded_args),
    headers={'User-Agent': 'Mozilla/5.0'})).read()
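
slave_task.py relies on urllib2, which exists only in Python 2. For readers on Python 3, the query-string construction could be sketched as below. `build_results_url` is a hypothetical helper name; the port and the /results path come from the script above. The request itself is left out so that the snippet runs without the web service.

```python
from urllib.parse import urlencode


def build_results_url(ip, numbers):
    """Build the URL that reports a sum back to the web service."""
    query = urlencode({
        "numbers": " ".join(numbers),
        "sum": sum(int(n) for n in numbers),
    })
    return "http://{ip}:3000/results?{params}".format(ip=ip, params=query)


print(build_results_url("172.17.0.13", ["1", "2", "3"]))
# prints http://172.17.0.13:3000/results?numbers=1+2+3&sum=6
```

Sending the request would then be a matter of passing the URL to `urllib.request.urlopen` and reading the response.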

test-service.js


var express = require('express')
var app = express()
var path = require('path')

app.get('/results', function (req, res) {
	console.log("sum of " + req.query.numbers + " = " + req.query.sum);
	res.send("sum of " + req.query.numbers + " = " + req.query.sum);
})

app.get('/*', function (req, res) {
	res.sendfile(path.resolve(req.url))
})

var server = app.listen(3000, function () {

  var host = server.address().address
  var port = server.address().port

  console.log('Example app listening at http://%s:%s', host, port)

})

package.json


{
        "name":"test-service",
        "version":"1.0.0",
        "description":"a webservice to serve script needed by mesos slaves and to receive task results for a test mesos framework",
        "author":"Long <dk1027@gmail.com>",
        "scripts":{
                "start":"node test-service.js"
        },
        "dependencies":{
                "express":"4.2.x"
        }
}

One thought on “Mesos framework example in Python”

  1. Joe says:

    All of this is very true especially the article your linked. When searching for mesos you get the 10,000ft view every where. Learning actually how to do something and the docs seem to have vanished. Thanks for writing this up as it is one puzzle piece I can now use.
