2020年8月16日日曜日

9: Pythonにおいて、任意の標準入力待ちを中断する

<このシリーズの前の記事 | このシリーズの目次 | このシリーズの次の記事>

汎用的解決策、任意のスレッドに、任意のタイミングで、待ちを、開始、中断、再開させられる。入力は、非標準でも構いません。

話題


About: Pythonプログラミング言語

この記事の目次


開始コンテキスト


  • 読者は、Pythonの基本的知識を持っている。

ターゲットコンテキスト



  • 読者は、Pythonにおいて任意の標準入力待ちを中断するための汎用的解決策を得る。

オリエンテーション


Pythonにおいて任意の標準入力待ちを中断するための汎用的解決策がここにあります。

前記事で紹介された文字列パイプが使われています。

ここで作成される中断可能リーダーは、いくつかの他のシリーズにていくつかの他のプログラミング言語(Java用、C++用、C#用)へ移植されます。


本体


1: 私の要件


Hypothesizer 7
私の要件は以下のとおり: 標準入力を待つスレッドを作成する; しばらくした後、そのスレッドを停止させる; しばらくした後、標準入力を待つ別のスレッドを作成する; と続く。

それはとても容易だろうと私は思っていた; スレッドを停止させる方法か、標準入力待ちを中断させる方法か、ブロッキングすることなく入力を覗き見る方法か、待ちをタイムアウトさせる方法があるだろう、と私は思っていた


2: うまくいかない、よく見かけるアドバイスたち


Hypothesizer 7
スレッドを停止させるためのよく見かける1つのアドバイスは、フラグやそれに類したメカニズムを使うことだ. . .

あのう、それは、私の場合には、うまくいきません: スレッドは、Python内でループしているのではなく、単一のメソッド呼び出し内で引っかかっているのです!

別のアドバイスは、'ctypes.pythonapi.PyThreadState_SetAsyncExc'メソッドを使うことだ。

ナイス、しかし、残念ながら、それもうまくいかない、なぜなら、そのメソッドは、Python行間でのみ中断をかけられるのであって、Cコードからなる単一のPython行内には割り込めないから。

それでは、標準入力待ちを中断することはできるだろうか?

えーと、Pythonの'io.TextIOWrapper'クラス('sys.stdin'はそのインスタンスである)は、そのようなメソッドを持っていない。

それでは、ブロッキングすることなく入力を覗き見ることはできるだろうか?

そのクラスは、それをサポートしていないようだ. . .

それでは、待ちをタイムアウトさせられるだろうか?

そうではなさそうだ. . .

えーと、じゃあ、どうしろと?

別のアドバイスは、入力ストリームをクローズすることだ。

あのう、標準入力ストリームを閉じることはできないのですよ、なぜなら、後続のスレッドたちがそれを使用しなければならないから(クローズされた標準入力ストリームは再オープンできないでしょう?)。

スレッドを起こすためにプログラム的にダミー入力データを送れないものかと思ったが、標準Pythonのみでは無理のようだ。

別のアドバイスは、当該プロセスに'signal.SIGINT'シグナルを送ることだ。

ナイス、しかし、残念ながら、それは、当該スレッドがメインスレッドである場合にのみうまくいく. . .

別のアドバイスは、'select.select (rlist, wlist, xlist[, timeout])'ファンクションを使って、標準入力の状態をチェックすることだ。

えーと、それは、とても有望に思えたが、結局、うまくいかないことがわかった、なぜなら、文字を1つ読むと、そのメソッドは乱されるから: つまり、4文字の文字列"ABC¥n"が入力された時、そのメソッドは、入力が用意できていると問題なく報告するが、1つの文字"A"だけを読んだ後は、そのメソッドは、入力が用意できていないと報告し、私のプログラムは、もう1文字読むべきか否か惑うままになる。. . . あのう、文字列長が分からないので、私のプログラムは、読める文字が何個そこにあるのか分からないのですよ。

別のアドバイスは、当該スレッドをデーモンスレッドにすることだ。

明らかに、それはうまくいかない、なぜなら、それは、プログラムが終了する時にのみ当該スレッドを停止させるのであって、それは、私が望んでいることではない。

結局、アドバイスはどれもうまくいかない!この容易に見えるタスクに、なぜ私ははまっていなければならのだろうか、まったくのところ?


3: 計画


Hypothesizer 7
何かが不可能であることを理解することは重要である、なぜなら、そうした後に初めて、問題をその条件の下に考え始められるから。

私は今や、標準入力を待っているスレッドを停止させることは不可能であることを理解した。. . . いや、もっと正確に述べる必要がある: 標準入力を'直接に'待っているスレッドを停止させることは不可能である

その正確な叙述が、新たなアイデアへの道を開く: それでは、私のスレッドは標準入力を'間接に'待つべきである。つまり、標準入力を待ち続け、入力データを私のスレッドたちにリレーするデーモンスレッドがあるべきである。そのリレーするメカニズムが中断可能であればいいわけだろう。

実のところ、1つの中断可能リレーメカニズムを既に私は持っている: 前記事で紹介された文字列パイプだ。

ロジックをこの直近の関心事のプログラムに実装するよりも、中断可能入力リーダークラスのサブクラスとして中断可能標準入力リーダークラスを作成したほうがよいだろう。


4: コード


Hypothesizer 7
以下が、私の中断可能入力リーダークラスのコードだ(mypyアノテーション annotationsを含んでいる(私の全てのpythonコードがそうであるように))。

'theBiasPlanet/coreUtilities/inputs/HaltableReader.py'

@Python ソースコード
from typing import List
from typing import Optional
from typing import TextIO
from collections import OrderedDict
from io import StringIO
from threading import Thread
from theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup import GeneralConstantsConstantsGroup
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.locks.SharableLock import SharableLock
from theBiasPlanet.coreUtilities.messagingHandling.Publisher import Publisher
from theBiasPlanet.coreUtilities.pipes.StringPipe import StringPipe
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException

class HaltableReader:
	def __init__ (a_this: "HaltableReader", a_underlyingStream: TextIO, a_bufferLength: int) -> None:
		a_this.i_underlyingStream: TextIO
		a_this.i_bufferLength: int
		a_this.i_dispatchDataThread: Optional [Thread] = None
		a_this.i_subscriberIdentificationToStringPipeMap: "OrderedDict [str, StringPipe]" = OrderedDict ()
		a_this.i_sharableLock: "SharableLock" = SharableLock ()
		
		a_this.i_underlyingStream = a_underlyingStream
		a_this.i_bufferLength = a_bufferLength
	
	# Any subscriber has to read consistently, or the other subscribers may be stuck because the dispatching thread will be stuck waiting to write to the string pipe for the neglecting subscriber.
	def addSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str) -> None:
		try:
			a_this.i_sharableLock.lockExclusively ()
			a_this.i_subscriberIdentificationToStringPipeMap.update ({a_subscriberIdentification: StringPipe (a_this.i_bufferLength, False)})
		finally:
			a_this.i_sharableLock.unlockExclusively ()
	
	def removeSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str) -> None:
		try:
			a_this.i_sharableLock.lockSharedly ()
			try:
				l_stringPipe: "StringPipe" = a_this.i_subscriberIdentificationToStringPipeMap [a_subscriberIdentification]
				l_stringPipe.finishWriting ()
			except (KeyError) as l_exception:
				None
		finally:
			a_this.i_sharableLock.unlockSharedly ()
		try:
			a_this.i_sharableLock.lockExclusively ()
			a_this.i_subscriberIdentificationToStringPipeMap.pop (a_subscriberIdentification)
		finally:
			a_this.i_sharableLock.unlockExclusively ()
	
	def startDispatchDataThread (a_this: "HaltableReader") -> None:
		if a_this.i_dispatchDataThread is None:
			def l_dispatchDataThreadFunction () -> None:
				try:
					l_data: str
					while True:
						l_data = a_this.i_underlyingStream.read (1)
						if l_data == "":
							break
						l_subscriberIdentification: str = None
						try:
							a_this.i_sharableLock.lockSharedly ()
							for l_subscriberIdentification in a_this.i_subscriberIdentificationToStringPipeMap:
								a_this.i_subscriberIdentificationToStringPipeMap [l_subscriberIdentification].writeWholeString (StringIO (l_data))
						finally:
							a_this.i_sharableLock.unlockSharedly ()
				except (EOFError) as l_exception:
					None
				except (Exception) as l_exception:
					Publisher.logErrorInformation (l_exception)
				finally:
					try:
						a_this.i_sharableLock.lockSharedly ()
						for l_subscriberIdentification in a_this.i_subscriberIdentificationToStringPipeMap:
							a_this.i_subscriberIdentificationToStringPipeMap [l_subscriberIdentification].finishWriting ()
					finally:
						a_this.i_sharableLock.unlockSharedly ()
			a_this.i_dispatchDataThread = Thread (target = l_dispatchDataThreadFunction)
			a_this.i_dispatchDataThread.daemon = True
			a_this.i_dispatchDataThread.start ()
	
	def close (a_this: "HaltableReader") -> None:
		a_this.i_underlyingStream.close ()
	
	def read (a_this: "HaltableReader", a_subscriberIdentification: str, a_maximumLength: int, a_timeOutPeriodInMilliseconds: int = -1) -> str:
		l_stringPipe: "Optional [StringPipe]" = None
		try:
			a_this.i_sharableLock.lockSharedly ()
			try:
				l_stringPipe = a_this.i_subscriberIdentificationToStringPipeMap [a_subscriberIdentification]
			except (KeyError) as l_exception:
				None
		finally:
			a_this.i_sharableLock.unlockSharedly ()
		if l_stringPipe is not None:
			return l_stringPipe.readString (a_maximumLength, a_timeOutPeriodInMilliseconds)
		else:
			raise NoMoreNeedsException ("")
	
	def readLine (a_this: "HaltableReader", a_subscriberIdentification: str, a_maximumLength: int, a_timeOutPeriodInMilliseconds: int = -1) -> str:
		l_stringPipe: "Optional [StringPipe]" = None
		try:
			a_this.i_sharableLock.lockSharedly ()
			try:
				l_stringPipe = a_this.i_subscriberIdentificationToStringPipeMap.get (a_subscriberIdentification)
			except (KeyError) as l_exception:
				None
		finally:
			a_this.i_sharableLock.unlockSharedly ()
		if l_stringPipe is not None:
			return l_stringPipe.readStringLine (a_maximumLength, a_timeOutPeriodInMilliseconds)
		else:
			raise NoMoreNeedsException ("")
	
	def isReady (a_this: "HaltableReader", a_subscriberIdentification: str) -> bool:
		l_stringPipe: "Optional [StringPipe]" = None
		try:
			a_this.i_sharableLock.lockSharedly ()
			try:
				l_stringPipe = a_this.i_subscriberIdentificationToStringPipeMap [a_subscriberIdentification]
			except (KeyError) as l_exception:
				None
		finally:
			a_this.i_sharableLock.unlockSharedly ()
		if l_stringPipe is not None:
			return not (l_stringPipe.isEmpty ())
		else:
			return False

