c# – 我做错了什么,或者不可能并行提取zip文件?
发布时间:2020-12-15 03:47:05  所属栏目:百科  来源:网络整理 
            导读:我创建了这个测试一个并行提取: public static async Task ExtractToDirectoryAsync(this FileInfo file,DirectoryInfo folder) { ActionBlockZipArchiveEntry block = new ActionBlockZipArchiveEntry((entry) = { var path = Path.Combine(folder.FullNam
                
                
                
            | 
                         我创建了这个测试一个并行提取: 
  
  
  
public static async Task ExtractToDirectoryAsync(this FileInfo file,DirectoryInfo folder)
    {
        ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
        {
            var path = Path.Combine(folder.FullName,entry.FullName);
            Directory.CreateDirectory(Path.GetDirectoryName(path));
            entry.ExtractToFile(path);
        },new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
        using (var archive = ZipFile.OpenRead(file.FullName))
        {
            foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
            {
                block.Post(entry);
            }
            block.Complete();
            await block.Completion;
        }
    } 
 并进行以下测试单元测试: [TestMethod]
    public async Task ExtractTestAsync()
    {
        if (Resources.LocalExtractFolder.Exists)
            Resources.LocalExtractFolder.Delete(true);
        //  Resources.LocalExtractFolder.Create();
        await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
    } 
 使用MaxDegreeOfParallelism = 1,事情工作,但2它不. Test Name: ExtractTestAsync Test FullName: Composite.Azure.Tests.ZipFileTests.ExtractTestAsync Test Source: c:DevelopmentC1localCompositeC1Composite.Azure.TestsZipFileTests.cs : line 21 Test Outcome: Failed Test Duration: 0:00:02.4138753 Result Message: Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception: System.IO.InvalidDataException: Unknown block type. Stream might be corrupted. Result StackTrace: at System.IO.Compression.Inflater.Decode() at System.IO.Compression.Inflater.Inflate(Byte[] bytes,Int32 offset,Int32 length) at System.IO.Compression.DeflateStream.Read(Byte[] array,Int32 count) at System.IO.Stream.InternalCopyTo(Stream destination,Int32 bufferSize) at System.IO.Stream.CopyTo(Stream destination) at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source,String destinationFileName,Boolean overwrite) at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source,String destinationFileName) at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:DevelopmentC1localCompositeC1Composite.Azure.StorageCompressionZipArchiveExtensions.cs:line 37 at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action,KeyValuePair`2 messageWithId) at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId) at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.GetResult() at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:DevelopmentC1localCompositeC1Composite.Azure.StorageCompressionZipArchiveExtensions.cs:line 48 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.GetResult() at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:DevelopmentC1localCompositeC1Composite.Azure.TestsZipFileTests.cs:line 25 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.GetResult() 更新2 这是我自己去做并行,它不工作:)记住在continueWith中处理异常. public static void ExtractToDirectorySemaphore(this FileInfo file,DirectoryInfo folder)
        {
            int MaxDegreeOfParallelism = 2;
            using (var archive = ZipFile.OpenRead(file.FullName))
            {
                var semaphore = new Semaphore(MaxDegreeOfParallelism,MaxDegreeOfParallelism);
                foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                {
                    semaphore.WaitOne();
                    var task = Task.Run(() =>
                    {
                        var path = Path.Combine(folder.FullName,entry.FullName);
                        Directory.CreateDirectory(Path.GetDirectoryName(path));
                        entry.ExtractToFile(path);
                    });
                    task.ContinueWith(handle =>
                    {
                        try
                        {
                            //do any cleanup/post processing
                        }
                        finally
                        {
                            // Release the semaphore so the next thing can be processed
                            semaphore.Release();
                        }
                    });
                }
                while(MaxDegreeOfParallelism-->0)
                    semaphore.WaitOne(); //Wait here until the last task completes.
            }
        } 
 这里是异步版本: public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file,DirectoryInfo folder)
        {
            return Task.Factory.StartNew(() =>
            {
                int MaxDegreeOfParallelism = 50;
                using (var archive = ZipFile.OpenRead(file.FullName))
                {
                    var semaphore = new Semaphore(MaxDegreeOfParallelism,MaxDegreeOfParallelism);
                    foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                    {
                        semaphore.WaitOne();
                        var task = Task.Run(() =>
                        {
                            var path = Path.Combine(folder.FullName,entry.FullName);
                            Directory.CreateDirectory(Path.GetDirectoryName(path));
                            entry.ExtractToFile(path);
                        });
                        task.ContinueWith(handle =>
                        {
                            try
                            {
                                //do any cleanup/post processing
                            }
                            finally
                            {
                                // Release the semaphore so the next thing can be processed
                                semaphore.Release();
                            }
                        },TaskContinuationOptions.AttachedToParent); // the outher task will wait for all.
                    }
                }
            });
        } 
 更新3 在handle.Exception中抛出以下异常. {"Block length does not match with its complement."}  
[0] = {"A local file header is corrupt."} 
 必须找出ZipFile是否是线程安全的. 解决方法
 拆解器:它只是一个概念证明. 
  
  
        在代码中的示例中使用ParallelZipFile.OpenRead替换ZipFile.OpenRead全部4个单位测试通过. public class ParallelZipFile
    {
        public static ParallelZipArchive OpenRead(string path)
        {
            return new ParallelZipArchive(ZipFile.OpenRead(path),path);
        }
    }
    public class ParallelZipArchive : IDisposable
    {
        internal ZipArchive _archive;
        internal string _path;
        internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();
        public ParallelZipArchive(ZipArchive zip,string path)
        {
            _path = path;
            _archive = zip;
            FreeReaders.Enqueue(zip);
        }
        public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
        {
            get
            {
                var list = new List<ParallelZipArchiveEntry>(_archive.Entries.Count);
                int i = 0;
                foreach (var entry in _archive.Entries)
                    list.Add(new ParallelZipArchiveEntry(i++,entry,this));
                return  new ReadOnlyCollection<ParallelZipArchiveEntry>(list);
            }
        }
        public void Dispose()
        {
            foreach (var archive in FreeReaders)
                archive.Dispose();
        }
    }
    public class ParallelZipArchiveEntry
    {
        private ParallelZipArchive _parent;
        private int _entry;
        public string Name { get; set; }
        public string FullName { get; set; }
        public ParallelZipArchiveEntry(int entryNr,ZipArchiveEntry entry,ParallelZipArchive parent)
        {
            _entry = entryNr;
            _parent = parent;
            Name = entry.Name;
            FullName = entry.FullName;
        }
        public void ExtractToFile(string path)
        {
            ZipArchive value;
            Trace.TraceInformation(string.Format("Number of readers: {0}",_parent.FreeReaders.Count));
            if (!_parent.FreeReaders.TryDequeue(out value))
                value = ZipFile.OpenRead(_parent._path);
            value.Entries.Skip(_entry).First().ExtractToFile(path);
            _parent.FreeReaders.Enqueue(value);
        }
    } 
 单元测试 [TestClass]
    public class ZipFileTests
    {
        [ClassInitialize()]
        public static void PreInitialize(TestContext context)
        {
            if (Resources.LocalExtractFolderTruth.Exists)
                Resources.LocalExtractFolderTruth.Delete(true);
            ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName,Resources.LocalExtractFolderTruth.FullName);
        }
        [TestInitialize()]
        public void InitializeTests()
        {
            if (Resources.LocalExtractFolder.Exists)
                Resources.LocalExtractFolder.Delete(true);
        }
        [TestMethod]
        public void ExtractTest()
        {
            Resources.WebsiteZip.ExtractToDirectory(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
        }
        [TestMethod]
        public async Task ExtractAsyncTest()
        {
            await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
               Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
        }
        [TestMethod]
        public void ExtractSemaphoreTest()
        {
            Resources.WebsiteZip.ExtractToDirectorySemaphore(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
               Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
        }
        [TestMethod]
        public async Task ExtractSemaphoreAsyncTest()
        {
            await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
               Resources.LocalExtractFolderTruth,Resources.LocalExtractFolder));
        }
    }
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!  | 
                  
