1 import os
2 import signal
3 import socket
4 import sys
5 import time
6 import unittest
7 import warnings
8
9 import cherrypy
10 import cherrypy.process.servers
11 from cherrypy._cpcompat import BadStatusLine, ntob
12 from cherrypy.test import helper
13
14 engine = cherrypy.engine
15 thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
16
17
19
21 self.bus = bus
22 self.running = False
23 self.startcount = 0
24 self.gracecount = 0
25 self.threads = {}
26
33
35 self.running = True
36 self.startcount += 1
37
40
43
46
49
50 db_connection = Dependency(engine)
51
52
54 class Root:
55
56 def index(self):
57 return "Hello World"
58 index.exposed = True
59
60 def ctrlc(self):
61 raise KeyboardInterrupt()
62 ctrlc.exposed = True
63
64 def graceful(self):
65 engine.graceful()
66 return "app was (gracefully) restarted succesfully"
67 graceful.exposed = True
68
69 def block_explicit(self):
70 while True:
71 if cherrypy.response.timed_out:
72 cherrypy.response.timed_out = False
73 return "broken!"
74 time.sleep(0.01)
75 block_explicit.exposed = True
76
77 def block_implicit(self):
78 time.sleep(0.5)
79 return "response.timeout = %s" % cherrypy.response.timeout
80 block_implicit.exposed = True
81
82 cherrypy.tree.mount(Root())
83 cherrypy.config.update({
84 'environment': 'test_suite',
85 'engine.deadlock_poll_freq': 0.1,
86 })
87
88 db_connection.subscribe()
89
90
91
92
94 setup_server = staticmethod(setup_server)
95
99
101 engine.stop()
102
103 self.assertEqual(db_connection.running, False)
104 self.assertEqual(db_connection.startcount, 1)
105 self.assertEqual(len(db_connection.threads), 0)
106
107
108 engine.start()
109 self.assertEqual(engine.state, engine.states.STARTED)
110
111 host = cherrypy.server.socket_host
112 port = cherrypy.server.socket_port
113 self.assertRaises(IOError, cherrypy._cpserver.check_port, host, port)
114
115
116 self.assertEqual(db_connection.running, True)
117 self.assertEqual(db_connection.startcount, 2)
118 self.assertEqual(len(db_connection.threads), 0)
119
120 self.getPage("/")
121 self.assertBody("Hello World")
122 self.assertEqual(len(db_connection.threads), 1)
123
124
125 engine.stop()
126 self.assertEqual(engine.state, engine.states.STOPPED)
127
128
129 self.assertEqual(db_connection.running, False)
130 self.assertEqual(len(db_connection.threads), 0)
131
132
133 def exittest():
134 self.getPage("/")
135 self.assertBody("Hello World")
136 engine.exit()
137 cherrypy.server.start()
138 engine.start_with_callback(exittest)
139 engine.block()
140 self.assertEqual(engine.state, engine.states.EXITING)
141
143 cherrypy.server.start()
144 engine.start()
145
146
147 self.assertEqual(db_connection.running, True)
148 grace = db_connection.gracecount
149
150 self.getPage("/")
151 self.assertBody("Hello World")
152 self.assertEqual(len(db_connection.threads), 1)
153
154
155 engine.graceful()
156 self.assertEqual(engine.state, engine.states.STARTED)
157 self.getPage("/")
158 self.assertBody("Hello World")
159 self.assertEqual(db_connection.running, True)
160 self.assertEqual(db_connection.gracecount, grace + 1)
161 self.assertEqual(len(db_connection.threads), 1)
162
163
164 self.getPage("/graceful")
165 self.assertEqual(engine.state, engine.states.STARTED)
166 self.assertBody("app was (gracefully) restarted succesfully")
167 self.assertEqual(db_connection.running, True)
168 self.assertEqual(db_connection.gracecount, grace + 2)
169
170
171 self.assertEqual(len(db_connection.threads), 0)
172
173 engine.stop()
174 self.assertEqual(engine.state, engine.states.STOPPED)
175 self.assertEqual(db_connection.running, False)
176 self.assertEqual(len(db_connection.threads), 0)
177
219
250
285
287
288
289 if engine.state != engine.states.EXITING:
290 engine.exit()
291
292
293
294 p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
295 wait=True)
296 p.write_conf(
297 extra="""starterror: True
298 test_case_name: "test_5_Start_Error"
299 """
300 )
301 p.start(imports='cherrypy.test._test_states_demo')
302 if p.exit_code == 0:
303 self.fail("Process failed to return nonzero exit code.")
304
305
307
309 if os.name not in ['posix']:
310 return self.skip("skipped (not on posix) ")
311 self.HOST = '127.0.0.1'
312 self.PORT = 8081
313
314
315
316 p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
317 wait=True, daemonize=True,
318 socket_host='127.0.0.1',
319 socket_port=8081)
320 p.write_conf(
321 extra='test_case_name: "test_daemonize"')
322 p.start(imports='cherrypy.test._test_states_demo')
323 try:
324
325 self.getPage("/pid")
326 self.assertStatus(200)
327 page_pid = int(self.body)
328 self.assertEqual(page_pid, p.get_pid())
329 finally:
330
331 self.getPage("/exit")
332 p.join()
333
334
335
336 if p.exit_code != 0:
337 self.fail("Daemonized parent process failed to exit cleanly.")
338
339
341
343
344 try:
345 from signal import SIGHUP
346 except ImportError:
347 return self.skip("skipped (no SIGHUP) ")
348
349
350 p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
351 p.write_conf(
352 extra='test_case_name: "test_SIGHUP_tty"')
353 p.start(imports='cherrypy.test._test_states_demo')
354
355 os.kill(p.get_pid(), SIGHUP)
356
357 p.join()
358
360
361 try:
362 from signal import SIGHUP
363 except ImportError:
364 return self.skip("skipped (no SIGHUP) ")
365
366 if os.name not in ['posix']:
367 return self.skip("skipped (not on posix) ")
368
369
370
371
372 p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
373 wait=True, daemonize=True)
374 p.write_conf(
375 extra='test_case_name: "test_SIGHUP_daemonized"')
376 p.start(imports='cherrypy.test._test_states_demo')
377
378 pid = p.get_pid()
379 try:
380
381 os.kill(pid, SIGHUP)
382
383 time.sleep(2)
384 self.getPage("/pid")
385 self.assertStatus(200)
386 new_pid = int(self.body)
387 self.assertNotEqual(new_pid, pid)
388 finally:
389
390 self.getPage("/exit")
391 p.join()
392
394 if not hasattr(signal, signal_name):
395 self.skip("skipped (no %(signal_name)s)" % vars())
396
397 if not hasattr(os, 'kill'):
398 self.skip("skipped (no os.kill)")
399
401 "SIGTERM should shut down the server whether daemonized or not."
402 self._require_signal_and_kill('SIGTERM')
403
404
405 p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
406 p.write_conf(
407 extra='test_case_name: "test_SIGTERM"')
408 p.start(imports='cherrypy.test._test_states_demo')
409
410 os.kill(p.get_pid(), signal.SIGTERM)
411
412 p.join()
413
414 if os.name in ['posix']:
415
416 p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
417 wait=True, daemonize=True)
418 p.write_conf(
419 extra='test_case_name: "test_SIGTERM_2"')
420 p.start(imports='cherrypy.test._test_states_demo')
421
422 os.kill(p.get_pid(), signal.SIGTERM)
423
424 p.join()
425
427 self._require_signal_and_kill('SIGTERM')
428
429
430
431
432
433 if os.name == 'nt':
434 self.skip("SIGTERM not available")
435
436
437 p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
438 p.write_conf(
439 extra="""unsubsig: True
440 test_case_name: "test_signal_handler_unsubscribe"
441 """)
442 p.start(imports='cherrypy.test._test_states_demo')
443
444 os.kill(p.get_pid(), signal.SIGTERM)
445
446 p.join()
447
448
449 target_line = open(p.error_log, 'rb').readlines()[-10]
450 if not ntob("I am an old SIGTERM handler.") in target_line:
451 self.fail("Old SIGTERM handler did not run.\n%r" % target_line)
452
453
455
457 """
458 Wait on INADDR_ANY should not raise IOError
459
460 In cases where the loopback interface does not exist, CherryPy cannot
461 effectively determine if a port binding to INADDR_ANY was effected.
462 In this situation, CherryPy should assume that it failed to detect
463 the binding (not that the binding failed) and only warn that it could
464 not verify it.
465 """
466
467
468
469
470
471
472 free_port = self.find_free_port()
473
474 servers = cherrypy.process.servers
475
476 def with_shorter_timeouts(func):
477 """
478 A context where occupied_port_timeout is much smaller to speed
479 test runs.
480 """
481
482 orig_timeout = servers.occupied_port_timeout
483 servers.occupied_port_timeout = .07
484 try:
485 func()
486 finally:
487 servers.occupied_port_timeout = orig_timeout
488
489 def do_waiting():
490
491 with warnings.catch_warnings(record=True) as w:
492 servers.wait_for_occupied_port('0.0.0.0', free_port)
493 self.assertEqual(len(w), 1)
494 self.assertTrue(isinstance(w[0], warnings.WarningMessage))
495 self.assertTrue(
496 'Unable to verify that the server is bound on ' in str(w[0]))
497
498
499
500 self.assertRaises(IOError, servers.wait_for_occupied_port,
501 '127.0.0.1', free_port)
502
503 with_shorter_timeouts(do_waiting)
504
506 "Find a free port by binding to port 0 then unbinding."
507 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
508 sock.bind(('', 0))
509 free_port = sock.getsockname()[1]
510 sock.close()
511 return free_port
512