2021年7月18日日曜日

2: C#オブジェクト群パイプ(オブジェクト群を逐次に運ぶ)

<このシリーズの前の記事 | このシリーズの目次 | このシリーズの次の記事>

シリアルなのは、さもなければ、メモリが使い果たされてしまうかもしれないから。'java.io.PipedWriter' + 'java.io.PipedReader'に対する、オブジェクト群用C#相当物。

話題


About: C#

この記事の目次


開始コンテキスト


  • 読者は、C#の基本的知識を持っている。

ターゲットコンテキスト



  • 読者は、あるオブジェクト群パイプを知る。

オリエンテーション


あるC#文字列パイプが、次記事にて紹介されます。

あるJavaオブジェクト群パイプが、別シリーズのある記事にて紹介されました。

あるC++オブジェクト群パイプが、別シリーズのある記事にて紹介されました。

あるPythonオブジェクト群パイプが、別シリーズのある記事にて紹介されました。


本体


1: モチベーションおよびプランは、既に紹介されたJavaオブジェクト群パイプに対するものと同じ


Hypothesizer 7
実のところ、別シリーズのある記事が、既に、Javaオブジェクト群パイプを紹介した

そこに記述されたモチベーションおよびプランを繰り返すことは控える。


2: コードおよびその説明


Hypothesizer 7
以下が、私のオブジェクト群パイプクラスおよびそれに関連したクラス群のコードだ。

'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.cs'

@C# ソースコード
namespace theBiasPlanet {
	namespace coreUtilities {
		namespace pipes {
			using System;
			using System.Collections.Generic;
			using System.Runtime.CompilerServices;
			using System.Threading;
			using theBiasPlanet.coreUtilities.constantsGroups;
			using theBiasPlanet.coreUtilities.inputsHandling;
			using theBiasPlanet.coreUtilities.messagingHandling;
			using theBiasPlanet.coreUtilities.timersHandling;
			
			public class ObjectsPipe <T> {
				protected Object [] i_objects;
				protected Int32 i_bufferSize = 0;
				// No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
				protected Int32 i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
				protected Int32 i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
				protected Boolean i_isFinishedWriting = false;
				protected Boolean i_isFinishedReading = false;
				protected Boolean i_notificationIsDelayed = false;
				
				public ObjectsPipe (Int32 a_bufferSize, Boolean a_notificationIsDelayed) {
					i_bufferSize = a_bufferSize;
					i_objects = new Object [i_bufferSize];
					i_notificationIsDelayed = a_notificationIsDelayed;
				}
				
				~ObjectsPipe () {
				}
				
				protected Boolean isEmptyWithoutLocking () {
					return i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber;
				}
				
