􀀂􀀟􀀍􀀅 􀀂􀀞􀀖􀀌􀀈 Process substitution in Python

Jeff Kaufman asks (see full blog post):

How do I write:

cmd -1 <(aws s3 cp "$path1" - | gunzip) -2 <(aws s3 cp "$path2" - | gunzip) | gzip | aws s3 cp - "$pathOut"

In python?

Process substitution is really powerful, and it’s is something I wish was easier in non-shell programming languages.

Update 2023-12-17

I said originally (see below) that “the shell implicitly executes the first two aws s3 cp ... commands in parallel and then waits for both to finish before continuing the pipeline”, but as Jeff points out, this isn’t quite right. The shell actually waits for both to start before continuing the pipeline. I also learned a few more things exploring this.

  • Every chunk that a substituted process emits can be processed immediately by the parent command.
  • Similarly, every chunk that an earlier pipeline command emits can be processed immediately by a later command.
  • A process substitution can contain multiple commands separated by semicolons, like a regular command line

To incorporate all of this I had to significantly rework my solution. It’s longer, but more correct, than my first version. It can also handle recursive layers of process substitutions, such that a process substitution in the outer command can itself contain process substitutions!

Click to show procsub_pipeline_2.py
#!/usr/bin/env python3

import logging
import pdb
import subprocess
import sys
import threading
import os
import traceback
import typing


def idb_excepthook(type, value, tb):
    """Call an interactive debugger in post-mortem mode

    If you do "sys.excepthook = idb_excepthook", then an interactive debugger
    will be spawned at an unhandled exception
    """
    if hasattr(sys, "ps1") or not sys.stderr.isatty():
        sys.__excepthook__(type, value, tb)
    else:
        traceback.print_exception(type, value, tb)
        print
        pdb.pm()


def ch_close_fd_factory(fds: list[int]):
    """Return a completion handler that closes file descriptors"""

    def completion_handler():
        for fd in fds:
            os.close(fd)

    return completion_handler


class Command:
    """A command to run in a pipeline, which may contain process substitutions

    Analogous to a single command in a shell pipeline,
    such as "ps -aux" or "tar zxf" or "cat <(echo hello) <(echo world)".
    """

    def __init__(self, *arguments: list[typing.Union[str, "ProcessSubstitution"]]):
        self.arguments: list[typing.Union[str, "ProcessSubstitution"]] = list(arguments)
        self.process: subprocess.Popen | None = None
        self.procsub_pipes: list[tuple[int, int]] = []
        self.procsub_threads: list[threading.Thread] = []

    def __repr__(self):
        return f"Command({self.arguments})"

    def run(self, popen_kwargs: dict | None = None, pass_fds: list[int] | None = None):
        """Run the command, handling process substitution

        The .process property is set to the Popen object for the command.

        Arguments:
        pass_fds:       A list of file descriptors to pass to subprocess.Popen at each stage
                        This will keep any pipes in this list open for the duration of the pipeline
        """
        popen_kwargs = popen_kwargs or {}
        pass_fds = pass_fds or []
        processed_args: list[str] = []
        for argument in self.arguments:
            if isinstance(argument, ProcessSubstitution):
                pipe = os.pipe()
                self.procsub_pipes.append(pipe)
                pass_fds.append(pipe[0])
                procsub_kwargs = dict(
                    completion_handler=ch_close_fd_factory([pipe[1]]),
                    # pass_fds=pass_fds,
                    result_pipe=pipe[1],
                )
                thread = threading.Thread(target=argument.run, kwargs=procsub_kwargs)
                thread.start()
                self.procsub_threads.append(thread)
                processed_args.append(f"/dev/fd/{pipe[0]}")
            # If the argument isn't a pipeline, just treat it like a regular argument
            else:
                processed_args.append(argument)

        # Create the Popen object for the command, with all process substitutions replaced with file descriptors
        popen_kwargs["pass_fds"] = pass_fds
        self.process = subprocess.Popen(processed_args, **popen_kwargs)

    def close_read_pipes(self):
        """Close the read end of each pipe in this command

        When a command has process substitutions, it opens a pipe for each one.
        The write end of each pipe is closed in a completion handler passed to the ProcessSubstitution.
        The read ends must be closed by the caller of Command.run() once the command has finished.
        """
        for pipe in self.procsub_pipes:
            os.close(pipe[0])


