Getting Started

** It is highly recommended that you carefully read and understand SAGA-Python, Pilot-Abstractions, and Pilot-MapReduce before you start developing your own applications. **

In this chapter we explain the main components of Pilot-MapReduce, how they work, and how they interact. For your convenience, a fully working example is provided at the end of this page.


This chapter assumes that you have successfully installed Pilot-MapReduce (see chapter Installation).

This tutorial shows how to create and run a Pilot-MapReduce job that counts words. To start, you need nothing but an input directory containing a single text file. Let’s call the file input.txt. If you don’t happen to have a suitable file on hand, you can download one from here and save it in the $HOME/data directory.
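
If no text file is at hand, a small one can also be generated locally. The sketch below is only an illustration: make_sample_input is a hypothetical helper (not part of Pilot-MapReduce), and the sample text is arbitrary.

```python
import os

def make_sample_input(data_dir, text="the quick brown fox\njumps over the lazy dog\n"):
    """Create data_dir if needed and write a small input.txt into it.

    Hypothetical helper for this tutorial; not part of Pilot-MapReduce.
    """
    if not os.path.isdir(data_dir):
        os.makedirs(data_dir)
    path = os.path.join(data_dir, "input.txt")
    with open(path, "w") as fh:
        fh.write(text)
    return path
```

Calling make_sample_input(os.path.join(os.path.expanduser("~"), "data")) would place the file in the $HOME/data directory used throughout this chapter.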

Loading the Module

In order to use Pilot-MapReduce in your Python application, you need to import the pmr module.

import pmr

You can check the version of your Pilot-MapReduce installation via the version property.

print(pmr.version)

Creating a MapReduce Job

A pmr.MapReduce object is used to create a MapReduce job in Pilot-MapReduce. It takes two parameters as input:

  • A list of dict objects, each holding the SAGA-Pilot compute description, the SAGA-Pilot data description, and the SAGA URL of the input data.
  • The URL of the coordination system that the SAGA-Pilot abstractions use to manage communication between pilot-managers and pilots.
import os

# Coordination system URL (same value as in the complete example below)
COORDINATION_URL = "redis://localhost:6379"

# One dict per pilot: compute description, data description, input location
pmrDesc = []
pmrDesc.append({
                'pilot_compute': { "service_url": "fork://localhost",
                                   "number_of_processes": 8,
                                   "working_directory": os.getenv("HOME") + "/pilot-compute",
                                   "affinity_datacenter_label": "eu-de-south",
                                   "affinity_machine_label": "mymachine-1" },
                'pilot_data'   : { "service_url": "ssh://localhost/" + os.getenv("HOME") + "/pilot-data",
                                   "size": 100,
                                   "affinity_datacenter_label": "eu-de-south",
                                   "affinity_machine_label": "mymachine-1" },
                'input_url'    : 'sftp://localhost/' + os.getenv("HOME") + "/data"
})

job = pmr.MapReduce(pmrDesc, COORDINATION_URL)

Create Mapper executable

Create a Mapper executable in a new file in your $HOME directory using your favorite editor.

The map executable has three main sections:

  • Initialization: Each map task receives its chunk (split) files as command-line arguments. Initialize the map task with these arguments.
mapJob = Mapper(sys.argv)
  • Map function: The actual map task implementation. pmr.Mapper provides the member chunkFile to access the chunk file passed as input to the map task. It also provides the member function emit, which emits a key-value pair to the reduce phase.
with open(mapJob.chunkFile) as fh:
    for line in fh:
        for word in line.split():
            mapJob.emit(word, "%s,%s" % (word, 1))
  • Finalization: Cleanup of the task.

After putting it all together, your first Pilot-MapReduce Mapper executable will look somewhat like the script below.

import sys
from pmr.mapper import Mapper

if __name__ == "__main__":
    # Initialize Map Job
    mapJob = Mapper(sys.argv)
    # Map function: emit "<word>,1" for every word in the chunk file
    with open(mapJob.chunkFile) as fh:
        for line in fh:
            for word in line.split():
                mapJob.emit(word, "%s,%s" % (word, 1))
    # Finalize map job
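
To see what the map step produces without a Pilot-MapReduce installation, the loop can be mimicked with a plain function. This is only a sketch: map_words is a hypothetical stand-in (not part of pmr) that collects the key-value pairs the Mapper script would pass to emit.

```python
def map_words(lines):
    """Return the (key, value) pairs a word-count mapper would emit.

    Hypothetical stand-in for Mapper.emit; it only collects the pairs.
    """
    pairs = []
    for line in lines:
        for word in line.split():
            # Same value format as the Mapper script: "<word>,<count>"
            pairs.append((word, "%s,%s" % (word, 1)))
    return pairs

pairs = map_words(["to be or", "not to be"])
print(pairs[0])  # ('to', 'to,1')
```

Each word yields one pair, so repeated words appear several times; summing them up is left to the reduce phase.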

Create Reducer executable

Create a Reducer executable in a new file in your $HOME directory using your favorite editor.

The reduce executable also has three main sections:

  • Initialization: Each reduce task receives a list of sorted partition files as command-line arguments. Initialize the reduce task with these arguments.
reduceJob = Reducer(sys.argv)
  • Reduce function: The actual reduce task implementation. pmr.Reducer provides the member partitionFiles to access the list of partition files passed as input to the reduce task. It also provides the member function emit, which emits a key-value pair to the output result file.
count = {}
# Split each line emitted by the map phase to get the word counts from each partition file
for pName in reduceJob.partitionFiles:
    with open(pName) as infile:
        for line in infile:
            tokens = line.split(",")

            # The word itself might contain ","; the count is the last token.
            value = int(tokens[-1])
            word = ",".join(tokens[:-1])

            if word in count:
                count[word] = count[word] + value
            else:
                count[word] = value
for word, total in count.items():
    reduceJob.emit(word, total)
  • Finalization: Cleanup of the task.

After putting it all together, your first Pilot-MapReduce Reducer executable will look somewhat like the script below.

from pmr.reducer import Reducer
import sys

if __name__ == "__main__":
    # Initialize Reduce job
    reduceJob = Reducer(sys.argv)
    # Reduce function
    count = {}
    # Split each line emitted by the map phase to get the word counts from each partition file
    for pName in reduceJob.partitionFiles:
        with open(pName) as infile:
            for line in infile:
                tokens = line.split(",")
                # The word itself might contain ","; the count is the last token.
                value = int(tokens[-1])
                word = ",".join(tokens[:-1])
                if word in count:
                    count[word] = count[word] + value
                else:
                    count[word] = value

    for word, total in count.items():
        reduceJob.emit(word, total)

    # Finalize reduce job
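
The parsing rule used in the Reducer (the count is the last comma-separated token, everything before it is the word) can be checked in isolation. The sketch below assumes each partition line has the form "<word>,<count>", as produced by the Mapper script; reduce_counts is a hypothetical helper, not part of pmr.

```python
def reduce_counts(lines):
    """Sum the counts per word from lines of the form "<word>,<count>".

    Hypothetical helper mirroring the Reducer loop; not part of pmr.
    """
    counts = {}
    for line in lines:
        tokens = line.strip().split(",")
        # The word may itself contain ","; only the last token is the count.
        value = int(tokens[-1])
        word = ",".join(tokens[:-1])
        counts[word] = counts.get(word, 0) + value
    return counts

result = reduce_counts(["to,1", "be,1", "to,1", "a,b,1"])
print(result["to"], result["a,b"])  # 2 1
```

Joining all but the last token keeps words such as "a,b" intact instead of truncating them at the first comma.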

Define Chunk, Map, Reduce jobs

Define the Chunk, Map, and Reduce tasks as dict representations of SAGA Job description attributes.

chunkDesc  = { "executable": "split -l 50" }

# "files" lists what is transferred to the task's working directory
mapDesc    = { "executable": "python",
               "number_of_processes": 1,
               "files" : ['ssh://localhost/' + os.getenv("HOME") + "/"] }

reduceDesc = { "executable": "python",
               "number_of_processes": 1,
               "files" : ['ssh://localhost/' + os.getenv("HOME") + "/"] }


Note that files is not a SAGA Job description attribute. It takes a list of files that are transferred to the working directory of the Chunk/Map/Reduce task, even in the distributed Pilot-MapReduce case.
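
The chunk executable split -l 50 cuts the input into pieces of at most 50 lines each. To illustrate what that means for the map tasks, here is a minimal plain-Python sketch of the same idea; split_lines is a hypothetical helper and is not used by Pilot-MapReduce itself.

```python
def split_lines(lines, lines_per_chunk=50):
    """Yield lists of at most lines_per_chunk lines, in input order.

    Hypothetical illustration of line-based chunking; not part of pmr.
    """
    chunk = []
    for line in lines:
        chunk.append(line)
        if len(chunk) == lines_per_chunk:
            yield chunk
            chunk = []
    if chunk:
        # Last chunk may be shorter than lines_per_chunk
        yield chunk

chunks = list(split_lines(["line %d\n" % i for i in range(120)]))
print([len(c) for c in chunks])  # [50, 50, 20]
```

A 120-line input therefore yields three chunks, and each chunk would be handed to one map task.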

Register the Chunk, Map, and Reduce Job descriptions with the MapReduce Job


Set the number of reducers and the output path


Submit the Job


The Complete Example

After putting it all together, your first Pilot-MapReduce application will look somewhat like the script below.

import os
import pmr

COORDINATION_URL = "redis://localhost:6379"

def wordCountJob():
    # List of Pilot-MapReduce descriptions
    pmrDesc = []
    pmrDesc.append({
                    'pilot_compute': { "service_url": "fork://localhost",
                                       "number_of_processes": 8,
                                       "working_directory": os.getenv("HOME") + "/pilot-compute",
                                       "affinity_datacenter_label": "eu-de-south",
                                       "affinity_machine_label": "mymachine-1" },
                    'pilot_data'   : { "service_url": "ssh://localhost/" + os.getenv("HOME") + "/pilot-data",
                                       "size": 100,
                                       "affinity_datacenter_label": "eu-de-south",
                                       "affinity_machine_label": "mymachine-1" },
                    'input_url'    : 'sftp://localhost/' + os.getenv("HOME") + "/data"
    })
    job = pmr.MapReduce(pmrDesc, COORDINATION_URL)

    # SAGA Job dictionary description of Chunk, Map, Reduce tasks.
    chunkDesc  = { "executable": "split -l 50" }
    mapDesc    = { "executable": "python",
                   "number_of_processes": 1,
                   "files" : ['ssh://localhost/' + os.getenv("HOME") + "/"] }
    reduceDesc = { "executable": "python",
                   "number_of_processes": 1,
                   "files" : ['ssh://localhost/' + os.getenv("HOME") + "/"] }

    # Register Chunk, Map, Reduce tasks

    # Set number of reduces and output path.
    job.setOutputPath(os.getenv("HOME") + "/output")

    # Submit Job.

if __name__ == "__main__":
    wordCountJob()