				protected Boolean isFullWithoutLocking () {
					return (i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == i_bufferSize) || (i_dataStartIndex != GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataStartIndex == i_dataUntilIndex);
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				protected void writeWithoutLocking (T a_object, Int32 a_timeOutPeriodInMilliseconds = -1) {
					if (i_isFinishedReading) {
						throw new NoMoreNeedsException ("");
					}
					if (i_isFinishedWriting) {
						i_isFinishedWriting = false;
					}
					while (true) {
						if (isFullWithoutLocking ()) {
							try {
								if (a_timeOutPeriodInMilliseconds == -1) {
									Monitor.Wait (this);
								}
								else if (a_timeOutPeriodInMilliseconds == 0) {
								}
								else {
									Monitor.Wait (this, a_timeOutPeriodInMilliseconds);
								}
							}
							catch (Exception l_exception) {
								Publisher.logErrorInformation (l_exception);
							}
						}
						// Checked again because the status may have changed while this thread was waiting.
						if (i_isFinishedReading) {
							throw new NoMoreNeedsException ("");
						}
						if (!isFullWithoutLocking ()) {
							Boolean l_wasEmpty = isEmptyWithoutLocking ();
							if (i_dataUntilIndex == i_bufferSize) {
								i_objects [GeneralConstantsConstantsGroup.c_iterationStartNumber] = a_object;
								i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber + 1;
							}
							else {
								i_objects [i_dataUntilIndex] = a_object;
								i_dataUntilIndex ++;
							}
							if ( (!i_notificationIsDelayed && l_wasEmpty) || (i_notificationIsDelayed && isFullWithoutLocking ())) {
								Monitor.PulseAll (this);
							}
							return;
						}
						else {
							if (a_timeOutPeriodInMilliseconds != -1) {
								throw new TimeOutException ("");
							}
						}
					}
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				protected T readWithoutLocking (Int32 a_timeOutPeriodInMilliseconds = -1) {
					T l_readObject;
					if (i_isFinishedReading) {
						i_isFinishedReading = false;
					}
					while (true) {
						if (isEmptyWithoutLocking ()) {
							if (!i_isFinishedWriting) {
								try {
									if (a_timeOutPeriodInMilliseconds == -1) {
										Monitor.Wait (this);
									}
									else if (a_timeOutPeriodInMilliseconds == 0) {
									}
									else {
										Monitor.Wait (this, a_timeOutPeriodInMilliseconds);
									}
								}
								catch (Exception l_exception) {
									Publisher.logErrorInformation (l_exception);
								}
							}
							else {
								throw new NoMoreDataException ("");
							}
						}
						// Checked again because the status may have changed while this thread was waiting.
						if (!isEmptyWithoutLocking ()) {
							Boolean l_wasFull = isFullWithoutLocking ();
							l_readObject = (T) i_objects [i_dataStartIndex];
							i_objects [i_dataStartIndex] = null;
							i_dataStartIndex ++;
							if (i_dataStartIndex == i_dataUntilIndex) {
								i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
								i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
							}
							else {
								if (i_dataStartIndex == i_bufferSize) {
									i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
								}
							}
							if ( (!i_notificationIsDelayed && l_wasFull) || (i_notificationIsDelayed && isEmptyWithoutLocking ())) {
								Monitor.PulseAll (this);
							}
							return l_readObject;
						}
						else {
							if (i_isFinishedWriting) {
								throw new NoMoreDataException ("");
							}
							if (a_timeOutPeriodInMilliseconds != -1) {
								throw new TimeOutException ("");
							}
						}
					}
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public Boolean isEmpty () {
					return isEmptyWithoutLocking ();
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public Boolean isFull () {
					return isFullWithoutLocking ();
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				[MethodImpl (MethodImplOptions.Synchronized)]
				public void write (T a_object, Int32 a_timeOutPeriodInMilliseconds = -1) {
					writeWithoutLocking (a_object, a_timeOutPeriodInMilliseconds);
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				[MethodImpl (MethodImplOptions.Synchronized)]
				public Int32 write (T [] a_objects, Int32 a_offset, Int32 a_length, Int32 a_timeOutPeriodInMilliseconds = -1) {
					Int32 l_writtenLength = 0;
					for (l_writtenLength = 0; l_writtenLength < a_length; l_writtenLength ++) {
						try {
							if (l_writtenLength == 0 || ! (isFullWithoutLocking ())) {
								writeWithoutLocking (a_objects [a_offset + l_writtenLength], a_timeOutPeriodInMilliseconds);
							}
						}
						catch (NoMoreNeedsException l_exception) {
							if (l_writtenLength == 0) {
								throw l_exception;
							}
							else {
								break;
							}
						}
					}
					return l_writtenLength;
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				[MethodImpl (MethodImplOptions.Synchronized)]
				public T read (Int32 a_timeOutPeriodInMilliseconds = -1) {
					return readWithoutLocking (a_timeOutPeriodInMilliseconds);
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				[MethodImpl (MethodImplOptions.Synchronized)]
				public Int32 read (T [] a_objects, Int32 a_offset, Int32 a_length, Int32 a_timeOutPeriodInMilliseconds = -1) {
					Int32 l_readLength = 0;
					for (; l_readLength < a_length; l_readLength ++) {
						if ( (l_readLength == 0) || !isEmptyWithoutLocking ()) {
							a_objects [a_offset + l_readLength] = readWithoutLocking (a_timeOutPeriodInMilliseconds);
						}
						else {
							break;
						}
					}
					return l_readLength;
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public List <T> readWholeData () {
					List <T> l_objectsList = new List <T> ();
					while (true) {
						try {
							l_objectsList.Add (readWithoutLocking ());
						}
						catch (NoMoreDataException) {
							break;
						}
					}
					return l_objectsList;
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public void finishWriting () {
					i_isFinishedWriting = true;
					Monitor.PulseAll (this);
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public void finishReading () {
					i_isFinishedReading = true;
					Monitor.PulseAll (this);
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public void reset () {
					i_isFinishedWriting = false;
					i_isFinishedReading = false;
					i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
					i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
				}
			}
		}
	}
}

'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.cs'

@C# ソースコード
namespace theBiasPlanet {
	namespace coreUtilities {
		namespace inputsHandling {
			using System;
			
			public class NoMoreDataException : Exception {
				public NoMoreDataException (String a_message) : base (a_message) {
				}
			}
		}
	}
}

'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.cs'

@C# ソースコード
namespace theBiasPlanet {
	namespace coreUtilities {
		namespace inputsHandling {
			using System;
			
			public class NoMoreNeedsException : Exception {
				public NoMoreNeedsException (String a_message) : base (a_message) {
				}
			}
		}
	}
}

'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.cs'

@C# ソースコード
namespace theBiasPlanet {
	namespace coreUtilities {
		namespace timersHandling {
			using System;
			
			public class TimeOutException : Exception {
				public TimeOutException (String a_message) : base (a_message) {
				}
			}
		}
	}
}

'theBiasPlanet/coreUtilities/messagingHandling/Publisher.cs'は、全体として省略された、なぜなら、使用されているメソッドはただログを書くだけだから。

そのコードの説明はJava版に対するのとほとんど同じだ、しかし、重複を含めて、説明を行なおう。

そのコンストラクタは、バッファサイズ、およびリーダー達を起こすためのアルゴリズムの選択を受け取る。

'i_objects'がバッファだ。

'i_dataStartIndex'および'i_dataUntilIndex'は、値が入ったエリアの開始インデックスおよび終了インデックス(含まず)だ。バッファが空の時は、それらは、それぞれ、'0'および'0'になる。

コンストラクタを除く全てのパブリックメソッドテンプレートは、'this'オブジェクト上でシンクロナイズされている。

'readWithoutLocking (Int32 a_timeOutPeriodInMilliseconds = -1)'メソッドがループを持っているのは、複数のリーダーがいるかもしれないということに基づいている: 空のバッファに対して待ちをしていたあるリーダーは、起こされて、書かれたオブジェクト群が既に他のリーダーたちにさらわれてしまっていることを発見し、再び待たないといけなくなるかもしれない。

同様に、'writeWithoutLocking (T a_object, Int32 a_timeOutPeriodInMilliseconds = -1)'メソッドがループを持っているのは、複数のライターがいるかもしれないことに基づいている。

'writeWithoutLocking (T a_object, Int32 a_timeOutPeriodInMilliseconds = -1)'メソッドの、パイプが読取りを終了したと宣言された際の振る舞いが、'readWithoutLocking (Int32 a_timeOutPeriodInMilliseconds = -1)'メソッドの、パイプが書き込みを終了したと宣言された際の振る舞いと若干異なっている理由は、ライターは、それ以上書く必要が全然ないが、リーダーは、バッファ内に既に格納されているオブジェクト群を読み上げたいだろうということだ。

警告しておくが、私は、コードを強力にテストしたわけではない(まだ)、いくつかのシンプルなケース(その内の1つは、次セクションで示される)で使用はしたが。


3: 1つの使用例と1つの実行結果


Hypothesizer 7
以下は、このオブジェクト群パイプを使用したサンプルプログラムだ。

@C# ソースコード
namespace theBiasPlanet {
	namespace coreUtilitiesTests {
		namespace pipesTest1 {
			using System;
			using System.Threading;
			using theBiasPlanet.coreUtilities.inputsHandling;
			using theBiasPlanet.coreUtilities.pipes;
			
			public class Test1Test {
				public static void main (String [] a_argumentsArray) {
					test2 ();
				}
				
				public static void prepareIntegers (ObjectsPipe <Int32> a_writer) {
					for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
						try {
							a_writer.write (l_iterationIndex);
						}
						catch (NoMoreNeedsException) {
							break;
						}
						Console.Out.WriteLine (String.Format ("### written: {0:d}", l_iterationIndex));
						Console.Out.Flush ();
					}
				}
				
				private static void processIntegers (ObjectsPipe <Int32> a_reader) {
					Int32 l_integer = -1;
					int l_numberOfMultipleOf10s = 0;
					while (true) {
						try {
							l_integer = a_reader.read ();
						}
						catch (NoMoreDataException) {
							break;
						}
						Console.Out.WriteLine (String.Format ("### read: {0:d}", l_integer));
						Console.Out.Flush ();
						if (l_integer % 10 == 0) {
							l_numberOfMultipleOf10s ++;
							Console.Out.WriteLine (String.Format ("### a multiple of 10s is found."));
							Console.Out.Flush ();
						}
					}
					Console.Out.WriteLine (String.Format ("### the number of multiple of 10s is {0:d}.", l_numberOfMultipleOf10s));
					Console.Out.Flush ();
				}
				
				private static void test2 () {
					ObjectsPipe <Int32> l_integersPipe = new ObjectsPipe <Int32> (16, true);
					//ObjectsPipe <Int32> l_integersPipe = new ObjectsPipe <Int32> (16, false);
					Thread l_subThread = new Thread (() => {
						try {
							prepareIntegers (l_integersPipe);
						}
						catch (Exception l_exception) {
							Console.Out.WriteLine (l_exception.ToString ());
						}
						finally {
							try {
								l_integersPipe.finishWriting ();
							}
							catch (Exception l_exception) {
								Console.Out.WriteLine (l_exception.ToString ());
							}
						}
					});
					l_subThread.Start ();
					processIntegers (l_integersPipe);
					l_subThread.Join ();
				}
			}
		}
	}
}

以下は、私のシングルコア、シングルCPU Linuxコンピュータ(Monoが使われている)(それがアウトプットに影響しているはずだ)における1つのアウトプットだ。

@出力
### written: 0
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### read: 0
### a multiple of 10s is found.
### read: 1
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### read: 7
### read: 8
### read: 9
### read: 10
### a multiple of 10s is found.
### read: 11
### read: 12
### read: 13
### read: 14
### read: 15
### written: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . . えーと、起こったことは、リーダーが最初に行動し、パイプが空だったから待ちにされた、ライターが満杯まで書き、リーダーを起こした、リーダーが読み始めた、と続く、私の推測では。 . . . 注意として、'15'は、リーダーが起こされる前に書かれたに違いない、「### written: 15」は遅れて表示されているが: 'write'メソッドが完了した直後に、コントロールは読取りスレッドへスイッチし、メッセージ表示は、コントロールが書き込みスレッドに戻されるまで遅らされた。

通知タイミングモードが'false'にセットされた時は、以下がアウトプットだ。

@出力
### read: 0
### a multiple of 10s is found.
### written: 0
### read: 1
### written: 1
### read: 2
### written: 2
### read: 3
### written: 3
### read: 4
### written: 4
### read: 5
### written: 5
### read: 6
### written: 6
### read: 7
### written: 7
### read: 8
### written: 8
### read: 9
### written: 9
### read: 10
### a multiple of 10s is found.
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### written: 15
### written: 16
### written: 17
### written: 18
### written: 19
### written: 20
### written: 21
### written: 22
### written: 23
### written: 24
### written: 25
### written: 26
### read: 11
### read: 12
### read: 13
### read: 14
### read: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . . リーダーが最初に行動し、パイプが空だったので待ちにされた、ライターが'0'を書き、リーダーを起こした、リーダーが読み始めた、と続く、私の推測では。 . . . この場合も、「### written: 0」は遅れて表示された。

それらの実行では、明確な違いは、最初の少しの間だけ現れた。

2つのモードにてかかった時間を計測した(時間のかかるメッセージ出力を取り除いて)(モード毎に5回)ところ、時間は、ナノ秒で(明らかに、そんな精度はない、実際には)、それぞれ、{'176,234,000'、'93,412,000'、'107,652,000'、'147,381,000'、'130,540,000'}および{'99,691,000'、'260,851,000'、'216,234,000'、'147,965,000'、'208,565,000'}だった。計測時間の不安定性のため、私は特に何の結論も引き出さない。


参考資料


<このシリーズの前の記事 | このシリーズの目次 | このシリーズの次の記事>