blob: 38547d783e905996924a986f7d0d4a206a79529b [file] [log] [blame]
# This module contains abstractions for the input stream. You don't have to
# looks further, there are no pretty code.
#
# We define two classes here.
#
# Mark(source, line, column)
# It's just a record and its only use is producing nice error messages.
# Parser does not use it for any other purposes.
#
# Reader(source, data)
# Reader determines the encoding of `data` and converts it to unicode.
# Reader provides the following methods and attributes:
# reader.peek(length=1) - return the next `length` characters
# reader.forward(length=1) - move the current position to `length` characters.
# reader.index - the number of the current character.
# reader.line, stream.column - the line and the column of the current character.
__all__ = ["Reader", "ReaderError"]
from .error import YAMLError, Mark
import codecs, re
class ReaderError(YAMLError):
def __init__(self, name, position, character, encoding, reason):
self.name = name
self.character = character
self.position = position
self.encoding = encoding
self.reason = reason
def __str__(self):
if isinstance(self.character, bytes):
return (
"'%s' codec can't decode byte #x%02x: %s\n"
' in "%s", position %d'
% (
self.encoding,
ord(self.character),
self.reason,
self.name,
self.position,
)
)
else:
return "unacceptable character #x%04x: %s\n" ' in "%s", position %d' % (
self.character,
self.reason,
self.name,
self.position,
)
class Reader(object):
# Reader:
# - determines the data encoding and converts it to a unicode string,
# - checks if characters are in allowed range,
# - adds '\0' to the end.
# Reader accepts
# - a `bytes` object,
# - a `str` object,
# - a file-like object with its `read` method returning `str`,
# - a file-like object with its `read` method returning `unicode`.
# Yeah, it's ugly and slow.
def __init__(self, stream):
self.name = None
self.stream = None
self.stream_pointer = 0
self.eof = True
self.buffer = ""
self.pointer = 0
self.raw_buffer = None
self.raw_decode = None
self.encoding = None
self.index = 0
self.line = 0
self.column = 0
if isinstance(stream, str):
self.name = "<unicode string>"
self.check_printable(stream)
self.buffer = stream + "\0"
elif isinstance(stream, bytes):
self.name = "<byte string>"
self.raw_buffer = stream
self.determine_encoding()
else:
self.stream = stream
self.name = getattr(stream, "name", "<file>")
self.eof = False
self.raw_buffer = None
self.determine_encoding()
def peek(self, index=0):
try:
return self.buffer[self.pointer + index]
except IndexError:
self.update(index + 1)
return self.buffer[self.pointer + index]
def prefix(self, length=1):
if self.pointer + length >= len(self.buffer):
self.update(length)
return self.buffer[self.pointer : self.pointer + length]
def forward(self, length=1):
if self.pointer + length + 1 >= len(self.buffer):
self.update(length + 1)
while length:
ch = self.buffer[self.pointer]
self.pointer += 1
self.index += 1
if ch in "\n\x85\u2028\u2029" or (
ch == "\r" and self.buffer[self.pointer] != "\n"
):
self.line += 1
self.column = 0
elif ch != "\uFEFF":
self.column += 1
length -= 1
def get_mark(self):
if self.stream is None:
return Mark(
self.name, self.index, self.line, self.column, self.buffer, self.pointer
)
else:
return Mark(self.name, self.index, self.line, self.column, None, None)
def determine_encoding(self):
while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
self.update_raw()
if isinstance(self.raw_buffer, bytes):
if self.raw_buffer.startswith(codecs.BOM_UTF16_LE):
self.raw_decode = codecs.utf_16_le_decode
self.encoding = "utf-16-le"
elif self.raw_buffer.startswith(codecs.BOM_UTF16_BE):
self.raw_decode = codecs.utf_16_be_decode
self.encoding = "utf-16-be"
else:
self.raw_decode = codecs.utf_8_decode
self.encoding = "utf-8"
self.update(1)
NON_PRINTABLE = re.compile(
"[^\x09\x0A\x0D\x20-\x7E\x85\xA0-\uD7FF\uE000-\uFFFD\U00010000-\U0010ffff]"
)
def check_printable(self, data):
match = self.NON_PRINTABLE.search(data)
if match:
character = match.group()
position = self.index + (len(self.buffer) - self.pointer) + match.start()
raise ReaderError(
self.name,
position,
ord(character),
"unicode",
"special characters are not allowed",
)
def update(self, length):
if self.raw_buffer is None:
return
self.buffer = self.buffer[self.pointer :]
self.pointer = 0
while len(self.buffer) < length:
if not self.eof:
self.update_raw()
if self.raw_decode is not None:
try:
data, converted = self.raw_decode(
self.raw_buffer, "strict", self.eof
)
except UnicodeDecodeError as exc:
character = self.raw_buffer[exc.start]
if self.stream is not None:
position = (
self.stream_pointer - len(self.raw_buffer) + exc.start
)
else:
position = exc.start
raise ReaderError(
self.name, position, character, exc.encoding, exc.reason
)
else:
data = self.raw_buffer
converted = len(data)
self.check_printable(data)
self.buffer += data
self.raw_buffer = self.raw_buffer[converted:]
if self.eof:
self.buffer += "\0"
self.raw_buffer = None
break
def update_raw(self, size=4096):
data = self.stream.read(size)
if self.raw_buffer is None:
self.raw_buffer = data
else:
self.raw_buffer += data
self.stream_pointer += len(data)
if not data:
self.eof = True