package MogileFS::Worker::Replicate;
# replicates files around
use strict;
use base 'MogileFS::Worker';
use fields (
'fidtodo', # hashref { fid => 1 }
'peerrepl', # hashref { fid => time() } # when peer started replicating
'rebal_pol_obj', # rebalancer policy object
);
use List::Util ();
use MogileFS::Util qw(error every debug);
use MogileFS::Class;
use MogileFS::RebalancePolicy::DrainDevices;
use MogileFS::ReplicationRequest qw(rr_upgrade);
# setup the value used in a 'nexttry' field to indicate that this item will never
# actually be tried again and require some sort of manual intervention.
use constant ENDOFTIME => 2147483647;
# { fid => lastcheck }; instructs us not to replicate this fid... we will clear
# out fids from this list that are expired
my %fidfailure;
# { fid => 1 }; used to keep track of fids we find in the unreachable_fids table
my %unreachable;
# Returns the ENDOFTIME constant, i.e. the 'nexttry' value that marks an
# item as never to be tried again.
sub end_of_time {
    return ENDOFTIME;
}
# Constructor. Creates the fields-based object, hands $psock to the parent
# class constructor, and starts both tracking hashes ('fidtodo' and
# 'peerrepl') out empty.
sub new {
    my $class = shift;
    my $psock = shift;

    my $self = fields::new($class);
    $self->SUPER::new($psock);

    # nothing queued and no peer activity known yet
    $self->{$_} = {} for qw(fidtodo peerrepl);

    return $self;
}
# Handles one line of text handed to this worker.
#   $lineref - reference to the line
# Returns 1 if the line was one of the replication messages recognised
# below (and has been acted on), 0 otherwise.
sub process_line {
    my $self    = shift;
    my $lineref = shift;

    # "repl_was_done <fid>": that fid no longer needs doing by us
    if (my ($done_fid) = $$lineref =~ /^repl_was_done (\d+)/) {
        delete $self->{fidtodo}{$done_fid};
        return 1;
    }

    # "repl_starting <fid>": remember that a peer is working on this fid
    if (my ($started_fid) = $$lineref =~ /^repl_starting (\d+)/) {
        $self->note_peer_replicating($started_fid);
        return 1;
    }

    # "repl_unreachable <fid>": record the fid in our unreachable set
    if (my ($lost_fid) = $$lineref =~ /^repl_unreachable (\d+)/) {
        $unreachable{$lost_fid} = 1;
        return 1;
    }

    # "repl_compat {0,1}": toggles old_repl_compat in realtime, without
    # restarting. telnet to the main port and do:
    #    !to replicate repl_compat {0,1}
    if (my ($compat) = $$lineref =~ /^repl_compat (\d+)/) {
        MogileFS::Config->set_config("old_repl_compat", $compat);
        return 1;
    }

    # not a message we know about
    return 0;
}
# The watchdog timeout the replicator wants: 30 (presumably seconds --
# confirm against the MogileFS::Worker base class).
sub watchdog_timeout {
    return 30;
}
# Main loop of the replication worker. Every 2 seconds it:
#   * pings the parent,
#   * does nothing further until the monitor job has completed a cycle
#     (logging an error about the wait once 15 seconds have passed),
#   * refreshes %fidfailure / %unreachable from the unreachable_fids table,
#   * replicates from the file_to_replicate queue and, when the
#     "old_repl_compat" config is set, from devcounts as well,
#   * and, if neither of those found anything to do, spends the idle time
#     rebalancing and draining devices.
# Dies with the database error string if the unreachable_fids query fails.
sub work {
    my $self = shift;

    # give the monitor job 15 seconds to give us an update
    my $warn_after = time() + 15;

    every(2.0, sub {
        $self->parent_ping;

        # replication doesn't go well if the monitor job hasn't actively started
        # marking things as being available
        unless ($self->monitor_has_run) {
            error("waiting for monitor job to complete a cycle before beginning replication")
                if time() > $warn_after;
            return;
        }

        $self->validate_dbh;
        my $dbh = $self->get_dbh or return 0;

        # update our unreachable fid list... we consider them good for 15 minutes
        # FIXME: uh, what is this even used for nowadays?  it made more sense in mogilefs 1.x,
        # so maybe we kinda need it for compatibility?  but maybe we could ditch it here and
        # instead use the file_to_replicate table's rows at ENDOFTIME as meaning the same?
        my $urfids = $dbh->selectall_arrayref('SELECT fid, lastupdate FROM unreachable_fids');
        die $dbh->errstr if $dbh->err;

        foreach my $r (@{$urfids || []}) {
            my ($fidid, $lastupdate) = @$r;
            my $nv = $lastupdate + 900;

            # replicate_using_devcounts may already have set this fid's
            # expiration to a time past the unreachable 15 minute timeout,
            # so only overwrite %fidfailure's idea of the expiration time
            # when doing so does not shorten it.  (The comparison used to be
            # '<', which did the opposite: it shortened later expirations
            # and refused to extend earlier ones.)
            unless ($fidfailure{$fidid} && $fidfailure{$fidid} > $nv) {
                $fidfailure{$fidid} = $nv;
            }
            $unreachable{$fidid} = 1;
        }

        my $idle = 1;

        # this finds stuff to replicate based on its record in the needs_replication table
        $idle = 0 if $self->replicate_using_torepl_table;

        # this finds stuff to replicate based on the devcounts.  (old style)
        if (MogileFS::Config->config("old_repl_compat")) {
            $idle = 0 if $self->replicate_using_devcounts;
        }

        # if replicators are otherwise idle, use them to make the world
        # better, rebalancing things (if enabled), and draining devices (if
        # any are marked drain)
        if ($idle) {
            $self->rebalance_devices;
            $self->drain_devices;
        }
    });
}
# maximum number of queue rows asked for per pass
use constant REPLFETCH_LIMIT => 1000;

