Salutations,
If you don’t have time to read or investigate all of this right now, would you mind sharing any helpful resources you’ve found for properly using cwltool.factory.Factory?
When invoking cwltool from Python via the CWL Factory, directory outputs do not contain their resolved output files. Instead, they contain symlinks that point back to themselves. When running the same workflow from the cwltool command line, the files in these directory outputs are properly resolved and are not symlinks.
After some debugging, it appears the file resolution step is skipped within cwltool.process.stage_files() on line 288:
    if not entry.staged:
        continue
This “staged” determination is made by the PathMapper constructed on line 399. PathMapper properly populates the resolved field of its MapperEnts, but on line 114 in cwltool.pathmapper.visit(), the directories are flagged as staged = False, and this determination propagates down the directory tree during the recursive call to cwltool.pathmapper.visitlisting().
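To make the behaviour I am describing concrete, here is a toy model of it. This is not cwltool's actual code: the function bodies, the stage_func callback, and the example locations are all made up for illustration, and only the names visit, stage_files, MapperEnt, and the resolved/staged fields are borrowed from what I saw while debugging.

```python
from collections import namedtuple

# Simplified stand-in for a path mapper entry (assumed field layout).
MapperEnt = namedtuple("MapperEnt", ["resolved", "target", "type", "staged"])


def visit(obj, stagedir, pathmap, staged):
    """Record an entry for obj; children inherit the parent's staged flag."""
    target = f"{stagedir}/{obj['basename']}"
    pathmap[obj["location"]] = MapperEnt(obj["location"], target, obj["class"], staged)
    if obj["class"] == "Directory":
        for child in obj.get("listing", []):
            visit(child, target, pathmap, staged)


def stage_files(pathmap, stage_func):
    """Call stage_func for every entry, skipping entries not flagged as staged."""
    for entry in pathmap.values():
        if not entry.staged:
            continue
        stage_func(entry.resolved, entry.target)


# Hypothetical directory output with a single file in its listing.
outdir = {
    "class": "Directory",
    "location": "file:///work/out",
    "basename": "out",
    "listing": [
        {"class": "File", "location": "file:///work/out/a.txt", "basename": "a.txt"}
    ],
}

pathmap = {}
visit(outdir, "/final", pathmap, staged=False)
copied = []
stage_files(pathmap, lambda src, dst: copied.append((src, dst)))
# copied stays empty: the resolved paths are known, but nothing is staged.
```

In this model every entry has a correct resolved value, yet once the top-level directory is visited with staged=False nothing beneath it is ever copied, which matches what I see with the Factory.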
I have scoured the setup procedure in cwltool.main.main() but cannot figure out why output directories are correctly flagged as staged when invoking cwltool from the command line, but not when using cwltool.factory.Factory.
Here is my Python invocation of cwltool.factory.Factory:
import os
import subprocess

import cwltool.executors
import cwltool.factory
import cwltool.secrets


def run_native(config_object, cwl_path, run_directory, debug=False, parallel=False):
    def furnish_if_file_record(file_dict):
        # Fill in the fields of a CWL File record from its 'path'.
        if isinstance(file_dict, dict) and file_dict.get('class', None) == 'File':
            file_dict['basename'] = os.path.basename(file_dict['path'])
            file_dict['location'] = file_dict['path']
            file_dict['contents'] = None

    # Inputs may be single records or lists of records.
    for _, config_param in config_object.config.items():
        if isinstance(config_param, list):
            for config_dict in config_param:
                furnish_if_file_record(config_dict)
        else:
            furnish_if_file_record(config_param)

    runtime_context = cwltool.factory.RuntimeContext({
        'secret_store': cwltool.secrets.SecretStore(),
        'default_stdout': subprocess.PIPE,
        'default_stderr': subprocess.PIPE,
        'outdir': run_directory,
        'on_error': "continue",
        'debug': debug
    })
    cwl = cwltool.factory.Factory(
        runtime_context=runtime_context,
        executor=cwltool.executors.MultithreadedJobExecutor()  # Run jobs in parallel
        if parallel else cwltool.executors.SingleJobExecutor()  # Run one library at a time
    )
    pipeline = cwl.make(f"{cwl_path}/workflows/main.cwl")
    pipeline(**config_object.config)
Thanks for taking the time!