class Pipeline:
    """A pipeline of commands, where the STDOUT of each command is piped to the STDIN of the next command

    This is analogous to `echo foo | cat | grep bar | ...` etc.
    Each command's STDOUT is piped to the next command's STDIN.
    """

    def __init__(self, *commands: list[Command]):
        self.commands: list[Command] = list(commands)

    def __repr__(self):
        return f"Pipeline({self.commands})"

    def run(self, pass_fds: list[int] | None = None, result_pipe: int | None = None):
        """Run a pipeline, passing the STDOUT of each stage to the STDIN of the next stage

        Arguments:
        pass_fds:       A list of file descriptors to pass to subprocess.Popen at each stage
                        This will keep any pipes in this list open for the duration of the pipeline
        result_pipe:    The file descriptor to use for the last command in the pipeline
                        If None, it will go to the STDOUT of the current process
        """
        pass_fds = pass_fds or []
        previous_process = None
        for idx, stage in enumerate(self.commands):
            logging.debug(f"Running stage {idx} of pipeline {self.commands}")
            stdin = previous_process.stdout if previous_process else None
            stdout = subprocess.PIPE if idx != len(self.commands) - 1 else result_pipe
            stage.run(popen_kwargs=dict(stdin=stdin, stdout=stdout, pass_fds=pass_fds))
            previous_process = stage.process

        # The processes are all running in the background now, and we can wait for them in the order they were started
        # (If they finish successfully, they'll finish in order;
        # if any fail, processes trying to write to it will receive a SIGPIPE and exit)
        for stage in self.commands:
            stage.process.wait()
            stage.close_read_pipes()

        # TODO: handle errors


class ProcessSubstitution:
    """A process substitution is a list of pipelines, executed in sequence.

    Analogous in shell to `<(grep user /etc/passwd; ls /home | head -n1)`.
    Each separate pipeline is concatenated together and passed to the command as a file descriptor.
    """

    def __init__(self, *pipelines: list[Pipeline]):
        self.pipelines: list[Pipeline] = list(pipelines)

    def __repr__(self):
        return f"ProcessSubstitution({self.pipelines})"

    def run(
        self,
        pass_fds: list[int] | None = None,
        result_pipe: os.PathLike | None = None,
        completion_handler: typing.Callable | None = None,
    ):
        """Run a shell pipeline, passing the STDOUT of each stage to the STDIN of the next stage

        Cannot handle process substitution.

        Arguments:
        pass_fds:               A list of file descriptors to pass to subprocess.Popen at each stage
                                This will keep any pipes in this list open for the duration of the pipeline
        completion_handler:     A function to call once the pipeline has finished
        """
        pass_fds = pass_fds or []
        for pipeline in self.pipelines:
            logging.debug(f"Running pipeline: {pipeline.commands}")
            pipeline.run(pass_fds=pass_fds, result_pipe=result_pipe)
        if completion_handler is not None:
            completion_handler()


