シリアルなのは、さもなければ、メモリが使い果たされてしまうかもしれないから。'java.io.PipedWriter' + 'java.io.PipedReader'に対する、オブジェクト群用Python相当物。
話題
About: Pythonプログラミング言語
この記事の目次
- 開始コンテキスト
- ターゲットコンテキスト
- 本体
- 1: モチベーションおよびプランは、既に紹介されたJavaオブジェクト群パイプに対するものと同じ
- 2: コードおよびその説明
- 3: 1つの使用例と1つの実行結果
- 4: 結びとその先
開始コンテキスト
- 読者は、Pythonプログラミング言語についての基本的知識を持っている。
ターゲットコンテキスト
- 読者は、あるオブジェクト群パイプを知る。
本体
1: モチベーションおよびプランは、既に紹介されたJavaオブジェクト群パイプに対するものと同じ
Hypothesizer 7
実は、別のシリーズのある記事が既に、あるJavaオブジェクト群パイプを紹介した。
そこで記述されたモチベーションおよびプランを繰り返すことは差し控える。
Javaの'java.io.PipedWriter' + 'java.io.PipedReader'に同等なものはPythonにないが(私が知る限り)、本オブジェクト群パイプを代わりに使える、なぜなら、文字は、一種のオブジェクトだから。
2: コードおよびその説明
Hypothesizer 7
以下が、私のオブジェクト群パイプクラスおよびその関連クラス群のコードだ。
'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.py'
@Python ソースコード
from typing import Genericfrom typing import Listfrom typing import Optionalfrom typing import Typefrom typing import TypeVarfrom typing import castimport sysfrom threading import Conditionfrom theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup import GeneralConstantsConstantsGroupfrom theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataExceptionfrom theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsExceptionfrom theBiasPlanet.coreUtilities.messagingHandling.Publisher import Publisherfrom theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutExceptionT = TypeVar ("T")class ObjectsPipe (Generic [T]):def __init__ (a_this: "ObjectsPipe", a_bufferLength: int, a_notificationIsDelayed: bool) -> None:a_this.i_threadCondition: Conditiona_this.i_objects: List [object]a_this.i_bufferLength: int = 0# No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumbera_this.i_dataStartIndex: int = GeneralConstantsConstantsGroup.c_iterationStartNumbera_this.i_dataUntilIndex: int = GeneralConstantsConstantsGroup.c_iterationStartNumbera_this.i_isFinishedWriting: bool = Falsea_this.i_isFinishedReading: bool = Falsea_this.i_notificationIsDelayed: bool = Falsea_this.i_threadCondition = Condition ()a_this.i_bufferLength = a_bufferLengtha_this.i_objects = [None] * a_this.i_bufferLengtha_this.i_notificationIsDelayed = a_notificationIsDelayeddef __del__ (a_this: "ObjectsPipe") -> None:Nonedef isEmptyWithoutLocking (a_this: "ObjectsPipe") -> bool:return a_this.i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumberdef isFullWithoutLocking (a_this: "ObjectsPipe") -> bool:return (a_this.i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataUntilIndex == a_this.i_bufferLength) or (a_this.i_dataStartIndex != GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataStartIndex == a_this.i_dataUntilIndex)# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not waitdef writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1) -> None:if a_this.i_isFinishedReading:raise NoMoreNeedsException ("")if a_this.i_isFinishedWriting:a_this.i_isFinishedWriting = Falsewhile (True):if a_this.isFullWithoutLocking ():try:if a_timeOutPeriodInMilliseconds == -1:a_this.i_threadCondition.wait ()elif a_timeOutPeriodInMilliseconds == 0:Noneelse:a_this.i_threadCondition.wait (a_timeOutPeriodInMilliseconds / 1000)except (Exception) as l_exception:Publisher.logErrorInformation (l_exception)# Checked again because the status may have changed while this thread was waiting.if a_this.i_isFinishedReading:raise NoMoreNeedsException ("")if not a_this.isFullWithoutLocking ():l_wasEmpty: bool = a_this.isEmptyWithoutLocking ()if a_this.i_dataUntilIndex == a_this.i_bufferLength:a_this.i_objects [GeneralConstantsConstantsGroup.c_iterationStartNumber] = a_objecta_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber + 1else:a_this.i_objects [a_this.i_dataUntilIndex] = a_objecta_this.i_dataUntilIndex = a_this.i_dataUntilIndex + 1if ((not a_this.i_notificationIsDelayed) and l_wasEmpty) or (a_this.i_notificationIsDelayed and a_this.isFullWithoutLocking ()):a_this.i_threadCondition.notifyAll ()returnelse:if a_timeOutPeriodInMilliseconds != -1:raise TimeOutException ("")# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not waitdef readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1) -> Optional [T]:l_readObject: Optional [T] = Noneif a_this.i_isFinishedReading:a_this.i_isFinishedReading = Falsewhile True:if a_this.isEmptyWithoutLocking ():if not a_this.i_isFinishedWriting:try:if a_timeOutPeriodInMilliseconds == -1:a_this.i_threadCondition.wait ()elif a_timeOutPeriodInMilliseconds == 0:Noneelse:a_this.i_threadCondition.wait (a_timeOutPeriodInMilliseconds / 1000)except (Exception) as l_exception:Publisher.logErrorInformation (l_exception)else:raise NoMoreDataException ("")# Checked again because the status may have changed while this thread was waiting.if not a_this.isEmptyWithoutLocking ():l_wasFull: bool = a_this.isFullWithoutLocking ()l_readObject = cast (T, a_this.i_objects [a_this.i_dataStartIndex])a_this.i_objects [a_this.i_dataStartIndex] = Nonea_this.i_dataStartIndex = a_this.i_dataStartIndex + 1if a_this.i_dataStartIndex == a_this.i_dataUntilIndex:a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumbera_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumberelse:if a_this.i_dataStartIndex == a_this.i_bufferLength:a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumberif ((not a_this.i_notificationIsDelayed) and l_wasFull) or (a_this.i_notificationIsDelayed and a_this.isEmptyWithoutLocking ()):a_this.i_threadCondition.notifyAll ()return l_readObjectelse:if a_this.i_isFinishedWriting:raise NoMoreDataException ("")if a_timeOutPeriodInMilliseconds != -1:raise TimeOutException ("")def isEmpty (a_this: "ObjectsPipe") -> bool:try:a_this.i_threadCondition.acquire ()return a_this.isEmptyWithoutLocking ()finally:a_this.i_threadCondition.release ()def isFull (a_this: "ObjectsPipe") -> bool:try:a_this.i_threadCondition.acquire ()return a_this.isFullWithoutLocking ()finally:a_this.i_threadCondition.release ()# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not waitdef write (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1) -> None:try:a_this.i_threadCondition.acquire ()a_this.writeWithoutLocking (a_object, a_timeOutPeriodInMilliseconds)finally:a_this.i_threadCondition.release ()# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not waitdef write_1 (a_this: "ObjectsPipe", a_objects: List [Optional [T]], a_offset: int, a_length: int, a_timeOutPeriodInMilliseconds: int = -1) -> int:try:a_this.i_threadCondition.acquire ()l_writtenLength: int = 0for l_writtenLength in range (0, a_length, 1):try:if (l_writtenLength == 0) or not a_this.isFullWithoutLocking ():a_this.writeWithoutLocking (a_objects [a_offset + l_writtenLength], a_timeOutPeriodInMilliseconds)except (NoMoreNeedsException) as l_exception:if l_writtenLength == 0:raise l_exceptionelse:breakreturn l_writtenLengthfinally:a_this.i_threadCondition.release ()# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not waitdef read (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1) -> Optional [T]:try:a_this.i_threadCondition.acquire ()return a_this.readWithoutLocking (a_timeOutPeriodInMilliseconds)finally:a_this.i_threadCondition.release ()# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not waitdef read_1 (a_this: "ObjectsPipe", a_objects: List [Optional [T]], a_offset: int, a_length: int, a_timeOutPeriodInMilliseconds: int = -1) -> int:try:a_this.i_threadCondition.acquire ()l_readLength: int = 0for l_readLength in range (0, a_length, 1):if (l_readLength == 0) or not a_this.isEmptyWithoutLocking ():a_objects [a_offset + l_readLength] = a_this.readWithoutLocking (a_timeOutPeriodInMilliseconds)else:breakreturn l_readLengthfinally:a_this.i_threadCondition.release ()def readWholeData (a_this: "ObjectsPipe") -> List [Optional [T]]:try:a_this.i_threadCondition.acquire ()l_objectsList: List [Optional [T]] = []while True:try:l_objectsList.append (a_this.readWithoutLocking ())except (NoMoreDataException) as l_exception:breakreturn l_objectsListfinally:a_this.i_threadCondition.release ()def finishWriting (a_this: "ObjectsPipe") -> None:try:a_this.i_threadCondition.acquire ()a_this.i_isFinishedWriting = Truea_this.i_threadCondition.notifyAll ()finally:a_this.i_threadCondition.release ()def finishReading (a_this: "ObjectsPipe") -> None:try:a_this.i_threadCondition.acquire ()a_this.i_isFinishedReading = Truea_this.i_threadCondition.notifyAll ()finally:a_this.i_threadCondition.release ()def reset (a_this: "ObjectsPipe") -> None:try:a_this.i_threadCondition.acquire ()a_this.i_isFinishedWriting = Falsea_this.i_isFinishedReading = Falsea_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumbera_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumberfinally:a_this.i_threadCondition.release ()
'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.py'
@Python ソースコード
~class GeneralConstantsConstantsGroup:~c_iterationStartNumber: int = 0~
'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.py'
@Python ソースコード
class NoMoreDataException (Exception):def __init__ (a_this: "NoMoreDataException", a_message: str) -> None:super ().__init__ (a_message)
'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.py'
@Python ソースコード
class NoMoreNeedsException (Exception):def __init__ (a_this: "NoMoreNeedsException", a_message: str) -> None:super ().__init__ (a_message)
'theBiasPlanet/coreUtilities/messagingHandling/Publisher.py'は、全て省略する、なぜなら、使われているメソッドはただログを書くだけだから。
'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.py'
@Python ソースコード
class TimeOutException (Exception):def __init__ (a_this: "TimeOutException", a_message: str) -> None:super ().__init__ (a_message)
コードの説明は、Java版に対するものとほとんど同じだが、重複を含めて、説明をしよう。
注意として、コードには、mypyアノテーションがついている(私のどのPythonコードにもついているとおり)。
コンストラクターは、バッファサイズおよびリーダーを起こすためのアルゴリズム選択を受け取る。
'i_objects'がバッファだ。
'i_dataStartIndex'および'i_dataUntilIndex'は、値保持エリアの開始インデックスおよび終了インデックス(含まず)だ。バッファが空の時は、それらは、それぞれ、'0'および'0'になる。
コンストラクターを除く全てのパブリック想定のメソッドは、'a_this.i_threadCondition'オブジェクト上でシンクロナイズされている(「パブリック想定」というのは、クラスの外から使用されるように意図されている(私によって)ことを意味している、Pythonは非パブリックなメソッドなど許さないが)。実のところ、'~WithoutLocking'メソッド群は、プロテクテッド想定だ。
'readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1)'メソッドは、ループを持っているが、それは、複数のリーダーがいるかもしれないということに基づいている: 空のバッファに対して待っていたリーダーは、起こされて、書かれたオブジェクト群が既に他のリーダーたちによってさらわれ済みであることを発見し、もう一度待つことを余儀なくされるかもしれない。
同様に、'writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1)'メソッドは、ループを持っているが、それは、複数のライターがいるかもしれないということに基づいている。
'writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1)'メソッドの、パイプが読み取り終了と宣言された時の振る舞いが、'readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1)'メソッドの、パイプが書き込み終了と宣言された時の振る舞いと異なっている理由は、ライターは、それ以上書き込む必要が全然ない一方、リーダーは、既にバッファに格納済みのオブジェクト群を読み取りたいであろうことだ。
警告しておくが、私は、コードを強力にテストしたわけではない(まだ)、いくつかのシンプルなケース(その内の1つは、次セクションで示される)で使用はしたが。
3: 1つの使用例と1つの実行結果
Hypothesizer 7
以下は、当該オブジェクト群パイプを使用したサンプルプログラムだ。
@Python ソースコード
from typing import Optionalimport sysfrom threading import Threadimport tracebackfrom theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataExceptionfrom theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsExceptionfrom theBiasPlanet.coreUtilities.pipes.ObjectsPipe import ObjectsPipe~~@staticmethoddef prepareIntegers (a_writer: "ObjectsPipe [int]") -> None:l_iterationIndex: intfor l_iterationIndex in range (0, 512, 1):try:a_writer.write (l_iterationIndex)except (NoMoreNeedsException) as l_exception:breaksys.stdout.write ("### written: {0:d}\n".format (l_iterationIndex))sys.stdout.flush ()@staticmethoddef processIntegers (a_reader: "ObjectsPipe [int]") -> None:l_integer: Optional [int] = Nonel_numberOfMultipleOf10s: int = 0while True:try:l_integer = a_reader.read ()except (NoMoreDataException) as l_exception:breaksys.stdout.write ("### read: {0:d}\n".format (l_integer))sys.stdout.flush ();if l_integer % 10 == 0:l_numberOfMultipleOf10s = l_numberOfMultipleOf10s + 1sys.stdout.write ("### a multiple of 10s is found.\n".format ())sys.stdout.flush ();sys.stdout.write ("### the number of multiple of 10s is {0:d}.\n".format (l_numberOfMultipleOf10s))sys.stdout.flush ()@staticmethoddef test2 () -> None:l_integersPipe: "ObjectsPipe [int]" = ObjectsPipe [int] (16, True)PerformanceMeasurer.setStartTime ()def l_subThreadFunction () -> None:try:Test1Test.prepareIntegers (l_integersPipe)except (Exception) as l_exception:sys.stdout.write ("{0:s}: {1:s}\n".format (str (l_exception), traceback.format_exc ()))finally:try:l_integersPipe.finishWriting ()except (Exception) as l_exception:sys.stdout.write ("{0:s}: {1:s}\n".format (str (l_exception), traceback.format_exc ()))l_subThread: Thread = Thread (target = l_subThreadFunction)l_subThread.start ()Test1Test.processIntegers (l_integersPipe)l_subThread.join ()sys.stdout.write ("### The elapsed time is {0:,d} ns.\n".format (PerformanceMeasurer.getElapseTimeInNanoSeconds ()))~
以下が、私のシングルコア、シングルCPU Linuxコンピュータ(それはアウトプットに影響していないかもしれない、なぜなら、Pythonには、グローバルインタープリターロックなる、ぞっとさせるものがあるから)における1つのアウトプットだ。
@出力
### written: 0### read: 0### a multiple of 10s is found.### read: 1### written: 1### written: 2### written: 3### written: 4### written: 5### written: 6### written: 7### written: 8### written: 9### written: 10### written: 11### written: 12### written: 13### written: 14### written: 15### written: 16### written: 17### read: 2### read: 3### read: 4### read: 5### read: 6### read: 7### read: 8### read: 9### read: 10### a multiple of 10s is found.### read: 11### read: 12### read: 13### read: 14### read: 15~### written: 496### written: 497### written: 498### written: 499### written: 500### written: 501### written: 502### written: 503### read: 487### read: 488### read: 489### read: 490### a multiple of 10s is found.### read: 491### read: 492### read: 493### read: 494### read: 495### read: 496### read: 497### read: 498### read: 499### read: 500### a multiple of 10s is found.### read: 501### read: 502### read: 503### written: 504### written: 505### written: 506### written: 507### written: 508### written: 509### written: 510### written: 511### read: 504### read: 505### read: 506### read: 507### read: 508### read: 509### read: 510### a multiple of 10s is found.### read: 511### the number of multiple of 10s is 52.
...えーと、起こったことは、ライターが始めに行動を起こし、'0'および'1'を書いた、リーダーは、'0'および'1'を読み、待ちにされた、なぜなら、パイプが空になったから、ライターは、フルに書き込み、リーダーを起こした、リーダーは読み始めた、と続く、私の推測では。...注意として、'1'は、リーダーが読み始める前に書かれたはずだ、「### written: 1」は、遅れて現れたが: 'write'メソッドが完了した後すぐに、コントロールは読み取りスレッドにスイッチし、メッセージ出力は、コントロールが書き込みスレッドにに戻るまで遅延された。
通知タイミングモードを'False'にセットした場合、以下がアウトプットだ。
@出力
### written: 0### read: 0### a multiple of 10s is found.### read: 1### written: 1### written: 2### read: 2### read: 3### written: 3### written: 4### read: 4### read: 5### written: 5### written: 6### read: 6### read: 7### written: 7### written: 8### read: 8### read: 9### written: 9### written: 10### read: 10### a multiple of 10s is found.### read: 11### written: 11### written: 12### read: 12### read: 13### written: 13### written: 14### read: 14### read: 15### written: 15~### written: 496### written: 497### written: 498### written: 499### written: 500### written: 501### written: 502### written: 503### written: 504### written: 505### written: 506### written: 507### written: 508### read: 493### read: 494### read: 495### read: 496### read: 497### read: 498### read: 499### read: 500### a multiple of 10s is found.### read: 501### read: 502### read: 503### read: 504### read: 505### read: 506### read: 507### read: 508### written: 509### written: 510### written: 511### read: 509### read: 510### a multiple of 10s is found.### read: 511### the number of multiple of 10s is 52.
...ライターが始めに行動を起こして、'0'および'1'を書いた、リーダーは、'0'および'1'を読み、待ちにされた、なぜなら、パイプが空になったから、ライターは、'2'を書き、リーダーを起こし、'3'を書いた、リーダーは、読み始めた、と続く、私の推測では。...この場合も、「### written: 1」は、遅れて現われた。
いずれにせよ、バッファの少しだけのスロットが使用される傾向にあるようだ。
上記2モードにおける経過時間(時間が大きくかかるメッセージ出力を取り除いて)を計測したところ(モード毎に5回)、時間は、それぞれ、{'14,296,000'、'15,188,000'、'14,926,000'、'15,518,000'、'14,495,000': 平均 -> '14,884,600'} および {'15,321,000'、'13,819,000'、'15,230,000'、'14,191,000'、'14,009,000': 平均 -> '14,514,000'}(ナノ秒)だった。違いは有意でないように推測する。...ところで、気づいたのだが、Python版は、Java版よりもずっと高速だ、その理由は、私がCベースであるCPythonを使っているから? 多分。
4: 結びとその先
Hypothesizer 7
これで、私はオブジェクト群パイプを持つことになった。
オブジェクト群パイプの恩恵は、どれだけ多くのオブジェクトを処理しようとも、メモリースペースが枯渇することがないということだ。実際、上に挙げられたサンプルプログラムは、バッファオーバーフローを起こさない、繰り返し回数をいかに大きく設定しても。
本オブジェクト群パイプは、そのままでも文字列パイプとして使うことができるが、いくつかの追加のメソッドが、文字列パイプインスタンスを操作するのに便利だろうから、本オブジェクト群パイプクラス継承した文字列パイプクラスを特に作成しよう、次記事にて。