edu.isi.pegasus.planner.cluster
Class Horizontal

java.lang.Object
  extended by edu.isi.pegasus.planner.cluster.Horizontal
All Implemented Interfaces:
Clusterer, Refiner

public class Horizontal
extends Object
implements Clusterer, Refiner

The horizontal clusterer, which clusters jobs that are on the same level of the workflow.

Version:
$Revision: 4687 $
Author:
Karan Vahi

Nested Class Summary
private  class Horizontal.JobComparator
          A job comparator that compares jobs according to their transformation names.
 
Field Summary
static int DEFAULT_COLLAPSE_FACTOR
          The default collapse factor for collapsing jobs with same logical name scheduled onto the same execution pool.
static String DESCRIPTION
          A short description of the clusterer.
private  Map mCollapseMap
          Map to hold the collapse values for the various execution pools.
protected  JobAggregatorInstanceFactory mJobAggregatorFactory
          The handle to the job aggregator factory.
private static Comparator mJobComparator
          A singleton access to the job comparator.
private  Map mJobMap
          Map that holds the jobs grouped by the label of the jobs in the DAX.
protected  LogManager mLogger
          The handle to the logger object.
private  PPS mPPS
          The handle to the provenance store implementation.
protected  PegasusProperties mProps
          The handle to the properties object holding all the properties.
private  Map mReplacementTable
          Replacement table that identifies the corresponding fat (clustered) job for a job.
private  ADag mScheduledDAG
          ADag object containing the jobs that have been scheduled by the site selector.
private  Map mSubInfoMap
          A map that stores all the Job objects, indexed by their logical ID in the DAX.
private  XMLProducer mXMLStore
          The XML Producer object that records the actions.
 
Fields inherited from interface edu.isi.pegasus.planner.cluster.Clusterer
VERSION
 
Fields inherited from interface edu.isi.pegasus.planner.refiner.Refiner
VERSION
 
Constructor Summary
Horizontal()
          The default constructor.
 
Method Summary
protected  void appendAttribute(StringBuffer xmlFeed, String key, String value)
          Appends an xml attribute to the xml feed.
private  void assimilateJobs()
          Puts the jobs in the abstract workflow into the job map that is indexed by the logical name of the jobs.
private  List<List<Job>> bestFitBinPack(List<Job> jobs, double maxTime)
          Perform best fit bin packing.
private  void collapseJobs(String name, List jobs, String partitionID)
          Collapses the jobs having the same logical name according to the sites where they are scheduled.
 String constructID(String partitionID, int id)
          Given an integer id, returns a string id that is used for the clustered job.
private  Map constructMap(String propValue)
          Constructs a map with the numbers/values for the collapsing factors used to collapse nodes of the same type.
 String description()
          Returns a textual description of the clusterer implementation.
 void determineClusters(Partition partition)
          Determine the clusters for a partition.
private  Comparator<Job> getBinPackingComparator()
          Returns a comparator that sorts a collection of jobs in decreasing order of their runtimes, as specified by the Pegasus.JOB_RUN_TIME property.
 ADag getClusteredDAG()
          Returns the clustered workflow.
 int[] getCollapseFactor(String pool, Job job, int size)
          Returns the collapse factor, which is used to chunk up the jobs of a particular type on a pool.
 ADag getWorkflow()
          Returns a reference to the workflow that is being refined by the refiner.
 XMLProducer getXMLProducer()
          Returns a reference to the XMLProducer, which generates the XML fragment capturing the actions of the refiner.
 void initialize(ADag dag, PegasusBag bag)
          Initializes the Clusterer implementation.
private  Comparator jobComparator()
          Singleton access to the job comparator.
protected  void logRefinerAction(AggregatedJob clusteredJob, JobAggregator aggregator)
          Records the refiner action into the Provenance Store as an XML fragment.
 void parents(String partitionID, List parents)
          An empty implementation of the callout, as state is maintained internally to determine the relations between the jobs.
private  void printList(List l)
          A utility method to print a short description of the jobs in a list.
private  void replaceJobs()
          Changes the relations/edges in the local graph structure.
private  void updateReplacementTable(List jobs, Job mergedJob)
          Updates the replacement table.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_COLLAPSE_FACTOR

public static final int DEFAULT_COLLAPSE_FACTOR
The default collapse factor for collapsing jobs with same logical name scheduled onto the same execution pool.

See Also:
Constant Field Values

DESCRIPTION

public static final String DESCRIPTION
A short description of the clusterer.

See Also:
Constant Field Values

mJobComparator

private static Comparator mJobComparator
A singleton access to the job comparator.


mLogger

protected LogManager mLogger
The handle to the logger object.


mProps

protected PegasusProperties mProps
The handle to the properties object holding all the properties.


mJobAggregatorFactory

protected JobAggregatorInstanceFactory mJobAggregatorFactory
The handle to the job aggregator factory.


mScheduledDAG

private ADag mScheduledDAG
ADag object containing the jobs that have been scheduled by the site selector.


mJobMap

private Map mJobMap
Map that holds the jobs grouped by the label of the jobs in the DAX. The key is the logical job name and the value is the list of jobs with that logical name. This is no longer used and will be removed later.


mSubInfoMap

private Map mSubInfoMap
A map that stores all the Job objects, indexed by their logical ID in the DAX. This should actually be in the ADag structure.


mCollapseMap

private Map mCollapseMap
Map that holds the collapse values for the various execution pools. The values are obtained from the properties file, or can be obtained from the resource information catalog (a.k.a. MDS).


mReplacementTable

private Map mReplacementTable
Replacement table that identifies the corresponding fat (clustered) job for a job.


mXMLStore

private XMLProducer mXMLStore
The XML Producer object that records the actions.


mPPS

private PPS mPPS
The handle to the provenance store implementation.

Constructor Detail

Horizontal

public Horizontal()
The default constructor.

Method Detail

jobComparator

private Comparator jobComparator()
Singleton access to the job comparator.

Returns:
the job comparator.

getWorkflow

public ADag getWorkflow()
Returns a reference to the workflow that is being refined by the refiner.

Specified by:
getWorkflow in interface Refiner
Returns:
the ADag object.

getXMLProducer

public XMLProducer getXMLProducer()
Returns a reference to the XMLProducer, which generates the XML fragment capturing the actions of the refiner. This is used for provenance purposes.

Specified by:
getXMLProducer in interface Refiner
Returns:
XMLProducer

initialize

public void initialize(ADag dag,
                       PegasusBag bag)
                throws ClustererException
Initializes the Clusterer implementation.

Specified by:
initialize in interface Clusterer
Parameters:
dag - the workflow that is being clustered.
bag - the bag of objects that is useful for initialization.
Throws:
ClustererException - in case of error.

determineClusters

public void determineClusters(Partition partition)
                       throws ClustererException
Determine the clusters for a partition. The partition is assumed to contain independent jobs, and multiple clusters may be created for it. Internally, the jobs are grouped by transformation name and then by execution site. Each group (jobs with the same transformation name that are scheduled on the same site) is then clustered. The number of clustered jobs created for each group depends on the following Pegasus profiles, which can be associated with the jobs:
       1) bundle   (dictates the number of clustered jobs that are created)
       2) collapse (the number of jobs that make up a single clustered job)
 
If both parameters are associated with the jobs in a group, the bundle parameter overrides the collapse parameter.
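
The grouping step and the bundle/collapse precedence described above can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the Pegasus implementation: SimpleJob is a hypothetical stand-in for Job that carries only an ID, a transformation name and an execution site, and clusterCount assumes the caller passes a collapse value that already accounts for any default (such as DEFAULT_COLLAPSE_FACTOR, whose value is not shown on this page).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HorizontalGroupingSketch {

    /** Hypothetical stand-in for a Pegasus Job: only the fields needed for grouping. */
    static final class SimpleJob {
        final String id;
        final String transformation;
        final String site;

        SimpleJob(String id, String transformation, String site) {
            this.id = id;
            this.transformation = transformation;
            this.site = site;
        }
    }

    /** Groups jobs by transformation name first, then by execution site. */
    static Map<String, Map<String, List<SimpleJob>>> group(List<SimpleJob> jobs) {
        Map<String, Map<String, List<SimpleJob>>> groups = new LinkedHashMap<>();
        for (SimpleJob job : jobs) {
            groups.computeIfAbsent(job.transformation, t -> new LinkedHashMap<>())
                  .computeIfAbsent(job.site, s -> new ArrayList<>())
                  .add(job);
        }
        return groups;
    }

    /**
     * Number of clustered jobs to create for a group of the given size.
     * bundle (number of clustered jobs) overrides collapse (jobs per clustered job);
     * pass null for bundle when that profile is not set.
     */
    static int clusterCount(int size, Integer bundle, int collapse) {
        if (bundle != null) {
            return Math.min(bundle, size);            // never more clusters than jobs
        }
        if (collapse <= 0) {
            throw new IllegalArgumentException("collapse must be positive: " + collapse);
        }
        return (size + collapse - 1) / collapse;      // ceiling of size / collapse
    }
}
```

Under this sketch, a group of 10 jobs with bundle=3 yields 3 clustered jobs regardless of collapse, while collapse=3 alone yields 4.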

Specified by:
determineClusters in interface Clusterer
Parameters:
partition - the partition for which the clusters need to be determined.
Throws:
ClustererException - in case of error.
See Also:
Pegasus.BUNDLE_KEY, Pegasus.COLLAPSE_KEY

parents

public void parents(String partitionID,
                    List parents)
             throws ClustererException
An empty implementation of the callout, as state is maintained internally to determine the relations between the jobs.

Specified by:
parents in interface Clusterer
Parameters:
partitionID - the id of a partition.
parents - the list of String objects that contain the id's of the parents of the partition.
Throws:
ClustererException - in case of error.

collapseJobs

private void collapseJobs(String name,
                          List jobs,
                          String partitionID)
Collapses the jobs having the same logical name according to the sites where they are scheduled.

Parameters:
name - the logical name of the jobs in the list passed to this function.
jobs - the list of Job objects corresponding to the jobs that have the same logical name.
partitionID - the ID of the partition to which the jobs belong.

bestFitBinPack

private List<List<Job>> bestFitBinPack(List<Job> jobs,
                                       double maxTime)
Perform best fit bin packing.
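
Best-fit bin packing places each job (taken in decreasing order of runtime) into the open cluster with the least remaining time that can still hold it, opening a new cluster only when none fits. The following is a minimal Java sketch of that technique, not the Pegasus code: TimedJob is a hypothetical stand-in for Job with a known runtime, and a job longer than maxTime is assumed to end up alone in its own cluster.

```java
import java.util.ArrayList;
import java.util.List;

class BestFitSketch {

    /** Hypothetical stand-in for a Job with a known runtime (same unit as maxTime). */
    static final class TimedJob {
        final String id;
        final double runtime;

        TimedJob(String id, double runtime) {
            this.id = id;
            this.runtime = runtime;
        }
    }

    /** Best-fit packing; jobs are expected to be sorted by decreasing runtime. */
    static List<List<TimedJob>> bestFitBinPack(List<TimedJob> jobs, double maxTime) {
        List<List<TimedJob>> bins = new ArrayList<>();
        List<Double> remaining = new ArrayList<>();   // time left in each bin
        for (TimedJob job : jobs) {
            int best = -1;
            for (int i = 0; i < bins.size(); i++) {
                double left = remaining.get(i);
                // the job fits, and this bin is tighter than the best so far
                if (job.runtime <= left && (best < 0 || left < remaining.get(best))) {
                    best = i;
                }
            }
            if (best < 0) {
                // no bin fits: open a new one (an oversized job gets negative room,
                // so nothing else is ever added to it)
                List<TimedJob> bin = new ArrayList<>();
                bin.add(job);
                bins.add(bin);
                remaining.add(maxTime - job.runtime);
            } else {
                bins.get(best).add(job);
                remaining.set(best, remaining.get(best) - job.runtime);
            }
        }
        return bins;
    }
}
```

With runtimes 6, 5, 4, 3, 2 and maxTime 10, this sketch produces two clusters: {6, 4} and {5, 3, 2}.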

Parameters:
jobs - List of jobs sorted in decreasing order of the job runtime.
maxTime - The maximum time for which the clustered job should run.
Returns:
a List of Lists of Jobs, where each inner List is a set of jobs that should be clustered together so that the clustered job runs in under maxTime.

getBinPackingComparator

private Comparator<Job> getBinPackingComparator()
Returns a comparator that sorts a collection of jobs in decreasing order of their runtimes, as specified by the Pegasus.JOB_RUN_TIME property.
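
A decreasing-runtime comparator of this kind can be sketched with java.util.Comparator. This is an illustration only: jobs are represented here by their profile maps, and runtimeKey is a hypothetical parameter standing in for the Pegasus.JOB_RUN_TIME key, whose actual string value is not given on this page. Every job is assumed to carry a numeric runtime value.

```java
import java.util.Comparator;
import java.util.Map;

class RuntimeComparatorSketch {

    /**
     * Orders jobs (represented only by their profile maps) by decreasing runtime.
     * runtimeKey is a hypothetical stand-in for the Pegasus.JOB_RUN_TIME key.
     */
    static Comparator<Map<String, String>> decreasingRuntime(String runtimeKey) {
        Comparator<Map<String, String>> byRuntime =
            Comparator.comparingDouble(profiles -> Double.parseDouble(profiles.get(runtimeKey)));
        return byRuntime.reversed();   // largest runtime first
    }
}
```

Sorting with this comparator before packing puts the longest jobs first, which is the order the best-fit packing expects.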

Returns:
a comparator that orders jobs by decreasing runtime.

getClusteredDAG

public ADag getClusteredDAG()
                     throws ClustererException
Returns the clustered workflow.

Specified by:
getClusteredDAG in interface Clusterer
Returns:
the ADag object corresponding to the clustered workflow.
Throws:
ClustererException - in case of error.

description

public String description()
Returns a textual description of the clusterer implementation.

Specified by:
description in interface Clusterer
Returns:
a short textual description

logRefinerAction

protected void logRefinerAction(AggregatedJob clusteredJob,
                                JobAggregator aggregator)
Records the refiner action into the Provenance Store as an XML fragment.

Parameters:
clusteredJob - the clustered job
aggregator - the aggregator that was used to create this clustered job

appendAttribute

protected void appendAttribute(StringBuffer xmlFeed,
                               String key,
                               String value)
Appends an xml attribute to the xml feed.
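
Appending an attribute amounts to writing key="value" onto the buffer. The minimal Java sketch below illustrates this; the XML escaping of the value is an assumption added as a precaution, and this page does not say whether the original method escapes values.

```java
class XmlAttributeSketch {

    /** Escapes characters that may not appear verbatim in a double-quoted XML attribute. */
    static String escape(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (char c : value.toCharArray()) {
            switch (c) {
                case '&': sb.append("&amp;"); break;
                case '<': sb.append("&lt;"); break;
                case '>': sb.append("&gt;"); break;
                case '"': sb.append("&quot;"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Appends ' key="value"' to the feed, escaping the value. */
    static void appendAttribute(StringBuffer xmlFeed, String key, String value) {
        xmlFeed.append(' ').append(key).append("=\"").append(escape(value)).append('"');
    }
}
```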

Parameters:
xmlFeed - the xmlFeed to which xml is being written
key - the attribute key
value - the attribute value

getCollapseFactor

public int[] getCollapseFactor(String pool,
                               Job job,
                               int size)
Returns the collapse factor, which is used to chunk up the jobs of a particular type on a pool. The collapse factor is determined by the collapse key in the Pegasus namespace (profile) associated with the job in the transformation catalog. Currently, the value from the transformation catalog overrides the per-pool value specified in the properties file. Bundling and collapsing are two orthogonal notions: if the bundle key is specified, it overrides the collapse key, and the bundle value is used to generate the collapse values.
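
One plausible reading of how a bundle value generates the first two entries of the returned array is sketched below: size jobs are split into bundle clustered jobs, each holding size / bundle jobs, with size % bundle of them taking one extra job. This is an assumption about the arithmetic rather than the documented implementation, and the sketch does not compute the runtime-related entries int[2] and int[3].

```java
class CollapseFactorSketch {

    /**
     * Derives the first two collapse-factor entries from a bundle value:
     * result[0] = jobs per clustered job, and
     * result[1] = how many clustered jobs take result[0] + 1 jobs to absorb the remainder.
     */
    static int[] fromBundle(int size, int bundle) {
        if (bundle <= 0) {
            throw new IllegalArgumentException("bundle must be positive: " + bundle);
        }
        if (size <= 0) {
            return new int[] {0, 0};                  // nothing to cluster
        }
        int clusters = Math.min(bundle, size);        // never more clusters than jobs
        return new int[] {size / clusters, size % clusters};
    }
}
```

For example, 10 jobs with bundle=3 give {3, 1} in this sketch: one clustered job of 4 jobs and two of 3.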

Parameters:
pool - the pool where the chunking up is occurring.
job - the Job object containing the job that is to be chunked up together.
size - the number of jobs that refer to the same logical transformation and are scheduled on the same execution pool.
Returns:
an int array of size 4, where int[0] is the collapse factor, int[1] is the number of jobs for which the collapse factor is int[0] + 1, int[2] is the maximum time for which the clustered job should run, and int[3] is the time for which a single job would run.

constructID

public String constructID(String partitionID,
                          int id)
Given an integer id, returns a string id that is used for the clustered job.

Parameters:
partitionID - the id of the partition.
id - the integer id from which the string id has to be constructed. The id should be unique for all the clustered jobs that are formed for a particular partition.
Returns:
the id of the clustered job

updateReplacementTable

private void updateReplacementTable(List jobs,
                                    Job mergedJob)
Updates the replacement table.
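
The replacement table maps each original job to the fat (clustered) job that replaces it. A minimal Java sketch of the update and lookup follows; for simplicity it stores job IDs (Strings) rather than Job objects, which is an assumption, and resolve is a hypothetical helper showing how the table might be consulted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReplacementTableSketch {

    /** Maps the ID of each original job to the ID of the clustered job replacing it. */
    private final Map<String, String> replacementTable = new HashMap<>();

    /** Records that every job in jobIDs is now represented by mergedJobID. */
    void update(List<String> jobIDs, String mergedJobID) {
        for (String id : jobIDs) {
            replacementTable.put(id, mergedJobID);
        }
    }

    /** Hypothetical lookup: the replacing job's ID, or the ID itself if it was not clustered. */
    String resolve(String jobID) {
        return replacementTable.getOrDefault(jobID, jobID);
    }
}
```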

Parameters:
jobs - the List of jobs that are being replaced.
mergedJob - the mergedJob that is replacing the jobs in the list.

assimilateJobs

private void assimilateJobs()
Puts the jobs in the abstract workflow into the job map that is indexed by the logical name of the jobs.


constructMap

private Map constructMap(String propValue)
Constructs a map with the numbers/values for the collapsing factors used to collapse nodes of the same type. The user specifies these through the properties file. The value of the property is of the form poolname1=value,poolname2=value....
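
Parsing a property value of the form poolname1=value,poolname2=value can be sketched in Java as follows. This is an illustration, not the Pegasus code: storing the values as Integer, trimming whitespace, and silently skipping malformed entries (no '=', an empty pool name, or a non-numeric value) are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class CollapseMapSketch {

    /** Parses "pool1=value,pool2=value,..." into a pool -> value map. */
    static Map<String, Integer> constructMap(String propValue) {
        Map<String, Integer> map = new HashMap<>();
        if (propValue == null || propValue.trim().isEmpty()) {
            return map;
        }
        for (String entry : propValue.split(",")) {
            int eq = entry.indexOf('=');
            if (eq < 0) {
                continue;                              // no '=': skip the entry
            }
            String pool = entry.substring(0, eq).trim();
            String value = entry.substring(eq + 1).trim();
            if (pool.isEmpty()) {
                continue;                              // no pool name: skip the entry
            }
            try {
                map.put(pool, Integer.valueOf(value));
            } catch (NumberFormatException ex) {
                // non-numeric value: skipped in this sketch
            }
        }
        return map;
    }
}
```

For instance, "isi=5, uc=10" yields a map with isi mapped to 5 and uc mapped to 10.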

Parameters:
propValue - the value of the property obtained from the properties file.
Returns:
the constructed map.

replaceJobs

private void replaceJobs()
Changes the relations/edges in the local graph structure.
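
Given a replacement table like mReplacementTable, the edge update can be sketched as rewriting every endpoint to the clustered job that now contains it. This is a hypothetical Java illustration, not the Pegasus implementation: the graph is modelled as a parent-to-children map of job IDs, edges that would become self-loops are dropped, and duplicate edges collapse because children are kept in a set.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class EdgeRewriteSketch {

    /**
     * Rewrites a parent -> children adjacency map so that every job ID is replaced
     * by its clustered job (IDs missing from the table are kept as-is).
     */
    static Map<String, Set<String>> rewrite(Map<String, Set<String>> edges,
                                            Map<String, String> replacement) {
        Map<String, Set<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> e : edges.entrySet()) {
            String parent = replacement.getOrDefault(e.getKey(), e.getKey());
            for (String child : e.getValue()) {
                String newChild = replacement.getOrDefault(child, child);
                if (!parent.equals(newChild)) {        // drop self-loops
                    result.computeIfAbsent(parent, k -> new LinkedHashSet<>()).add(newChild);
                }
            }
        }
        return result;
    }
}
```

For example, with edges A -> {B1, B2}, B1 -> {C}, B2 -> {C} and B1, B2 both replaced by M, the sketch yields A -> {M} and M -> {C}.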


printList

private void printList(List l)
A utility method to print a short description of the jobs in a list.

Parameters:
l - the list of Job objects


Copyright © 2011 The University of Southern California. All Rights Reserved.