汎用的解決策、任意のスレッドに、任意のタイミングで、待ちを、開始、中断、再開させられる。入力は、非標準でも構いません。
話題
About: Pythonプログラミング言語
この記事の目次
開始コンテキスト
- 読者は、Pythonの基本的知識を持っている。
ターゲットコンテキスト
- 読者は、Pythonにおいて任意の標準入力待ちを中断するための汎用的解決策を得る。
オリエンテーション
Pythonにおいて任意の標準入力待ちを中断するための汎用的解決策がここにあります。
前記事で紹介された文字列パイプが使われています。
ここで作成される中断可能リーダーは、いくつかの他のシリーズにていくつかの他のプログラミング言語(Java用、C++用、C#用)へ移植されます。
本体
1: 私の要件
Hypothesizer 7
私の要件は以下のとおり: 標準入力を待つスレッドを作成する; しばらくした後、そのスレッドを停止させる; しばらくした後、標準入力を待つ別のスレッドを作成する; と続く。
それはとても容易だろうと私は思っていた; スレッドを停止させる方法か、標準入力待ちを中断させる方法か、ブロッキングすることなく入力を覗き見る方法か、待ちをタイムアウトさせる方法があるだろう、と私は思っていた
2: うまくいかない、よく見かけるアドバイスたち
Hypothesizer 7
スレッドを停止させるためのよく見かける1つのアドバイスは、フラグやそれに類したメカニズムを使うことだ. . .
あのう、それは、私の場合には、うまくいきません: スレッドは、Python内でループしているのではなく、単一のメソッド呼び出し内で引っかかっているのです!
別のアドバイスは、'ctypes.pythonapi.PyThreadState_SetAsyncExc'メソッドを使うことだ。
ナイス、しかし、残念ながら、それもうまくいかない、なぜなら、そのメソッドは、Python行間でのみ中断をかけられるのであって、Cコードからなる単一のPython行内には割り込めないから。
それでは、標準入力待ちを中断することはできるだろうか?
えーと、Pythonの'io.TextIOWrapper'クラス('sys.stdin'はそのインスタンスである)は、そのようなメソッドを持っていない。
それでは、ブロッキングすることなく入力を覗き見ることはできるだろうか?
そのクラスは、それをサポートしていないようだ. . .
それでは、待ちをタイムアウトさせられるだろうか?
そうではなさそうだ. . .
えーと、じゃあ、どうしろと?
別のアドバイスは、入力ストリームをクローズすることだ。
あのう、標準入力ストリームを閉じることはできないのですよ、なぜなら、後続のスレッドたちがそれを使用しなければならないから(クローズされた標準入力ストリームは再オープンできないでしょう?)。
スレッドを起こすためにプログラム的にダミー入力データを送れないものかと思ったが、標準Pythonのみでは無理のようだ。
別のアドバイスは、当該プロセスに'signal.SIGINT'シグナルを送ることだ。
ナイス、しかし、残念ながら、それは、当該スレッドがメインスレッドである場合にのみうまくいく. . .
別のアドバイスは、'select.select (rlist, wlist, xlist[, timeout])'ファンクションを使って、標準入力の状態をチェックすることだ。
えーと、それは、とても有望に思えたが、結局、うまくいかないことがわかった、なぜなら、文字を1つ読むと、そのメソッドは乱されるから: つまり、4文字の文字列"ABC¥n"が入力された時、そのメソッドは、入力が用意できていると問題なく報告するが、1つの文字"A"だけを読んだ後は、そのメソッドは、入力が用意できていないと報告し、私のプログラムは、もう1文字読むべきか否か惑うままになる。. . . あのう、文字列長が分からないので、私のプログラムは、読める文字が何個そこにあるのか分からないのですよ。
別のアドバイスは、当該スレッドをデーモンスレッドにすることだ。
明らかに、それはうまくいかない、なぜなら、それは、プログラムが終了する時にのみ当該スレッドを停止させるのであって、それは、私が望んでいることではない。
結局、アドバイスはどれもうまくいかない!この容易に見えるタスクに、なぜ私ははまっていなければならのだろうか、まったくのところ?
3: 計画
Hypothesizer 7
何かが不可能であることを理解することは重要である、なぜなら、そうした後に初めて、問題をその条件の下に考え始められるから。
私は今や、標準入力を待っているスレッドを停止させることは不可能であることを理解した。. . . いや、もっと正確に述べる必要がある: 標準入力を'直接に'待っているスレッドを停止させることは不可能である
その正確な叙述が、新たなアイデアへの道を開く: それでは、私のスレッドは標準入力を'間接に'待つべきである。つまり、標準入力を待ち続け、入力データを私のスレッドたちにリレーするデーモンスレッドがあるべきである。そのリレーするメカニズムが中断可能であればいいわけだろう。
実のところ、1つの中断可能リレーメカニズムを既に私は持っている: 前記事で紹介された文字列パイプだ。
ロジックをこの直近の関心事のプログラムに実装するよりも、中断可能入力リーダークラスのサブクラスとして中断可能標準入力リーダークラスを作成したほうがよいだろう。
4: コード
Hypothesizer 7
以下が、私の中断可能入力リーダークラスのコードだ(mypyアノテーション annotationsを含んでいる(私の全てのpythonコードがそうであるように))。
'theBiasPlanet/coreUtilities/inputs/HaltableReader.py'
@Python ソースコード
from typing import List
from typing import Optional
from typing import TextIO
from collections import OrderedDict
from io import StringIO
from threading import Thread
from theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup import GeneralConstantsConstantsGroup
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.locks.SharableLock import SharableLock
from theBiasPlanet.coreUtilities.messagingHandling.Publisher import Publisher
from theBiasPlanet.coreUtilities.pipes.StringPipe import StringPipe
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException
class HaltableReader:
def __init__ (a_this: "HaltableReader", a_underlyingStream: TextIO, a_bufferLength: int) -> None:
a_this.i_underlyingStream: TextIO
a_this.i_bufferLength: int
a_this.i_dispatchDataThread: Optional [Thread] = None
a_this.i_subscriberIdentificationToStringPipeMap: "OrderedDict [str, StringPipe]" = OrderedDict ()
a_this.i_sharableLock: "SharableLock" = SharableLock ()
a_this.i_underlyingStream = a_underlyingStream
a_this.i_bufferLength = a_bufferLength
# Any subscriber has to read consistently, or the other subscribers may be stuck because the dispatching thread will be stuck waiting to write to the string pipe for the neglecting subscriber.
def addSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str) -> None:
try:
a_this.i_sharableLock.lockExclusively ()
a_this.i_subscriberIdentificationToStringPipeMap.update ({a_subscriberIdentification: StringPipe (a_this.i_bufferLength, False)})
finally:
a_this.i_sharableLock.unlockExclusively ()
def removeSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str) -> None:
try:
a_this.i_sharableLock.lockSharedly ()
try:
l_stringPipe: "StringPipe" = a_this.i_subscriberIdentificationToStringPipeMap [a_subscriberIdentification]
l_stringPipe.finishWriting ()
except (KeyError) as l_exception:
None
finally:
a_this.i_sharableLock.unlockSharedly ()
try:
a_this.i_sharableLock.lockExclusively ()
a_this.i_subscriberIdentificationToStringPipeMap.pop (a_subscriberIdentification)
finally:
a_this.i_sharableLock.unlockExclusively ()
def startDispatchDataThread (a_this: "HaltableReader") -> None:
if a_this.i_dispatchDataThread is None:
def l_dispatchDataThreadFunction () -> None:
try:
l_data: str
while True:
l_data = a_this.i_underlyingStream.read (1)
if l_data == "":
break
l_subscriberIdentification: str = None
try:
a_this.i_sharableLock.lockSharedly ()
for l_subscriberIdentification in a_this.i_subscriberIdentificationToStringPipeMap:
a_this.i_subscriberIdentificationToStringPipeMap [l_subscriberIdentification].writeWholeString (StringIO (l_data))
finally:
a_this.i_sharableLock.unlockSharedly ()
except (EOFError) as l_exception:
None
except (Exception) as l_exception:
Publisher.logErrorInformation (l_exception)
finally:
try:
a_this.i_sharableLock.lockSharedly ()
for l_subscriberIdentification in a_this.i_subscriberIdentificationToStringPipeMap:
a_this.i_subscriberIdentificationToStringPipeMap [l_subscriberIdentification].finishWriting ()
finally:
a_this.i_sharableLock.unlockSharedly ()
a_this.i_dispatchDataThread = Thread (target = l_dispatchDataThreadFunction)
a_this.i_dispatchDataThread.daemon = True
a_this.i_dispatchDataThread.start ()
def close (a_this: "HaltableReader") -> None:
a_this.i_underlyingStream.close ()
def read (a_this: "HaltableReader", a_subscriberIdentification: str, a_maximumLength: int, a_timeOutPeriodInMilliseconds: int = -1) -> str:
l_stringPipe: "Optional [StringPipe]" = None
try:
a_this.i_sharableLock.lockSharedly ()
try:
l_stringPipe = a_this.i_subscriberIdentificationToStringPipeMap [a_subscriberIdentification]
except (KeyError) as l_exception:
None
finally:
a_this.i_sharableLock.unlockSharedly ()
if l_stringPipe is not None:
return l_stringPipe.readString (a_maximumLength, a_timeOutPeriodInMilliseconds)
else:
raise NoMoreNeedsException ("")
def readLine (a_this: "HaltableReader", a_subscriberIdentification: str, a_maximumLength: int, a_timeOutPeriodInMilliseconds: int = -1) -> str:
l_stringPipe: "Optional [StringPipe]" = None
try:
a_this.i_sharableLock.lockSharedly ()
try:
l_stringPipe = a_this.i_subscriberIdentificationToStringPipeMap.get (a_subscriberIdentification)
except (KeyError) as l_exception:
None
finally:
a_this.i_sharableLock.unlockSharedly ()
if l_stringPipe is not None:
return l_stringPipe.readStringLine (a_maximumLength, a_timeOutPeriodInMilliseconds)
else:
raise NoMoreNeedsException ("")
def isReady (a_this: "HaltableReader", a_subscriberIdentification: str) -> bool:
l_stringPipe: "Optional [StringPipe]" = None
try:
a_this.i_sharableLock.lockSharedly ()
try:
l_stringPipe = a_this.i_subscriberIdentificationToStringPipeMap [a_subscriberIdentification]
except (KeyError) as l_exception:
None
finally:
a_this.i_sharableLock.unlockSharedly ()
if l_stringPipe is not None:
return not (l_stringPipe.isEmpty ())
else:
return False
'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.py'
@Python ソースコード
~
class GeneralConstantsConstantsGroup:
c_emptyString: str = ""
~
'theBiasPlanet/coreUtilities/locks/SharableLock.py'
@Python ソースコード
from typing import Optional
import threading
from threading import Condition
from threading import Thread
class SharableLock:
def __init__ (a_this: "SharableLock") -> None:
a_this.i_sharingLockingNumberOfTimes: int = 0
a_this.i_exclusiveLockingThread: Optional [Thread] = None
a_this.i_exclusiveLockingNumberOfTimes: int = 0
a_this.i_threadCondition: Condition = Condition ()
def lockSharedly (a_this: "SharableLock") -> None:
a_this.i_threadCondition.acquire ()
while a_this.i_exclusiveLockingNumberOfTimes > 0:
a_this.i_threadCondition.wait ()
a_this.i_sharingLockingNumberOfTimes = a_this.i_sharingLockingNumberOfTimes + 1
a_this.i_threadCondition.release ()
def lockExclusively (a_this: "SharableLock") -> None:
a_this.i_threadCondition.acquire ()
while a_this.i_sharingLockingNumberOfTimes > 0 or (a_this.i_exclusiveLockingThread is not None and threading.current_thread () != a_this.i_exclusiveLockingThread):
a_this.i_threadCondition.wait ()
if a_this.i_exclusiveLockingThread is None:
a_this.i_exclusiveLockingThread = threading.current_thread ()
a_this.i_exclusiveLockingNumberOfTimes = a_this.i_exclusiveLockingNumberOfTimes + 1
a_this.i_threadCondition.release ()
def unlockSharedly (a_this: "SharableLock") -> None:
a_this.i_threadCondition.acquire ()
if a_this.i_sharingLockingNumberOfTimes > 0:
a_this.i_sharingLockingNumberOfTimes = a_this.i_sharingLockingNumberOfTimes - 1
if a_this.i_sharingLockingNumberOfTimes == 0:
a_this.i_threadCondition.notify_all ()
a_this.i_threadCondition.release ()
def unlockExclusively (a_this: "SharableLock") -> None:
a_this.i_threadCondition.acquire ()
if a_this.i_exclusiveLockingNumberOfTimes > 0:
a_this.i_exclusiveLockingNumberOfTimes = a_this.i_exclusiveLockingNumberOfTimes - 1
if a_this.i_exclusiveLockingNumberOfTimes == 0:
a_this.i_exclusiveLockingThread = None
a_this.i_threadCondition.notify_all ()
a_this.i_threadCondition.release ()
'theBiasPlanet/coreUtilities/messagingHandling/Publisher.py'は、全て省略する、なぜなら、その使用されているメソッドは、ただログを書くだけだから。
以下が、私の中断可能標準入力リーダークラスだ。
'theBiasPlanet/coreUtilities/inputs/HaltableStandardInputReader.py'
@Python ソースコード
import sys
from theBiasPlanet.coreUtilities.constantsGroups.DefaultValuesConstantsGroup import DefaultValuesConstantsGroup
from theBiasPlanet.coreUtilities.inputs.HaltableReader import HaltableReader
class HaltableStandardInputReader (HaltableReader):
s_singletonInstance: "HaltableStandardInputReader" = None
def __init__ (a_this: "HaltableStandardInputReader") -> None:
super ().__init__ (sys.stdin, DefaultValuesConstantsGroup.c_smallBufferSize)
@staticmethod
def getInstance () -> "HaltableStandardInputReader":
if HaltableStandardInputReader.s_singletonInstance is None:
HaltableStandardInputReader.s_singletonInstance = HaltableStandardInputReader ()
return HaltableStandardInputReader.s_singletonInstance
ただ1つの標準入力リーダークラスインスタンスがある、と想定されており、そのインスタンスをシングルトンにするために、インスタンスは'HaltableStandardInputReader.getInstance ()'メソッドから取得される、と想定されている(それは強制できないが、なぜなら、Pythonがプライベートコンストラクタを(というか、プライベートメソッドを)許さないため、残念なことに)。
リレーを行なうスレッドを開始するために、'HaltableReader.startDispatchDataThread'メソッドが呼ばれなければならない(何らの害もなく、複数回呼ぶことができる)。
任意のスレッドを、'HaltableReader.addSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str)'メソッドを呼ぶことによって、その標準入力リーダークラスインスタンスの講読者として登録でき、'HaltableReader.removeSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str)'メソッド呼び出しによって購読解除することによって、そのスレッドによる入力待ちが中断される。
スレッドは、待ちにタイムアウトを使いたければ、そうできる。
複数のスレッドがその標準入力リーダークラスインスタンスを購読できる、それは、私の直近の要件には含まれていないが。
本解決策がうまくいかないケースはないであろう、と私は推測する。
5: 1つの使用例と1つの実行結果
Hypothesizer 7
以下は、1つの使用例だ。
@Python ソースコード
from typing import List
from typing import Optional
from datetime import datetime
import sys
import threading
from threading import Thread
import time
from theBiasPlanet.coreUtilities.inputs.HaltableStandardInputReader import HaltableStandardInputReader
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException
class Test1Test:
@staticmethod
def main (a_arguments: List [str]) -> None:
Test1Test.test4 ()
@staticmethod
def test4 () -> None:
l_haltableStandardInputReader: "HaltableStandardInputReader" = HaltableStandardInputReader.getInstance ()
l_haltableStandardInputReader.startDispatchDataThread ()
def l_workerThreadFunction () -> None:
l_data: Optional [str] = None
l_subscriberIdentification: str = str (id (threading.current_thread ()))
while True:
try:
l_data = l_haltableStandardInputReader.readLine (l_subscriberIdentification, 10, 10 * 1000)
sys.stdout.write ("### {0:s}: {1:s}: the standard input data: {2:s}\n".format (str (datetime.now ()), l_subscriberIdentification, l_data))
sys.stdout.flush ()
except (NoMoreDataException) as l_exception:
break
except (NoMoreNeedsException) as l_exception:
break
except (TimeOutException) as l_exception:
None
sys.stdout.write ("### {0:s}: {1:s}: ended\n".format (str (datetime.now ()), l_subscriberIdentification))
sys.stdout.flush ()
l_workerThreadA: Thread = Thread (target = l_workerThreadFunction)
l_subscriberIdentificationForWorkerThreadA: str = str (id (l_workerThreadA))
l_haltableStandardInputReader.addSubscriber (l_subscriberIdentificationForWorkerThreadA)
l_workerThreadB: Thread = Thread (target = l_workerThreadFunction)
l_subscriberIdentificationForWorkerThreadB: str = str (id (l_workerThreadB))
l_haltableStandardInputReader.addSubscriber (l_subscriberIdentificationForWorkerThreadB)
l_workerThreadA.start ()
l_workerThreadB.start ()
sys.stdout.write ("### {0:s}: sleeping . . .\n".format (str (datetime.now ())))
sys.stdout.flush ()
time.sleep (30)
l_haltableStandardInputReader.removeSubscriber (l_subscriberIdentificationForWorkerThreadA)
l_workerThreadA.join ()
sys.stdout.write ("### {0:s}: sleeping . . .\n".format (str (datetime.now ())))
sys.stdout.flush ()
time.sleep (30)
l_haltableStandardInputReader.removeSubscriber (l_subscriberIdentificationForWorkerThreadB)
l_workerThreadB.join ()
if __name__ == "__main__":
Test1Test.main (sys.argv)
以下は、1つのアウトプットだ。
@出力
### 2020-08-03 14:34:09.851530: sleeping . . .
ABC
### 2020-08-03 14:34:14.924620: 139963775811104: the standard input data: ABC
### 2020-08-03 14:34:14.924868: 139963775840512: the standard input data: ABC
### 2020-08-03 14:34:39.853083: 139963775811104: ended
### 2020-08-03 14:34:39.853977: sleeping . . .
DEF
### 2020-08-03 14:34:44.147788: 139963775840512: the standard input data: DEF
### 2020-08-03 14:35:09.854674: 139963775840512: ended