Package cherrypy :: Package process :: Module wspbus
[hide private]
[frames] | no frames]

Source Code for Module cherrypy.process.wspbus

  1  """An implementation of the Web Site Process Bus. 
  2   
  3  This module is completely standalone, depending only on the stdlib. 
  4   
  5  Web Site Process Bus 
  6  -------------------- 
  7   
  8  A Bus object is used to contain and manage site-wide behavior: 
  9  daemonization, HTTP server start/stop, process reload, signal handling, 
 10  drop privileges, PID file management, logging for all of these, 
 11  and many more. 
 12   
 13  In addition, a Bus object provides a place for each web framework 
 14  to register code that runs in response to site-wide events (like 
 15  process start and stop), or which controls or otherwise interacts with 
 16  the site-wide components mentioned above. For example, a framework which 
 17  uses file-based templates would add known template filenames to an 
 18  autoreload component. 
 19   
 20  Ideally, a Bus object will be flexible enough to be useful in a variety 
 21  of invocation scenarios: 
 22   
 23   1. The deployer starts a site from the command line via a 
 24      framework-neutral deployment script; applications from multiple frameworks 
 25      are mixed in a single site. Command-line arguments and configuration 
 26      files are used to define site-wide components such as the HTTP server, 
 27      WSGI component graph, autoreload behavior, signal handling, etc. 
 28   2. The deployer starts a site via some other process, such as Apache; 
 29      applications from multiple frameworks are mixed in a single site. 
 30      Autoreload and signal handling (from Python at least) are disabled. 
 31   3. The deployer starts a site via a framework-specific mechanism; 
 32      for example, when running tests, exploring tutorials, or deploying 
 33      single applications from a single framework. The framework controls 
 34      which site-wide components are enabled as it sees fit. 
 35   
 36  The Bus object in this package uses topic-based publish-subscribe 
 37  messaging to accomplish all this. A few topic channels are built in 
 38  ('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and 
 39  site containers are free to define their own. If a message is sent to a 
 40  channel that has not been defined or has no listeners, there is no effect. 
 41   
 42  In general, there should only ever be a single Bus object per process. 
 43  Frameworks and site containers share a single Bus object by publishing 
 44  messages and subscribing listeners. 
 45   
 46  The Bus object works as a finite state machine which models the current 
 47  state of the process. Bus methods move it from one state to another; 
 48  those methods then publish to subscribed listeners on the channel for 
 49  the new state.:: 
 50   
 51                          O 
 52                          | 
 53                          V 
 54         STOPPING --> STOPPED --> EXITING -> X 
 55            A   A         | 
 56            |    \___     | 
 57            |        \    | 
 58            |         V   V 
 59          STARTED <-- STARTING 
 60   
 61  """ 
 62   
 63  import atexit 
 64  import os 
 65  import sys 
 66  import threading 
 67  import time 
 68  import traceback as _traceback 
 69  import warnings 
 70   
 71  from cherrypy._cpcompat import set 
 72   
 73  # Here I save the value of os.getcwd(), which, if I am imported early enough, 
 74  # will be the directory from which the startup script was run.  This is needed 
 75  # by _do_execv(), to change back to the original directory before execv()ing a 
 76  # new process.  This is a defense against the application having changed the 
 77  # current working directory (which could make sys.executable "not found" if 
 78  # sys.executable is a relative-path, and/or cause other problems). 
 79  _startup_cwd = os.getcwd() 
 80   
 81   
