try:
from django.utils.unittest import TestCase
except ImportError:
from django.test import TestCase
try:
from django.utils import unittest
except ImportError:
import unittest
import json
import pickle
import string
import sys
import mock
from autobahn.twisted.websocket import WebSocketServerFactory
from mock import MagicMock, Mock
from twisted.conch.telnet import DO, DONT, IAC, NAWS, SB, SE, WILL
from twisted.internet.base import DelayedCall
from twisted.test import proto_helpers
from twisted.trial.unittest import TestCase as TwistedTestCase
import evennia
from evennia.server.portal import irc
from evennia.server.portal.portalsessionhandler import PortalSessionHandler
from evennia.server.portal.service import EvenniaPortalService
from evennia.utils.test_resources import BaseEvenniaTest
from .amp import (
AMP_MAXLEN,
AMPMultiConnectionProtocol,
MsgPortal2Server,
MsgServer2Portal,
)
from .amp_server import AMPServerFactory
from .mccp import MCCP
from .mssp import MSSP
from .mxp import MXP
from .naws import DEFAULT_HEIGHT, DEFAULT_WIDTH
from .suppress_ga import SUPPRESS_GA
from .telnet import TelnetProtocol, TelnetServerFactory
from .telnet_oob import MSDP, MSDP_VAL, MSDP_VAR
from .ttype import IS, TTYPE
from .webclient import WebSocketClient
[docs]
class TestAMPServer(TwistedTestCase):
    """
    Test AMP communication.

    The expected wire bytes in these tests are selected by
    ``pickle.HIGHEST_PROTOCOL``; reference data is recorded for protocols
    4 and 5 only. Any other protocol makes the tests fail with an explicit
    message instead of erroring on an unbound name or passing unverified.
    """

    def setUp(self):
        """Build an AMP server protocol around a mocked portal and transport."""
        super().setUp()
        portal = Mock()
        factory = AMPServerFactory(portal)
        self.proto = factory.buildProtocol(("localhost", 0))
        self.transport = MagicMock()  # proto_helpers.StringTransport()
        self.transport.client = ["localhost"]
        self.transport.write = MagicMock()

    def _fail_unknown_pickle_protocol(self):
        """Fail the running test because no reference bytes fit this pickle protocol."""
        self.fail(
            "No AMP reference bytes recorded for pickle protocol "
            f"{pickle.HIGHEST_PROTOCOL}; add them to this test."
        )

    def _assert_written_then_received(self, byte_out):
        """
        Assert that `byte_out` was the last thing written to the transport,
        and that feeding the same bytes back into the protocol hands them
        on to the (patched) ``AMP.dataReceived``.

        Args:
            byte_out (bytes): The exact bytes expected on the wire.

        """
        self.transport.write.assert_called_with(byte_out)
        with mock.patch("evennia.server.portal.amp.amp.AMP.dataReceived") as mocked_amprecv:
            self.proto.dataReceived(byte_out)
            mocked_amprecv.assert_called_with(byte_out)

    def test_amp_out(self):
        """Send a small ``MsgServer2Portal`` message and check the bytes written."""
        self.proto.makeConnection(self.transport)
        self.proto.data_to_server(MsgServer2Portal, 1, test=2)

        if pickle.HIGHEST_PROTOCOL == 5:
            # Python 3.8+
            byte_out = (
                b"\x00\x04_ask\x00\x011\x00\x08_command\x00\x10MsgServer2Portal\x00\x0b"
                b"packed_data\x00 x\xdak`\x9d*\xc8\x00\x01\xde\x8c\xb5SzXJR"
                b"\x8bK\xa6x3\x15\xb7M\xd1\x03\x00VU\x07u\x00\x00"
            )
        elif pickle.HIGHEST_PROTOCOL == 4:
            # Python 3.7
            byte_out = (
                b"\x00\x04_ask\x00\x011\x00\x08_command\x00\x10MsgServer2Portal\x00\x0b"
                b"packed_data\x00 x\xdak`\x99*\xc8\x00\x01\xde\x8c\xb5SzXJR"
                b"\x8bK\xa6x3\x15\xb7M\xd1\x03\x00V:\x07t\x00\x00"
            )
        else:
            # self.fail() raises, so byte_out is always bound below
            self._fail_unknown_pickle_protocol()

        self._assert_written_then_received(byte_out)

    def test_amp_in(self):
        """Send a small ``MsgPortal2Server`` message and check the bytes written."""
        self.proto.makeConnection(self.transport)
        self.proto.data_to_server(MsgPortal2Server, 1, test=2)

        if pickle.HIGHEST_PROTOCOL == 5:
            # Python 3.8+
            byte_out = (
                b"\x00\x04_ask\x00\x011\x00\x08_command\x00\x10MsgPortal2Server\x00\x0b"
                b"packed_data\x00 x\xdak`\x9d*\xc8\x00\x01\xde\x8c\xb5SzXJR"
                b"\x8bK\xa6x3\x15\xb7M\xd1\x03\x00VU\x07u\x00\x00"
            )
        elif pickle.HIGHEST_PROTOCOL == 4:
            # Python 3.7
            byte_out = (
                b"\x00\x04_ask\x00\x011\x00\x08_command\x00\x10MsgPortal2Server\x00\x0b"
                b"packed_data\x00 x\xdak`\x99*\xc8\x00\x01\xde\x8c\xb5SzXJR"
                b"\x8bK\xa6x3\x15\xb7M\xd1\x03\x00V:\x07t\x00\x00"
            )
        else:
            # self.fail() raises, so byte_out is always bound below
            self._fail_unknown_pickle_protocol()

        self._assert_written_then_received(byte_out)

    def test_large_msg(self):
        """
        Send message larger than AMP_MAXLEN - should be split into several
        """
        self.proto.makeConnection(self.transport)
        outstr = "test" * AMP_MAXLEN
        self.proto.data_to_server(MsgServer2Portal, 1, test=outstr)

        if pickle.HIGHEST_PROTOCOL == 5:
            # Python 3.8+
            self.transport.write.assert_called_with(
                b"\x00\x04_ask\x00\x011\x00\x08_command\x00\x10MsgServer2Portal\x00\x0bpacked_data"
                b"\x00wx\xda\xed\xc6\xc1\t\x80 \x00@Q#=5Z\x0b\xb8\x80\x13\xe85h\x80\x8e\xbam`Dc\xf4><\xf8g"
                b"\x1a[\xf8\xda\x97\xa3_\xb1\x95\xdaz\xbe\xe7\x1a\xde\x03\x00\x00\x00\x00\x00\x00\x00"
                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xe0\x1f\x1eP\x1d\x02\r\x00\rpacked_data.2"
                b"\x00Zx\xda\xed\xc3\x01\r\x00\x00\x08\xc0\xa0\xb4&\xf0\xfdg\x10a\xa3"
                b"\xd9RUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUU\xf5\xfb\x03m\xe0\x06"
                b"\x1d\x00\rpacked_data.3\x00Zx\xda\xed\xc3\x01\r\x00\x00\x08\xc0\xa0\xb4&\xf0\xfdg\x10a"
                b"\xa3fSUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUU\xf5\xfb\x03n\x1c"
                b"\x06\x1e\x00\rpacked_data.4\x00Zx\xda\xed\xc3\x01\t\x00\x00\x0c\x03\xa0\xb4O\xb0\xf5gA"
                b"\xae`\xda\x8b\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa"
                b"\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa"
                b"\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa"
                b"\xaa\xaa\xaa\xdf\x0fnI\x06,\x00\rpacked_data.5\x00\x18x\xdaK-.)I\xc5\x8e\xa7\xb22@\xc0"
                b"\x94\xe2\xb6)z\x00Z\x1e\x0e\xb6\x00\x00"
            )
        elif pickle.HIGHEST_PROTOCOL == 4:
            # Python 3.7
            self.transport.write.assert_called_with(
                b"\x00\x04_ask\x00\x011\x00\x08_command\x00\x10MsgServer2Portal\x00\x0bpacked_data"
                b"\x00wx\xda\xed\xc6\xc1\t\x80 \x00@Q#o\x8e\xd6\x02-\xe0\x04z\r\x1a\xa0\xa3m+$\xd2"
                b"\x18\xbe\x0f\x0f\xfe\x1d\xdf\x14\xfe\x8e\xedjO\xac\xb9\xd4v\xf6o\x0f\xf3\x00\x00"
                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
                b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
                b"\x00X\xc3\x00P\x10\x02\x0c\x00\rpacked_data.2\x00Zx\xda\xed\xc3\x01\r\x00\x00\x08"
                b"\xc0\xa0\xb4&\xf0\xfdg\x10a\xa3\xd9RUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUU"
                b"\xf5\xfb\x03m\xe0\x06\x1d\x00\rpacked_data.3\x00Zx\xda\xed\xc3\x01\r\x00\x00\x08"
                b"\xc0\xa0\xb4&\xf0\xfdg\x10a\xa3fSUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUU"
                b"\xf5\xfb\x03n\x1c\x06\x1e\x00\rpacked_data.4\x00Zx\xda\xed\xc3\x01\t\x00\x00\x0c"
                b"\x03\xa0\xb4O\xb0\xf5gA\xae`\xda\x8b\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa"
                b"\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa"
                b"\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa"
                b"\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xaa\xdf\x0fnI\x06,\x00\rpacked_data.5"
                b"\x00\x18x\xdaK-.)I\xc5\x8e\xa7\xb22@\xc0\x94\xe2\xb6)z\x00Z\x1e\x0e\xb6\x00\x00"
            )
        else:
            # without this branch the test would pass without checking anything
            self._fail_unknown_pickle_protocol()
