import socket
import asynchat
import asyncore
import time
import os

#
# Generic code (to be used as building blocks) follows...
# (normally this would go into a separate module)
# (see near bottom for User Customizable Code, less-Generic-code)
#
# NOTE(review): the socket layer below passes plain ``str`` objects to
# asynchat, i.e. it appears to be written for Python 2 -- confirm before
# running it on another interpreter.
#


def timestamp():
    """Return the local time in ``time.ctime()`` format plus the zone name.

    The zone name is picked from ``time.tzname`` according to whether
    daylight saving time is in effect for the reported moment.
    """
    now = time.localtime()
    # time.daylight only says that the zone *defines* a DST variant, not
    # that DST is active now; tm_isdst answers the latter (and may be -1
    # when unknown, which is treated as "not DST").
    dst_index = 1 if now.tm_isdst > 0 else 0
    return time.asctime(now) + ' ' + time.tzname[dst_index]


class Strategy(object):
    """Base class for one state of a session's conversation.

    A strategy renders a prompt through its context and turns each received
    line into the next strategy.  The context object (``ctxt``) must provide
    ``write()``, ``flush()`` and ``get_session_dict()``.
    """

    def __init__(self, ctxt):
        self.ctxt = ctxt

    def write(self, str):
        """Forward text to the context's ``write()``."""
        self.ctxt.write(str)

    def flush(self):
        """Forward to the context's ``flush()``."""
        self.ctxt.flush()

    def __getattr__(self, key):
        """Expose session-dict entries as attributes (e.g. ``self.username``).

        Only called when normal attribute lookup has already failed, so
        instance attributes never reach this method.

        Raises:
            AttributeError: if ``key`` is not in the session dict.
        """
        if key == 'ctxt':
            # ctxt itself is missing (instance not initialised); looking it
            # up below would re-enter this method forever.
            raise AttributeError(key)
        try:
            return self.ctxt.get_session_dict()[key]
        except KeyError:
            # Follow the attribute protocol so hasattr()/getattr(default)
            # behave as expected.
            raise AttributeError(key)

    def set_flash(self, msg):
        """Store a one-shot message to be shown by the next ``render()``."""
        self.ctxt.get_session_dict()['flash'] = msg

    def render(self):
        """Write the pending flash message, if any, then discard it."""
        session = self.ctxt.get_session_dict()
        if 'flash' in session:
            # Blank line, the message, blank line.
            self.write('\n%s\n\n' % (session.pop('flash'),))

    def parse_line(self, line):
        """Handle one input line; must return the next strategy (or self)."""
        raise NotImplementedError


class GetInputText(Strategy):
    """Prompt with ``msg`` and store the stripped reply under ``key``."""

    def __init__(self, ctxt, msg, key):
        Strategy.__init__(self, ctxt)
        self._msg = msg
        self._key = key

    def render(self):
        """Show any flash message followed by the prompt."""
        Strategy.render(self)
        self.write(self._msg)
        self.flush()

    def parse_line(self, line):
        """Save the stripped line in the session dict and advance."""
        self.ctxt.get_session_dict()[self._key] = line.strip()
        return self.next_state_factory()

    def next_state_factory(self):
        """Return the strategy that follows this prompt (subclass hook)."""
        raise NotImplementedError


class GetMenuInput(Strategy):
    """Numbered menu; ``items`` is a sequence of ``(text, callback)`` pairs.

    The callback of the chosen entry is called without arguments and must
    return the next strategy.
    """

    def __init__(self, ctxt, items):
        Strategy.__init__(self, ctxt)
        self._items = items

    def render(self):
        """Show any flash message, the numbered entries and the prompt."""
        Strategy.render(self)
        for number, (text, _callback) in enumerate(self._items):
            self.write('%3d. %s\n' % (number, text))
        self.write('Your choice? ')
        self.flush()

    def parse_line(self, line):
        """Dispatch to the chosen entry, or flash an error and stay here."""
        line = line.strip()
        try:
            choice = int(line)
        except ValueError:
            choice = -1  # not a number: treat as out of range
        if not 0 <= choice < len(self._items):
            self.set_flash('Invalid choice: %r' % (line,))
            return self
        _text, callback = self._items[choice]
        return callback()


class Session(asynchat.async_chat):
    """Line-oriented connection that drives a chain of Strategy objects.

    Incoming data is collected until a newline arrives; the completed line
    (carriage returns removed) is passed to the current state's
    ``parse_line()``, and whatever that returns becomes the new state.
    """

    def __init__(self, server, conn, initial_state):
        """Set up the channel and render the first state.

        ``server`` is accepted for the handler calling convention but is not
        used here.  ``initial_state`` is called with this session and must
        return the first strategy.
        """
        asynchat.async_chat.__init__(self, conn)
        self.terminator = '\n'
        self.set_terminator(self.terminator)
        self.data = []
        self.session_values = {}
        self.switch_state(initial_state(self))

    def get_session_dict(self):
        """Return the per-connection dict shared by all states."""
        return self.session_values

    def switch_state(self, next_state):
        """Make ``next_state`` current and let it render its prompt."""
        self.state = next_state
        self.state.render()

    def write(self, str):
        """Queue text for the client, converting newlines to CRLF."""
        data = str.replace('\n', '\r\n')
        # push() buffers whatever the non-blocking socket cannot take right
        # now; a bare send() may transmit only part of the data.  It also
        # keeps output ordered ahead of close_when_done().
        self.push(data)

    def flush(self):
        """No-op; present so the session can be used like a stream."""
        pass

    def collect_incoming_data(self, str):
        """Buffer a chunk of received data (asynchat callback)."""
        self.data.append(str)

    def found_terminator(self):
        """Assemble the finished line and feed it to the current state."""
        # asynchat strips the terminator, so put it back before joining.
        self.data.append(self.terminator)
        line = ''.join(self.data).replace('\r', '')
        self.data = []
        self.switch_state(self.state.parse_line(line))


