main module:
import cherrypy
import logondb
class Root(object):
    """CherryPy application root with a logon handler mounted at ``/logon``.

    ``index`` and ``somepage`` require an authenticated session; ``goaway``
    is the page the logon handler is configured to use for rejected logins.
    """

    # Shared logon handler; CherryPy exposes it under the attribute name,
    # i.e. at /logon, which matches the ``path`` argument passed here.
    logon = logondb.LogonDB(path="/logon", authenticated="/",
                            not_authenticated="/goaway", db="/tmp/pwd.db")

    @cherrypy.expose
    def index(self):
        """Greet the authenticated user.

        ``checkauth`` raises a redirect to ``/logon`` when the session holds
        no authenticated user, so the code below only runs for known users.
        """
        import html  # local import: only needed to escape the greeting

        username = Root.logon.checkauth('/logon')
        # Usernames are user-supplied (see LogonDB.adduser); escape before
        # embedding in markup so a crafted name cannot inject HTML.
        return ('<html><body><p>Hello user <b>%s</b></p></body></html>'
                % html.escape(username))

    @cherrypy.expose
    def goaway(self):
        """Return the static "not authenticated" page."""
        return '<html><body><h1>Not authenticated, please go away.</h1></body></html>'
    # Expires tool with secs=0 and force=True -- presumably so browsers do
    # not cache this page.
    goaway._cp_config = {'tools.expires.on':True,'tools.expires.secs':0,'tools.expires.force':True}

    @cherrypy.expose
    def somepage(self):
        """Return a protected page; unauthenticated users are redirected to
        ``/logon`` with a ``returnpage`` parameter naming this page."""
        # Called only for its redirect side effect; the username is not used.
        Root.logon.checkauth('/logon', returntopage=True)
        return '<html><body><h1>This is some page.</h1></body></html>'
if __name__ == "__main__":
    # Script entry point: build the application root, register a per-thread
    # database connection hook, and start the CherryPy server.
    import os.path

    current_dir = os.path.dirname(os.path.abspath(__file__))
    static_dir = os.path.join(current_dir, "static")
    jquery_dir = os.path.join(static_dir, "jquery")

    root = Root()

    def connect(thread_index):
        """Open this thread's database connection on the logon handler."""
        root.logon.connect()

    print('initializing threads')
    # Tell CherryPy to call "connect" for each thread, when it starts up
    cherrypy.engine.subscribe('start_thread', connect)

    # Per-path configuration: access log and sessions for the whole site,
    # plus the static directory and the individual jQuery assets.
    site_config = {
        '/': {
            'log.access_file': os.path.join(current_dir, "access.log"),
            'tools.sessions.on': True,
        },
        '/static': {
            'tools.staticdir.on': True,
            'tools.staticdir.dir': static_dir,
        },
        '/jquery.js': {
            'tools.staticfile.on': True,
            'tools.staticfile.filename': os.path.join(jquery_dir, "jquery-1.4.2.js"),
        },
        '/jquery-ui.js': {
            'tools.staticfile.on': True,
            'tools.staticfile.filename': os.path.join(jquery_dir, "jquery-ui-1.8.1.custom.min.js"),
        },
        '/jquerytheme.css': {
            'tools.staticfile.on': True,
            'tools.staticfile.filename': os.path.join(jquery_dir, "css", "redmond", "jquery-ui-1.8.1.custom.css"),
        },
    }
    cherrypy.quickstart(root, config=site_config)
