#!/usr/bin/env python

"""
Data representation functions.

Copyright (C) 2009, 2010, 2011 Paul Boddie <paul@boddie.org.uk>

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from array import array
import operator

# High-level representations.

def convert_sequence(values, op, last_from_old):

    """
    Return a copy of 'values' as a list where each element after the first has
    been replaced by the result of 'op' applied to that element and the
    preceding one. If 'last_from_old' is true, the preceding element is taken
    from the original values; otherwise, it is taken from the converted values.
    Empty 'values' are returned unchanged.
    """

    if not values:
        return values

    new_values = list(values)
    last = new_values[0]

    for i in range(1, len(new_values)):
        current = new_values[i]
        new_values[i] = op(current, last)

        # Subtracting entries requires the old value to be used.
        # Adding entries requires the new value.

        if last_from_old:
            last = current
        else:
            last = new_values[i]

    return new_values

def op_seq_monotonic(x, y, op):

    "Apply 'op' to corresponding elements of 'x' and 'y', returning a tuple."

    return tuple([op(a, b) for a, b in zip(x, y)])

def add_seq_monotonic(x, y):

    "Return a tuple of the element-wise sums of 'x' and 'y'."

    return op_seq_monotonic(x, y, operator.add)

def sub_seq_monotonic(x, y):

    "Return a tuple of the element-wise differences of 'x' and 'y'."

    return op_seq_monotonic(x, y, operator.sub)

def add_seq(x, y):

    """
    Combine 'x' with 'y', returning a tuple no longer than the shorter of the
    two. Leading zero elements of 'x' are replaced by the corresponding elements
    of 'y'; the first non-zero element of 'x' is added to the corresponding
    element of 'y'; any remaining elements are taken from 'x' unchanged.
    """

    length = min(len(x), len(y))
    seq = list(x)[:length]

    for i in range(length):
        if x[i] != 0:
            seq[i] = x[i] + y[i]
            break
        seq[i] = y[i]

    return tuple(seq)

def sub_seq(x, y):

    """
    Subtract 'y' from 'x', returning a tuple no longer than the shorter of the
    two. Leading elements that are equal in 'x' and 'y' become zero; the first
    differing element becomes the difference; any remaining elements are taken
    from 'x' unchanged.
    """

    length = min(len(x), len(y))
    seq = list(x)[:length]

    for i in range(length):
        replacement = x[i] - y[i]
        if replacement != 0:
            seq[i] = replacement
            break
        seq[i] = 0

    return tuple(seq)

def is_sequence(value):

    "Return whether 'value' is a list or a tuple."

    return isinstance(value, (list, tuple))

def sizeof(value):

    "Return the length of 'value' if it is a list or tuple, or 0 otherwise."

    return len(value) if is_sequence(value) else 0

def get_monotonic_adder(size):

    "Return the element-wise adder for a non-zero 'size', or operator.add."

    return add_seq_monotonic if size else operator.add

def get_monotonic_subtractor(size):

    "Return the element-wise subtractor for a non-zero 'size', or operator.sub."

    return sub_seq_monotonic if size else operator.sub

def get_adder(size):

    "Return 'add_seq' for a non-zero 'size', or operator.add."

    return add_seq if size else operator.add

def get_subtractor(size):

    "Return 'sub_seq' for a non-zero 'size', or operator.sub."

    return sub_seq if size else operator.sub

# Low-level representations.
# Variable-length integer functions.

vint_cache = []
vint_bytes_cache = []

def _array_to_string(values):

    """
    Return the contents of the array 'values' as a byte string, using whichever
    of 'tobytes' (Python 3) or 'tostring' (Python 2) the array provides.
    """

    try:
        return values.tobytes()
    except AttributeError:
        return values.tostring()

def vint(number):

    """
    Write 'number' as a variable-length integer, returning a byte string.
    A ValueError is raised for negative numbers.
    """

    # Negative numbers are not supported. They must be rejected before the
    # cache is consulted since a negative index would select a cached entry
    # from the end of the cache instead of causing an error.

    if number < 0:
        raise ValueError("Number %r is negative." % number)

    try:
        return vint_cache[number]
    except IndexError:
        encoded = array('B')
        _vint_to_array(number, encoded)
        return _array_to_string(encoded)

def vint_to_array(number, bytes):

    """
    Write 'number' as a variable-length integer to 'bytes'.
    A ValueError is raised for negative numbers, leaving 'bytes' unchanged.
    """

    # Negative numbers are not supported. They must be rejected before the
    # cache is consulted since a negative index would select a cached entry
    # from the end of the cache instead of causing an error.

    if number < 0:
        raise ValueError("Number %r is negative." % number)

    try:
        bytes += vint_bytes_cache[number]
    except IndexError:
        _vint_to_array(number, bytes)

def _vint_to_array(number, bytes):

    "Write the 'number' to 'bytes' from least to most significant digits."

    # Each digit holds seven bits; the top bit indicates that more digits
    # follow.

    while number > 127:
        bytes.append((number & 127) | 128)
        number = number >> 7

    bytes.append(number)

def vint_from_array(bytes):

    """
    Read a variable-length integer from 'bytes', returning a number. All of
    'bytes' is consumed, with digits being removed from the end (the most
    significant digit) first.
    """

    number = 0
    while bytes:
        number <<= 7
        number += bytes.pop() & 127
    return number

def vint_from_array_start(bytes, start):

    """
    Read a variable-length integer from 'bytes', starting at 'start', and
    returning a tuple containing a number and the first position after the
    number. An EOFError is raised if 'start' is the end of 'bytes'. If 'bytes'
    ends before a final digit is found, the digits read so far are used.
    """

    length = len(bytes)
    if start == length:
        raise EOFError

    number = 0
    digit = 0
    while start < length:
        x = bytes[start]
        number += (x & 127) << digit
        digit += 7
        start += 1

        # A clear top bit marks the final digit.

        if not (x & 128):
            break

    return number, start

# Sequence serialisation.

def sequence_to_array(value, size, bytes):

    """
    Write the given sequence 'value' with the given 'size' to 'bytes'. Exactly
    'size' integers are written: surplus elements of 'value' are ignored and
    missing elements are written as zero. If 'size' is zero, 'value' is written
    as a single integer.
    """

    if size:
        limit = min(len(value), size)

        for i in range(limit):
            vint_to_array(value[i], bytes)

        # Pad short sequences with zeros.

        for i in range(limit, size):
            vint_to_array(0, bytes)
    else:
        vint_to_array(value, bytes)

def sequence_from_array(bytes, size, start=0):

    """
    Read a sequence from 'bytes' having the given 'size', returning the sequence
    and the first position after the sequence. If 'size' is zero, a single
    number is read and returned in place of a sequence.
    """

    if size:
        value = []
        for j in range(size):
            v, start = vint_from_array_start(bytes, start)
            value.append(v)
        return tuple(value), start
    else:
        return vint_from_array_start(bytes, start)

# Variable-length integer cache initialisation.

def _fill_vint_caches(limit=65536):

    """
    Populate the caches with the representations of all numbers below 'limit'.
    This is done in a function to avoid leaving loop variables in the module
    namespace.
    """

    for number in range(limit):
        encoded = array('B')
        _vint_to_array(number, encoded)
        vint_bytes_cache.append(encoded)
        vint_cache.append(_array_to_string(encoded))

_fill_vint_caches()

# vim: tabstop=4 expandtab shiftwidth=4