[docs]
class TestIRC(TestCase):
    """
    Checks the two markup converters exposed by the ``irc`` module,
    ``parse_ansi_to_irc`` and ``parse_irc_to_ansi``.
    """

    def test_plain_ansi(self):
        """
        Printable characters must come out of both converters untouched.
        """
        printable = string.printable
        to_irc = irc.parse_ansi_to_irc(printable)
        to_ansi = irc.parse_irc_to_ansi(printable)
        self.assertEqual(to_irc, printable)
        self.assertEqual(to_ansi, printable)

    def test_bold(self):
        """
        The ``|h`` markup and the IRC ``\\x02`` code convert into each other.
        """
        irc_text, evennia_text = "\x02thisisatest", r"|hthisisatest"
        self.assertEqual(irc.parse_ansi_to_irc(evennia_text), irc_text)
        self.assertEqual(evennia_text, irc.parse_irc_to_ansi(irc_text))

    def test_italic(self):
        """
        Convert ``|h`` markup to IRC, one direction only.
        """
        # NOTE(review): these are the same inputs as test_bold; nothing
        # italic-specific is exercised here - confirm what was intended.
        irc_text, evennia_text = "\x02thisisatest", r"|hthisisatest"
        self.assertEqual(irc.parse_ansi_to_irc(evennia_text), irc_text)

    def test_colors(self):
        """
        Every (IRC code, Evennia markup) pair must convert in both directions.
        """
        color_map = (
            ("\0030", r"|w"),
            ("\0031", r"|X"),
            ("\0032", r"|B"),
            ("\0033", r"|G"),
            ("\0034", r"|r"),
            ("\0035", r"|R"),
            ("\0036", r"|M"),
            ("\0037", r"|Y"),
            ("\0038", r"|y"),
            ("\0039", r"|g"),
            ("\00310", r"|C"),
            ("\00311", r"|c"),
            ("\00312", r"|b"),
            ("\00313", r"|m"),
            ("\00314", r"|x"),
            ("\00315", r"|W"),
            ("\00399,5", r"|[r"),
            ("\00399,3", r"|[g"),
            ("\00399,7", r"|[y"),
            ("\00399,2", r"|[b"),
            ("\00399,6", r"|[m"),
            ("\00399,10", r"|[c"),
            ("\00399,15", r"|[w"),
            ("\00399,1", r"|[x"),
        )
        for irc_code, evennia_code in color_map:
            self.assertEqual(irc.parse_irc_to_ansi(irc_code), evennia_code)
            self.assertEqual(irc_code, irc.parse_ansi_to_irc(evennia_code))

    def test_identity(self):
        """
        Converting markup to IRC and back again must reproduce the
        original string.
        """
        markup = r"|wthis|Xis|gis|Ma|C|complex|*string"
        round_tripped = irc.parse_irc_to_ansi(irc.parse_ansi_to_irc(markup))
        self.assertEqual(round_tripped, markup)
