2021年7月11日日曜日

13: Pythonサブプロセス: 標準入出力を非同期にリレーする

<このシリーズの前の記事 | このシリーズの目次 | このシリーズの次の記事>

あの、'完了するまで待ちやがれ'の'subprocess.run'は、私の選択肢ではありません。プロセス間通信の1方法。リレーを中断する。あるユーティリティクラス。

話題


About: Python

この記事の目次


開始コンテキスト


  • 読者は、Pythonの基本的知識を持っている。

ターゲットコンテキスト



  • 読者は、Pythonにて、プロセスを起動し、そのプロセスの標準出力、標準エラー出力、標準入力をバックグラウンドでリレーする方法を知る。

オリエンテーション


Pythonにて任意の標準入力待ちを中断する方法についての記事があります。

Pythonにおけるあるオブジェクト群パイプを紹介する記事があります。

Pythonにおけるある文字列パイプを紹介する記事があります。

いくつかの他のプログラミング言語(JavaC++C#)におけるオブジェクト群パイプを紹介するいくつかの記事があります。


本体

ト書き
Hypothesizer 7は、独白する。


1: なぜ'subprocess.run'ではだめなのか、一般的に言って


Hypothesizer 7
'subprocess.run'は、起動されたプロセスが完了するまで、否応なしに待つ? . . . ご冗談でしょう。

そのプロセスは、長時間かかるかもしれず、その間に大量の標準出力を出すかもしれないのに、私のプログラムは、そのプロセスが終了するまで手持ち無沙汰に待っていなければいけない、そのバックログを処理し始められるまで? . . . 却下します。

標準入力データはそのメソッドに渡せるものの、それは先験的に決定されていなければならない? . . . 使えません。

ああ、訂正します: 勿論、そのメソッドは、一部の限られたケースでは使えるかもしれないが、汎用目的のメソッドとは見なせない、それが私の意味するところです。

一般的に言うと、私は、起動されたプロセスと相互作用することを考える、私のプログラムはメッセージを送受信して、つまり、プロセス間通信だ。

そのメソッドの前提は、「起動されたプロセスが標準入力に何を要求するかをあなたは先験的に知っているはずだ、それに、起動されたプロセスからの標準出力はプロセス終了後にのみ読まれるというのであなたはオーケーであるはずだ」であるようだが、私は、それを共有していない。


2: プロセスを起動し、その標準出力、標準エラー出力、標準入力を操作すること、の基本


Hypothesizer 7
Pythonでは、プロセスは、'subprocess.Popen'クラスのインスタンスを生成することで起動できる、そして、そのインスタンスは、当該プロセスの標準出力、標準エラー出力、標準入力の各々の'TextIO'ストリームへのポインターを格納している。

勿論、コンストラクタは、起動されたプロセスが終了するのを待たずに即座にリターンするが、それは、そのクラスが汎用目的に使えるために不可欠の性質である。

以下は、1つのプロセスを起動し、その標準出力を読む例だ。

@Python ソースコード
import subprocess
from subprocess import Popen
import sys

		l_process: Popen = Popen (args = ["ls", "-l"], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, cwd = None, env = None, encoding = "UTF-8")
		try:
			l_standardOutputData: str
			while True:
				l_standardOutputData = l_process.stdout.read (1)
				if l_standardOutputData == "":
					break
				sys.stdout.write (l_standardOutputData)
				sys.stdout.flush ()
		except (Exception) as l_exception:
			sys.stderr.write ("### A standard output error: {0:s}.\n".format (str (l_exception)))
			sys.stderr.flush ()
		l_process.communicate ()
		sys.stdout.write ("### the process return: {0:d}\n".format (l_process.returncode))
		sys.stdout.flush ()

標準エラー出力および標準入力のストリームへのポインターは、'l_process.stderr'および'l_process.stdin'だ。

コマンドおよび引数群は、リストとして、コンストラクタの'args'引数に渡される。

ワーキングディレクトリは、文字列として'cwd'引数に渡される。

環境変数群は、'env'引数にマップとして渡されるが、もしも、'None'が渡されれば、起動元プロセスの環境変数群が、起動されたプロセスに継承される。

えーと、起動されたプロセスの生存期間と標準出力等へのアクセス可能期間の関係はどうなのか? . . . もしも、プロセスが完了したら、標準出力はアクセス不能になるのか、コンテンツがまだ読まれていないのに?それとも、標準出力をまだ読んでいなければ、プロセスの完了がブロックされるのか? . . . テストしてみよう。

@Python ソースコード
import subprocess
from subprocess import Popen
import sys
import time

		l_process: Popen = Popen (args = ["ls", "-l"], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, cwd = None, env = None, encoding = "UTF-8")
		sys.stdout.write ("### waiting 30 seconds before starting to read the standard output . . .\n")
		sys.stdout.flush ()
		time.sleep (30)
		try:
			l_standardOutputData: str
			while True:
				l_standardOutputData = l_process.stdout.read (1)
				if l_standardOutputData == "":
					break
				sys.stdout.write (l_standardOutputData)
				sys.stdout.flush ()
		except (Exception) as l_exception:
			sys.stderr.write ("### A standard output error: {0:s}.\n".format (str (l_exception)))
			sys.stderr.flush ()
		sys.stdout.write ("### waiting 30 seconds before calling 'communicate (input, timeout)' . . .\n")
		sys.stdout.flush ()
		time.sleep (30)
		l_processReturn: int = l_process.communicate ()
		sys.stdout.write ("### the process return: {0:d}\n".format (l_process.returncode))
		sys.stdout.flush ()

プログラムがスリープしている間に、起動されたプロセスをLinuxの'ps'コマンドで探した。

えーと、起動されたプロセスは、プログラムが標準出力を読み始めもしないうちに「defunct」になり、'communicate (input, timeout)'メソッドがコールされるまで生き続けた。したがって、標準出力や標準エラー出力を、充分速く読まなかったからという理由で読みはぐれることを心配する必要はない。他方で、標準出力や標準エラー出力を必ずしも読む必要はない: それらを読まなかったからといって、起動されたプロセスの終了が阻止されることはない。

1つの教訓は、'communicate (input, timeout)'メソッドを何らかの形でコールしなければならないということだ、もしも、起動されたプロセスが完了するのを私のプログラムは待ちたくないとしても。 . . . もしも、メインスレッドが待ちたくないのであれば、あるバックグラウンドスレッドを待たせればよい。


3: なぜ、リレーはバックグラウンドで行なわれるべきなのか


Hypothesizer 7
明らかなことだが、もしも、私のプログラムが標準出力を単にフォアグラウンドで読めば、他の2ストリームは、標準出力がクローズされるまで、面倒を見られないということになるだろう。

したがって、3ストリームの内の少なくとも2つは、バックグラウンドで操作されなければならない。

もっと望ましくは、3ストリームの全てがバックグラウンドで操作される、なぜなら、さもなければ、メインスレッドが、沈黙を保つ起動されたプロセスによって予測不可能にブロックされるかもしれないから。

Python 'TextIO'の致命的問題は、中断することもタイムアウトさせることもできず、場合によっては無期限に立ち往生することになることだ。

'Thtread.join (timeout)'および'Popen.communicate (input, timeout)'はタイムアウトさせることができるので、以下はメインスレッドが立ち往生するのを防ぐだろう。

@Python ソースコード
import subprocess
from subprocess import Popen
from subprocess import TimeoutExpired
import sys
from threading import Thread

		l_process: Popen = Popen (args = ["ls", "-l"], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, cwd = None, env = None, encoding = "UTF-8")
		def l_printStandardOutputThreadFunction () -> None:
			try:
				l_standardOutputData: str
				while True:
					l_standardOutputData = l_process.stdout.read (1)
					if l_standardOutputData == "":
						break
					sys.stdout.write (l_standardOutputData)
					sys.stdout.flush ()
			except (Exception) as l_exception:
				sys.stderr.write ("### A standard output error: {0:s}.\n".format (str (l_exception)))
				sys.stderr.flush ()
		l_printStandardOutputThread: Thread = Thread (target = l_printStandardOutputThreadFunction)
		l_printStandardOutputThread.start ()
		while True:
			l_printStandardOutputThread.join (0.1)
			if not l_printStandardOutputThread.is_alive ():
				break
		while True:
			try:
				l_process.communicate (None, 1.0)
				sys.stdout.write ("### the process return: {0:d}\n".format (l_process.returncode))
				sys.stdout.flush ()
				break
			except (TimeoutExpired) as l_exception:
				sys.stdout.write ("### The process has not terminated yet.\n")
				sys.stdout.flush ()

実際には、標準出力は既に自然にクローズされているので、'Popen.communicate (input, timeout)'は、そのケースにおいては、特にタイムアウトさせる必要はないだろう。


4: 各種リレーを行ない、それらを中断する


Hypothesizer 7
ここでは、起動されたプロセスの標準出力および標準エラー出力を、起動したプログラムのそれらにリレーし、起動したプログラムの標準入力を、起動されたプロセスのそれにリレーしよう。

1番目の試みでは、それは以下のようになった。

@Python ソースコード
import subprocess
from subprocess import Popen
from subprocess import TimeoutExpired
import sys
from threading import Thread

		l_process: Popen = Popen (args = ["ls", "-l"], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, cwd = None, env = None, encoding = "UTF-8")
		def l_printStandardOutputThreadFunction () -> None:
			try:
				l_standardOutputData: str
				while True:
					l_standardOutputData = l_process.stdout.read (1)
					if l_standardOutputData == "":
						break
					sys.stdout.write (l_standardOutputData)
					sys.stdout.flush ()
			except (Exception) as l_exception:
				sys.stderr.write ("### A standard output error: {0:s}.\n".format (str (l_exception)))
				sys.stderr.flush ()
		l_printStandardOutputThread: Thread = Thread (target = l_printStandardOutputThreadFunction)
		l_printStandardOutputThread.start ()
		def l_printStandardErrorOutputThreadFunction () -> None:
			try:
				l_standardErrorOutputData: str
				while True:
					l_standardErrorOutputData = l_process.stderr.read (1)
					if l_standardErrorOutputData == "":
						break
					sys.stderr.write (l_standardErrorOutputData)
					sys.stderr.flush ()
			except (Exception) as l_exception:
				sys.stderr.write ("### A standard error output error: {0:s}.\n".format (str (l_exception)))
				sys.stderr.flush ()
		l_printStandardErrorOutputThread: Thread = Thread (target = l_printStandardErrorOutputThreadFunction)
		l_printStandardErrorOutputThread.start ()
		def l_relayStandardInputThreadFunction () -> None:
			try:
				l_standardInputDatum: str = None
				while True:
					l_standardInputDatum = sys.stdin.read (1)
					l_process.stdin.write (l_standardInputDatum)
					l_process.stdin.flush ()
			except (Exception) as l_exception:
				sys.stderr.write ("### A standard input error: {0:s}.\n".format (str (l_exception)))
				sys.stderr.flush ()
		l_relayStandardInputThread: Thread = Thread (target = l_relayStandardInputThreadFunction)
		l_relayStandardInputThread.start ()
		while True:
			l_printStandardOutputThread.join (0.1)
			if not l_printStandardOutputThread.is_alive ():
				break
		while True:
			l_printStandardErrorOutputThread.join (0.1)
			if not l_printStandardErrorOutputThread.is_alive ():
				break
		while True:
			try:
				l_process.communicate (None, 1.0)
				sys.stdout.write ("### the process return: {0:d}\n".format (l_process.returncode))
				sys.stdout.flush ()
				break
			except (TimeoutExpired) as l_exception:
				sys.stdout.write ("### The process has not terminated yet.\n")
				sys.stdout.flush ()
		while True:
			l_relayStandardInputThread.join (1.0)
			if not l_relayStandardInputThread.is_alive ():
				break
			else:
				sys.stdout.write ("### l_relayStandardInputThread has not terminated yet.\n")

それは問題であるが、その理由は、標準入力リレースレッドが終了しないことであり、その理由は、そのスレッドは起動したプロセスの次標準入力データを待ち続けることだ。標準入力待ちもスレッドもどうやっても中断させることができない、私が知る限り(そのスレッドをデーモンスレッドにするというような回避策は一般的には受容できない) . . .

実のところ、解決策はここにある

そこで、私の改善されたコードは以下である。

@Python ソースコード
import subprocess
from subprocess import Popen
from subprocess import TimeoutExpired
import sys
from threading import Thread
from theBiasPlanet.coreUtilities.inputs.HaltableStandardInputReader import HaltableStandardInputReader
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException

		l_process: Popen = Popen (args = ["ls", "-l"], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, cwd = None, env = None, encoding = "UTF-8")
		def l_printStandardOutputThreadFunction () -> None:
			try:
				l_standardOutputData: str
				while True:
					l_standardOutputData = l_process.stdout.read (1)
					if l_standardOutputData == "":
						break
					sys.stdout.write (l_standardOutputData)
					sys.stdout.flush ()
			except (Exception) as l_exception:
				sys.stderr.write ("### A standard output error: {0:s}.\n".format (str (l_exception)))
				sys.stderr.flush ()
		l_printStandardOutputThread: Thread = Thread (target = l_printStandardOutputThreadFunction)
		l_printStandardOutputThread.start ()
		def l_printStandardErrorOutputThreadFunction () -> None:
			try:
				l_standardErrorOutputData: str
				while True:
					l_standardErrorOutputData = l_process.stderr.read (1)
					if l_standardErrorOutputData == "":
						break
					sys.stderr.write (l_standardErrorOutputData)
					sys.stderr.flush ()
			except (Exception) as l_exception:
				sys.stderr.write ("### A standard error output error: {0:s}.\n".format (str (l_exception)))
				sys.stderr.flush ()
		l_printStandardErrorOutputThread: Thread = Thread (target = l_printStandardErrorOutputThreadFunction)
		l_printStandardErrorOutputThread.start ()
		l_haltableStandardInputReader: "HaltableStandardInputReader" = HaltableStandardInputReader.getInstance ()
		def l_relayStandardInputThreadFunction () -> None:
			l_threadIdentification: str = "{0:d}".format (id (threading.current_thread ()))
			try:
				l_standardInputDatum: str = None
				while True:
					try:
						l_standardInputDatum = l_haltableStandardInputReader.read (l_threadIdentification, 1)
						if l_standardInputDatum == "":
							break
						l_process.stdin.write (l_standardInputDatum)
						l_process.stdin.flush ()
					except (NoMoreDataException) as l_exception:
						break
					except (TimeOutException) as l_exception:
						# impossible
						None
			except (Exception) as l_exception:
				sys.stderr.write ("### A standard input error: {0:s}.\n".format (str (l_exception)))
				sys.stderr.flush ()
		l_relayStandardInputThread: Thread = Thread (target = l_relayStandardInputThreadFunction)
		l_haltableStandardInputReader.addSubscriber ("{0:d}".format (id (l_relayStandardInputThread)))
		sys.stdout.flush ()
		l_relayStandardInputThread.start ()
		sys.stdout.flush ()
		while True:
			l_printStandardOutputThread.join (0.1)
			if not l_printStandardOutputThread.is_alive ():
				break
		while True:
			l_printStandardErrorOutputThread.join (0.1)
			if not l_printStandardErrorOutputThread.is_alive ():
				break
		while True:
			try:
				l_process.communicate (None, 1.0)
				sys.stdout.write ("### the process return: {0:d}\n".format (l_process.returncode))
				sys.stdout.flush ()
				break
			except (TimeoutExpired) as l_exception:
				sys.stdout.write ("### The process has not terminated yet.\n")
				sys.stdout.flush ()
		l_haltableStandardInputReader.removeSubscriber ("{0:d}".format (id (l_relayStandardInputThread)))
		while True:
			l_relayStandardInputThread.join (1.0)
			if not l_relayStandardInputThread.is_alive ():
				break
			else:
				sys.stdout.write ("### l_relayStandardInputThread has not terminated yet.\n")

えーと、標準出力および標準エラー出力の読みはどうやって中断させることができるのか?

通常それらを中断する必要はないと思う、なぜなら、それらのバックグラウンドスレッドは、起動されたプロセスによって標準出力が自主的にクローズされるまで、なぜ、動くか待つかさせておいていけないのだろうか?しかし、中断しなければならないということがあるのであれば、それらのアウトプットは、私のプログラムからクローズすることができる。


5: ユーティリティクラスを作成する


Hypothesizer 7
そのようなコードは以下は定型のものなので、それを複数の箇所に書くのは賢明でない。

そこで、ユーティリティクラスを作ろう。

theBiasPlanet/coreUtilities/processesHandling/ProcessHandler.py

@Python ソースコード
from typing import Dict
from typing import List
from typing import TextIO
from typing import cast
from io import StringIO
import os
import signal
import subprocess
from subprocess import Popen
import sys
import threading
from threading import Condition
from threading import Thread
from theBiasPlanet.coreUtilities.inputs.HaltableStandardInputReader import HaltableStandardInputReader
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.messagingHandling.Publisher import Publisher
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException

class ProcessHandler ():
	s_haltableStandardInputReader: "HaltableStandardInputReader" = HaltableStandardInputReader.getInstance ()
	
	# the static initializer Start
	s_haltableStandardInputReader.startDispatchDataThread ()
	# the static initializer End
	
	@staticmethod
	def interruptRelayStandardInputThread (a_thread: Thread) -> None:
		ProcessHandler.s_haltableStandardInputReader.removeSubscriber ("{0:d}".format (id (a_thread)))
	
	class StandardInputAndOutputs:
		def __init__ (a_this: "ProcessHandler.StandardInputAndOutputs", a_process: Popen) -> None:
			a_this.i_process: Popen
			a_this.i_standardInputOutputStream: TextIO
			a_this.i_standardOutputInputStream: TextIO
			a_this.i_standardErrorOutputInputStream: TextIO
			a_this.i_relayStandardInputThread: Thread
			a_this.i_printStandardOutputThread: Thread
			a_this.i_printStandardErrorOutputThread: Thread
			a_this.i_thereWereStandardInputContents: bool
			a_this.i_thereWereStandardOutputContents: bool
			a_this.i_thereWereStandardErrorOutputContents: bool
			
			a_this.i_process = a_process
			a_this.i_standardInputOutputStream = cast (TextIO, a_this.i_process.stdin)
			a_this.i_standardOutputInputStream = cast (TextIO, a_this.i_process.stdout)
			a_this.i_standardErrorOutputInputStream = cast (TextIO, a_this.i_process.stderr)
			a_this.i_relayStandardInputThread = None
			a_this.i_printStandardOutputThread = None
			a_this.i_printStandardErrorOutputThread = None
			a_this.i_thereWereStandardInputContents = False
			a_this.i_thereWereStandardOutputContents = False
			a_this.i_thereWereStandardErrorOutputContents = False
		
		def getStandardInputOutputStream (a_this: "ProcessHandler.StandardInputAndOutputs", ) -> TextIO:
			return a_this.i_standardInputOutputStream
		
		def getStandardOutputInputStream (a_this: "ProcessHandler.StandardInputAndOutputs", ) -> TextIO:
			return a_this.i_standardOutputInputStream
		
		def getStandardErrorOutputInputStream (a_this: "ProcessHandler.StandardInputAndOutputs", ) -> TextIO:
			return a_this.i_standardErrorOutputInputStream
		
		def relayStandardInputAsynchronously (a_this: "ProcessHandler.StandardInputAndOutputs") -> None:
			def l_relayStandardInputThreadFunction () -> None:
				l_threadIdentification: str = "{0:d}".format (id (threading.current_thread ()))
				try:
					l_processStandardInputWriter: TextIO = a_this.i_standardInputOutputStream
					l_standardInputDatum: str = None
					while True:
						try:
							l_standardInputDatum = ProcessHandler.s_haltableStandardInputReader.read (l_threadIdentification, 1)
							a_this.i_thereWereStandardInputContents = True
							l_processStandardInputWriter.write (l_standardInputDatum)
							l_processStandardInputWriter.flush ()
						except (NoMoreDataException) as l_exception:
							break
						except (TimeOutException) as l_exception:
							# impossible
							None
				except (Exception) as l_exception:
					Publisher.logErrorInformation ("### A standard input error: {0:s}.".format (str (l_exception)))
			a_this.i_relayStandardInputThread = Thread (target = l_relayStandardInputThreadFunction)
			ProcessHandler.s_haltableStandardInputReader.addSubscriber ("{0:d}".format (id (a_this.i_relayStandardInputThread)))
			a_this.i_relayStandardInputThread.start ()
		
		def printStandardOutputAsynchronously (a_this: "ProcessHandler.StandardInputAndOutputs") -> None:
			def l_printStandardOutputThreadFunction () -> None:
				try:
					l_standardOutputReader: TextIO = a_this.i_standardOutputInputStream
					l_standardOutputData: str
					while True:
						l_standardOutputData = l_standardOutputReader.read (1)
						if l_standardOutputData == "":
							break
						a_this.i_thereWereStandardOutputContents = True
						sys.stdout.write (l_standardOutputData)
						sys.stdout.flush ()
				except (Exception) as l_exception:
					Publisher.logErrorInformation (l_exception)
					sys.stderr.write ("### A standard output error: {0:s}.\n".format (str (l_exception)))
					sys.stderr.flush ()
			a_this.i_printStandardOutputThread  = Thread (target = l_printStandardOutputThreadFunction)
			a_this.i_printStandardOutputThread.start ()
		
		def printStandardErrorOutputAsynchronously (a_this: "ProcessHandler.StandardInputAndOutputs") -> None:
			def l_printStandardErrorOutputThreadFunction () -> None:
				try:
					l_standardErrorOutputReader: TextIO = a_this.i_standardErrorOutputInputStream
					l_standardErrorOutputData: str
					while True:
						l_standardErrorOutputData = l_standardErrorOutputReader.read (1)
						if l_standardErrorOutputData == "":
							break
						a_this.i_thereWereStandardErrorOutputContents = True
						sys.stderr.write (l_standardErrorOutputData)
						sys.stderr.flush ()
				except (Exception) as l_exception:
					Publisher.logErrorInformation (l_exception)
					sys.stderr.write ("### A standard error output error: {0:s}.\n".format (str (l_exception)))
					sys.stderr.flush ()
			a_this.i_printStandardErrorOutputThread  = Thread (target = l_printStandardErrorOutputThreadFunction)
			a_this.i_printStandardErrorOutputThread .start ()
		
		def setStandardInputNextLine (a_this: "ProcessHandler.StandardInputAndOutputs", a_line: str) -> None:
			try:
				a_this.i_standardInputOutputStream.write ("{0:s}\n".format (a_line))
				a_this.i_standardInputOutputStream.flush ()
				a_this.i_thereWereStandardInputContents = True
			except (Exception) as l_exception:
				sys.stderr.write ("The standard input couldn't be written into: {0:s}.\n".format (str (l_exception)))
				sys.stderr.flush ()
				a_this.i_standardInputOutputStream.close ()
		
		def getStandardOutputNextLine (a_this: "ProcessHandler.StandardInputAndOutputs") -> str:
			try:
				l_line: str = None
				l_line = a_this.i_standardOutputInputStream.readline ()
				if l_line != "":
					a_this.i_thereWereStandardOutputContents = True
					return l_line
				else:
					a_this.i_standardOutputInputStream.close ()
					return None
			except (Exception) as l_exception:
				sys.stderr.write ("The standard output couldn't be scanned: {0:s}.\n".format (str (l_exception)))
				sys.stderr.flush ()
				a_this.i_standardOutputInputStream.close ()
				return None
		
		def getStandardErrorOutputNextLine (a_this: "ProcessHandler.StandardInputAndOutputs") -> str:
			try:
				l_line: str = None
				l_line = a_this.i_standardErrorOutputInputStream.readline ()
				if l_line != "":
					a_this.i_thereWereStandardErrorOutputContents = True
					return l_line
				else:
					a_this.i_standardErrorOutputInputStream.close ()
					return None
			except (Exception) as l_exception:
				sys.stderr.write ("The standard error output couldn't be scanned: {0:s}.\n".format (str (l_exception)))
				sys.stderr.flush ()
				a_this.i_standardErrorOutputInputStream.close ()
				return None
		
		def thereWereStandardInputContents (a_this: "ProcessHandler.StandardInputAndOutputs") -> bool:
			return a_this.i_thereWereStandardInputContents
		
		def thereWereStandardOutputContents (a_this: "ProcessHandler.StandardInputAndOutputs") -> bool:
			return a_this.i_thereWereStandardOutputContents
		
		def thereWereStandardErrorOutputContents (a_this: "ProcessHandler.StandardInputAndOutputs") -> bool:
			return a_this.i_thereWereStandardErrorOutputContents
		
		def waitUntillFinish (a_this: "ProcessHandler.StandardInputAndOutputs") -> int:
			if a_this.i_printStandardErrorOutputThread is not None:
				a_this.i_printStandardErrorOutputThread.join ()
				a_this.i_printStandardErrorOutputThread = None
			if a_this.i_printStandardOutputThread is not None:
				a_this.i_printStandardOutputThread.join ()
				a_this.i_printStandardOutputThread = None
			l_processReturn: int = a_this.i_process.wait ()
			if a_this.i_relayStandardInputThread is not None:
				ProcessHandler.interruptRelayStandardInputThread (a_this.i_relayStandardInputThread)
				a_this.i_relayStandardInputThread.join ()
				a_this.i_relayStandardInputThread = None
			return l_processReturn
	
	# The environment variables of the current process are automatically passed into the sub process.
	@staticmethod
	def executeAndReturnStandardInputAndOutputs (a_workingDirectoryPath: str, a_environmentVariableNameToValueMap: "Dict [str, str]", a_commandAndArguments: List [str]) -> "ProcessHandler.StandardInputAndOutputs":
		l_process: Popen = Popen (args = a_commandAndArguments, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, cwd = a_workingDirectoryPath, env = a_environmentVariableNameToValueMap, encoding = "UTF-8")
		return ProcessHandler.StandardInputAndOutputs (l_process)
	
	@staticmethod
	def execute (a_workingDirectory: str, a_environmentVariableNameToValueMap: "Dict [str, str]", a_commandAndArguments: List [str], a_waitsUntilFinish: bool) -> int:
		l_standardInputAndOutputs: "ProcessHandler.StandardInputAndOutputs" = ProcessHandler.executeAndReturnStandardInputAndOutputs (a_workingDirectory, a_environmentVariableNameToValueMap, a_commandAndArguments)
		l_childProcessReturn: int = 0
		if a_waitsUntilFinish:
			l_standardInputAndOutputs.printStandardOutputAsynchronously ()
			l_standardInputAndOutputs.printStandardErrorOutputAsynchronously ()
			l_standardInputAndOutputs.relayStandardInputAsynchronously ()
			l_childProcessReturn = l_standardInputAndOutputs.waitUntillFinish ()
		else:
			def l_waitForInvokedProcessToEndThreadFunction () -> None:
				try:
					l_standardInputAndOutputs.printStandardOutputAsynchronously ()
					l_standardInputAndOutputs.printStandardErrorOutputAsynchronously ()
					l_standardInputAndOutputs.relayStandardInputAsynchronously ()
					l_standardInputAndOutputs.waitUntillFinish ()
				except (Exception) as l_exception:
					None
			l_waitForInvokedProcessToEndThread = Thread (target = l_waitForInvokedProcessToEndThreadFunction)
			l_waitForInvokedProcessToEndThread.start ()
		return l_childProcessReturn


6: そのユーティリティクラスの使用方法


Hypothesizer 7
もしも、あるセクションにおいて行なったのと全く同じようにして、起動されたプロセスが完了するまで待ちたいのであれば、'execute (a_workingDirectory: str, a_environmentVariableNameToValueMap: "Dict [str, str]", a_commandAndArguments: List [str], a_waitsUntilFinish: bool)'メソッドを以下のようにコールすればよい。

@Python ソースコード
from theBiasPlanet.coreUtilities.processesHandling.ProcessHandler import ProcessHandler

		l_executionResult: int = ProcessHandler.execute (None, None, ["ls", "-l"], False)

もしも、そうでないのであれば、'executeAndReturnStandardInputAndOutputs (a_workingDirectoryPath: str, a_environmentVariableNameToValueMap: "Dict [str, str]", a_commandAndArguments: List [str])'メソッドを呼んで、その後で、リターンされた'ProcessHandler.StandardInputAndOutputs'オブジェクトを自由に操作すればよい、例えば、以下のように。

@Python ソースコード
import sys
from theBiasPlanet.coreUtilities.processesHandling.ProcessHandler import ProcessHandler

		l_StandardInputAndOutputs: "ProcessHandler.StandardInputAndOutputs" = ProcessHandler.executeAndReturnStandardInputAndOutputs (None, None, ["ls", "-l"])
		l_StandardInputAndOutputs.printStandardErrorOutputAsynchronously ()
		l_StandardInputAndOutputs.relayStandardInputAsynchronously ()
		sys.stdout.write ("### The 1st line: {0:s}\n".format (l_StandardInputAndOutputs.getStandardOutputNextLine ()))
		l_StandardInputAndOutputs.waitUntillFinish ()


参考資料


<このシリーズの前の記事 | このシリーズの目次 | このシリーズの次の記事>