from metasqlobject.boundattributes import BoundAttribute from sqlapi import sql from sqlapi import exceptions from classreg import class_registry import events import main import cPickle as pickle class Col(BoundAttribute): _standard_sql_type = None # These are overrides that can be used for force the exact SQL # type: sql_type = None sql_constraints = None not_null = False auto_increment = False primary_key = False get = None set = None _db_name = None def __init__(self, **kw): BoundAttribute.__init__(self, **kw) def bind_to_class(self, bound_class, name): self.sqlexpr = sql.Column( sql.Table(bound_class.sqlmeta.table), self.db_name) bound_class.sqlmeta.add_column(self) # @@: Warn about dbName def db_name__get(self): return self._db_name or self.name def db_name__set(self, value): self._db_name = value db_name = property(db_name__get, db_name__set) def extra_repr(self): if self._db_name: return ' db_name=%r' % self.db_name else: return '' def sql_type_parameters(self): cur = {} if self.not_null: cur['not_null'] = True if self.auto_increment: cur['auto_increment'] = True if self.primary_key: cur['primary_key'] = True return cur def standard_sql_type(self, ops): return self._standard_sql_type def __set__(self, obj, value): if self.set is not None: self.set(obj, value, self.raw_set) else: self.raw_set(obj, value) def raw_set(self, obj, value): from_python = self.from_python to_python = self.to_python if from_python is not Col.from_python: db_value = from_python(value, obj) else: db_value = value if to_python is not Col.to_python: value = to_python(db_value, obj) obj.sqlmeta._db_values[self.name] = db_value obj.sqlmeta._python_values[self.name] = value obj.sqlmeta._dirty_columns.append(self.name) if not obj.sqlmeta.lazy: obj.sqlmeta.write_updates() def __get__(self, obj, type=None): if obj is None: return self.sqlexpr if self.get is not None: return self.get(obj, self.raw_get) else: return self.raw_get(obj) def raw_get(self, obj): return obj.sqlmeta._python_values[self.name] def to_python(self, value, state=None): return value def from_python(self, value, state=None): return value def create_sql(self, conn, name=None, **ops): if name is None: name = self.db_name def_ops = self.sql_type_parameters() def_ops.update(ops) ops = def_ops return sql.ColumnDefinition( name, self.sql_type or self.standard_sql_type(ops), **ops), [] def __sql_components__(self, dbname): return self.sqlexpr.__sql_components__(dbname) class StringCol(Col): _standard_sql_type = 'TEXT' length = None def sql_type_parameters(self): cur = Col.sql_type_parameters(self) if self.length is not None: cur['length'] = self.length return cur def standard_sql_type(self, ops): if 'length' in ops: return 'VARCHAR(%i)' % ops['length'] else: return 'TEXT' class IntCol(Col): _standard_sql_type = 'INT' # @@: Warn: KeyCol = IntCol class FloatCol(Col): _standard_sql_type = 'FLOAT' class DateCol(Col): _standard_sql_type = 'DATE' class DateTimeCol(Col): _standard_sql_type = 'TIMESTAMP' class TimeCol(Col): _standard_sql_type = 'TIME' class BoolCol(Col): _standard_sql_type = 'BOOLEAN' def from_python(self, value, state=None): return bool(value) class UnicodeCol(StringCol): # @@: Also support dbEncoding encoding = 'UTF-8' def to_python(self, value, state=None): if value is None: return None elif isinstance(value, unicode): return value return value.decode(self.encoding) def from_python(self, value, state=None): if value is None: return None elif isinstance(value, str): # @@: error? return value return value.encode(self.encoding) # @@: Also support BLOBCol class BlobCol(Col): _standard_sql_type = 'BLOB' def to_python(self, value, state=None): if value is None: return None # Is this too sloppy? if hasattr(value, 'tostring'): return value.tostring() return str(value) #if isinstance(value, str): # ## sqlite: # return plugin.decode_blob(value) #elif isinstance(value, (buffer_type, _binaryType)): # if isinstance(value, array_type): # # MySQL: # return value.tostring() # return str(value) raise validators.Invalid( 'expected a string in the BlobCol %s; got %s %r instead' % (self.name, type(value), value)) def from_python(self, value, state=None): if value is None: return None conn = state.sqlmeta.get_connection() value = conn.plugin.module.Binary(value) return value class PickleCol(BlobCol): def to_python(self, value, state=None): value = BlobCol.to_python(self, value, state) return pickle.loads(value) def from_python(self, value, state=None): value = pickle.dumps(value) return BlobCol.from_python(self, value, state) # @@: Warn: class ForeignKey(BoundAttribute): cascade = None foreign_class = None def __init__(self, foreign_class=None, **args): if foreign_class is not None: args['foreign_class'] = foreign_class BoundAttribute.__init__(self, **args) for hidden_attr in ('_copyfrom', 'name', 'bound_class'): if hidden_attr in args: del args[hidden_attr] if self.name: if not getattr(self, 'column_name', None): self.column_name = self.name + '_id' if getattr(self, 'column_args', None): self.column_args.update(args) else: self.column_args = args else: self.column_args = args def create_column(self): return IntCol(soclass=self.bound_class, name=self.column_name, **self.column_args) def create_reference(self): return Reference(self.foreign_class, self.column_name, soclass=self.bound_class, name=self.name, cascade=self.cascade) def bind_to_class(self, bound_class, name): ref = self.create_reference() col = self.create_column() setattr(bound_class, col.name, col) col.__addtoclass__(bound_class, col.name) setattr(bound_class, ref.name, ref) ref.__addtoclass__(bound_class, ref.name) class Reference(BoundAttribute): get = None set = None cascade = None def __init__(self, foreign_class=None, column_name=None, **args): BoundAttribute.__init__(self, **args) if foreign_class is not None: self.foreign_class = foreign_class if column_name is not None: self.column_name = column_name assert self.cascade in (None, True, False, 'null') if self.cascade is not None and self.bound_class is not None: class_registry.register_callback( self.set_cascade, self.foreign_class, self.bound_class.__module__) def bind_to_class(self, bound_class, name): self.relative_module = self.bound_class.__module__ def extra_repr(self): return '%s->%s' % ( self.bound_class, self.foreign_class or '(not set)') def set_cascade(self, joined_class): joined_class.event_hub.listen( events.RowDestroySignal, self, 'joined_destroyed') joined_class.event_hub.listen( events.DropTableSignal, self, 'joined_drop_table') def joined_destroyed(self, instance, post_funcs): current = self.bound_class.select_by( **{self.column_name: instance.id}) current = list(current) if (self.cascade is not None and not self.cascade and current): raise exceptions.IntegrityError( "Cannot delete %r; instances depend on it: %r" % (instance, current)) def do_cascade(soclass, del_id): for item in current: if self.cascade == 'null': setattr(item, self.column_name, None) else: item.destroy_self() post_funcs.append(do_cascade) def joined_drop_table(self, soclass, connection, cascade, post_funcs): if cascade and self.bound_class.sqlmeta.table_exists(connection): self.bound_class.sqlmeta.drop_table(cascade=cascade) def __get__(self, obj, type=None): if obj is None: return SQLForeignKeyExpr(self) if self.get is not None: return self.get(obj, self.raw_get) else: return self.raw_get(obj) def raw_get(self, obj): try: key = obj.sqlmeta._python_values[self.column_name] except KeyError: raise AttributeError( "Cannot retrieve .%s until .%s is set" % (self.name, self.column_name)) if key is None: return None return class_registry.get_class( self.foreign_class, self.relative_module).get(key) def __set__(self, obj, value): if self.set is not None: self.set(obj, value, self.raw_set) else: self.raw_set(obj, value) def raw_set(self, obj, value): if isinstance(value, (int, long, str)): raise ValueError( "You can only set a reference to an object with an " ".id attribute, not the actual id (got: %r)" % value) if value is not None: key = value.id else: key = None setattr(obj, self.column_name, key) class Compound(BoundAttribute): def __init__(self, *columns, **kw): BoundAttribute.__init__(self, **kw) actual_columns = [] for name in columns: if isinstance(name, (list, tuple)): actual_columns.extend(name) else: actual_columns.append(name) self.columns = tuple(actual_columns) for name, value in kw.items(): setattr(self, name, value) def __get__(self, obj, type=None): if obj is None: return sql.CompoundExpression( *[getattr(type, name) for name in self.columns]) return tuple( [getattr(obj, name) for name in self.columns]) def __set__(self, obj, value): if len(value) != len(self.columns): raise ValueError( "You must pass a tuple of length %i to %r (you gave: " "%r)" % ( len(self.columns), self, value)) obj.set(**dict(zip(self.columns, value))) class SQLForeignKeyExpr(object): def __init__(self, reference): self.reference = reference def __repr__(self): return '<%s %s>' % ( self.__class__.__name__, sql.sqlrepr(self)) def __sql_components__(self, dbsql): return [getattr(self.reference.soclass, self.reference.column_name)] def __eq__(self, other): if other is None: return sql.SQLPostfixOp(self, 'IS NULL') elif isinstance(other, main.SQLObject): return sql.SQLOp(self, '=', other.id) else: return sql.SQLOp(self, '=', other) def __ne__(self, other): if other is None: return sql.SQLPostfixOp(self, 'IS NOT NULL') else: return sql.SQLOp(self, '<>', other.id) # @@: Should these be available? def __and__(self, other): return SQLOp(self, 'AND', other) def __rand__(self, other): return SQLOp(other, 'AND', self) def __or__(self, other): return SQLOp(self, 'OR', other) def __ror__(self, other): return SQLOp(other, 'OR', self) def __invert__(self): return SQLUnaryOp('NOT', self) __all__ = ['ForeignKey'] for name, value in globals().items(): if (isinstance(value, type) and issubclass(value, Col)): __all__.append(name)