2020年8月30日日曜日

2: Javaオブジェクト群パイプ(オブジェクト群を逐次に運ぶ)

<このシリーズの前の記事 | このシリーズの目次 | このシリーズの次の記事>

逐次でなければならないのは、さもなければ、メモリーが使い果たされてしまうかもしれないから。'java.io.PipedWriter' + 'java.io.PipedReader'が文字群に対して行なうことを、オブジェクト群に対して行ないます。

話題


About: Javaプログラミング言語

この記事の目次


開始コンテキスト


  • 読者は、Javaプログラミング言語の基本的知識を持っている。

ターゲットコンテキスト



  • 読者は、あるオブジェクト群パイプを知る。

オリエンテーション



本体


1: メモリーを使い果たすことの害


Hypothesizer 7
プログラムによって引き起こされる可能性のある主要な害の1つは、メモリーを使い果たすことであり、コンピューター全体を麻痺させかねない。

それがより一層に苛立たしいのは、コンピューターが、不毛なことをするのに没頭しているから: スラッシング。. . .私が命じた計算を行なうのにシステムが忙しいのであれば、その努力を評価もしようが。

メモリーが使い果たされたのは、プログラムが多くのデータを溜め込んだからであるが、そのデータは、本当にメモリー内に山積みにされなければならなかったのだろうか、と私は問わなければならない。

1つの例を考えてみよう。

@Java ソースコード
import java.util.ArrayList;
import java.util.List;

