1 import os
2 localDir = os.path.dirname(__file__)
3 import sys
4 import threading
5 import time
6
7 import cherrypy
8 from cherrypy._cpcompat import copykeys, HTTPConnection, HTTPSConnection
9 from cherrypy.lib import sessions
10 from cherrypy.lib.httputil import response_codes
11
12
18
# Register http_methods_allowed as the ``allow`` tool, attached to the
# 'on_start_resource' hook point. Root.restricted enables it through its
# 'tools.allow.*' config entries.
cherrypy.tools.allow = cherrypy.Tool('on_start_resource', http_methods_allowed)
20
21
23
class Root:
    """Request handlers driven by the session tests in this module.

    Sessions are enabled for every handler via ``_cp_config``; a few
    handlers override that with their own ``_cp_config`` attribute.
    """

    # NOTE(review): timeout / clean_freq are presumably expressed in
    # minutes, which would make 1.0 / 60 one second -- confirm against the
    # CherryPy sessions tool documentation.
    _cp_config = {
        'tools.sessions.on': True,
        'tools.sessions.storage_type': 'ram',
        'tools.sessions.storage_path': localDir,
        'tools.sessions.timeout': (1.0 / 60),
        'tools.sessions.clean_freq': (1.0 / 60),
    }

    @cherrypy.expose
    def clear(self):
        """Empty the cache object hanging off the current session."""
        cherrypy.session.cache.clear()

    @cherrypy.expose
    def data(self):
        """Store ``'aha'`` in the session and return the repr of its data."""
        cherrypy.session['aha'] = 'foo'
        return repr(cherrypy.session._data)

    @cherrypy.expose
    def testGen(self):
        """Bump the session counter and yield it (generator response)."""
        hits = cherrypy.session.get('counter', 0) + 1
        cherrypy.session['counter'] = hits
        yield str(hits)

    @cherrypy.expose
    def testStr(self):
        """Bump the session counter and return it as a string."""
        hits = cherrypy.session.get('counter', 0) + 1
        cherrypy.session['counter'] = hits
        return str(hits)

    @cherrypy.expose
    def setsessiontype(self, newtype):
        """Switch the configured storage type and drop stale session state.

        ``newtype`` is written into the class-level config, any existing
        ``cherrypy.session`` attribute is removed, and the cleanup thread of
        the matching ``<Newtype>Session`` class (if one is set) is stopped,
        unsubscribed and deleted.
        """
        # Old-style class: go through __class__ to reach the shared config.
        self.__class__._cp_config['tools.sessions.storage_type'] = newtype
        if hasattr(cherrypy, "session"):
            del cherrypy.session
        session_class = getattr(sessions, '%sSession' % newtype.title())
        if not session_class.clean_thread:
            return
        session_class.clean_thread.stop()
        session_class.clean_thread.unsubscribe()
        del session_class.clean_thread
    # This handler runs with the sessions tool turned off.
    setsessiontype._cp_config = {'tools.sessions.on': False}

    @cherrypy.expose
    def index(self):
        """Increment the session counter with a short pause mid-update."""
        session = cherrypy.session
        hits = session.get('counter', 0) + 1
        # The sleep sits between the read and the write on purpose of the
        # original code; keep that ordering.
        time.sleep(0.01)
        session['counter'] = hits
        return str(hits)

    @cherrypy.expose
    def keyin(self, key):
        """Return ``'True'`` or ``'False'``: is ``key`` in the session?"""
        present = key in cherrypy.session
        return str(present)

    @cherrypy.expose
    def delete(self):
        """Delete the current session, expire it, and answer ``'done'``."""
        cherrypy.session.delete()
        sessions.expire()
        return "done"

    @cherrypy.expose
    def delkey(self, key):
        """Remove ``key`` from the session and answer ``'OK'``."""
        del cherrypy.session[key]
        return "OK"

    @cherrypy.expose
    def blah(self):
        """Return the storage type currently held in ``_cp_config``."""
        return self._cp_config['tools.sessions.storage_type']

    @cherrypy.expose
    def iredir(self):
        """Raise an internal redirect to ``/blah``."""
        raise cherrypy.InternalRedirect('/blah')

    @cherrypy.expose
    def restricted(self):
        """Return the request method; guarded by the ``allow`` tool."""
        return cherrypy.request.method
    restricted._cp_config = {
        'tools.allow.on': True,
        'tools.allow.methods': ['GET'],
    }

    @cherrypy.expose
    def regen(self):
        """Regenerate the session through the sessions tool."""
        cherrypy.tools.sessions.regenerate()
        return "logged in"

    @cherrypy.expose
    def length(self):
        """Return ``len()`` of the session as a string."""
        return str(len(cherrypy.session))

    @cherrypy.expose
    def session_cookie(self):
        """Load the session explicitly and return its id."""
        cherrypy.session.load()
        return cherrypy.session.id
    # Per-handler cookie settings: own path, own cookie name, non-persistent.
    session_cookie._cp_config = {
        'tools.sessions.path': '/session_cookie',
        'tools.sessions.name': 'temp',
        'tools.sessions.persistent': False,
    }
