# Source code for stetl.outputs.dboutput

# -*- coding: utf-8 -*-
#
# Output classes for ETL, databases.
#
# Author: Just van den Broecke
#
from stetl.output import Output
from stetl.util import Util
from stetl.packet import FORMAT
from stetl.postgis import PostGIS

log = Util.get_log('dboutput')


class DbOutput(Output):
    """
    Output to any database (abstract base class).

    Concrete database outputs subclass this and override ``write()``.
    The base ``write()`` writes nothing; it hands the packet back unchanged.
    """

    def __init__(self, configdict, section, consumes):
        # All configuration handling lives in the generic Output base.
        Output.__init__(self, configdict, section, consumes)

    def write(self, packet):
        # No-op at this level: pass the packet straight through.
        return packet


class PostgresDbOutput(DbOutput):
    """
    Output to a PostgreSQL database by executing an SQL string.

    The incoming packet's data is an SQL string. It is run via
    ``PostGIS.tx_execute`` and the resulting row count is logged.
    A packet whose data is ``None`` is passed through without touching
    the database.

    consumes=FORMAT.string
    """

    def __init__(self, configdict, section):
        DbOutput.__init__(self, configdict, section, consumes=FORMAT.string)

    def write(self, packet):
        sql = packet.data
        if sql is not None:
            log.info('executing SQL')
            # NOTE(review): a new PostGIS helper is built on every write();
            # whether tx_execute opens/closes its own connection is not
            # visible from this module.
            helper = PostGIS(self.cfg.get_dict())
            rowcount = helper.tx_execute(sql)
            log.info('executed SQL, rowcount=%d' % rowcount)
        return packet


class PostgresInsertOutput(PostgresDbOutput):
    """
    Output by inserting records into a Postgres database.

    Input is a record (Python dict) or a Python list of dicts (records).
    An INSERT statement is generated once, from the keys of the first record
    seen, and reused for every following record. Each record's values are
    bound in that same column order, so records may list their keys in any
    order. Records must contain every column of the first record; any extra
    keys are not inserted.

    consumes=FORMAT.record_array, FORMAT.record
    """

    def __init__(self, configdict, section, consumes=FORMAT.record):
        # NOTE: ``consumes`` is accepted for signature compatibility but
        # ignored: this output always consumes both single records and record
        # arrays. PostgresDbOutput.__init__ is bypassed on purpose because it
        # would force FORMAT.string.
        DbOutput.__init__(self, configdict, section,
                          consumes=[FORMAT.record_array, FORMAT.record])
        self.query = None
        # Column order used in self.query; record values are bound in this order.
        self.columns = None
        self.db = None
        self.key = self.cfg.get('key')

    def init(self):
        # Connect only once to DB
        log.info('Init: connect to DB')
        self.db = PostGIS(self.cfg.get_dict())
        self.db.connect()

    def exit(self):
        # Disconnect from DB when done. init() may never have connected,
        # in which case there is nothing to disconnect.
        log.info('Exit: disconnect from DB')
        if self.db is not None:
            self.db.disconnect()

    def create_query(self, record):
        """
        Build a parameterized INSERT template from the keys of ``record``.

        e.g. INSERT INTO lml_files (file_name,file_data) VALUES (%s,%s)
        See http://grokbase.com/t/postgresql/psycopg/12735bvkmv/insert-into-with-a-dictionary-or-generally-with-a-variable-number-of-columns

        :param record: dict whose keys become the column list, in iteration order.
        :return: the INSERT query string with one ``%s`` placeholder per column.
        """
        # NOTE(review): the table name (from config) and the column names
        # (record keys) are interpolated into the SQL text unquoted and
        # unescaped; only the values are bound as parameters. Keys must
        # therefore come from a trusted source.
        query = "INSERT INTO %s (%s) VALUES (%s)" % (
            self.cfg.get('table'),
            ",".join(['%s' % k for k in record]),
            ",".join(["%s"] * len(record)))
        log.info('query is %s', query)
        return query

    def _record_values(self, record):
        """Return ``record``'s values as a list, ordered like the INSERT columns."""
        if self.columns is None:
            # Query was supplied externally (e.g. by a subclass); fall back to
            # the record's own key order.
            return list(record.values())
        # A list (not a dict view) because DB-API parameter binding expects
        # a sequence.
        return [record[column] for column in self.columns]

    def write(self, packet):
        # Deal with empty or zero-length data structures (list or dict)
        if packet.data is None or len(packet.data) == 0:
            return packet

        # ASSERT: record data present.
        # Data is a single record (dict) or a list of records (list of dict).
        records = packet.data
        first_record = records[0] if isinstance(records, list) else records

        # Create the query once, and remember its column order.
        if self.query is None:
            self.query = self.create_query(first_record)
            self.columns = list(first_record)

        if isinstance(records, dict):
            # Single record: insert and commit it.
            self.db.execute(self.query, self._record_values(records))
            self.db.commit(close=False)
        elif isinstance(records, list):
            # Multiple records: insert each, then commit once.
            for rec in records:
                self.db.execute(self.query, self._record_values(rec))
            self.db.commit(close=False)
            log.info('committed %d records', len(records))

        return packet