[docs]
class TestTelnet(TwistedTestCase):
    """
    Tests for the portal telnet protocol: stacked commands, option
    negotiation, MXP markup parsing and NAWS resize handling.
    """

    def setUp(self):
        """Create a telnet protocol wired to a fresh portal session handler."""
        super().setUp()
        self.portal = EvenniaPortalService()
        evennia.EVENNIA_PORTAL_SERVICE = self.portal
        self.amp_server_factory = AMPServerFactory(self.portal)
        self.amp_server = self.amp_server_factory.buildProtocol("127.0.0.1")
        factory = TelnetServerFactory()
        factory.protocol = TelnetProtocol
        evennia.PORTAL_SESSION_HANDLER = PortalSessionHandler()
        factory.sessionhandler = evennia.PORTAL_SESSION_HANDLER
        factory.sessionhandler.portal = Mock()
        self.proto = factory.buildProtocol(("localhost", 0))
        self.transport = proto_helpers.StringTransport()
        self.addCleanup(factory.sessionhandler.disconnect_all)

    def _connect(self):
        """
        Connect the protocol to the string transport.

        Stopping ``nop_keep_alive`` and cancelling ``_handshake_delay`` are
        registered as cleanups rather than run at the end of each test body,
        so they still happen when an assertion fails midway (prevents
        "Unclean reactor" errors).

        Returns:
            Whatever ``makeConnection`` returned; the tests return it as-is.

        """
        self.transport.client = ["localhost"]
        self.transport.setTcpKeepAlive = Mock()
        d = self.proto.makeConnection(self.transport)
        self.addCleanup(self.proto.nop_keep_alive.stop)
        self.addCleanup(self.proto._handshake_delay.cancel)
        return d

    @mock.patch("evennia.server.portal.portalsessionhandler.reactor", new=MagicMock())
    def test_command_stacking_no_type_error(self):
        """Several commands arriving in one packet must not raise TypeError."""
        d = self._connect()
        # Mudlet sends multiple commands in one packet when command stacking
        data = b"wave\r\nsay hi\r\n"
        try:
            self.proto.dataReceived(data)
        except TypeError:
            self.fail("dataReceived raised TypeError on stacked commands")
        return d

    @mock.patch("evennia.server.portal.portalsessionhandler.reactor", new=MagicMock())
    def test_mudlet_ttype(self):
        """
        Walk through the option negotiations one by one, checking the
        resulting protocol flags and that ``handshakes`` counts down from
        7 to 1 along the way.
        """
        d = self._connect()

        # test suppress_ga
        self.assertTrue(self.proto.protocol_flags["NOGOAHEAD"])
        self.proto.dataReceived(IAC + DONT + SUPPRESS_GA)
        self.assertFalse(self.proto.protocol_flags["NOGOAHEAD"])
        self.assertEqual(self.proto.handshakes, 7)

        # test naws
        self.assertEqual(self.proto.protocol_flags["SCREENWIDTH"], {0: DEFAULT_WIDTH})
        self.assertEqual(self.proto.protocol_flags["SCREENHEIGHT"], {0: DEFAULT_HEIGHT})
        self.proto.dataReceived(IAC + WILL + NAWS)
        # NOTE(review): the payload below carries two empty byte strings; the
        # 78x45 expected afterwards presumably equals the defaults - confirm.
        self.proto.dataReceived(b"".join([IAC, SB, NAWS, b"", b"x", b"", b"d", IAC, SE]))
        self.assertEqual(self.proto.protocol_flags["SCREENWIDTH"][0], 78)
        self.assertEqual(self.proto.protocol_flags["SCREENHEIGHT"][0], 45)
        self.assertEqual(self.proto.handshakes, 6)

        # test ttype
        self.assertFalse(self.proto.protocol_flags["TTYPE"])
        self.assertTrue(self.proto.protocol_flags["ANSI"])
        self.proto.dataReceived(IAC + WILL + TTYPE)
        self.proto.dataReceived(b"".join([IAC, SB, TTYPE, IS, b"MUDLET", IAC, SE]))
        self.assertTrue(self.proto.protocol_flags["XTERM256"])
        self.assertEqual(self.proto.protocol_flags["CLIENTNAME"], "MUDLET")
        self.assertTrue(self.proto.protocol_flags["FORCEDENDLINE"])
        self.assertTrue(self.proto.protocol_flags["NOGOAHEAD"])
        self.assertFalse(self.proto.protocol_flags["NOPROMPTGOAHEAD"])
        self.proto.dataReceived(b"".join([IAC, SB, TTYPE, IS, b"XTERM", IAC, SE]))
        self.proto.dataReceived(b"".join([IAC, SB, TTYPE, IS, b"MTTS 137", IAC, SE]))
        self.assertEqual(self.proto.handshakes, 5)

        # test mccp
        self.proto.dataReceived(IAC + DONT + MCCP)
        self.assertFalse(self.proto.protocol_flags["MCCP"])
        self.assertEqual(self.proto.handshakes, 4)

        # test mssp
        self.proto.dataReceived(IAC + DONT + MSSP)
        self.assertEqual(self.proto.handshakes, 3)

        # test oob
        self.proto.dataReceived(IAC + DO + MSDP)
        self.proto.dataReceived(
            b"".join([IAC, SB, MSDP, MSDP_VAR, b"LIST", MSDP_VAL, b"COMMANDS", IAC, SE])
        )
        self.assertTrue(self.proto.protocol_flags["OOB"])
        self.assertEqual(self.proto.handshakes, 2)

        # test mxp
        self.proto.dataReceived(IAC + DONT + MXP)
        self.assertFalse(self.proto.protocol_flags["MXP"])
        self.assertEqual(self.proto.handshakes, 1)

        return d

    def test_mxp_parse(self):
        """
        Test that mxp_parse correctly converts Evennia MXP markup to MXP escape sequences,
        and leaves messages without MXP markup untouched.
        """
        from evennia.server.portal.mxp import MXP_TEMPSECURE, mxp_parse

        # no MXP markup - should be returned unchanged
        self.assertEqual(mxp_parse("hello world"), "hello world")

        # angle brackets without MXP markup - should be returned unchanged
        self.assertEqual(mxp_parse("<name>"), "<name>")

        # basic link substitution
        result = mxp_parse("|lchelp overview|lthelp overview|le")
        self.assertIn('<SEND HREF="help overview">', result)
        self.assertIn("help overview", result)
        self.assertIn(MXP_TEMPSECURE, result)
        self.assertNotIn("|lc", result)
        self.assertNotIn("|lt", result)
        self.assertNotIn("|le", result)

        # surrounding text should pass through unchanged: the literal
        # brackets stay, and no HTML-escaped form of them may appear
        result = mxp_parse("<|lchelp eat|lthelp eat|le>")
        self.assertIn("<", result)
        self.assertIn(">", result)
        self.assertNotIn("&lt;", result)
        self.assertNotIn("&gt;", result)
        self.assertIn('<SEND HREF="help eat">', result)

        # non-MXP ampersands should pass through unchanged (not turned
        # into the "&amp;" entity)
        result = mxp_parse("fish & chips |lchelp eat|lthelp eat|le")
        self.assertIn("fish & chips", result)
        self.assertNotIn("&amp;", result)

    @mock.patch("evennia.server.portal.portalsessionhandler.reactor", new=MagicMock())
    def test_naws_resize_syncs_updated_width(self):
        """
        Verify that a NAWS resize packet causes sessionhandler.sync to be called
        AFTER negotiate_sizes has updated SCREENWIDTH, not before.
        Regression test for the ordering bug introduced in #3498.
        """
        d = self._connect()

        # Complete NAWS handshake: client says WILL NAWS -> sets AUTORESIZE=True
        self.proto.dataReceived(IAC + WILL + NAWS)
        self.assertTrue(self.proto.protocol_flags["AUTORESIZE"])

        # Patch sync before any NAWS subneg so it never tries sessionhandler.get()
        # (the session isn't reachable via get() in this test setup). Capture
        # SCREENWIDTH at the moment sync is called to assert ordering.
        synced_widths = []

        def capturing_sync(session):
            synced_widths.append(self.proto.protocol_flags["SCREENWIDTH"][0])

        self.proto.sessionhandler.sync = capturing_sync

        # Initial size from handshake (120 wide, 40 tall)
        self.proto.dataReceived(b"".join([IAC, SB, NAWS, b"\x00\x78", b"\x00\x28", IAC, SE]))
        self.assertEqual(self.proto.protocol_flags["SCREENWIDTH"][0], 120)
        synced_widths.clear()

        # Simulate a terminal resize to 160 wide, 50 tall
        self.proto.dataReceived(b"".join([IAC, SB, NAWS, b"\x00\xa0", b"\x00\x32", IAC, SE]))

        # SCREENWIDTH must be updated to 160 BEFORE sync fires
        self.assertEqual(self.proto.protocol_flags["SCREENWIDTH"][0], 160)
        self.assertEqual(synced_widths, [160])  # sync saw the NEW width, not 120
        return d