# Processes one batch of the replication queue handed out by the store's
# files_to_replicate().
#
# Returns 1 if we did something (or tried to do something), 0 if there was
# nothing to be done.
#
# How replicate() failures are handled here -- please keep this in sync
# with the error codes replicate() can produce:
#
#   -- HARMLESS --
#   failed_getting_lock        => skip; somebody else is probably doing it.
#
#   -- TEMPORARY; EXPONENTIAL BACKOFF --
#   source_down                => only source available is observed down.
#   policy_error_doing_failed  => policy plugin misbehaved; it's looping.
#   policy_error_already_there => policy plugin misbehaved; target already has it.
#   policy_no_suggestions      => no copy was attempted; policy is just not happy.
#   copy_error                 => policy said to do 1+ things, we failed, and
#                                 it ran out of suggestions.
#
#   -- FATAL; DON'T TRY AGAIN --
#   no_source                  => file_on is empty; the file exists nowhere.
sub replicate_using_torepl_table {
    my $self = shift;

    # find some fids to replicate, prioritized by when they should be tried
    my $sto   = Mgd::get_store();
    my @queue = $sto->files_to_replicate(REPLFETCH_LIMIT);
    return 0 unless @queue;

    # randomize first, then order by priority: 0s (immediate, only 1 copy),
    # 1s (immediate, but we already have 2 copies) and big numbers (unix
    # timestamps) for things that failed.  the comparator treats all the big
    # numbers as equal, so this relies on sort keeping equal elements in
    # their (shuffled) order for randomness within each class.
    my @shuffled = List::Util::shuffle(@queue);
    my @to_repl  = sort {
        my ($left, $right) = ($a->{nexttry}, $b->{nexttry});
        ($left < 1000 || $right < 1000) ? ($left <=> $right) : 0;
    } @shuffled;

    # delay (seconds) before the next try, indexed by failcount:
    # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, then 24h from there on
    my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );

    for my $todo (@to_repl) {
        my $fid = $todo->{fid};
        next if $self->peer_is_replicating($fid);

        my $errcode;
        my %opts = (
            errref    => \$errcode,
            no_unlock => 1,          # to make it return an $unlock subref
        );
        $opts{source_devid} = $todo->{fromdevid} if $todo->{fromdevid};

        my ($status, $unlock) = replicate($fid, %opts);

        if ($status) {
            # any true $status means this fid is finished with, whether we
            # replicated it ourselves or not.  when somebody else did the
            # work ("lost_race") this delete is normally unnecessary, but
            # with old replicators from previous versions around, or if the
            # other guy's delete failed, this cleans it up.
            $sto->delete_fid_from_file_to_replicate($fid);
        }
        else {
            debug("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG >= 2;

            # failing to get the lock means someone else probably already
            # did it; leave the queue row alone in that case.
            unless ($errcode eq 'failed_getting_lock') {
                if ($errcode eq 'no_source') {
                    # total failure: push this file off to a time that won't
                    # happen, so someone has to come along and figure out
                    # what went wrong.
                    Mgd::get_store()->reschedule_file_to_replicate_absolute($fid, ENDOFTIME);
                }
                else {
                    # everything else is retried with exponential backoff,
                    # scaled by a random factor between 0.8 and 1.2
                    my $delay = int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8));
                    Mgd::get_store()->reschedule_file_to_replicate_relative($fid, $delay+0);
                }
            }
        }

        $unlock->() if $unlock;
    }

    return 1;
}
# Old-style replication driven by the file table's devcount column.
#
# this code path only exists for mogilefsd 1.x compatibility and is not
# needed in a pure-MogileFS 2.x environment.  and since you can't use
# non-MySQL in 1.x, it's pointless to port this code to the MogileFS::Store
# system to make it db portable, so we return 0 right away when the store
# is not a MogileFS::Store::MySQL.
#
# Returns 1 if at least one fid was considered, 0 otherwise.  Dies with the
# database error string when a query fails.
sub replicate_using_devcounts {
    my ($self) = @_;

    my $sto = Mgd::get_store();
    return 0 unless $sto->isa("MogileFS::Store::MySQL");

    # called $mdbh to indicate it's a MySQL dbh, and to help grepping
    # for old handles.  :)
    my $mdbh = $sto->dbh;

    my $did_something = 0;

    my $check_class = sub {
        my $mclass = shift;
        my ($dmid, $classid, $min, $policy_class) = map { $mclass->$_ } qw(domainid classid mindevcount policy_class);

        debug("Checking replication for dmid=$dmid, classid=$classid, min=$min") if $Mgd::DEBUG >= 2;

        my $LIMIT = 1000;

        # the todo set is rebuilt from scratch for every class
        $self->{fidtodo} = {};
        my ($fixed, $attempted) = (0, 0);

        # go from devcount of 1 up to devcount of $min-1, stopping early
        # once $LIMIT files have been fixed
        for (my $devcount = 1; $fixed < $LIMIT && $devcount < $min; $devcount++) {
            my $now = time();
            $self->still_alive;

            my $fids = $mdbh->selectcol_arrayref(
                "SELECT fid FROM file WHERE dmid=? AND classid=? AND devcount = ? AND length IS NOT NULL LIMIT $LIMIT",
                undef, $dmid, $classid, $devcount);
            die $mdbh->errstr if $mdbh->err;
            $self->{fidtodo}{$_} = 1 for @$fids;

            # see if we have any files to replicate at this devcount
            my $count = $fids ? scalar @$fids : 0;
            debug("  found $count for dmid=$dmid/classid=$classid/min=$min") if $Mgd::DEBUG >= 2;
            next unless $count;

            # randomize the list so multiple daemons/threads working on
            # replicate at the same time don't all fight over the
            # same fids to move
            my @randfids = List::Util::shuffle(@$fids);

            debug("Need to replicate: $dmid/$classid: @$fids") if $Mgd::DEBUG >= 2;

            for my $fid (@randfids) {
                $attempted++;
                $did_something = 1;

                # skip fids taken off our todo set, or being done by a peer
                next unless $self->{fidtodo}{$fid};
                next if $self->peer_is_replicating($fid);

                # skip fids still inside their failure hold-off; forget
                # hold-offs that have expired
                if (my $retry_after = $fidfailure{$fid}) {
                    next unless $retry_after < $now;
                    delete $fidfailure{$fid};
                }

                $self->read_from_parent;
                $self->still_alive;

                my $status = replicate($fid);
                unless ($status) {
                    # failed in replicate, don't retry for a minute
                    $fidfailure{$fid} = $now + 60;
                    next;
                }

                # someone else replicated it; this file is done.
                next if $status eq "lost_race";

                # if it was marked unreachable, it evidently is reachable now
                if (delete $unreachable{$fid}) {
                    $mdbh->do("DELETE FROM unreachable_fids WHERE fid = ?", undef, $fid);
                    die $mdbh->errstr if $mdbh->err;
                }

                # housekeeping
                $fixed++;
                $self->send_to_parent("repl_i_did $fid");

                # status update after every 20 fixed files
                next unless $Mgd::DEBUG >= 1 && $fixed % 20 == 0;
                my $ratio = $fixed/$attempted*100;
                error(sprintf("replicated=$fixed, attempted=$attempted, ratio=%.2f%%", $ratio));
            }
        }
    };

    MogileFS::Class->foreach($check_class);

    return $did_something;
}
# Runs the configured rebalance policy for a short while, provided the
# 'enable_rebalance' server setting is on.  Returns 0 when rebalancing is
# disabled or no policy object could be obtained.  If the policy run handled
# nothing at all, the 'enable_rebalance' setting is switched off again.
sub rebalance_devices {
    my ($self) = @_;

    return 0 unless Mgd::get_store()->server_setting('enable_rebalance');

    my $pol = $self->rebalance_policy_obj;
    return 0 unless $pol;

    unless ($self->run_rebalance_policy_a_bit($pol)) {
        # zero files handled: there is nothing left to rebalance
        error("disabling rebalancing due to lack of work");
        MogileFS::Config->set_server_setting("enable_rebalance", 0);
    }
}
# Runs the DrainDevices rebalance policy for a short while and returns
# whatever run_rebalance_policy_a_bit reports for it.
sub drain_devices {
    my ($self) = @_;

    my $drain_policy = MogileFS::RebalancePolicy::DrainDevices->instance;
    my $drained      = $self->run_rebalance_policy_a_bit($drain_policy);
    #error("[$$] drained = $drained\n") if $drained;
    return $drained;
}
# Asks the rebalance policy $pol for devfids to rebalance, one at a time,
# and hands each one to rebalance_devfid, until the policy has nothing more
# to offer or about 5 seconds have passed.
#
# Returns the number of devfids handed to rebalance_devfid; its result is
# not inspected, so failed attempts are counted too.
sub run_rebalance_policy_a_bit {
    my $self = shift;
    my $pol  = shift;

    # run for up to 5 seconds, then return.
    my $deadline = time() + 5;

    # devids the policy does not want used as destinations
    my %avoid_devids;
    $avoid_devids{ $_->id } = 1 for $pol->dest_devs_to_avoid;

    my $handled = 0;
    while (my $dfid = $pol->devfid_to_rebalance) {
        $self->rebalance_devfid($dfid, avoid_devids => \%avoid_devids);
        $handled++;
        last unless time() < $deadline;
    }

    return $handled;
}
# Returns the rebalance policy object to use, creating and caching it in
# $self->{rebal_pol_obj} when needed.  The class name comes from the
# 'rebalance_policy' server setting and falls back to
# MogileFS::RebalancePolicy::PercentFree.  On a bogus setting, a load
# failure or an instantiation failure, the result of error() is returned
# instead.
sub rebalance_policy_obj {
    my ($self) = @_;

    my $wanted = Mgd::get_store()->server_setting('rebalance_policy');
    $wanted ||= "MogileFS::RebalancePolicy::PercentFree";

    # reuse the cached object if it's still of the wanted type.
    my $cached = $self->{'rebal_pol_obj'};
    return $cached if $cached && ref($cached) eq $wanted;

    # the name is about to be interpolated into a string eval, so only let
    # through things that look like a package name
    unless ($wanted =~ /^[\w:\-]+$/) {
        return error("Bogus rebalance_policy setting");
    }
    unless (eval "use $wanted; 1;") {
        return error("Failed to load $wanted: $@");
    }

    my $pol = eval { $wanted->new };
    return error("Failed to instantiate rebalance policy: $@") unless $pol;

    $self->{'rebal_pol_obj'} = $pol;
    return $pol;
}
# Moves one copy of a file ($devfid) elsewhere: replicates the fid with the
# devfid's device masked away, then destroys the devfid's copy once that is
# safe.
#
#   $devfid            - the MogileFS::DevFID to get rid of
#   avoid_devids => {} - optional hashref of devids, passed on to replicate()
#
# Return 1 on success, 0 on failure.
sub rebalance_devfid {
    my ($self, $devfid, %opts) = @_;
    MogileFS::Util::okay_args(\%opts, qw(avoid_devids));

    my $fid = $devfid->fid;

    # bail out early if this FID is no longer in the namespace (weird
    # case where file is in file_on because not yet deleted, but
    # has been replaced/deleted in 'file' table...).  not too harmful
    # (just noisy) if this line didn't exist, but whatever... it
    # makes stuff cleaner on my intentionally-corrupted-for-fsck-testing
    # dev machine...
    return 1 if ! $fid->exists;

    my ($ret, $unlock) = replicate($fid,
                                   mask_devids  => { $devfid->devid => 1 },
                                   no_unlock    => 1,
                                   avoid_devids => $opts{avoid_devids},
                                   );

    # replicate() does not hand back an unlock subref on every path, so
    # never call it blindly (same guard as replicate_using_torepl_table).
    my $release = sub {
        $unlock->() if $unlock;
    };

    my $fail = sub {
        my $error = shift;
        $release->();
        error("Rebalance for $devfid (" . $devfid->url . ") failed: $error");
        return 0;
    };

    unless ($ret) {
        return $fail->("Replication failed");
    }

    my $should_delete = 0;
    my $del_reason;

    if ($ret eq "lost_race") {
        # for some reason, we did no work. that could be because
        # either 1) we lost the race, as the error code implies,
        # and some other process rebalanced this first, or 2)
        # the file is over-replicated, and everybody just thinks they
        # lost the race because the replication policy said there's
        # nothing to do, even with this devfid masked away.
        # so let's figure it out... if this devfid still exists,
        # we're overreplicated, else we just lost the race.
        if ($devfid->exists) {
            # over-replicated

            # see if some copy, besides this one we want
            # to delete, is currently alive & of right size..
            # just as extra paranoid check before we delete it
            foreach my $test_df ($fid->devfids) {
                next if $test_df->devid == $devfid->devid;
                if ($test_df->size_matches) {
                    $should_delete = 1;
                    $del_reason = "over_replicated";
                    last;
                }
            }
        } else {
            # lost race
            $should_delete = 0;  # no-op
        }
    } else {
        # any other true result: the file was dealt with while this copy
        # was masked away, so this copy can go
        $should_delete = 1;
        $del_reason = "did_rebalance;ret=$ret";
    }

    if ($should_delete) {
        eval { $devfid->destroy };
        if ($@) {
            return $fail->("HTTP delete (due to '$del_reason') failed: $@");
        }
    }

    $release->();
    return 1;
}
# replicates $fid to make sure it meets its class' replicate policy.
#
# README: if you update this sub to return a new error code, please update the
# appropriate callers to know how to deal with the errors returned.
#
# arguments:
#    $fid      -- a MogileFS::FID object or a plain fidid
#    %opts     -- any of:
#       errref       => scalar ref that receives the error code on failure
#       no_unlock    => if true, the replication lock is NOT released here;
#                       instead the unlock subref is returned to the caller.
#                       requires list context (and its absence requires
#                       non-list context), otherwise we die.
#       source_devid => only replicate from this devid
#       mask_devids  => hashref { devid => 1 } of devices to pretend the file
#                       isn't on, and to never pick as an ideal destination
#       avoid_devids => hashref { devid => 1 } of devices to never pick as an
#                       ideal destination
#
# returns either:
#    $rv
#    ($rv, $unlock_sub)    -- when 'no_unlock' %opt is used. subref to release lock.
# $rv is one of:
#    0 = failure  (failure written to ${$opts{errref}}; the error code is
#                  undef when the policy class could not be loaded)
#    1 = success
#    "lost_race" = skipping, we did no work and policy was already met.
#    "nofid" => fid no longer exists. skip replication.
#
# dies on unknown options, a non-hashref mask_devids, a missing device map,
# when not running inside a child worker process, or when http_copy fails
# without reporting "src_error" or "dest_error".
sub replicate {
    my ($fid, %opts) = @_;
    $fid = MogileFS::FID->new($fid) unless ref $fid;
    my $fidid = $fid->id;

    debug("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG >= 2;

    my $errref       = delete $opts{'errref'};
    my $no_unlock    = delete $opts{'no_unlock'};
    my $sdevid       = delete $opts{'source_devid'};
    my $mask_devids  = delete $opts{'mask_devids'}  || {};
    my $avoid_devids = delete $opts{'avoid_devids'} || {};
    die "unknown_opts" if %opts;
    die unless ref $mask_devids eq "HASH";

    # bool:  if source was explicitly requested by caller
    my $fixed_source = $sdevid ? 1 : 0;

    my $sto = Mgd::get_store();
    my $unlock = sub {
        $sto->note_done_replicating($fidid);
    };

    # common exit path: records the error code, logs the error message,
    # and either releases the lock or hands $unlock to the caller.
    # called as one of:
    #    $retunlock->($true_rv)
    #    $retunlock->(0, $errmsg)
    #    $retunlock->(0, $errcode, $errmsg)
    my $retunlock = sub {
        my $rv = shift;
        my ($errmsg, $errcode);
        if (@_ == 2) {
            ($errcode, $errmsg) = @_;
            $errmsg = "$errcode: $errmsg"; # include code with message
        } else {
            ($errmsg) = @_;
        }
        $$errref = $errcode if $errref;

        my $ret;
        if ($errcode && $errcode eq "failed_getting_lock") {
            # don't emit a warning with error() on lock failure.  not
            # a big deal, don't scare people.
            $ret = 0;
        } else {
            $ret = $rv ? $rv : error($errmsg);
        }
        if ($no_unlock) {
            die "ERROR: must be called in list context w/ no_unlock" unless wantarray;
            return ($ret, $unlock);
        } else {
            die "ERROR: must not be called in list context w/o no_unlock" if wantarray;
            $unlock->();
            return $ret;
        }
    };

    # hashref of devid -> MogileFS::Device
    my $devs = MogileFS::Device->map
        or die "No device map";

    return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid")
        unless $sto->should_begin_replicating_fidid($fidid);

    # from here on we hold the replication lock: every exit must go through
    # $retunlock so the lock is either released or handed to the caller.

    MogileFS::Worker->send_to_parent("repl_starting $fidid");

    # if the fid doesn't even exist, consider our job done!  no point
    # replicating file contents of a file no longer in the namespace.
    return $retunlock->("nofid") unless $fid->exists;

    my $cls = $fid->class;
    my $policy_class = $cls->policy_class;
    eval "use $policy_class; 1;";
    if ($@) {
        # no error code on purpose (callers see the same undefined code as
        # before); what matters is that the lock is not left held.
        return $retunlock->(0, "Failed to load policy class: $policy_class: $@");
    }

    # learn what devices this file is already on
    my @on_devs;         # all devices fid is on, reachable or not.
    my @on_devs_tellpol; # subset of @on_devs, to tell the policy class about
    my @on_up_devid;     # subset of @on_devs:  just devs that are readable

    foreach my $devid ($fid->devids) {
        my $d = MogileFS::Device->of_devid($devid)
            or next;
        push @on_devs, $d;
        if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) {
            push @on_devs_tellpol, $d;
        }
        if ($d->dstate->can_read_from) {
            push @on_up_devid, $devid;
        }
    }

    return $retunlock->(0, "no_source",   "Source is no longer available replicating $fidid") if @on_devs == 0;
    return $retunlock->(0, "source_down", "No alive devices available replicating $fidid") if @on_up_devid == 0;

    # if they requested a specific source, that source must be up.
    if ($sdevid && ! grep { $_ == $sdevid} @on_up_devid) {
        return $retunlock->(0, "source_down", "Requested replication source device $sdevid not available");
    }

    my %dest_failed;    # devid -> 1 for each devid we were asked to copy to, but failed.
    my %source_failed;  # devid -> 1 for each devid we had problems reading from.
    my $got_copy_request = 0;  # true once replication policy asks us to move something somewhere
    my $copy_err;

    my $rr;  # MogileFS::ReplicationRequest
    while (1) {
        $rr = rr_upgrade($policy_class->replicate_to(
                                                     fid       => $fidid,
                                                     on_devs   => \@on_devs_tellpol, # devices the fid is on that should have files and aren't masked
                                                     all_devs  => $devs,
                                                     failed    => \%dest_failed,
                                                     min       => $cls->mindevcount,
                                                     ));

        last if $rr->is_happy;

        my @ddevs;  # dest devs, in order of preference
        my $ddevid; # dest devid we've chosen to copy to
        if (@ddevs = $rr->copy_to_one_of_ideally) {
            if (my @not_masked_ids = (grep { ! $mask_devids->{$_} &&
                                             ! $avoid_devids->{$_}
                                         }
                                      map { $_->id } @ddevs)) {
                $ddevid = $not_masked_ids[0];
            } else {
                # once we masked devids away, there were no
                # ideal suggestions.  this is the case of rebalancing,
                # which without this check could 'worsen' the state
                # of the world.  consider the case:
                #    h1[ d1 d2 ] h2[ d3 ]
                # and files are on d1 & d3, an ideal layout.
                # if d3 is being rebalanced, and masked away, the
                # replication policy could presumably say to put
                # the file on d2, even though d3 isn't dead.
                # so instead, when masking is in effect, we don't
                # use non-ideal placement, just bailing out.

                # saying we lost a race is a bit of a lie.. but eh.
                return $retunlock->("lost_race");
            }
        } elsif (@ddevs = $rr->copy_to_one_of_desperate) {
            # TODO: reschedule a replication for 'n' minutes in future, or
            # when new hosts/devices become available or change state
            $ddevid = $ddevs[0]->id;
        } else {
            last;
        }

        $got_copy_request = 1;

        # replication policy shouldn't tell us to put a file on a device
        # we've already told it that we've failed at.  so if we get that response,
        # the policy plugin is broken and we should terminate now.
        if ($dest_failed{$ddevid}) {
            return $retunlock->(0, "policy_error_doing_failed",
                                "replication policy told us to do something we already told it we failed at while replicating fid $fidid");
        }

        # replication policy shouldn't tell us to put a file on a
        # device that it's already on.  that's just stupid.
        if (grep { $_->id == $ddevid } @on_devs) {
            return $retunlock->(0, "policy_error_already_there",
                                "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!");
        }

        # find where we're replicating from
        unless ($fixed_source) {
            # TODO: use an observed good device+host as source to start.
            my @choices = grep { ! $source_failed{$_} } @on_up_devid;
            return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices;
            $sdevid = $choices[int(rand(scalar @choices))];
        }

        my $worker = MogileFS::ProcManager->is_child or die;
        my $rv = http_copy(
                           sdevid       => $sdevid,
                           ddevid       => $ddevid,
                           fid          => $fidid,
                           expected_len => undef,  # FIXME: get this info to pass along
                           errref       => \$copy_err,
                           callback     => sub { $worker->still_alive; },
                           );
        die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;

        unless ($rv) {
            error("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)");
            if ($copy_err eq "src_error") {
                $source_failed{$sdevid} = 1;

                if ($fixed_source) {
                    # there can't be any more retries, as this source
                    # is busted and is the only one we wanted.
                    return $retunlock->(0, "copy_error", "error copying fid $fidid from devid $sdevid during replication");
                }

            } else {
                $dest_failed{$ddevid} = 1;
            }
            next;
        }

        # the copy worked: record it, and tell the policy about the new
        # location on the next round
        my $dfid = MogileFS::DevFID->new($ddevid, $fid);
        $dfid->add_to_db;

        push @on_devs, $devs->{$ddevid};
        push @on_devs_tellpol, $devs->{$ddevid};
    }

    if ($rr->is_happy) {
        return $retunlock->(1) if $got_copy_request;
        return $retunlock->("lost_race");  # some other process got to it first.  policy was happy immediately.
    }

    return $retunlock->(0, "policy_no_suggestions",
                        "replication policy ran out of suggestions for us replicating fid $fidid");
}
# time() of the most recent cleanup pass over a 'peerrepl' set
my $last_peerreplclean = 0;

