2020年7月5日日曜日

6: Pythonオブジェクト群パイプ(オブジェクト群をシリアルに運ぶ)

<このシリーズの前の記事 | このシリーズの目次 | このシリーズの次の記事>

シリアルなのは、さもなければ、メモリが使い果たされてしまうかもしれないから。'java.io.PipedWriter' + 'java.io.PipedReader'に対する、オブジェクト群用Python相当物。

話題


About: Pythonプログラミング言語

この記事の目次


開始コンテキスト


  • 読者は、Pythonプログラミング言語についての基本的知識を持っている。

ターゲットコンテキスト



  • 読者は、あるオブジェクト群パイプを知る。

本体


1: モチベーションおよびプランは、既に紹介されたJavaオブジェクト群パイプに対するものと同じ


Hypothesizer 7
実は、別のシリーズのある記事が既に、あるJavaオブジェクト群パイプを紹介した

そこで記述されたモチベーションおよびプランを繰り返すことは差し控える。

Javaの'java.io.PipedWriter' + 'java.io.PipedReader'に同等なものはPythonにないが(私が知る限り)、本オブジェクト群パイプを代わりに使える、なぜなら、文字は、一種のオブジェクトだから。


2: コードおよびその説明


Hypothesizer 7
以下が、私のオブジェクト群パイプクラスおよびその関連クラス群のコードだ。

'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.py'

@Python ソースコード
from typing import Generic
from typing import List
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import cast
import sys
from threading import Condition
from theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup import GeneralConstantsConstantsGroup
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.messagingHandling.Publisher import Publisher
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException

T = TypeVar ("T")