def main():
    # Our test pipeline is intended to verify a few things:
    # 1. Commands in a pipeline process data as it's emitted from the previous command --
    #    they don't wait for the previous command to finish before starting.
    # 2. Process substitutions are run in parallel --
    #    two process substitutions that each take 3 seconds should take 3 seconds, not 6.

    print("Testing pipeline 1")
    print("Compare with a shell command like:")
    print(
        "cat <(printf foo; sleep 1; echo YYYbarYYY | sed -e 's/YYY//g') <(printf hello; sleep 3; echo XXXworldXXX | sed -e 's/XXX//g') | sed -e 's/$/EOL/'"
    )
    # You can time it like this, but note that time will not print the output until the pipeline is finished:
    # time zsh -c "cat <(printf foo; sleep 1; echo YYYbarYYY | sed -e 's/YYY//g') <(printf hello; sleep 3; echo XXXworldXXX | sed -e 's/XXX//g') | sed -e 's/$/EOL/'"
    Pipeline(
        Command(
            "cat",
            ProcessSubstitution(
                Pipeline(Command("printf", "foo")),
                Pipeline(Command("sleep", "1")),
                Pipeline(
                    Command("echo", "YYYbarYYY"), Command("sed", "-e", "s/YYY//g")
                ),
            ),
            ProcessSubstitution(
                Pipeline(Command("printf", "hello")),
                Pipeline(Command("sleep", "3")),
                Pipeline(
                    Command("echo", "XXXworldXXX"), Command("sed", "-e", "s/XXX//g")
                ),
            ),
        ),
        Command("sed", "-e", "s/$/EOL/"),
    ).run()

    print("Testing pipeline 2")
    print("Compare with a shell command like:")
    print("paste <(echo a; sleep 3; echo b) <(echo a; sleep 3; echo b)")
    Pipeline(
        Command(
            "paste",
            ProcessSubstitution(
                Pipeline(Command("echo", "a")),
                Pipeline(Command("sleep", "3")),
                Pipeline(Command("echo", "b")),
            ),
            ProcessSubstitution(
                Pipeline(Command("echo", "a")),
                Pipeline(Command("sleep", "3")),
                Pipeline(Command("echo", "b")),
            ),
        )
    ).run()

    print("Testing pipeline 3")
    print("Compare with a shell command like:")
    print("cat <(cat <(echo one) <(echo two)) <(cat <(echo three) <(echo four))")
    Pipeline(
        Command(
            "cat",
            ProcessSubstitution(
                Pipeline(
                    Command(
                        "cat",
                        ProcessSubstitution(Pipeline(Command("echo", "one"))),
                        ProcessSubstitution(Pipeline(Command("echo", "two"))),
                    )
                )
            ),
            ProcessSubstitution(
                Pipeline(
                    Command(
                        "cat",
                        ProcessSubstitution(Pipeline(Command("echo", "three"))),
                        ProcessSubstitution(Pipeline(Command("echo", "four"))),
                    )
                )
            ),
        )
    ).run()


if __name__ == "__main__":
    # logging.basicConfig(level=logging.DEBUG)
    sys.excepthook = idb_excepthook
    main()

When I run it, I get this result:

> ./procsub_pipeline_2.py
Testing pipeline 1
Compare with a shell command like:
cat <(printf foo; sleep 1; echo YYYbarYYY | sed -e 's/YYY//g') <(printf hello; sleep 3; echo XXXworldXXX | sed -e 's/XXX//g') | sed -e 's/$/EOL/'
foobarEOL
helloworldEOL
Testing pipeline 2
Compare with a shell command like:
paste <(echo a; sleep 3; echo b) <(echo a; sleep 3; echo b)
a	a
b	b
Testing pipeline 3
Compare with a shell command like:
cat <(cat <(echo one) <(echo two)) <(cat <(echo three) <(echo four))
one
two
three
four

Original first post

Something tricky about this problem is that the shell implicitly executes the first two aws s3 cp ... commands in parallel and then waits for both to finish before continuing the pipeline. Replicating this in Python takes some work.

My solution

Click to show procsub_pipeline.py
#!/usr/bin/env python3

import logging
import pdb
import subprocess
import sys
import threading
import os
import traceback
import typing


def idb_excepthook(type, value, tb):
    """Call an interactive debugger in post-mortem mode

    If you do "sys.excepthook = idb_excepthook", then an interactive debugger
    will be spawned at an unhandled exception
    """
    if hasattr(sys, "ps1") or not sys.stderr.isatty():
        sys.__excepthook__(type, value, tb)
    else:
        traceback.print_exception(type, value, tb)
        print
        pdb.pm()


class Command:
    """A command to run in a pipeline, which may contain process substitutions

    Process substitutions are represented as a Pipeline object.
    """

    def __init__(self, *arguments: list[typing.Union[str, "Pipeline"]]):
        self.arguments: list[typing.Union[str, "Pipeline"]] = list(arguments)


class Pipeline:
    """A pipeline of commands, where the STDOUT of each command is piped to the STDIN of the next command"""

    def __init__(self, *commands: list[Command]):
        self.commands: list[Command] = list(commands)