[docs]
class TestWebSocket(BaseEvenniaTest):
    """
    Tests for the websocket webclient protocol, using a string transport
    and a fresh portal session handler.
    """

    def setUp(self):
        """Assemble a WebSocketClient with a stand-in factory, handler and transport."""
        super().setUp()

        self.portal = EvenniaPortalService()
        evennia.EVENNIA_PORTAL_SERVICE = self.portal
        self.amp_server_factory = AMPServerFactory(self.portal)
        self.amp_server = self.amp_server_factory.buildProtocol("127.0.0.1")

        self.proto = WebSocketClient()
        self.proto.factory = WebSocketServerFactory()

        # one handler object is shared by the module global, factory and protocol
        handler = PortalSessionHandler()
        evennia.PORTAL_SESSION_HANDLER = handler
        self.proto.factory.sessionhandler = handler
        self.proto.sessionhandler = handler
        handler.portal = Mock()

        transport = proto_helpers.StringTransport()
        self.proto.transport = transport
        transport.client = ["localhost"]
        transport.setTcpKeepAlive = Mock()

        self.proto.state = MagicMock()
        self.addCleanup(self.proto.factory.sessionhandler.disconnect_all)
        DelayedCall.debug = True

    def tearDown(self):
        """Defer entirely to the parent class teardown."""
        super().tearDown()

    @mock.patch("evennia.server.portal.portalsessionhandler.reactor", new=MagicMock())
    def test_data_in(self):
        """Incoming JSON messages are forwarded to ``sessionhandler.data_in``."""
        self.proto.sessionhandler.data_in = MagicMock()
        self.proto.onOpen()

        text = "You can get anything you want at Alice's Restaurant."
        cases = (
            (["logged_in", (), {}], {"logged_in": [[], {}]}),
            (["text", (text,), {}], {"text": [[text], {}]}),
        )
        for command, expected_kwargs in cases:
            payload = json.dumps(command).encode()
            self.proto.onMessage(payload, isBinary=False)
            self.proto.sessionhandler.data_in.assert_called_with(self.proto, **expected_kwargs)

    @mock.patch("evennia.server.portal.portalsessionhandler.reactor", new=MagicMock())
    def test_data_out(self):
        """Outgoing text is sent once, as a JSON TEXT (non-binary) frame."""
        self.proto.onOpen()
        self.proto.sendEncoded = MagicMock()
        self.proto.sessionhandler.data_out(self.proto, text=[["Excepting Alice"], {}])
        self.proto.sendEncoded.assert_called_once()

        sent_args, sent_kwargs = self.proto.sendEncoded.call_args

        # EvenniaV1Format encodes as JSON TEXT frame
        decoded = json.loads(sent_args[0])
        self.assertEqual(decoded[0], "text")
        self.assertEqual(decoded[1], ["Excepting Alice"])

        # Verify frame is sent as TEXT (not BINARY) — v1 uses JSON TEXT frames.
        # The flag may arrive by keyword or as the second positional argument.
        if "is_binary" in sent_kwargs:
            is_binary = sent_kwargs["is_binary"]
        elif len(sent_args) > 1:
            is_binary = sent_args[1]
        else:
            is_binary = False
        self.assertFalse(is_binary)