class ObjectsPipe (Generic [T]):
	def __init__ (a_this: "ObjectsPipe", a_bufferLength: int, a_notificationIsDelayed: bool) -> None:
		a_this.i_threadCondition: Condition
		a_this.i_objects: List [object]
		a_this.i_bufferLength: int = 0
		# No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
		a_this.i_dataStartIndex: int = GeneralConstantsConstantsGroup.c_iterationStartNumber
		a_this.i_dataUntilIndex: int = GeneralConstantsConstantsGroup.c_iterationStartNumber
		a_this.i_isFinishedWriting: bool = False
		a_this.i_isFinishedReading: bool = False
		a_this.i_notificationIsDelayed: bool = False
		
		a_this.i_threadCondition = Condition ()
		a_this.i_bufferLength = a_bufferLength
		a_this.i_objects = [None] * a_this.i_bufferLength
		a_this.i_notificationIsDelayed = a_notificationIsDelayed
	
	def __del__ (a_this: "ObjectsPipe") -> None:
		None
	
	def isEmptyWithoutLocking (a_this: "ObjectsPipe") -> bool:
		return a_this.i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
	
	def isFullWithoutLocking (a_this: "ObjectsPipe") -> bool:
		return (a_this.i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataUntilIndex == a_this.i_bufferLength) or (a_this.i_dataStartIndex != GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataStartIndex == a_this.i_dataUntilIndex)
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1) -> None:
		if a_this.i_isFinishedReading:
			raise NoMoreNeedsException ("")
		if a_this.i_isFinishedWriting:
			a_this.i_isFinishedWriting = False
		while (True):
			if a_this.isFullWithoutLocking ():
				try:
					if a_timeOutPeriodInMilliseconds == -1:
						a_this.i_threadCondition.wait ()
					elif a_timeOutPeriodInMilliseconds == 0:
						None
					else:
						a_this.i_threadCondition.wait (a_timeOutPeriodInMilliseconds / 1000)
				except (Exception) as l_exception:
					Publisher.logErrorInformation (l_exception)
			# Checked again because the status may have changed while this thread was waiting.
			if a_this.i_isFinishedReading:
				raise NoMoreNeedsException ("")
			if not a_this.isFullWithoutLocking ():
				l_wasEmpty: bool = a_this.isEmptyWithoutLocking ()
				if a_this.i_dataUntilIndex == a_this.i_bufferLength:
					a_this.i_objects [GeneralConstantsConstantsGroup.c_iterationStartNumber] = a_object
					a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber + 1
				else:
					a_this.i_objects [a_this.i_dataUntilIndex] = a_object
					a_this.i_dataUntilIndex = a_this.i_dataUntilIndex + 1
				if ((not a_this.i_notificationIsDelayed) and l_wasEmpty) or (a_this.i_notificationIsDelayed and a_this.isFullWithoutLocking ()):
					a_this.i_threadCondition.notifyAll ()
				return
			else:
				if a_timeOutPeriodInMilliseconds != -1:
					raise TimeOutException ("")
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1) -> Optional [T]:
		l_readObject: Optional [T] = None
		if a_this.i_isFinishedReading:
			a_this.i_isFinishedReading = False
		while True:
			if a_this.isEmptyWithoutLocking ():
				if not a_this.i_isFinishedWriting:
					try:
						if a_timeOutPeriodInMilliseconds == -1:
							a_this.i_threadCondition.wait ()
						elif a_timeOutPeriodInMilliseconds == 0:
							None
						else:
							a_this.i_threadCondition.wait (a_timeOutPeriodInMilliseconds / 1000)
					except (Exception) as l_exception:
						Publisher.logErrorInformation (l_exception)
				else:
					raise NoMoreDataException ("")
			# Checked again because the status may have changed while this thread was waiting.
			if not a_this.isEmptyWithoutLocking ():
				l_wasFull: bool = a_this.isFullWithoutLocking ()
				l_readObject = cast (T, a_this.i_objects [a_this.i_dataStartIndex])
				a_this.i_objects [a_this.i_dataStartIndex] = None
				a_this.i_dataStartIndex = a_this.i_dataStartIndex + 1
				if a_this.i_dataStartIndex == a_this.i_dataUntilIndex:
					a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
					a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
				else:
					if a_this.i_dataStartIndex == a_this.i_bufferLength:
						a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
				if ((not a_this.i_notificationIsDelayed) and l_wasFull) or (a_this.i_notificationIsDelayed and a_this.isEmptyWithoutLocking ()):
					a_this.i_threadCondition.notifyAll ()
				return l_readObject
			else:
				if a_this.i_isFinishedWriting:
					raise NoMoreDataException ("")
				if a_timeOutPeriodInMilliseconds != -1:
					raise TimeOutException ("")
	
	def isEmpty (a_this: "ObjectsPipe") -> bool:
		try:
			a_this.i_threadCondition.acquire ()
			return a_this.isEmptyWithoutLocking ()
		finally:
			a_this.i_threadCondition.release ()
	
	def isFull (a_this: "ObjectsPipe") -> bool:
		try:
			a_this.i_threadCondition.acquire ()
			return a_this.isFullWithoutLocking ()
		finally:
			a_this.i_threadCondition.release ()
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def write (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1) -> None:
		try:
			a_this.i_threadCondition.acquire ()
			a_this.writeWithoutLocking (a_object, a_timeOutPeriodInMilliseconds)
		finally:
			a_this.i_threadCondition.release ()
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def write_1 (a_this: "ObjectsPipe", a_objects: List [Optional [T]], a_offset: int, a_length: int, a_timeOutPeriodInMilliseconds: int = -1) -> int:
		try:
			a_this.i_threadCondition.acquire ()
			l_writtenLength: int = 0
			for l_writtenLength in range (0, a_length, 1):
				try:
					if (l_writtenLength == 0) or not a_this.isFullWithoutLocking ():
						a_this.writeWithoutLocking (a_objects [a_offset + l_writtenLength], a_timeOutPeriodInMilliseconds)
				except (NoMoreNeedsException) as l_exception:
					if l_writtenLength == 0:
						raise l_exception
					else:
						break
			return l_writtenLength
		finally:
			a_this.i_threadCondition.release ()
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def read (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1) -> Optional [T]:
		try:
			a_this.i_threadCondition.acquire ()
			return a_this.readWithoutLocking (a_timeOutPeriodInMilliseconds)
		finally:
			a_this.i_threadCondition.release ()
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def read_1 (a_this: "ObjectsPipe", a_objects: List [Optional [T]], a_offset: int, a_length: int, a_timeOutPeriodInMilliseconds: int = -1) -> int:
		try:
			a_this.i_threadCondition.acquire ()
			l_readLength: int = 0
			for l_readLength in range (0, a_length, 1):
				if (l_readLength == 0) or not a_this.isEmptyWithoutLocking ():
					a_objects [a_offset + l_readLength] = a_this.readWithoutLocking (a_timeOutPeriodInMilliseconds)
				else:
					break
			return l_readLength
		finally:
			a_this.i_threadCondition.release ()
	
	
	def readWholeData (a_this: "ObjectsPipe") -> List [Optional [T]]:
		try:
			a_this.i_threadCondition.acquire ()
			l_objectsList: List [Optional [T]] = []
			while True:
				try:
					l_objectsList.append (a_this.readWithoutLocking ())
				except (NoMoreDataException) as l_exception:
					break
			return l_objectsList
		finally:
			a_this.i_threadCondition.release ()
	
	def finishWriting (a_this: "ObjectsPipe") -> None:
		try:
			a_this.i_threadCondition.acquire ()
			a_this.i_isFinishedWriting = True
			a_this.i_threadCondition.notifyAll ()
		finally:
			a_this.i_threadCondition.release ()
	
	def finishReading (a_this: "ObjectsPipe") -> None:
		try:
			a_this.i_threadCondition.acquire ()
			a_this.i_isFinishedReading = True
			a_this.i_threadCondition.notifyAll ()
		finally:
			a_this.i_threadCondition.release ()
	
	def reset (a_this: "ObjectsPipe") -> None:
		try:
			a_this.i_threadCondition.acquire ()
			a_this.i_isFinishedWriting = False
			a_this.i_isFinishedReading = False
			a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
			a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
		finally:
			a_this.i_threadCondition.release ()