# Records that a peer started replicating $fidid just now.  At most once a
# minute this also drops every entry of the set that is 2 minutes old or
# older.
sub note_peer_replicating {
    my $self  = shift;
    my $fidid = shift;

    my $now   = time();
    my $peers = $self->{peerrepl};
    $peers->{$fidid} = $now;

    # every minute, clean fids in this set older than 2 minutes
    if ($last_peerreplclean + 60 < $now) {
        $last_peerreplclean = $now;
        my $cutoff = $now - 120;
        for my $seen_fid (keys %$peers) {
            delete $peers->{$seen_fid} unless $peers->{$seen_fid} > $cutoff;
        }
    }
}
# Tells whether a peer is (probably) still replicating $fidid.
#
# best effort optimization, doesn't have to be perfect (for instance,
# doesn't currently know what peers on other hosts are doing, only
# peer process).  the point is just to avoid lock contention from asking
# for locks on files somebody else is replicating.  a file that a peer
# started on within the last 60 seconds counts as still being replicated.
#
# Returns 0 when nothing is recorded for $fidid, otherwise the (boolean)
# result of the 60 second check.
sub peer_is_replicating {
    my $self  = shift;
    my $fidid = shift;

    my $started = $self->{peerrepl}{$fidid};
    return 0 unless $started;

    my $cutoff = time() - 60;
    return $started > $cutoff;
}
# copies a file from one Perlbal to another utilizing HTTP
#
# named arguments (anything else makes us die):
#    sdevid       => devid to GET the file from
#    ddevid       => devid to PUT the file to
#    fid          => fidid of the file
#    expected_len => optional; if defined, the source's Content-length must
#                    equal it
#    callback     => optional subref, invoked after every chunk written
#    errref       => optional scalar ref.  on every failure it is set to
#                    "src_error" or "dest_error", naming the side that is
#                    to blame; replicate() dies on anything else.
#
# returns 1 on success and a false value on failure.
sub http_copy {
    my %opts = @_;
    my ($sdevid, $ddevid, $fid, $expected_clen, $intercopy_cb, $errref) =
        map { delete $opts{$_} } qw(sdevid
                                    ddevid
                                    fid
                                    expected_len
                                    callback
                                    errref
                                    );
    die if %opts;

    $intercopy_cb ||= sub {};

    # handles setting unreachable magic; $error_unreachable->("message")
    my $error_unreachable = sub {
        my $worker = MogileFS::ProcManager->is_child;
        # NOTE(review): the leading ':' is presumably parent-protocol syntax
        # (process_line matches the line without it) -- confirm.
        $worker->send_to_parent(":repl_unreachable $fid");
        MogileFS::FID->new($fid)->mark_unreachable;
        $$errref = "src_error" if $errref;
        return error("Fid $fid unreachable while replicating: $_[0]");
    };

    my $dest_error = sub {
        $$errref = "dest_error" if $errref;
        error($_[0]);
        return 0;
    };

    my $src_error = sub {
        $$errref = "src_error" if $errref;
        error($_[0]);
        return 0;
    };

    # get some information we'll need
    my $sdev = MogileFS::Device->of_devid($sdevid);
    my $ddev = MogileFS::Device->of_devid($ddevid);

    unless ($sdev && $ddev && $sdev->exists && $ddev->exists) {
        # blame the source if it's the source we can't find, else the dest
        if ($errref) {
            $$errref = ($sdev && $sdev->exists) ? "dest_error" : "src_error";
        }
        return error("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fid");
    }

    my $s_dfid = MogileFS::DevFID->new($sdev, $fid);
    my $d_dfid = MogileFS::DevFID->new($ddev, $fid);

    my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
    my ($shost, $dhost) = (map { $_->host     } ($sdev,   $ddev));

    my ($shostip, $sport) = ($shost->ip, $shost->http_port);
    my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
    unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) {
        # blame whichever side is missing configuration (source first)
        if ($errref) {
            my $src_ok = defined $spath && defined $shostip && $sport;
            $$errref = $src_ok ? "dest_error" : "src_error";
        }
        # show detailed information to find out what's not configured right
        error("Error: unable to replicate file fid=$fid from device id $sdevid to device id $ddevid");
        error("       http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath");
        return 0;
    }

    # need by webdav servers, like lighttpd...
    $ddev->vivify_directories($d_dfid->url);

    # setup our pipe error handler, in case we get closed on
    my $pipe_closed = 0;
    local $SIG{PIPE} = sub { $pipe_closed = 1; };

    # okay, now get the file
    my $sock = IO::Socket::INET->new(PeerAddr => $shostip, PeerPort => $sport, Timeout => 2)
        or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
    $sock->write("GET $spath HTTP/1.0\r\n\r\n")
        or return $src_error->("Unable to write request for $spath to $shostip:$sport");
    return $src_error->("Pipe closed retrieving $spath from $shostip:$sport")
        if $pipe_closed;

    # we just want a content length
    my $clen;
    # FIXME: this can block.  needs to timeout.
    while (defined (my $line = <$sock>)) {
        $line =~ s/[\s\r\n]+$//;
        last unless length $line;   # blank line: end of the headers
        if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
            # make sure we get a good response
            return $error_unreachable->("Error: Resource http://$shostip:$sport$spath failed: HTTP $1")
                unless $1 >= 200 && $1 <= 299;
        }
        next unless $line =~ /^Content-length:\s*(\d+)\s*$/i;
        $clen = $1;
    }
    # note: a missing Content-length header ends up here as well
    return $error_unreachable->("File $spath has a content-length of 0; unable to replicate")
        unless $clen;
    return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
        if defined $expected_clen && $clen != $expected_clen;

    # open target for put
    my $dsock = IO::Socket::INET->new(PeerAddr => $dhostip, PeerPort => $dport, Timeout => 2)
        or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
    $dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen\r\n\r\n")
        or return $dest_error->("Unable to write data to $dpath on $dhostip:$dport");
    return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport")
        if $pipe_closed;

    # now read data and print while we're reading.
    my ($data, $remain) = ('', $clen);
    my $bytes_to_read = 1024*1024;  # read 1MB at a time until there's less than that remaining
    $bytes_to_read = $remain if $remain < $bytes_to_read;
    my $finished_read = 0;

    while (!$pipe_closed && (my $bytes = $sock->read($data, $bytes_to_read))) {
        # now we've read in $bytes bytes
        $remain -= $bytes;
        $bytes_to_read = $remain if $remain < $bytes_to_read;

        # a failed send yields undef; report that as zero bytes written
        my $wbytes = $dsock->send($data);
        $wbytes = 0 unless defined $wbytes;
        return $dest_error->("Error: wrote $wbytes; expected to write $bytes; failed putting to $dpath")
            unless $wbytes == $bytes;
        $intercopy_cb->();

        die if $bytes_to_read < 0;
        next if $bytes_to_read;

        # nothing left to read: the whole body has been copied
        $finished_read = 1;
        last;
    }
    return $dest_error->("closed pipe writing to destination") if $pipe_closed;
    return $src_error->("error reading midway through source: $!") unless $finished_read;

    # now read in the response line (should be first line)
    my $line = <$dsock>;
    return $dest_error->("Error: no HTTP response received writing to http://$dhostip:$dport$dpath")
        unless defined $line;

    if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
        return 1 if $1 >= 200 && $1 <= 299;
        return $dest_error->("Got HTTP status code $1 PUTing to http://$dhostip:$dport$dpath");
    } else {
        return $dest_error->("Error: HTTP response line not recognized writing to http://$dhostip:$dport$dpath: $line");
    }
}
1;
# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:
__END__
=head1 NAME
MogileFS::Worker::Replicate -- replicates files
=head1 OVERVIEW
This process replicates files enqueued in B<file_to_replicate> table.
The replication policy (which devices to replicate to) is pluggable,
but only one policy comes with the server. See
L<MogileFS::ReplicationPolicy::MultipleHosts>
=head1 SEE ALSO
L<MogileFS::Worker>
L<MogileFS::ReplicationPolicy>
L<MogileFS::ReplicationPolicy::MultipleHosts>
syntax highlighted by Code2HTML, v. 0.9.1