logondb module:
import logon
import sqlite3
from hashlib import sha1 as hash
import threading
import cherrypy
class LogonDB(logon.Logon):
    """Logon handler that checks credentials against a SQLite table.

    The table ``pwdb`` stores usernames and hex digests of passwords. Each
    thread must call :meth:`connect` before using :meth:`checkpass` or
    :meth:`adduser`, because connections are kept in thread-local storage.
    """

    def __init__(self,path="/logon",authenticated="/",not_authenticated="/",db="/tmp/pwd.db"):
        """Store the database path and make sure the password table exists.

        ``path``, ``authenticated`` and ``not_authenticated`` are passed
        unchanged to ``logon.Logon.__init__``; ``db`` is the SQLite file.
        """
        super().__init__(path,authenticated,not_authenticated)
        self.db=db
        self.initdb()

    @staticmethod
    def _dohash(s):
        """Return the hex digest of the string *s*.

        ``hash`` is the module-level alias for ``hashlib.sha1``, not the
        builtin. NOTE(review): unsalted SHA-1 is weak for password storage;
        it is kept because existing rows were stored with this digest.
        """
        h = hash()
        h.update(s.encode())
        return h.hexdigest()

    def checkpass(self,username,password):
        """Return True when exactly one row matches *username* and the
        digest of *password*, False otherwise.

        Requires :meth:`connect` to have been called in the current thread.
        """
        hashed = LogonDB._dohash(password)
        c = self.data.conn.cursor()
        c.execute("SELECT count(*) FROM pwdb WHERE username = ? AND password = ?",(username,hashed))
        return c.fetchone()[0] == 1

    def initdb(self):
        """Create the ``pwdb`` table if needed, seed the ``admin`` account,
        and set up the thread-local holder for per-thread connections."""
        conn = sqlite3.connect(self.db)
        try:
            c = conn.cursor()
            c.execute("CREATE TABLE IF NOT EXISTS pwdb(username unique not null,password not null);")
            # Bind "admin" as a parameter: a double-quoted token is an
            # identifier in SQL and is only accepted as a string by a
            # SQLite compatibility quirk.
            c.execute("INSERT OR IGNORE INTO pwdb VALUES(?,?)",("admin",LogonDB._dohash("admin")))
            conn.commit()
        finally:
            # Close even if a statement above raised.
            conn.close()
        self.data=threading.local()

    def connect(self):
        '''call once for every thread as sqlite connection objects cannot be shared among threads.'''
        self.data.conn = sqlite3.connect(self.db)

    def close(self):
        '''call once for every thread.'''
        self.data.conn.close()

    @cherrypy.expose
    def adduserform(self):
        """Return the "add user" form; only the ``admin`` user may see it.

        Raises a redirect to the logon page when nobody is logged in and a
        403 ``HTTPError`` when the logged-in user is not ``admin``.
        """
        # Pass the logon URL explicitly: checkauth's default sends the user
        # to "/" instead of the logon page.
        if self.checkauth(self.path, returntopage=True) != 'admin':
            raise cherrypy.HTTPError("403 Forbidden", "Only admin allowed to access this resource.")
        return logon.Logon.base_page % '''
<form action="./adduser" method="GET"><fieldset>
<label for="username">New username</label><input id="username" name="username" type="text" />
<label for="password">New password</label><input id="password" name="password" type="password" />
<button type="submit" class="add-button" value="Add">Add</button>
</fieldset></form>
'''

    @cherrypy.expose
    def adduser(self,username,password):
        """Insert a new user and return a confirmation page.

        Only the ``admin`` user may call this. Raises a 403 ``HTTPError``
        when the caller is not ``admin`` or the username already exists.
        """
        import html  # local import: only needed to escape the echoed name

        if self.checkauth(self.path, returntopage=True) != 'admin':
            raise cherrypy.HTTPError("403 Forbidden", "Only admin allowed to access this resource.")
        hashed = LogonDB._dohash(password)
        conn = self.data.conn
        try:
            # The connection context manager commits on success and rolls
            # back on error, so a failed insert does not leave an open
            # transaction on this thread's connection.
            with conn:
                conn.execute("INSERT INTO pwdb (username,password) VALUES (?,?)",(username,hashed))
        except sqlite3.IntegrityError:
            raise cherrypy.HTTPError("403 Forbidden","username %s already exists"%username)
        # The username comes straight from the request; escape it before
        # embedding it in the page.
        return '''<html>
<head>
<meta http-equiv="refresh" content="5; url=/">
</head>
<body><h1>%s successfully added.</h1></body>
</html>
''' % html.escape(username)
if __name__ == "__main__":
    # Self-test for LogonDB. Each test uses its own temporary database file
    # so running the tests never touches (or deletes) the application's
    # default /tmp/pwd.db.
    import os
    import tempfile
    import unittest

    def _make_db_path():
        """Create an empty temporary file and return its path."""
        fd, db_path = tempfile.mkstemp(suffix='.db')
        os.close(fd)
        return db_path

    class LogonDBTest(unittest.TestCase):
        """Single-threaded credential checks against a fresh database."""

        def setUp(self):
            self.dbpath = _make_db_path()
            self.logon = LogonDB(db=self.dbpath)
            self.logon.connect()

        def tearDown(self):
            self.logon.close()
            os.unlink(self.dbpath)

        def test_checkpass(self):
            self.assertTrue(self.logon.checkpass('admin','admin'))
            self.assertFalse(self.logon.checkpass('admin','qadmin'))
            self.assertFalse(self.logon.checkpass('nadmin','admin'))

    class ThreadLogonDBTest(unittest.TestCase):
        """Credential checks run from several threads at once."""

        def setUp(self):
            self.dbpath = _make_db_path()
            # Exceptions raised in worker threads are collected here; they
            # would otherwise be lost and the test could never fail.
            self.errors = []

        def tearDown(self):
            os.unlink(self.dbpath)

        def run_check(self):
            """Thread body: open a connection, check credentials, close."""
            try:
                logon=LogonDB(db=self.dbpath)
                logon.connect()
                try:
                    self.assertTrue(logon.checkpass('admin','admin'))
                    self.assertFalse(logon.checkpass('admin','qadmin'))
                    self.assertFalse(logon.checkpass('nadmin','admin'))
                finally:
                    logon.close()
            except Exception as exc:
                # Thread boundary: record the failure (including assertion
                # failures) for the main thread to report.
                self.errors.append(exc)

        def test_threads(self):
            n=2
            threads=[threading.Thread(target=self.run_check) for _ in range(n)]
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join(timeout=5.0)
                self.assertFalse(thread.is_alive(), 'worker thread did not finish in time')
            self.assertEqual(self.errors, [])

    unittest.main(exit=False)
