pipeline.engine
Module: pipeline.engine

Inheritance diagram for nipype.pipeline.engine:
Defines functionality for pipelined execution of interfaces
The Workflow class provides core functionality for batch processing.
Change directory to provide relative paths for doctests

>>> import os
>>> filepath = os.path.dirname(os.path.realpath(__file__))
>>> datadir = os.path.realpath(os.path.join(filepath, '../testing/data'))
>>> os.chdir(datadir)
Classes

JoinNode

class nipype.pipeline.engine.JoinNode(interface, name, joinsource, joinfield=None, unique=False, **kwargs)

Bases: nipype.pipeline.engine.Node
Wraps interface objects that join inputs into a list.
Examples
>>> import nipype.pipeline.engine as pe
>>> from nipype import Node, JoinNode, Workflow
>>> from nipype.interfaces.utility import IdentityInterface
>>> from nipype.interfaces import (ants, dcm2nii, fsl)
>>> wf = Workflow(name='preprocess')
>>> inputspec = Node(IdentityInterface(fields=['image']),
...                  name='inputspec')
>>> inputspec.iterables = [('image',
...                        ['img1.nii', 'img2.nii', 'img3.nii'])]
>>> img2flt = Node(fsl.ImageMaths(out_data_type='float'),
...                name='img2flt')
>>> wf.connect(inputspec, 'image', img2flt, 'in_file')
>>> average = JoinNode(ants.AverageImages(), joinsource='inputspec',
...                    joinfield='images', name='average')
>>> wf.connect(img2flt, 'out_file', average, 'images')
>>> realign = Node(fsl.FLIRT(), name='realign')
>>> wf.connect(img2flt, 'out_file', realign, 'in_file')
>>> wf.connect(average, 'output_average_image', realign, 'reference')
>>> strip = Node(fsl.BET(), name='strip')
>>> wf.connect(realign, 'out_file', strip, 'in_file')
Methods

clone(name)
    Clone a workflowbase object
get_output(parameter)
    Retrieve a particular output of the node
hash_exists([updatehash])
help()
    Print interface help
load(filename)
output_dir()
    Return the location of the output directory for the node
run([updatehash])
    Execute the node in its directory.
save([filename])
set_input(parameter, val)
    Set interface input value
update(**opts)
write_report([report_type, cwd])
__init__(interface, name, joinsource, joinfield=None, unique=False, **kwargs)

Parameters:

interface : interface object
    node specific interface (fsl.Bet(), spm.Coregister())
name : alphanumeric string
    node specific name
joinsource : node name
    name of the join predecessor iterable node
joinfield : string or list of strings
    name(s) of list input fields that will be aggregated. The default is all of the join node input fields.
unique : flag indicating whether to ignore duplicate input values

See Node docstring for additional keyword arguments.
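As a sketch of how these parameters fit together, reusing the pe alias, the inputspec iterable node, and the ants.AverageImages interface from the example above (unique=True is shown only to illustrate the flag):

    average = pe.JoinNode(ants.AverageImages(),
                          joinsource='inputspec',  # upstream iterable node
                          joinfield='images',      # list input receiving the joined values
                          unique=True,             # drop duplicate input values before joining
                          name='average')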
clone(name)
    Clone a workflowbase object

    Parameters:
    name : string (mandatory)
        A clone of node or workflow must have a new name

fullname

get_output(parameter)
    Retrieve a particular output of the node

hash_exists(updatehash=False)

help()
    Print interface help

inputs
    The JoinNode inputs include the join field overrides.

interface
    Return the underlying interface object

joinfield = None
    the fields to join

joinsource
    the join predecessor iterable node

load(filename)

output_dir()
    Return the location of the output directory for the node

outputs
    Return the output fields of the underlying interface

result

run(updatehash=False)
    Execute the node in its directory.

    Parameters:
    updatehash : boolean
        Update the hash stored in the output directory

save(filename=None)

set_input(parameter, val)
    Set interface input value

update(**opts)

write_report(report_type=None, cwd=None)
MapNode

class nipype.pipeline.engine.MapNode(interface, iterfield, name, serial=False, nested=False, **kwargs)

Bases: nipype.pipeline.engine.Node
Wraps interface objects that need to be iterated on a list of inputs.
Examples
>>> from nipype import MapNode
>>> from nipype.interfaces import fsl
>>> realign = MapNode(fsl.MCFLIRT(), 'in_file', 'realign')
>>> realign.inputs.in_file = ['functional.nii',
...                           'functional2.nii',
...                           'functional3.nii']
>>> realign.run()
Methods

clone(name)
    Clone a workflowbase object
get_output(parameter)
    Retrieve a particular output of the node
get_subnodes()
hash_exists([updatehash])
help()
    Print interface help
load(filename)
num_subnodes()
output_dir()
    Return the location of the output directory for the node
run([updatehash])
    Execute the node in its directory.
save([filename])
set_input(parameter, val)
    Set interface input value or nodewrapper attribute
update(**opts)
write_report([report_type, cwd])
__init__(interface, iterfield, name, serial=False, nested=False, **kwargs)

Parameters:

interface : interface object
    node specific interface (fsl.Bet(), spm.Coregister())
iterfield : string or list of strings
    name(s) of input fields that will receive a list of whatever kind of input they take. The node will be run separately for each value in these lists. For more than one input, the values are paired (i.e. it does not compute a combinatorial product); see the sketch after this parameter list.
name : alphanumeric string
    node specific name
serial : boolean
    flag to enforce executing the jobs of the mapnode in a serial manner rather than in parallel
nested : boolean
    support for nested lists; if set, the input list will be flattened before running and the nested list structure of the outputs will be restored

See Node docstring for additional keyword arguments.
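A short sketch of paired iterfields; the fsl.Threshold interface and its thresh input are used here only as a plausible illustration:

    from nipype import MapNode
    from nipype.interfaces import fsl

    threshold = MapNode(fsl.Threshold(), iterfield=['in_file', 'thresh'],
                        name='threshold')
    threshold.inputs.in_file = ['run1.nii', 'run2.nii', 'run3.nii']
    threshold.inputs.thresh = [100, 110, 120]  # paired element-wise with in_file
    # three subnodes run: (run1.nii, 100), (run2.nii, 110), (run3.nii, 120)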
clone(name)
    Clone a workflowbase object

    Parameters:
    name : string (mandatory)
        A clone of node or workflow must have a new name

fullname

get_output(parameter)
    Retrieve a particular output of the node

get_subnodes()

hash_exists(updatehash=False)

help()
    Print interface help

inputs

interface
    Return the underlying interface object

load(filename)

num_subnodes()

output_dir()
    Return the location of the output directory for the node

outputs

result

run(updatehash=False)
    Execute the node in its directory.

    Parameters:
    updatehash : boolean
        Update the hash stored in the output directory

save(filename=None)

set_input(parameter, val)
    Set interface input value or nodewrapper attribute

    Priority goes to interface.

update(**opts)

write_report(report_type=None, cwd=None)
Node

class nipype.pipeline.engine.Node(interface, name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, **kwargs)

Bases: nipype.pipeline.engine.WorkflowBase
Wraps interface objects for use in pipeline
A Node creates a sandbox-like directory for executing the underlying interface. It will copy or link inputs into this directory to ensure that input data are not overwritten. A hash of the input state is used to determine if the Node inputs have changed and whether the node needs to be re-executed.
Examples
>>> from nipype import Node
>>> from nipype.interfaces import spm
>>> realign = Node(spm.Realign(), 'realign')
>>> realign.inputs.in_files = 'functional.nii'
>>> realign.inputs.register_to_mean = True
>>> realign.run()
Methods

clone(name)
    Clone a workflowbase object
get_output(parameter)
    Retrieve a particular output of the node
hash_exists([updatehash])
help()
    Print interface help
load(filename)
output_dir()
    Return the location of the output directory for the node
run([updatehash])
    Execute the node in its directory.
save([filename])
set_input(parameter, val)
    Set interface input value
update(**opts)
write_report([report_type, cwd])
__init__(interface, name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, **kwargs)

Parameters:

interface : interface object
    node specific interface (fsl.Bet(), spm.Coregister())
name : alphanumeric string
    node specific name
iterables : generator
    Input field and list to iterate over using the pipeline engine, for example to iterate over different frac values in fsl.Bet(). For a single field the input can be a tuple, otherwise a list of tuples (see the sketch after this parameter list):

        node.iterables = ('frac', [0.5, 0.6, 0.7])
        node.iterables = [('fwhm', [2, 4]), ('fieldx', [0.5, 0.6, 0.7])]

    If this node has an itersource, then the iterables values is a dictionary which maps an iterable source field value to the target iterables field values, e.g.:

        inputspec.iterables = ('images', ['img1.nii', 'img2.nii'])
        node.itersource = ('inputspec', ['frac'])
        node.iterables = ('frac', {'img1.nii': [0.5, 0.6], 'img2.nii': [0.6, 0.7]})

    If this node's synchronize flag is set, then an alternate form of the iterables is a [fields, values] list, where fields is the list of iterated fields and values is the list of value tuples for the given fields, e.g.:

        node.synchronize = True
        node.iterables = [('frac', 'threshold'), [(0.5, True), (0.6, False)]]

itersource : tuple
    The (name, fields) iterables source which specifies the name of the predecessor iterable node and the input fields to use from that source node. The output field values comprise the key to the iterables parameter value mapping dictionary.
synchronize : boolean
    Flag indicating whether iterables are synchronized. If the iterables are synchronized, then this iterable node is expanded once per iteration over all of the iterables values. Otherwise, this iterable node is expanded once per each permutation of the iterables values.
overwrite : boolean
    Whether to overwrite contents of output directory if it already exists. If the directory exists and the hash matches, it is assumed that the process has been executed.
needed_outputs : list of output_names
    Force the node to keep only specific outputs. By default all outputs are kept. Setting this attribute will delete any output files and directories from the node's working directory that are not part of the needed_outputs.
run_without_submitting : boolean
    Run the node without submitting to a job engine or to a multiprocessing pool
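A short sketch of a simple iterable expansion inside a workflow; the fsl.IsotropicSmooth interface and the names used here are illustrative assumptions:

    from nipype import Node, Workflow
    from nipype.interfaces import fsl

    smooth = Node(fsl.IsotropicSmooth(), name='smooth')
    smooth.inputs.in_file = 'functional.nii'
    smooth.iterables = ('fwhm', [4, 8])   # one copy of the node per value

    wf = Workflow(name='smooth_wf', base_dir='.')
    wf.add_nodes([smooth])
    # wf.run() would expand 'smooth' into one instance per fwhm value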
clone(name)
    Clone a workflowbase object

    Parameters:
    name : string (mandatory)
        A clone of node or workflow must have a new name

fullname

get_output(parameter)
    Retrieve a particular output of the node

hash_exists(updatehash=False)

help()
    Print interface help

inputs
    Return the inputs of the underlying interface

interface
    Return the underlying interface object

load(filename)

output_dir()
    Return the location of the output directory for the node

outputs
    Return the output fields of the underlying interface

result

run(updatehash=False)
    Execute the node in its directory.

    Parameters:
    updatehash : boolean
        Update the hash stored in the output directory

save(filename=None)

set_input(parameter, val)
    Set interface input value

update(**opts)

write_report(report_type=None, cwd=None)
Workflow

class nipype.pipeline.engine.Workflow(name, base_dir=None)

Bases: nipype.pipeline.engine.WorkflowBase
Controls the setup and execution of a pipeline of processes.
Methods

add_nodes(nodes)
    Add nodes to a workflow
clone(name)
    Clone a workflow
connect(*args, **kwargs)
    Connect nodes in the pipeline.
disconnect(*args)
    Disconnect two nodes
export([filename, prefix, format, ...])
    Export object into a different format
get_node(name)
    Return an internal node by name
list_node_names()
    List names of all nodes in a workflow
load(filename)
remove_nodes(nodes)
    Remove nodes from a workflow
run([plugin, plugin_args, updatehash])
    Execute the workflow
save([filename])
write_graph([dotfilename, graph2use, ...])
    Generates a graphviz dot file and a png file
write_hierarchical_dotfile([dotfilename, ...])
__init__(name, base_dir=None)
    Create a workflow object.

Parameters:

name : alphanumeric string
    unique identifier for the workflow
base_dir : string, optional
    path to workflow storage

add_nodes(nodes)
    Add nodes to a workflow

    Parameters:
    nodes : list
        A list of WorkflowBase-based objects

clone(name)
    Clone a workflow

    Note: Will reset attributes used for executing workflow. See _init_runtime_fields.

    Parameters:
    name : alphanumeric name
        unique name for the workflow
connect(*args, **kwargs)
    Connect nodes in the pipeline.

    This routine also checks if inputs and outputs are actually provided by the nodes that are being connected.

    Creates edges in the directed graph using the nodes and edges specified in the connection_list. Uses the NetworkX method DiGraph.add_edges_from.

    Parameters:
    args : list or a set of four positional arguments
        Four positional arguments of the form:

            connect(source, sourceoutput, dest, destinput)

        source : nodewrapper node
        sourceoutput : string (must be in source.outputs)
        dest : nodewrapper node
        destinput : string (must be in dest.inputs)

        A list of 3-tuples of the following form:

            [(source, target,
              [('sourceoutput/attribute', 'targetinput'), ...]),
             ...]

        Or:

            [(source, target,
              [(('sourceoutput1', func, arg2, ...), 'targetinput'), ...]),
             ...]

        sourceoutput1 will always be the first argument to func, and func will be evaluated and the results sent to targetinput. Currently func needs to define all its needed imports within the function, as we use the inspect module to get at the source code and execute it remotely.
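A minimal sketch of the two call forms described above; the node names (merge, resample) and the pick_first helper are illustrative assumptions, not part of the API:

    # four positional arguments: one connection
    wf.connect(merge, 'merged_file', resample, 'in_file')

    # list form, routing the source output through a connection function
    def pick_first(files):
        # must carry its own imports; its source is shipped and executed remotely
        return files[0]

    wf.connect([(merge, resample, [(('merged_file', pick_first), 'in_file')])])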
disconnect(*args)
    Disconnect two nodes

    See the docstring for connect for format.

export(filename=None, prefix='output', format='python', include_config=False)
    Export object into a different format

    Parameters:
    filename : string
        file to save the code to; overrides prefix
    prefix : string
        prefix to use for output file
    format : string
        one of "python"
    include_config : boolean
        whether to include node and workflow config values

fullname

get_node(name)
    Return an internal node by name

inputs

list_node_names()
    List names of all nodes in a workflow

load(filename)

outputs

remove_nodes(nodes)
    Remove nodes from a workflow

    Parameters:
    nodes : list
        A list of WorkflowBase-based objects

run(plugin=None, plugin_args=None, updatehash=False)
    Execute the workflow

    Parameters:
    plugin : plugin name or object
        Plugin to use for execution. You can create your own plugins for execution.
    plugin_args : dictionary
        Arguments to be sent to the plugin constructor. See individual plugin docstrings for details.
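For instance, a local parallel run with the multiprocessing plugin might look like the following sketch (treat the exact plugin argument set as an assumption; see the plugin docstrings):

    wf.run(plugin='MultiProc', plugin_args={'n_procs': 4})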
save(filename=None)

write_graph(dotfilename='graph.dot', graph2use='hierarchical', format='png', simple_form=True)
    Generates a graphviz dot file and a png file

    Parameters:
    graph2use : 'orig', 'hierarchical' (default), 'flat', 'exec', 'colored'
        orig - creates a top level graph without expanding internal workflow nodes;
        flat - expands workflow nodes recursively;
        hierarchical - expands workflow nodes recursively with a notion of hierarchy;
        colored - expands workflow nodes recursively with a notion of hierarchy in color;
        exec - expands workflows to depict iterables
    format : 'png', 'svg'
    simple_form : boolean (default: True)
        Determines if the node name used in the graph should be of the form 'nodename (package)' when True or 'nodename.Class.package' when False.
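For example, to write an execution graph expanded over iterables as SVG, using only the parameters documented above:

    wf.write_graph(dotfilename='exec_graph.dot', graph2use='exec',
                   format='svg', simple_form=True)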
write_hierarchical_dotfile(dotfilename=None, colored=False, simple_form=True)
WorkflowBase

class nipype.pipeline.engine.WorkflowBase(name=None, base_dir=None)

Bases: object
Defines common attributes and functions for workflows and nodes.
Methods

clone(name)
    Clone a workflowbase object
load(filename)
save([filename])
__init__(name=None, base_dir=None)
    Initialize base parameters of a workflow or node

Parameters:

name : string (mandatory)
    Name of this node. Name must be alphanumeric and not contain any special characters (e.g., '.', '@').
base_dir : string
    base output directory (will be hashed before creation); default=None, which results in the use of mkdtemp

clone(name)
    Clone a workflowbase object

    Parameters:
    name : string (mandatory)
        A clone of node or workflow must have a new name

fullname

inputs

load(filename)

outputs

save(filename=None)