Getting Started

**It is highly recommended that you carefully read and understand SAGA-Python, the Pilot-Abstractions, and Pilot-MapReduce before you go off and start developing your own applications.**

In this chapter we explain the main components of Pilot-MapReduce, how they work, and how they interact. For your convenience, you can find a fully working example at the end of this page.

Note

This chapter assumes that you have successfully installed Pilot-MapReduce (see the chapter Installation).

This tutorial shows how to create and run a Pilot-MapReduce job that counts words. To start with, you need nothing but an input directory containing a single text file. Let's call the file input.txt. If you don't happen to have a suitable file on hand, you can download one from here and save it in the $HOME/data directory.
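Alternatively, you can generate a small test file yourself. The following is a minimal sketch that creates the $HOME/data directory and writes a few lines of sample text into it (the contents are purely illustrative):

import os

# Create the input directory expected by this example ($HOME/data).
data_dir = os.path.join(os.getenv("HOME"), "data")
if not os.path.exists(data_dir):
    os.makedirs(data_dir)

# Write some illustrative text; any plain-text content will do.
with open(os.path.join(data_dir, "input.txt"), "w") as fh:
    fh.write("the quick brown fox jumps over the lazy dog\n" * 100)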

Loading the Module

In order to use Pilot-MapReduce in your Python application, you need to import the pmr module.

import pmr

You can check / print the version of your Pilot-MapReduce installation via the version property.

print pmr.version

Creating a MapReduce Job

A pmr.MapReduce object is used to create a MapReduce job in Pilot-MapReduce. It takes two parameters as input:

  • A list of dict objects containing the SAGA-Pilot compute and data descriptions and the SAGA URL of the input data location.
  • The coordination system URL used by the SAGA-Pilot abstractions for managing communication between pilot managers and pilots.
pmrDesc = []
pmrDesc.append({
                'pilot_compute': { "service_url": "fork://localhost",
                                  "number_of_processes": 8,
                                  "working_directory": os.getenv("HOME")+"/pilot-compute",
                                  "affinity_datacenter_label": "eu-de-south",
                                  "affinity_machine_label": "mymachine-1"
                                  },
                'pilot_data'   :  {
                                  "service_url": "ssh://localhost/" + os.getenv("HOME")+"/pilot-data",
                                  "size": 100,
                                  "affinity_datacenter_label": "eu-de-south",
                                  "affinity_machine_label": "mymachine-1"
                                  },
                'input_url'    : 'sftp://localhost/' + os.getenv("HOME") + "/data"
                })

job = pmr.MapReduce(pmrDesc, COORDINATION_URL)
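Here, COORDINATION_URL is the URL of the coordination service used by the SAGA-Pilot abstractions. In the complete example at the end of this page, a local Redis instance is used:

COORDINATION_URL = "redis://localhost:6379"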

Create Mapper executable

Create a Mapper executable in a new file wc_mapper.py in your $HOME directory using your favorite editor.

The map executable has three main sections:

  • Initialization : Each map task receives chunk/split files as command line arguments. Initialize the map task with the command line parameters.
mapJob = Mapper(sys.argv)
  • Map function : The actual map task implementation. The pmr.Mapper provides a member chunkFile to access the chunk file passed as input to the map task. It also provides a member function emit, which emits a key/value pair to the reduce phase.
with open(mapJob.chunkFile) as fh:
    line = fh.read()
    for word in line.split():
        mapJob.emit(word, "%s,%s" % (word, 1))
  • Finalization : Cleanup of the task.
mapJob.finalize()

After putting it all together, your first Pilot-MapReduce Mapper executable will look something like the script below.

import sys
from pmr.mapper import Mapper
    
if __name__ == "__main__":
    # Initialize Map Job
    mapJob = Mapper(sys.argv)
          
    
    # Map function    
    with open(mapJob.chunkFile) as fh:
        line = fh.read()
        for word in line.split():
            mapJob.emit(word, "%s,%s" % (word, 1))
                            
    # Finalize map job  
    mapJob.finalize()

Create Reducer executable

Create a Reducer executable in a new file wc_reducer.py in your $HOME directory using your favorite editor.

The reduce executable also has three main sections:

  • Initialization : Each reduce task receives a list of sorted partition files as command line arguments. Initialize the reduce task with the command line parameters.
reduceJob = Reducer(sys.argv)
  • Reduce function : The actual reduce task implementation. The pmr.Reducer provides a member partitionFiles to access the list of partition files passed as input to the reduce task. It also provides a member function emit, which emits a key/value pair to the output result file.
count = {}
# Parse the map-emitted key/value lines in each partition file and
# accumulate the count per word.
for pName in reduceJob.partitionFiles:
    with open(pName) as infile:
        for line in infile:
            tokens = line.split(",")

            # The word itself might contain ",", so the count is the last token.
            value = int(tokens[-1])
            word = ",".join(tokens[:-1])

            count[word] = count.get(word, 0) + value

# Emit the aggregated count for each word.
for word, total in count.iteritems():
    reduceJob.emit(word, total)
  • Finalization : Cleanup of the task.
reduceJob.finalize()

After putting it all together, your first Pilot-MapReduce Reducer executable will look something like the script below.

from pmr.reducer import Reducer
import sys


if __name__ == "__main__":
    # Initialize Reduce job
    reduceJob = Reducer(sys.argv)         
    
    # Reduce function    
    count = {}
    # Parse the map-emitted key/value lines in each partition file and
    # accumulate the count per word.
    for pName in reduceJob.partitionFiles:
        with open(pName) as infile:
            for line in infile:
                tokens = line.split(",")

                # The word itself might contain ",", so the count is the last token.
                value = int(tokens[-1])
                word = ",".join(tokens[:-1])

                count[word] = count.get(word, 0) + value

    # Emit the aggregated count for each word.
    for word, total in count.iteritems():
        reduceJob.emit(word, total)

    # Finalize reduce job   
    reduceJob.finalize() 

Define Chunk, Map, Reduce jobs

Define the Chunk, Map, and Reduce tasks as dict representations of SAGA Job description attributes. The chunk task below uses the standard Unix split utility to cut the input into chunks of 50 lines each; each chunk then becomes the input of one map task.

chunkDesc  = { "executable": "split -l 50" }

mapDesc    = { "executable": "python wc_mapper.py",
               "number_of_processes": 1,
               "spmd_variation":"single",
               "files" : ['ssh://localhost/' + os.getenv("HOME") + "/wc_mapper.py")]
             }

reduceDesc = { "executable": "python wc_reducer.py",
               "number_of_processes": 1,
               "spmd_variation":"single",
               "files" : ['ssh://localhost/' + os.getenv("HOME") + "/wc_reducer.py")]
             }

Note

The files attribute is not a SAGA Job description attribute. It takes a list of files to be transferred to the working directory of the Chunk/Map/Reduce task, even in the distributed Pilot-MapReduce case.

Register the Chunk, Mapper, and Reducer Job descriptions with the MapReduce job

job.setChunk(chunkDesc)
job.setMapper(mapDesc)
job.setReducer(reduceDesc)

Set the number of reducers and the output path

job.setNbrReduces(8)
job.setOutputPath(os.getenv("HOME")+"/output")

Submit the Job

job.runJob()
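Once runJob() returns, the results are available in the output path configured above. As a minimal sketch (assuming the reducers write their result files directly into this directory), you can list the generated files like this:

import os

output_dir = os.getenv("HOME") + "/output"
for name in sorted(os.listdir(output_dir)):
    print name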

The Complete Example

After putting it all together, your first Pilot-MapReduce application will look something like the script below.

import os
import pmr

COORDINATION_URL = "redis://localhost:6379"

def wordCountJob():
    
    # List of Pilot-MapReduce descriptions    
    pmrDesc = []    
    pmrDesc.append({
                    'pilot_compute': { "service_url": "fork://localhost",
                                       "number_of_processes": 8,
                                       "working_directory": os.getenv("HOME") + "/pilot-compute",
                                       "affinity_datacenter_label": "eu-de-south",
                                       "affinity_machine_label": "mymachine-1"                                
                                     },
                    'pilot_data'   : { "service_url": "ssh://localhost/" + os.getenv("HOME") + "/pilot-data",
                                       "size": 100,
                                       "affinity_datacenter_label": "eu-de-south",
                                       "affinity_machine_label": "mymachine-1"                              
                                     },
                    'input_url'    : 'sftp://localhost/' + os.getenv("HOME") + "/data"
                  })
    
    job = pmr.MapReduce(pmrDesc, COORDINATION_URL)
    
    
    # SAGA Job dictionary description of Chunk, Map, Reduce tasks.         
    chunkDesc = { "executable": "split -l 50" }
    
    mapDesc   = { "executable": "python wc_mapper.py",
                  "number_of_processes": 1,
                  "spmd_variation":"single",
                  "files" : ['ssh://localhost/' + os.getenv("HOME") + "/wc_mapper.py"]
                }
    
    reduceDesc = { "executable": "python wc_reducer.py", 		  
                    "number_of_processes": 1,
                    "spmd_variation":"single",
                    "files" : ['ssh://localhost/' + os.getenv("HOME") + "/wc_reducer.py"]
                 }
    

    # Register Chunk, Map, Reduce tasks     
    job.setChunk(chunkDesc)
    job.setMapper(mapDesc)
    job.setReducer(reduceDesc)

    # Set number of reduces and output path.    
    job.setNbrReduces(8)
    job.setOutputPath(os.getenv("HOME") + "/output")
    
    # Submit Job.    
    job.runJob()
    
if __name__ == "__main__":
    wordCountJob()
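To try it out, save the script as, for example, wordcount.py alongside wc_mapper.py and wc_reducer.py in your $HOME directory and run it with python wordcount.py. This assumes that a Redis server is reachable at localhost:6379 (as specified in COORDINATION_URL) and that the input data is available in $HOME/data.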