クローラフレームワークXangoをsolarisで動かす

POEを利用していて高速だというXango(1.08)を試してみました。環境はいつもの solaris x86 でこんなかんじ。

% uname -a
SunOS dokodemo 5.10 Generic_118844-26 i86pc i386 i86pc Solaris

まずXangoをインストールする前にPOEをインストールします。
依存パッケージのインストール途中 POE::Component::Client::HTTP の POE-0.3601/tests/30_loops/00_base/wheel_run.pm あたりでテストがこけてなんともならないので

cpan> force install POE::Component::Client::HTTP

で入れました。
Xango自体は確か問題なくインストールできた気がします。

Xangoについては YappoLogs: Xangoというクロウラーフレームワークを使ってみる グニャラくんのグニャグニャ備忘録: Xango::Brokerのperldocの和訳風超訳 という解説がありますが、バージョンが新しくなっていろいろ仕様が変わってこのままでは動かなくなっていました。
正式なドキュメント Xango::Manual::Intro – Learn How To Write Crawlers With Xango – search.cpan.org も out-of-date なのか、いまどきなperlの書き方をまったく分かってないのもあって動くようにできませんでした。

そういうわけでXangoのテストコード Xango-1.08/t/lib/XangoTest/SimplePull/Handler.pm, Xango-1.08/t/lib/XangoTest/BaseHandler.pm を参考にしつつ、見よう見まねでなんとか動くところまでこぎ着けました。ちなみに今のXangoにはクロールの動作の種類にpushとpullとありますが、書いたのはpullのほうです。

はまったのは Xango::Manual::Intro で

package MyHandler;
  sub retrieve_jobs
  {
    # of course, you probably wouldn't be connecting to a database every
    # time retrieve_jobs is called in reality (too much overhead)
    my $dbh = DBI->connect(...);

    my @jobs;
    my $sth = $dbh->prepare("SELECT ....");
    while ($sth->fetchrow_arrayref) {
      my $job = Xango::Job->new(uri => $uri, ...);
      push @jobs, $job;
    }
    return @jobs;
  }

こんなふうに書いてね、と書かれている retrieve_jobs ですが、このハンドラはキューの読み出しハンドラのようなもので、30秒ごとに呼び出されてキューからジョブをひとつ取り出すためのもののようです。テストコードの Xango-1.08/t/02-pull.t ではこんなふうに書かれていました。

sub retrieve_jobs
{
    my($kernel, $obj) = @_[KERNEL, OBJECT];

    # Retrieve one by one
    my $jobs = $obj->jobs;
    return splice(@$jobs, 0, 1);
}

これだけみてもよくわからないけど、重要なのは最後にspliceがあるということ。つまり毎回減らしていかないとだめなのです。落としてきたHTMLをパースしてそれをキューに入れたい、という時には $obj->jobs の中に入れる感じだと思われます。

あとはあんまりわかんないんだけどとりあえず切ってつないだら動いた!
というかんじでコードは以下の通り。ファイルはこちらから。 xango_runner.pl MyHandler.pm

xango_runner.pl
#!/usr/bin/perl

sub Xango::DEBUG { 1 }
use strict;
use Xango;
use Xango::Broker::Pull;
use MyHandler;

my @jobs = (
    Xango::Job->new( uri => URI->new('http://www.cpan.org') )
);

my $handler = MyHandler->spawn( jobs => \@jobs );
my $broker  = Xango::Broker::Pull->spawn(
                    Alias => 'handler',
                    HandlerAlias => 'handler',
                );

POE::Kernel->run();
MyHandler.pm
package MyHandler;
use strict;
use POE;

sub new
{
    my $class = shift;

    my $self  = bless {
    }, $class;
    $self->initialize(@_);
    return $self;
}

sub initialize
{
    my $self = shift;
    my %args = @_;

    my $k_munge;
    foreach my $k (keys %args) {
        $k_munge = $k;
        $k_munge =~ s/([a-z])([A-Z])/$1_$2/g;
        $self->{uc ($k_munge)} = $args{$k};
    }
}

sub retrieve_jobs
{
    my($kernel, $obj) = @_[KERNEL, OBJECT];
    return splice(@{$obj->{JOBS}}, 0, 1);
}

sub spawn
{
    my $class = shift;
    my $self  = $class->new(@_);

    POE::Session->create(
         $self->states,
    );

    return $self;
}

sub states
{
    my $self = shift;
    return (
            object_states => [
                $self => [ qw(_start _stop handle_response apply_policy retrieve_jobs) ]
            ]
    );

sub handle_response
{
    my($response) = @_[ARG0];
    my $filename = time;
    open F, ">$filename" or die "$!:$filename";
    print F $response->{notes}->{http_response}->as_string;
    close F;
}

sub _start { $_[KERNEL]->alias_set( 'handler' ); }
sub _stop  { $_[KERNEL]->alias_remove( 'handler' ); }
sub apply_policy { 1 }

1;

About this entry