def run_basic_pipeline(
    pipeline: Pipeline,
    result_pipe: os.PathLike | None = None,
    start_event: threading.Event | None = None,
    pass_fds: list[int] | None = None,
):
    """Run a shell pipeline, passing the STDOUT of each stage to the STDIN of the next stage

    Cannot handle process substitution.

    Arguments:
    pipeline:       A pipeline to run
                    The pipeline cannot contain any process substitutions
    result_pipe:    The file descriptor for write end of the os.pipe() to use for the last command in the pipeline
                    If None, it will go to the STDOUT of the current process
    start_event:    An event to set once the pipeline has started (optional, for caller's use)
    pass_fds:       A list of file descriptors to pass to subprocess.Popen at each stage
                    This will keep any pipes in this list open for the duration of the pipeline
    """
    pass_fds = pass_fds or []
    # The input to the first command in the pipeline is empty
    stdin_content = b""
    for idx, stage in enumerate(pipeline.commands):
        logging.debug(f"Running stage: {stage.arguments}")
        # The stdin= argument should be a new pipe for all but the first command in the pipeline
        stdin_arg = subprocess.PIPE if idx != 0 else None
        # The stdout= argument should be a new pipe for all but the last command in the pipeline,
        # which should use the result_pipe
        stdout_arg = (
            subprocess.PIPE if idx != len(pipeline.commands) - 1 else result_pipe
        )
        process = subprocess.Popen(
            stage.arguments, stdin=stdin_arg, stdout=stdout_arg, pass_fds=pass_fds
        )
        stdout, stderr = process.communicate(input=stdin_content)
        stdin_content = stdout
    if start_event is not None:
        start_event.set()


def run_procsub_pipeline(pipeline: Pipeline):
    """Run a pipeline, and use process substitution to run any sub-pipelines"""

    # We will process the input pipeline and convert any Pipeline objects into process substitutions
    processed_pipeline = Pipeline()
    # Start a thread for each process substitution
    threads: list[threading.Thread] = []
    # We keep a start event for each process substitution so we can close the write end of each pipe once all threads have started
    start_events: list[threading.Event] = []
    # os.pipe() returns a tuple of two int file descriptors: read and write
    pipes: list[tuple[int, int]] = []
    for stage in pipeline.commands:
        processed_stage = Command()
        for argument in stage.arguments:
            # If the argument is a pipeline, treat it as a process substitution --
            # run the pipeline and pass the write end of the pipe to the command as an argument.
            # E.g. if the command is `cat <(echo hello) <(echo world)`,
            # then we'll run the two 'echo' commands and then something like `cat /dev/fd/3 /dev/fd/4`.
            if isinstance(argument, Pipeline):
                pipe = os.pipe()
                pipes.append(pipe)
                start_event = threading.Event()
                start_events.append(start_event)
                thread = threading.Thread(
                    target=run_basic_pipeline,
                    args=(argument, pipe[1], start_event, [pipe[0]]),
                )
                thread.start()
                threads.append(thread)
                processed_stage.arguments.append(f"/dev/fd/{pipe[0]}")
            # If the argument isn't a pipeline, just treat it like a regular argument
            else:
                processed_stage.arguments.append(argument)
        processed_pipeline.commands.append(processed_stage)

    # Wait for all process substitutions to start
    for start_event in start_events:
        start_event.wait()
    # Once they all have been started, we can close the write end of the pipes
    for pipe in pipes:
        os.close(pipe[1])

    # Run the processed pipeline
    # Keep the read ends of the pipes open until the pipeline is finished
    run_basic_pipeline(processed_pipeline, pass_fds=[pipe[0] for pipe in pipes])

    # Wait for all process substitutions to finish
    # We wait for substitutions for all stages to finish before running the pipeline because that's easier;
    # we could optimize this to run each stage of the pipeline as soon as it's ready.
    for thread in threads:
        thread.join()

    # Close the read ends of the pipes
    for pipe in pipes:
        os.close(pipe[0])


def main():
    run_procsub_pipeline(
        Pipeline(
            Command(
                "cat",
                Pipeline(Command("/bin/sh", "-c", "sleep 1; echo hello")),
                Pipeline(
                    Command("/bin/sh", "-c", "sleep 3; echo XXXworldXXX"),
                    Command("sed", "-e", "s/XXX//g"),
                ),
            ),
            Command("hexdump", "-C"),
        )
    )

    # Compare with a command like:
    # time zsh -c "cat <(sh -c 'sleep 1; echo hello') <(sh -c 'sleep 3; echo XXXworldXXX' | sed -e 's/XXX//g') | hexdump -C"


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    sys.excepthook = idb_excepthook
    main()

I don’t have his cmd or S3 contents, so I use the following shell command as my reference for a pipeline that contains process substitution pipelines. You can time(1) this command to check that the sleep(1) commands actually execute in parallel.

cat <(sh -c 'sleep 1; echo hello') <(sh -c 'sleep 3; echo XXXworldXXX' | sed -e 's/XXX//g') | hexdump -C