~
	~
	
	public static ArrayList <Integer> prepareAndHoardIntegers () {
		ArrayList <Integer> l_integers = new ArrayList <Integer> ();
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			l_integers.add (Integer.valueOf (l_iterationIndex));
		}
		return l_integers;
	}
	
	public static void processPreparedAndHoardedIntegers (List <Integer> a_integers)  {
		int l_numberOfMultipleOf10s = 0;
		for (Integer l_integer: a_integers) {
			System.out.println (String.format ("### read: %d", l_integer.intValue ()));
			System.out.flush ();
			if (l_integer.intValue () % 10 == 0) {
				l_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of multiple of 10s is %d.", l_numberOfMultipleOf10s));
		System.out.flush ();
	}
	
	public static void test00 () throws Exception {
		ArrayList <Integer> l_integers = prepareAndHoardIntegers ();
		processPreparedAndHoardedIntegers (l_integers);
	}
~

それがメモリーを使い果たす(データ量が十分に大きければ)のは、全てのオブジェクト群がまずメモリー内に溜められるから、処理され始める前に。

それでは、以下のようにするべきだというのか?

@Java ソースコード
~
	~
	
	public static void prepareAndProcessIntegers () {
		Integer l_integer = null;
		int l_numberOfMultipleOf10s = 0;
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			l_integer = Integer.valueOf (l_iterationIndex);
			System.out.println (String.format ("### read: %d", l_integer.intValue ()));
			System.out.flush ();
			if (l_integer.intValue () % 10 == 0) {
				l_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of multiple of 10s is %d.", l_numberOfMultipleOf10s));
		System.out.flush ();
	}
	
	public static void test01 () {
		prepareAndProcessIntegers ();
	}
~

. . .えーと、問題は、準備を行なうファンクションはオブジェクト群の準備を行なうだけであるように意図されていて、オブジェクト群を処理するようには意図されていないことだ、役割として。. . .実際、準備を行なうファンクションは、オブジェクト群をどう処理するべきかについて、何も知らない。

確かに、回避策はある、特定のインターフェースを持った処理者インスタンスを準備ファンクションが受け取るという回避策が、以下のように。

@Java ソースコード
~
	~
	
	static interface IntegerProcessor {
		void process (Integer a_integer);
		void summarize ();
	}
	
	static public class AIntegerProcessor implements IntegerProcessor {
		private int i_numberOfMultipleOf10s = 0;
		
		@Override
		public void process (Integer a_integer) {
			System.out.println (String.format ("### read: %d", a_integer.intValue ()));
			System.out.flush ();
			if (a_integer.intValue () % 10 == 0) {
				i_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		
		@Override
		public void summarize () {
			System.out.println (String.format ("### the number of multiple of 10s is %d.", i_numberOfMultipleOf10s));
			System.out.flush ();
		}
	}
	
	public static void prepareAndProcessIntegers (IntegerProcessor a_integerProcessor) {
		Integer l_integer = null;
		int l_numberOfMultipleOf10s = 0;
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			l_integer = Integer.valueOf (l_iterationIndex);
			a_integerProcessor.process (l_integer);
		}
		a_integerProcessor.summarize ();
	}
	
	public static void test02 () {
		prepareAndProcessIntegers (new AIntegerProcessor ());
	}
~

. . .しかしながら、問題は、全オブジェクトが一気に処理できるということでは、必ずしもないことだ: 上記コードでは、各オブジェクトが処理されるタイミングが、'prepareAndProcessIntegers (IntegerProcessor a_integerProcessor)'ファンクションの中に不可避に囚われてしまっている。

もっと一般的に言うと、準備ファンクションは、純粋にオブジェクト群を準備するためのものと意図されているので、そのデザインは無節操に歪められるべきでない、さもなければ、その歪みがコード全体を侵食しかねない。

何も歪めることなく問題を解決するには、オブジェクト群をある場所から別の場所へ逐次に運ぶ、オブジェクト群パイプが必要だ。


2: 'java.io.PipedWriter' + 'java.io.PipedReader'をインスピレーションとする


Hypothesizer 7
実のところ、Javaには、'java.io.PipedWriter' + 'java.io.PipedReader'がある、もしも、運ばれるものが文字群であるならば。

それらはペアで使われ、何らかの文字群が逐次に'java.io.PipedWriter'インスタンスに書かれると、それらは、その'java.io.PipedWriter'インスタンスに接続された'java.io.PipedReader'インスタンスから逐次に読まれる。

以下は、1つの使用例だ。

@Java ソースコード
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.Reader;
import java.io.Writer;
~
	
	public static void prepareString (Writer a_writer) throws Exception {
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			a_writer.write (String.format ("%d", l_iterationIndex));
			System.out.println (String.format ("### written: %d", l_iterationIndex));
			System.out.flush ();
		}
	}
	
	public static void processString (Reader a_reader) throws Exception {
		int l_character = -1;
		int l_numberOf0s = 0;
		int l_codeOf0 = '0';
		while ((l_character = a_reader.read ()) != -1) {
			System.out.println (String.format ("### read: %c", (char) l_character));
			if (l_character == l_codeOf0) {
				l_numberOf0s ++;
				System.out.println (String.format ("### a 0 is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of 0s is %d.", l_numberOf0s));
		System.out.flush ();
	}
	
	public static void test1 () throws Exception {
		PipedWriter l_pipedStringWriter = new PipedWriter ();
		PipedReader l_pipedStringReader = new PipedReader (l_pipedStringWriter, 16);
		Thread l_subThread = new Thread (() -> {
			try {
				prepareString (l_pipedStringWriter);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
			finally {
				try {
					l_pipedStringWriter.close ();
				}
				catch (Exception l_exception) {
					l_exception.printStackTrace ();
				}
			}
		});
		l_subThread.start ();
		processString (l_pipedStringReader);
		l_pipedStringReader.close ();
		l_subThread.join ();
	}
~

以下は、1つのアウトプットだ。

@出力
### written: 0
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### read: 0
### 0 is found.
### read: 1
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### read: 7
### read: 8
### read: 9
### read: 1
### read: 0
### 0 is found.
### read: 1
### read: 1
### read: 1
### read: 2
~
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### read: 5
### read: 0
### 0 is found.
### read: 6
### read: 5
### read: 0
### 0 is found.
### read: 7
### read: 5
### read: 0
### 0 is found.
### read: 8
### read: 5
### read: 0
### 0 is found.
### read: 9
### read: 5
### read: 1
### read: 0
### 0 is found.
### read: 5
### written: 511
### read: 1
### read: 1
### the number of 0s is 102.

それの何がいいのか?

"16"はパイプのバッファサイズであり、そのパイプはそれより多くのデータを決して溜め込まないことを意味する。書き込み側は、もしもパイプが満杯でなければ書き、さもなければ待つ。読み込み側は、もしもパイプが空でなければ読み(勿論、読まれたデータは、パイプから廃棄される)、さもなければ、待つ。

要点は、読み込み側は、自分の好きなタイミングでパイプを読めることだ: 読み込み側は、書き込み側があるデータを書いたからというだけの理由で、そのデータを即座に読むように強制されたりしない。読み込み側が読むことをしばらく怠れば、バッファが満杯になるかも知れないが、読み込み側は、書き込み側を待たせておけばよいだけだ。他方、もしも、読み込み側が、パイプを読もうとしてパイプが空であることを発見したら、読み込み側は待つことになり、そうすれば、書き込み側に書く機会が与えられる。


3: 計画


Hypothesizer 7
それでは、オブジェクト群パイプは、ただのバッファで、それに書き込み側が書き、それから読み込み側が読むだけなのか?

えーと、基本的にはそうだ、しかし、そのバッファは、同時アクセスを適切に切り回せるに十分なだけ賢く、あるアクセスを許可し、あるアクセスを待たせ、ある待機中アクセスを起こし、等しなければならない。

加えて、書き込み側には、書き込みを完了したことを通知する手段がなければならず、読み込み側には、読み込みを完了したことを通知する手段がなければならない。

加えて、書き込み側や読み込み側は、書きや読みをタイムアウトさせたいかもしれない。

バッファは、サイクリックに使用されなければならない、データを動かし回すことを避けるためには: 例えば、バッファfサイズが16であり、書き込み側が10個のオブジェクトを書き、読み込み側が5個のオブジェクトを読み、書き込み側がさらに10個のオブジェクトを書いたら、値の入ったエリアは、''から、'0->9'、'5->9'、'5->15;0->3'(それを私は単に'5->3'と表現する)へと変更される。

書き込み側を読み込み側よりも優先するアルゴリズムとして、洗練されたオプションもあるかもしれないが、私は、かなり単純な2つのオプションを実装した: 1) 読み込み側は、バッファが満杯になった時に起こされる、 および 2) 読み込み側は、バッファが空でなくなった時に起こされる、その一方、書き込み側は、常に、バッファが満杯でなくなった時に起こされる。. . .留意しないといけないが、前者のオプションは、もしも書き込み側が勤勉に書かなければ、読み込み側を不必要に長時間待たせるかもしれない、そのかわり、書き込み側がそうすれば、より効率的かもしれない。

通常は、単一の書き込み側と単一の読み込み側があると想定されているが、複数の書き込み側や複数の書き込み側がある可能性も特に除外されているわけではない、そういうケースについてテストをしてはいないが、まだ。


4: コードとその説明


Hypothesizer 7
以下が、私のオブジェクト群パイプクラスおよびその関連クラス群のコードだ。

'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.java'

@Java ソースコード
package theBiasPlanet.coreUtilities.pipes;

import java.util.ArrayList;
import java.util.List;
import theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException;
import theBiasPlanet.coreUtilities.messagingHandling.Publisher;
import theBiasPlanet.coreUtilities.timersHandling.TimeOutException;

public class ObjectsPipe <T> {
	protected Object [] i_objects;
	protected int i_bufferLength = 0;
	// No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
	protected int i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
	protected int i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
	protected boolean i_isFinishedWriting = false;
	protected boolean i_isFinishedReading = false;
	protected boolean i_notificationIsDelayed = false;
	
	public ObjectsPipe (int a_bufferLength, boolean a_notificationIsDelayed) {
		i_bufferLength = a_bufferLength;
		i_objects = new Object [i_bufferLength];
		i_notificationIsDelayed = a_notificationIsDelayed;
	}
	
	@Override
	protected void finalize () {
	}
	
	protected boolean isEmptyWithoutLocking () {
		return i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber;
	}
	
	protected boolean isFullWithoutLocking () {
		return (i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == i_bufferLength) || (i_dataStartIndex != GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataStartIndex == i_dataUntilIndex);
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	protected void writeWithoutLocking (T a_object, long a_timeOutPeriodInMilliseconds) throws NoMoreNeedsException, TimeOutException {
		if (i_isFinishedReading) {
			throw new NoMoreNeedsException ("");
		}
		if (i_isFinishedWriting) {
			i_isFinishedWriting = false;
		}
		while (true) {
			if (isFullWithoutLocking ()) {
				try {
					if (a_timeOutPeriodInMilliseconds == -1) {
						this.wait ();
					}
					else if (a_timeOutPeriodInMilliseconds == 0) {
					}
					else {
						this.wait (a_timeOutPeriodInMilliseconds);
					}
				}
				catch (InterruptedException l_exception) {
					Publisher.logErrorInformation (l_exception);
				}
			}
			// Checked again because the status may have changed while this thread was waiting.
			if (i_isFinishedReading) {
				throw new NoMoreNeedsException ("");
			}
			if (!isFullWithoutLocking ()) {
				boolean l_wasEmpty = isEmptyWithoutLocking ();
				if (i_dataUntilIndex == i_bufferLength) {
					i_objects [GeneralConstantsConstantsGroup.c_iterationStartNumber] = a_object;
					i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber + 1;
				}
				else {
					i_objects [i_dataUntilIndex] = a_object;
					i_dataUntilIndex ++;
				}
				if ( (!i_notificationIsDelayed && l_wasEmpty) || (i_notificationIsDelayed && isFullWithoutLocking ())) {
					this.notifyAll ();
				}
				return;
			}
			else {
				if (a_timeOutPeriodInMilliseconds != -1) {
					throw new TimeOutException ("");
				}
			}
		}
	}
	
	@SuppressWarnings("unchecked")
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	protected T readWithoutLocking (long a_timeOutPeriodInMilliseconds) throws NoMoreDataException, TimeOutException {
		T l_readObject = null;
		if (i_isFinishedReading) {
			i_isFinishedReading = false;
		}
		while (true) {
			if (isEmptyWithoutLocking ()) {
				if (!i_isFinishedWriting) {
					try {
						if (a_timeOutPeriodInMilliseconds == -1) {
							this.wait ();
						}
						else if (a_timeOutPeriodInMilliseconds == 0) {
						}
						else {
							this.wait (a_timeOutPeriodInMilliseconds);
						}
					}
					catch (InterruptedException l_exception) {
						Publisher.logErrorInformation (l_exception);
					}
				}
				else {
					throw new NoMoreDataException ("");
				}
			}
			// Checked again because the status may have changed while this thread was waiting.
			if (!isEmptyWithoutLocking ()) {
				boolean l_wasFull = isFullWithoutLocking ();
				l_readObject = (T) i_objects [i_dataStartIndex];
				i_objects [i_dataStartIndex] = null;
				i_dataStartIndex ++;
				if (i_dataStartIndex == i_dataUntilIndex) {
					i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
					i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
				}
				else {
					if (i_dataStartIndex == i_bufferLength) {
						i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
					}
				}
				if ( (!i_notificationIsDelayed && l_wasFull) || (i_notificationIsDelayed && isEmptyWithoutLocking ())) {
					this.notifyAll ();
				}
				return l_readObject;
			}
			else {
				if (i_isFinishedWriting) {
					throw new NoMoreDataException ("");
				}
				if (a_timeOutPeriodInMilliseconds != -1) {
					throw new TimeOutException ("");
				}
			}
		}
	}
	
	public synchronized boolean isEmpty () {
		return isEmptyWithoutLocking ();
	}
	
	public synchronized boolean isFull () {
		return isFullWithoutLocking ();
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	public synchronized void write (T a_object, long a_timeOutPeriodInMilliseconds) throws NoMoreNeedsException, TimeOutException {
		writeWithoutLocking (a_object, a_timeOutPeriodInMilliseconds);
	}
	
	public synchronized void write (T a_object) throws NoMoreNeedsException {
		try {
			write (a_object, -1);
		}
		catch (TimeOutException l_exception) {
			// impossible
		}
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	public synchronized int write (T [] a_objects, int a_offset, int a_length, long a_timeOutPeriodInMilliseconds) throws NoMoreNeedsException, TimeOutException {
		int l_writtenLength = 0;
		for (l_writtenLength = 0; l_writtenLength < a_length; l_writtenLength ++) {
			try {
				if (l_writtenLength == 0 || ! (isFullWithoutLocking ())) {
					writeWithoutLocking (a_objects [a_offset + l_writtenLength], a_timeOutPeriodInMilliseconds);
				}
			}
			catch (NoMoreNeedsException l_exception) {
				if (l_writtenLength == 0) {
					throw l_exception;
				}
				else {
					break;
				}
			}
		}
		return l_writtenLength;
	}
	
	public synchronized int write (T [] a_objects, int a_offset, int a_length) throws NoMoreNeedsException {
		try {
			return write (a_objects, a_offset, a_length, -1);
		}
		catch (TimeOutException l_exception) {
			// impossible
			return 0;
		}
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	public synchronized T read (long a_timeOutPeriodInMilliseconds) throws NoMoreDataException, TimeOutException {
		return readWithoutLocking (a_timeOutPeriodInMilliseconds);
	}
	
	public synchronized T read () throws NoMoreDataException {
		try {
			return read (-1);
		}
		catch (TimeOutException l_exception) {
			// impossible
			return null;
		}
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	public synchronized int read (T [] a_objects, int a_offset, int a_length, long a_timeOutPeriodInMilliseconds) throws NoMoreDataException, TimeOutException {
		int l_readLength = 0;
		for (; l_readLength < a_length; l_readLength ++) {
			if ( (l_readLength == 0) || !isEmptyWithoutLocking ()) {
				a_objects [a_offset + l_readLength] = readWithoutLocking (a_timeOutPeriodInMilliseconds);
			}
			else {
				break;
			}
		}
		return l_readLength;
	}
	
	public synchronized int read (T [] a_objects, int a_offset, int a_length) throws NoMoreDataException {
		try {
			return read (a_objects,  a_offset,  a_length, -1);
		}
		catch (TimeOutException l_exception) {
			// impossible
			return 0;
		}
	}
	
	public synchronized List <T> readWholeData () {
		List <T> l_objectsList = new ArrayList <T> ();
		while (true) {
			try {
				l_objectsList.add (readWithoutLocking (-1));
			}
			catch (TimeOutException l_exception) {
				// impossible
			}
			catch (NoMoreDataException l_exception) {
				break;
			}
		}
		return l_objectsList;
	}
	
	public synchronized void finishWriting () {
		i_isFinishedWriting = true;
		this.notifyAll ();
	}
	
	public synchronized void finishReading () {
		i_isFinishedReading = true;
		this.notifyAll ();
	}
	
	public synchronized void reset () {
		i_isFinishedWriting = false;
		i_isFinishedReading = false;
		i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
		i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
	}
}

'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.java'

@Java ソースコード
package theBiasPlanet.coreUtilities.constantsGroups;

~

public interface GeneralConstantsConstantsGroup {
	~
	int c_iterationStartNumber = 0;
	~
}

'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.java'

@Java ソースコード
package theBiasPlanet.coreUtilities.inputsHandling;

public class NoMoreDataException extends Exception {
	public NoMoreDataException (String a_message) {
		super (a_message);
	}
}

'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.java'

@Java ソースコード
package theBiasPlanet.coreUtilities.inputsHandling;

public class NoMoreNeedsException extends Exception {
	public NoMoreNeedsException (String a_message) {
		super (a_message);
	}
}

'theBiasPlanet/coreUtilities/messagingHandling/Publisher.java'は、全体を省略する、使用されているメソッドはただログを書くだけだから。

'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.java'

@Java ソースコード
package theBiasPlanet.coreUtilities.timersHandling;

public class TimeOutException extends Exception {
	public TimeOutException (String a_message) {
		super (a_message);
	}
}

注意しておくが、Javaのジェネリックスタイプパラメータはプリミティブデータタイプであることを許されないという制約のため、当オブジェクト群パイプは、'Objects'パイプである、だが、勿論、任意のプリミティブデータタイプのラッパークラスを使用することができる。

コンストラクタは、バッファサイズおよび、書き込み側(複数あるかもしれない)を起こすアルゴリズムの選択を受け取る。

'i_objects'がバッファだ。

'i_dataStartIndex'および'i_dataUntilIndex'は、値保持エリアの開始インデックスおよび終了インデックス(このインデックス位置はエリアに含まれない)であり、バッファが空の時は、それらは、それぞれ、'0'および'0'である。

コンストラクタを除く全てのパブリックメソッドは、'this'オブジェクトに対してシンクロナイズされる。

'readWithoutLocking (long a_timeOutPeriodInMilliseconds)'メソッドはループを持っているが、それは、複数の読み込み側があるかもしれないことに基づいている: 空のバッファに対して待機していたある読み込み側は、起こされて、書かれたオブジェクト群が既に別の読み込み側によってさらわれていることを発見し、もう一度待つことを余儀なくされるかもしれない(スレッドシンクロナイゼイションに関するそうした注意点は、次記事にて論じられる)。

同様に、'writeWithoutLocking (T a_object, long a_timeOutPeriodInMilliseconds)'メソッドはループを持っている、複数の書き込み側があるかもしれないことに基づいて。

'writeWithoutLocking (T a_object, long a_timeOutPeriodInMilliseconds)'メソッドの、パイプが読み込み終了と宣言された際の振る舞いは、'readWithoutLocking (long a_timeOutPeriodInMilliseconds)'メソッドの、パイプが書き込み終了と宣言された際の振る舞いとはいくらか異なっているが、その理由は、書き込み側は、もう全然書く必要がないが、読み込み側は、バッファに既に格納されているオブジェクト群を読み尽くしたいだろうことだ。

本コードは強固にテストしたわけではないこと(今のところ)を述べておく、いくつかのシンプルなケースに自ら既に使用してはいるものの(その1つが、次セクションにて示される)。


5: 1つの使用例と1つの実行結果


Hypothesizer 7
以下は、本オブジェクト群パイプを使用するサンプルプログラムだ。

@Java ソースコード
import theBiasPlanet.coreUtilities.pipes.ObjectsPipe;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException;

~
	~
	
	public static void prepareIntegers (ObjectsPipe <Integer> a_writer) throws Exception {
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			try {
				a_writer.write (Integer.valueOf (l_iterationIndex));
			}
			catch (NoMoreNeedsException l_exception) {
				break;
			}
			System.out.println (String.format ("### written: %d", l_iterationIndex));
			System.out.flush ();
		}
	}
	
	public static void processIntegers (ObjectsPipe <Integer> a_reader) throws Exception {
		Integer l_integer = null;
		int l_numberOfMultipleOf10s = 0;
		while (true) {
			try {
				l_integer = a_reader.read ();
			}
			catch (NoMoreDataException l_exception) {
				break;
			}
			System.out.println (String.format ("### read: %d", l_integer.intValue ()));
			System.out.flush ();
			if (l_integer.intValue () % 10 == 0) {
				l_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of multiple of 10s is %d.", l_numberOfMultipleOf10s));
		System.out.flush ();
	}
	
	public static void test2 () throws Exception {
		ObjectsPipe <Integer> l_integersPipe = new ObjectsPipe <Integer> (16, true);
		Thread l_subThread = new Thread (() -> {
			try {
				prepareIntegers (l_integersPipe);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
			finally {
				try {
					l_integersPipe.finishWriting ();
				}
				catch (Exception l_exception) {
					l_exception.printStackTrace ();
				}
			}
		});
		l_subThread.start ();
		processIntegers (l_integersPipe);
		l_subThread.join ();
	}
~

以下は、1つのアウトプットだ、私の、シングルコアCPU Linuxコンピュータ(それがアウトプットに影響しているはず)における。

@出力
### written: 0
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### read: 0
### a multiple of 10s is found.
### read: 1
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### written: 15
### written: 16
### written: 17
### read: 7
### read: 8
### written: 18
### written: 19
### written: 20
### written: 21
### read: 9
### read: 10
### a multiple of 10s is found.
### written: 22
### written: 23
### read: 11
### read: 12
### read: 13
### read: 14
### written: 24
### written: 25
### written: 26
### read: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### read: 486
### read: 487
### read: 488
### read: 489
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . .えーと、何が起こったかというと、読み込み側が最初に行動を起こしたが待たされた、なぜなら、パイプが空だったから; 書き込み側が満杯まで書いて読み込み側を起こした; 読み込み側が読み始めた; と続く、私の推測では。. . .注意しておくと、'15'は、読み込み側が起こされる前に書かれたはずだ、"### written: 15"は、遅れて表示されているが: 'write'メソッドが完了した直後に、コントロールが読み込みスレッドへ移り、メッセージ表示は、コントロールが書き込みスレッドへ戻るまで遅らせられた。

通知タイミングモードが'false'にセットされた場合、以下がアウトプットだ。

@出力
### read: 0
### a multiple of 10s is found.
### written: 0
### written: 1
### written: 2
### written: 3
### read: 1
### read: 2
### read: 3
### read: 4
### written: 4
### written: 5
### written: 6
### written: 7
### read: 5
### read: 6
### read: 7
### read: 8
### written: 8
### written: 9
### written: 10
### written: 11
### read: 9
### written: 12
### written: 13
### read: 10
### a multiple of 10s is found.
### read: 11
### read: 12
### written: 14
### written: 15
### written: 16
### written: 17
### read: 13
### read: 14
### read: 15
~
### written: 496
### read: 481
### read: 482
### read: 483
### read: 484
### read: 485
### read: 486
### read: 487
### read: 488
### read: 489
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . .読み込み側が最初に行動を起こしたが待たせられた、なぜなら、パイプが空だったから; 書き込み側が'0'を書いて読み込み側を起こした; 読み込み側が読み始めた; と続く、私の推測では。. . .この場合も、"### written: 0"は、遅れて表示された。

とにかく、バッファ中の少数のスロットのみが使用される傾向にあるようだ。

2つのモードにおけるかかった時間を計測した(時間を多く消費するメッセージ出力を取り除いて)(モード毎に5回)ところ、時間は、ナノ秒で、それぞれ、{'123,607,654'、'113,359,567'、'129,854,341'、'87,619,575'、'99,663,616': 平均 -> '110,820,951'} および {'108,841,378'、'154,165,815'、'140,925,679'、'111,987,546'、'113,260,213': 平均 -> '125,836,126'}だった。その差違は有意なのか?. . .統計的に評価はしていない。


6: 結びとその先


Hypothesizer 7
これで、私は、オブジェクト群パイプを持つことになった。

オブジェクト群パイプの恩恵は、いくら多くのオブジェクト群が処理されてもメモリスペースが使い果たされることがないことだ。事実、上記に例として挙げたプログラムは、繰り返し回数をいくら大きくしても、バッファオーバーフローを起こさない。

私は、本Java版を、C++版、C#版、Python版へ移植する、以降のいくつかの記事にて。


参考資料


<このシリーズの前の記事 | このシリーズの目次 | このシリーズの次の記事>