122
# Mount a fresh Root instance on CherryPy's application tree.
cherrypy.tree.mount(Root())
124
125
126 from cherrypy.test import helper
127
128
130 setup_server = staticmethod(setup_server)
131
137
139 self.getPage('/setsessiontype/ram')
140 self.getPage('/clear')
141
142
143
144 self.getPage('/data')
145 self.assertBody("{'aha': 'foo'}")
146 c = self.cookies[0]
147 self.getPage('/data', self.cookies)
148 self.assertEqual(self.cookies[0], c)
149
150 self.getPage('/testStr')
151 self.assertBody('1')
152 cookie_parts = dict([p.strip().split('=')
153 for p in self.cookies[0][1].split(";")])
154
155 self.assertEqual(set(cookie_parts.keys()),
156 set(['session_id', 'expires', 'Path']))
157 self.getPage('/testGen', self.cookies)
158 self.assertBody('2')
159 self.getPage('/testStr', self.cookies)
160 self.assertBody('3')
161 self.getPage('/data', self.cookies)
162
163
164 self.getPage('/length', self.cookies)
165 self.assertBody('2')
166 self.getPage('/delkey?key=counter', self.cookies)
167 self.assertStatus(200)
168
169 self.getPage('/setsessiontype/file')
170 self.getPage('/testStr')
171 self.assertBody('1')
172 self.getPage('/testGen', self.cookies)
173 self.assertBody('2')
174 self.getPage('/testStr', self.cookies)
175 self.assertBody('3')
176 self.getPage('/delkey?key=counter', self.cookies)
177 self.assertStatus(200)
178
179
180 time.sleep(2)
181 self.getPage('/')
182 self.assertBody('1')
183 self.getPage('/length', self.cookies)
184 self.assertBody('1')
185
186
187 self.getPage('/keyin?key=counter', self.cookies)
188 self.assertBody("True")
189 cookieset1 = self.cookies
190
191
192 self.getPage('/')
193 self.getPage('/length', self.cookies)
194 self.assertBody('2')
195
196
197 self.getPage('/delete', self.cookies)
198 self.assertBody("done")
199 self.getPage('/delete', cookieset1)
200 self.assertBody("done")
201 f = lambda: [
202 x for x in os.listdir(localDir) if x.startswith('session-')]
203 self.assertEqual(f(), [])
204
205
206 self.getPage('/')
207 f = lambda: [
208 x for x in os.listdir(localDir) if x.startswith('session-')]
209 self.assertNotEqual(f(), [])
210 time.sleep(2)
211 self.assertEqual(f(), [])
212
216
220
249
250
251
252
253
254 ts = []
255 for c in range(client_thread_count):
256 data_dict[c] = 0
257 t = threading.Thread(target=request, args=(c,))
258 ts.append(t)
259 t.start()
260
261 for t in ts:
262 t.join()
263
264 hitcount = max(data_dict.values())
265 expected = 1 + (client_thread_count * request_count)
266
267 for e in errors:
268 print(e)
269 self.assertEqual(hitcount, expected)
270
276
278
279 self.getPage('/testStr')
280
281 id = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
282 path = os.path.join(localDir, "session-" + id)
283 os.unlink(path)
284 self.getPage('/testStr', self.cookies)
285
297
299 self.getPage('/testStr')
300
301 id1 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
302 self.getPage('/regen')
303 self.assertBody('logged in')
304 id2 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
305 self.assertNotEqual(id1, id2)
306
307 self.getPage('/testStr')
308
309 id1 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
310 self.getPage('/testStr',
311 headers=[
312 ('Cookie',
313 'session_id=maliciousid; '
314 'expires=Sat, 27 Oct 2017 04:18:28 GMT; Path=/;')])
315 id2 = self.cookies[0][1].split(";", 1)[0].split("=", 1)[1]
316 self.assertNotEqual(id1, id2)
317 self.assertNotEqual(id2, 'maliciousid')
318
320 self.getPage('/setsessiontype/ram')
321 self.getPage('/clear')
322 self.getPage('/session_cookie')
323
324 cookie_parts = dict([p.strip().split('=')
325 for p in self.cookies[0][1].split(";")])
326
327 self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path']))
328 id1 = cookie_parts['temp']
329 self.assertEqual(copykeys(sessions.RamSession.cache), [id1])
330
331
332 self.getPage('/session_cookie', self.cookies)
333 cookie_parts = dict([p.strip().split('=')
334 for p in self.cookies[0][1].split(";")])
335
336 self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path']))
337 self.assertBody(id1)
338 self.assertEqual(copykeys(sessions.RamSession.cache), [id1])
339
340
341 self.getPage('/session_cookie')
342
343 cookie_parts = dict([p.strip().split('=')
344 for p in self.cookies[0][1].split(";")])
345
346 self.assertEqual(set(cookie_parts.keys()), set(['temp', 'Path']))
347
348 id2 = cookie_parts['temp']
349 self.assertNotEqual(id1, id2)
350 self.assertEqual(set(sessions.RamSession.cache.keys()),
351 set([id1, id2]))
352
353
# Wait, then verify that no session id is left in the RAM session cache.
time.sleep(2.5)
cache = copykeys(sessions.RamSession.cache)
if cache:
    if cache == [id2]:
        self.fail("The second session did not time out.")
    else:
        # fail() accepts only a single message argument; passing the value
        # as a second positional would raise TypeError instead of reporting
        # the failure, so interpolate it into the message here.
        self.fail("Unknown session id in cache: %r" % (cache,))