When I time(1) that, I get a result like:

> time zsh -c "cat <(sh -c 'sleep 1; echo hello') <(sh -c 'sleep 3; echo XXXworldXXX' | sed -e 's/XXX//g') | hexdump -C"
00000000  68 65 6c 6c 6f 0a 77 6f  72 6c 64 0a              |hello.world.|
0000000c
zsh -c   0.00s user 0.01s system 0% cpu 3.031 total

With my code, that short shell one-liner turns into this:

run_procsub_pipeline(
    Pipeline(
        Command(
            "cat",
            Pipeline(Command("/bin/sh", "-c", "sleep 1; echo hello")),
            Pipeline(
                Command("/bin/sh", "-c", "sleep 3; echo XXXworldXXX"),
                Command("sed", "-e", "s/XXX//g"),
            ),
        ),
        Command("hexdump", "-C"),
    )
)

Update: it turns out that hexdump is not a very good test of pipelines, because it will not print anything until its input cat command has completed. This is why in the update above I eschew hexdump for sed.

When I time(1) my script containing that, I get output like this:

> time ./procsub_pipeline.py
DEBUG:root:Running stage: ['/bin/sh', '-c', 'sleep 1; echo hello']
DEBUG:root:Running stage: ['/bin/sh', '-c', 'sleep 3; echo XXXworldXXX']
DEBUG:root:Running stage: ['sed', '-e', 's/XXX//g']
DEBUG:root:Running stage: ['cat', '/dev/fd/3', '/dev/fd/5']
DEBUG:root:Running stage: ['hexdump', '-C']
00000000  68 65 6c 6c 6f 0a 77 6f  72 6c 64 0a              |hello.world.|
0000000c
./procsub_pipeline.py  0.03s user 0.02s system 1% cpu 3.079 total

Which, aside from the DEBUG messages, matches what the shell version does. Cool!

Previous work

I’ve written some code trying to emulate a pipeline in Python before, without attempting to add process substitution. It might be useful as a pattern to copy and paste into code that needs a pipeline pattern. I think the code is kind of messy and I wouldn’t write it this way today.

Click to show simple_pipeline.py
import logging
import subprocess


logger = logging.getLogger(__name__)


def pipe(arg_kwarg_list):
    """Construct a shell pipeline
    Invokes the first command in the arglist, retrieves its STDOUT, passes that to the STDIN of the
    next command in the arglist, and so on.
    Logs each command, including its STDIN, STDOUT, and STDERR.
    arg_kwarg_list:     A list of (command, kwargs) tuples
                        command:    A list to pass to subprocess.Popen
                        kwargs:     Any keyword arguments to subprocess.Popen
    result:             The STDOUT of the final command
    Example:
        # Call:
        pipe([
            (['ls', '-1'], {'cwd': '/'}),
            (['head', '-n', '2'], {}),      # Can pass an empty dict...
            (['grep', 'p'],)                # ... or make a one-item tuple with a trailing comma
        ])
        # Result (on my Mac):
        Applications
    """
    first = True
    stdin = b""
    for argtuple in arg_kwarg_list:
        if len(argtuple) < 1:
            raise Exception("Found empty tuple")
        if len(argtuple) > 2:
            raise Exception(f"Found tuple with {len(argtuple)} elements")
        command = argtuple[0]
        kwargs = argtuple[1] if len(argtuple) == 2 else {}
        kwargs["stdout"] = subprocess.PIPE
        kwargs["stderr"] = subprocess.PIPE

        if not first:
            kwargs["stdin"] = subprocess.PIPE
        first = False

        process = subprocess.Popen(command, **kwargs)
        stdout, stderr = process.communicate(input=stdin)

        # Don't log stdin/stdout because it may contain binary
        logger.debug(
            f"Popen call {command} with keyword arguments {kwargs} "
            f"exited with code {process.returncode} "
            # f"with a stdin of '{stdin}' "
            # f"and with a stdout of '{stdout}' "
            f"and with a stderr of '{stderr.decode()}'"
        )
        if process.returncode != 0:
            raise subprocess.CalledProcessError(
                process.returncode, command, output=stdout, stderr=stderr
            )

        stdin = stdout

    return stdout

Responses

Comments are hosted on this site and powered by Remark42 (thanks!).

Webmentions are hosted on remote sites and syndicated via Webmention.io (thanks!).