批处理之家's Archiver

523066680 发表于 2019-3-7 19:41

[Perl]闭包与回调函数 - 多线程下载并显示各自进度

[i=s] 本帖最后由 523066680 于 2019-3-7 20:32 编辑 [/i]

我们知道有些函数允许通过传递 “函数引用(指针)” 的形式,注册 “回调函数”。
某些事件循环(如timer, idle)、递归模型通过这种方式将数据传出,并转移部分控制权(由你决定怎么处理数据),回调函数执行完后交还控制权。

举个例子,Lwp::UserAgent 下载网络文件,如果要显示下载的详细进度,就可以回调函数实现:[code]use LWP::UserAgent;
my $url = "http://mirrors.163.com/cpan/authors/id/S/SR/SREZIC/Tk-804.034.tar.gz";
# 全局变量/buffer
our $buffer = "";
my $ua = LWP::UserAgent->new( timeout => 5 );
my $res = $ua->get($url, ':content_cb' => \&detail );

sub detail {
    our $buffer;
    my ( $data, $res ) = @_;
    my $total = $res->content_length();
    $buffer .= $data;
    my $recv = length($buffer);
    printf "Progress %.2f%% \n", $recv/$total*100.0;
}
[/code]其中 $buffer 是全局变量,用来积累每一步回调时取得的数据。

回调函数有个限制:传参是固定的、也不能返回值。换而言之,函数体是你写的,规矩却不是你定的。
临时的办法是用全局变量($buffer)流通参数以外的数据。

现在增加一下需求,[b]多线程下载不同文件,并且持续显示每个线程对应的下载进度。[/b]
单个文件的 $buffer 可以使用全局变量,但若多个文件同时下载怎么区分?受不同线程调用的回调函数,如何知道自己属于哪一个线程?
通过闭包函数可以实现。
一个极简的示例:[code]my $ret = closure(1, 2);
print $ret->();
sub closure {
    my ($foo, $bar) = @_;
    return sub { $foo+$bar }
}
[/code]closure 函数接受参数$foo和$bar,并返回一个匿名函数引用,同时 $foo $bar 的值确实传递到了子函数内部。
print $ret->() 输出结果为 3
闭包也可以实现类似C语言中 static 做的事情 —— 匿名函数作用域以外、闭包作用域以内的变量值得到保留,用于积累数据。

具体实现:[code]=info
    523066680/vicyang
    2018-01
=cut

use Modern::Perl;
use File::Slurp;
use File::Basename;
use threads;
use threads::shared;
use LWP::UserAgent;
use Time::HiRes qw/sleep/;
use Term::ReadKey;
STDOUT->autoflush(1);

our @ths;
our @files :shared;
our @progress :shared;
@progress = (0, 0);

@files = (
    "http://mirrors.163.com/cpan/authors/id/S/SR/SREZIC/Tk-804.034.tar.gz",
    "http://mirrors.163.com/cpan/authors/id/J/JC/JCRISTY/PerlMagick-6.89-1.tar.gz"
    );

#创建线程
grep { push @ths, threads->create( \&thread, $_ ) } ( 0 .. 1 );

#等待运行结束
while ( threads->list(threads::running) ) {
    printf "[1] %5.2f    [2] %5.2f\n", @progress if ( $progress[0]+$progress[1] > 0.0 );
    sleep 0.2;
}
printf "[1] %5.2f    [2] %5.2f\n", @progress;

#线程分离/结束
grep { $_->detach() } threads->list(threads::all);
print "Press Any Key to Continue ... ";
ReadKey -1;

sub thread
{
    our @mission;
    my $idx = shift;
    my $url = $files[$idx];
    my $ua = LWP::UserAgent->new( timeout => 5, keep_alive=>1 );

    printf "[%d] %s\n", $idx+1, basename($url) ;
    my $res = $ua->get($url, ':content_cb' => closure( $idx, basename($url) ) );
}

sub closure
{
    our (@progress);
    my ($id, $file) = @_ ;
    my ($total, $part, $recv);
    my $buffer = "";
    $recv = 0;

    return sub
    {
        my ($data, $res ) = @_;
        $total = $res->content_length();
        $part = length($data);
        $buffer .= $data;
        $recv += $part;
        
        $progress[$id] = $recv/$total*100.0;
        if ( $recv == $total ) {
            write_file( $file, {binmode=>":raw", err_mode => 'carp' }, $buffer ) or die;
        }
    }
}
[/code]$id 是线程编号, $file 是对应文件名,$buffer是积累缓冲区。
运行时每隔0.2秒显示一次线程1、2的下载进度。因为要同时显示进度,而不是交替输出,所以将各自的进度保存到全局变量 @progress,通过线程ID辨别。[code][1] Tk-804.034.tar.gz
[2] PerlMagick-6.89-1.tar.gz
[1]  1.62    [2]  2.48
[1] 11.01    [2] 16.07
[1] 20.66    [2] 27.61
[1] 28.89    [2] 39.65
[1] 38.85    [2] 51.89
[1] 48.56    [2] 64.08
[1] 55.43    [2] 72.42
[1] 64.09    [2] 84.12
[1] 71.90    [2] 94.62
[1] 79.64    [2] 100.00
[1] 89.31    [2] 100.00
[1] 100.00    [2] 100.00
Press Any Key to Continue ... [Finished in 2.9s][/code]

页: [1]

Powered by Discuz! Archiver 7.2  © 2001-2009 Comsenz Inc.