361
362
363 import socket
364 try:
365 import memcache
366
367 host, port = '127.0.0.1', 11211
368 for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
369 socket.SOCK_STREAM):
370 af, socktype, proto, canonname, sa = res
371 s = None
372 try:
373 s = socket.socket(af, socktype, proto)
374
375
376 s.settimeout(1.0)
377 s.connect((host, port))
378 s.close()
379 except socket.error:
380 if s:
381 s.close()
382 raise
383 break
384 except (ImportError, socket.error):
390 else:
392 setup_server = staticmethod(setup_server)
393
421
423 client_thread_count = 5
424 request_count = 30
425
426
427 self.getPage("/")
428 self.assertBody("1")
429 cookies = self.cookies
430
431 data_dict = {}
432
433 def request(index):
434 for i in range(request_count):
435 self.getPage("/", cookies)
436
437
438 if not self.body.isdigit():
439 self.fail(self.body)
440 data_dict[index] = v = int(self.body)
441
442
443
444 ts = []
445 for c in range(client_thread_count):
446 data_dict[c] = 0
447 t = threading.Thread(target=request, args=(c,))
448 ts.append(t)
449 t.start()
450
451 for t in ts:
452 t.join()
453
454 hitcount = max(data_dict.values())
455 expected = 1 + (client_thread_count * request_count)
456 self.assertEqual(hitcount, expected)
457
463
476