シリアルなのは、さもなければ、メモリが使い果たされてしまうかもしれないから。'java.io.PipedWriter' + 'java.io.PipedReader'に対する、オブジェクト群用Python相当物。
話題
About: Pythonプログラミング言語
この記事の目次
- 開始コンテキスト
- ターゲットコンテキスト
- 本体
- 1: モチベーションおよびプランは、既に紹介されたJavaオブジェクト群パイプに対するものと同じ
- 2: コードおよびその説明
- 3: 1つの使用例と1つの実行結果
- 4: 結びとその先
開始コンテキスト
- 読者は、Pythonプログラミング言語についての基本的知識を持っている。
ターゲットコンテキスト
- 読者は、あるオブジェクト群パイプを知る。
本体
1: モチベーションおよびプランは、既に紹介されたJavaオブジェクト群パイプに対するものと同じ
Hypothesizer 7
実は、別のシリーズのある記事が既に、あるJavaオブジェクト群パイプを紹介した。
そこで記述されたモチベーションおよびプランを繰り返すことは差し控える。
Javaの'java.io.PipedWriter' + 'java.io.PipedReader'に同等なものはPythonにないが(私が知る限り)、本オブジェクト群パイプを代わりに使える、なぜなら、文字は、一種のオブジェクトだから。
2: コードおよびその説明
Hypothesizer 7
以下が、私のオブジェクト群パイプクラスおよびその関連クラス群のコードだ。
'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.py'
@Python ソースコード
from typing import Generic
from typing import List
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import cast
import sys
from threading import Condition
from theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup import GeneralConstantsConstantsGroup
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.messagingHandling.Publisher import Publisher
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException
T = TypeVar ("T")
class ObjectsPipe (Generic [T]):
def __init__ (a_this: "ObjectsPipe", a_bufferLength: int, a_notificationIsDelayed: bool) -> None:
a_this.i_threadCondition: Condition
a_this.i_objects: List [object]
a_this.i_bufferLength: int = 0
# No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
a_this.i_dataStartIndex: int = GeneralConstantsConstantsGroup.c_iterationStartNumber
a_this.i_dataUntilIndex: int = GeneralConstantsConstantsGroup.c_iterationStartNumber
a_this.i_isFinishedWriting: bool = False
a_this.i_isFinishedReading: bool = False
a_this.i_notificationIsDelayed: bool = False
a_this.i_threadCondition = Condition ()
a_this.i_bufferLength = a_bufferLength
a_this.i_objects = [None] * a_this.i_bufferLength
a_this.i_notificationIsDelayed = a_notificationIsDelayed
def __del__ (a_this: "ObjectsPipe") -> None:
None
def isEmptyWithoutLocking (a_this: "ObjectsPipe") -> bool:
return a_this.i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
def isFullWithoutLocking (a_this: "ObjectsPipe") -> bool:
return (a_this.i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataUntilIndex == a_this.i_bufferLength) or (a_this.i_dataStartIndex != GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataStartIndex == a_this.i_dataUntilIndex)
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1) -> None:
if a_this.i_isFinishedReading:
raise NoMoreNeedsException ("")
if a_this.i_isFinishedWriting:
a_this.i_isFinishedWriting = False
while (True):
if a_this.isFullWithoutLocking ():
try:
if a_timeOutPeriodInMilliseconds == -1:
a_this.i_threadCondition.wait ()
elif a_timeOutPeriodInMilliseconds == 0:
None
else:
a_this.i_threadCondition.wait (a_timeOutPeriodInMilliseconds / 1000)
except (Exception) as l_exception:
Publisher.logErrorInformation (l_exception)
# Checked again because the status may have changed while this thread was waiting.
if a_this.i_isFinishedReading:
raise NoMoreNeedsException ("")
if not a_this.isFullWithoutLocking ():
l_wasEmpty: bool = a_this.isEmptyWithoutLocking ()
if a_this.i_dataUntilIndex == a_this.i_bufferLength:
a_this.i_objects [GeneralConstantsConstantsGroup.c_iterationStartNumber] = a_object
a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber + 1
else:
a_this.i_objects [a_this.i_dataUntilIndex] = a_object
a_this.i_dataUntilIndex = a_this.i_dataUntilIndex + 1
if ((not a_this.i_notificationIsDelayed) and l_wasEmpty) or (a_this.i_notificationIsDelayed and a_this.isFullWithoutLocking ()):
a_this.i_threadCondition.notifyAll ()
return
else:
if a_timeOutPeriodInMilliseconds != -1:
raise TimeOutException ("")
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1) -> Optional [T]:
l_readObject: Optional [T] = None
if a_this.i_isFinishedReading:
a_this.i_isFinishedReading = False
while True:
if a_this.isEmptyWithoutLocking ():
if not a_this.i_isFinishedWriting:
try:
if a_timeOutPeriodInMilliseconds == -1:
a_this.i_threadCondition.wait ()
elif a_timeOutPeriodInMilliseconds == 0:
None
else:
a_this.i_threadCondition.wait (a_timeOutPeriodInMilliseconds / 1000)
except (Exception) as l_exception:
Publisher.logErrorInformation (l_exception)
else:
raise NoMoreDataException ("")
# Checked again because the status may have changed while this thread was waiting.
if not a_this.isEmptyWithoutLocking ():
l_wasFull: bool = a_this.isFullWithoutLocking ()
l_readObject = cast (T, a_this.i_objects [a_this.i_dataStartIndex])
a_this.i_objects [a_this.i_dataStartIndex] = None
a_this.i_dataStartIndex = a_this.i_dataStartIndex + 1
if a_this.i_dataStartIndex == a_this.i_dataUntilIndex:
a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
else:
if a_this.i_dataStartIndex == a_this.i_bufferLength:
a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
if ((not a_this.i_notificationIsDelayed) and l_wasFull) or (a_this.i_notificationIsDelayed and a_this.isEmptyWithoutLocking ()):
a_this.i_threadCondition.notifyAll ()
return l_readObject
else:
if a_this.i_isFinishedWriting:
raise NoMoreDataException ("")
if a_timeOutPeriodInMilliseconds != -1:
raise TimeOutException ("")
def isEmpty (a_this: "ObjectsPipe") -> bool:
try:
a_this.i_threadCondition.acquire ()
return a_this.isEmptyWithoutLocking ()
finally:
a_this.i_threadCondition.release ()
def isFull (a_this: "ObjectsPipe") -> bool:
try:
a_this.i_threadCondition.acquire ()
return a_this.isFullWithoutLocking ()
finally:
a_this.i_threadCondition.release ()
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def write (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1) -> None:
try:
a_this.i_threadCondition.acquire ()
a_this.writeWithoutLocking (a_object, a_timeOutPeriodInMilliseconds)
finally:
a_this.i_threadCondition.release ()
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def write_1 (a_this: "ObjectsPipe", a_objects: List [Optional [T]], a_offset: int, a_length: int, a_timeOutPeriodInMilliseconds: int = -1) -> int:
try:
a_this.i_threadCondition.acquire ()
l_writtenLength: int = 0
for l_writtenLength in range (0, a_length, 1):
try:
if (l_writtenLength == 0) or not a_this.isFullWithoutLocking ():
a_this.writeWithoutLocking (a_objects [a_offset + l_writtenLength], a_timeOutPeriodInMilliseconds)
except (NoMoreNeedsException) as l_exception:
if l_writtenLength == 0:
raise l_exception
else:
break
return l_writtenLength
finally:
a_this.i_threadCondition.release ()
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def read (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1) -> Optional [T]:
try:
a_this.i_threadCondition.acquire ()
return a_this.readWithoutLocking (a_timeOutPeriodInMilliseconds)
finally:
a_this.i_threadCondition.release ()
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def read_1 (a_this: "ObjectsPipe", a_objects: List [Optional [T]], a_offset: int, a_length: int, a_timeOutPeriodInMilliseconds: int = -1) -> int:
try:
a_this.i_threadCondition.acquire ()
l_readLength: int = 0
for l_readLength in range (0, a_length, 1):
if (l_readLength == 0) or not a_this.isEmptyWithoutLocking ():
a_objects [a_offset + l_readLength] = a_this.readWithoutLocking (a_timeOutPeriodInMilliseconds)
else:
break
return l_readLength
finally:
a_this.i_threadCondition.release ()
def readWholeData (a_this: "ObjectsPipe") -> List [Optional [T]]:
try:
a_this.i_threadCondition.acquire ()
l_objectsList: List [Optional [T]] = []
while True:
try:
l_objectsList.append (a_this.readWithoutLocking ())
except (NoMoreDataException) as l_exception:
break
return l_objectsList
finally:
a_this.i_threadCondition.release ()
def finishWriting (a_this: "ObjectsPipe") -> None:
try:
a_this.i_threadCondition.acquire ()
a_this.i_isFinishedWriting = True
a_this.i_threadCondition.notifyAll ()
finally:
a_this.i_threadCondition.release ()
def finishReading (a_this: "ObjectsPipe") -> None:
try:
a_this.i_threadCondition.acquire ()
a_this.i_isFinishedReading = True
a_this.i_threadCondition.notifyAll ()
finally:
a_this.i_threadCondition.release ()
def reset (a_this: "ObjectsPipe") -> None:
try:
a_this.i_threadCondition.acquire ()
a_this.i_isFinishedWriting = False
a_this.i_isFinishedReading = False
a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
finally:
a_this.i_threadCondition.release ()
'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.py'
@Python ソースコード
~
class GeneralConstantsConstantsGroup:
~
c_iterationStartNumber: int = 0
~
'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.py'
@Python ソースコード
class NoMoreDataException (Exception):
def __init__ (a_this: "NoMoreDataException", a_message: str) -> None:
super ().__init__ (a_message)
'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.py'
@Python ソースコード
class NoMoreNeedsException (Exception):
def __init__ (a_this: "NoMoreNeedsException", a_message: str) -> None:
super ().__init__ (a_message)
'theBiasPlanet/coreUtilities/messagingHandling/Publisher.py'は、全て省略する、なぜなら、使われているメソッドはただログを書くだけだから。
'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.py'
@Python ソースコード
class TimeOutException (Exception):
def __init__ (a_this: "TimeOutException", a_message: str) -> None:
super ().__init__ (a_message)
コードの説明は、Java版に対するものとほとんど同じだが、重複を含めて、説明をしよう。
注意として、コードには、mypyアノテーションがついている(私のどのPythonコードにもついているとおり)。
コンストラクターは、バッファサイズおよびリーダーを起こすためのアルゴリズム選択を受け取る。
'i_objects'がバッファだ。
'i_dataStartIndex'および'i_dataUntilIndex'は、値保持エリアの開始インデックスおよび終了インデックス(含まず)だ。バッファが空の時は、それらは、それぞれ、'0'および'0'になる。
コンストラクターを除く全てのパブリック想定のメソッドは、'a_this.i_threadCondition'オブジェクト上でシンクロナイズされている(「パブリック想定」というのは、クラスの外から使用されるように意図されている(私によって)ことを意味している、Pythonは非パブリックなメソッドなど許さないが)。実のところ、'~WithoutLocking'メソッド群は、プロテクテッド想定だ。
'readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1)'メソッドは、ループを持っているが、それは、複数のリーダーがいるかもしれないということに基づいている: 空のバッファに対して待っていたリーダーは、起こされて、書かれたオブジェクト群が既に他のリーダーたちによってさらわれ済みであることを発見し、もう一度待つことを余儀なくされるかもしれない。
同様に、'writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1)'メソッドは、ループを持っているが、それは、複数のライターがいるかもしれないということに基づいている。
'writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1)'メソッドの、パイプが読み取り終了と宣言された時の振る舞いが、'readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1)'メソッドの、パイプが書き込み終了と宣言された時の振る舞いと異なっている理由は、ライターは、それ以上書き込む必要が全然ない一方、リーダーは、既にバッファに格納済みのオブジェクト群を読み取りたいであろうことだ。
警告しておくが、私は、コードを強力にテストしたわけではない(まだ)、いくつかのシンプルなケース(その内の1つは、次セクションで示される)で使用はしたが。
3: 1つの使用例と1つの実行結果
Hypothesizer 7
以下は、当該オブジェクト群パイプを使用したサンプルプログラムだ。
@Python ソースコード
from typing import Optional
import sys
from threading import Thread
import traceback
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.pipes.ObjectsPipe import ObjectsPipe
~
~
@staticmethod
def prepareIntegers (a_writer: "ObjectsPipe [int]") -> None:
l_iterationIndex: int
for l_iterationIndex in range (0, 512, 1):
try:
a_writer.write (l_iterationIndex)
except (NoMoreNeedsException) as l_exception:
break
sys.stdout.write ("### written: {0:d}\n".format (l_iterationIndex))
sys.stdout.flush ()
@staticmethod
def processIntegers (a_reader: "ObjectsPipe [int]") -> None:
l_integer: Optional [int] = None
l_numberOfMultipleOf10s: int = 0
while True:
try:
l_integer = a_reader.read ()
except (NoMoreDataException) as l_exception:
break
sys.stdout.write ("### read: {0:d}\n".format (l_integer))
sys.stdout.flush ();
if l_integer % 10 == 0:
l_numberOfMultipleOf10s = l_numberOfMultipleOf10s + 1
sys.stdout.write ("### a multiple of 10s is found.\n".format ())
sys.stdout.flush ();
sys.stdout.write ("### the number of multiple of 10s is {0:d}.\n".format (l_numberOfMultipleOf10s))
sys.stdout.flush ()
@staticmethod
def test2 () -> None:
l_integersPipe: "ObjectsPipe [int]" = ObjectsPipe [int] (16, True)
PerformanceMeasurer.setStartTime ()
def l_subThreadFunction () -> None:
try:
Test1Test.prepareIntegers (l_integersPipe)
except (Exception) as l_exception:
sys.stdout.write ("{0:s}: {1:s}\n".format (str (l_exception), traceback.format_exc ()))
finally:
try:
l_integersPipe.finishWriting ()
except (Exception) as l_exception:
sys.stdout.write ("{0:s}: {1:s}\n".format (str (l_exception), traceback.format_exc ()))
l_subThread: Thread = Thread (target = l_subThreadFunction)
l_subThread.start ()
Test1Test.processIntegers (l_integersPipe)
l_subThread.join ()
sys.stdout.write ("### The elapsed time is {0:,d} ns.\n".format (PerformanceMeasurer.getElapseTimeInNanoSeconds ()))
~
以下が、私のシングルコア、シングルCPU Linuxコンピュータ(それはアウトプットに影響していないかもしれない、なぜなら、Pythonには、グローバルインタープリターロックなる、ぞっとさせるものがあるから)における1つのアウトプットだ。
@出力
### written: 0
### read: 0
### a multiple of 10s is found.
### read: 1
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### written: 15
### written: 16
### written: 17
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### read: 7
### read: 8
### read: 9
### read: 10
### a multiple of 10s is found.
### read: 11
### read: 12
### read: 13
### read: 14
### read: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### read: 487
### read: 488
### read: 489
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.
...えーと、起こったことは、ライターが始めに行動を起こし、'0'および'1'を書いた、リーダーは、'0'および'1'を読み、待ちにされた、なぜなら、パイプが空になったから、ライターは、フルに書き込み、リーダーを起こした、リーダーは読み始めた、と続く、私の推測では。...注意として、'1'は、リーダーが読み始める前に書かれたはずだ、「### written: 1」は、遅れて現れたが: 'write'メソッドが完了した後すぐに、コントロールは読み取りスレッドにスイッチし、メッセージ出力は、コントロールが書き込みスレッドにに戻るまで遅延された。
通知タイミングモードを'False'にセットした場合、以下がアウトプットだ。
@出力
### written: 0
### read: 0
### a multiple of 10s is found.
### read: 1
### written: 1
### written: 2
### read: 2
### read: 3
### written: 3
### written: 4
### read: 4
### read: 5
### written: 5
### written: 6
### read: 6
### read: 7
### written: 7
### written: 8
### read: 8
### read: 9
### written: 9
### written: 10
### read: 10
### a multiple of 10s is found.
### read: 11
### written: 11
### written: 12
### read: 12
### read: 13
### written: 13
### written: 14
### read: 14
### read: 15
### written: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### written: 509
### written: 510
### written: 511
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.
...ライターが始めに行動を起こして、'0'および'1'を書いた、リーダーは、'0'および'1'を読み、待ちにされた、なぜなら、パイプが空になったから、ライターは、'2'を書き、リーダーを起こし、'3'を書いた、リーダーは、読み始めた、と続く、私の推測では。...この場合も、「### written: 1」は、遅れて現われた。
いずれにせよ、バッファの少しだけのスロットが使用される傾向にあるようだ。
上記2モードにおける経過時間(時間が大きくかかるメッセージ出力を取り除いて)を計測したところ(モード毎に5回)、時間は、それぞれ、{'14,296,000'、'15,188,000'、'14,926,000'、'15,518,000'、'14,495,000': 平均 -> '14,884,600'} および {'15,321,000'、'13,819,000'、'15,230,000'、'14,191,000'、'14,009,000': 平均 -> '14,514,000'}(ナノ秒)だった。違いは有意でないように推測する。...ところで、気づいたのだが、Python版は、Java版よりもずっと高速だ、その理由は、私がCベースであるCPythonを使っているから? 多分。
4: 結びとその先
Hypothesizer 7
これで、私はオブジェクト群パイプを持つことになった。
オブジェクト群パイプの恩恵は、どれだけ多くのオブジェクトを処理しようとも、メモリースペースが枯渇することがないということだ。実際、上に挙げられたサンプルプログラムは、バッファオーバーフローを起こさない、繰り返し回数をいかに大きく設定しても。
本オブジェクト群パイプは、そのままでも文字列パイプとして使うことができるが、いくつかの追加のメソッドが、文字列パイプインスタンスを操作するのに便利だろうから、本オブジェクト群パイプクラス継承した文字列パイプクラスを特に作成しよう、次記事にて。