Lichen

Annotated lib/__builtins__/stream.py

902:dc210430ce4c
2019-05-27 Paul Boddie Updated copyright statement years.
paul@529 1
#!/usr/bin/env python
paul@529 2
paul@529 3
"""
paul@529 4
Stream objects.
paul@529 5
paul@529 6
Copyright (C) 2015, 2016, 2017 Paul Boddie <paul@boddie.org.uk>
paul@529 7
paul@529 8
This program is free software; you can redistribute it and/or modify it under
paul@529 9
the terms of the GNU General Public License as published by the Free Software
paul@529 10
Foundation; either version 3 of the License, or (at your option) any later
paul@529 11
version.
paul@529 12
paul@529 13
This program is distributed in the hope that it will be useful, but WITHOUT
paul@529 14
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
paul@529 15
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
paul@529 16
details.
paul@529 17
paul@529 18
You should have received a copy of the GNU General Public License along with
paul@529 19
this program.  If not, see <http://www.gnu.org/licenses/>.
paul@529 20
"""
paul@529 21
paul@529 22
from __builtins__.types import check_int, check_string
paul@529 23
from native import isinstance as _isinstance, fclose, fflush, fread, fwrite
paul@529 24
paul@529 25
class filestream:

    """
    Generic file-oriented stream functionality.

    Instances hold an underlying stream handle in '__data__', which is passed
    to the native fread, fwrite, fflush and fclose functions. The handle is
    initialised to None here; presumably a subclass or caller installs the
    real handle before any I/O method is used.
    """

    def __init__(self, encoding=None, bufsize=1024):

        """
        Initialise the stream with the given 'encoding' and 'bufsize'.

        The 'encoding', if set, is used to convert data read from the stream
        to text and is passed to the encode method of text written to the
        stream. The 'bufsize' is the number of bytes requested for each chunk
        when reading all remaining data.
        """

        self.encoding = encoding
        self.bufsize = bufsize

        # Internal stream details.

        self.__data__ = None

    def _convert(self, bytes):

        """
        Convert 'bytes' to text if necessary, returning a Unicode object if an
        encoding is set and 'bytes' unchanged otherwise.
        """

        if self.encoding:
            return unicode(bytes, self.encoding)
        else:
            return bytes

    def flush(self):

        "Flush the stream."

        fflush(self.__data__)

    def read(self, n=0):

        """
        Read 'n' bytes from the stream or, if 'n' is not positive, all
        remaining data.

        The result is converted to text if an encoding is set. Exceptions
        raised when reading an indicated number of bytes are not handled here
        and are propagated to the caller.
        """

        check_int(n)

        # Read any indicated number of bytes.

        if n > 0:
            s = fread(self.__data__, n)

        # Read all remaining bytes.

        else:
            l = []

            # Read until end-of-file.

            try:
                while True:
                    self._read_data(l)

            # Handle end-of-file reads.

            except EOFError:
                pass

            s = "".join(l)

        return self._convert(s)

    def readline(self, n=0):

        """
        Read until an end-of-line indicator is encountered or at most 'n' bytes,
        if indicated.

        A positive 'n' limits the number of bytes read, with the line being
        returned early, including its newline, if one is encountered within
        that limit. Otherwise, reading continues until a newline or the end of
        the stream. The result is converted to text if an encoding is set.

        A limited read that encounters end-of-file before obtaining any data
        raises EOFError, whereas an unlimited read returns an empty result.
        """

        check_int(n)

        l = []
        count = 0

        # Read byte by byte until end-of-line, end-of-file or the limit, so
        # that a limited read never continues beyond the end of a line.

        try:
            while (n <= 0 or count < n) and not self._read_until_newline(l):
                count = count + 1

        # Handle end-of-file reads.

        except EOFError:

            # A limited read yielding no data at all still signals end-of-file
            # to the caller, as it did when reading the bytes in one operation.

            if n > 0 and not l:
                raise

        return self._convert("".join(l))

    def _read_data(self, l):

        "Read data into 'l', requesting 'bufsize' bytes from the stream."

        l.append(fread(self.__data__, self.bufsize))

    def _read_until_newline(self, l):

        """
        Read a single byte into 'l', returning whether a newline has been
        read.
        """

        # NOTE: Only POSIX newlines are supported currently.

        s = fread(self.__data__, 1)
        l.append(s)
        return s == "\n"

    # NOTE: This method is a placeholder: it ignores 'n', reads nothing and
    # NOTE: returns None.

    def readlines(self, n=None): pass

    def write(self, s):

        "Write string 's' to the stream."

        check_string(s)

        # Encode text as bytes if necessary. When the encoding is not set, any
        # original encoding of the text will be applied.

        if _isinstance(s, utf8string):
            s = s.encode(self.encoding)

        fwrite(self.__data__, s)

    def close(self):

        "Close the stream."

        fclose(self.__data__)
paul@529 157
paul@529 158
# vim: tabstop=4 expandtab shiftwidth=4