404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.147.78.134: ~ $
# DistUpgradeAptCdrom.py 
#  
#  Copyright (c) 2008 Canonical
#  
#  Author: Michael Vogt <michael.vogt@ubuntu.com>
# 
#  This program is free software; you can redistribute it and/or 
#  modify it under the terms of the GNU General Public License as 
#  published by the Free Software Foundation; either version 2 of the
#  License, or (at your option) any later version.
# 
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
# 
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
#  USA

import re
import os
import apt
import apt_pkg
import logging
import gzip
import shutil
import subprocess
import sys
import tempfile
from gettext import gettext as _


class AptCdromError(Exception):
    """Root of the exception hierarchy for failures while adding an apt CD."""


class AptCdrom(object):
    """Add an already-mounted CD-ROM to apt's sources and state files.

    The class scans the tree below ``cdrompath`` for package indexes,
    ``Release.gpg`` signatures and ``i18n`` translation files, copies them
    into apt's lists directory, prepends a ``deb cdrom:`` line to the
    sources.list and records the disc in apt's cdroms state file.
    """

    # captures the architecture out of a ".../binary-<arch>/..." component
    _BINARY_ARCH_RE = re.compile(r"/binary-([^/]*)")

    def __init__(self, view, path):
        """Remember the view and mount path and initialise apt's config.

        :param view: object whose ``error(summary, msg)`` is called by
            :meth:`add` when adding the disc fails
        :param path: directory the disc is mounted at (may be ``None``,
            in which case the object evaluates to false)
        """
        self.view = view
        self.cdrompath = path
        # the directories we found on disk with signatures, packages and i18n
        self.packages = set()
        self.signatures = set()
        self.i18n = set()
        apt_pkg.init_config()

    @staticmethod
    def _cdromStatePath():
        """Return the path of apt's cdroms state file used for backups."""
        return os.path.join(apt_pkg.config.find_dir("Dir::State"),
                            apt_pkg.config.find("Dir::State::cdroms"))

    def restore_backup(self, backup_ext):
        """Restore the backup copy of the cdroms state file.

        Only the cdroms file is restored, *not* the sources.list.

        :param backup_ext: suffix that was appended to the state file name
            when the backup was made (see :meth:`add`)
        """
        cdromstate = self._cdromStatePath()
        backup = cdromstate + backup_ext
        if os.path.exists(backup):
            shutil.copy(backup, cdromstate)
        # mvo: we don't have to care about restoring the sources.list here
        #      because aptsources will do this for us anyway

    def comment_out_cdrom_entry(self):
        """Comment out this disc's ``deb cdrom:`` line in the sources.list.

        Does nothing when no sources line can be derived from
        ``self.packages`` (e.g. nothing was scanned yet).
        """
        diskname = self._readDiskName()
        pentry = self._generateSourcesListLine(diskname, self.packages)
        if pentry is None:
            # no entry can have been written, and str.replace(None, ...)
            # would raise a TypeError
            logging.debug("no cdrom sources.list entry to comment out")
            return
        sourceslist = apt_pkg.config.find_file("Dir::Etc::sourcelist")
        with open(sourceslist) as f:
            content = f.read()
        content = content.replace(pentry, "# %s" % pentry)
        with open(sourceslist, "w") as f:
            f.write(content)

    def _scanCD(self):
        """Walk the disc and collect the interesting index files.

        At most one kind of file is recorded per directory, checked in
        this order: ``Packages``, ``Packages.gz``, (unsupported)
        ``Sources``, ``Release.gpg``, the contents of an ``i18n``
        sub-directory.

        :return: tuple ``(packagesfiles, signaturefiles, i18nfiles)`` of
            sets holding full paths
        """
        packages = set()
        signatures = set()
        i18n = set()
        for root, dirs, files in os.walk(self.cdrompath, topdown=True):
            if root.endswith(("debian-installer", "dist-upgrader")):
                # installer/upgrader trees are not apt archives: prune them
                del dirs[:]
                continue
            elif ".aptignr" in files:
                # the directory is marked as "ignore": skip it and
                # everything below it
                del dirs[:]
                continue
            elif "Packages" in files:
                packages.add(os.path.join(root, "Packages"))
            elif "Packages.gz" in files:
                packages.add(os.path.join(root, "Packages.gz"))
            elif "Sources" in files or "Sources.gz" in files:
                logging.error(
                    "Sources entry found in %s but not supported", root)
            elif "Release.gpg" in files:
                signatures.add(os.path.join(root, "Release.gpg"))
            elif "i18n" in dirs:
                i18ndir = os.path.join(root, "i18n")
                for f in os.listdir(i18ndir):
                    i18n.add(os.path.join(i18ndir, f))
            # there is nothing under pool but deb packages (no
            # indexfiles), so we skip that here
            elif os.path.split(root)[1] == "pool":
                del dirs[:]
        return (packages, signatures, i18n)

    def _writeDatabase(self):
        """Append this disc's ident and label to apt's cdroms state file."""
        dbfile = apt_pkg.config.find_file("Dir::State::cdroms")
        cdrom = apt_pkg.Cdrom()
        ident = cdrom.ident(apt.progress.base.CdromProgress())
        label = self._readDiskName()
        with open(dbfile, "a") as out:
            out.write('CD::%s "%s";\n' % (ident, label))
            out.write('CD::%s::Label "%s";\n' % (ident, label))

    def _dropArch(self, packages):
        """Return a copy of *packages* without foreign-architecture indexes.

        A path is dropped when its (last) ``binary-<arch>`` component names
        an architecture other than ``APT::Architecture``. The component is
        compared exactly, so an architecture string that merely occurs
        somewhere else in the path no longer keeps a foreign index alive.
        Paths without a ``/binary-`` component are kept.

        :param packages: iterable of index file paths
        :return: new set with the surviving paths
        """
        arch = apt_pkg.config.find("APT::Architecture")
        kept = set()
        for d in packages:
            found = self._BINARY_ARCH_RE.findall(d)
            if found and found[-1] != arch:
                continue
            kept.add(d)
        return kept

    def _readDiskName(self):
        """Return the disc name used in the ``cdrom:[...]`` URIs.

        The name is the first line of ``<cdrompath>/.disk/info`` with the
        characters ``"``, ``[`` and ``]`` replaced by ``_``. Only the first
        line is used so that a trailing newline in the info file cannot
        end up inside the sources.list or cdroms entries. If the info
        file does not exist the mount path itself is returned.
        """
        # default to cdrompath if there is no name
        diskname = self.cdrompath
        info = os.path.join(self.cdrompath, ".disk", "info")
        if os.path.exists(info):
            with open(info) as f:
                diskname = f.readline().rstrip("\r\n")
            for special in ('"', ']', '['):
                diskname = diskname.replace(special, '_')
        return diskname

    def _generateSourcesListLine(self, diskname, packages):
        """Build the ``deb cdrom:[...]`` line for the given index files.

        :param diskname: disc name as returned by :meth:`_readDiskName`
        :param packages: iterable of paths that look like
            ``<path>/dists/<dist>/<component>/binary-<arch>/...``
        :return: the sources.list line with the components sorted and
            de-duplicated, or ``None`` if no path or component was found
        :raises AptCdromError: if a path does not have the expected layout
        """
        # see apts indexcopy.cc:364 for details
        path = ""
        dist = ""
        comps = set()
        for d in packages:
            # match(1) is the path, match(2) the dist
            # and match(3) the components
            m = re.match(r"(.*)/dists/([^/]*)/(.*)/binary-*", d)
            if not m:
                raise AptCdromError(
                    _("Could not calculate sources.list entry"))
            path = m.group(1)
            dist = m.group(2)
            comps.add(m.group(3))
        if not path or not comps:
            return None
        return "deb cdrom:[%s]/ %s %s" % (
            diskname, dist, " ".join(sorted(comps)))

    def _copyIndexFiles(self, files, targetdir=None):
        """Copy index files into the lists directory, gunzipping ``.gz``.

        The target name is ``apt_pkg.uri_to_filename()`` of the file's
        ``cdrom:[<diskname>]/dists/...`` URI. Only a trailing ``.gz`` is
        removed from that name; other names are kept untouched so that a
        dot inside the disc name cannot truncate the file name.

        :param files: iterable of full paths below the mount point
        :param targetdir: destination directory, defaults to apt's
            ``Dir::State::lists``
        :return: always ``True``
        """
        if not targetdir:
            targetdir = apt_pkg.config.find_dir("Dir::State::lists")
        diskname = self._readDiskName()
        for f in files:
            # NOTE(review): assumes the first "dists" in the path is the
            # archive's dists/ directory -- confirm for unusual mount paths
            fname = apt_pkg.uri_to_filename(
                "cdrom:[%s]/%s" % (diskname, f[f.find("dists"):]))
            if f.endswith(".gz"):
                if fname.endswith(".gz"):
                    fname = fname[:-len(".gz")]
                outf = os.path.join(targetdir, fname)
                with gzip.open(f) as g, open(outf, "wb") as out:
                    # uncompress in 64k chunks
                    shutil.copyfileobj(g, out, 64000)
            else:
                shutil.copy(f, os.path.join(targetdir, fname))
        return True

    def _copyTranslations(self, translations, targetdir=None):
        """Copy the translation files found on the disc.

        See :meth:`_copyIndexFiles` for parameters and return value.
        """
        return self._copyIndexFiles(translations, targetdir)

    def _copyPackages(self, packages, targetdir=None):
        """Copy the Packages indexes found on the disc.

        See :meth:`_copyIndexFiles` for parameters and return value.
        """
        return self._copyIndexFiles(packages, targetdir)

    def _verifyRelease(self, signatures):
        """Verify the gpg signatures and the SHA256 sums of the Release files.

        For every ``Release.gpg`` the sibling ``Release`` file is checked
        with ``apt-key verify`` (run with the current apt configuration),
        then every file listed in its ``SHA256`` section that exists on
        the disc is hashed and compared.

        :param signatures: iterable of paths to ``Release.gpg`` files
        :return: ``False`` on the first failed signature or hash check,
            ``True`` otherwise
        """
        for sig in signatures:
            basepath = os.path.split(sig)[0]
            # do gpg checking
            releasef = os.path.splitext(sig)[0]
            verify_env = os.environ.copy()
            cmd = ["apt-key", "--quiet", "verify", sig, releasef]
            with tempfile.NamedTemporaryFile() as fp:
                config_dump = apt_pkg.config.dump()
                if not isinstance(config_dump, bytes):
                    # the temporary file is opened in binary mode
                    config_dump = config_dump.encode("utf-8")
                fp.write(config_dump)
                # make sure the child process sees the full configuration
                fp.flush()
                verify_env["APT_CONFIG"] = fp.name
                ret = subprocess.call(cmd, env=verify_env)
            if ret != 0:
                return False
            # now do the hash sum checks
            with open(releasef) as f:
                t = apt_pkg.TagFile(f)
                t.step()
                sha256_section = t.section["SHA256"]
            for entry in sha256_section.split("\n"):
                if not entry.strip():
                    continue
                (expected, _size, name) = entry.split()
                f = os.path.join(basepath, name)
                if not os.path.exists(f):
                    logging.info("ignoring missing '%s'", f)
                    continue
                with open(f, "rb") as fp:
                    actual = apt_pkg.sha256sum(fp)
                if actual != expected:
                    logging.error(
                        "hash sum mismatch expected %s but got %s",
                        expected, actual)
                    return False
        return True

    def _copyRelease(self, signatures, targetdir=None):
        """Copy each ``Release.gpg`` and its ``Release`` file to the lists dir.

        :param signatures: iterable of paths to ``Release.gpg`` files
        :param targetdir: destination directory, defaults to apt's
            ``Dir::State::lists``
        :return: always ``True``
        """
        if not targetdir:
            targetdir = apt_pkg.config.find_dir("Dir::State::lists")
        diskname = self._readDiskName()
        for sig in signatures:
            releasef = os.path.splitext(sig)[0]
            # copy both Release and Release.gpg
            for f in (sig, releasef):
                fname = apt_pkg.uri_to_filename(
                    "cdrom:[%s]/%s" % (diskname, f[f.find("dists"):]))
                shutil.copy(f, os.path.join(targetdir, fname))
        return True

    def _doAdd(self):
        """Reimplement pkgCdrom::Add() in python.

        Scans the disc, copies the index files, prepends the
        ``deb cdrom:`` line to the sources.list and records the disc in
        the cdroms state file.

        :return: ``True`` on success, ``False`` if the disc has no name
        :raises AptCdromError: if no usable package index is found or no
            sources.list entry can be calculated
        """
        # os.walk() will not follow symlinks so we don't need
        # pkgCdrom::Score() and not dropRepeats() that deal with
        # killing the links
        (self.packages, self.signatures, self.i18n) = self._scanCD()
        self.packages = self._dropArch(self.packages)
        if not self.packages:
            logging.error("no useable indexes found on CD, wrong ARCH?")
            raise AptCdromError(
                _("Unable to locate any package files, "
                  "perhaps this is not a Ubuntu Disc or the wrong "
                  "architecture?"))

        # CopyAndVerify: the Release files are only copied when they
        # verify; the indexes below are copied either way
        if self._verifyRelease(self.signatures):
            self._copyRelease(self.signatures)

        # copy the packages and translations
        self._copyPackages(self.packages)
        self._copyTranslations(self.i18n)

        # add CD to cdroms.list "database" and update sources.list
        diskname = self._readDiskName()
        if not diskname:
            logging.error("no .disk/ directory found")
            return False
        debline = self._generateSourcesListLine(diskname, self.packages)
        if debline is None:
            # never write a literal "None" line into the sources.list
            raise AptCdromError(
                _("Could not calculate sources.list entry"))

        # prepend to the sources.list
        sourceslist = apt_pkg.config.find_file("Dir::Etc::sourcelist")
        with open(sourceslist) as f:
            content = f.read()
        with open(sourceslist, "w") as f:
            f.write(
                "# added by the release upgrader\n%s\n%s" %
                (debline, content))
        self._writeDatabase()

        return True

    def add(self, backup_ext=None):
        """Add the cdrom to apt's database.

        :param backup_ext: if given, the cdroms state file is first copied
            to ``<statefile><backup_ext>`` (see :meth:`restore_backup`)
        :return: the result of the add, ``False`` if it failed; failures
            are also reported through ``self.view.error()``
        """
        logging.debug("AptCdrom.add() called with '%s'", self.cdrompath)
        # do backup (if needed) of the cdroms.list file
        if backup_ext:
            cdromstate = self._cdromStatePath()
            if os.path.exists(cdromstate):
                shutil.copy(cdromstate, cdromstate + backup_ext)
        # do the actual work
        apt_pkg.config.set("Acquire::cdrom::mount", self.cdrompath)
        apt_pkg.config.set("APT::CDROM::NoMount", "true")
        # FIXME: add cdrom progress here for the view
        #progress = self.view.getCdromProgress()
        try:
            res = self._doAdd()
        except (SystemError, AptCdromError) as e:
            logging.error("can't add cdrom: %s", e)
            self.view.error(_("Failed to add the CD"),
                            _("There was a error adding the CD, the "
                              "upgrade will abort. Please report this as "
                              "a bug if this is a valid Ubuntu CD.\n\n"
                              "The error message was:\n'%s'") % e)
            return False
        logging.debug("AptCdrom.add() returned: %s", res)
        return res

    def __bool__(self):
        """Helper to use this as 'if cdrom:' (true when a path is set)."""
        return self.cdrompath is not None

    # Python 2 looks up __nonzero__ instead of __bool__
    if sys.version_info[0] < 3:
        __nonzero__ = __bool__

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
DistUpgradeApport.py File 4.95 KB 0644
DistUpgradeAptCdrom.py File 12.39 KB 0644
DistUpgradeCache.py File 52.55 KB 0644
DistUpgradeConfigParser.py File 3.75 KB 0644
DistUpgradeController.py File 100.96 KB 0644
DistUpgradeFetcher.py File 5.83 KB 0644
DistUpgradeFetcherCore.py File 11.2 KB 0644
DistUpgradeFetcherKDE.py File 9.72 KB 0644
DistUpgradeFetcherSelf.py File 2.02 KB 0644
DistUpgradeGettext.py File 2.97 KB 0644
DistUpgradeMain.py File 9.1 KB 0644
DistUpgradePatcher.py File 3.92 KB 0644
DistUpgradeQuirks.py File 47.38 KB 0644
DistUpgradeVersion.py File 21 B 0644
DistUpgradeView.py File 16.2 KB 0644
DistUpgradeViewGtk3.py File 32.7 KB 0644
DistUpgradeViewKDE.py File 39.87 KB 0644
DistUpgradeViewNonInteractive.py File 13.45 KB 0644
DistUpgradeViewText.py File 12.33 KB 0644
GtkProgress.py File 3.93 KB 0644
MetaRelease.py File 16.65 KB 0644
QUrlOpener.py File 3.24 KB 0644
ReleaseNotesViewer.py File 7.48 KB 0644
ReleaseNotesViewerWebkit.py File 2.86 KB 0644
SimpleGtk3builderApp.py File 2.01 KB 0644
SimpleGtkbuilderApp.py File 1.99 KB 0644
__init__.py File 0 B 0644
apt_btrfs_snapshot.py File 9.64 KB 0644
apt_clone.py File 32.81 KB 0644
dist-upgrade.py File 126 B 0644
distinfo.py File 11.3 KB 0644
distro.py File 23.16 KB 0644
sourceslist.py File 18.36 KB 0644
telemetry.py File 3.4 KB 0644
utils.py File 17.95 KB 0644
xorg_fix_proprietary.py File 3.93 KB 0644