'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.py'

@Python ソースコード
~
class GeneralConstantsConstantsGroup:
	c_emptyString: str = ""
	~

'theBiasPlanet/coreUtilities/locks/SharableLock.py'

@Python ソースコード
from typing import Optional
import threading
from threading import Condition
from threading import Thread

class SharableLock:
	def __init__ (a_this: "SharableLock") -> None:
		a_this.i_sharingLockingNumberOfTimes: int = 0
		a_this.i_exclusiveLockingThread: Optional [Thread] = None
		a_this.i_exclusiveLockingNumberOfTimes: int = 0
		a_this.i_threadCondition: Condition = Condition ()
	
	def lockSharedly (a_this: "SharableLock") -> None:
		a_this.i_threadCondition.acquire ()
		while a_this.i_exclusiveLockingNumberOfTimes > 0:
			a_this.i_threadCondition.wait ()
		a_this.i_sharingLockingNumberOfTimes = a_this.i_sharingLockingNumberOfTimes + 1
		a_this.i_threadCondition.release ()
	
	def lockExclusively (a_this: "SharableLock") -> None:
		a_this.i_threadCondition.acquire ()
		while a_this.i_sharingLockingNumberOfTimes > 0 or (a_this.i_exclusiveLockingThread is not None and threading.current_thread () != a_this.i_exclusiveLockingThread):
			a_this.i_threadCondition.wait ()
		if a_this.i_exclusiveLockingThread is None:
			a_this.i_exclusiveLockingThread = threading.current_thread ()
		a_this.i_exclusiveLockingNumberOfTimes = a_this.i_exclusiveLockingNumberOfTimes + 1
		a_this.i_threadCondition.release ()
	
	def unlockSharedly (a_this: "SharableLock") -> None:
		a_this.i_threadCondition.acquire ()
		if a_this.i_sharingLockingNumberOfTimes > 0:
			a_this.i_sharingLockingNumberOfTimes = a_this.i_sharingLockingNumberOfTimes - 1
			if a_this.i_sharingLockingNumberOfTimes == 0:
				a_this.i_threadCondition.notify_all ()
		a_this.i_threadCondition.release ()
	
	def unlockExclusively (a_this: "SharableLock") -> None:
		a_this.i_threadCondition.acquire ()
		if a_this.i_exclusiveLockingNumberOfTimes > 0:
			a_this.i_exclusiveLockingNumberOfTimes = a_this.i_exclusiveLockingNumberOfTimes - 1
			if a_this.i_exclusiveLockingNumberOfTimes == 0:
				a_this.i_exclusiveLockingThread = None
				a_this.i_threadCondition.notify_all ()
		a_this.i_threadCondition.release ()

