#!/usr/bin/perl =head1 NAME Simba::CA =head1 DESCRIPTION Collecting Agent of the Simba backup system. This class represents one instance of a running collecting agent. The only user-callable methods are the constructor new and the instance method run, which collects all the files from various disk agents. The Simba::CA package is a hashref with the following keys: =over =item basedir =item unknown_uid =item unknown_gid =item fh_log =item log_level =item dbh =item targets =item ssh_id_file =item target =item last_backup =item last_backup_id =item timestamp =item this_backup =item session_id =item file_pid =item file_cfd =item file_dfd =back =cut package Simba::CA; use strict; use warnings; use Encode; use IPC::Open2; use POSIX qw(strftime); use Simba::Util qw(quote unquote typestr); use Readonly; use Digest::SHA1; use List::Util qw(min); use IO::Handle; use File::stat; use Scalar::Util qw(tainted); use DBI; use Time::HiRes qw(gettimeofday); Readonly my $BUFSIZE => 128 * 1024; sub new { my ($class, $opt) = @_; my $self = {}; bless $self, $class; $self->{basedir} = '/backup'; $self->{unknown_uid} = 65533; $self->{unknown_gid} = 65533; $self->{fh_log} = exists($opt->{fh_log}) ? $opt->{fh_log} : \*STDERR; $self->{log_level} = 99; if ($opt->{dbi}) { $self->{dbh} = DBI->connect(@{ $opt->{dbi} }, { AutoCommit => 0, PrintError => 1, RaiseError => 1 } ); } elsif ($opt->{dbi_file}) { my $fn = $opt->{dbi_file}; open(FN, "<$fn") or die "cannot open $fn: $!"; my $line = ; close(FN); my @cred = split(/[\s\n]+/, $line); $self->{dbh} = DBI->connect(@cred, { AutoCommit => 0, PrintError => 1, RaiseError => 1 } ); } $self->{targets} = $self->{dbh}->selectall_arrayref("select * from filesets", { Slice => {} }); if ($opt->{filesets}) { $self->{targets} = [ grep { my $id = $_->{id}; grep { $id == $_ } @{ $opt->{filesets} } } @{ $self->{targets} } ]; } if ($ENV{HOME} =~ m{([/\w]*)}) { if (-f "$1/.ssh/id_rsa") { if (my $st = stat("$1/.ssh/id_rsa")) { if ($st->uid == $>) { $self->{ssh_id_file} = "$1/.ssh/id_rsa"; } } } } return $self; } sub run { my ($self) = @_; # run sequentially for prototype. In production we probably # want some concurrency for my $target (@{$self->{targets}}) { $self->backup2disk($target); } $self->log(3, "statistics:"); for (sort keys %{ $self->{counts} }) { $self->log(3, " $_: $self->{counts}{$_}"); } for (sort keys %{ $self->{times} }) { $self->log(3, " $_: $self->{times}{$_} s"); } } sub backup2disk { my ($self, $target) = @_; $self->log(3, "starting backup for target host " . $target->{host} . " dir " . $target->{dir}); $self->{target} = $target; # get previous generation $self->get_last_session(); my $timestamp = $self->{timestamp} || strftime('%Y-%m-%dT%H.%M.%S', localtime); $self->{this_backup} = $self->{basedir} . "/$timestamp/" . $target->{host} . '/' . $target->{dir}; $self->new_session(); my ($list_pid, $list_cfd, $list_dfd); # connection to get list of files $list_pid = open2($list_dfd, $list_cfd, "/usr/bin/ssh", "-l", "simba_da", $self->{ssh_id_file} ? ("-i", $self->{ssh_id_file}) : (), $target->{host}, "da"); $list_cfd->printflush("list $target->{dir}\n"); # XXX - encode! close($list_cfd); my $count = 0; while (<$list_dfd>) { $count++; chomp; $self->log(10, "file: $_"); # split into fields chomp; my $f = $self->parse($_); if ($count % 1000 == 0) { $self->log(9, "file $count: $f->{name}"); } my $success = 1; if ($f->{t} eq 'f') { $success = $self->store_file($f); } elsif ($f->{t} eq 'd') { my $d = "$self->{this_backup}/$f->{name}"; $d =~ s,//+,/,g; mkdir_p($d) or die "cannot mkdir $d: $!"; # XXX $self->setmeta($f); } elsif ($f->{t} eq 'l') { my $l = "$self->{this_backup}/$f->{name}"; unless (symlink($f->{lt}, $l)) { die "cannot symlink $l -> $f->{lt}: $!"; # XXX } # $self->setmeta($f); ignore for symlinks. would need to use # lchown, lchmod, etc. } else { # create local copy (or insert into DB only?) $self->log(5, "ignored $_\n"); } # insert into DB. $self->db_record_version($target, $f) if ($success); } $self->flush_insert_instances(); $self->close_session(); $self->log(3, "finished backup for target host " . $target->{host} . " dir " . $target->{dir} . ": $count files"); $self->{counts}{objects} += $count; } sub parse { my ($self, $s) = @_; my @s = split(/ +/, $s); my $f = {}; $f->{name} = shift @s; $f->{name} = $1 if ($f->{name} =~ /(.*)/); # detaint XXX for (@s) { my ($k, $v) = split(/=/, $_, 2); $f->{$k} = $v; # special processing for permissions etc, here? } $f->{o} = unquote($f->{o}); $f->{g} = unquote($f->{g}); $f->{acl} = unquote($f->{acl}); $f->{m} = $1 if $f->{m} =~ /^(\d+)$/; $f->{lt} = unquote($1) if defined $f->{lt} && $f->{lt} =~ /(.*)/; return $f; } sub present { my ($self, $f) = @_; return unless $self->{last_backup}; my $st = lstat("$self->{last_backup}/$f->{name}"); return unless $st; if ($st->mtime == $f->{m} && $st->size == $f->{s} && $st->uid == $self->name2uid($f->{o}) && $st->gid == $self->name2gid($f->{g}) && ($st->mode & 07777) == $self->acl2mode($f) ) { return 1; } else { return 0; } } sub mkdir_p { my ($dir, $perm) = @_; $perm = 0777 unless(defined($perm)); if (-d $dir) { return 1; } elsif (mkdir($dir, $perm)) { return 1; } elsif ($!{ENOENT}) { my $parentdir = $dir; $parentdir =~ s|(.*)/.+|$1|; mkdir_p($parentdir, $perm); if (-d $dir) { return 1; } else { return mkdir($dir, $perm); } } else { return undef; } } sub basedir { my ($self, $dir) = @_; $self->{basedir} = $dir if defined($dir); return $self->{basedir}; } sub targets { my ($self, $targets) = @_; $self->{targets} = $targets if defined($targets); return $self->{targets}; } sub add_target { my ($self, $target) = @_; push @{ $self->{targets} }, $target; return $self->{targets}; } my %permstrbits = ( '---' => 0, '--x' => 1, '-w-' => 2, '-wx' => 3, 'r--' => 4, 'r-x' => 5, 'rw-' => 6, 'rwx' => 7, ); sub setmeta { my ($self, $f) = @_; my $fn = "$self->{this_backup}/$f->{name}"; $self->log(3, "$fn is tainted!") if tainted($fn); my $mode = $self->acl2mode($f); $self->log(3, "$mode is tainted!") if tainted($mode); chown($self->name2uid($f->{o}), $self->name2gid($f->{g}), $fn); chmod($mode, $fn); utime(time, $f->{m}, $fn); } # computes the mode from the acl (and the set[ug]id and sticky bits) # and returns it. Optional ACL entries are currently ignored but should # eventually be returned as a second value. sub acl2mode { my ($self, $f) = @_; my $mode = 0; if ($f->{acl}) { for my $ace (split(',', $f->{acl})) { if ($ace =~ /^u::(...)$/) { $mode |= ($permstrbits{$1} << 6); } elsif ($ace =~ /^g::(...)$/) { $mode |= ($permstrbits{$1} << 3); } elsif ($ace =~ /^o:(...)$/) { $mode |= ($permstrbits{$1} << 0); } else { $self->log(5, "warning: unknown ACE $ace ignored"); } } } if ($f->{setuid}) { $mode |= 04000 } if ($f->{setgid}) { $mode |= 02000 } if ($f->{sticky}) { $mode |= 01000 } return $mode; } my %ucache; sub name2uid { my ($self, $uname) = @_; $uname = $1 if $uname =~ /(.*)/; # detaint return $ucache{$uname} if (defined $ucache{$uname}); if ($uname =~ /^\d+$/) { return $ucache{$uname} = $uname; } else { my $uid = getpwnam($uname); if (defined($uid)) { return $ucache{$uname} = $uid; } else { return $ucache{$uname} = $self->{unknown_uid}; } } } my %gcache; sub name2gid { my ($self, $gname) = @_; $gname = $1 if $gname =~ /(.*)/; # detaint return $gcache{$gname} if (defined $gcache{$gname}); if ($gname =~ /^\d+$/) { return $gcache{$gname} = $gname; } else { my $gid = getgrnam($gname); if (defined($gid)) { return $gcache{$gname} = $gid; } else { return $gcache{$gname} = $self->{unknown_gid}; } } } # currently used log levels: # 0 - fatal error, backup failed # 3 - global progress messages, like start and end of a backup, statistics. # 5 - errors which prevent a file from being backed up # 10 - progress messages for single files. sub log { my ($self, $level, $msg) = @_; if ($level <= $self->{log_level}) { $self->{fh_log}->print(strftime("%Y-%m-%dT%H:%M:%S%z", localtime), " $$ [$level]: $msg\n") or die "write to log failed: $!"; } } sub log_level { my ($self, $log_level) = @_; $self->{log_level} = $log_level if defined($log_level); return $self->{log_level}; } sub db_record_version { my ($self, $target, $f) = @_; my $t0 = gettimeofday(); my $db_f = $self->{dbh}->selectall_arrayref("select * from files where fileset=? and path=?", { Slice => {} }, $target->{id}, $f->{name}); my $t1 = gettimeofday(); $self->{times}{db_record_version_select_files} += $t1 - $t0; unless (@$db_f) { $self->{dbh}->do("insert into files(fileset, path) values(?, ?)", {}, $target->{id}, $f->{name}); $db_f = $self->{dbh}->selectall_arrayref("select * from files where fileset=? and path=?", { Slice => {} }, $target->{id}, $f->{name}); } my $t2 = gettimeofday(); $self->{times}{db_record_version_insert_files} += $t2 - $t1; if ($f->{t} eq 'f' && !defined($f->{checksum})) { # this must be a link to the previous version my $db_pv = $self->{dbh}->selectall_arrayref( "select * from versions2 v, instances i where file=? and session=? and v.id = i.version", { Slice => {} }, $db_f->[0]{id}, $self->{last_backup_id} ); $f->{checksum} = $db_pv->[0]{checksum}; } my $query = "select id from versions2 where file_type=? and file_size=? and file_mtime=? and file_owner=? and file_group=? and file_acl=? and file_unix_bits=? "; my @args = ( $f->{t}, $f->{s}, $f->{m}, $f->{o}, $f->{g}, $f->{acl}, join(',', map {$f->{$_} ? ($_) : ()} qw(setuid setgid sticky)), ); if ($f->{t} eq 'f') { $query .= " and checksum=?"; push @args, $f->{checksum}; } elsif ($f->{t} eq 'l') { $query .= " and file_linktarget=?"; push @args, $f->{lt}; } elsif ($f->{t} eq 'b' || $f->{t} eq 'c') { $query .= " and file_rdev=?"; push @args, $f->{rdev}; } my $version_id = $self->{dbh}->selectrow_array($query, {}, @args); unless ($version_id) { # XXX why is $f->{checksum} undef here for ./bin/dash? $self->{dbh}->do("insert into versions2( file_type, file_size, file_mtime, file_owner, file_group, file_acl, file_unix_bits, file_rdev, checksum, file_linktarget ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )", {}, $f->{t}, $f->{s}, $f->{m}, $f->{o}, $f->{g}, $f->{acl}, join(',', map {$f->{$_} ? ($_) : ()} qw(setuid setgid sticky)), $f->{rdev}, $f->{checksum}, $f->{lt}, ); $version_id = $self->{dbh}->{mysql_insertid}; } my $t3 = gettimeofday(); $self->{times}{db_record_version_versions2} += $t3 - $t2; push @{ $self->{caches}{insert_instances} }, [ $db_f->[0]{id}, $f->{id}, time(), 1, $self->{session_id}, $version_id ]; if (@{ $self->{caches}{insert_instances} } > 10) { $self->flush_insert_instances(); } my $t4 = gettimeofday(); $self->{times}{db_record_version_insert_instances} += $t4 - $t3; $self->{times}{db_record_version} += $t4 - $t0; } sub flush_insert_instances { my ($self) = @_; my $dbh = $self->{dbh}; if (@{ $self->{caches}{insert_instances} }) { my $cmd = "insert into instances(file, file_id, date, online, session, version)" . " values " . join(", ", map { "(" . join(",", map { $dbh->quote($_) } @$_) . ")" } @{ $self->{caches}{insert_instances} } ); $dbh->do($cmd); } $self->{caches}{insert_instances} = []; } sub new_session { my ($self) = @_; $self->{dbh}->do("insert into sessions(start_date, prefix) values(?, ?)", {}, time(), $self->{this_backup}); $self->{session_id} = $self->{dbh}->{'mysql_insertid'}; } sub close_session { my ($self) = @_; $self->{dbh}->do("update sessions set end_date=? where id=?", {}, time(), $self->{session_id}); $self->close_file_connection; delete $self->{target}; } sub close_file_connection { my ($self) = @_; if ($self->{file_pid}) { close($self->{file_cfd}); close($self->{file_dfd}); $self->log(3, "waiting for $self->{file_pid}"); waitpid $self->{file_pid}, 0; $self->log(3, "$self->{file_pid} terminated with status $?"); delete $self->{file_cfd}; delete $self->{file_dfd}; delete $self->{file_pid}; } } sub get_last_session { my ($self) = @_; my $sessions = $self->{dbh}->selectall_arrayref( "select * from sessions where end_date is not null and prefix like ? order by end_date desc", { Slice => {} }, $self->{basedir} . '/%/' . $self->{target}->{host} . '/' . $self->{target}->{dir} ); $self->{last_backup} = $sessions->[0]{prefix}; $self->{last_backup_id} = $sessions->[0]{id}; } =head2 linkdup Try to find a duplicate of the current file in the database and replace the current file with a hardlink to it. This is useful if you have multiple copies of a file stored in different locations. The search starts from the newest session and continues into the past until either linking successful or we run out of duplicates. This is done because creating a hard link may not always be possible (duplicate is on a different file system or has already reached the maximum link count) and it is more likely that we can link to new copies than to old ones. =cut sub linkdup { my ($self, $f, $backup_filename) = @_; my $t0 = gettimeofday(); # XXX - this seems to be slow my $sth = $self->{dbh}->prepare("select * from versions2, instances, files, sessions where file_type=? and file_size=? and file_mtime=? and file_owner=? and file_group=? and file_acl=? and file_unix_bits=? and checksum=? and online=1 and instances.file=files.id and versions2.id=instances.version and instances.session=sessions.id order by instances.session desc "); $sth->execute( $f->{t}, $f->{s}, $f->{m}, $f->{o}, $f->{g}, $f->{acl}, join(',', map {$f->{$_} ? ($_) : ()} qw(setuid setgid sticky)), $f->{checksum} ); my $st = stat($backup_filename); my $my_dev = defined $st ? $st->dev : ""; while (my $r = $sth->fetchrow_hashref()) { # check if old file is on same device. If it isn't, skip it. unless ($self->{prefix_device}{$r->{prefix}}) { my $st = lstat $r->{prefix}; $self->{prefix_device}{$r->{prefix}} = defined $st ? $st->dev : ""; } next unless $self->{prefix_device}{$r->{prefix}} eq $my_dev; my $oldfile = "$r->{prefix}/$r->{path}"; if (my $st = lstat($oldfile)) { if ($st->mtime == $f->{m} && $st->size == $f->{s} && $st->uid == $self->name2uid($f->{o}) && $st->gid == $self->name2gid($f->{g}) && ($st->mode & 07777) == $self->acl2mode($f) ) { rename($backup_filename, "$backup_filename.$$.simba_backup") or die "cannot save $backup_filename to $backup_filename.$$.simba_backup: $!"; if (link($oldfile, $backup_filename)) { $self->log(10, "linked (dup)"); unlink("$backup_filename.$$.simba_backup") or die "cannot unlink $backup_filename.$$.simba_backup: $!"; $sth->finish(); my $t1 = gettimeofday(); $self->{counts}{dup2}++; $self->{times}{linkdup} += $t1 - $t0; return $oldfile; } else { $self->log(5, "cannot link $oldfile to $backup_filename"); rename("$backup_filename.$$.simba_backup", $backup_filename) or die "cannot restore $backup_filename from $backup_filename.$$.simba_backup: $!"; } } } } my $t1 = gettimeofday(); $self->{times}{linkdup} += $t1 - $t0; return; } =head2 store_file store a file in the local filesystem. If the file appears to be unchanged since the last backup, try to create a hard link. Otherwise, get the contents of the file from the DA, and search for a file with the same contents (i.e., checksum) and metadata, but possibly different name and try to link to that. If no link can be created to an existing file, create a new one. =cut sub store_file { my ($self, $f) = @_; my $success = 1; if($self->present($f)) { if (link("$self->{last_backup}/$f->{name}", "$self->{this_backup}/$f->{name}")) { $self->log(10, "linked"); $self->{counts}{dup1}++; return $success; } else { $self->log(5, "cannot link $self->{last_backup}/$f->{name} to $self->{this_backup}/$f->{name}: $!"); } } # else request from da unless ($self->{file_pid}) { $self->{file_pid} = open2($self->{file_dfd}, $self->{file_cfd}, "/usr/bin/ssh", "-l", "simba_da", $self->{ssh_id_file} ? ("-i", $self->{ssh_id_file}) : (), $self->{target}->{host}, "da"); } $self->{file_cfd}->printflush("get $self->{target}->{dir}/$f->{name}\n"); # XXX - encode! my $header = $self->{file_dfd}->getline; # this should be the same as $_ - check? if ($header =~ /^data (.*)/) { my $f2 = $self->parse($1); my $backup_filename = "$self->{this_backup}/$f->{name}"; my $file_bfd; unless (open($file_bfd, '>:raw', $backup_filename)) { $self->log(5, "cannot open backup file $backup_filename: $!"); # XXX - There may be some errors from which we can recover, e.g., for # "File name too long" we could just shorten the file name. But for # now we just skip the file: # XXX - some other errors are almost certainly fatal, e.g., ENOENT # probably means that our backup device has been unmounted (BTDT). $self->close_file_connection; return 0; } my $size = $f2->{s}; my $err; my $sha1 = Digest::SHA1->new; while ($size > 0) { my $buffer; my $rc = read($self->{file_dfd}, $buffer, min($size, $BUFSIZE)); if (!defined($rc)) { # I/O error $self->log(5, "error reading from data socket: $!"); last; } elsif ($rc == 0) { # premature EOF. $self->log(5, "unexpected EOF reading from data socket"); last; } $file_bfd->print($buffer) or die "write to backup failed: $!"; $size -= length($buffer); $sha1->add($buffer); } close($file_bfd) or die "write to backup failed: $!"; my $trailer = $self->{file_dfd}->getline; # should be empty line $trailer = $self->{file_dfd}->getline; if ($trailer =~ /^fail /) { $self->log(5, $trailer); $success = 0; } elsif ($trailer =~ /^chk sha1 (\w+)/) { my $checksum = $sha1->hexdigest; if ($checksum ne $1) { $self->log(5, "checksum error\n"); } $f->{checksum} = $checksum; } else { $self->log(5, "unexpected trailer $trailer\n"); $self->close_file_connection; $success = 0; } unless ($self->linkdup($f, $backup_filename)) { $self->setmeta($f); $self->log(10, "stored"); } } else { $self->log(5, "unexpected header $header\n"); $self->close_file_connection; $success = 0; } return $success; } sub DESTROY { my ($self) = @_; $self->{dbh}->disconnect(); } # vim: tw=0 expandtab 1;