util Module

Rom - the Redis object mapper for Python

Copyright 2013-2020 Josiah Carlson

Released under the LGPL license version 2.1 and version 3 (you can choose which you’d like to be bound under).

Changing connection settings

There are four ways to change the way that rom connects to Redis.

  1. Set the global default connection settings by calling rom.util.set_connection_settings() with the same arguments you would pass to the redis.Redis() constructor:

    import rom.util
    
    rom.util.set_connection_settings(host='myhost', db=7)
    
  2. Give each model its own Redis connection on creation, called _conn, which will be used whenever any Redis-related calls are made on instances of that model:

    import redis
    import rom
    
    class MyModel(rom.Model):
        _conn = redis.Redis(host='myhost', db=7)
    
        attribute = rom.String()
        other_attribute = rom.String()
    
  3. Replace the CONNECTION object in rom.util:

    import redis
    import rom.util
    
    rom.util.CONNECTION = redis.Redis(host='myhost', db=7)
    
  4. Monkey-patch rom.util.get_connection with a function that takes no arguments and returns a Redis connection:

    import redis
    import rom.util
    
    def my_connection():
        # Note that this function doesn't use connection pooling,
        # you would want to instantiate and cache a real connection
        # as necessary.
        return redis.Redis(host='myhost', db=7)
    
    rom.util.get_connection = my_connection
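The precedence described above (a model-level _conn is used when present, otherwise the global default applies) can be modeled without a Redis server. This is only a sketch of that precedence, not rom's actual lookup code: FakeConn, DEFAULT_CONNECTION, get_default_connection, and resolve_connection are hypothetical names introduced for illustration.

```python
class FakeConn:
    """Stand-in for redis.Redis so the sketch runs without a server."""
    def __init__(self, **settings):
        self.settings = settings


# Hypothetical module-level default, playing the role of rom.util.CONNECTION.
DEFAULT_CONNECTION = FakeConn(host='localhost', db=0)


def get_default_connection():
    # Plays the role of rom.util.get_connection(): takes no arguments
    # and returns a connection.
    return DEFAULT_CONNECTION


def resolve_connection(model):
    # Assumption: a model-level _conn wins; otherwise fall back to the default.
    conn = getattr(model, '_conn', None)
    return conn if conn is not None else get_default_connection()


class WithOwnConn:
    _conn = FakeConn(host='myhost', db=7)


class WithoutOwnConn:
    pass
```

Here resolve_connection(WithOwnConn) returns the model's own connection, while resolve_connection(WithoutOwnConn) falls through to the default.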
    

Using a non-caching session object

If you would like to stop rom from caching your data for later session.commit() or faster data fetching (and there are several reasons for doing this), rom offers two methods to enable or disable caching on either a global or per-thread basis.

  1. To set the global default behavior as not caching anything, you can:

    import rom.util
    rom.util.use_null_session()
    

    From the point that rom.util.use_null_session() is called, no additional caching will be performed. You must explicitly .save() any newly created entities, and session.commit() will only save those objects that had been cached prior to the rom.util.use_null_session() call.

    You can switch back to the standard rom behavior by calling rom.util.use_rom_session().

  2. To override behavior on a per-thread basis, you can set the attribute null_session on the session object (which is available as rom.session, rom.columns.session, or rom.util.session), which will set the thread’s behavior to be uncached (session.null_session = True), cached (session.null_session = False), or the global default (del session.null_session).
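The three states of null_session (set to True, set to False, or deleted to fall back to the global default) can be sketched with a thread-local object. This is an illustration of the semantics described above, not rom's implementation; SketchSession, set_global_null_session, and _DEFAULTS are hypothetical names.

```python
import threading

# Hypothetical global default; False means "cache entities".
_DEFAULTS = {'null_session': False}


def set_global_null_session(value):
    # Plays the role of use_null_session() / use_rom_session().
    _DEFAULTS['null_session'] = bool(value)


class SketchSession(threading.local):
    @property
    def null_session(self):
        # A per-thread override wins; otherwise use the global default.
        return getattr(self, '_null_override', _DEFAULTS['null_session'])

    @null_session.setter
    def null_session(self, value):
        self._null_override = bool(value)

    @null_session.deleter
    def null_session(self):
        # Deleting the attribute returns this thread to the global default.
        if hasattr(self, '_null_override'):
            del self._null_override


session = SketchSession()
```

Because the override lives in thread-local storage, setting session.null_session in one thread leaves every other thread on the global default.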

Using a geo index

Your model must have an attribute called geo_index, and that attribute must be a list of GeoIndex() objects. Names for GeoIndex items must be unique per model:

import rom

def cb(data):
    # ``data`` can act as an object or dictionary. This function could have
    # actually just read::
    #     return data
    # ... but we will be explicit in what we return... yes this works!
    return {'lon': data['lon'], 'lat': data.lat}

class PointOfInterest(rom.Model):
    ...
    tags = rom.String(index=True, keygen=rom.FULL_TEXT)
    avg_rating = rom.Float(index=True)
    lon = rom.Float()
    lat = rom.Float()
    geo_index = [
        # callback function passed to GeoIndex as the 2nd argument *must*
        # return a dictionary containing 'lon' and 'lat' values, as degrees
        rom.GeoIndex('geo_index', cb),
        ...
    ]


# Gets the 50 *best* restaurants, up to 25 miles away, from a given point,
# ordered by highest to lowest rating
points = PointOfInterest.query \
    .filter(tags='restaurant') \
    .near('geo_index', <lon>, <lat>, 25, 'mi') \
    .order_by('-avg_rating') \
    .limit(0, 50) \
    .all()

# geo queries can also independently have their own counts, which can offer
# performance improvements for some queries (filtering earlier rather than
# later), for slightly different semantics...

# Gets the 50 *closest* restaurants, up to 25 miles away, from a given point,
# ordered by highest to lowest rating.
points = PointOfInterest.query \
    .filter(tags='restaurant') \
    .near('geo_index', <lon>, <lat>, 25, 'mi', 50) \
    .order_by('-avg_rating') \
    .all()

rom.util.CASE_INSENSITIVE(val)

Old alias for SIMPLE_CI

class rom.util.ClassProperty(get, set=None, delete=None)

Bases: object

Borrowed from: https://gist.github.com/josiahcarlson/1561563

deleter(delete)
getter(get)
setter(set)

rom.util.EntityLock(entity, acquire_timeout, lock_timeout)

Useful when you want exclusive access to an entity across all writers:

# example
import rom

class Document(rom.Model):
    owner = rom.ManyToOne('User', on_delete='restrict')
    ...

def change_owner(document, new_owner):
    with rom.util.EntityLock(document, 5, 90):
        document.owner = new_owner
        document.save()

rom.util.FULL_TEXT(val)

This is a basic full-text index keygen function. Words are lowercased, split by whitespace, and stripped of punctuation from both ends before an inverted index is created for term searching.
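The transformation described here (lowercase, split on whitespace, strip punctuation from both ends) can be sketched in a few lines. This is a rough approximation for illustration, not rom's actual keygen: full_text_sketch is a hypothetical name, and the exact punctuation set and return type that rom uses may differ.

```python
import string


def full_text_sketch(val):
    # Lowercase, split on whitespace, then strip punctuation from both ends.
    words = set()
    for raw in val.lower().split():
        word = raw.strip(string.punctuation)
        if word:  # drop tokens that were punctuation only
            words.add(word)
    # Sorted only to make the result deterministic for the reader.
    return sorted(words)
```

Under these assumptions, full_text_sketch("A Simple Sentence.") yields the terms 'a', 'sentence', and 'simple', each of which would be searchable on its own.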

rom.util.IDENTITY(val)

This is a basic “equality” index keygen, primarily meant to be used for things like:

Model.query.filter(col='value')

Where FULL_TEXT would transform a sentence like “A Simple Sentence” into an inverted index searchable by the words “a”, “simple”, and/or “sentence”, IDENTITY will only be searchable by the original full sentence with the same capitalization - “A Simple Sentence”. See IDENTITY_CI for the same function, only case-insensitive.

rom.util.IDENTITY_CI(val)

Case-insensitive version of IDENTITY

rom.util.IDENTITY_STRING(val)

Like IDENTITY, but for String columns.

rom.util.IDENTITY_STRING_CI(val)

Like IDENTITY_CI, but for String columns.

class rom.util.Lock(conn, lockname, acquire_timeout, lock_timeout)

Bases: object

Borrowed/modified from my book, Redis in Action: https://github.com/josiahcarlson/redis-in-action/blob/master/python/ch11_listing_source.py

Useful for locking over a string key in Redis. Minimally correct for the required semantics. Mostly intended as a general building block for use by EntityLock.

acquire()
acquire_timeout
conn
identifier
lock_timeout
lockname
refresh()
release()
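To show how acquire_timeout and lock_timeout interact, here is a minimal sketch of a string-key lock against an in-memory stand-in for Redis. It is not the code from Redis in Action or from rom: FakeStore, SketchLock, set_if_absent, and delete_if_equal are hypothetical names, and refresh() is omitted for brevity.

```python
import time
import uuid


class FakeStore:
    """In-memory stand-in for the few Redis operations a lock needs."""
    def __init__(self):
        self._data = {}  # key -> (value, expires_at)

    def _live(self, key):
        # Return the stored item, discarding it first if it has expired.
        item = self._data.get(key)
        if item is not None and item[1] <= time.monotonic():
            del self._data[key]
            item = None
        return item

    def set_if_absent(self, key, value, ttl):
        if self._live(key) is not None:
            return False
        self._data[key] = (value, time.monotonic() + ttl)
        return True

    def delete_if_equal(self, key, value):
        item = self._live(key)
        if item is not None and item[0] == value:
            del self._data[key]
            return True
        return False


class SketchLock:
    def __init__(self, store, lockname, acquire_timeout, lock_timeout):
        self.store = store
        self.lockname = lockname
        self.acquire_timeout = acquire_timeout  # seconds to keep retrying
        self.lock_timeout = lock_timeout        # seconds before the lock expires
        self.identifier = None

    def acquire(self):
        identifier = str(uuid.uuid4())
        deadline = time.monotonic() + self.acquire_timeout
        while True:
            if self.store.set_if_absent(self.lockname, identifier,
                                        self.lock_timeout):
                self.identifier = identifier
                return True
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.001)

    def release(self):
        # Only delete the key if it still holds our identifier.
        released = self.store.delete_if_equal(self.lockname, self.identifier)
        self.identifier = None
        return released
```

The unique identifier is what keeps a holder whose lock has already expired from releasing a lock that someone else has since acquired.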
rom.util.SIMPLE(val)

This is a basic case-sensitive “sorted order” index keygen function for strings. It returns a value suitable for ordering by a 7-byte prefix of a string (that is, 7 characters from a byte string, or between 1.75 and 7 characters from a unicode string, depending on how many bytes each character takes when encoded).

Warning

Case sensitivity is based on the (encoded) byte prefixes of the strings/text being indexed, so ordering may be different than a native comparison ordering (especially if an order is different based on characters past the 7th encoded byte).
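The effect described in this warning can be seen in a small sketch that turns the first 7 encoded bytes of a string into an integer. This is an illustration only: simple_sketch is a hypothetical name, UTF-8 is an assumed encoding, and the value rom actually computes may differ.

```python
def simple_sketch(val):
    # Assumption: text is encoded as UTF-8 before the prefix is taken.
    if isinstance(val, str):
        val = val.encode('utf-8')
    # Keep the first 7 bytes, right-padding short values with zero bytes.
    prefix = val[:7].ljust(7, b'\x00')
    # Interpret the prefix as a big-endian integer so that integer order
    # matches byte-wise order of the prefixes.
    return int.from_bytes(prefix, 'big')
```

With this sketch, 'abcdefgX' and 'abcdefgY' map to the same value because they differ only at the 8th byte, and 'Zebra' sorts before 'apple' because uppercase ASCII bytes are smaller than lowercase ones.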

rom.util.SIMPLE_CI(val)

The same as SIMPLE, only case-insensitive.

class rom.util.Session(*args, **kwargs)

Bases: _local

This is a very dumb session. All it tries to do is to keep a cache of loaded entities, offering the ability to call .save() on modified (or all) entities with .flush() or .commit().

This is exposed via the session global variable, which is available when you import rom as rom.session.

Note

Calling .flush() or .commit() doesn’t cause all objects to be written simultaneously. They are written one-by-one, with any error causing the call to fail.

add(obj)

Adds an entity to the session.

commit(full=False, all=False, force=False)

Call .save() on all modified entities in the session. Also forgets all known entities in the session, so this should only be called at the end of a request.

Arguments:

  • full - pass True to force save full entities, not only changes

  • all - pass True to save all entities known, not only those entities that have been modified.

  • force - pass True to force-save all entities known, ignoring DataRaceError and EntityDeletedError exceptions

delete(*objects, **kwargs)

This method offers the ability to delete multiple entities in a single round trip to Redis (assuming your models are all stored on the same server). You can call:

session.delete(obj)
session.delete(obj1, obj2, ...)
session.delete([obj1, obj2, ...])

The keyword argument force=True can be provided, which can force the deletion of an entity again, even if we believe it to already be deleted.

If force=True, we won’t re-call the object’s _before_delete() method, but we will re-call _after_delete().

Note

Objects are automatically dropped from the session after delete for the sake of cache coherency.
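The three call forms shown for delete() (a single entity, several entities, or a list of entities) imply some normalization of the positional arguments. One way such normalization could work is sketched below; flatten_entities is a hypothetical helper, not part of rom.

```python
def flatten_entities(*objects):
    # Accept obj, (obj1, obj2, ...), or ([obj1, obj2, ...]) and
    # return one flat list of entities in the order given.
    flat = []
    for obj in objects:
        if isinstance(obj, (list, tuple)):
            flat.extend(obj)
        else:
            flat.append(obj)
    return flat
```

All three call forms then reduce to the same flat list, which can be processed in one pass.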

flush(full=False, all=False, force=False)

Call .save() on all modified entities in the session. Use when you want to flush changes to Redis, but don’t want to lose your local session cache.

See the .commit() method for arguments and their meanings.

forget(obj)

Forgets about an entity (automatically called when an entity is deleted). Call this to ensure that an entity that you’ve modified is not automatically saved on session.commit().

get(pk)

Fetches an entity from the session based on primary key.

property null_session

refresh(*objects, **kwargs)

This method is an alternate API for refreshing many entities (possibly not tracked by the session). You can call:

session.refresh(obj)
session.refresh(obj1, obj2, ...)
session.refresh([obj1, obj2, ...])

And all provided entities will be reloaded from Redis.

To force reloading for modified entities, you can pass force=True.

refresh_all(*objects, **kwargs)

This method is an alternate API for refreshing all entities tracked by the session. You can call:

session.refresh_all()
session.refresh_all(force=True)

And all entities known by the session will be reloaded from Redis.

To force reloading for modified entities, you can pass force=True.

rollback()

Forget about all entities in the session (.commit() will do nothing).

save(*objects, **kwargs)

This method is an alternate API for saving many entities (possibly not tracked by the session). You can call:

session.save(obj)
session.save(obj1, obj2, ...)
session.save([obj1, obj2, ...])

And the entities will be flushed to Redis.

You can pass the keyword arguments full, all, and force with the same meaning and semantics as the .commit() method.

rom.util.clean_old_index(model, block_size=100, **kwargs)

This utility function will clean out old index data that was accidentally left during item deletion in rom versions <= 0.27.0. You should run this after you have upgraded all of your clients to version 0.28.0 or later.

Arguments:

  • model - the model whose leftover index data you want to clean

  • block_size - the maximum number of items to check at a time, defaulting to 100

This function will yield its progression through re-checking all of the data that could be left over.

Example use:

from rom.util import clean_old_index

for progress, total in clean_old_index(MyModel, block_size=200):
    print("%s of %s" % (progress, total))

rom.util.clean_unsafe_cols(*models, **kwargs)
Arguments:

  • models - any model base whose namespace you want to clean because you believe it contains leftover unsafe column data that you have previously removed

  • block_size - keyword argument only, the number of items to fetch per SCAN call

What this does:

  1. Scans the namespace prefix for your model; for example, clean_unsafe_cols(MyModelX) scans the namespace “MyModelX:”.

  2. Considers any key that looks like “MyModelX:<number>:<attr>”, where attr is not in {‘idx’, ‘uidx’, ‘pre’, ‘suf’, ‘geo’, ‘’}.

  3. For each key from step 2, deletes the key if “MyModelX:<number>” does not exist (because the base entity does not exist); otherwise the key is ignored.

Returns:

(keys_examined, keys_deleted)
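The decision rule in the steps above can be sketched against a plain Python set standing in for the Redis keyspace. This is an illustration, not rom's implementation: clean_unsafe_cols_sketch and INDEX_SUFFIXES are hypothetical names, and counting every key under the namespace prefix as "examined" is an assumption.

```python
import re

# Suffixes that are never treated as unsafe column data.
INDEX_SUFFIXES = {'idx', 'uidx', 'pre', 'suf', 'geo', ''}


def clean_unsafe_cols_sketch(keys, namespace):
    # keys: a mutable set of key names standing in for the Redis keyspace.
    pattern = re.compile(r'^%s:(\d+):(.*)$' % re.escape(namespace))
    examined = deleted = 0
    for key in sorted(keys):  # sorted() copies, so the set can be mutated
        if not key.startswith(namespace + ':'):
            continue
        examined += 1
        match = pattern.match(key)
        if match is None or match.group(2) in INDEX_SUFFIXES:
            continue  # not a "<namespace>:<number>:<attr>" data key
        base = '%s:%s' % (namespace, match.group(1))
        if base not in keys:
            # The base entity is gone, so the leftover column data is removed.
            keys.discard(key)
            deleted += 1
    return examined, deleted
```

For example, with only “MyModelX:1” present as a base entity, the key “MyModelX:2:blob” would be deleted while “MyModelX:1:blob” would be kept.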

rom.util.get_connection()

Override me for one of the ways to change the way I connect to Redis.

rom.util.refresh_all_indexes()

This utility function will renew and refresh all indexes for all entities. Useful if you’ve recently upgraded rom, or changed your index definitions and want to ensure that all of your models are properly indexed.

Will print the model namespaces and progress along the way.

rom.util.refresh_indices(model, block_size=100, scan=True)

This utility function will iterate over all entities of a provided model, refreshing their indices. This is primarily useful after adding an index on a column.

Arguments:

  • model - the model whose entities you want to reindex

  • block_size - the maximum number of entities you want to fetch from Redis at a time, defaulting to 100

This function will yield its progression through re-indexing all of your entities.

Example use:

from rom.util import refresh_indices

for progress, total in refresh_indices(MyModel, block_size=200):
    print("%s of %s" % (progress, total))

Note

This uses the session object to handle index refresh via calls to .commit(). If you have any outstanding entities known in the session, they will be committed.

rom.util.set_connection_settings(*args, **kwargs)

Update the global connection settings for models that don’t have model-specific connections.

rom.util.show_progress(job)

This utility function will print the progress of a passed iterator job as started by refresh_indices() and clean_old_index().

Usage example:

from rom import Model, util

class RomTest(Model):
    pass

for i in range(1000):
    RomTest().save()

util.show_progress(util.clean_old_index(RomTest))
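The examples for refresh_indices() and clean_old_index() show that those jobs yield (progress, total) pairs, and show_progress() consumes such an iterator. The shape of that protocol can be sketched without Redis; fake_job and show_progress_sketch are hypothetical stand-ins, and the real functions may report progress at different points or in a different format.

```python
def fake_job(total, block_size=100):
    # Yields (progress, total) pairs, advancing one block at a time.
    done = 0
    yield done, total
    while done < total:
        done = min(done + block_size, total)
        yield done, total


def show_progress_sketch(job, write=print):
    # Drains the job, reporting each step; returns the last pair seen.
    last = None
    for progress, total in job:
        write("%s of %s" % (progress, total))
        last = (progress, total)
    return last
```

Calling show_progress_sketch(fake_job(250)) reports one line per block and returns the final (progress, total) pair.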
rom.util.use_null_session()

If you call use_null_session(), you will change the default session for all threads to not cache. You can override the default on a per-thread basis by manipulating session.null_session (set to True, False, or delete the attribute to not cache, cache, or use the global default, respectively).

rom.util.use_rom_session()

If you call use_rom_session(), you will change the default session for all threads to cache. You can override the default on a per-thread basis by manipulating session.null_session (set to True, False, or delete the attribute to not cache, cache, or use the global default, respectively).