'theBiasPlanet/coreUtilities/messagingHandling/Publisher.py'は、全て省略する、なぜなら、その使用されているメソッドは、ただログを書くだけだから。

以下が、私の中断可能標準入力リーダークラスだ。

'theBiasPlanet/coreUtilities/inputs/HaltableStandardInputReader.py'

@Python ソースコード
import sys
from theBiasPlanet.coreUtilities.constantsGroups.DefaultValuesConstantsGroup import DefaultValuesConstantsGroup
from theBiasPlanet.coreUtilities.inputs.HaltableReader import HaltableReader

class HaltableStandardInputReader (HaltableReader):
	s_singletonInstance: "HaltableStandardInputReader" = None
	
	def __init__ (a_this: "HaltableStandardInputReader") -> None:
		
		super ().__init__ (sys.stdin, DefaultValuesConstantsGroup.c_smallBufferSize)
	
	@staticmethod
	def getInstance () -> "HaltableStandardInputReader":
		if HaltableStandardInputReader.s_singletonInstance is None:
			HaltableStandardInputReader.s_singletonInstance = HaltableStandardInputReader ()
		return HaltableStandardInputReader.s_singletonInstance

ただ1つの標準入力リーダークラスインスタンスがある、と想定されており、そのインスタンスをシングルトンにするために、インスタンスは'HaltableStandardInputReader.getInstance ()'メソッドから取得される、と想定されている(それは強制できないが、なぜなら、Pythonがプライベートコンストラクタを(というか、プライベートメソッドを)許さないため、残念なことに)。