'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.py'

@Python ソースコード
~

class GeneralConstantsConstantsGroup:
	~
	c_iterationStartNumber: int = 0
	~


'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.py'

@Python ソースコード

class NoMoreDataException (Exception):
	def __init__ (a_this: "NoMoreDataException", a_message: str) -> None:
		
		super ().__init__ (a_message)


'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.py'

@Python ソースコード

class NoMoreNeedsException (Exception):
	def __init__ (a_this: "NoMoreNeedsException", a_message: str) -> None:
		
		super ().__init__ (a_message)


'theBiasPlanet/coreUtilities/messagingHandling/Publisher.py'は、全て省略する、なぜなら、使われているメソッドはただログを書くだけだから。

'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.py'

@Python ソースコード

class TimeOutException (Exception):
	def __init__ (a_this: "TimeOutException", a_message: str) -> None:
		
		super ().__init__ (a_message)


コードの説明は、Java版に対するものとほとんど同じだが、重複を含めて、説明をしよう。

注意として、コードには、mypyアノテーションがついている(私のどのPythonコードにもついているとおり)。

コンストラクターは、バッファサイズおよびリーダーを起こすためのアルゴリズム選択を受け取る。

'i_objects'がバッファだ。

'i_dataStartIndex'および'i_dataUntilIndex'は、値保持エリアの開始インデックスおよび終了インデックス(含まず)だ。バッファが空の時は、それらは、それぞれ、'0'および'0'になる。

コンストラクターを除く全てのパブリック想定のメソッドは、'a_this.i_threadCondition'オブジェクト上でシンクロナイズされている(「パブリック想定」というのは、クラスの外から使用されるように意図されている(私によって)ことを意味している、Pythonは非パブリックなメソッドなど許さないが)。実のところ、'~WithoutLocking'メソッド群は、プロテクテッド想定だ。

'readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1)'メソッドは、ループを持っているが、それは、複数のリーダーがいるかもしれないということに基づいている: 空のバッファに対して待っていたリーダーは、起こされて、書かれたオブジェクト群が既に他のリーダーたちによってさらわれ済みであることを発見し、もう一度待つことを余儀なくされるかもしれない。

同様に、'writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1)'メソッドは、ループを持っているが、それは、複数のライターがいるかもしれないということに基づいている。

'writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1)'メソッドの、パイプが読み取り終了と宣言された時の振る舞いが、'readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1)'メソッドの、パイプが書き込み終了と宣言された時の振る舞いと異なっている理由は、ライターは、それ以上書き込む必要が全然ない一方、リーダーは、既にバッファに格納済みのオブジェクト群を読み取りたいであろうことだ。

警告しておくが、私は、コードを強力にテストしたわけではない(まだ)、いくつかのシンプルなケース(その内の1つは、次セクションで示される)で使用はしたが。


3: 1つの使用例と1つの実行結果


Hypothesizer 7
以下は、当該オブジェクト群パイプを使用したサンプルプログラムだ。

@Python ソースコード
from typing import Optional
import sys
from threading import Thread
import traceback
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.pipes.ObjectsPipe import ObjectsPipe

~
	~
	
	@staticmethod
	def prepareIntegers (a_writer: "ObjectsPipe [int]") -> None:
		l_iterationIndex: int
		for l_iterationIndex in range (0, 512, 1):
			try:
				a_writer.write (l_iterationIndex)
			except (NoMoreNeedsException) as l_exception:
				break
			sys.stdout.write ("### written: {0:d}\n".format (l_iterationIndex))
			sys.stdout.flush ()
	
	@staticmethod
	def processIntegers (a_reader: "ObjectsPipe [int]") -> None:
		l_integer: Optional [int] = None
		l_numberOfMultipleOf10s: int = 0
		while True:
			try:
				l_integer = a_reader.read ()
			except (NoMoreDataException) as l_exception:
				break
			sys.stdout.write ("### read: {0:d}\n".format (l_integer))
			sys.stdout.flush ();
			if l_integer % 10 == 0:
				l_numberOfMultipleOf10s = l_numberOfMultipleOf10s + 1
				sys.stdout.write ("### a multiple of 10s is found.\n".format ())
				sys.stdout.flush ();
		sys.stdout.write ("### the number of multiple of 10s is {0:d}.\n".format (l_numberOfMultipleOf10s))
		sys.stdout.flush ()
	
	@staticmethod
	def  test2 () -> None:
		l_integersPipe: "ObjectsPipe [int]" = ObjectsPipe [int] (16, True)
		PerformanceMeasurer.setStartTime ()
		def l_subThreadFunction () -> None:
			try:
				Test1Test.prepareIntegers (l_integersPipe)
			except (Exception) as l_exception:
				sys.stdout.write ("{0:s}: {1:s}\n".format (str (l_exception), traceback.format_exc ()))
			finally:
				try:
					l_integersPipe.finishWriting ()
				except (Exception) as l_exception:
					sys.stdout.write ("{0:s}: {1:s}\n".format (str (l_exception), traceback.format_exc ()))
		l_subThread: Thread = Thread (target = l_subThreadFunction)
		l_subThread.start ()
		Test1Test.processIntegers (l_integersPipe)
		l_subThread.join ()
		sys.stdout.write ("### The elapsed time is {0:,d} ns.\n".format (PerformanceMeasurer.getElapseTimeInNanoSeconds ()))

