Package logilab :: Package common :: Module shellutils
[frames] | no frames]

Source Code for Module logilab.common.shellutils

  1  # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  2  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  3  # 
  4  # This file is part of logilab-common. 
  5  # 
  6  # logilab-common is free software: you can redistribute it and/or modify it under 
  7  # the terms of the GNU Lesser General Public License as published by the Free 
  8  # Software Foundation, either version 2.1 of the License, or (at your option) any 
  9  # later version. 
 10  # 
 11  # logilab-common is distributed in the hope that it will be useful, but WITHOUT 
 12  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 13  # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 14  # details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public License along 
 17  # with logilab-common.  If not, see <http://www.gnu.org/licenses/>. 
 18  """shell/term utilities, useful to write some python scripts instead of shell 
 19  scripts. 
 20  """ 
 21   
 22  from __future__ import print_function 
 23   
 24  __docformat__ = "restructuredtext en" 
 25   
 26  import os 
 27  import glob 
 28  import shutil 
 29  import stat 
 30  import sys 
 31  import tempfile 
 32  import time 
 33  import fnmatch 
 34  import errno 
 35  import string 
 36  import random 
 37  import subprocess 
 38  from os.path import exists, isdir, islink, basename, join 
 39   
 40  from six import string_types 
 41  from six.moves import range, input as raw_input 
 42   
 43  from logilab.common import STD_BLACKLIST, _handle_blacklist 
 44  from logilab.common.compat import str_to_bytes 
 45  from logilab.common.deprecation import deprecated 
 46   
 47   
class tempdir(object):
    """Context manager providing a freshly created temporary directory.

    ``__enter__`` returns the path of the new directory; on exit the whole
    tree is removed, whether or not the managed block raised.
    """

    def __enter__(self):
        """Create the temporary directory and return its path."""
        created = tempfile.mkdtemp()
        self.path = created
        return created

    def __exit__(self, exctype, value, traceback):
        """Remove the directory tree without swallowing any exception."""
        # cleanup is unconditional and happens before the result is decided
        shutil.rmtree(self.path)
        if traceback is None:
            return True
        # a false result lets the pending exception propagate
        return False
58 59
class pushd(object):
    """Context manager that temporarily changes the working directory.

    The working directory in effect when the block is entered is restored
    when the block is left.
    """

    def __init__(self, directory):
        """Remember the `directory` to switch to on entry."""
        self.directory = directory

    def __enter__(self):
        """Save the current directory, enter the target and return it."""
        previous = os.getcwd()
        self.cwd = previous
        target = self.directory
        os.chdir(target)
        return target

    def __exit__(self, exctype, value, traceback):
        """Go back to the directory saved by ``__enter__``."""
        os.chdir(self.cwd)
71 72
def chown(path, login=None, group=None):
    """Same as `os.chown` function but accepting user login or group name as
    argument. If login or group is omitted, it's left unchanged.

    `login` and `group` may be given either as a numeric id (anything that
    `int()` accepts) or as a name, which is then looked up in the system
    user / group database.

    Note: you must own the file to chown it (or be root). Otherwise OSError is raised.
    """
    # -1 tells os.chown to leave the corresponding id untouched
    uid = gid = -1
    if login is not None:
        try:
            uid = int(login)
        except ValueError:
            import pwd  # Platforms: Unix
            uid = pwd.getpwnam(login).pw_uid
    if group is not None:
        try:
            gid = int(group)
        except ValueError:
            import grp
            gid = grp.getgrnam(group).gr_gid
    os.chown(path, uid, gid)
96
def mv(source, destination, _action=shutil.move):
    """A shell-like mv, supporting wildcards.

    :type source: str
    :param source: path or glob pattern selecting what to move

    :type destination: str
    :param destination:
      target path; it must be an existing directory when `source` matches
      several entries. When it is an existing directory, entries are moved
      inside it under their own base name.

    :param _action:
      callable ``(src, dst)`` doing the actual transfer (`shutil.move` by
      default)

    :raise OSError:
      if nothing matches `source`, if several entries match while
      `destination` is not a directory, or if moving a single entry fails
      with an OSError
    """
    sources = glob.glob(source)
    if len(sources) > 1:
        # explicit check rather than an assert: it must survive `python -O`
        # and fail the same way as the other error paths of this function
        if not isdir(destination):
            raise OSError('Unable to move %r to %r (destination is not a '
                          'directory)' % (source, destination))
        for filename in sources:
            _action(filename, join(destination, basename(filename)))
        return
    if not sources:
        raise OSError('No file matching %s' % source)
    source = sources[0]
    if isdir(destination):
        destination = join(destination, basename(source))
    try:
        _action(source, destination)
    except OSError as ex:
        raise OSError('Unable to move %r to %r (%s)' % (
            source, destination, ex))