logon module:
import cherrypy
import urllib.parse
import logging
class Logon:
    """Session-based logon handler for CherryPy.

    Exposes a logon form (``index``), the form target (``logon``) and
    ``logoff``. ``checkauth`` is the guard other handlers call to require an
    authenticated session. Subclasses override ``checkpass`` to supply a
    real credential check.
    """

    # Page skeleton; the single %s receives the body content.
    base_page = '''<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
<html>
<head>
<script type="text/javascript" src="/jquery.js" ></script>
<script type="text/javascript" src="/jquery-ui.js" ></script>
<style type="text/css" title="currentStyle">
@import "/jquerytheme.css";
@import "/static/css/logon.css";
</style>
</head>
<body id="logonscreen">
<div id="content">
%s
</div>
<script type="text/javascript">$("button").button({icons: {primary: 'ui-icon-power'}})</script>
</body>
</html>
'''
    # change the GET once in production: otherwise passwords may end up in the access log!
    # use <button> NOT <input> for submit and reset otherwise icons won't show!
    # After this substitution two %s placeholders remain: the handler path
    # and the (quoted) return page, filled in by index().
    logon_screen = base_page % '''
<form class="login" action="%s/logon" method="GET">
<fieldset>
<label for="username">Username</label><input id="username" type="text" name="username" />
<script type="text/javascript">$("#username").focus()</script>
<label for="password">Password</label><input id="password" type="password" name="password" />
<input type="hidden" name="returnpage" value="%s" />
<button type="submit" class="login-button" value="Log in">Log in</button>
</fieldset>
</form>
'''
    # Class-level HTML page. On instances this name is shadowed by the
    # redirect URL that __init__ stores under the same attribute.
    not_authenticated = base_page % '''<h1>Login or password not correct</h1>'''

    def __init__(self,path="/logon",authenticated="/",not_authenticated="/"):
        """Remember where this handler is mounted and where to redirect.

        path              -- URL prefix used to build the form's action
        authenticated     -- redirect target after a successful logon
        not_authenticated -- redirect target after a failed logon or logoff
        """
        self.path=path
        self.authenticated=authenticated
        self.not_authenticated=not_authenticated

    # change this to a proper check in a production environment
    @staticmethod
    def checkpass(username,password):
        """Return True only for the hard-coded development credentials."""
        return username=='user' and password=='secret'

    @cherrypy.expose
    def index(self,returnpage=None):
        """Return the logon form, carrying *returnpage* in a hidden field."""
        if returnpage is None:
            returnpage = ''
        # Quoting percent-encodes characters such as quotes and angle
        # brackets before the value is placed in the HTML attribute;
        # logon() unquotes it again.
        return Logon.logon_screen % (self.path,urllib.parse.quote(returnpage))
    index._cp_config = {'tools.expires.on':True,'tools.expires.secs':0,'tools.expires.force':True}

    @cherrypy.expose
    def logon(self,username,password,returnpage='',db=':memory:'):
        """Check the credentials and redirect internally.

        On success the username is stored in the session under
        ``'authenticated'`` and the request is redirected to *returnpage*
        (if given) or to ``self.authenticated``. On failure it is
        redirected to ``self.not_authenticated``. *db* is accepted but not
        used here.
        """
        returnpage = urllib.parse.unquote(returnpage)
        if not self.checkpass(username,password):
            raise cherrypy.InternalRedirect(self.not_authenticated)
        cherrypy.session['authenticated']=username
        raise cherrypy.InternalRedirect(returnpage if returnpage else self.authenticated)

    @cherrypy.expose
    def logoff(self,logoffurl=None):
        """Delete and expire the session, then redirect internally to
        *logoffurl*, or to ``self.not_authenticated`` when none is given."""
        cherrypy.session.delete()
        cherrypy.lib.sessions.expire()
        if logoffurl is None:
            raise cherrypy.InternalRedirect(self.not_authenticated)
        raise cherrypy.InternalRedirect(logoffurl)

    @staticmethod
    def checkauth(logonurl="/", returntopage=False):
        """Return the authenticated username, or redirect to *logonurl*.

        When no user is stored in the session an ``HTTPRedirect`` to
        *logonurl* is raised. With *returntopage* true, the current request
        path is appended as a ``returnpage`` query parameter.
        """
        returnpage=''
        if returntopage:
            current = cherrypy.request.script_name+cherrypy.request.path_info
            # Percent-encode the path so spaces, '&', '#', etc. cannot
            # break the redirect URL's query string ('/' is left as is).
            returnpage='?returnpage='+urllib.parse.quote(current)
        auth = cherrypy.session.get('authenticated',None)
        if auth is None:
            raise cherrypy.HTTPRedirect(logonurl+returnpage)
        return auth

New Topic/Question
Reply



MultiQuote


|