~

以下が、私のシングルコア、シングルCPU Linuxコンピュータ(それはアウトプットに影響していないかもしれない、なぜなら、Pythonには、グローバルインタープリターロックなる、ぞっとさせるものがあるから)における1つのアウトプットだ。

@出力
### written: 0
### read: 0
### a multiple of 10s is found.
### read: 1
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### written: 15
### written: 16
### written: 17
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### read: 7
### read: 8
### read: 9
### read: 10
### a multiple of 10s is found.
### read: 11
### read: 12
### read: 13
### read: 14
### read: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### read: 487
### read: 488
### read: 489
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

...えーと、起こったことは、ライターが始めに行動を起こし、'0'および'1'を書いた、リーダーは、'0'および'1'を読み、待ちにされた、なぜなら、パイプが空になったから、ライターは、フルに書き込み、リーダーを起こした、リーダーは読み始めた、と続く、私の推測では。...注意として、'1'は、リーダーが読み始める前に書かれたはずだ、「### written: 1」は、遅れて現れたが: 'write'メソッドが完了した後すぐに、コントロールは読み取りスレッドにスイッチし、メッセージ出力は、コントロールが書き込みスレッドにに戻るまで遅延された。

通知タイミングモードを'False'にセットした場合、以下がアウトプットだ。

@出力
### written: 0
### read: 0
### a multiple of 10s is found.
### read: 1
### written: 1
### written: 2
### read: 2
### read: 3
### written: 3
### written: 4
### read: 4
### read: 5
### written: 5
### written: 6
### read: 6
### read: 7
### written: 7
### written: 8
### read: 8
### read: 9
### written: 9
### written: 10
### read: 10
### a multiple of 10s is found.
### read: 11
### written: 11
### written: 12
### read: 12
### read: 13
### written: 13
### written: 14
### read: 14
### read: 15
### written: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### written: 509
### written: 510
### written: 511
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

...ライターが始めに行動を起こして、'0'および'1'を書いた、リーダーは、'0'および'1'を読み、待ちにされた、なぜなら、パイプが空になったから、ライターは、'2'を書き、リーダーを起こし、'3'を書いた、リーダーは、読み始めた、と続く、私の推測では。...この場合も、「### written: 1」は、遅れて現われた。

いずれにせよ、バッファの少しだけのスロットが使用される傾向にあるようだ。

上記2モードにおける経過時間(時間が大きくかかるメッセージ出力を取り除いて)を計測したところ(モード毎に5回)、時間は、それぞれ、{'14,296,000'、'15,188,000'、'14,926,000'、'15,518,000'、'14,495,000': 平均 -> '14,884,600'} および {'15,321,000'、'13,819,000'、'15,230,000'、'14,191,000'、'14,009,000': 平均 -> '14,514,000'}(ナノ秒)だった。違いは有意でないように推測する。...ところで、気づいたのだが、Python版は、Java版よりもずっと高速だ、その理由は、私がCベースであるCPythonを使っているから? 多分。


4: 結びとその先


Hypothesizer 7
これで、私はオブジェクト群パイプを持つことになった。

オブジェクト群パイプの恩恵は、どれだけ多くのオブジェクトを処理しようとも、メモリースペースが枯渇することがないということだ。実際、上に挙げられたサンプルプログラムは、バッファオーバーフローを起こさない、繰り返し回数をいかに大きく設定しても。

本オブジェクト群パイプは、そのままでも文字列パイプとして使うことができるが、いくつかの追加のメソッドが、文字列パイプインスタンスを操作するのに便利だろうから、本オブジェクト群パイプクラス継承した文字列パイプクラスを特に作成しよう、次記事にて。


参考資料


<このシリーズの前の記事 | このシリーズの目次 | このシリーズの次の記事>