リレーを行なうスレッドを開始するために、'HaltableReader.startDispatchDataThread'メソッドが呼ばれなければならない(何らの害もなく、複数回呼ぶことができる)。

任意のスレッドを、'HaltableReader.addSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str)'メソッドを呼ぶことによって、その標準入力リーダークラスインスタンスの講読者として登録でき、'HaltableReader.removeSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str)'メソッド呼び出しによって購読解除することによって、そのスレッドによる入力待ちが中断される。

スレッドは、待ちにタイムアウトを使いたければ、そうできる。

複数のスレッドがその標準入力リーダークラスインスタンスを購読できる、それは、私の直近の要件には含まれていないが。

本解決策がうまくいかないケースはないであろう、と私は推測する。


5: 1つの使用例と1つの実行結果


Hypothesizer 7
以下は、1つの使用例だ。

@Python ソースコード
from typing import List
from typing import Optional
from datetime import datetime
import sys
import threading
from threading import Thread
import time
from theBiasPlanet.coreUtilities.inputs.HaltableStandardInputReader import HaltableStandardInputReader
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException

class Test1Test:
	@staticmethod
	def main (a_arguments: List [str]) -> None:
		Test1Test.test4 ()
	
	@staticmethod
	def test4 () -> None:
		l_haltableStandardInputReader: "HaltableStandardInputReader" = HaltableStandardInputReader.getInstance ()
		l_haltableStandardInputReader.startDispatchDataThread ()
		def l_workerThreadFunction () -> None:
			l_data: Optional [str] = None
			l_subscriberIdentification: str = str (id (threading.current_thread ()))
			while True:
				try:
					l_data = l_haltableStandardInputReader.readLine (l_subscriberIdentification, 10, 10 * 1000)
					sys.stdout.write ("### {0:s}: {1:s}: the standard input data: {2:s}\n".format (str (datetime.now ()), l_subscriberIdentification, l_data))
					sys.stdout.flush ()
				except (NoMoreDataException) as l_exception:
					break
				except (NoMoreNeedsException) as l_exception:
					break
				except (TimeOutException) as l_exception:
					None
			sys.stdout.write ("### {0:s}: {1:s}: ended\n".format (str (datetime.now ()), l_subscriberIdentification))
			sys.stdout.flush ()
		l_workerThreadA: Thread = Thread (target = l_workerThreadFunction)
		l_subscriberIdentificationForWorkerThreadA: str = str (id (l_workerThreadA))
		l_haltableStandardInputReader.addSubscriber (l_subscriberIdentificationForWorkerThreadA)
		l_workerThreadB: Thread = Thread (target = l_workerThreadFunction)
		l_subscriberIdentificationForWorkerThreadB: str = str (id (l_workerThreadB))
		l_haltableStandardInputReader.addSubscriber (l_subscriberIdentificationForWorkerThreadB)
		l_workerThreadA.start ()
		l_workerThreadB.start ()
		sys.stdout.write ("### {0:s}: sleeping . . .\n".format (str (datetime.now ())))
		sys.stdout.flush ()
		time.sleep (30)
		l_haltableStandardInputReader.removeSubscriber (l_subscriberIdentificationForWorkerThreadA)
		l_workerThreadA.join ()
		sys.stdout.write ("### {0:s}: sleeping . . .\n".format (str (datetime.now ())))
		sys.stdout.flush ()
		time.sleep (30)
		l_haltableStandardInputReader.removeSubscriber (l_subscriberIdentificationForWorkerThreadB)
		l_workerThreadB.join ()

if __name__ == "__main__":
	Test1Test.main (sys.argv)

以下は、1つのアウトプットだ。

@出力
### 2020-08-03 14:34:09.851530: sleeping . . .
ABC
### 2020-08-03 14:34:14.924620: 139963775811104: the standard input data: ABC

### 2020-08-03 14:34:14.924868: 139963775840512: the standard input data: ABC

### 2020-08-03 14:34:39.853083: 139963775811104: ended
### 2020-08-03 14:34:39.853977: sleeping . . .
DEF
### 2020-08-03 14:34:44.147788: 139963775840512: the standard input data: DEF

### 2020-08-03 14:35:09.854674: 139963775840512: ended


参考資料


<このシリーズの前の記事 | このシリーズの目次 | このシリーズの次の記事>