Я унаследовал Twisted MultiService, к которому пытаюсь добавить тесты, но что бы я ни делал, я получаю ошибку DirtyReactorAggregateError. Служба подключается к серверу с именем twisted.application.internet.TCPClient
. Я думаю, что ошибка связана с тем, что TCPClient не отключается, но я не уверен, как я должен его отключить. Как правильно протестировать Twisted Service с клиентом?
Вот тестовый пример:
from labrad.node import *
from twisted.trial import unittest
import os
from socket import gethostname
class NodeTestCase(unittest.TestCase):
def setUp(self):
import labrad
name = os.environ.get('LABRADNODE', gethostname()) + '_test'
self.node = Node(name,
labrad.constants.MANAGER_HOST,
labrad.constants.MANAGER_PORT)
self.node.startService()
#self.addCleanup(self.node.stopService)
def test_nothing(self):
self.assertEqual(3, 3)
def tearDown(self):
return self.node.stopService()
Вот сам сервис Node:
class Node(MultiService):
"""Parent Service that keeps the node running.
If the manager is stopped or we lose the network connection,
this service attempts to restart it so that we will come
back online when the manager is back up.
"""
reconnectDelay = 10
def __init__(self, name, host, port):
MultiService.__init__(self)
self.name = name
self.host = host
self.port = port
def startService(self):
MultiService.startService(self)
self.startConnection()
def startConnection(self):
"""Attempt to start the node and connect to LabRAD."""
print 'Connecting to %s:%d...' % (self.host, self.port)
self.node = NodeServer(self.name, self.host, self.port)
self.node.onStartup().addErrback(self._error)
self.node.onShutdown().addCallbacks(self._disconnected, self._error)
self.cxn = TCPClient(self.host, self.port, self.node)
self.addService(self.cxn)
def stopService(self):
if hasattr(self, 'cxn'):
d = defer.maybeDeferred(self.cxn.stopService)
self.removeService(self.cxn)
del self.cxn
return defer.gatherResults([MultiService.stopService(self), d])
else:
return MultiService.stopService(self)
def _disconnected(self, data):
print 'Node disconnected from manager.'
return self._reconnect()
def _error(self, failure):
r = failure.trap(UserError)
if r == UserError:
print "UserError found!"
return None
print failure.getErrorMessage()
return self._reconnect()
def _reconnect(self):
"""Clean up from the last run and reconnect."""
## hack: manually clearing the dispatcher...
dispatcher.connections.clear()
dispatcher.senders.clear()
dispatcher._boundMethods.clear()
## end hack
if hasattr(self, 'cxn'):
self.removeService(self.cxn)
del self.cxn
reactor.callLater(self.reconnectDelay, self.startConnection)
print 'Will try to reconnect in %d seconds...' % self.reconnectDelay