#!/usr/bin/perl

=head1 NAME

Simba::CA

=head1 DESCRIPTION

Collecting Agent of the Simba backup system.

This class represents one instance of a running collecting agent.
The only user-callable methods are the constructor new and the instance
method run, which collects all the files from various disk agents.
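
A minimal usage sketch; the option names are those interpreted by the
constructor below, while the credentials path and fileset ids are purely
illustrative:

    use Simba::CA;

    my $ca = Simba::CA->new({
        dbi_file => '/etc/simba/dbi',   # one line: dsn user password
        parallel => 4,                  # optional: max concurrent backups
        filesets => [1, 2],             # optional: only these fileset ids
    });
    $ca->run();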

The Simba::CA object is a hashref with the following keys:

=over

=item basedir

=item unknown_uid

=item unknown_gid

=item fh_log

=item log_level

=item dbh

=item targets

A list of entries (hashes) from table filesets.

=item ssh_id_file

=item target

=item last_backup

=item last_backup_id

=item timestamp

=item this_backup

=item session_id

=item file_pid

=item file_cfd

=item file_dfd

=item start_time

Timestamp when the backup of the current target started. Used to test when
to abort due to timeout.

=back

=cut

package Simba::CA;

use strict;
use warnings;

use Encode;
use IPC::Open2;
use POSIX qw(strftime);
use Simba::Util qw(quote unquote typestr);
use Readonly;
use Digest::SHA;
use List::Util qw(min);
use IO::Handle;
use File::stat;
use Scalar::Util qw(tainted);
use DBI;
use Time::HiRes qw(gettimeofday);

Readonly my $BUFSIZE => 128 * 1024;

sub new {
    my ($class, $opt) = @_;

    my $self = {};
    bless $self, $class;

    $self->{basedir} = '/backup';
    $self->{unknown_uid} = 65533;
    $self->{unknown_gid} = 65533;
    $self->{fh_log} = exists($opt->{fh_log}) ? $opt->{fh_log} : \*STDERR;
    $self->{log_level} = 99;
    $self->{record_file_id} = 0;
    $self->{record_time} = 0;
    $self->{parallel} = $opt->{parallel};

    if ($opt->{dbi_file}) {
        my $fn = $opt->{dbi_file};
        open(my $fh, '<', $fn) or die "cannot open $fn: $!";
        my $line = <$fh>;
        close($fh);
        my @cred = split(/[\s\n]+/, $line);
        $self->{dbi} = \@cred;
    }
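
    # The dbi_file read above is expected to hold a single line with the
    # three whitespace-separated arguments for DBI->connect: data source,
    # user, password. Hypothetical example contents:
    #
    #   dbi:mysql:database=simba;host=dbhost simba s3cret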

    $self->{dbh} = DBI->connect(@{ $self->{dbi} },
                                { AutoCommit => 0,
                                  PrintError => 1,
                                  RaiseError => 1
                                }
                   );
    $self->{instances_part_size} = 10_000_000;
    $self->adjust_partitions;

    # Order by ascending sort order with nulls last:
    # MySQL considers NULL to be smaller than any number, so negate the
    # numbers (making the largest the smallest) and then sort descending.
    # Idea from https://www.designcise.com/web/tutorial/how-to-order-null-values-first-or-last-in-mysql
    $self->{targets} = $self->{dbh}->selectall_arrayref(
        "select * from filesets order by -sortorder desc",
        { Slice => {} }
    );
    if ($opt->{filesets}) {
        # Keep only the explicitly requested filesets.
        $self->{targets} = [
            grep {
                my $id = $_->{id};
                grep { $id == $_ } @{ $opt->{filesets} }
            } @{ $self->{targets} }
        ];
    } else {
        # Keep only the filesets marked active.
        $self->{targets} = [
            grep { $_->{active} } @{ $self->{targets} }
        ];
    }

    # Detaint $ENV{HOME} and use the default ssh identity file if it exists
    # and belongs to us.
    if ($ENV{HOME} =~ m{([/\w]*)}) {
        if (-f "$1/.ssh/id_rsa") {
            if (my $st = stat("$1/.ssh/id_rsa")) {
                if ($st->uid == $>) {
                    $self->{ssh_id_file} = "$1/.ssh/id_rsa";
                }
            }
        }
    }

    return $self;
}

sub run {
    my ($self) = @_;

    # Run the backup for each target. With the parallel option set, fork up
    # to that many children at once; otherwise run sequentially (the
    # original prototype behavior).
    if ($self->{parallel}) {
        # The parent's DB handle must not be shared across forks, so
        # disconnect here and let each child open its own connection.
        $self->{dbh}->disconnect();
        my %running = ();
        for my $target (@{ $self->{targets} }) {
            $self->log(3, "found target host " . $target->{host} . " dir " . $target->{dir});
            while (scalar keys %running >= $self->{parallel}) {
                $self->log(3, "reached parallel limit - waiting");
                my $pid = wait();
                delete $running{$pid};
                $self->log(3, "child with pid $pid terminated, " . (scalar keys %running) . " remaining");
            }
            my $pid = fork();
            if (!defined($pid)) {
                die "fork failed: $!";
            }
            if ($pid == 0) {
                $self->{dbh} = DBI->connect(@{ $self->{dbi} },
                                            { AutoCommit => 0,
                                              PrintError => 1,
                                              RaiseError => 1
                                            }
                               );
                $self->backup2disk($target);
                $self->{dbh}->disconnect();
                exit(0);
            } else {
                $running{$pid} = 1;
                $self->log(3, "child with pid $pid started, " . (scalar keys %running) . " running");
            }
            sleep(10);
        }
        while (scalar keys %running) {
            my $pid = wait();
            delete $running{$pid};
            $self->log(3, "child with pid $pid terminated, " . (scalar keys %running) . " remaining");
        }
        $self->{dbh} = DBI->connect(@{ $self->{dbi} },
                                    { AutoCommit => 0,
                                      PrintError => 1,
                                      RaiseError => 1
                                    }
                       );
    } else {
        for my $target (@{ $self->{targets} }) {
            $self->backup2disk($target);
        }
    }
    $self->export();
}

sub backup2disk {
    my ($self, $target) = @_;

    unless ($self->reserve_fileset($target)) {
        return;
    }

    $self->log(3, "starting backup for target host " . $target->{host} . " dir " . $target->{dir});
    $self->{target} = $target;
    $self->{start_time} = time();

    # get previous generation
    $self->get_last_session();

    my $timestamp = $self->{timestamp} || strftime('%Y-%m-%dT%H.%M.%S', localtime);
    # e.g. /backup/2025-01-31T02.00.00/myhost/home (illustrative values)
    $self->{this_backup} = $self->{basedir} . "/$timestamp/" . $target->{host} . '/' . $target->{dir};
    $self->new_session();

    my ($list_pid, $list_cfd, $list_dfd);   # connection to get list of files
    $list_pid = open2($list_dfd, $list_cfd,
                      "/usr/bin/ssh",
                      "-l", "simba_da",
                      $self->{ssh_id_file} ? ("-i", $self->{ssh_id_file}) : (),
                      $target->{host}, "da");
    $list_cfd->printflush("list $target->{dir}\n");   # XXX - encode!
    close($list_cfd);
    my $count = 0;
    my $last_report = $self->{start_time};
    while (<$list_dfd>) {
        my $now = time();
        if ($target->{timeout}) {
            my $elapsed = $now - $self->{start_time};
            $self->log(10, "checking timeout " . $elapsed . " > " . $target->{timeout});
            if ($elapsed > $target->{timeout}) {
                $self->log(3, "Timeout exceeded. $elapsed > $target->{timeout}");
                last;
            }
        }
        $count++;
        chomp;
        $self->log(10, "file: $_");
        # split into fields
        my $f = $self->parse($_);
        if ($now - $last_report >= 10) {
            $self->log(9, "file $count: $f->{name}");
            $last_report = $now;
        }

        my $success = 1;
        if ($f->{t} eq 'f') {
            $success = $self->store_file($f);
        } elsif ($f->{t} eq 'd') {
            my $d = "$self->{this_backup}/$f->{name}";
            $d =~ s,//+,/,g;
            mkdir_p($d) or die "cannot mkdir $d: $!";   # XXX
            $self->setmeta($f);
        } elsif ($f->{t} eq 'l') {
            my $l = "$self->{this_backup}/$f->{name}";
            unless (symlink($f->{lt}, $l)) {
                die "cannot symlink $l -> $f->{lt}: $!";   # XXX
            }
            # $self->setmeta($f); ignored for symlinks; would need to use
            # lchown, lchmod, etc.
        } else {
            # create local copy (or insert into DB only?)
            $self->log(5, "ignored $_\n");
        }
        # insert into DB
        $self->db_record_version($target, $f) if ($success);
    }
    $self->flush_insert_instances();
    $self->close_session();
    $self->release_fileset($target);
    $self->log(3, "finished backup for target host " . $target->{host} . " dir " . $target->{dir} . ": $count files");
    $self->{counts}{objects} += $count;

    $self->log(3, "statistics:");
    for (sort keys %{ $self->{counts} }) {
        $self->log(3, "  $_: $self->{counts}{$_}");
    }
    for (sort keys %{ $self->{times} }) {
        $self->log(3, "  $_: $self->{times}{$_} s");
    }
}

sub reserve_fileset {
    my ($self, $target) = @_;

    my $start = time();
    my $pid;
    while (time() - $start < 3600) {
        my $rows = $self->{dbh}->do(q{update filesets set pid=? where id = ? and pid is null}, {}, $$, $target->{id});
        return 1 if $rows == 1;
        $pid = $self->{dbh}->selectrow_array(q{select pid from filesets where id = ?}, {}, $target->{id});
        $self->log(3, "fileset $target->{id} appears to be in use by pid $pid");
        if (!kill(0, $pid) && $!{ESRCH}) {
            $self->log(3, "pid $pid doesn't exist, trying to release fileset $target->{id}");
            $self->{dbh}->do(q{update filesets set pid=null where id = ? and pid=?}, {}, $target->{id}, $pid);
        }
        sleep 60;
    }
    $self->log(2, "fileset $target->{id} appears to be still in use by pid $pid after grace period - giving up");
    return 0;
}

sub release_fileset {
    my ($self, $target) = @_;

    $self->log(3, "releasing fileset $target->{id}");
    $self->{dbh}->do(q{update filesets set pid=null where id=? and pid=?}, {}, $target->{id}, $$);
    $self->{dbh}->commit();
}

sub parse {
    my ($self, $s) = @_;

    my @s = split(/ +/, $s);
    my $f = {};
    $f->{name} = shift @s;
    $f->{name} = $1 if ($f->{name} =~ /(.*)/);   # detaint XXX
    for (@s) {
        my ($k, $v) = split(/=/, $_, 2);
        $f->{$k} = $v;
        # special processing for permissions etc. here?
    }
    $f->{o} = unquote($f->{o});
    $f->{g} = unquote($f->{g});
    $f->{acl} = unquote($f->{acl});
    $f->{m} = $1 if $f->{m} =~ /^(\d+)$/;
    $f->{lt} = unquote($1) if defined $f->{lt} && $f->{lt} =~ /(.*)/;
    return $f;
}
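
# Sketch of a list line as parse() expects it (field values illustrative;
# the disk agent decides which k=v fields are present): a file name
# followed by space-separated key=value pairs, e.g.
#
#   etc/passwd t=f s=1432 m=1706659200 o=root g=root acl=u::rw-,g::r--,o:r--
#
# which parse() turns into a hashref like
#
#   { name => 'etc/passwd', t => 'f', s => 1432, m => 1706659200,
#     o => 'root', g => 'root', acl => 'u::rw-,g::r--,o:r--' }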

sub present {
    my ($self, $f) = @_;
    return unless $self->{last_backup};
    my $st = lstat("$self->{last_backup}/$f->{name}");
    return unless $st;
    if ($st->mtime == $f->{m} &&
        $st->size == $f->{s} &&
        $st->uid == $self->name2uid($f->{o}) &&
        $st->gid == $self->name2gid($f->{g}) &&
        ($st->mode & 07777) == $self->acl2mode($f)
    ) {
        return 1;
    } else {
        return 0;
    }
}

sub mkdir_p {
    my ($dir, $perm) = @_;
    $perm = 0777 unless (defined($perm));

    if (-d $dir) {
        return 1;
    } elsif (mkdir($dir, $perm)) {
        return 1;
    } elsif ($!{ENOENT}) {
        # A path component is missing: create the parent first, then retry.
        my $parentdir = $dir;
        $parentdir =~ s|(.*)/.+|$1|;
        mkdir_p($parentdir, $perm);
        if (-d $dir) {
            return 1;
        } else {
            return mkdir($dir, $perm);
        }
    } else {
        return undef;
    }
}

sub basedir {
    my ($self, $dir) = @_;
    $self->{basedir} = $dir if defined($dir);
    return $self->{basedir};
}

sub targets {
    my ($self, $targets) = @_;
    $self->{targets} = $targets if defined($targets);
    return $self->{targets};
}

sub add_target {
    my ($self, $target) = @_;
    push @{ $self->{targets} }, $target;
    return $self->{targets};
}

my %permstrbits = (
    '---' => 0,
    '--x' => 1,
    '-w-' => 2,
    '-wx' => 3,
    'r--' => 4,
    'r-x' => 5,
    'rw-' => 6,
    'rwx' => 7,
);

sub setmeta {
    my ($self, $f) = @_;
    my $fn = "$self->{this_backup}/$f->{name}";
    $self->log(3, "$fn is tainted!") if tainted($fn);
    my $mode = $self->acl2mode($f);
    $self->log(3, "$mode is tainted!") if tainted($mode);
    chown($self->name2uid($f->{o}), $self->name2gid($f->{g}), $fn);
    chmod($mode, $fn);
    utime(time, $f->{m}, $fn);
}

# Computes the mode from the acl (and the set[ug]id and sticky bits)
# and returns it. Optional ACL entries are currently ignored but should
# eventually be returned as a second value.

sub acl2mode {
    my ($self, $f) = @_;

    my $mode = 0;
    if ($f->{acl}) {
        for my $ace (split(',', $f->{acl})) {
            if ($ace =~ /^u::(...)$/) {
                $mode |= ($permstrbits{$1} << 6);
            } elsif ($ace =~ /^g::(...)$/) {
                $mode |= ($permstrbits{$1} << 3);
            } elsif ($ace =~ /^o:(...)$/) {
                $mode |= ($permstrbits{$1} << 0);
            } else {
                $self->log(5, "warning: unknown ACE $ace ignored");
            }
        }
    }
    if ($f->{setuid}) { $mode |= 04000 }
    if ($f->{setgid}) { $mode |= 02000 }
    if ($f->{sticky}) { $mode |= 01000 }

    return $mode;
}
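
# Worked example (values illustrative): an acl of "u::rwx,g::r-x,o:r-x"
# with the setuid flag set yields (7<<6)|(5<<3)|5 = 0755, plus 04000 for
# setuid, i.e. a mode of 04755.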

my %ucache;
sub name2uid {
    my ($self, $uname) = @_;
    $uname = $1 if $uname =~ /(.*)/;   # detaint
    return $ucache{$uname} if (defined $ucache{$uname});
    if ($uname =~ /^\d+$/) {
        return $ucache{$uname} = $uname;
    } else {
        my $uid = getpwnam($uname);
        if (defined($uid)) {
            return $ucache{$uname} = $uid;
        } else {
            return $ucache{$uname} = $self->{unknown_uid};
        }
    }
}

my %gcache;
sub name2gid {
    my ($self, $gname) = @_;
    $gname = $1 if $gname =~ /(.*)/;   # detaint
    return $gcache{$gname} if (defined $gcache{$gname});
    if ($gname =~ /^\d+$/) {
        return $gcache{$gname} = $gname;
    } else {
        my $gid = getgrnam($gname);
        if (defined($gid)) {
            return $gcache{$gname} = $gid;
        } else {
            return $gcache{$gname} = $self->{unknown_gid};
        }
    }
}

# Currently used log levels:
#  0 - fatal error, backup failed
#  3 - global progress messages, like start and end of a backup, statistics
#  5 - errors which prevent a file from being backed up
# 10 - progress messages for single files

sub log {
    my ($self, $level, $msg) = @_;
    if ($level <= $self->{log_level}) {
        $self->{fh_log}->print(strftime("%Y-%m-%dT%H:%M:%S%z", localtime), " $$ [$level]: $msg\n")
            or die "write to log failed: $!";
    }
}
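
# Sample log line as produced by the print above (timestamp and pid
# illustrative):
#
#   2025-01-31T02:00:00+0100 12345 [3]: starting backup for target host myhost dir /home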

sub log_level {
    my ($self, $log_level) = @_;
    $self->{log_level} = $log_level if defined($log_level);
    return $self->{log_level};
}

sub db_record_version {
    my ($self, $target, $f) = @_;

    my $t0 = gettimeofday();

    my $db_f = $self->{dbh}->selectall_arrayref("select * from files where fileset=? and path=?",
                                                { Slice => {} },
                                                $target->{id}, $f->{name});
    my $t1 = gettimeofday();
    $self->{times}{db_record_version_select_files} += $t1 - $t0;
    unless (@$db_f) {
        $self->{dbh}->do("insert into files(fileset, path) values(?, ?)",
                         {},
                         $target->{id}, $f->{name});
        $db_f = $self->{dbh}->selectall_arrayref("select * from files where fileset=? and path=?",
                                                 { Slice => {} },
                                                 $target->{id}, $f->{name});
    }
    my $t2 = gettimeofday();
    $self->{times}{db_record_version_insert_files} += $t2 - $t1;
    if ($f->{t} eq 'f' && !defined($f->{checksum})) {
        # The file was hard-linked to the previous version, so take the
        # checksum recorded for that version.
        $f->{checksum} = $self->get_checksum($db_f->[0]{id});
    }
    my $t2a = gettimeofday();
    $self->{times}{db_record_version_versions2_get_checksum} += $t2a - $t2;

    my $query =
        "select id from versions2
         where
             file_type=? and file_size=? and file_mtime=? and
             file_owner=? and file_group=? and file_acl=? and
             file_unix_bits=?
        ";
    my @args = (
        $f->{t}, $f->{s}, $f->{m},
        $f->{o}, $f->{g}, $f->{acl},
        join(',', map { $f->{$_} ? ($_) : () } qw(setuid setgid sticky)),
    );
    if ($f->{t} eq 'f') {
        $query .= " and checksum=?";
        push @args, $f->{checksum};
    } elsif ($f->{t} eq 'l') {
        $query .= " and file_linktarget=?";
        push @args, $f->{lt};
    } elsif ($f->{t} eq 'b' || $f->{t} eq 'c') {
        $query .= " and file_rdev=?";
        push @args, $f->{rdev};
    }

    my $version_id = $self->{dbh}->selectrow_array($query, {}, @args);
    my $t2b = gettimeofday();
    $self->{times}{db_record_version_versions2_get_version_id} += $t2b - $t2a;
    unless ($version_id) {
        # We use a 32 bit int field in the database, so we have to clamp the
        # mtime to that range. Need to fix this sometime before 2038 :-)
        if ($f->{m} < -2147483648) {
            $f->{m} = -2147483648;
        } elsif ($f->{m} > 2147483647) {
            $f->{m} = 2147483647;
        }
        $self->log(10, "insert into versions2(..., file_mtime=$f->{m}, ...)");
        # XXX why is $f->{checksum} undef here for ./bin/dash?
        $self->{dbh}->do("insert into versions2(
                              file_type, file_size, file_mtime,
                              file_owner, file_group, file_acl,
                              file_unix_bits,
                              file_rdev,
                              checksum, file_linktarget
                          )
                          values(
                              ?, ?, ?,
                              ?, ?, ?,
                              ?,
                              ?,
                              ?, ?
                          )",
                         {},
                         $f->{t}, $f->{s}, $f->{m},
                         $f->{o}, $f->{g}, $f->{acl},
                         join(',', map { $f->{$_} ? ($_) : () } qw(setuid setgid sticky)),
                         $f->{rdev},
                         $f->{checksum}, $f->{lt},
        );
        $version_id = $self->{dbh}->{mysql_insertid};
        $self->log(10, "insert into versions2 -> $version_id");
    }
    my $t3 = gettimeofday();
    $self->{times}{db_record_version_versions2} += $t3 - $t2;
    $self->{times}{db_record_version_versions2_insert} += $t3 - $t2b;
    push @{ $self->{caches}{insert_instances} },
        [
            $db_f->[0]{id},
            $self->{record_file_id} ? $f->{id} : undef,
            $self->{record_time} ? time() : undef,
            1,
            $self->{session_id}, $version_id
        ];
    if (@{ $self->{caches}{insert_instances} } > 10) {
        $self->flush_insert_instances();
    }

    my $t4 = gettimeofday();
    $self->{times}{db_record_version_insert_instances} += $t4 - $t3;
    $self->{times}{db_record_version} += $t4 - $t0;
}

sub get_checksum {
    my ($self, $file) = @_;
    if ($self->{caches}{file_checksums}{$self->{last_backup_id}}) {
        return $self->{caches}{file_checksums}{$self->{last_backup_id}}{$file};
    } else {
        # First call for this session: build the file -> checksum map with
        # a single query and cache it.
        $self->{caches}{file_checksums}{$self->{last_backup_id}} = my $file_checksums = {};
        my $sth = $self->{dbh}->prepare(
            "select file, checksum from versions2 v, instances i
             where session=? and v.id = i.version");
        $sth->execute($self->{last_backup_id});
        while (my ($file_id, $checksum) = $sth->fetchrow_array) {
            $file_checksums->{$file_id} = $checksum;
        }

        return $file_checksums->{$file};
    }
}

sub flush_insert_instances {
    my ($self) = @_;
    my $dbh = $self->{dbh};

    if ($self->{caches}{insert_instances} && @{ $self->{caches}{insert_instances} }) {
        my $cmd = "insert into instances(file, file_id, date, online, session, version)"
                  . " values "
                  . join(", ",
                         map {
                             "("
                             . join(",", map { $dbh->quote($_) } @$_)
                             . ")"
                         } @{ $self->{caches}{insert_instances} }
                    );
        $dbh->do($cmd);
    }
    $dbh->commit();
    $self->{caches}{insert_instances} = [];
}
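
# The statement built above is a single multi-row insert, e.g. (values
# illustrative):
#
#   insert into instances(file, file_id, date, online, session, version)
#   values (17,NULL,NULL,1,3,42), (18,NULL,NULL,1,3,43)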

sub new_session {
    my ($self) = @_;
    $self->{dbh}->do("insert into sessions(start_date, prefix) values(?, ?)", {}, time(), $self->{this_backup});
    $self->{session_id} = $self->{dbh}->{'mysql_insertid'};
}

sub close_session {
    my ($self) = @_;
    $self->{dbh}->do("update sessions set end_date=? where id=?", {}, time(), $self->{session_id});
    $self->{dbh}->commit();
    $self->close_file_connection;
    delete $self->{target};
}

sub close_file_connection {
    my ($self) = @_;
    if ($self->{file_pid}) {
        close($self->{file_cfd});
        close($self->{file_dfd});

        $self->log(3, "waiting for $self->{file_pid}");
        waitpid $self->{file_pid}, 0;
        $self->log(3, "$self->{file_pid} terminated with status $?");
        delete $self->{file_cfd};
        delete $self->{file_dfd};
        delete $self->{file_pid};
    }
}

sub get_last_session {
    my ($self) = @_;
    my $sessions
        = $self->{dbh}->selectall_arrayref(
            "select * from sessions where end_date is not null and prefix like ? order by end_date desc",
            { Slice => {} },
            $self->{basedir} . '/%/' . $self->{target}->{host} . '/' . $self->{target}->{dir}
        );
    $self->{last_backup} = $sessions->[0]{prefix};
    $self->{last_backup_id} = $sessions->[0]{id};
    $self->log(3, "last backup: $self->{last_backup} ($self->{last_backup_id})");
}

=head2 linkdup

Try to find a duplicate of the current file in the database and replace the
current file with a hardlink to it. This is useful if you have multiple
copies of a file stored in different locations.
The search starts from the newest session and continues into the past until
either linking succeeds or we run out of duplicates.
This is done because creating a hard link may not always be possible (the
duplicate is on a different file system or has already reached the maximum
link count) and it is more likely that we can link to new copies than to
old ones.

=cut

sub linkdup {
    my ($self, $f, $backup_filename) = @_;
    my $t0 = gettimeofday();
    # XXX - this seems to be slow
    # XXX - creates huge temp files. Restrict to last few sessions or at
    #       least sessions on the same device?
    # XXX - that's not quite as simple: We only have the prefix, but there
    #       are many prefixes on the same device. We can create a list of
    #       them at first call, though, and then pass the list to the
    #       query. Maybe even shorten the list. ($n newest sessions only)
    # XXX - another possible optimization is to check the last few files
    #       we've written: .svn/prop-base normally contains a lot of
    #       identical files.

    unless ($self->{sessions_on_same_device}) {
        my $st = stat($backup_filename);
        my $my_dev = defined $st ? $st->dev : "";   # can this happen?
        my $sth = $self->{dbh}->prepare("select * from sessions order by id desc");
        $sth->execute();
        while (my $r = $sth->fetchrow_hashref()) {
            my $st = lstat $r->{prefix};
            my $dev = defined $st ? $st->dev : "";
            next unless $dev eq $my_dev;
            last if $self->{sessions_on_same_device} && @{ $self->{sessions_on_same_device} } > 30;
            push @{ $self->{sessions_on_same_device} }, $r->{id};   # We only use the id
        }

        $self->{min_instance_id} = $self->{dbh}->selectrow_array(
            "select min(id) from instances where session in ("
            . join(", ", map("?", @{ $self->{sessions_on_same_device} })) . ")",
            {},
            @{ $self->{sessions_on_same_device} }
        );
        $self->log(3, "min_instance_id set to $self->{min_instance_id}");
    }
    my $tdb0 = gettimeofday();
    my $tdb1;
    my $sth = $self->{dbh}->prepare("select * from versions2, instances, files, sessions
                                     where file_type=? and file_size=? and file_mtime=?
                                     and file_owner=? and file_group=? and file_acl=?
                                     and file_unix_bits=?
                                     and checksum=? and online=1
                                     and versions2.id=instances.version
                                     and instances.id >= ?
                                     and instances.file=files.id
                                     and instances.session=sessions.id
                                     and sessions.id in (" . join(", ", map("?", @{ $self->{sessions_on_same_device} })) . ")
                                     order by instances.session desc
                                    ");
    $sth->execute(
        $f->{t}, $f->{s}, $f->{m},
        $f->{o}, $f->{g}, $f->{acl},
        join(',', map { $f->{$_} ? ($_) : () } qw(setuid setgid sticky)),
        $f->{checksum},
        $self->{min_instance_id},
        @{ $self->{sessions_on_same_device} },
    );
    my $st = stat($backup_filename);
    my $my_dev = defined $st ? $st->dev : "";
    while (my $r = $sth->fetchrow_hashref()) {
        unless ($tdb1) {
            $tdb1 = gettimeofday();
            $self->{times}{linkdup_db} += $tdb1 - $tdb0;
        }

        # Check if the old file is on the same device. If it isn't, skip it.
        # XXX - this should now be obsolete because we already selected only
        #       matching sessions above.
        unless ($self->{prefix_device}{$r->{prefix}}) {
            my $st = lstat $r->{prefix};
            $self->{prefix_device}{$r->{prefix}}
                = defined $st ? $st->dev : "";
        }
        next unless $self->{prefix_device}{$r->{prefix}} eq $my_dev;

        my $oldfile = "$r->{prefix}/$r->{path}";
        if (my $st = lstat($oldfile)) {
            if ($st->mtime == $f->{m} &&
                $st->size == $f->{s} &&
                $st->uid == $self->name2uid($f->{o}) &&
                $st->gid == $self->name2gid($f->{g}) &&
                ($st->mode & 07777) == $self->acl2mode($f)
            ) {
                # link() cannot replace an existing file, so move the freshly
                # stored copy out of the way first and remove it only after
                # the link has succeeded.
                my ($dirname, $basename) = $backup_filename =~ m{(.*)/(.*)};
                my $tmpname = "$basename.$$.simba_backup";
                if (length($tmpname) > 255) {
                    $tmpname = substr($basename, 0, 235) . ".$$.simba_backup";
                }
                $tmpname = "$dirname/$tmpname";
                rename($backup_filename, "$tmpname") or die "cannot save $backup_filename to $tmpname: $!";
                if (link($oldfile, $backup_filename)) {
                    $self->log(10, "linked (dup)");
                    unlink("$tmpname") or die "cannot unlink $tmpname: $!";
                    $sth->finish();
                    my $t1 = gettimeofday();
                    $self->{counts}{dup2}++;
                    $self->{times}{linkdup} += $t1 - $t0;
                    return $oldfile;
                } else {
                    $self->log(5, "cannot link $oldfile to $backup_filename");
                    rename("$tmpname", $backup_filename) or die "cannot restore $backup_filename from $tmpname: $!";
                }
            }
        }
    }
    my $t1 = gettimeofday();
    $self->{times}{linkdup} += $t1 - $t0;
    unless ($tdb1) {
        $tdb1 = gettimeofday();
        $self->{times}{linkdup_db} += $tdb1 - $tdb0;
    }
    $self->{counts}{linkdup_miss}++;
    return;
}

=head2 store_file

Store a file in the local filesystem. If the file appears to be unchanged
since the last backup, try to create a hard link. Otherwise, get the
contents of the file from the DA, and search for a file with the same
contents (i.e., checksum) and metadata, but possibly a different name, and
try to link to that. If no link can be created to an existing file, create
a new one.

=cut
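
# The exchange with the disk agent, as handled below (path and values
# illustrative): we send
#
#   get <dir>/<name>
#
# and expect a header "data <metadata line>", followed by exactly
# file-size raw bytes, an empty line, and a trailer that is either
# "chk sha1 <hexdigest>" or "fail <message>".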

sub store_file {
    my ($self, $f) = @_;

    my $success = 1;

    if ($self->present($f)) {
        # Unchanged since the last backup: just link to the old copy.
        if (link("$self->{last_backup}/$f->{name}", "$self->{this_backup}/$f->{name}")) {
            $self->log(10, "linked");
            $self->{counts}{dup1}++;
            return $success;
        } else {
            $self->log(5, "cannot link $self->{last_backup}/$f->{name} to $self->{this_backup}/$f->{name}: $!");
        }
    }
    # If size is zero, check if we have seen a matching file before. If we
    # have, link to it.
    # Ubuntu contains a lot of zero sized files (about 8000 per installed
    # kernel). Searching for them in the database is slow, so we
    # special-case that here.
    # We could generalize that, but I don't think that there will ever be
    # enough identical non-empty files to make that worthwhile.
    # XXX - not yet implemented.
    if ($f->{s} == 0 && $f->{t} eq 'f') {
        no warnings 'uninitialized';   # unix bits may not exist
        my $k = "$f->{m} $f->{o} $f->{g} $f->{acl} $f->{setuid} $f->{setgid} $f->{sticky}";
        if ($self->{null_files}{$k}) {
            my $oldfile = $self->{null_files}{$k}{name};
            my $backup_filename = "$self->{this_backup}/$f->{name}";
            if (link($oldfile, $backup_filename)) {
                $self->log(10, "linked (empty)");
                $self->{counts}{dup10}++;
                return $success;
            }
        }
    }

    # else request from da
    unless ($self->{file_pid}) {
        $self->{file_pid} = open2($self->{file_dfd}, $self->{file_cfd},
                                  "/usr/bin/ssh",
                                  "-l", "simba_da",
                                  $self->{ssh_id_file} ? ("-i", $self->{ssh_id_file}) : (),
                                  $self->{target}->{host}, "da");
    }
    $self->{file_cfd}->printflush("get $self->{target}->{dir}/$f->{name}\n");   # XXX - encode!
    my $header = $self->{file_dfd}->getline;   # this should be the same as $_ - check?
    if ($header =~ /^data (.*)/) {
        my $f2 = $self->parse($1);
        my $backup_filename = "$self->{this_backup}/$f->{name}";
        my $file_bfd;
        unless (open($file_bfd, '>:raw', $backup_filename)) {
            $self->log(5, "cannot open backup file $backup_filename: $!");
            # XXX - There may be some errors from which we can recover, e.g.,
            # for "File name too long" we could just shorten the file name.
            # But for now we just skip the file.
            # XXX - some other errors are almost certainly fatal, e.g., ENOENT
            # probably means that our backup device has been unmounted (BTDT).
            $self->close_file_connection;
            return 0;
        }
        my $size = $f2->{s};
        my $err;
        my $sha1 = Digest::SHA->new(1);

        while ($size > 0) {
            my $buffer;
            my $rc = read($self->{file_dfd}, $buffer, min($size, $BUFSIZE));
            if (!defined($rc)) {
                # I/O error
                $self->log(5, "error reading from data socket: $!");
                last;
            } elsif ($rc == 0) {
                # premature EOF
                $self->log(5, "unexpected EOF reading from data socket");
                last;
            }
            $file_bfd->print($buffer) or die "write to backup failed: $!";
            $size -= length($buffer);
            $sha1->add($buffer);
        }
        close($file_bfd) or die "write to backup failed: $!";
        my $trailer = $self->{file_dfd}->getline;   # should be an empty line
        $trailer = $self->{file_dfd}->getline;
        if ($trailer =~ /^fail /) {
            $self->log(5, $trailer);
            $success = 0;
        } elsif ($trailer =~ /^chk sha1 (\w+)/) {
            my $checksum = $sha1->hexdigest;
            if ($checksum ne $1) {
                $self->log(5, "checksum error\n");
            }
            $f->{checksum} = $checksum;
        } else {
            $self->log(5, "unexpected trailer $trailer\n");
            $self->close_file_connection;
            $success = 0;
        }
        unless ($self->linkdup($f, $backup_filename)) {
            $self->setmeta($f);
            $self->log(10, "stored");
        }
        if ($f->{s} == 0 && $f->{t} eq 'f') {
            no warnings 'uninitialized';   # unix bits may not exist
            my $k = "$f->{m} $f->{o} $f->{g} $f->{acl} $f->{setuid} $f->{setgid} $f->{sticky}";
            $self->{null_files}{$k}{name} = $backup_filename;
        }
    } else {
        $self->log(5, "unexpected header $header\n");
        $self->close_file_connection;
        $success = 0;
    }
    return $success;
}

sub adjust_partitions {
    my ($self) = @_;
    my $dbh = $self->{dbh};
    my $database = $dbh->selectrow_array("select database()");
    my $max_id = $dbh->selectrow_array("select max(id) from instances");
    my ($max_part, $limit)
        = $dbh->selectrow_array(
            "select partition_ordinal_position, partition_description
             from information_schema.partitions
             where table_schema='$database' and table_name='instances'
             order by partition_ordinal_position desc
             limit 1");
    while ($max_id + $self->{instances_part_size} > $limit) {
        $max_part++;
        $limit += $self->{instances_part_size};
        my $partition_name = sprintf("p%03d", $max_part);
        $dbh->do("alter table instances add partition (partition $partition_name values less than ($limit))");
        $self->log(3, "added partition $partition_name < $limit");
    }
}
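
# Example (values illustrative): with instances_part_size = 10_000_000 and
# the highest existing partition p002 bounded at 30000000, a max(id) near
# that bound makes the loop above issue
#
#   alter table instances add partition (partition p003 values less than (40000000))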

sub export {
    my ($self) = @_;
    my $dbh = $self->{dbh};
    my $exportdir = "/var/lib/simba/export";
    mkdir($exportdir);

    my $sessions = $dbh->selectcol_arrayref("select id from sessions order by id");
    for my $session_id (@$sessions) {
        next if (-f "$exportdir/$session_id.tsv"
                 || -f "$exportdir/$session_id.tsv.gz");
        $self->log(3, "exporting session $session_id");
        my $sth
            = $dbh->prepare(
                qq{select sessions.id as "sessions.id", start_date, end_date, prefix,
                          instances.id as "instances.id", file_id, date, online,
                          versions2.id as "versions2.id", file_type, file_size, file_mtime, file_owner, file_group, file_acl, file_unix_bits, file_rdev, checksum, file_linktarget,
                          files.id as "files.id", path,
                          filesets.id as "filesets.id", host, dir, options, pid
                   from sessions, instances, versions2, files, filesets
                   where sessions.id=$session_id
                   and sessions.id=instances.session
                   and instances.version=versions2.id
                   and instances.file=files.id
                   and files.fileset=filesets.id
                });
        $sth->execute;
        open my $fh, '>', "$exportdir/$session_id.tsv.$$" or die "cannot create $exportdir/$session_id.tsv.$$: $!";
        print $fh join("\t", @{ $sth->{NAME} }), "\n";
        while (my $r = $sth->fetchrow_arrayref) {
            no warnings 'uninitialized';
            print $fh join("\t", @$r), "\n" or die "cannot write to $exportdir/$session_id.tsv.$$: $!";
        }
        close($fh);
        rename "$exportdir/$session_id.tsv.$$", "$exportdir/$session_id.tsv" or die "cannot rename $exportdir/$session_id.tsv.$$ to $exportdir/$session_id.tsv: $!";
        system "/bin/gzip", "$exportdir/$session_id.tsv";
    }
}

sub DESTROY {
    my ($self) = @_;
    $self->{dbh}->disconnect();
}

# vim: tw=0 expandtab

1;