使用方法以下: python
twistd ftp react
這時啓動的ftp服務器的HOME目錄爲/usr/local/ftp,匿名用戶名爲anonymous,密碼爲隨便的一個郵箱地址,端口號爲2121。 django
能夠經過以下命令查看此服務器的其餘參數: 安全
twistd ftp –help 服務器
比較有用的就是–root,來指定登錄的根目錄。還有就是–port指定端口號。 app
不過要注意的是,這個服務器僅用於調試,不要應用於產品環境,由於它並無作安全性方面的工做。 this
#!/usr/bin/env python # -*- coding: utf-8 -*- """a very simple ftp server by twisted """ import sys, os sys.path.append('../../..') os.environ['DJANGO_SETTINGS_MODULE'] = 'settings' from twisted.protocols import ftp from twisted.cred import portal, checkers, credentials, error as credError from twisted.internet import reactor, defer from zope.interface import implements from twisted.python import filepath from django.contrib.auth import authenticate from django.contrib.auth.models import User class UserChecker: implements(checkers.ICredentialsChecker) credentialInterfaces = (credentials.IUsernamePassword,) def __init__(self): "passwords: a dict-like object mapping usernames to passwords" def requestAvatarId(self, credentials): user = authenticate(username=credentials.username, password=credentials.password) if user is not None: if user.is_active: return defer.succeed(credentials.username) else: return defer.fail(credError.UnauthorizedLogin("access denied")) else: return defer.fail(credError.UnauthorizedLogin("No such user")) class MyFTPRealm: implements(portal.IRealm) def __init__(self, anonymousRoot): self.anonymousRoot = filepath.FilePath(anonymousRoot) def requestAvatar(self, avatarId, mind, *interfaces): for iface in interfaces: if iface is ftp.IFTPShell: if avatarId is checkers.ANONYMOUS: avatar = ftp.FTPAnonymousShell(self.anonymousRoot) else: try: user = User.objects.get(username=avatarId) ftpdir = user.ftp.all()[0].ftpdir except: raise "沒有該用戶" avatar = ftp.FTPShell(filepath.FilePath(ftpdir.encode("utf-8"))) return ftp.IFTPShell, avatar, getattr(avatar, "logout", lambda:None) raise NotImplementedError("only IFTPShell interface is supported by this realm") class MyFtpServer(ftp.FTP): def __init__(self, *args, **kw): super(ftp.FTP, self).__init__(*args, **kw) def dataReceived(self, data): self.transport.write(data) if __name__ == "__main__": p = portal.Portal(MyFTPRealm("")) p.registerChecker(UserChecker()) factory = ftp.FTPFactory(MyFtpServer()) factory.portal = p reactor.listenTCP(2121, factory) reactor.run()