Perl 多线程处理数据

现在学习Perl的人真不多了(略微有点感触),最近想了解下Perl的多线程/多进程的使用方法,在网上查下文章想学习下,结果发现都是2-5年前写的文章了。当然以”胶水”语言著称的Perl也不太适合来写一些讲究效率的软件,毕竟速度摆着那;尤其在生信领域,常规的软件还是R/Python比较多,如果是支持多线程的软件则一般也是C++/JAVA写的

整理下Perl多线程/多进程的一些个人简单的理解

  1. 当你需要对多个数据进行同步处理时,那么可以考虑用多进程/多线程
  2. 多进程拥有多个pid,独立占用内存,多线程的pid不独立
  3. 如果你只需要将各个任务独立生成结果的话,那么可以选择多进程;如果需要将各个任务的返回结果进行合并等处理的话,那么选择多线程
  4. 如果想用多进程,可以考虑使用fork方法(这里不展开了,因为我暂时也用不着。。。),多线程则使用Perl自带的threads模块
  5. 虽然网上说Perl的多线程/多进程支持并不太好,但是简单的用用还是可以的

其实这篇文章主要是接着使用Perl/sed/awk分割大文件(测序fastq文件)的后续,在分割fastq文件后,我需要用Perl(主要我其他语言多线程不会~~而且R也不太适合处理这个,除非用R包。。)来多线程处理下这几个分割后的文件,这样相比对整个文件处理会快上很多!!!顺便练练手

需要解决的问题:统计fastq文件下每个位置上(PE150测序的话,就是1-150位)的碱基质量分数Q(Base quality score)

我是用threads模块来进行多线程的,最初我的思路是将整个fastq文件读入后再分割成不同块放在数组中,然后再交给多线程处理,但是perl默认会将变量都copy到每个新线程上,这里会导致内存极大的浪费。所以后来考虑先将文件分割后,再使用多线程分别读入处理,这样内存使用率就很小了

PS.不知道为啥,在perl中调用awk来分割文件会比在shell脚本中要慢。。。所以这里我还是用perl的方法来写分割文件吧

下面是代码部分:

#!/usr/bin/perl -w
use strict;
use threads;
use threads::shared;

my $in  = shift @ARGV;

# 计算文件总行数
my $total_num = `wc -l < $in`;

# 调用线程数(也是分割子文件数目)
my $thread_counts = 4;

# 每个子文件的行数
my $size = (int($total_num / ($thread_counts * 4)) + 1)*4;

# 分割文件-awk写法
# `awk -v sz=$size 'BEGIN{i=1}{ print > FILENAME "." i ".tmp"; if (NR>=i*sz){close(FILENAME "." i ".tmp");i++}}' $in`;

# 分割文件-perl写法
my %handles;
foreach(1..$thread_counts){
    my $outfile = $in.".$_.tmp";
    open $handles{$_}, ">$outfile" or die;
}

open my $fh, $in or die;
my $number = 1;
while (<$fh>){
    chomp;
    if ($. % $size != 0){
        print {$handles{$number}} "$_\n";
    }else{
        print {$handles{$number}} "$_\n";
        $number++;
    }
}

################################################################
my @files = glob ("*.tmp");

# 开始多线程
foreach (0..$thread_counts-1){
    print "Start one thread\n";
    my $thr = threads -> create(\&get_func, $files[$_]);
}

# join各个线程的结果
my %qc;
while(threads -> list()){
    foreach my $t(threads -> list(threads::joinable)){
        my $tmp = $t -> join();
        foreach(1..150){
            $qc{$_} += ${$tmp}{$_} -> {qual};
        }
    }
}
unlink glob "*.tmp";

print $qc{150}/$total_num*4;

# 多线程调用的子函数
sub get_func {
    my ($file) = @_;
    open my $fhsub, $file or die;
    my %hash_sub;
    while (<$fhsub>){
        chomp;
        if ($. % 4 == 0){
            my @base_qual = split //, $_;
            for (my $i=0; $i<=$#base_qual; $i++){
                $hash_sub{$i+1} -> {qual} += int(ord($base_qual[$i])-33);
            }
        }
    }
    close $fhsub;
    return \%hash_sub;
}

将上述代码的多线程部分整理下,Perl多线程的简单模板一般如下:

  1. 创建线程(一般循环创建),调用子函数以及输入相关参数

    my $thr = threads -> create(\&get_func, $parameter);
    

    也可以这样:

    $threads[$i] = threads -> create(\&get_func, $parameter);
    
  2. join线程,也就是收割创建的线程

    while(threads -> list()){
        foreach my $t(threads -> list(threads::joinable)){
            my $tmp = $t -> join();
        }
    }
    

    这里的threads -> list()是为了将所有已经创建的线程列出(PS.如果在创建线程的时候就将线程都放在某个数组中的话,那么没必要这样了)

    threads -> list(threads::joinable)相当于返回已经完成的线程的,可以等待join的那种,也可以使用threads->is_joinable(),这个好处是:可以将一些已经完成的线程先join出来,没必要等待前一个线程完成了才能join后一个线程

    $t -> join()就是收割线程的结果了,如果要处理这个结果(也就是子函数的结果),记得将其赋予某个变量

  3. 如果不需要join结果,那么用threads->detach()剥离已创建的线程(记得要将没用的线程剥离掉)

  4. 默认下数据都是线程私有的,如果你想在每个线程中share某个变量,数组,散列及其引用,那么可以用use threads::shared;但是需要注意是:正确的做法是先将变量share后,再对其进行赋值处理,不然先赋值后share的话,会造成之前赋值的都没了

  5. 有其他需求的话,可以试试fork来多线程

Perl现在的网上资源真心没法跟Python比了,就这个多线程而言,Python一搜就好多资料(主要是中文哈)可以查看,但是Perl的都比较老的资料了。。。我是不是可以开始转一波Python了。。。

本文出自于http://www.bioinfo-scrounger.com转载请注明出处