117
def rm(*files):
    """A shell-like rm, supporting wildcards.

    Each argument is expanded with `glob.glob`. Real directories are removed
    recursively; symbolic links (even those pointing to a directory) and
    every other entry are simply unlinked.
    """
    for pattern in files:
        for target in glob.glob(pattern):
            if isdir(target) and not islink(target):
                shutil.rmtree(target)
            else:
                os.remove(target)
129
def cp(source, destination):
    """A shell-like cp, supporting wildcards.

    Delegates to `mv`, substituting `shutil.copy` for the move operation, so
    glob expansion and destination handling are those of `mv`.
    """
    copier = shutil.copy
    mv(source, destination, _action=copier)
134
def find(directory, exts, exclude=False, blacklist=STD_BLACKLIST):
    """Recursively find files ending with the given extensions from the directory.

    :type directory: str
    :param directory:
      directory where the search should start

    :type exts: basestring or list or tuple
    :param exts:
      extension or iterable of extensions to search

    :type exclude: boolean
    :param exclude:
      if this argument is True, return the files NOT ending with the given
      extensions

    :type blacklist: list or tuple
    :param blacklist:
      optional list of files or directory to ignore, default to the value of
      `logilab.common.STD_BLACKLIST`

    :rtype: list
    :return:
      the list of all matching files
    """
    if isinstance(exts, string_types):
        exts = (exts,)
    else:
        # str.endswith only takes a tuple of suffixes; materializing also
        # lets a one-shot iterable be tested against every file
        exts = tuple(exts)
    exclude = bool(exclude)
    files = []
    for dirpath, dirnames, filenames in os.walk(directory):
        # return value unused: presumably prunes both lists in place so that
        # os.walk skips blacklisted entries -- confirm in logilab.common
        _handle_blacklist(blacklist, dirnames, filenames)
        # don't append files if the directory is blacklisted
        if basename(dirpath) in blacklist:
            continue
        files.extend(join(dirpath, fname) for fname in filenames
                     if fname.endswith(exts) != exclude)
    return files
def globfind(directory, pattern, blacklist=STD_BLACKLIST):
    """Recursively finds files matching glob `pattern` under `directory`.

    This is an alternative to `logilab.common.shellutils.find`.

    :type directory: str
    :param directory:
      directory where the search should start

    :type pattern: basestring
    :param pattern:
      the glob pattern (e.g *.py, foo*.py, etc.)

    :type blacklist: list or tuple
    :param blacklist:
      optional list of files or directory to ignore, default to the value of
      `logilab.common.STD_BLACKLIST`

    :rtype: iterator
    :return:
      iterator over the list of all matching files
    """
    for root, subdirs, names in os.walk(directory):
        _handle_blacklist(blacklist, subdirs, names)
        matching = fnmatch.filter(names, pattern)
        for name in matching:
            yield join(root, name)
211
def unzip(archive, destdir):
    """Extract every member of the zip file `archive` below `destdir`.

    `destdir` and any intermediate directory are created as needed, so
    archives lacking explicit directory entries and extractions over an
    existing tree both work.

    :type archive: str
    :param archive: path of the zip file to extract

    :type destdir: str
    :param destdir: directory receiving the extracted tree

    :raise ValueError:
      if a member name would be written outside of `destdir` (absolute path
      or ``..`` components)
    """
    import zipfile
    if not isdir(destdir):
        os.makedirs(destdir)
    root = os.path.abspath(destdir)
    # rstrip so that a filesystem-root destination yields a single separator
    prefix = root.rstrip(os.sep) + os.sep
    zfobj = zipfile.ZipFile(archive)
    try:
        for name in zfobj.namelist():
            target = os.path.abspath(join(root, name))
            # member names come from the archive: refuse path traversal
            if target != root and not target.startswith(prefix):
                raise ValueError('%r would be extracted outside of %r'
                                 % (name, destdir))
            if name.endswith('/'):
                # directory entry
                if not isdir(target):
                    os.makedirs(target)
                continue
            parent = os.path.dirname(target)
            if not isdir(parent):
                os.makedirs(parent)
            with open(target, 'wb') as outfile:
                outfile.write(zfobj.read(name))
    finally:
        zfobj.close()
