#!/usr/bin/env python3 # -*- coding: utf-8 -*- #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # module zopen # function zopen # class _zopen # version 2.0 # # purpose class to open a file with (un)compression as # indicated by the extension(s) of the file name. # for writing to a file, a temporary new file can # be requested which will be written and renamed # to the original name to replace it. # # note this module requires Python3 version 3.6 or later # versions other than 3.6.9 have not been tested # versions up to 4.0 are expected to work # # note much of this code exceeds a width of 80 characters. # it is advised to use a virtual terminal with a width # of at least 128 characters. #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- """zopen: a module to open files with compression based on file name extension(s), including support for temporary written files. Copyright © 2021 by Phil D. Howard - all other rights reserved Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA, OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE, OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. The author may be contacted by decoding the number 11054987560151472272755686915985840251291393453694611309 (provu igi la numeron al duuma) """ from decimal import Decimal from io import IOBase from os import environ,getcwd,remove,rename from os.path import exists,isfile,join from sys import stdin,stdout,stderr from uuid import uuid4 default_compresslevel = 6 stdio_names = ('-','+','--','++') digit_names = 'zero one two three four five six seven eight nine'.split() yes_no_table = {} for x in ('false','no','faux','non','falso','no', 'falsch','nein', 'falso','no', 'fals','no', 'falso','não','nao',): yes_no_table[x] = False for x in ('true','yes','vrai','oui','cierto','sí','si','wahr','ja','jawohl','vero','sì','si','veritat','sí','si','verdade','sim', ): yes_no_table[x] = True #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # these functions make code look less cluttered where isinstance() is needed #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def is_bb (x):return isinstance(x, (bytes,bytearray) ) def is_complex (x):return isinstance(x, complex) def is_fcd (x):return isinstance(x, (float,complex,Decimal) ) def is_iio (x):return isinstance(x, (int,IOBase) ) def is_int (x):return isinstance(x, int ) def is_lt (x):return isinstance(x, (list,tuple) ) def is_sbb (x):return isinstance(x, (str,bytes,bytearray) ) def is_str (x):return isinstance(x, str ) #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # function _tostr # purpose convert argument one to str without decoding or # reconstruct containers recursively with results or # leave unrecognized values unchanged # argument 1 argument to be converted or reconstructed # return the converted str or if into= is given, store converted str there # note if argument is a str just return it, other types raise TypeError #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def _tostr(a,*,into=None): """Convert to str, transparently (no encoding).""" if isinstance(a,str): # str just stays the same (copied) return str(a) elif isinstance(a,(bytearray,bytes,memoryview)): # each byte becomes a character return str().join(chr(x)for x in a) elif isinstance(a,(int,float,Decimal,complex)): if isinstance(a,complex): # a complex becomes a float from real a = a.real # a number becomes a character (modulo 1114112 return chr(int(a)%1114112) elif is_dict(a): # values in a dict are converted, not keys return {k:_tostr(v)for k,v in a.items()} elif is_list(a): # items in a list are converted, making a new list return [_tostr(x) for x in a] elif is_tuple(a): # items in a tuple are converted, making a new tuple return tuple(_tostr(x) for x in a) elif is_set(a): # items in a set are converted, making a new set return set(_tostr(x) for x in a) elif is_frozenset(a): # items in a frozenset are converted, making a new frozenset return frozenset(_tostr(x) for x in a) # unrecognized objects remain unchanged return a #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # function _flatten # purpose to flatten any depth of list/tuple of lists/tuples to # a single level list/tuple # example [[[a],[b]],[[c],[d]]] -> [a,b,c,d] #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def _flatten(a): """Flatten any depth of list/tuple of lists/tuples to a single level list/tuple.""" r = [] for x in a: if is_lt(x): r.extend(_flatten(x)) else: r.append(x) return type(a)(r) #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # function zfiles_exist # # purpose given a base non-compressed file name, return a list # of it and the compressesed file names like it that # do exist # # aruments 1: name= (str,bytes,bytearray) name of file # this is for the caller to see all file choices # of non-comressed and compressed file names # # exts= (str,bytes,bytearray,iterable) extensions # that are to be considered to be compressed files # # returns (list of str) list of file names that exist # # note the give name of an uncompressed file will be # listed first only if it exists # # note this is an extra function that may be useful for # code that needs to open existing files that may # be compressed # # note file name and exrensions may given as str or bytes # or bytarray but a list of str is always retuurne #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def zfiles_exist( name, *, exts=None, ): """Return a list that exists among the given uncompressed name and its compressed forms.""" fs = [name] if exists(name) and isfile(name) else [] if is_lt(exts): exts = ' '.join(exts) if not exts: exts = 'gz bz2 bz xz tgz tbz txz' for ext in exts.split(): n = isstr(name)+'.'+isstr(ext) if isfile(n): fs.append(n) return fs #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # function zopen_if_only_one # # purpose open one compressed or non-compressed file # for reading if only one such file exists # # returns one open file if there exists only one file like that # # exceptions if the condition of only one file as named or one # file of a compressed variant name is not met then a # ValueError exception is raised. # # note this is an extra function that may be useful for code # that needs to open only one existing file that may or # may not be compressed without it in advance #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def zopen_if_only_one( name=None, mode=None, compresslevel=None, *, buffering=-1, check=-1, closefd=True, compress=None, encoding=None, errors=None, exts=None, format=None, newline=None, tempname=False, stdioname=None, ): """Open and return a non-compressed file or one of its compressed files if exactly one exists along with the file list.""" if is_sbb(name): _tostr(name) else: raise TypeError('name is not str or bytes or bytearray, got {name!r}') names = zfiles_exist(name,exts=exts) if len(names) < 1: raise ValueError(f'no files found of {name!r} and all compressed forms') if len(names) > 1: raise ValueError(f'more than one file found of {name!r} and all compressed forms') if not mode: mode = 'r' if is_sbb(mode): mode = _tostr(mode) else: raise TypeError('mode is not str or bytes or bytearray, got {mode!r}') if 'a' in mode: raise PermissionError('only read mode makes sense for zopen_if_only_one(), not append') if 'w' in mode: raise PermissionError('only read mode makes sense for zopen_if_only_one(), not write') if 'x' in mode: raise PermissionError('only read mode makes sense for zopen_if_only_one(), not exclusive create') return zopen( names[0], mode, compresslevel, buffering=buffering, check=check, closefd=closefd, compress=compress, encoding=encoding, errors=errors, format=format, newline=newline, tempname=tempname, stdioname=stdioname, ) #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # function zopen # # purpose open a file with compression based on file name # extension(s) and creation using a temporary name # that is moved to the intended name when closed # for write. all arguments can be used as keyword # arguments and name, mode and compresslevel can # be used as the first three positional arguments. # # version 2.0 # # name= (str,bytes,bytearray) is the actual name of the file to be opened for reading or writing # (IOBas) is an open file object to be reopened for reading or writing # (int) is a POSIX file descriptor to be reopened for reading or writing # mode= (str) is the mode to choose the method of opening the file # (str) 'r'* # buffering= (any) passed on to each file open # check= (any) passed on to each file open # closefd= (any) passed on to each file open # compresslevel= (int) compression level, 1..9 or 0 for no compression (default is what the compression library code sets) # compress= (bool) True:* compression enabled using original file name extensions to select algorithms # (bool) False: compression disabled regardless of file name extensions # (str,bytes,bytearray) string of extensions to select algorithms for (un)compression # (list,tuple) of strings to select algorithms for (un)compression # encoding= (any) passed on to each file open # errors= # format= # newline= # tempname= (bool) is the choice to use a temporary name that is renamed at close # (bool) False* disables a temporary name # (bool) True enables a temporary name with the actual temporary name randomly generated by zopen # (str,bytes,bytearray) enables a temporary name and is the actual temporary name specified # * default #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def zopen( name=None, mode='r', compresslevel=None, *, buffering=-1, check=-1, closefd=True, compress=None, encoding=None, errors=None, format=None, newline=None, tempname=False, stdioname=None, ): """Open a named file with or without compression based on file name extension(s).""" #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # return the open file object #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- file = _zopen( name=name, mode=mode, compresslevel=compresslevel, buffering=buffering, check=check, closefd=closefd, compress=compress, encoding=encoding, errors=errors, format=format, newline=newline, tempname=tempname, stdioname=stdioname, ) return file.just_one_file() #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # class _zopen (IOBase) # # purpose open a file with compression based on file name # extension(s) and creation using a temporary name # that is moved to the intended name when closed # for write. all arguments can be used as keyword # arguments and name, mode and compresslevel can # be used as the first three positional arguments. # # version 2.0 # # name= (str,bytes,bytearray) is the actual name of the file to be opened for reading or writing # (IOBas) is an open file object to be reopened for reading or writing # (int) is a POSIX file descriptor to be reopened for reading or writing # mode= (str) is the mode to choose the method of opening the file # (str) 'r'* # buffering= (any) passed on to each file open # check= (any) passed on to each file open # closefd= (any) passed on to each file open # compresslevel= (int) compression level, 1..9 or 0 for no compression (default is what the compression library code sets) # compress= (bool) True:* compression enabled using original file name extensions to select algorithms # (bool) False: compression disabled regardless of file name extensions # (str,bytes,bytearray) string of extensions to select algorithms for (un)compression # (list,tuple) of strings to select algorithms for (un)compression # encoding= (any) passed on to each file open # errors= # format= # newline= # tempname= (bool) is the choice to use a temporary name that is renamed at close # (bool) False* disables a temporary name # (bool) True enables a temporary name with the actual temporary name randomly generated by zopen # (str,bytes,bytearray) enables a temporary name and is the actual temporary name specified # * default #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- class _zopen(IOBase): def __init__( self, name=None, mode='r', compresslevel=None, *, buffering=-1, check=-1, closefd=True, compress=None, encoding=None, errors=None, format=None, newline=None, tempname=False, stdioname=None, ): """Open a named file with compression based on file name extension(s), then support its operation and closure.""" self.isopen = False # start in an unopen state #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # below are 3 functions with a common calling API to make # it easy to open compression files for various algorithms. # they are defined entirely within the __init__ method so # they have access to caller variables to make calls simple. #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # bz2_open # # each compression module open method is called differently # so these three functions exist to make them work alike to # the code that chooses which compression algorithm it uses # these functions are never called for non-compressed files # these functions are defined inside __init__ to access the # method's variables including keyword arguments # # this function is selected based on the filename extension # as indexed in the dictionary zmap #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def bz2_open(file,mode): """Common API opener for bz2.""" import bz2 if read: f = bz2.open( file, mode, encoding=encoding, errors=errors, newline=newline ) else: f = bz2.open( file, mode, compresslevel=compresslevel, encoding=encoding, errors=errors, newline=newline, ) return f #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # gzip_open # # each compression module open method is called differently # so these three functions exist to make them work alike to # the code that chooses which compression algorithm it uses # these functions are never called for non-compressed files # these functions are defined inside __init__ to access the # method's variables including keyword arguments # # this function is selected based on the filename extension # as indexed in the dictionary zmap #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def gzip_open(file,mode): """Common API opener for gzip.""" import gzip if read: f = gzip.open( file, mode, encoding=encoding, errors=errors, newline=newline, ) else: f = gzip.open( file, mode, compresslevel=compresslevel, encoding=encoding, errors=errors, newline=newline, ) return f #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # lzma_open # # each compression module open method is called differently # so these three functions exist to make them work alike to # the code that chooses which compression algorithm it uses # these functions are never called for non-compressed files # these functions are defined inside __init__ to access the # method's variables including keyword arguments # # this function is selected based on the filename extension # as indexed in the dictionary zmap #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def lzma_open(file,mode): """Common API opener for lzma.""" import lzma if read: f = lzma.open( file, mode, encoding=encoding, errors=errors, newline=newline, ) else: f = lzma.open( file, mode, preset=compresslevel, encoding=encoding, errors=errors, newline=newline, ) return f #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # map file name extension (str) to compression opener function above # for non-compressed file names the above functions are not called and the below dictionary is not used # this dictionary also serves to validate extension names #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- ext_map = dict( bz = bz2_open, bz2 = bz2_open, gz = gzip_open, xz = lzma_open, tbz = bz2_open, tgz = gzip_open, txz = lzma_open, ) #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # init this class #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- self.cwd = getcwd() #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # check name argument #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- if not name: raise TypeError('no filename given') elif is_int(name): if name < 0: raise ValueError(f'int (file descriptor) {name} is negative') if name < 3: name = (stdin,stdout,stderr)[name] elif is_iio(name) or is_str(name): pass elif is_bb(name): # change bytes or bytearray to str so it is simpler to work with name = _tostr(name) # no encoding else: raise TypeError('file name (arg 1 or name=) is not a str (or bytes or bytearray or int'\ f' or open file)\nname = {name!r}') self.name = join(self.cwd,name) # full name in case CWD is changed before close does rename #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # check mode argument #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- if mode is None: # unspecifed mode,read = 'r',True elif is_str(mode): pass elif is_bb(mode): mode = _tostr(mode) else: raise TypeError('mode (arg 2 or mode=) is not a str (or bytes or bytearray)' f', mode = {mode!r}') if '+' in mode: raise ValueError('Update mode is not supported by zopen().' 'Use the builtin open() function for update mode.') for ch in mode: if ch not in 'abrtwx+': raise ValuError(f'invalid character {ch!r} in mode {mode!r}') append = 'a' in mode binary = 'b' in mode create = 'x' in mode read = 'r' in mode text = 't' in mode write = 'w' in mode if (append,create, write).count(True) < 1: read = True if (append,create,read,write).count(True) > 1: raise ValueError(f'invalid mode has more than one type, got {mode[:12]!r} for {name!r}') if binary and text: raise ValueError(f'invalid mode mixes binary and text, got {mode[:12]!r} for {name!r}') un = ['','un'][read] # rebuild a consistent mode string from the mode flags, True behaves like 1 mode = ( ['','a'][append] + ['','x'][create] + ['','r'][read] + ['','w'][write] + ['','b'][binary] + ['','t'][text] ) #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # check compresslevel argument #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # None uses the default # English number words are accepted,('one','two',...) # number becomes rounded to an int (5.5 -> 6) # if string == "no" compression is disabled # if string is an English number name it is used # otherwise string digits are converted to a number #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- if read: compresslevel = None else: if compresslevel is None: compresslevel = default_compresslevel elif is_sbb(compresslevel): if is_bb(compresslevel): compresslevel = _tostr(compresslevel) compresslevel = compresslevel.lower() if compresslevel in digit_names: compresslevel = digit_names.index(compresslevel) if compresslevel == 'no': compresslevel = None else: compresslevel = float(compresslevel) # float instead of int to do rounding below if is_fcd(compresslevel): if is_complex(compresslevel): compresslevel = compresslevel.real compresslevel = int(1/2+compresslevel) if is_int(compresslevel): if compresslevel < 0 or compresslevel > 9: raise ValueError( 'option argument compresslevel is out of range (0..9)' f', got compresslevel={compresslevel!r}' ) else: raise TypeError(f'option argument compresslevel is not a number, got compresslevel={compresslevel!r}') #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # check compress argument #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # None: default is True # True: compression enabled using original file name extensions to select algorithms # False: compression disabled # (str,bytes,bytearray): string of extensions to select algorithm(s) # (list,tuple): of strings to select compression algorithm(s) #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- if compress in (None,True): compress = name if compress is False or is_int(compress): pass elif is_str(compress): # split around period or comma or space compress = compress.replace('.',' ').replace(',',' ').split(' ') elif is_sbb(compress): # convert to str and split around period or comma or space compress = _tostr(compress).replace('.',' ').replace(',',' ').split(' ') else: raise TypeError(f'compress is not True or False or a string or a list/tuple of strings' f', for {name!r}, got {compress!r}') if is_lt(compress): # list/tuple maybe of lists/tuples ultimately of string of extensions compress = list(_flatten(compress)) new = [] for x in range(len(compress)-1,-1,-1): ext = compress[x] if ext in ext_map: new[:0] = [ext] else: break compress = new if not compress: # if no extensions remain then there is no compression compress = False #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # if stdio requested by file name (no real name so no extension, which means no compression or tempname) # WAIT: what if the 'compress' keyword option is used ... for now, that is not supported for stdio #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- stdio = None if stdioname and name in stdio_names: if append: raise ValueError(f'mode "a" with {name!r} (STDOUT) is not supported') if create: raise ValueError(f'mode "x" with {name!r} (STDOUT) is not supported') if compress: s = ('STDOUT','STDIN')[read] raise ValueError(f'the "compress" option with {name!r} ({s}) is not yet supported') # bleh... not yet coded compress = False tempname = False stdio = open((stdout,stdin)[read].fileno(),'wr'[read] + ('b'[not binary:])) #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # tempname is or becomes a str with the temporary file name, given or generated or not #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- if is_sbb(tempname): # a given temporary name if is_bb(tempname): tempname = _tostr(tempname) # make it be a real str type x = tempname.lower() if x in yes_no_table: tempname = yes_no_table[x] elif tempname: # a true value if append or read: raise ValueError('tempname with append or read makes no sense') tempname = True else: # not a true value tempname = False if tempname is True: tempname = self.name + '_' + str(uuid4()) # generate a temporary name self.tempname = tempname # False or a str #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # for a compressed file, open compression file layers based on # name extensions in reverse order (last extension opened first). # for a non-compressed file, open with the plain builtin open() # function since no copression is invplved and this is one file. #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- if self.tempname: openthis = self.tempname openmode = 'x' # force exclusive creation mode else: openthis = self.name openmode = mode # use the caller's mode if stdio: # or the like filestack = [stdio] elif compress: # compress is not empty, so do one or more compression opens in reverse order filestack = [] front = compress[0] compress[0] = None for ext in reversed(compress): # back to front if ext: # not front thismode = 'wr'[read]+'b' else: # is front (last to be opened) thismode = 'wr'[read]+'bt'[text] ext = front openthis = ext_map[ext]( # caller of zopen() gets to deal with any exceptions that happen openthis, ''.join(openmode), ) filestack[:0] = [openthis] # build this list in reverse else: # compress is empty, so do one regular open openthis = open( # caller of zopen() gets to deal with any exceptions that happen openthis, ''.join(openmode), buffering=buffering, closefd=closefd, encoding=encoding, errors=errors, newline=newline, ) filestack = [openthis] # "there can be only one" self.tempfile = openthis if self.tempname else None self.filestack = filestack self.isopen = True # now open for business return #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # end of _zopen.__init__ #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # return just one file to be used #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # if _zopen() only needs to open and use a single file then the caller # can just use that one file directly. method just_one_file() returns # that one open file. function zopen calls just_one_file() to replace # the _zopen() object. method just_one_file() will return the same # _zopen() object if it is to be used. even though zopen() is a # function, it returns the appropriate class instance to use. #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def just_one_file(self): """Return just the one file to be used.""" if len(self.filestack) > 1 or self.tempfile or self.tempname: return self else: file = self.filestack[0] self.filestack[0] = None # deref the file in the stack self.filestack = None # deref the stack (of one file) self.isopen = False # flag this as not open return file # function zopen() gets it to return it #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # close also does rename when a temporary file is involved #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def close(self): """Close this file object, closing all compression files and renaming a temporary file.""" if not self.isopen: # if not open then nothing to do return None if self.filestack: # close files top to bottom for file in self.filestack: file.close() if self.tempname: # if tempname mode then try doing a rename to original name try: rename(self.tempname,self.name) except PermissionError: raise PermissionError(f'rename denied, check ownership or permissions at or near file {self.name!r}?') except OSError: raise OSError(f'rename failed, check at or near file {self.name!r}?') self.isopen = False return #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # support the with statement #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def __enter__(self): """Enter a with statement.""" return self def __exit__(self,exc_type,exc_val,exc_tb): "Exit a with statement.""" self.close() return #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # support object deletion #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # if a temporary file is being written, then delete # what is being written instead of replacing the # original with what may be an incomplete new file #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def __del__(self): """This object is being deleted so clean up everything.""" if self.isopen: if self.tempfile and self.tempname: self.tempfile.close() remove(self.tempname) self.tempname = None # dereference (not essential since this object is being deleted) self.tempfile = None # dereference (not essential since this object is being deleted) self.close() return #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- # if this object is not yet ready then raise AttributeError # otherwise get attributes from the file last opened (first in list) #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- def __getattr__(self,attrname): """Get attributes from the file last opened.""" if self.filestack[0]: return getattr(self.filestack[0],attrname) raise AttributeError(f'attribute {attrname!r} requested of not open instance of class zopen') #-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.-------.------- if __name__ == '__main__': exit('this is not a command and your arguments have been ignored')