using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Renci.SshNet.Abstractions;
using Renci.SshNet.Common;
using Renci.SshNet.Sftp;
using System;
using System.Diagnostics;
using System.Threading;
using BufferedRead = Renci.SshNet.Sftp.SftpFileReader.BufferedRead;

namespace Renci.SshNet.Tests.Classes.Sftp
{
    /// <summary>
    /// Runs a reader with max. 2 pending reads.
    /// The read-ahead of chunk1 starts followed by the read-ahead of chunk2.
    /// The read-ahead of chunk1 completes successfully and the resulting chunk is read.
    /// The read of this first chunk allows a third read-ahead to start.
    /// The second read-ahead uses signals to forcefully block a failure completion until the read
    /// ahead of the third chunk has completed and the semaphore is waiting for a slot to start
    /// the read-ahead of chunk4.
    /// The second read does not consume chunk3 as it is out of order, but instead waits for
    /// the outcome of the read-ahead of chunk2.
    ///
    /// The completion with exception of chunk2 causes the second read to throw that same exception, and
    /// signals the semaphore that was waiting to start the read-ahead of chunk4. However, due to the fact
    /// that chunk2 completed with an exception, the read-ahead loop is stopped.
    /// </summary>
    [TestClass]
    public class SftpFileReaderTest_ReadAheadEndInvokeException_DiscardsFurtherReadAheads : SftpFileReaderTestBase
    {
        // Size of every chunk requested by the reader under test.
        private const int ChunkLength = 32 * 1024;

        private MockSequence _seq;
        private byte[] _handle;
        private int _fileSize;
        private WaitHandle[] _waitHandleArray;
        private int _operationTimeout;
        private SftpCloseAsyncResult _closeAsyncResult;
        private byte[] _chunk1;
        private byte[] _chunk3;

        // Set by the chunk2 worker thread after it has completed the read with an exception.
        private ManualResetEvent _readAheadChunk2Completed;

        // Set by the chunk3 BeginRead callback once chunk3 has been completed.
        private ManualResetEvent _readAheadChunk3Completed;

        // Set when the fourth WaitAny call (the wait for a slot for chunk4) is entered.
        private ManualResetEvent _waitingForSemaphoreAfterCompletingChunk3;

        private SftpFileReader _reader;

        // The exception used to fail the read-ahead of chunk2.
        private SshException _exception;

        // The exception caught from the second Read() in Act().
        private SshException _actualException;

        /// <summary>
        /// Creates the random handle and chunk payloads, the synchronization events and the
        /// exception instance that the mocks and assertions of this scenario rely on.
        /// </summary>
        protected override void SetupData()
        {
            var random = new Random();

            _handle = CreateByteArray(random, 5);
            _chunk1 = CreateByteArray(random, ChunkLength);
            _chunk3 = CreateByteArray(random, ChunkLength);
            _fileSize = 4 * ChunkLength;
            _waitHandleArray = new WaitHandle[2];
            _operationTimeout = random.Next(10000, 20000);
            _closeAsyncResult = new SftpCloseAsyncResult(null, null);

            _readAheadChunk2Completed = new ManualResetEvent(false);
            _readAheadChunk3Completed = new ManualResetEvent(false);
            _waitingForSemaphoreAfterCompletingChunk3 = new ManualResetEvent(false);

            _exception = new SshException();
        }

        /// <summary>
        /// Registers, in strict sequence, the session calls expected for the read-aheads of
        /// chunk1 (success), chunk2 (delayed failure) and chunk3 (success), followed by the
        /// wait for a free slot for chunk4.
        /// </summary>
        protected override void SetupMocks()
        {
            _seq = new MockSequence();

            // Capture the wait handles supplied by the reader so that the WaitAny setups
            // below can match on, and wait on, the very same array instance.
            SftpSessionMock.InSequence(_seq)
                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
                               {
                                   _waitHandleArray[0] = disposingWaitHandle;
                                   _waitHandleArray[1] = semaphoreAvailableWaitHandle;
                                   return _waitHandleArray;
                               });

            // Read-ahead of chunk1: completes successfully from within BeginRead.
            SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
            SftpSessionMock.InSequence(_seq)
                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
            SftpSessionMock.InSequence(_seq)
                           .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                           .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
                               {
                                   var asyncResult = new SftpReadAsyncResult(callback, state);
                                   asyncResult.SetAsCompleted(_chunk1, false);
                               })
                           .Returns((SftpReadAsyncResult)null);