224 225
class Execute:
    """This is a deadlock safe version of popen2 (no stdin), that returns
    an object with errorlevel, out and err.

    The command is run through the shell. After construction:

    * `out` holds everything the command wrote on its standard output
    * `err` holds everything the command wrote on its standard error
    * `status` holds the exit code of the command (a negative value is
      what `subprocess` reports for a child terminated by a signal)
    """

    def __init__(self, command):
        cmd = subprocess.Popen(command, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes concurrently, hence no deadlock
        self.out, self.err = cmd.communicate()
        # Popen.returncode is already the decoded exit code, not a raw
        # wait() status: passing it through os.WEXITSTATUS would turn every
        # ordinary failure code into 0
        self.status = cmd.returncode
235 236 Execute = deprecated('Use subprocess.Popen instead')(Execute) 237 238
class ProgressBar(object):
    """A simple text progression bar.

    The bar is redrawn in place (each write starts with a carriage return)
    on `stream`, optionally preceded by a title and followed by a free text
    set through the `text` property.
    """

    def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
        """
        :param nbops: total number of operations a full bar stands for
        :param size: width of the bar, in characters
        :param stream: file-like object the bar is written to
        :param title: optional label displayed before the bar
        """
        if title:
            # the title is spliced into a %-format template: literal percent
            # signs must be doubled to survive the later formatting
            safe_title = ('%s' % (title,)).replace('%', '%%')
            self._fstr = '\r%s [%%-%ss]' % (safe_title, int(size))
        else:
            self._fstr = '\r[%%-%ss]' % int(size)
        self._stream = stream
        self._total = nbops
        self._size = size
        self._current = 0
        self._progress = 0
        self._current_text = None
        self._last_text_write_size = 0

    def _get_text(self):
        return self._current_text

    def _set_text(self, text=None):
        # only redraw when the text actually changes
        if text != self._current_text:
            self._current_text = text
            self.refresh()

    def _del_text(self):
        self.text = None

    text = property(_get_text, _set_text, _del_text)

    def update(self, offset=1, exact=False):
        """Move FORWARD to new cursor position (cursor will never go backward).

        :offset: number of operations, counted against the ``nbops`` total

        :exact:

          - False: `offset` is added to the current operation count
          - True: `offset` is the absolute operation count

        """
        if exact:
            self._current = offset
        else:
            self._current += offset

        if self._total:
            progress = int((float(self._current) / float(self._total))
                           * self._size)
        else:
            # nothing to do at all: show a complete bar instead of dividing
            # by zero
            progress = self._size
        if progress > self._progress:
            self._progress = progress
            self.refresh()

    def refresh(self):
        """Refresh the progression bar display."""
        self._stream.write(self._fstr % ('=' * min(self._progress, self._size)))
        if self._last_text_write_size or self._current_text:
            # pad to the width of the previous text so that a shorter text
            # fully overwrites the longer one
            template = ' %%-%is' % (self._last_text_write_size)
            text = self._current_text
            if text is None:
                text = ''
            self._stream.write(template % text)
            self._last_text_write_size = len(text.rstrip())
        self._stream.flush()

    def finish(self):
        """Terminate the bar line and flush the stream."""
        self._stream.write('\n')
        self._stream.flush()
304 305
class DummyProgressBar(object):
    """Do-nothing stand-in exposing the same methods as `ProgressBar`.

    Code written against a progress bar can keep calling `refresh`,
    `update` and `finish`, and read or assign `text`, while nothing is
    displayed.
    """
    __slots__ = ('text',)

    def __init__(self):
        # same initial value as the `text` of a real progress bar
        self.text = None

    def refresh(self):
        """Do nothing."""
        pass

    def update(self, offset=1, exact=False):
        """Accept and ignore the arguments taken by `ProgressBar.update`."""
        pass

    def finish(self):
        """Do nothing."""
        pass
# sentinel meaning "argument not given", so that the defaults of the
# ProgressBar constructor apply
_MARKER = object()


class progress(object):
    """Context manager yielding a progress bar and finishing it on exit.

    When `enabled` is true a `ProgressBar` is built from the arguments that
    were explicitly given, otherwise a `DummyProgressBar` is yielded.
    """

    def __init__(self, nbops=_MARKER, size=_MARKER, stream=_MARKER,
                 title=_MARKER, enabled=True):
        self.nbops = nbops
        self.size = size
        self.stream = stream
        self.title = title
        self.enabled = enabled

    def __enter__(self):
        """Create the progress bar, store it as `pb` and return it."""
        if not self.enabled:
            self.pb = DummyProgressBar()
            return self.pb
        given = {}
        for name in ('nbops', 'size', 'stream', 'title'):
            candidate = getattr(self, name)
            if candidate is _MARKER:
                continue
            given[name] = candidate
        self.pb = ProgressBar(**given)
        return self.pb

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the progress bar; exceptions are left to propagate."""
        self.pb.finish()
341
class RawInput(object):
    """Interactive helper asking the user to pick one of several options."""

    def __init__(self, input=None, printer=None):
        """
        :param input:
          callable taking the prompt and returning the user answer; the
          module-level `raw_input` is used when omitted
        :param printer:
          callable receiving error messages; `print` is used when omitted
        """
        self._input = input or raw_input
        self._print = printer

    def ask(self, question, options, default):
        """Ask `question` until a valid option is answered and return it.

        An answer matches case-insensitively. An empty answer selects
        `default`; an answer equal to exactly one option selects it;
        otherwise the answer must be the prefix of exactly one option.
        The user gets three tries.

        :param question: text displayed in the prompt
        :param options: sequence of non-empty option strings
        :param default: option returned on empty answer, must be in `options`
        :raise Exception: if no usable answer was given after three tries
        """
        assert default in options
        labels = []
        for option in options:
            # the default option is advertised by a capitalized first letter
            if option == default:
                label = option[0].upper()
            else:
                label = option[0].lower()
            if len(option) > 1:
                label += '(%s)' % option[1:].lower()
            labels.append(label)
        prompt = "%s [%s]: " % (question, '/'.join(labels))
        for _ in range(3):
            answer = self._input(prompt).strip().lower()
            if not answer:
                return default
            # a full match wins over prefix matching, otherwise an option
            # that is the prefix of another one could never be selected
            exact = [option for option in options
                     if option.lower() == answer]
            if len(exact) == 1:
                return exact[0]
            possible = [option for option in options
                        if option.lower().startswith(answer)]
            if len(possible) == 1:
                return possible[0]
            elif len(possible) == 0:
                msg = '%s is not an option.' % answer
            else:
                msg = ('%s is an ambiguous answer, do you mean %s ?' % (
                    answer, ' or '.join(possible)))
            if self._print:
                self._print(msg)
            else:
                print(msg)
        raise Exception('unable to get a sensible answer')

    def confirm(self, question, default_is_yes=True):
        """Ask a yes/no `question` and return True when the answer is yes."""
        default = 'y' if default_is_yes else 'n'
        answer = self.ask(question, ('y', 'n'), default)
        return answer == 'y'
386 387 ASK = RawInput() 388 389
def getlogin():
    """Return the login name of the user running the process.

    avoid using os.getlogin() because of strange tty / stdin problems
    (man 3 getlogin)
    Another solution would be to use $LOGNAME, $USER or $USERNAME
    """
    if sys.platform == 'win32':
        return os.environ['USERNAME']
    import pwd  # Platforms: Unix
    entry = pwd.getpwuid(os.getuid())
    # first field of a password database entry is the login name
    return entry[0]
400
def generate_password(length=8, vocab=string.ascii_letters + string.digits):
    """Return a random password of `length` symbols picked from `vocab`.

    :type length: int
    :param length: number of symbols to draw

    :param vocab:
      non-empty sequence of strings the symbols are drawn from (ASCII
      letters and digits by default)

    :rtype: str
    :return: the generated password, empty when `length` is 0
    """
    # passwords must not be predictable: draw from the operating system
    # entropy source instead of the seedable module-level generator
    choose = random.SystemRandom().choice
    return ''.join(choose(vocab) for _ in range(length))
407