The example I am presenting here is built on this blog post by James J Porter. James’ example runs a shell command to echo Hello World. Now, let’s say we want to do some computation and the framework needs to tell the slaves to get the binaries/data from somewhere. How do we do that?
The answer lies in the mesos.proto file:
/**
 * Describes a command, executed via: '/bin/sh -c value'. Any URIs specified
 * are fetched before executing the command. If the executable field for an
 * uri is set, executable file permission is set on the downloaded file.
 * Otherwise, if the downloaded file has a recognized archive extension
 * (currently [compressed] tar and zip) it is extracted into the executor's
 * working directory. This extraction can be disabled by setting `extract` to
 * false. In addition, any environment variables are set before executing
 * the command (so they can be used to "parameterize" your command).
 */
message CommandInfo {
  message URI {
    required string value = 1;
    optional bool executable = 2;
    optional bool extract = 3 [default = true];
  }
Mesos takes care of downloading and extracting the files; all we need to do is state where to get them from. So we compose the task message like this (in Python):
task = new_task(offer)
uri = task.command.uris.add()
uri.value = "path-to-file"
task.command.value = "command to-run-something"
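The rules in the proto comment can be read as a small decision procedure. The following is only a sketch of that reading in plain Python 3, not Mesos's actual fetcher code. `fetch_action` is a hypothetical helper, and the list of archive suffixes is an assumption, since the comment only says "[compressed] tar and zip".

```python
# Hypothetical helper: mirrors the CommandInfo.URI comment, not the real fetcher.
# The suffix list is an assumption; the proto comment only names tar and zip.
ARCHIVE_SUFFIXES = (".tar", ".tar.gz", ".tgz", ".tar.bz2", ".zip")


def fetch_action(value, executable=False, extract=True):
    """Return what happens to a downloaded URI according to the proto comment."""
    if executable:
        # "executable file permission is set on the downloaded file"
        return "chmod +x"
    if extract and value.lower().endswith(ARCHIVE_SUFFIXES):
        # Recognized archive: unpacked into the executor's working directory.
        return "extract"
    # Anything else is simply left in the working directory as downloaded.
    return "keep as downloaded"


print(fetch_action("http://example.invalid/slave_task.py"))
print(fetch_action("http://example.invalid/data.tar.gz"))
print(fetch_action("http://example.invalid/data.tar.gz", extract=False))
print(fetch_action("http://example.invalid/run.sh", executable=True))
```

Note that the `executable` check comes first, matching the "Otherwise" in the comment: a file marked executable is not extracted even if it has an archive extension.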
What exactly do we put as the URI? Looking at the Mesos code here, it turns out the URI can point to a local file, HDFS, HTTP, FTP, etc.
Based on this newfound knowledge, I put bells and whistles on the hello world framework (actually, I took the hello world part away). The example framework here tells the slaves to fetch a Python script from a web service, use the script to sum up a few numbers, and then send the results back to the web service. The web service prints out the results. When all tasks are done, the framework stops.
I am going to assume that you already have Mesos set up. Now, the demo.
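Whichever scheme is used, the command has to refer to the file by the name it ends up with in the task's working directory. The framework later in this post relies on that name being the last path component of the URI: it fetches `.../slave_task.py` and then runs `python slave_task.py`. A minimal Python 3 sketch of that pairing, where `fetched_name` and `python_command` are hypothetical helper names:

```python
import posixpath
from urllib.parse import urlparse


def fetched_name(uri):
    """Last path component of the URI, e.g. 'slave_task.py'."""
    return posixpath.basename(urlparse(uri).path)


def python_command(uri, *args):
    """Build the shell command that runs the fetched script with its arguments."""
    return " ".join(["python", fetched_name(uri)] + [str(a) for a in args])


uri = "http://172.17.0.13:3000/slave_task.py"
print(fetched_name(uri))                            # slave_task.py
print(python_command(uri, "172.17.0.13", 1, 2, 3))  # python slave_task.py 172.17.0.13 1 2 3
```

Deriving the command from the URI keeps the two from drifting apart if the script is renamed.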
I have one container running the Mesos master and two containers running the slaves. The master container also runs the web service and the framework.
I am going to open two terminals on the Mesos master. The first terminal runs the Node.js web service:
root@mesos:/example# npm start
> test-service@1.0.0 start /example
> node test-service.js
Example app listening at http://0.0.0.0:3000
And on the second terminal I run the framework:
root@mesos:/example# python test_framework.py 172.17.0.13
I0326 06:23:25.496966 506 sched.cpp:139] Version: 0.20.1
I0326 06:23:25.501157 509 sched.cpp:235] New master detected at master@172.17.0.13:5050
I0326 06:23:25.502126 509 sched.cpp:243] No credentials provided. Attempting to register without authentication
I0326 06:23:25.505386 511 sched.cpp:409] Framework registered with 20150326-025122-218108332-5050-234-0015
INFO:root:Registered with framework id: value: "20150326-025122-218108332-5050-234-0015"
172.17.0.13 is the address of the Mesos master.
Very soon, the framework receives resource offers from the slaves:
INFO:root:Recieved resource offers: [u'20150326-025122-218108332-5050-234-55', u'20150326-025122-218108332-5050-234-56']
INFO:root:Launching task 9fce2a69-fe75-4191-8964-2a41e8b9dec3 using offer 20150326-025122-218108332-5050-234-55.
...
All tasks done, stopping framework
I0326 06:23:35.602032 515 sched.cpp:747] Stopping framework '20150326-025122-218108332-5050-234-0015'
root@mesos:/example#
Switching back to the first terminal where the web service is running, the results are already displayed:
root@mesos:/example# npm start
> test-service@1.0.0 start /example
> node test-service.js
Example app listening at http://0.0.0.0:3000
sum of 4 5 6 = 15
sum of 1 2 3 = 6
sum of 7 8 9 = 24
All the code needed to run this demo can be found below. You will need Python, Node.js, and npm. Run npm install to get the dependencies for the web service.
test_framework.py
import logging
import uuid
import time
import sys
from mesos.interface import Scheduler
from mesos.native import MesosSchedulerDriver
from mesos.interface import mesos_pb2
logging.basicConfig(level=logging.INFO)

def new_task(offer):
    task = mesos_pb2.TaskInfo()
    id = uuid.uuid4()
    task.task_id.value = str(id)
    task.slave_id.value = offer.slave_id.value
    task.name = "task {}".format(str(id))

    cpus = task.resources.add()
    cpus.name = "cpus"
    cpus.type = mesos_pb2.Value.SCALAR
    cpus.scalar.value = 1

    mem = task.resources.add()
    mem.name = "mem"
    mem.type = mesos_pb2.Value.SCALAR
    mem.scalar.value = 1

    return task

class HelloWorldScheduler(Scheduler):
    def __init__(self):
        # Each task sums one set of numbers and sends the result to the web service
        self.numbers = ["1 2 3", "4 5 6", "7 8 9"]
        self.counter = 0
        self.finished_tasks = 0
        self.master_ip = sys.argv[1]
        self.webservice_ip = self.master_ip

    def registered(self, driver, framework_id, master_info):
        logging.info("Registered with framework id: {}".format(framework_id))

    def resourceOffers(self, driver, offers):
        logging.info("Recieved resource offers: {}".format([o.id.value for o in offers]))
        for offer in offers:
            if self.counter < len(self.numbers):
                task = new_task(offer)
                # The slave is going to download a Python script from our Node.js web service
                uri = task.command.uris.add()
                uri.value = "http://{ip}:3000/slave_task.py".format(ip=self.master_ip)
                # And then it will use the script to sum up the numbers
                task.command.value = "python slave_task.py {ip} {params}".format(
                    ip=self.webservice_ip, params=self.numbers[self.counter])
                time.sleep(2)
                logging.info("Launching task {task} "
                             "using offer {offer}.".format(task=task.task_id.value,
                                                           offer=offer.id.value))
                tasks = [task]
                driver.launchTasks(offer.id, tasks)
                self.counter += 1

    def statusUpdate(self, driver, update):
        print "Task %s is in state %d" % (update.task_id.value, update.state)
        if update.state == mesos_pb2.TASK_FINISHED:
            self.finished_tasks += 1
            if self.finished_tasks == len(self.numbers):
                print "All tasks done, stopping framework"
                driver.stop()

if __name__ == '__main__':
    # make us a framework
    framework = mesos_pb2.FrameworkInfo()
    framework.user = ""  # Have Mesos fill in the current user.
    framework.name = "hello-world"
    driver = MesosSchedulerDriver(
        HelloWorldScheduler(),
        framework,
        sys.argv[1] + ":5050"  # assumes running on the master
    )
    driver.run()
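Stripped of the Mesos types, `resourceOffers` pairs each incoming offer with the next unprocessed set of numbers until none are left. The following Python 3 sketch shows only that bookkeeping. `assign_work` is a hypothetical helper and the offer IDs are made up.

```python
def assign_work(offer_ids, work_items, counter=0):
    """Pair each offer with the next unassigned work item.

    Returns the (offer_id, work_item) pairs and the updated counter.
    Offers that arrive after the work runs out are left unused.
    """
    launches = []
    for offer_id in offer_ids:
        if counter < len(work_items):
            launches.append((offer_id, work_items[counter]))
            counter += 1
    return launches, counter


numbers = ["1 2 3", "4 5 6", "7 8 9"]
# First round: two offers arrive, so two tasks are launched.
first, counter = assign_work(["offer-55", "offer-56"], numbers)
# Second round: only one work item is left, so the second offer goes unused.
second, counter = assign_work(["offer-57", "offer-58"], numbers, counter)
print(first)
print(second)
```

With two slaves and three sets of numbers, the work therefore takes at least two rounds of offers, which matches the log above showing two offers in the first batch.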
slave_task.py
import sys
import urllib
import urllib2
query_args = {
    'numbers': " ".join(sys.argv[2:]),
    'sum': sum([int(num) for num in sys.argv[2:]]),
}
encoded_args = urllib.urlencode(query_args)
print urllib2.urlopen(urllib2.Request(
    'http://{ip}:3000/results?{params}'.format(ip=sys.argv[1], params=encoded_args),
    headers={'User-Agent': 'Mozilla/5.0'})).read()
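The script above targets Python 2 (`urllib2` and the `print` statement). For readers on Python 3, the query string it sends could be built as in this sketch. `results_url` is a hypothetical helper, and the HTTP request itself is left out so the snippet runs without the web service.

```python
from urllib.parse import urlencode


def results_url(ip, numbers):
    """Build the /results URL that reports a list of numbers and their sum."""
    query = {
        "numbers": " ".join(str(n) for n in numbers),
        "sum": sum(int(n) for n in numbers),
    }
    return "http://{ip}:3000/results?{params}".format(ip=ip, params=urlencode(query))


print(results_url("172.17.0.13", ["1", "2", "3"]))
# http://172.17.0.13:3000/results?numbers=1+2+3&sum=6
```

The spaces in `numbers` are encoded as `+`, which Express decodes back to spaces in `req.query.numbers`.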
test-service.js
var express = require('express')
var app = express()
var path = require('path')

app.get('/results', function (req, res) {
  console.log("sum of " + req.query.numbers + " = " + req.query.sum);
  res.send("sum of " + req.query.numbers + " = " + req.query.sum);
})

app.get('/*', function (req, res) {
  res.sendfile(path.resolve(req.url))
})

var server = app.listen(3000, function () {
  var host = server.address().address
  var port = server.address().port
  console.log('Example app listening at http://%s:%s', host, port)
})
package.json
{
  "name": "test-service",
  "version": "1.0.0",
  "description": "a webservice to serve script needed by mesos slaves and to receive task results for a test mesos framework",
  "author": "Long <dk1027@gmail.com>",
  "scripts": {
    "start": "node test-service.js"
  },
  "dependencies": {
    "express": "4.2.x"
  }
}
All of this is very true, especially the article you linked. When searching for Mesos you get the 10,000 ft view everywhere. When it comes to learning how to actually do something, the docs seem to have vanished. Thanks for writing this up, as it is one puzzle piece I can now use.