class Server(asyncore.dispatcher):
    """Listening socket that calls ``handler(server, conn)`` per connection.

    ``address`` is a filesystem path (``str``, AF_UNIX socket) or a
    ``(host, port)`` tuple (AF_INET socket).
    """

    def __init__(self, address, handler, backlog=1024):
        """Create, bind and start listening on the socket.

        Raises:
            TypeError: if ``address`` is neither a str nor a tuple.
        """
        asyncore.dispatcher.__init__(self)
        self.handler = handler
        if isinstance(address, str):
            self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        elif isinstance(address, tuple):
            # Must be ``elif``: with a second plain ``if`` the str case fell
            # through to the ``else`` and was rejected.
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.set_reuse_addr()
        else:
            raise TypeError('invalid type %s for address' % type(address))
        self.bind(address)
        self.listen(backlog)

    def handle_accept(self):
        """Accept a pending connection and hand it to the handler."""
        accepted = self.accept()
        if accepted is None:
            # accept() may return None when no connection is actually
            # available; nothing to do in that case.
            return
        conn, _client_addr = accepted
        self.handler(self, conn)


# I don't approve of asyncore's loop function, so here is my own.
# Mine does NOT block.  It returns after the timeout.
def loop(timeout=30.0, use_poll=0, map=None):
    """Run a single ``asyncore.poll()`` pass over ``map``.

    ``map`` defaults to ``asyncore.socket_map``.  ``use_poll`` is accepted
    but not used.  Returns True if the map had channels to poll, False if
    it was empty (in which case nothing is polled).
    """
    if map is None:
        map = asyncore.socket_map
    if not map:
        return False
    asyncore.poll(timeout, map)
    return True


class CloseConnection(Strategy):
    """Terminal state: show any flash message, then close the connection."""

    def render(self):
        Strategy.render(self)
        self.ctxt.close_when_done()


#
# User customizable code follows....
#


class GetUsername(GetInputText):
    """Login prompt; stores the reply under the 'username' session key."""

    def __init__(self, ctxt):
        GetInputText.__init__(self, ctxt, 'login: ', 'username')

    def next_state_factory(self):
        """Continue with the password prompt."""
        return GetPassword(self.ctxt)


class GetPassword(GetInputText):
    """Password prompt; checks the credentials and routes accordingly."""

    # NOTE(review): credentials are hard-coded in plain text; this looks
    # like demo data -- confirm before any real use.
    _ACCOUNTS = (
        ('smelly', 'sally'),
        ('stinky', 'fish'),
    )

    def __init__(self, ctxt):
        GetInputText.__init__(self, ctxt, 'password: ', 'password')

    def next_state_factory(self):
        """Return the menu on a valid login, else go back to the login prompt.

        ``self.username`` and ``self.password`` are the values stored in the
        session dict by the two prompts.
        """
        if (self.username, self.password) not in self._ACCOUNTS:
            self.set_flash('Incorrect login!')
            return GetUsername(self.ctxt)
        self.set_flash('Successfully logged in.')
        print('[%s] Successful login of user %r' % (
            timestamp(), self.username))
        return DoStuff(self.ctxt)


class DoStuff(GetMenuInput):
    """Main menu shown after a successful login."""

    def __init__(self, ctxt, ):
        menu_items = [
            ('Start server', self.start_server),
            ('Stop server', self.stop_server),
            ('Restart server', self.restart_server),
            ('Logout', self.logout),
            ('Really logout', self.really_logout),
        ]
        GetMenuInput.__init__(self, ctxt, menu_items)

    def start_server(self):
        """Flash a message, run the placeholder command, stay on the menu."""
        self.set_flash('Starting server...')
        # The command string starts with '#', so a POSIX shell treats the
        # whole line as a comment; it is a placeholder that runs nothing.
        os.system('#rm -Rfi / #hehehe')
        return self

    def stop_server(self):
        """Flash a message and stay on the menu."""
        self.set_flash('Stopping server...')
        return self

    def restart_server(self):
        """Flash a message and stay on the menu."""
        self.set_flash('Restarting server...')
        return self

    def logout(self):
        """Return to the login prompt, keeping the connection open."""
        self.set_flash('Logging out...')
        return GetUsername(self.ctxt)

    def really_logout(self):
        """Say goodbye and close the connection."""
        self.set_flash('Bye bye!')
        return CloseConnection(self.ctxt)


class MySession(Session):
    """Session whose conversation starts at the login prompt."""

    def __init__(self, server, conn):
        Session.__init__(self, server, conn, initial_state=GetUsername)


def main():
    """Serve on localhost:5678 until interrupted, then close the listener."""
    # Keep a reference: the original discarded the instance and later
    # called close() on an undefined name.
    server = Server(('localhost', 5678), MySession)
    try:
        # loop() returns False once no channels are left to poll; stopping
        # then avoids spinning on an empty socket map.
        while loop(timeout=1.):
            pass
    except KeyboardInterrupt:
        pass
    finally:
        server.close()


if __name__ == '__main__':
    main()