82 -class ChannelFailures(Exception):
83 84 """Exception raised when errors occur in a listener during Bus.publish(). 85 """ 86 delimiter = '\n' 87
88 - def __init__(self, *args, **kwargs):
89 # Don't use 'super' here; Exceptions are old-style in Py2.4 90 # See https://bitbucket.org/cherrypy/cherrypy/issue/959 91 Exception.__init__(self, *args, **kwargs) 92 self._exceptions = list()
93
94 - def handle_exception(self):
95 """Append the current exception to self.""" 96 self._exceptions.append(sys.exc_info()[1])
97
98 - def get_instances(self):
99 """Return a list of seen exception instances.""" 100 return self._exceptions[:]
101
102 - def __str__(self):
103 exception_strings = map(repr, self.get_instances()) 104 return self.delimiter.join(exception_strings)
105 106 __repr__ = __str__ 107
108 - def __bool__(self):
109 return bool(self._exceptions)
110 __nonzero__ = __bool__
111 112 # Use a flag to indicate the state of the bus. 113 114
115 -class _StateEnum(object):
116
117 - class State(object):
118 name = None 119
120 - def __repr__(self):
121 return "states.%s" % self.name
122
123 - def __setattr__(self, key, value):
124 if isinstance(value, self.State): 125 value.name = key 126 object.__setattr__(self, key, value)
127 states = _StateEnum() 128 states.STOPPED = states.State() 129 states.STARTING = states.State() 130 states.STARTED = states.State() 131 states.STOPPING = states.State() 132 states.EXITING = states.State() 133 134 135 try: 136 import fcntl 137 except ImportError: 138 max_files = 0 139 else: 140 try: 141 max_files = os.sysconf('SC_OPEN_MAX') 142 except AttributeError: 143 max_files = 1024 144 145
146 -class Bus(object):
147 148 """Process state-machine and messenger for HTTP site deployment. 149 150 All listeners for a given channel are guaranteed to be called even 151 if others at the same channel fail. Each failure is logged, but 152 execution proceeds on to the next listener. The only way to stop all 153 processing from inside a listener is to raise SystemExit and stop the 154 whole server. 155 """ 156 157 states = states 158 state = states.STOPPED 159 execv = False 160 max_cloexec_files = max_files 161
162 - def __init__(self):
163 self.execv = False 164 self.state = states.STOPPED 165 self.listeners = dict( 166 [(channel, set()) for channel 167 in ('start', 'stop', 'exit', 'graceful', 'log', 'main')]) 168 self._priorities = {}
169
170 - def subscribe(self, channel, callback, priority=None):
171 """Add the given callback at the given channel (if not present).""" 172 if channel not in self.listeners: 173 self.listeners[channel] = set() 174 self.listeners[channel].add(callback) 175 176 if priority is None: 177 priority = getattr(callback, 'priority', 50) 178 self._priorities[(channel, callback)] = priority
179
180 - def unsubscribe(self, channel, callback):
181 """Discard the given callback (if present).""" 182 listeners = self.listeners.get(channel) 183 if listeners and callback in listeners: 184 listeners.discard(callback) 185 del self._priorities[(channel, callback)]
186
187 - def publish(self, channel, *args, **kwargs):
188 """Return output of all subscribers for the given channel.""" 189 if channel not in self.listeners: 190 return [] 191 192 exc = ChannelFailures() 193 output = [] 194 195 items = [(self._priorities[(channel, listener)], listener) 196 for listener in self.listeners[channel]] 197 try: 198 items.sort(key=lambda item: item[0]) 199 except TypeError: 200 # Python 2.3 had no 'key' arg, but that doesn't matter 201 # since it could sort dissimilar types just fine. 202 items.sort() 203 for priority, listener in items: 204 try: 205 output.append(listener(*args, **kwargs)) 206 except KeyboardInterrupt: 207 raise 208 except SystemExit: 209 e = sys.exc_info()[1] 210 # If we have previous errors ensure the exit code is non-zero 211 if exc and e.code == 0: 212 e.code = 1 213 raise 214 except: 215 exc.handle_exception() 216 if channel == 'log': 217 # Assume any further messages to 'log' will fail. 218 pass 219 else: 220 self.log("Error in %r listener %r" % (channel, listener), 221 level=40, traceback=True) 222 if exc: 223 raise exc 224 return output
225
226 - def _clean_exit(self):
227 """An atexit handler which asserts the Bus is not running.""" 228 if self.state != states.EXITING: 229 warnings.warn( 230 "The main thread is exiting, but the Bus is in the %r state; " 231 "shutting it down automatically now. You must either call " 232 "bus.block() after start(), or call bus.exit() before the " 233 "main thread exits." % self.state, RuntimeWarning) 234 self.exit()
235
236 - def start(self):
237 """Start all services.""" 238 atexit.register(self._clean_exit) 239 240 self.state = states.STARTING 241 self.log('Bus STARTING') 242 try: 243 self.publish('start') 244 self.state = states.STARTED 245 self.log('Bus STARTED') 246 except (KeyboardInterrupt, SystemExit): 247 raise 248 except: 249 self.log("Shutting down due to error in start listener:", 250 level=40, traceback=True) 251 e_info = sys.exc_info()[1] 252 try: 253 self.exit() 254 except: 255 # Any stop/exit errors will be logged inside publish(). 256 pass 257 # Re-raise the original error 258 raise e_info
259
260 - def exit(self):
261 """Stop all services and prepare to exit the process.""" 262 exitstate = self.state 263 try: 264 self.stop() 265 266 self.state = states.EXITING 267 self.log('Bus EXITING') 268 self.publish('exit') 269 # This isn't strictly necessary, but it's better than seeing 270 # "Waiting for child threads to terminate..." and then nothing. 271 self.log('Bus EXITED') 272 except: 273 # This method is often called asynchronously (whether thread, 274 # signal handler, console handler, or atexit handler), so we 275 # can't just let exceptions propagate out unhandled. 276 # Assume it's been logged and just die. 277 os._exit(70) # EX_SOFTWARE 278 279 if exitstate == states.STARTING: 280 # exit() was called before start() finished, possibly due to 281 # Ctrl-C because a start listener got stuck. In this case, 282 # we could get stuck in a loop where Ctrl-C never exits the 283 # process, so we just call os.exit here. 284 os._exit(70) # EX_SOFTWARE
285
286 - def restart(self):
287 """Restart the process (may close connections). 288 289 This method does not restart the process from the calling thread; 290 instead, it stops the bus and asks the main thread to call execv. 291 """ 292 self.execv = True 293 self.exit()
294
295 - def graceful(self):
296 """Advise all services to reload.""" 297 self.log('Bus graceful') 298 self.publish('graceful')
299
300 - def block(self, interval=0.1):
301 """Wait for the EXITING state, KeyboardInterrupt or SystemExit. 302 303 This function is intended to be called only by the main thread. 304 After waiting for the EXITING state, it also waits for all threads 305 to terminate, and then calls os.execv if self.execv is True. This 306 design allows another thread to call bus.restart, yet have the main 307 thread perform the actual execv call (required on some platforms). 308 """ 309 try: 310 self.wait(states.EXITING, interval=interval, channel='main') 311 except (KeyboardInterrupt, IOError): 312 # The time.sleep call might raise 313 # "IOError: [Errno 4] Interrupted function call" on KBInt. 314 self.log('Keyboard Interrupt: shutting down bus') 315 self.exit() 316 except SystemExit: 317 self.log('SystemExit raised: shutting down bus') 318 self.exit() 319 raise 320 321 # Waiting for ALL child threads to finish is necessary on OS X. 322 # See https://bitbucket.org/cherrypy/cherrypy/issue/581. 323 # It's also good to let them all shut down before allowing 324 # the main thread to call atexit handlers. 325 # See https://bitbucket.org/cherrypy/cherrypy/issue/751. 326 self.log("Waiting for child threads to terminate...") 327 for t in threading.enumerate(): 328 # Validate the we're not trying to join the MainThread 329 # that will cause a deadlock and the case exist when 330 # implemented as a windows service and in any other case 331 # that another thread executes cherrypy.engine.exit() 332 if ( 333 t != threading.currentThread() and 334 t.isAlive() and 335 not isinstance(t, threading._MainThread) 336 ): 337 # Note that any dummy (external) threads are always daemonic. 338 if hasattr(threading.Thread, "daemon"): 339 # Python 2.6+ 340 d = t.daemon 341 else: 342 d = t.isDaemon() 343 if not d: 344 self.log("Waiting for thread %s." % t.getName()) 345 t.join() 346 347 if self.execv: 348 self._do_execv()
349
350 - def wait(self, state, interval=0.1, channel=None):
351 """Poll for the given state(s) at intervals; publish to channel.""" 352 if isinstance(state, (tuple, list)): 353 states = state 354 else: 355 states = [state] 356 357 def _wait(): 358 while self.state not in states: 359 time.sleep(interval) 360 self.publish(channel)
361 362 # From http://psyco.sourceforge.net/psycoguide/bugs.html: 363 # "The compiled machine code does not include the regular polling 364 # done by Python, meaning that a KeyboardInterrupt will not be 365 # detected before execution comes back to the regular Python 366 # interpreter. Your program cannot be interrupted if caught 367 # into an infinite Psyco-compiled loop." 368 try: 369 sys.modules['psyco'].cannotcompile(_wait) 370 except (KeyError, AttributeError): 371 pass 372 373 _wait()
374
375 - def _do_execv(self):
376 """Re-execute the current process. 377 378 This must be called from the main thread, because certain platforms 379 (OS X) don't allow execv to be called in a child thread very well. 380 """ 381 args = sys.argv[:] 382 self.log('Re-spawning %s' % ' '.join(args)) 383 384 if sys.platform[:4] == 'java': 385 from _systemrestart import SystemRestart 386 raise SystemRestart 387 else: 388 args.insert(0, sys.executable) 389 if sys.platform == 'win32': 390 args = ['"%s"' % arg for arg in args] 391 392 os.chdir(_startup_cwd) 393 if self.max_cloexec_files: 394 self._set_cloexec() 395 os.execv(sys.executable, args)
396
397 - def _set_cloexec(self):
398 """Set the CLOEXEC flag on all open files (except stdin/out/err). 399 400 If self.max_cloexec_files is an integer (the default), then on 401 platforms which support it, it represents the max open files setting 402 for the operating system. This function will be called just before 403 the process is restarted via os.execv() to prevent open files 404 from persisting into the new process. 405 406 Set self.max_cloexec_files to 0 to disable this behavior. 407 """ 408 for fd in range(3, self.max_cloexec_files): # skip stdin/out/err 409 try: 410 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 411 except IOError: 412 continue 413 fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
414
415 - def stop(self):
416 """Stop all services.""" 417 self.state = states.STOPPING 418 self.log('Bus STOPPING') 419 self.publish('stop') 420 self.state = states.STOPPED 421 self.log('Bus STOPPED')
422
423 - def start_with_callback(self, func, args=None, kwargs=None):
424 """Start 'func' in a new thread T, then start self (and return T).""" 425 if args is None: 426 args = () 427 if kwargs is None: 428 kwargs = {} 429 args = (func,) + args 430 431 def _callback(func, *a, **kw): 432 self.wait(states.STARTED) 433 func(*a, **kw)
434 t = threading.Thread(target=_callback, args=args, kwargs=kwargs) 435 t.setName('Bus Callback ' + t.getName()) 436 t.start() 437 438 self.start() 439 440 return t 441
442 - def log(self, msg="", level=20, traceback=False):
443 """Log the given message. Append the last traceback if requested.""" 444 if traceback: 445 msg += "\n" + "".join(_traceback.format_exception(*sys.exc_info())) 446 self.publish('log', msg, level)
447 448 bus = Bus() 449