edu.isi.pegasus.planner.refiner
Class TransferEngine

java.lang.Object
  extended by edu.isi.pegasus.planner.refiner.Engine
      extended by edu.isi.pegasus.planner.refiner.TransferEngine

public class TransferEngine
extends Engine

The transfer engine adds nodes to transfer the data products, based on the pools on which the jobs are to run.

Version:
$Revision: 4950 $
Author:
Karan Vahi, Gaurang Mehta

Field Summary
static int DELETED_JOBS_LEVEL
          The MAX level is assigned as the level for deleted jobs.
static String FILE_URL_SCHEME
          The scheme name for file url.
private  ADag mDag
          The DAG object to which the transfer nodes are to be added.
protected  boolean mDeepStorageStructure
          A boolean indicating whether to have a deep directory structure for the storage directory or not.
private  List mDeletedJobs
          Holds all the jobs deleted by the reduction algorithm.
private  List mDeletedLeafJobs
          Holds the leaf jobs from the original DAG that are deleted by the reduction algorithm.
private  org.griphyn.vdl.euryale.FileFactory mFactory
          The handle to the file factory, that is used to create the top level directories for each of the partitions.
private  ReplicaCatalogBridge mRCBridge
          The bridge to the Replica Catalog.
private  ReplicaSelector mReplicaSelector
          The handle to the replica selector that is used to select the various replicas.
private  SLS mSLS
          The handle to the SLS implementor.
private  Map<String,NameValue> mSRMServiceURLToMountPointMap
          A map that associates the site name with the SRM server url and mount point.
private  String mStageOutBaseDirectory
          The base path for the stageout directory on the output site where all the files are staged out.
protected  String mStorageDir
          The storage directory relative to the SE mount point of the pool.
private  ReplicaCatalog mTransientRC
          A SimpleFile Replica Catalog that tracks all the files that are being materialized as part of workflow execution.
private  Refiner mTXRefiner
          The handle to the transfer refiner that adds the transfer nodes into the workflow.
protected  boolean mUseSymLinks
          If set, causes the source URL for pull nodes from the RLS to be a file:// URL when the pool attribute associated with the PFN is the same as the job's execution pool.
protected  String mWorkDir
          The working directory relative to the mount point of the execution pool.
private  boolean mWorkerNodeExecution
          A boolean indicating whether we are doing worker node execution or not.
static String SRM_MOUNT_POINT_PROPERTIES_SUFFIX
          The suffix used to retrieve the mount point of an SRM server.
static String SRM_PROPERTIES_PREFIX
          The property prefix for retrieving SRM properties.
static String SRM_SERVICE_URL_PROPERTIES_SUFFIX
          The suffix used to retrieve the service URL of an SRM server.
static String SYMLINK_URL_SCHEME
          The scheme name for symlink URLs.
 
Fields inherited from class edu.isi.pegasus.planner.refiner.Engine
mBag, mLogger, mLogMsg, mOutputPool, mPoolFile, mPOptions, mProps, mRLIUrl, mSiteStore, mTCFile, mTCHandle, mTCMode, REGISTRATION_UNIVERSE, TRANSFER_UNIVERSE
 
Constructor Summary
TransferEngine(ADag reducedDag, PegasusBag bag, List<Job> deletedJobs, List<Job> deletedLeafJobs)
          Overloaded constructor.
 
Method Summary
 void addTransferNodes(ReplicaCatalogBridge rcb, ReplicaCatalog transientCatalog)
          Adds the transfer nodes to the workflow.
private  FileTransfer constructFileTX(PegasusFile pf, String stagingSiteHandle, String destSiteHandle, String job, String path, boolean localTransfer)
          Constructs the FileTransfer object on the basis of the transiency information.
private  Map<String,NameValue> constructSiteToSRMServerMap(PegasusProperties props)
          Constructs a Map by parsing the relevant SRM Pegasus properties.
private  Vector getDeletedFileTX(String pool, Job job)
          Gets the file transfer objects corresponding to the locations of files found in the replica mechanism, and transfers the files to the output pool requested by the user.
private  void getFilesFromRC(DAGJob job, Collection searchFiles)
          Special Handling for a DAGJob for retrieving files from the Replica Catalog.
private  void getFilesFromRC(DAXJob job, Collection searchFiles)
          Special Handling for a DAXJob for retrieving files from the Replica Catalog.
private  void getFilesFromRC(Job job, Collection searchFiles)
          Looks up the locations of the files in the RCEngine Hashtable and adds nodes to transfer them.
private  Vector getFileTX(String destPool, Job job, boolean localTransfer)
          Gets the Vector of FileTransfer objects for the files that have to be transferred to a single destination pool.
private  Collection<FileTransfer>[] getInterpoolFileTX(Job job, Vector nodes)
          Gets the FileTransfer objects for all the files that have to be transferred to the destination pool in the case of interpool transfers.
private  Set getOutputFiles(Vector nodes, Vector parentSubs)
          Gets the output files for all the nodes specified in the nodes Vector.
protected  String getPathOnStageoutSite(String lfn)
          Returns the full path on remote output site, where the lfn will reside.
 String getStagingSite(Job job)
          Returns the staging site to be used for a job.
private  Job getSubInfo(String jobName)
          Returns the Job object for the job specified.
protected  void initializeStageOutSiteDirectoryFactory(ADag workflow)
          Initialize the Stageout Site Directory factory.
private  String poolNotFoundMsg(String poolName, String universe)
          Generates an error message for a pool that is not found in the pool config file.
private  void processParents(Job job, Vector vParents)
          Processes a node's parents and determines whether nodes are to be added.
protected  String replaceProtocolFromURL(String pfn)
          Replaces the gsiftp URL scheme in the URL with the symlink URL scheme and returns the result as a new object.
protected  ReplicaCatalogEntry replaceSourceProtocolFromURL(ReplicaCatalogEntry rce)
          Replaces the SRM URL scheme in the URL with the file URL scheme, returning a new object if a replacement happens.
 boolean runTransferOnLocalSite(String site, String destinationURL, int type)
          Returns whether to run a transfer job on local site or not.
protected  void trackInTransientRC(Job job)
          Tracks the files created by a job in the Transient Replica Catalog.
private  void trackInTransientRC(String lfn, String pfn, String site)
          Inserts an entry into the Transient RC.
private  void trackInTransientRC(String lfn, String pfn, String site, boolean modifyURL)
          Inserts an entry into the Transient RC.
 
Methods inherited from class edu.isi.pegasus.planner.refiner.Engine
addVector, appendArrayList, loadProperties, printVector, stringInList, stringInPegVector, stringInVector, vectorToString
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DELETED_JOBS_LEVEL

public static final int DELETED_JOBS_LEVEL
The MAX level is assigned as the level for deleted jobs. It could be set to Integer.MAX_VALUE, but the number of levels in a workflow rarely exceeds 1000.

See Also:
Constant Field Values

FILE_URL_SCHEME

public static final String FILE_URL_SCHEME
The scheme name for file url.

See Also:
Constant Field Values

SYMLINK_URL_SCHEME

public static final String SYMLINK_URL_SCHEME
The scheme name for symlink URLs.

See Also:
Constant Field Values

SRM_PROPERTIES_PREFIX

public static final String SRM_PROPERTIES_PREFIX
The property prefix for retrieving SRM properties.

See Also:
Constant Field Values

SRM_SERVICE_URL_PROPERTIES_SUFFIX

public static final String SRM_SERVICE_URL_PROPERTIES_SUFFIX
The suffix used to retrieve the service URL of an SRM server.

See Also:
Constant Field Values

SRM_MOUNT_POINT_PROPERTIES_SUFFIX

public static final String SRM_MOUNT_POINT_PROPERTIES_SUFFIX
The suffix used to retrieve the mount point of an SRM server.

See Also:
Constant Field Values

mSRMServiceURLToMountPointMap

private Map<String,NameValue> mSRMServiceURLToMountPointMap
A map that associates the site name with the SRM server url and mount point.


mDag

private ADag mDag
The DAG object to which the transfer nodes are to be added. This is the reduced DAG obtained from the Reduction Engine.


mRCBridge

private ReplicaCatalogBridge mRCBridge
The bridge to the Replica Catalog.


mReplicaSelector

private ReplicaSelector mReplicaSelector
The handle to the replica selector that is used to select the various replicas.


mTXRefiner

private Refiner mTXRefiner
The handle to the transfer refiner that adds the transfer nodes into the workflow.


mDeletedJobs

private List mDeletedJobs
Holds all the jobs deleted by the reduction algorithm.


mDeletedLeafJobs

private List mDeletedLeafJobs
Holds the leaf jobs from the original DAG that are deleted by the reduction algorithm.


mTransientRC

private ReplicaCatalog mTransientRC
A SimpleFile Replica Catalog that tracks all the files that are being materialized as part of workflow execution.


mFactory

private org.griphyn.vdl.euryale.FileFactory mFactory
The handle to the file factory, that is used to create the top level directories for each of the partitions.


mStageOutBaseDirectory

private String mStageOutBaseDirectory
The base path for the stageout directory on the output site where all the files are staged out.


mWorkDir

protected String mWorkDir
The working directory relative to the mount point of the execution pool. It is populated from the pegasus.dir.exec property in the properties file. If not specified, the working directory is assumed to be the exec mount point of the execution pool.


mStorageDir

protected String mStorageDir
The storage directory relative to the SE mount point of the pool. It is populated from the pegasus.dir.storage property in the properties file. If not specified, the storage directory is the SE mount point from the pool.config file.


mDeepStorageStructure

protected boolean mDeepStorageStructure
A boolean indicating whether to have a deep directory structure for the storage directory or not.


mUseSymLinks

protected boolean mUseSymLinks
If set, causes the source URL for pull nodes from the RLS to be a file:// URL when the pool attribute associated with the PFN is the same as the job's execution pool.


mWorkerNodeExecution

private boolean mWorkerNodeExecution
A boolean indicating whether we are doing worker node execution or not.


mSLS

private SLS mSLS
The handle to the SLS implementor.

Constructor Detail

TransferEngine

public TransferEngine(ADag reducedDag,
                      PegasusBag bag,
                      List<Job> deletedJobs,
                      List<Job> deletedLeafJobs)
Overloaded constructor.

Parameters:
reducedDag - the reduced workflow.
bag - bag of initialization objects
deletedJobs - list of all jobs deleted by reduction algorithm.
deletedLeafJobs - list of leaf jobs deleted by the reduction algorithm.
Method Detail

runTransferOnLocalSite

public boolean runTransferOnLocalSite(String site,
                                      String destinationURL,
                                      int type)
Returns whether to run a transfer job on local site or not.

Parameters:
site - the site handle associated with the destination URL.
destinationURL - the destination URL.
type - the type of transfer job for which the URL is being constructed.
Returns:
true if the associated transfer job should run on the local site, false otherwise.

getSubInfo

private Job getSubInfo(String jobName)
Returns the Job object for the job specified.

Parameters:
jobName - the name of the job
Returns:
the Job object for a job.

addTransferNodes

public void addTransferNodes(ReplicaCatalogBridge rcb,
                             ReplicaCatalog transientCatalog)
Adds the transfer nodes to the workflow.

Parameters:
rcb - the bridge to the ReplicaCatalog.
transientCatalog - an instance of the replica catalog that will store the locations of the files on the remote sites.

getStagingSite

public String getStagingSite(Job job)
Returns the staging site to be used for a job. If a staging site is not determined from the options, it is set to the execution site for the job.

Parameters:
job - the job for which to determine the staging site
Returns:
the staging site
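A minimal sketch of the fallback rule above, assuming a simplified job that carries only an execution site and an optional staging site. StagingJob and StagingSiteSketch are hypothetical stand-ins, not the Pegasus Job API; treating an empty string like a missing value is also an assumption.

```java
import java.util.Optional;

/** Hypothetical, simplified job holding only the fields the staging-site rule needs. */
record StagingJob(String name, String executionSite, String stagingSite) {}

class StagingSiteSketch {
    /** Returns the staging site, or the execution site if no staging site was determined. */
    static String getStagingSite(StagingJob job) {
        return Optional.ofNullable(job.stagingSite())
                .filter(site -> !site.isEmpty())   // assumption: "" counts as "not determined"
                .orElse(job.executionSite());
    }

    public static void main(String[] args) {
        System.out.println(getStagingSite(new StagingJob("j1", "isi", null)));       // isi
        System.out.println(getStagingSite(new StagingJob("j2", "isi", "ligo-cit"))); // ligo-cit
    }
}
```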

getDeletedFileTX

private Vector getDeletedFileTX(String pool,
                                Job job)
Gets the file transfer objects corresponding to the locations of files found in the replica mechanism, and transfers the files to the output pool requested by the user. If the output pool path matches the one returned by the replica mechanism, the file is not transferred.

Parameters:
pool - the output pool that the user specifies at runtime.
job - the Job object corresponding to the leaf job that was deleted by the Reduction algorithm.
Returns:
Vector of FileTransfer objects
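The skip rule above can be sketched as follows, assuming the replica locations arrive as a map from LFN to URL and that "match" means the full URL strings are equal. Transfer and DeletedFileTXSketch are hypothetical names, not the Pegasus FileTransfer class; the real method may compare paths differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical transfer descriptor; not the Pegasus FileTransfer class. */
record Transfer(String lfn, String sourceURL, String destURL) {}

class DeletedFileTXSketch {
    /**
     * Builds one transfer per replica into the user's output directory, and skips
     * replicas whose URL already equals the output location (assumed comparison).
     */
    static List<Transfer> deletedFileTX(Map<String, String> replicaURLs, String outputDirURL) {
        List<Transfer> transfers = new ArrayList<>();
        for (Map.Entry<String, String> e : replicaURLs.entrySet()) {
            String lfn = e.getKey();
            String source = e.getValue();
            String dest = outputDirURL + "/" + lfn;
            if (dest.equals(source)) {
                continue; // already at the requested output location: nothing to move
            }
            transfers.add(new Transfer(lfn, source, dest));
        }
        return transfers;
    }

    public static void main(String[] args) {
        Map<String, String> rc = new LinkedHashMap<>();
        rc.put("f.a", "gsiftp://out.example.org/data/f.a");  // already on the output site
        rc.put("f.b", "gsiftp://other.example.org/tmp/f.b"); // needs a transfer
        deletedFileTX(rc, "gsiftp://out.example.org/data").forEach(System.out::println);
    }
}
```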

processParents

private void processParents(Job job,
                            Vector vParents)
Processes a node's parents and determines whether nodes are to be added. All the input files for the job are searched for in the output files of the parent nodes and in the Replica Mechanism.

Parameters:
job - the Job object containing all the details of the job.
vParents - Vector of String objects corresponding to the parents of the node.
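The search described above amounts to partitioning a job's input LFNs. The sketch below assumes files are plain LFN strings and that parent outputs are already known; ProcessParentsSketch and Split are hypothetical names. It shows only the partition, not how Pegasus then adds interpool or stage-in nodes.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class ProcessParentsSketch {
    /** Input files satisfied by parent outputs vs. those left for the Replica Catalog. */
    record Split(Set<String> fromParents, Set<String> fromReplicaCatalog) {}

    static Split split(Set<String> jobInputs, List<Set<String>> parentOutputs) {
        // Union of every LFN produced by any parent.
        Set<String> produced = new LinkedHashSet<>();
        parentOutputs.forEach(produced::addAll);

        Set<String> fromParents = new LinkedHashSet<>();
        Set<String> fromRC = new LinkedHashSet<>();
        for (String lfn : jobInputs) {
            if (produced.contains(lfn)) {
                fromParents.add(lfn); // produced inside the workflow by a parent
            } else {
                fromRC.add(lfn);      // must be looked up in the Replica Catalog
            }
        }
        return new Split(fromParents, fromRC);
    }
}
```

For inputs {a, b, c} and parents producing {a} and {c, d}, the sketch routes a and c to the parents and leaves b for the Replica Catalog.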

getFileTX

private Vector getFileTX(String destPool,
                         Job job,
                         boolean localTransfer)
Gets the Vector of FileTransfer objects for the files that have to be transferred to a single destination pool. It checks the transient flags of the files: if the transfer transient flag is set, the file does not have to be transferred to the destination pool.

Parameters:
destPool - the pool to which the files are to be transferred.
job - the Job object of the job whose output files are needed at the destination pool.
localTransfer - boolean indicating that associated transfer job will run on local site.
Returns:
Vector of FileTransfer objects

constructFileTX

private FileTransfer constructFileTX(PegasusFile pf,
                                     String stagingSiteHandle,
                                     String destSiteHandle,
                                     String job,
                                     String path,
                                     boolean localTransfer)
Constructs the FileTransfer object on the basis of the transiency information. If the transient flag for transfer is set, the destination URL of the FileTransfer object is the execution directory, as this is the entry that has to be registered in the Replica Mechanism.

Parameters:
pf - the PegasusFile for which the transfer has to be done.
stagingSiteHandle - the staging site at which file is placed after execution.
destSiteHandle - the output pool to which the file should be transferred.
job - the name of the associated job.
path - the path that a user specifies in the profile for the key remote_initialdir, which changes the working directory for a job on an execution pool.
localTransfer - boolean indicating that associated transfer job will run on local site.
Returns:
the corresponding FileTransfer object
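A minimal sketch of the transiency rule, assuming URLs are built by simple string concatenation of a directory URL and the LFN. ConstructFileTXSketch, FileTX and isNoOp are hypothetical; the sketch only shows that a transient file keeps the execution (staging) directory as its destination, so no data moves.

```java
class ConstructFileTXSketch {
    /** Hypothetical transfer descriptor; not the Pegasus FileTransfer class. */
    record FileTX(String lfn, String sourceURL, String destURL) {
        /** True when source and destination coincide, so no data actually moves. */
        boolean isNoOp() {
            return sourceURL.equals(destURL);
        }
    }

    static FileTX constructFileTX(String lfn, boolean transientTransfer,
                                  String stagingDirURL, String outputDirURL) {
        String source = stagingDirURL + "/" + lfn;
        // Transient for transfer: the destination stays the execution (staging)
        // directory, which is the location that would be registered.
        String dest = transientTransfer ? source : outputDirURL + "/" + lfn;
        return new FileTX(lfn, source, dest);
    }

    public static void main(String[] args) {
        FileTX kept = constructFileTX("out.dat", true, "gsiftp://stage/work", "gsiftp://out/data");
        FileTX moved = constructFileTX("out.dat", false, "gsiftp://stage/work", "gsiftp://out/data");
        System.out.println(kept.isNoOp() + " " + moved.destURL()); // true gsiftp://out/data/out.dat
    }
}
```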

poolNotFoundMsg

private String poolNotFoundMsg(String poolName,
                               String universe)
Generates an error message for a pool that is not found in the pool config file.

Parameters:
poolName - the name of pool that is not found.
universe - the condor universe
Returns:
the message.

getInterpoolFileTX

private Collection<FileTransfer>[] getInterpoolFileTX(Job job,
                                                      Vector nodes)
Gets the FileTransfer objects for all the files that have to be transferred to the destination pool in the case of interpool transfers. Each FileTransfer object has a source and a destination URL. The source URI is determined from the pool on which the jobs are executed.

Parameters:
job - the job with reference to which interpool file transfers need to be determined.
nodes - Vector of Job objects for the nodes whose output files are to be transferred to the destination pool.
Returns:
array of Collections of FileTransfer objects

getFilesFromRC

private void getFilesFromRC(DAGJob job,
                            Collection searchFiles)
Special Handling for a DAGJob for retrieving files from the Replica Catalog.

Parameters:
job - the DAGJob
searchFiles - files that need to be looked up in the Replica Catalog.

getFilesFromRC

private void getFilesFromRC(DAXJob job,
                            Collection searchFiles)
Special Handling for a DAXJob for retrieving files from the Replica Catalog.

Parameters:
job - the DAXJob
searchFiles - files that need to be looked up in the Replica Catalog.

getFilesFromRC

private void getFilesFromRC(Job job,
                            Collection searchFiles)
Looks up the locations of the files in the RCEngine Hashtable and adds nodes to transfer them. If a file is not found in the Replica Catalog, the Transfer Engine flags an error and exits.

Parameters:
job - the Job object whose input files have to be searched for in the Replica Mechanism.
searchFiles - Collection containing the PegasusFile objects corresponding to the files whose mappings need to be looked up in the Replica Mechanism.
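The lookup-or-fail behaviour above can be sketched with a plain map standing in for the Replica Catalog. FilesFromRCSketch and Location are hypothetical, and the sketch swaps "flags an error and exits" for throwing an IllegalStateException so the caller decides how to terminate.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

class FilesFromRCSketch {
    /** Hypothetical (LFN, PFN) pair returned by the lookup. */
    record Location(String lfn, String pfn) {}

    /**
     * Resolves every LFN against a catalog map; a missing LFN is fatal,
     * signalled here by an exception instead of exiting the process.
     */
    static List<Location> lookup(Collection<String> searchFiles, Map<String, String> catalog) {
        List<Location> found = new ArrayList<>();
        for (String lfn : searchFiles) {
            String pfn = catalog.get(lfn);
            if (pfn == null) {
                throw new IllegalStateException("Unable to find " + lfn + " in the Replica Catalog");
            }
            found.add(new Location(lfn, pfn));
        }
        return found;
    }
}
```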

replaceSourceProtocolFromURL

protected ReplicaCatalogEntry replaceSourceProtocolFromURL(ReplicaCatalogEntry rce)
Replaces the SRM URL scheme in the URL with the file URL scheme, and returns the result in a new object if a replacement happens. The original object passed as a parameter remains unchanged.

Parameters:
rce - the ReplicaCatalogEntry object whose URL needs to be replaced.
Returns:
the object with the url replaced.
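A sketch of the SRM-to-file rewrite, assuming it works by swapping a site's SRM service-URL prefix for a file:// URL rooted at that site's mount point, as in the SRM property example for constructSiteToSRMServerMap. Entry is a hypothetical immutable stand-in for ReplicaCatalogEntry; because records are immutable, the caller's object cannot change.

```java
class SRMToFileSketch {
    /** Hypothetical, immutable stand-in for ReplicaCatalogEntry. */
    record Entry(String pfn, String site) {}

    static Entry replaceSourceProtocol(Entry rce, String serviceURL, String mountPoint) {
        if (!rce.pfn().startsWith(serviceURL)) {
            return rce; // no replacement happened: hand back the original object
        }
        String relative = rce.pfn().substring(serviceURL.length());
        // A new object is built; the record passed in is immutable and stays unchanged.
        return new Entry("file://" + mountPoint + relative, rce.site());
    }

    public static void main(String[] args) {
        String service = "srm://osg-se.ligo.caltech.edu:10443/srm/v2/server?SFN=/mnt/hadoop";
        Entry in = new Entry(service + "/data/f.gwf", "ligo-cit");
        System.out.println(replaceSourceProtocol(in, service, "/mnt/hadoop").pfn());
        // file:///mnt/hadoop/data/f.gwf
    }
}
```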

replaceProtocolFromURL

protected String replaceProtocolFromURL(String pfn)
Replaces the gsiftp URL scheme in the URL with the symlink URL scheme and returns the result as a new object. The original object passed as a parameter remains unchanged.

Parameters:
pfn - the pfn that needs to be replaced
Returns:
the replaced PFN
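A minimal sketch of the gsiftp-to-symlink rewrite using java.net.URI. It assumes the symlink scheme string is "symlink" and that the host and port are dropped, keeping only the path; the actual SYMLINK_URL_SCHEME value is given in Constant Field Values and may differ.

```java
import java.net.URI;

class SymlinkURLSketch {
    /** Assumed scheme name; check SYMLINK_URL_SCHEME for the real value. */
    static final String SYMLINK_SCHEME = "symlink";

    /** Rewrites gsiftp://host/path to symlink:///path; other URLs are returned as-is. */
    static String replaceProtocol(String pfn) {
        URI uri = URI.create(pfn);
        if (!"gsiftp".equalsIgnoreCase(uri.getScheme())) {
            return pfn;
        }
        // Drop the host (and port): a symlink can only point at the local filesystem.
        return SYMLINK_SCHEME + "://" + uri.getRawPath();
    }
}
```

For example, gsiftp://host.example.org:2811/data/f.a becomes symlink:///data/f.a, while a file:// URL passes through untouched. Since Java strings are immutable, the caller's original PFN is never modified.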

constructSiteToSRMServerMap

private Map<String,NameValue> constructSiteToSRMServerMap(PegasusProperties props)
Constructs a Map by parsing the relevant SRM Pegasus properties. For example, if users have the following specified in the properties file:
 pegasus.transfer.srm.ligo-cit.service.url          srm://osg-se.ligo.caltech.edu:10443/srm/v2/server?SFN=/mnt/hadoop
 pegasus.transfer.srm.ligo-cit.service.mountpoint   /mnt/hadoop
 
then a Map is created that associates ligo-cit with a NameValue object containing the service URL and the mount point.

Parameters:
props - the PegasusProperties object
Returns:
Map that maps a site name to a NameValue object that has the URL prefix and the mount point
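The parsing can be sketched with java.util.Properties, using the key layout from the example above (pegasus.transfer.srm.<site>.service.url and .service.mountpoint). NameValue here is a hypothetical record, not the Pegasus class, and rejecting a site that has a URL but no mount point is an assumed design choice; skipping such sites would be an equally plausible alternative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class SRMPropertiesSketch {
    // Key layout taken from the documented example.
    static final String PREFIX = "pegasus.transfer.srm.";
    static final String URL_SUFFIX = ".service.url";
    static final String MOUNT_SUFFIX = ".service.mountpoint";

    /** Hypothetical stand-in for the Pegasus NameValue class. */
    record NameValue(String serviceURL, String mountPoint) {}

    static Map<String, NameValue> siteToSRMServer(Properties props) {
        Map<String, NameValue> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            boolean isURLKey = key.startsWith(PREFIX) && key.endsWith(URL_SUFFIX)
                    && key.length() > PREFIX.length() + URL_SUFFIX.length();
            if (!isURLKey) {
                continue; // unrelated property, or no site name between prefix and suffix
            }
            String site = key.substring(PREFIX.length(), key.length() - URL_SUFFIX.length());
            String mountPoint = props.getProperty(PREFIX + site + MOUNT_SUFFIX);
            if (mountPoint == null) {
                throw new IllegalArgumentException("No mount point configured for SRM site " + site);
            }
            result.put(site, new NameValue(props.getProperty(key), mountPoint));
        }
        return result;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("pegasus.transfer.srm.ligo-cit.service.url",
                "srm://osg-se.ligo.caltech.edu:10443/srm/v2/server?SFN=/mnt/hadoop");
        p.setProperty("pegasus.transfer.srm.ligo-cit.service.mountpoint", "/mnt/hadoop");
        System.out.println(siteToSRMServer(p).get("ligo-cit").mountPoint()); // /mnt/hadoop
    }
}
```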

getOutputFiles

private Set getOutputFiles(Vector nodes,
                           Vector parentSubs)
Gets the output files for all the nodes specified in the nodes Vector.

Parameters:
nodes - Vector of the job names of the nodes whose output files are required.
parentSubs - Vector of Job objects. Callers pass an empty Vector, which is populated with the Job objects of the nodes as their output files are determined.
Returns:
Set of PegasusFile objects
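The out-parameter contract of parentSubs can be sketched as follows, assuming jobs are looked up by name in a map and output files are plain LFN strings. OutputFilesSketch and NodeJob are hypothetical names; the sketch uses List rather than Vector for brevity, and Vector implements List.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class OutputFilesSketch {
    /** Hypothetical job: a name plus the LFNs it produces. */
    record NodeJob(String name, Set<String> outputs) {}

    /**
     * Returns the union of output LFNs of the named nodes and, as a side effect,
     * appends each resolved job to the caller-supplied parentSubs list.
     */
    static Set<String> getOutputFiles(List<String> nodes, Map<String, NodeJob> jobsByName,
                                      List<NodeJob> parentSubs) {
        Set<String> files = new LinkedHashSet<>();
        for (String name : nodes) {
            NodeJob job = jobsByName.get(name);
            if (job == null) {
                throw new IllegalArgumentException("Unknown job " + name);
            }
            parentSubs.add(job);         // populate the caller's (initially empty) list
            files.addAll(job.outputs()); // duplicates across nodes collapse in the Set
        }
        return files;
    }
}
```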

getPathOnStageoutSite

protected String getPathOnStageoutSite(String lfn)
Returns the full path on remote output site, where the lfn will reside. Each call to this function could trigger a change in the directory returned depending upon the file factory being used.

Parameters:
lfn - the logical filename of the file.
Returns:
the storage mount point.
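To illustrate why successive calls may return different directories, here is a hypothetical directory factory that rolls over to a new numbered subdirectory every filesPerDir files. It is not the algorithm of org.griphyn.vdl.euryale.FileFactory or its implementations; it only shows a stateful factory of the kind this method delegates to.

```java
class RollingDirectorySketch {
    private final String baseDir;
    private final int filesPerDir;
    private int filesIssued = 0;

    RollingDirectorySketch(String baseDir, int filesPerDir) {
        if (filesPerDir <= 0) {
            throw new IllegalArgumentException("filesPerDir must be positive");
        }
        this.baseDir = baseDir;
        this.filesPerDir = filesPerDir;
    }

    /** Returns the full path for lfn; moves to a new subdirectory every filesPerDir calls. */
    String getPathOnStageoutSite(String lfn) {
        int dirIndex = filesIssued / filesPerDir;
        filesIssued++;
        return String.format("%s/%03d/%s", baseDir, dirIndex, lfn);
    }

    public static void main(String[] args) {
        RollingDirectorySketch f = new RollingDirectorySketch("/storage/run0001", 2);
        for (String lfn : new String[] {"a.out", "b.out", "c.out"}) {
            System.out.println(f.getPathOnStageoutSite(lfn));
        }
        // /storage/run0001/000/a.out, /storage/run0001/000/b.out, /storage/run0001/001/c.out
    }
}
```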

initializeStageOutSiteDirectoryFactory

protected void initializeStageOutSiteDirectoryFactory(ADag workflow)
Initializes the Stageout Site Directory factory. The factory is used to return the relative directory to which a particular file needs to be staged on the output site.

Parameters:
workflow - the workflow to which the transfer nodes need to be added.

trackInTransientRC

protected void trackInTransientRC(Job job)
Tracks the files created by a job in the Transient Replica Catalog.

Parameters:
job - the job whose input files need to be tracked.

trackInTransientRC

private void trackInTransientRC(String lfn,
                                String pfn,
                                String site)
Inserts an entry into the Transient RC. It modifies the PFN if the workflow is running on the cloud and S3 is being used for storage.

Parameters:
lfn - the logical name of the file.
pfn - the pfn
site - the site handle

trackInTransientRC

private void trackInTransientRC(String lfn,
                                String pfn,
                                String site,
                                boolean modifyURL)
Inserts an entry into the Transient RC. It modifies the PFN if the workflow is running on the cloud and S3 is being used for storage, dependent on the modifyURL parameter passed.

Parameters:
lfn - the logical name of the file.
pfn - the pfn
site - the site handle
modifyURL - whether to modify URL in case of S3 or not.


Copyright © 2011 The University of Southern California. All Rights Reserved.