            // Read-ahead of chunk2: completes with an exception on a separate thread, but only
            // after chunk3 has completed and the wait for a slot for chunk4 has begun.
            SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
            SftpSessionMock.InSequence(_seq)
                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
            SftpSessionMock.InSequence(_seq)
                           .Setup(p => p.BeginRead(_handle, ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                           .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
                               {
                                   ThreadAbstraction.ExecuteThread(() =>
                                       {
                                           // wait until the read-ahead for chunk3 has completed; this should allow
                                           // the read-ahead of chunk4 to start
                                           _readAheadChunk3Completed.WaitOne(TimeSpan.FromSeconds(3));

                                           // wait until the semaphore wait to start with chunk4 has started
                                           _waitingForSemaphoreAfterCompletingChunk3.WaitOne(TimeSpan.FromSeconds(7));

                                           // complete async read of chunk2 with exception
                                           var asyncResult = new SftpReadAsyncResult(callback, state);
                                           asyncResult.SetAsCompleted(_exception, false);

                                           // signal that read-ahead of chunk 2 has completed
                                           _readAheadChunk2Completed.Set();
                                       });
                               })
                           .Returns((SftpReadAsyncResult)null);

            // Read-ahead of chunk3: completes successfully from within BeginRead.
            SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
            SftpSessionMock.InSequence(_seq)
                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
            SftpSessionMock.InSequence(_seq)
                           .Setup(p => p.BeginRead(_handle, 2 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                           .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
                               {
                                   var asyncResult = new SftpReadAsyncResult(callback, state);
                                   asyncResult.SetAsCompleted(_chunk3, false);

                                   // signal that we've completed the read-ahead for chunk3
                                   _readAheadChunk3Completed.Set();
                               })
                           .Returns((SftpReadAsyncResult)null);

            // Wait for a slot for chunk4: entering this wait releases the chunk2 worker thread.
            // No BeginRead is set up for chunk4.
            SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
            SftpSessionMock.InSequence(_seq)
                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
                           .Callback(() => _waitingForSemaphoreAfterCompletingChunk3.Set())
                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
        }

        /// <summary>
        /// Creates the reader under test with a chunk size of <see cref="ChunkLength"/> and a
        /// maximum of 2 pending reads.
        /// </summary>
        protected override void Arrange()
        {
            base.Arrange();

            _reader = new SftpFileReader(_handle, SftpSessionMock.Object, ChunkLength, 2, _fileSize);
        }

        /// <summary>
        /// Performs a first read, then a second read that is expected to throw the
        /// <see cref="SshException"/> with which the read-ahead of chunk2 was completed.
        /// </summary>
        protected override void Act()
        {
            _reader.Read();

            try
            {
                _reader.Read();
                Assert.Fail();
            }
            catch (SshException ex)
            {
                _actualException = ex;
            }
        }

        [TestMethod]
        public void ReadOfSecondChunkShouldThrowExceptionThatOccurredInReadAhead()
        {
            Assert.IsNotNull(_actualException);
            Assert.AreSame(_exception, _actualException);
        }

        [TestMethod]
        public void ReahAheadOfChunk3ShouldHaveStarted()
        {
            SftpSessionMock.Verify(p => p.BeginRead(_handle, 2 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()),
                                   Times.Once);
        }

        [TestMethod]
        public void ReadAfterReadAheadExceptionShouldRethrowExceptionThatOccurredInReadAhead()
        {
            try
            {
                _reader.Read();
                Assert.Fail();
            }
            catch (SshException ex)
            {
                Assert.AreSame(_exception, ex);
            }
        }

        [TestMethod]
        public void WaitAnyOFSftpSessionShouldHaveBeenInvokedFourTimes()
        {
            SftpSessionMock.Verify(p => p.WaitAny(_waitHandleArray, _operationTimeout), Times.Exactly(4));
        }

        [TestMethod]
        public void DisposeShouldCloseHandleAndCompleteImmediately()
        {
            SftpSessionMock.InSequence(_seq).Setup(p => p.IsOpen).Returns(true);
            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));

            // Time the dispose to assert that it completes within 200 ms.
            var stopwatch = Stopwatch.StartNew();
            _reader.Dispose();
            stopwatch.Stop();

            Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);

            SftpSessionMock.Verify(p => p.IsOpen, Times.Once);
            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
        }
    }
}