# -*- test-case-name: twisted.conch.test.test_helper -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Partial in-memory terminal emulator
@author: Jp Calderone
"""
from __future__ import print_function
import re, string
from zope.interface import implementer
from incremental import Version
from twisted.internet import defer, protocol, reactor
from twisted.python import log, _textattributes
from twisted.python.compat import iterbytes
from twisted.python.deprecate import deprecated, deprecatedModuleAttribute
from twisted.conch.insults import insults
FOREGROUND = 30
BACKGROUND = 40
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9)
class _FormattingState(_textattributes._FormattingStateMixin):
"""
Represents the formatting state/attributes of a single character.
Character set, intensity, underlinedness, blinkitude, video
reversal, as well as foreground and background colors made up a
character's attributes.
"""
compareAttributes = (
'charset', 'bold', 'underline', 'blink', 'reverseVideo', 'foreground',
'background', '_subtracting')
def __init__(self, charset=insults.G0, bold=False, underline=False,
blink=False, reverseVideo=False, foreground=WHITE,
background=BLACK, _subtracting=False):
self.charset = charset
self.bold = bold
self.underline = underline
self.blink = blink
self.reverseVideo = reverseVideo
self.foreground = foreground
self.background = background
self._subtracting = _subtracting
@deprecated(Version('Twisted', 13, 1, 0))
def wantOne(self, **kw):
"""
Add a character attribute to a copy of this formatting state.
@param **kw: An optional attribute name and value can be provided with
a keyword argument.
@return: A formatting state instance with the new attribute.
@see: L{DefaultFormattingState._withAttribute}.
"""
k, v = kw.popitem()
return self._withAttribute(k, v)
def toVT102(self):
# Spit out a vt102 control sequence that will set up
# all the attributes set here. Except charset.
attrs = []
if self._subtracting:
attrs.append(0)
if self.bold:
attrs.append(insults.BOLD)
if self.underline:
attrs.append(insults.UNDERLINE)
if self.blink:
attrs.append(insults.BLINK)
if self.reverseVideo:
attrs.append(insults.REVERSE_VIDEO)
if self.foreground != WHITE:
attrs.append(FOREGROUND + self.foreground)
if self.background != BLACK:
attrs.append(BACKGROUND + self.background)
if attrs:
return '\x1b[' + ';'.join(map(str, attrs)) + 'm'
return ''
CharacterAttribute = _FormattingState
deprecatedModuleAttribute(
Version('Twisted', 13, 1, 0),
'Use twisted.conch.insults.text.assembleFormattedText instead.',
'twisted.conch.insults.helper',
'CharacterAttribute')
# XXX - need to support scroll regions and scroll history
@implementer(insults.ITerminalTransport)
class TerminalBuffer(protocol.Protocol):
"""
An in-memory terminal emulator.
"""
for keyID in (b'UP_ARROW', b'DOWN_ARROW', b'RIGHT_ARROW', b'LEFT_ARROW',
b'HOME', b'INSERT', b'DELETE', b'END', b'PGUP', b'PGDN',
b'F1', b'F2', b'F3', b'F4', b'F5', b'F6', b'F7', b'F8', b'F9',
b'F10', b'F11', b'F12'):
execBytes = keyID + b" = object()"
execStr = execBytes.decode("ascii")
exec(execStr)
TAB = b'\t'
BACKSPACE = b'\x7f'
width = 80
height = 24
fill = b' '
void = object()
def getCharacter(self, x, y):
return self.lines[y][x]
def connectionMade(self):
self.reset()
def write(self, data):
"""
Add the given printable bytes to the terminal.
Line feeds in L{bytes} will be replaced with carriage return / line
feed pairs.
"""
for b in iterbytes(data.replace(b'\n', b'\r\n')):
self.insertAtCursor(b)
def _currentFormattingState(self):
return _FormattingState(self.activeCharset, **self.graphicRendition)
def insertAtCursor(self, b):
"""
Add one byte to the terminal at the cursor and make consequent state
updates.
If b is a carriage return, move the cursor to the beginning of the
current row.
If b is a line feed, move the cursor to the next row or scroll down if
the cursor is already in the last row.
Otherwise, if b is printable, put it at the cursor position (inserting
or overwriting as dictated by the current mode) and move the cursor.
"""
if b == b'\r':
self.x = 0
elif b == b'\n':
self._scrollDown()
elif b in string.printable.encode("ascii"):
if self.x >= self.width:
self.nextLine()
ch = (b, self._currentFormattingState())
if self.modes.get(insults.modes.IRM):
self.lines[self.y][self.x:self.x] = [ch]
self.lines[self.y].pop()
else:
self.lines[self.y][self.x] = ch
self.x += 1
def _emptyLine(self, width):
return [(self.void, self._currentFormattingState())
for i in range(width)]
def _scrollDown(self):
self.y += 1
if self.y >= self.height:
self.y -= 1
del self.lines[0]
self.lines.append(self._emptyLine(self.width))
def _scrollUp(self):
self.y -= 1
if self.y < 0:
self.y = 0
del self.lines[-1]
self.lines.insert(0, self._emptyLine(self.width))
def cursorUp(self, n=1):
self.y = max(0, self.y - n)
def cursorDown(self, n=1):
self.y = min(self.height - 1, self.y + n)
def cursorBackward(self, n=1):
self.x = max(0, self.x - n)
def cursorForward(self, n=1):
self.x = min(self.width, self.x + n)
def cursorPosition(self, column, line):
self.x = column
self.y = line
def cursorHome(self):
self.x = self.home.x
self.y = self.home.y
def index(self):
self._scrollDown()
def reverseIndex(self):
self._scrollUp()
def nextLine(self):
"""
Update the cursor position attributes and scroll down if appropriate.
"""
self.x = 0
self._scrollDown()
def saveCursor(self):
self._savedCursor = (self.x, self.y)
def restoreCursor(self):
self.x, self.y = self._savedCursor
del self._savedCursor
def setModes(self, modes):
for m in modes:
self.modes[m] = True
def resetModes(self, modes):
for m in modes:
try:
del self.modes[m]
except KeyError:
pass
def setPrivateModes(self, modes):
"""
Enable the given modes.
Track which modes have been enabled so that the implementations of
other L{insults.ITerminalTransport} methods can be properly implemented
to respect these settings.
@see: L{resetPrivateModes}
@see: L{insults.ITerminalTransport.setPrivateModes}
"""
for m in modes:
self.privateModes[m] = True
def resetPrivateModes(self, modes):
"""
Disable the given modes.
@see: L{setPrivateModes}
@see: L{insults.ITerminalTransport.resetPrivateModes}
"""
for m in modes:
try:
del self.privateModes[m]
except KeyError:
pass
def applicationKeypadMode(self):
self.keypadMode = 'app'
def numericKeypadMode(self):
self.keypadMode = 'num'
def selectCharacterSet(self, charSet, which):
self.charsets[which] = charSet
def shiftIn(self):
self.activeCharset = insults.G0
def shiftOut(self):
self.activeCharset = insults.G1
def singleShift2(self):
oldActiveCharset = self.activeCharset
self.activeCharset = insults.G2
f = self.insertAtCursor
def insertAtCursor(b):
f(b)
del self.insertAtCursor
self.activeCharset = oldActiveCharset
self.insertAtCursor = insertAtCursor
def singleShift3(self):
oldActiveCharset = self.activeCharset
self.activeCharset = insults.G3
f = self.insertAtCursor
def insertAtCursor(b):
f(b)
del self.insertAtCursor
self.activeCharset = oldActiveCharset
self.insertAtCursor = insertAtCursor
def selectGraphicRendition(self, *attributes):
for a in attributes:
if a == insults.NORMAL:
self.graphicRendition = {
'bold': False,
'underline': False,
'blink': False,
'reverseVideo': False,
'foreground': WHITE,
'background': BLACK}
elif a == insults.BOLD:
self.graphicRendition['bold'] = True
elif a == insults.UNDERLINE:
self.graphicRendition['underline'] = True
elif a == insults.BLINK:
self.graphicRendition['blink'] = True
elif a == insults.REVERSE_VIDEO:
self.graphicRendition['reverseVideo'] = True
else:
try:
v = int(a)
except ValueError:
log.msg("Unknown graphic rendition attribute: " + repr(a))
else:
if FOREGROUND <= v <= FOREGROUND + N_COLORS:
self.graphicRendition['foreground'] = v - FOREGROUND
elif BACKGROUND <= v <= BACKGROUND + N_COLORS:
self.graphicRendition['background'] = v - BACKGROUND
else:
log.msg("Unknown graphic rendition attribute: " + repr(a))
def eraseLine(self):
self.lines[self.y] = self._emptyLine(self.width)
def eraseToLineEnd(self):
width = self.width - self.x
self.lines[self.y][self.x:] = self._emptyLine(width)
def eraseToLineBeginning(self):
self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1)
def eraseDisplay(self):
self.lines = [self._emptyLine(self.width) for i in range(self.height)]
def eraseToDisplayEnd(self):
self.eraseToLineEnd()
height = self.height - self.y - 1
self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)]
def eraseToDisplayBeginning(self):
self.eraseToLineBeginning()
self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)]
def deleteCharacter(self, n=1):
del self.lines[self.y][self.x:self.x+n]
self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))
def insertLine(self, n=1):
self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)]
del self.lines[self.height:]
def deleteLine(self, n=1):
del self.lines[self.y:self.y+n]
self.lines.extend([self._emptyLine(self.width) for i in range(n)])
def reportCursorPosition(self):
return (self.x, self.y)
def reset(self):
self.home = insults.Vector(0, 0)
self.x = self.y = 0
self.modes = {}
self.privateModes = {}
self.setPrivateModes([insults.privateModes.AUTO_WRAP,
insults.privateModes.CURSOR_MODE])
self.numericKeypad = 'app'
self.activeCharset = insults.G0
self.graphicRendition = {
'bold': False,
'underline': False,
'blink': False,
'reverseVideo': False,
'foreground': WHITE,
'background': BLACK}
self.charsets = {
insults.G0: insults.CS_US,
insults.G1: insults.CS_US,
insults.G2: insults.CS_ALTERNATE,
insults.G3: insults.CS_ALTERNATE_SPECIAL}
self.eraseDisplay()
def unhandledControlSequence(self, buf):
print('Could not handle', repr(buf))
def __bytes__(self):
lines = []
for L in self.lines:
buf = []
length = 0
for (ch, attr) in L:
if ch is not self.void:
buf.append(ch)
length = len(buf)
else:
buf.append(self.fill)
lines.append(b''.join(buf[:length]))
return b'\n'.join(lines)
class ExpectationTimeout(Exception):
pass
class ExpectableBuffer(TerminalBuffer):
_mark = 0
def connectionMade(self):
TerminalBuffer.connectionMade(self)
self._expecting = []
def write(self, data):
TerminalBuffer.write(self, data)
self._checkExpected()
def cursorHome(self):
TerminalBuffer.cursorHome(self)
self._mark = 0
def _timeoutExpected(self, d):
d.errback(ExpectationTimeout())
self._checkExpected()
def _checkExpected(self):
s = self.__bytes__()[self._mark:]
while self._expecting:
expr, timer, deferred = self._expecting[0]
if timer and not timer.active():
del self._expecting[0]
continue
for match in expr.finditer(s):
if timer:
timer.cancel()
del self._expecting[0]
self._mark += match.end()
s = s[match.end():]
deferred.callback(match)
break
else:
return
def expect(self, expression, timeout=None, scheduler=reactor):
d = defer.Deferred()
timer = None
if timeout:
timer = scheduler.callLater(timeout, self._timeoutExpected, d)
self._expecting.append((re.compile(expression), timer, d))
self._checkExpected()
return d
__all__ = [
'CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer']