edu.isi.pegasus.planner.parser.pdax
Class PDAX2MDAG

java.lang.Object
  extended by edu.isi.pegasus.planner.parser.pdax.PDAX2MDAG
All Implemented Interfaces:
Callback

public class PDAX2MDAG
extends Object
implements Callback

This callback creates the mega DAG, which contains the smaller DAGs, each corresponding to one level as identified in the PDAX file generated by the partitioner.

Version:
$Revision: 4507 $
Author:
Karan Vahi

Nested Class Summary
private  class PDAX2MDAG.GrepCallback
          An inner class that implements the StreamGobblerCallback to count the occurrences of a word in a document.
 
Field Summary
static String CODE_GENERATOR_CLASS
          The SubmitWriter that has to be loaded for now.
static String CONDOR_DAGMAN_LOGICAL_NAME
          The logical name with which to query the transformation catalog for the condor_dagman executable, that ends up running the mini dag as one job.
static String CONDOR_DAGMAN_NAMESPACE
          The namespace to use for condor dagman.
static String CPLANNER_LOGICAL_NAME
          The logical name with which to query the transformation catalog for cPlanner executable.
static String[][] DAGMAN_KNOBS
          The dagman knobs controlled through properties.
static int HEAD_INDEX
          The index of the head job.
private  PegasusProperties.CLEANUP_SCOPE mCleanupScope
          The cleanup scope for the workflows.
private  long mCondorVersion
          The long value of condor version.
private  String mDAGManKnobs
          Any extra arguments that need to be passed to dagman, as determined from the properties file.
private  StreamGobblerCallback mDefaultCallback
          An instance of the default stream gobbler callback implementation that is used for creating symbolic links.
private  boolean mDone
          A flag to store whether the parsing is complete or not.
private  org.griphyn.vdl.euryale.FileFactory mFactory
          The handle to the file factory, that is used to create the top level directories for each of the partitions.
private  Map mJobMap
          The internal map that maps the partition id to the job responsible for executing the partition.
private  LogManager mLogger
          The handle to the logging object.
private  String mMDAGPropertiesFile
          The path to the properties file that is written out and shared by all partitions in the mega DAG.
private  ADag mMegaDAG
          The abstract dag object that ends up holding the megadag.
private  NumberFormat mNumFormatter
          The number formatter to format the run submit dir entries.
private  String mPDAXDirectory
          The directory in which the daxes corresponding to the partitions are kept.
private  PlannerOptions mPOptions
          The object containing the options that were given to the concrete planner at runtime.
private  PegasusProperties mProps
          The handle to the properties file.
protected static char mSeparator
          The file Separator to be used on the submit host.
private  String mSubmitDirectory
          The root of the submit directory where all the submit directories for the various partitions reside.
private  TransformationCatalog mTCHandle
          The handle to the transformation catalog.
private  String mUser
          The user name of the user running Pegasus.
static String NAMESPACE
          The namespace to which the jobs in the mega DAG being created refer.
static int NUM_OF_EXPANDED_JOBS
          The number of jobs into which each job in the partition graph is expanded.
static String RETRY_LOGICAL_NAME
          The planner utility that needs to be called as a prescript.
static String SUBMIT_DIRECTORY_PREFIX
          The prefix for the submit directory.
static int TAIL_INDEX
          The index of the tail job.
 
Constructor Summary
PDAX2MDAG(String directory, PegasusProperties properties, PlannerOptions options)
          The overloaded constructor.
 
Method Summary
 void cbDocument(Map attributes)
          Callback when the opening tag was parsed.
 void cbDone()
          Callback when the parsing of the document is done.
 void cbParents(String child, List parents)
          Callback for child and parent relationships from section 3.
 void cbPartition(Partition partition)
          Callback for the partition.
protected  Job constructDAGJob(Partition partition, File directory, String dax)
          Constructs a job that plans and submits the partitioned workflow, referred to by a Partition.
static String constructDAGManKnobs(PegasusProperties properties)
          Constructs any extra arguments that need to be passed to dagman, as determined from the properties file.
private  TransformationCatalogEntry constructTCEntryFromEnvironment()
          Returns a transformation catalog entry object constructed from the environment. An entry is constructed if either of the following environment variables is defined: 1) CONDOR_HOME 2) CONDOR_LOCATION. CONDOR_HOME takes precedence over CONDOR_LOCATION.
private  TransformationCatalogEntry constructTCEntryFromEnvProfiles(ENV env)
          Returns a transformation catalog entry object constructed from the environment profiles. An entry is constructed if either of the following environment variables is defined: 1) CONDOR_HOME 2) CONDOR_LOCATION. CONDOR_HOME takes precedence over CONDOR_LOCATION.
protected  String createSubmitDirectory(String label, String dir, String user, String vogroup, boolean timestampBased)
          Creates the submit directory for the workflow.
protected  boolean createSymlink(String source, File destDir)
          Creates a symbolic link to the source file in the destination directory.
private  TransformationCatalogEntry defaultTCEntry(String site)
          Returns a default TC entry to be used in case entry is not found in the transformation catalog.
protected  String getAbsolutePath(Partition partition, String directory, String suffix)
          Returns the absolute path to a dagman (usually) related file for a particular partition in the submit directory that is passed as an input parameter.
protected  String getBaseName(Partition partition)
          Returns the base name of the submit directory in which the submit files for a particular partition reside.
protected  String getBasename(Partition partition, String suffix)
          Returns the basename of a dagman (usually) related file for a particular partition.
protected  String getBasenamePrefix(Job job)
          Returns the basename prefix of a dagman (usually) related file for a job that submits a nested dagman.
protected  String getCacheFilePath(Job job)
          Returns the full path to the cache file that corresponds to one partition.
private  String getCondorFileName(String name, int index, String suffix)
          A small utility method that constructs the name of the Condor files that are generated when a dag is submitted.
private  String getCondorFileName(String name, int index, String suffix, String separator)
          A small utility method that constructs the name of the Condor files that are generated when a dag is submitted.
 Object getConstructedObject()
          Returns the mega DAG that is generated.
protected  Job getJob(String id)
          Returns the job that has been constructed for a particular partition.
protected  int getPartitionCount(String pdax)
          Returns the number of partitions referred to in the PDAX file.
protected static int parseInt(String s)
          Parses a string into an integer.
protected static void sanityCheck(File dir)
          Checks the destination location for existence, if it can be created, if it is writable etc.
protected  void setPrescript(Job job, String daxURL, String log)
          Sets the prescript that calls the default wrapper, which introduces retry into Pegasus for a particular job.
protected  void setPrescript(Job job, String daxURL, String log, String namespace, String name, String version)
          Sets the prescript that calls the default wrapper, which introduces retry into Pegasus for a particular job.
protected  String writeOutBraindump(File directory, Partition partition, String dax, String dag)
          Writes out the braindump.txt file for a partition in the partition submit directory.
protected  String writeOutProperties(String directory)
          Writes out the properties to a temporary file in the directory passed.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

CODE_GENERATOR_CLASS

public static final String CODE_GENERATOR_CLASS
The SubmitWriter that has to be loaded for now.

See Also:
Constant Field Values

SUBMIT_DIRECTORY_PREFIX

public static final String SUBMIT_DIRECTORY_PREFIX
The prefix for the submit directory.

See Also:
Constant Field Values

NUM_OF_EXPANDED_JOBS

public static final int NUM_OF_EXPANDED_JOBS
The number of jobs into which each job in the partition graph is expanded.

See Also:
Constant Field Values

HEAD_INDEX

public static final int HEAD_INDEX
The index of the head job.

See Also:
Constant Field Values

TAIL_INDEX

public static final int TAIL_INDEX
The index of the tail job.

See Also:
Constant Field Values

CPLANNER_LOGICAL_NAME

public static final String CPLANNER_LOGICAL_NAME
The logical name with which to query the transformation catalog for cPlanner executable.

See Also:
Constant Field Values

CONDOR_DAGMAN_NAMESPACE

public static final String CONDOR_DAGMAN_NAMESPACE
The namespace to use for condor dagman.

See Also:
Constant Field Values

CONDOR_DAGMAN_LOGICAL_NAME

public static final String CONDOR_DAGMAN_LOGICAL_NAME
The logical name with which to query the transformation catalog for the condor_dagman executable, that ends up running the mini dag as one job.

See Also:
Constant Field Values

NAMESPACE

public static final String NAMESPACE
The namespace to which the jobs in the mega DAG being created refer.

See Also:
Constant Field Values

RETRY_LOGICAL_NAME

public static final String RETRY_LOGICAL_NAME
The planner utility that needs to be called as a prescript.

See Also:
Constant Field Values

DAGMAN_KNOBS

public static final String[][] DAGMAN_KNOBS
The dagman knobs controlled through properties. Each entry maps a property name to the corresponding dagman option.


mSeparator

protected static char mSeparator
The file Separator to be used on the submit host.


mPDAXDirectory

private String mPDAXDirectory
The directory in which the daxes corresponding to the partitions are kept. This should be the same directory where the pdax containing the partition graph resides.


mSubmitDirectory

private String mSubmitDirectory
The root of the submit directory where all the submit directories for the various partitions reside.


mMegaDAG

private ADag mMegaDAG
The abstract dag object that ends up holding the megadag.


mJobMap

private Map mJobMap
The internal map that maps the partition id to the job responsible for executing the partition.


mProps

private PegasusProperties mProps
The handle to the properties file.


mTCHandle

private TransformationCatalog mTCHandle
The handle to the transformation catalog.


mLogger

private LogManager mLogger
The handle to the logging object.


mPOptions

private PlannerOptions mPOptions
The object containing the options that were given to the concrete planner at runtime.


mMDAGPropertiesFile

private String mMDAGPropertiesFile
The path to the properties file that is written out and shared by all partitions in the mega DAG.


mFactory

private org.griphyn.vdl.euryale.FileFactory mFactory
The handle to the file factory, that is used to create the top level directories for each of the partitions.


mDefaultCallback

private StreamGobblerCallback mDefaultCallback
An instance of the default stream gobbler callback implementation that is used for creating symbolic links.


mNumFormatter

private NumberFormat mNumFormatter
The number formatter to format the run submit dir entries.


mUser

private String mUser
The user name of the user running Pegasus.


mDone

private boolean mDone
A flag to store whether the parsing is complete or not.


mDAGManKnobs

private String mDAGManKnobs
Any extra arguments that need to be passed to dagman, as determined from the properties file.


mCondorVersion

private long mCondorVersion
The long value of condor version.


mCleanupScope

private PegasusProperties.CLEANUP_SCOPE mCleanupScope
The cleanup scope for the workflows.

Constructor Detail

PDAX2MDAG

public PDAX2MDAG(String directory,
                 PegasusProperties properties,
                 PlannerOptions options)
The overloaded constructor.

Parameters:
directory - the directory where the pdax and all the daxes corresponding to the partitions reside.
properties - the PegasusProperties to be used.
options - the options passed to the planner.
Method Detail

sanityCheck

protected static void sanityCheck(File dir)
                           throws IOException
Checks the destination location for existence, if it can be created, if it is writable etc.

Parameters:
dir - is the new base directory to optionally create.
Throws:
IOException - in case of error while writing out files.
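
The checks described above can be sketched with java.io.File. This is a minimal illustration under assumptions, not the Pegasus source: the class name SanityCheckSketch and the exact error messages are hypothetical, and the real method may perform additional checks.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical stand-alone sketch; not the actual PDAX2MDAG code.
final class SanityCheckSketch {

    /** Ensures dir exists (creating it if needed), is a directory, and is writable. */
    static void sanityCheck(File dir) throws IOException {
        if (dir.exists()) {
            // An existing path must be a directory we can write into.
            if (!dir.isDirectory()) {
                throw new IOException("Destination " + dir.getPath() + " is not a directory");
            }
            if (!dir.canWrite()) {
                throw new IOException("Cannot write to existing directory " + dir.getPath());
            }
        } else if (!dir.mkdirs()) {
            // mkdirs also creates any missing parent directories.
            throw new IOException("Unable to create directory " + dir.getPath());
        }
    }
}
```

Throwing IOException for every failure keeps the caller's error handling in one place, which matches the single declared exception of the documented method.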

cbDocument

public void cbDocument(Map attributes)
Callback when the opening tag was parsed. This contains all attributes and their raw values within a map. This callback can also be used to initialize callback-specific resources.

Specified by:
cbDocument in interface Callback
Parameters:
attributes - is a map of attribute key to attribute value

cbPartition

public void cbPartition(Partition partition)
Callback for the partition. These partitions are completely assembled, but each is passed separately.

Specified by:
cbPartition in interface Callback
Parameters:
partition - is the PDAX-style partition.

cbParents

public void cbParents(String child,
                      List parents)
Callback for child and parent relationships from section 3. This ties in the relations between the partitions to the relations between the jobs that are responsible for partitions. In addition, appropriate cache file arguments are generated.

Specified by:
cbParents in interface Callback
Parameters:
child - is the IDREF of the child element.
parents - is a list of IDREFs of the included parents.
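
How partition relations become job relations can be sketched as a lookup through a partition-id-to-job map, in the spirit of mJobMap. The class ParentMappingSketch, its method registerPartition, and the use of plain job names instead of Job objects are assumptions made for illustration; cache file arguments are not modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; job names stand in for the real Job objects.
final class ParentMappingSketch {

    /** Stand-in for mJobMap: partition id -> name of the job that runs it. */
    private final Map<String, String> jobMap = new HashMap<>();

    /** Edges of the mega DAG, each stored as {parentJob, childJob}. */
    private final List<String[]> edges = new ArrayList<>();

    /** Records the job created for a partition (done in cbPartition in the real class). */
    void registerPartition(String partitionId, String jobName) {
        jobMap.put(partitionId, jobName);
    }

    /** Translates one child/parents relation between partitions into job edges. */
    void cbParents(String child, List<String> parents) {
        String childJob = lookup(child);
        for (String parent : parents) {
            edges.add(new String[] { lookup(parent), childJob });
        }
    }

    private String lookup(String partitionId) {
        String job = jobMap.get(partitionId);
        if (job == null) {
            throw new IllegalStateException("No job registered for partition " + partitionId);
        }
        return job;
    }

    List<String[]> edges() {
        return edges;
    }
}
```

The sketch relies on every partition having been registered before its relations are reported, which is why an unknown id is treated as an error.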

cbDone

public void cbDone()
Callback when the parsing of the document is done. This ends up triggering the writing of the condor submit files corresponding to the mega dag.

Specified by:
cbDone in interface Callback

getConstructedObject

public Object getConstructedObject()
Returns the mega DAG that is generated.

Specified by:
getConstructedObject in interface Callback
Returns:
ADag object containing the mega DAG.

constructDAGJob

protected Job constructDAGJob(Partition partition,
                              File directory,
                              String dax)
Constructs a job that plans and submits the partitioned workflow, referred to by a Partition. The main job itself is a condor dagman job that submits the concrete workflow. The concrete workflow is generated by running the planner in the prescript for the job.

Parameters:
partition - the partition corresponding to which the job has to be constructed.
directory - the submit directory where the submit files for the partition should reside.
dax - the absolute path to the partitioned dax file that corresponds to this partition.
Returns:
the constructed DAG job.

defaultTCEntry

private TransformationCatalogEntry defaultTCEntry(String site)
Returns a default TC entry to be used in case entry is not found in the transformation catalog.

Parameters:
site - the site for which the default entry is required.
Returns:
the default entry.

constructTCEntryFromEnvironment

private TransformationCatalogEntry constructTCEntryFromEnvironment()
Returns a transformation catalog entry object constructed from the environment. An entry is constructed if either of the following environment variables is defined: 1) CONDOR_HOME 2) CONDOR_LOCATION. CONDOR_HOME takes precedence over CONDOR_LOCATION.

Returns:
the constructed entry else null.
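
The precedence rule can be illustrated with a plain map lookup. This is a sketch under assumptions: the class CondorHomeSketch is hypothetical, a String path stands in for the TransformationCatalogEntry, and the bin/condor_dagman location under the Condor root is assumed rather than taken from the source.

```java
import java.util.Map;

// Hypothetical sketch; a String path stands in for TransformationCatalogEntry.
final class CondorHomeSketch {

    /** Returns the Condor root from env, preferring CONDOR_HOME; null if neither is set. */
    static String condorHome(Map<String, String> env) {
        String home = env.get("CONDOR_HOME");
        if (home == null) {
            // Fall back only when CONDOR_HOME is absent.
            home = env.get("CONDOR_LOCATION");
        }
        return home;
    }

    /** Returns the assumed path to condor_dagman, or null if no entry can be built. */
    static String dagmanPath(Map<String, String> env) {
        String home = condorHome(env);
        // Assumption: the executable lives in bin/ below the Condor root.
        return home == null ? null : home + "/bin/condor_dagman";
    }
}
```

Passing the environment as a map, for example System.getenv(), lets the same lookup serve both the process environment and a set of environment profiles.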

constructTCEntryFromEnvProfiles

private TransformationCatalogEntry constructTCEntryFromEnvProfiles(ENV env)
Returns a transformation catalog entry object constructed from the environment profiles. An entry is constructed if either of the following environment variables is defined: 1) CONDOR_HOME 2) CONDOR_LOCATION. CONDOR_HOME takes precedence over CONDOR_LOCATION.

Parameters:
env - the environment profiles.
Returns:
the entry constructed else null if environment variables not defined.

writeOutBraindump

protected String writeOutBraindump(File directory,
                                   Partition partition,
                                   String dax,
                                   String dag)
                            throws IOException
Writes out the braindump.txt file for a partition in the partition submit directory. The braindump.txt file is used for passing to the tailstatd daemon that monitors the state of execution of the workflow.

Parameters:
directory - the directory in which the braindump file needs to be written to.
partition - the partition for which the braindump is to be written out.
dax - the dax file
dag - the dag file
Returns:
the absolute path to the braindump.txt file written in the directory.
Throws:
IOException - in case of error while writing out file.

writeOutProperties

protected String writeOutProperties(String directory)
                             throws IOException
Writes out the properties to a temporary file in the directory passed.

Parameters:
directory - the directory in which the properties file needs to be written to.
Returns:
the absolute path to the properties file written in the directory.
Throws:
IOException - in case of error while writing out file.
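
Writing properties to a temporary file can be sketched with java.util.Properties and File.createTempFile. The class WritePropertiesSketch, the extra props parameter (the documented method reads the planner's own properties instead), and the file name prefix and suffix are assumptions for illustration.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;

// Hypothetical sketch; the real method takes only the directory.
final class WritePropertiesSketch {

    /** Stores props in a uniquely named file inside directory and returns its absolute path. */
    static String writeOutProperties(Properties props, String directory) throws IOException {
        File dir = new File(directory);
        // createTempFile picks an unused name; the prefix and suffix here are assumed.
        File file = File.createTempFile("pegasus.", ".properties", dir);
        try (OutputStream out = new FileOutputStream(file)) {
            props.store(out, "Properties shared by all partitions");
        }
        return file.getAbsolutePath();
    }
}
```

Returning the absolute path lets every partition job refer to the same file regardless of its own working directory.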

setPrescript

protected void setPrescript(Job job,
                            String daxURL,
                            String log)
Sets the prescript that calls the default wrapper, which introduces retry into Pegasus for a particular job.

Parameters:
job - the job whose prescript needs to be set.
daxURL - the path to the dax file on the filesystem.
log - the file where the output of the prescript needs to be redirected to.
See Also:
RETRY_LOGICAL_NAME

setPrescript

protected void setPrescript(Job job,
                            String daxURL,
                            String log,
                            String namespace,
                            String name,
                            String version)
Sets the prescript that calls the default wrapper, which introduces retry into Pegasus for a particular job.

Parameters:
job - the job whose prescript needs to be set.
daxURL - the path to the dax file on the filesystem.
log - the file where the output of the prescript needs to be redirected to.
namespace - the namespace of the replanner utility.
name - the logical name of the replanner.
version - the version of the replanner to be picked up.

getBaseName

protected String getBaseName(Partition partition)
Returns the base name of the submit directory in which the submit files for a particular partition reside.

Parameters:
partition - the partition for which the base directory is to be constructed.
Returns:
the base name of the partition.

getAbsolutePath

protected String getAbsolutePath(Partition partition,
                                 String directory,
                                 String suffix)
Returns the absolute path to a dagman (usually) related file for a particular partition in the submit directory that is passed as an input parameter. This does not create the file, just returns an absolute path to it. Useful for constructing argument string for condor_dagman.

Parameters:
partition - the partition for which the dagman is responsible for execution.
directory - the directory where the file should reside.
suffix - the suffix for the file basename.
Returns:
the absolute path to a file in the submit directory.

getBasename

protected String getBasename(Partition partition,
                             String suffix)
Returns the basename of a dagman (usually) related file for a particular partition.

Parameters:
partition - the partition for which the dagman is responsible for execution.
suffix - the suffix for the file basename.
Returns:
the basename.

getBasenamePrefix

protected String getBasenamePrefix(Job job)
Returns the basename prefix of a dagman (usually) related file for a job that submits a nested dagman.

Parameters:
job - the job that submits a nested dagman.
Returns:
the basename.

getCacheFilePath

protected String getCacheFilePath(Job job)
Returns the full path to the cache file that corresponds to one partition. The cache file resides in the submit directory of the partition for which the job is responsible.

Parameters:
job - the job running on the submit host that submits the partition.
Returns:
the full path to the file.

createSymlink

protected boolean createSymlink(String source,
                                File destDir)
Creates a symbolic link to the source file in the destination directory.

Parameters:
source - the source file that has to be symlinked.
destDir - the destination directory where the symlink has to be placed.
Returns:
boolean indicating whether the symlink was created.
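
A symlink with this signature can be sketched with java.nio.file. Note the substitution: the field documentation mentions a stream gobbler callback used for creating symbolic links, which suggests the original runs an external command, whereas this sketch uses Files.createSymbolicLink instead. The class SymlinkSketch and the choice to name the link after the source file are assumptions.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch using NIO rather than an external ln process.
final class SymlinkSketch {

    /** Links destDir/<source file name> to source; returns false if the link cannot be made. */
    static boolean createSymlink(String source, File destDir) {
        Path target = new File(source).toPath().toAbsolutePath();
        // Assumption: the link carries the same file name as its target.
        Path link = destDir.toPath().resolve(target.getFileName());
        try {
            Files.createSymbolicLink(link, target);
            return true;
        } catch (IOException | UnsupportedOperationException | SecurityException e) {
            // Covers an existing link, a missing directory, or a platform without symlinks.
            return false;
        }
    }
}
```

Reporting failure through the boolean result rather than an exception mirrors the documented return type.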

getPartitionCount

protected int getPartitionCount(String pdax)
Returns the number of partitions referred to in the PDAX file.

Parameters:
pdax - the path to the pdax file.
Returns:
the number of partitions in the pdax file.
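
Counting partitions by counting occurrences of a word, which is what the nested GrepCallback is described as doing, can be sketched as below. The class PartitionCountSketch is hypothetical, and the search word used in the test ("<partition ") is an assumption about the PDAX markup; the original may obtain its input from an external process rather than a Reader.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;

// Hypothetical sketch; reads from any Reader instead of process output.
final class PartitionCountSketch {

    /** Counts non-overlapping occurrences of word across all lines of document. */
    static int countOccurrences(Reader document, String word) throws IOException {
        if (word.isEmpty()) {
            throw new IllegalArgumentException("word must not be empty");
        }
        int count = 0;
        try (BufferedReader reader = new BufferedReader(document)) {
            String line;
            while ((line = reader.readLine()) != null) {
                int from = 0;
                // Advance past each match so one line can contribute several hits.
                while ((from = line.indexOf(word, from)) != -1) {
                    count++;
                    from += word.length();
                }
            }
        }
        return count;
    }
}
```

A textual count is cheaper than a full XML parse but miscounts if the word also appears in comments or attribute values.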

getJob

protected Job getJob(String id)
Returns the job that has been constructed for a particular partition.

Parameters:
id - the partition id.
Returns:
the corresponding job, else null if not found.

createSubmitDirectory

protected String createSubmitDirectory(String label,
                                       String dir,
                                       String user,
                                       String vogroup,
                                       boolean timestampBased)
                                throws IOException
Creates the submit directory for the workflow. This is not thread safe.

Parameters:
label - the label of the workflow being worked upon.
dir - the base directory specified by the user.
user - the username of the user.
vogroup - the vogroup to which the user belongs.
timestampBased - boolean indicating whether to have a timestamp based dir or not
Returns:
the directory name created relative to the base directory passed as input.
Throws:
IOException - if the submit directory cannot be created.
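
One way to picture numbered run directories, and why such a method is not thread safe, is the sketch below. Everything about the layout is an assumption: the user/vogroup/label nesting, the "run" prefix, and the four-digit format are illustrative, the class SubmitDirSketch is hypothetical, and the timestampBased option is left out.

```java
import java.io.File;
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.util.Locale;

// Hypothetical sketch; directory layout and naming are assumed.
final class SubmitDirSketch {

    /** Creates the next free run directory and returns its path relative to dir. */
    static String createSubmitDirectory(String label, String dir, String user, String vogroup)
            throws IOException {
        NumberFormat formatter =
                new DecimalFormat("0000", DecimalFormatSymbols.getInstance(Locale.ROOT));
        File parent = new File(new File(new File(dir, user), vogroup), label);
        for (int i = 1; i <= 9999; i++) {
            String run = "run" + formatter.format(i);
            File candidate = new File(parent, run);
            // Check-then-create is racy: two callers can both see the same free slot.
            if (!candidate.exists()) {
                if (!candidate.mkdirs()) {
                    throw new IOException("Unable to create submit directory " + candidate);
                }
                return user + File.separator + vogroup + File.separator
                        + label + File.separator + run;
            }
        }
        throw new IOException("No free run directory under " + parent);
    }
}
```

The gap between exists() and mkdirs() is the reason concurrent callers would need external locking.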

constructDAGManKnobs

public static String constructDAGManKnobs(PegasusProperties properties)
Constructs any extra arguments that need to be passed to dagman, as determined from the properties file.

Parameters:
properties - the PegasusProperties
Returns:
any arguments to be added, else empty string
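
Turning a property-to-option table like DAGMAN_KNOBS into an argument string can be sketched as follows. The table contents, the class DagmanKnobsSketch, the use of java.util.Properties in place of PegasusProperties, and the rule that only positive integers are emitted are all assumptions made for illustration.

```java
import java.util.Properties;

// Hypothetical sketch; table entries and the positive-value rule are assumed.
final class DagmanKnobsSketch {

    /** Each row maps a property name to a dagman command-line option. */
    static final String[][] KNOBS = {
        { "pegasus.dagman.maxpre", "-maxpre" },
        { "pegasus.dagman.maxpost", "-maxpost" },
        { "pegasus.dagman.maxjobs", "-maxjobs" },
        { "pegasus.dagman.maxidle", "-maxidle" },
    };

    /** Parses s as an int, returning -1 for null or malformed input. */
    static int parseInt(String s) {
        try {
            return Integer.parseInt(s);
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** Builds " -option value" pairs for every knob set to a positive integer. */
    static String constructDAGManKnobs(Properties properties) {
        StringBuilder sb = new StringBuilder();
        for (String[] knob : KNOBS) {
            int value = parseInt(properties.getProperty(knob[0]));
            if (value > 0) {
                sb.append(' ').append(knob[1]).append(' ').append(value);
            }
        }
        return sb.toString();
    }
}
```

Keeping the mapping in a table means a new knob needs only a new row, and an empty string falls out naturally when no property is set.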

parseInt

protected static int parseInt(String s)
Parses a string into an integer. Invalid values are returned as -1.

Parameters:
s - the String to be parsed as integer
Returns:
the int value if valid, else -1
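
The documented contract can be met with a thin wrapper around Integer.parseInt. The class name ParseIntSketch is hypothetical, and this is a sketch of the contract rather than the actual source.

```java
// Hypothetical sketch of the documented contract.
final class ParseIntSketch {

    /** Returns the int value of s, or -1 when s is null or not a valid integer. */
    static int parseInt(String s) {
        try {
            return Integer.parseInt(s);
        } catch (NumberFormatException e) {
            // Integer.parseInt also throws NumberFormatException for null.
            return -1;
        }
    }
}
```

Because -1 doubles as the error value, a caller cannot tell the literal input "-1" from a malformed string, so this style suits values that are expected to be non-negative.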

getCondorFileName

private String getCondorFileName(String name,
                                 int index,
                                 String suffix)
A small utility method that constructs the name of the Condor files that are generated when a dag is submitted. The default separator _ is used.

Parameters:
name - the name attribute in the partition element of the pdax.
index - the partition number of the partition.
suffix - the suffix that needs to be added to the filename.
Returns:
the name of the condor file.

getCondorFileName

private String getCondorFileName(String name,
                                 int index,
                                 String suffix,
                                 String separator)
A small utility method that constructs the name of the Condor files that are generated when a dag is submitted.

Parameters:
name - the name attribute in the partition element of the pdax.
index - the partition number of the partition.
suffix - the suffix that needs to be added to the filename
separator - the separator that is to be used while constructing the filename.
Returns:
the name of the condor file
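
The relationship between the two overloads can be sketched as one delegating to the other with the default separator. The order in which the parts are joined (name, separator, index, suffix), the suffix carrying its own dot, and the class name CondorFileNameSketch are assumptions; only the default separator _ comes from the description above.

```java
// Hypothetical sketch; the joining order of the parts is assumed.
final class CondorFileNameSketch {

    /** The default separator named in the method description. */
    static final String DEFAULT_SEPARATOR = "_";

    /** Builds the file name using the default separator. */
    static String getCondorFileName(String name, int index, String suffix) {
        return getCondorFileName(name, index, suffix, DEFAULT_SEPARATOR);
    }

    /** Builds the file name as name + separator + index + suffix. */
    static String getCondorFileName(String name, int index, String suffix, String separator) {
        return name + separator + index + suffix;
    }
}
```

Delegating from the short overload to the long one keeps the naming rule in a single place.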


Copyright © 2011 The University of Southern California. All Rights Reserved.