404

[ Avaa Bypassed ]




Upload:

Command:

botdev@3.147.78.134: ~ $
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.logger._json}.
"""

from io import StringIO, BytesIO

from zope.interface.verify import verifyObject, BrokenMethodImplementation

from twisted.python.compat import unicode

from twisted.trial.unittest import TestCase

from twisted.python.failure import Failure

from .._observer import ILogObserver, LogPublisher
from .._format import formatEvent
from .._levels import LogLevel
from .._flatten import extractField
from .._global import globalLogPublisher
from .._json import (
    eventAsJSON, eventFromJSON, jsonFileLogObserver, eventsFromJSONLogFile,
    log as jsonLog
)
from .._logger import Logger


def savedJSONInvariants(testCase, savedJSON):
    """
    Check the invariants that every result of L{eventAsJSON} is expected to
    satisfy, then hand the value back unchanged so that the call can be used
    inline.

    @param testCase: The test case whose assertion methods are used.
    @type testCase: L{TestCase}

    @param savedJSON: A value produced by L{eventAsJSON}.
    @type savedJSON: L{unicode} (this is one of the things being checked)

    @return: C{savedJSON}, untouched.
    @rtype: L{unicode}

    @raise AssertionError: If C{savedJSON} is not L{unicode}, or if it
        contains a newline.
    """
    testCase.assertIsInstance(savedJSON, unicode)
    # A serialized event has to fit on a single line.
    newlineCount = savedJSON.count("\n")
    testCase.assertEqual(newlineCount, 0)
    return savedJSON



class SaveLoadTests(TestCase):
    """
    Tests for loading and saving log events.
    """

    def savedEventJSON(self, event):
        """
        Serialize an event, assert some things about the result, and return
        the JSON.

        @param event: An event.
        @type event: L{dict}

        @return: JSON.
        @rtype: L{unicode}
        """
        return savedJSONInvariants(self, eventAsJSON(event))


    def test_simpleSaveLoad(self):
        """
        Saving and loading an empty dictionary results in an empty dictionary.
        """
        self.assertEqual(eventFromJSON(self.savedEventJSON({})), {})


    def test_saveLoad(self):
        """
        Saving and loading a dictionary with some simple values in it results
        in those same simple values in the output; according to JSON's rules,
        though, all dictionary keys must be L{unicode} and any non-L{unicode}
        keys will be converted.
        """
        self.assertEqual(
            eventFromJSON(self.savedEventJSON({1: 2, u"3": u"4"})),
            {u"1": 2, u"3": u"4"}
        )


    def test_saveUnPersistable(self):
        """
        Saving and loading an object which cannot be represented in JSON will
        result in a placeholder.
        """
        self.assertEqual(
            eventFromJSON(self.savedEventJSON({u"1": 2, u"3": object()})),
            {u"1": 2, u"3": {u"unpersistable": True}}
        )


    def test_saveNonASCII(self):
        """
        Non-ASCII keys and values can be saved and loaded.
        """
        self.assertEqual(
            eventFromJSON(self.savedEventJSON(
                {u"\u1234": u"\u4321", u"3": object()}
            )),
            {u"\u1234": u"\u4321", u"3": {u"unpersistable": True}}
        )


    def test_saveBytes(self):
        """
        Any L{bytes} objects will be saved as if they are latin-1 so they can
        be faithfully re-loaded.
        """
        def asbytes(x):
            # Build a byte string from an iterable of integers on either
            # major Python version.
            if bytes is str:
                return b"".join(map(chr, x))
            else:
                return bytes(x)

        inputEvent = {"hello": asbytes(range(255))}
        if bytes is not str:
            # On Python 3, bytes keys will be skipped by the JSON encoder. Not
            # much we can do about that.  Let's make sure that we don't get an
            # error, though.
            inputEvent.update({b"skipped": "okay"})
        self.assertEqual(
            eventFromJSON(self.savedEventJSON(inputEvent)),
            {u"hello": asbytes(range(255)).decode("charmap")}
        )


    def test_saveUnPersistableThenFormat(self):
        """
        Saving and loading an object which cannot be represented in JSON, but
        has a string representation which I{can} be saved as JSON, will result
        in the same string formatting; any extractable fields will retain their
        data types.
        """
        class Reprable(object):
            def __init__(self, value):
                self.value = value

            def __repr__(self):
                return "reprable"

        inputEvent = {
            "log_format": "{object} {object.value}",
            "object": Reprable(7)
        }
        outputEvent = eventFromJSON(self.savedEventJSON(inputEvent))
        self.assertEqual(formatEvent(outputEvent), "reprable 7")


    def test_extractingFieldsPostLoad(self):
        """
        L{extractField} can extract fields from an object that's been saved and
        loaded from JSON.
        """
        class Obj(object):
            def __init__(self):
                self.value = 345

        inputEvent = dict(log_format="{object.value}", object=Obj())
        loadedEvent = eventFromJSON(self.savedEventJSON(inputEvent))
        self.assertEqual(extractField("object.value", loadedEvent), 345)

        # The behavior of extractField is consistent between pre-persistence
        # and post-persistence events, although looking up the key directly
        # won't be:
        self.assertRaises(KeyError, extractField, "object", loadedEvent)
        self.assertRaises(KeyError, extractField, "object", inputEvent)


    def test_failureStructurePreserved(self):
        """
        Round-tripping a failure through L{eventAsJSON} preserves its class and
        structure.
        """
        import sys

        events = []
        log = Logger(observer=events.append)
        try:
            1 / 0
        except ZeroDivisionError:
            f = Failure()
            log.failure("a message about failure", f)

        # Make sure we don't get the same Failure by accident.  Only Python 2
        # provides sys.exc_clear(); look it up defensively so that a lingering
        # exception on Python 3 cannot turn into an AttributeError here.
        excClear = getattr(sys, "exc_clear", None)
        if excClear is not None and sys.exc_info()[0] is not None:
            excClear()

        self.assertEqual(len(events), 1)
        loaded = eventFromJSON(self.savedEventJSON(events[0]))['log_failure']
        self.assertIsInstance(loaded, Failure)
        self.assertTrue(loaded.check(ZeroDivisionError))
        self.assertIsInstance(loaded.getTraceback(), str)


    def test_saveLoadLevel(self):
        """
        It's important that the C{log_level} key remain a
        L{constantly.NamedConstant} object.
        """
        inputEvent = dict(log_level=LogLevel.warn)
        loadedEvent = eventFromJSON(self.savedEventJSON(inputEvent))
        self.assertIs(loadedEvent["log_level"], LogLevel.warn)


    def test_saveLoadUnknownLevel(self):
        """
        If a saved bit of JSON (let's say, from a future version of Twisted)
        were to persist a different log_level, it will resolve as None.
        """
        loadedEvent = eventFromJSON(
            '{"log_level": {"name": "other", '
            '"__class_uuid__": "02E59486-F24D-46AD-8224-3ACDF2A5732A"}}'
        )
        self.assertEqual(loadedEvent, dict(log_level=None))



class FileLogObserverTests(TestCase):
    """
    Tests for L{jsonFileLogObserver}.
    """

    def test_interface(self):
        """
        A L{FileLogObserver} returned by L{jsonFileLogObserver} is an
        L{ILogObserver}.
        """
        with StringIO() as fileHandle:
            observer = jsonFileLogObserver(fileHandle)
            try:
                verifyObject(ILogObserver, observer)
            except BrokenMethodImplementation as e:
                self.fail(e)


    def assertObserverWritesJSON(self, **kwargs):
        """
        Asserts that an observer created by L{jsonFileLogObserver} with the
        given arguments writes events serialized as JSON text, using the
        expected record separator.

        The expected record separator is taken from the C{"recordSeparator"}
        entry of C{kwargs}; when that entry is absent, C{u"\\x1e"} is
        expected.

        @param kwargs: Keyword arguments to pass to L{jsonFileLogObserver}.
        @type kwargs: L{dict}
        """
        recordSeparator = kwargs.get("recordSeparator", u"\x1e")

        with StringIO() as fileHandle:
            observer = jsonFileLogObserver(fileHandle, **kwargs)
            event = dict(x=1)
            observer(event)
            self.assertEqual(
                fileHandle.getvalue(),
                u'{0}{{"x": 1}}\n'.format(recordSeparator)
            )


    def test_observeWritesDefaultRecordSeparator(self):
        """
        A L{FileLogObserver} created by L{jsonFileLogObserver} writes events
        serialized as JSON text to a file when it observes events.
        By default, the record separator is C{u"\\x1e"}.
        """
        self.assertObserverWritesJSON()


    def test_observeWritesEmptyRecordSeparator(self):
        """
        A L{FileLogObserver} created by L{jsonFileLogObserver} writes events
        serialized as JSON text to a file when it observes events.
        This test sets the record separator to C{u""}.
        """
        self.assertObserverWritesJSON(recordSeparator=u"")


    def test_failureFormatting(self):
        """
        A L{FileLogObserver} created by L{jsonFileLogObserver} writes failures
        serialized as JSON text to a file when it observes events.
        """
        logged = []

        with StringIO() as fileHandle:
            publisher = LogPublisher()
            publisher.addObserver(logged.append)
            publisher.addObserver(jsonFileLogObserver(fileHandle))
            logger = Logger(observer=publisher)
            try:
                1 / 0
            except ZeroDivisionError:
                # Logger.failure() is called without an explicit failure, so
                # it has to be invoked while the exception is being handled.
                logger.failure("failed as expected")
            serialized = fileHandle.getvalue()

        with StringIO(serialized) as reader:
            deserialized = list(eventsFromJSONLogFile(reader))

        def checkEvents(logEvents):
            self.assertEqual(len(logEvents), 1)
            [failureEvent] = logEvents
            self.assertIn("log_failure", failureEvent)
            failureObject = failureEvent["log_failure"]
            self.assertIsInstance(failureObject, Failure)
            tracebackObject = failureObject.getTracebackObject()
            # Trailing "c"/"o" characters are stripped from both names before
            # comparing; presumably so that a compiled ".pyc"/".pyo" path
            # still matches the ".py" source path.
            self.assertEqual(
                tracebackObject.tb_frame.f_code.co_filename.rstrip("co"),
                __file__.rstrip("co")
            )

        # The event must look the same whether it was observed directly or
        # read back from its JSON serialization.
        checkEvents(logged)
        checkEvents(deserialized)



class LogFileReaderTests(TestCase):
    """
    Tests for L{eventsFromJSONLogFile}.
    """

    def setUp(self):
        """
        Start collecting, in C{self.errorEvents}, every globally published
        event which comes from the JSON module's logger namespace and carries
        a C{"record"} key.
        """
        self.errorEvents = []

        def observer(event):
            # Use .get() rather than indexing: an event published without a
            # "log_namespace" key should simply be ignored instead of raising
            # KeyError inside this observer.
            if (
                event.get("log_namespace") == jsonLog.namespace and
                "record" in event
            ):
                self.errorEvents.append(event)

        self.logObserver = observer

        globalLogPublisher.addObserver(observer)


    def tearDown(self):
        """
        Stop collecting events from the global log publisher.
        """
        globalLogPublisher.removeObserver(self.logObserver)


    def _assertNoMoreEvents(self, events):
        """
        Assert that an iterator of events is exhausted.

        @param events: An iterator, as returned by L{eventsFromJSONLogFile}.
        """
        self.assertRaises(StopIteration, next, events)


    def _assertLostRecord(self, logFormat, record):
        """
        Assert that exactly one error event was collected and that it reports
        the given record with the given format.

        @param logFormat: The expected C{"log_format"} of the error event.
        @type logFormat: L{unicode}

        @param record: The expected C{"record"} of the error event.
        @type record: L{bytes}
        """
        self.assertEqual(len(self.errorEvents), 1)
        self.assertEqual(self.errorEvents[0]["log_format"], logFormat)
        self.assertEqual(self.errorEvents[0]["record"], record)


    def _readEvents(self, fileHandle, **kwargs):
        """
        Test that L{eventsFromJSONLogFile} reads two pre-defined events from a
        file: C{{u"x": 1}} and C{{u"y": 2}}.

        @param fileHandle: The file to read from.

        @param kwargs: Keyword arguments to pass to L{eventsFromJSONLogFile}.
        """
        events = eventsFromJSONLogFile(fileHandle, **kwargs)

        self.assertEqual(next(events), {u"x": 1})
        self.assertEqual(next(events), {u"y": 2})
        self._assertNoMoreEvents(events)


    def test_readEventsAutoWithRecordSeparator(self):
        """
        L{eventsFromJSONLogFile} reads events from a file and automatically
        detects use of C{u"\\x1e"} as the record separator.
        """
        with StringIO(
            u'\x1e{"x": 1}\n'
            u'\x1e{"y": 2}\n'
        ) as fileHandle:
            self._readEvents(fileHandle)
            self.assertEqual(len(self.errorEvents), 0)


    def test_readEventsAutoEmptyRecordSeparator(self):
        """
        L{eventsFromJSONLogFile} reads events from a file and automatically
        detects use of C{u""} as the record separator.
        """
        with StringIO(
            u'{"x": 1}\n'
            u'{"y": 2}\n'
        ) as fileHandle:
            self._readEvents(fileHandle)
            self.assertEqual(len(self.errorEvents), 0)


    def test_readEventsExplicitRecordSeparator(self):
        """
        L{eventsFromJSONLogFile} reads events from a file and is told to use
        a specific record separator.
        """
        # Use u"\x08" (backspace)... because that seems weird enough.
        with StringIO(
            u'\x08{"x": 1}\n'
            u'\x08{"y": 2}\n'
        ) as fileHandle:
            self._readEvents(fileHandle, recordSeparator=u"\x08")
            self.assertEqual(len(self.errorEvents), 0)


    def test_readEventsPartialBuffer(self):
        """
        L{eventsFromJSONLogFile} handles buffering a partial event.
        """
        with StringIO(
            u'\x1e{"x": 1}\n'
            u'\x1e{"y": 2}\n'
        ) as fileHandle:
            # Use a buffer size smaller than the event text.
            self._readEvents(fileHandle, bufferSize=1)
            self.assertEqual(len(self.errorEvents), 0)


    def test_readTruncated(self):
        """
        If the JSON text for a record is truncated, skip it.
        """
        with StringIO(
            u'\x1e{"x": 1'
            u'\x1e{"y": 2}\n'
        ) as fileHandle:
            events = eventsFromJSONLogFile(fileHandle)

            self.assertEqual(next(events), {u"y": 2})
            self._assertNoMoreEvents(events)

            # We should have logged the lost record
            self._assertLostRecord(
                u"Unable to read truncated JSON record: {record!r}",
                b'{"x": 1'
            )


    def test_readUnicode(self):
        """
        If the file being read from vends L{unicode}, strings decode from JSON
        as-is.
        """
        # The Euro currency sign is u"\u20ac"
        with StringIO(u'\x1e{"currency": "\u20ac"}\n') as fileHandle:
            events = eventsFromJSONLogFile(fileHandle)

            self.assertEqual(next(events), {u"currency": u"\u20ac"})
            self._assertNoMoreEvents(events)
            self.assertEqual(len(self.errorEvents), 0)


    def test_readUTF8Bytes(self):
        """
        If the file being read from vends L{bytes}, strings decode from JSON as
        UTF-8.
        """
        # The Euro currency sign is b"\xe2\x82\xac" in UTF-8
        with BytesIO(b'\x1e{"currency": "\xe2\x82\xac"}\n') as fileHandle:
            events = eventsFromJSONLogFile(fileHandle)

            # The Euro currency sign is u"\u20ac"
            self.assertEqual(next(events), {u"currency": u"\u20ac"})
            self._assertNoMoreEvents(events)
            self.assertEqual(len(self.errorEvents), 0)


    def test_readTruncatedUTF8Bytes(self):
        """
        If the JSON text for a record is truncated in the middle of a
        multi-byte UTF-8 sequence, we don't want to see a codec exception and
        the stream is read properly when the additional data arrives.
        """
        # The Euro currency sign is u"\u20ac" and encodes in UTF-8 as three
        # bytes: b"\xe2\x82\xac".
        with BytesIO(b'\x1e{"x": "\xe2\x82\xac"}\n') as fileHandle:
            # A small buffer, so the record cannot arrive in a single read.
            events = eventsFromJSONLogFile(fileHandle, bufferSize=8)

            self.assertEqual(next(events), {u"x": u"\u20ac"})  # Got unicode
            self._assertNoMoreEvents(events)
            self.assertEqual(len(self.errorEvents), 0)


    def test_readInvalidUTF8Bytes(self):
        """
        If the JSON text for a record contains invalid UTF-8 text, ignore that
        record.
        """
        # The string b"\xe2\xac" is bogus
        with BytesIO(
            b'\x1e{"x": "\xe2\xac"}\n'
            b'\x1e{"y": 2}\n'
        ) as fileHandle:
            events = eventsFromJSONLogFile(fileHandle)

            self.assertEqual(next(events), {u"y": 2})
            self._assertNoMoreEvents(events)

            # We should have logged the lost record
            self._assertLostRecord(
                u"Unable to decode UTF-8 for JSON record: {record!r}",
                b'{"x": "\xe2\xac"}\n'
            )


    def test_readInvalidJSON(self):
        """
        If the JSON text for a record is invalid, skip it.
        """
        with StringIO(
            u'\x1e{"x": }\n'
            u'\x1e{"y": 2}\n'
        ) as fileHandle:
            events = eventsFromJSONLogFile(fileHandle)

            self.assertEqual(next(events), {u"y": 2})
            self._assertNoMoreEvents(events)

            # We should have logged the lost record
            self._assertLostRecord(
                u"Unable to read JSON record: {record!r}",
                b'{"x": }\n'
            )


    def test_readUnseparated(self):
        """
        Multiple events without a record separator are skipped.
        """
        with StringIO(
            u'\x1e{"x": 1}\n'
            u'{"y": 2}\n'
        ) as fileHandle:
            events = eventsFromJSONLogFile(fileHandle)

            self._assertNoMoreEvents(events)

            # We should have logged the lost record; both events are reported
            # together as one unreadable record.
            self._assertLostRecord(
                u"Unable to read JSON record: {record!r}",
                b'{"x": 1}\n{"y": 2}\n'
            )


    def test_roundTrip(self):
        """
        Data written by a L{FileLogObserver} returned by L{jsonFileLogObserver}
        and read by L{eventsFromJSONLogFile} is reconstructed properly.
        """
        event = dict(x=1)

        with StringIO() as fileHandle:
            observer = jsonFileLogObserver(fileHandle)
            observer(event)

            # Rewind so that the reader starts from the record just written.
            fileHandle.seek(0)
            events = eventsFromJSONLogFile(fileHandle)

            self.assertEqual(tuple(events), (event,))
            self.assertEqual(len(self.errorEvents), 0)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 161 B 0644
test_buffer.py File 1.6 KB 0644
test_file.py File 5.49 KB 0644
test_filter.py File 11.73 KB 0644
test_flatten.py File 8.92 KB 0644
test_format.py File 12.37 KB 0644
test_global.py File 11.23 KB 0644
test_io.py File 7.04 KB 0644
test_json.py File 18.15 KB 0644
test_legacy.py File 14.06 KB 0644
test_levels.py File 875 B 0644
test_logger.py File 7.18 KB 0644
test_observer.py File 5.13 KB 0644
test_stdlib.py File 8.44 KB 0644
test_util.py File 2.61 KB 0644