如何使用响应式快速处理文本文件 .NET?
处理文本文件是我们作为程序员都做过的事情. However, 操作大型(多gb)文件引发了一个问题:如何才能尽快完成操作, 并且不需要将整个文件加载到内存中?
文本文件通常有一个头(一到几行),后面跟着一些数据行. 处理该文件将生成另一个文本文件, 每个输入行产生一个或多个输出行.
在这篇简短的文章中,我将解释如何使用 Rx.NET library. The entire example project is available at Bitbucket. Here is how to solve this task.
首先,从URL加载文件并返回 IObservable
,每个字符串是一个单独的行. 我将其分成以下几类:
The WebLoader
类负责打开资源(本地或远程文件)并将其作为流返回. 我提取它作为一个单独的类,而不是仅仅使用 Open
method in the RemoteTextLoader
,因为我希望能够编写自包含的单元测试,而不需要访问网络.
The RemoteTextLoader
类将接受该流并将其作为string的可观察对象返回, with each string being a separate line. You can see the main code below; loader
is an instance of the WebLoader
class, and the ReadLoop
method simply returns one line at a time.
public IObservable Read(string url)
{
return Observable.Using(
() => new StreamReader(loader.Open(url)),
sr => ReadLoop(sr).ToObservable(scheduler));
}
The RemoteTextLoader.Read
method ensures that the StreamReader
它的底层流在可观察对象完成操作后关闭 Observable.Using
call. 这对于防止资源泄漏非常重要. The call to the private ReadLoop
method will run on the given scheduler; we don’t want to block the calling thread.
Finally, the WebStream
类不仅负责关闭输入流,还负责关闭web请求, also to prevent leaking resources.
Then, 在将输入文件转换为(响应的)行流之后, 该流必须被分割成几个流,每个流在不同的线程上并行处理.
The ProducerConsumerSplitter
class handles the first part; its Split
方法接受一个输入可观察对象和一个计数,并返回一个可观察对象数组.
public class ProducerConsumerSplitter : Splitter
{
public IObservable[] Split(IObservable observable, int count)
{
//如果集合增长超过(线程数* 10)项,阻塞
var collection = new BlockingCollection(count * 10);
observable.Subscribe(collection.Add, () => collection.CompleteAdding());
return Enumerable
.Range(0, count)
.Select(_ => CreateConsumer(collection))
.ToArray();
}
//
private static IObservable CreateConsumer(BlockingCollection collection)
{
return Observable.Create(o =>
{
while (!collection.IsCompleted)
{
T item;
if (collection.TryTake(out item))
o.OnNext(item);
}
o.OnCompleted();
return Disposable.Empty;
});
}
}
拆分器使用生产者-消费者模式将一个可观察对象拆分为多个. 生产者部分订阅了输入可观察对象,并开始将其项目添加到集合中, 如果项的数量大于任意选择的输出可观察对象数量的十倍的限制,则阻塞. This way, 消费者可观察对象并不缺乏需要处理的项目, 我们也不会用多gb的输入文件填满内存.
消费者只需从集合中读取项,直到 IsCompleted
returns true
,当集合为空时发生 and 没有更多的项目等待添加到它. Note that you should not use the IsAddingCompleted
method here since it gets set to true
一旦生产者完成向集合中添加项目, 即使仍有项目需要处理.
线程上的实际处理在 ParallelProcessor
类,它将加载和拆分委托给前面的类,然后调用 LineProcessor
将每个输入行转换成一个或多个输出行.
public IObservable Process(string url, int headerSize, LineProcessor lineProcessor)
{
var lines = loader.Read(url);
//需要在这里使用Publish,否则流将被枚举两次
return lines.Publish(shared =>
{
var header = shared.Take(headerSize).ToArray();
var rest = shared.Skip(headerSize);
var streams = splitter.Split(rest, ThreadCount);
//使用SubscribeOn而不是ObserveOn,因为处理在订阅时立即开始
return header
.SelectMany(h => streams
.Select(stream => ProcessLines(stream, h, lineProcessor)
.SubscribeOn(scheduler)))
.Merge();
});
}
这个类负责加载文件, 处理每一行(根据请求使用尽可能多的线程),然后将结果合并到单个可观察对象中.
最后,主程序将结果写入 output.txt
文件,还度量处理整个文件所花费的时间. 测试程序所做的处理相当简单, so no doubt the real code would be slower, 但我的机器每秒处理的行数接近100万行, which is encouraging.
Contributors
Marcel Popescu
Freelance C# Developer
Marcel是一名拥有20多年经验的高级开发人员. 他更喜欢后端开发,擅长算法,并为自己设计良好的代码而自豪. 他写了一本关于TDD的介绍性书籍,目前正在指导几个初级程序员.
Show More