streamsx.database package

Database integration for IBM Streams

For details of implementing applications in Python for IBM Streams, including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud, see:

Overview

Provides classes and functions to run SQL statements against a database.

Credentials

Db2 Warehouse credentials are given as service credentials JSON.

The mandatory JSON elements are “username”, “password” and “jdbcurl”:

{
    "username": "<JDBC_USER>",
    "password": "<JDBC_PASSWORD>",
    "jdbcurl":  "<JDBC_URL>"
}
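Before passing the credentials to JDBCStatement, it can help to check that the mandatory elements are present. The following is a minimal sketch with a hypothetical helper missing_credentials (not part of streamsx.database), assuming the credentials JSON has been loaded into a Python dict:

```python
import json

# Mandatory elements of the Db2 Warehouse service credentials JSON
REQUIRED_KEYS = ("username", "password", "jdbcurl")


def missing_credentials(credentials):
    """Return the mandatory elements that are absent or empty (hypothetical helper)."""
    return [key for key in REQUIRED_KEYS if not credentials.get(key)]


# Placeholder values, as in the JSON above
credentials_json = '{"username": "<JDBC_USER>", "password": "<JDBC_PASSWORD>", "jdbcurl": "<JDBC_URL>"}'
credentials = json.loads(credentials_json)
print(missing_credentials(credentials))  # → []
```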
class streamsx.database.JDBCStatement(credentials, **options)

Bases: streamsx.topology.composite.Map

Composite map transformation for a JDBC statement.

The statement is called once for each input tuple received. Result sets that are produced by the statement are emitted as output stream tuples.

This composite includes the JDBC driver for the Db2 database (‘com.ibm.db2.jcc.DB2Driver’) in the application bundle by default.

Other drivers, for example for databases other than Db2, can be used by setting the properties jdbc_driver_lib and jdbc_driver_class.
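A jar file is a zip archive in which a class such as org.postgresql.Driver is stored as the entry org/postgresql/Driver.class. The sketch below, with a hypothetical helper jar_contains_class, checks that the file given with jdbc_driver_lib actually contains the class given with jdbc_driver_class. The PostgreSQL driver class and the jar path are illustrative assumptions only:

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar at jar_path contains the compiled class class_name."""
    # A class a.b.C is stored in the jar (a zip archive) as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


# Hypothetical options for a non-Db2 driver; adjust the path and class to your driver
driver_options = {
    "jdbc_driver_lib": "/opt/drivers/postgresql.jar",  # absolute path (placeholder)
    "jdbc_driver_class": "org.postgresql.Driver",
}
# jar_contains_class(driver_options["jdbc_driver_lib"], driver_options["jdbc_driver_class"])
```

Such a dict could then be passed as keyword arguments, for example db.JDBCStatement(credentials, **driver_options).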

There are two ways to specify the statement:

  • Statement is part of the input stream. You can specify which input stream attribute contains the statement with sql_attribute. If the input stream is of type CommonSchema.String, then you don’t need to specify the sql_attribute property.
  • Statement is given with the sql property. The statement can contain parameter markers that are set from the input stream attributes listed in sql_params.
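The two ways of specifying the statement can be modeled in plain Python. In this minimal sketch, a dict stands in for an input tuple and resolve_statement is a hypothetical helper, not part of streamsx.database; it returns the statement to run and the values for its parameter markers. It relies on CommonSchema.String having the single attribute string:

```python
def resolve_statement(tpl, sql=None, sql_attribute=None, sql_params=None):
    """Return (statement, parameter values) for one input tuple given as a dict."""
    if sql is not None:
        # Statement given with the sql property: the markers are set from the
        # input attributes listed in sql_params, in the listed order.
        names = [name.strip() for name in sql_params.split(",")] if sql_params else []
        return sql, [tpl[name] for name in names]
    # Statement is part of the input stream: read it from sql_attribute, or from
    # the attribute "string" of a CommonSchema.String stream.
    return tpl[sql_attribute or "string"], []


insert_sql = "INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (?, ?, ?)"
print(resolve_statement({"ID": 1, "NAME": "Name_7", "AGE": 42}, sql=insert_sql, sql_params="ID, NAME, AGE"))
print(resolve_statement({"string": "DROP TABLE RUN_SAMPLE"}))
```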

Example of a Streams application that inserts generated data as rows into a table:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
from streamsx.topology.context import submit, ConfigParams
from icpd_core import icpd_util
import streamsx.database as db
import random
import time

# Db2 Warehouse service credentials (replace the placeholders)
credentials = {
    "username": "<JDBC_USER>",
    "password": "<JDBC_PASSWORD>",
    "jdbcurl":  "<JDBC_URL>"
}

# generates some data with schema (ID, NAME, AGE)
def generate_data():
    counter = 0
    while True:
        # yield a sequential id, a random name and a random age
        counter = counter + 1
        yield {"NAME": "Name_" + str(random.randint(0, 500)), "ID": counter, "AGE": random.randint(10, 99)}
        time.sleep(0.10)

topo = Topology()

tuple_schema = StreamSchema("tuple<int64 ID, rstring NAME, int32 AGE>")
# Generates a stream of three attributes. Each attribute maps to the column with the same name in the Db2 table.
sample_data = topo.source(generate_data, name="GeneratedData").map(lambda tpl: (tpl["ID"], tpl["NAME"], tpl["AGE"]), schema=tuple_schema)

statement = db.JDBCStatement(credentials)
statement.sql = 'INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (?, ?, ?)'
statement.sql_params = 'ID, NAME, AGE'

sample_data.map(statement, name='INSERT')

# Use for IBM Streams including IBM Cloud Pak for Data:
# get the Streams instance details (the instance name is a placeholder)
cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[ConfigParams.SSL_VERIFY] = False
submit('DISTRIBUTED', topo, cfg)

Example that passes the options as keyword arguments:

config = {
    'sql': 'INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (?, ?, ?)',
    'sql_params': 'ID, NAME, AGE'
}
inserts = sample_data.map(db.JDBCStatement(credentials, **config))

Example with a “select count” statement and an output schema whose attribute TOTAL holds the result of the query:

sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>')
sql_query = 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'
query = topo.source([sql_query]).as_string()
res = query.map(db.JDBCStatement(credentials), schema=sample_schema)
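The relation between the query result and the output schema can be illustrated with Python's built-in sqlite3 module as a stand-in for Db2 and JDBC. This is an analogy, not how JDBCStatement is implemented: the sketch assumes that each output attribute is filled from the result column of the same name, or else from the input attribute of the same name, which would explain why sample_schema above also carries the string attribute of the input stream. The table name TAB1 replaces SAMPLE.TAB1 because SQLite has no SAMPLE schema, and run_per_tuple is a hypothetical helper:

```python
import sqlite3


def run_per_tuple(conn, input_tuples, output_attributes):
    """Run the statement of each input tuple and yield one output tuple per result row."""
    for tpl in input_tuples:
        cursor = conn.execute(tpl["string"])
        # Column names of the result set (empty for statements without a result set)
        columns = [desc[0] for desc in cursor.description or []]
        for row in cursor:
            values = dict(tpl)  # start from the input attributes
            values.update(zip(columns, row))  # result columns take precedence
            yield {name: values.get(name) for name in output_attributes}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE TAB1 (X INTEGER)")
conn.executemany("INSERT INTO TAB1 VALUES (?)", [(1,), (2,), (3,)])
query_tuple = {"string": "SELECT COUNT(*) AS TOTAL FROM TAB1"}
print(list(run_per_tuple(conn, [query_tuple], ["TOTAL", "string"])))
```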

Example with a “drop table” statement and the default output schema (the input stream schema):

sql_drop = 'DROP TABLE RUN_SAMPLE'
s = topo.source([sql_drop]).as_string()
res_sql = s.map(db.JDBCStatement(credentials))
res_sql.print()

Example that uses a configured external connection named ‘Db2-Cloud’ (Cloud Pak for Data only; see Connecting to data sources):

from icpd_core import icpd_util

db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
res = query.map(db.JDBCStatement(db_external_connection), schema=sample_schema)

New in version 1.5.

credentials

The credentials of the IBM Cloud Db2 Warehouse service as a dict, a configured external connection of kind “Db2 Warehouse” (Cloud Pak for Data only) as a dict, or the name of an application configuration.

Type: dict|str

options

The additional optional parameters as variable keyword arguments.

Type: kwargs

jdbc_driver_class

The class name of the JDBC driver. The default is the Db2 driver com.ibm.db2.jcc.DB2Driver.

Type: str

jdbc_driver_lib

Path to the JDBC driver library file: the jar file, given with its absolute path, that contains the class set with the jdbc_driver_class property. By default, db2jcc4.jar is added to the ‘opt’ directory in the application bundle.

Type: str

keystore

Path to the key store file for the SSL connection.

Type: str

keystore_password

Password for the key store file given by the keystore property.

Type: str

keystore_type

Type of the key store file (JKS, PKCS12).

Type: str

plugin_name

Name of the security plugin.

Type: str

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation.

Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • schema – Schema passed into map.
  • name – Name passed into map.
  • **options – Future options passed to map.
Returns:

Single stream representing the transformation of stream.

Return type:

Stream

security_mechanism

Value of the security mechanism.

Type: int

sql

String containing the SQL statement. Use this as an alternative to the sql_attribute property.

Type: str

sql_attribute

Name of the input stream attribute containing the SQL statement. Use this as an alternative to the sql property.

Type: str

sql_params

The names of the input stream attributes whose values are set for the SQL statement parameter markers. The values and parameter markers are associated by position: the first parameter marker in the SQL statement is associated with the first sql_params value, the second with the second, and so on.

Type: str
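Positional association of sql_params can be tried out with Python's built-in sqlite3 module, which uses the same ? parameter markers. This is only an analogy for what JDBCStatement does through JDBC, and insert_tuples is a hypothetical helper. The order follows the column list of the statement, not the alphabetical order of the attribute names:

```python
import sqlite3


def insert_tuples(conn, sql, sql_params, tuples):
    """Run sql once per tuple, binding the attributes named in sql_params to the ? markers by position."""
    names = [name.strip() for name in sql_params.split(",")]
    # Naive marker count: assumes no "?" inside string literals of the statement
    if sql.count("?") != len(names):
        raise ValueError("number of sql_params does not match the parameter markers")
    for tpl in tuples:
        conn.execute(sql, [tpl[name] for name in names])
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE SAMPLE_DEMO (ID INTEGER, NAME TEXT, AGE INTEGER)")
tuples = [{"ID": 1, "NAME": "Name_7", "AGE": 42}, {"ID": 2, "NAME": "Name_9", "AGE": 17}]
# The first marker receives NAME, the second AGE, the third ID
insert_tuples(conn, "INSERT INTO SAMPLE_DEMO (NAME, AGE, ID) VALUES (?, ?, ?)", "NAME, AGE, ID", tuples)
print(conn.execute("SELECT ID, NAME, AGE FROM SAMPLE_DEMO ORDER BY ID").fetchall())
# → [(1, 'Name_7', 42), (2, 'Name_9', 17)]
```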

ssl_connection

Set to True to enable an SSL connection.

Type: bool

transaction_size

The number of tuples to commit per transaction. The default value is 1.

Type: int
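The effect of transaction_size can be sketched with sqlite3 as well: the statement runs once per tuple and a commit is issued after every transaction_size tuples. The helper insert_in_transactions is hypothetical, and committing the remainder when the input ends is an assumption of this sketch, since a stream has no natural end:

```python
import sqlite3


def insert_in_transactions(conn, sql, rows, transaction_size=1):
    """Execute sql once per row, committing after every transaction_size rows.

    Returns the number of commits issued.
    """
    commits = 0
    pending = 0
    for row in rows:
        conn.execute(sql, row)
        pending += 1
        if pending == transaction_size:
            conn.commit()
            commits += 1
            pending = 0
    if pending:
        # Assumption: commit the rows of an incomplete last transaction
        conn.commit()
        commits += 1
    return commits


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE RUN_SAMPLE (A TEXT, B TEXT)")
rows = [("a%d" % i, "b%d" % i) for i in range(5)]
print(insert_in_transactions(conn, "INSERT INTO RUN_SAMPLE (A, B) VALUES (?, ?)", rows, transaction_size=2))  # → 3
```

A larger transaction_size means fewer commits, at the cost of more uncommitted rows when a failure occurs before the next commit.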

truststore

Path to the trust store file for the SSL connection.

Type: str

truststore_password

Password for the trust store file given by the truststore property.

Type: str

truststore_type

Type of the trust store file (JKS, PKCS12).

Type: str

vm_arg

Arbitrary JVM arguments to pass to the Streams operator.

Type: str
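The SSL-related properties above are typically set together. The following minimal sketch collects them in a dict, as they could be passed as keyword arguments to JDBCStatement, and runs a few consistency checks with a hypothetical helper check_ssl_options. The store path is a placeholder, and the rule that a key store or trust store requires ssl_connection=True is an assumption of this sketch:

```python
# Store types documented for keystore_type and truststore_type
STORE_TYPES = {"JKS", "PKCS12"}


def check_ssl_options(options):
    """Return a list of problems found in SSL-related options (hypothetical checks)."""
    problems = []
    # Assumption: a key store or trust store is only useful with ssl_connection=True
    if (options.get("keystore") or options.get("truststore")) and not options.get("ssl_connection"):
        problems.append("keystore/truststore set but ssl_connection is not True")
    for store in ("keystore", "truststore"):
        store_type = options.get(store + "_type")
        if store_type is not None and store_type not in STORE_TYPES:
            problems.append(store + "_type must be JKS or PKCS12, got " + repr(store_type))
    return problems


ssl_options = {
    "ssl_connection": True,
    "truststore": "/opt/ssl/truststore.p12",  # placeholder path
    "truststore_password": "<TRUSTSTORE_PASSWORD>",
    "truststore_type": "PKCS12",
}
print(check_ssl_options(ssl_options))  # → []
# With streamsx installed: db.JDBCStatement(credentials, **ssl_options)
```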

streamsx.database.download_toolkit(url=None, target_dir=None)

Downloads the latest JDBC toolkit from GitHub.

Example that updates the JDBC toolkit of your topology with the latest toolkit from GitHub:

import streamsx.database as db
import streamsx.spl.toolkit
from streamsx.topology.topology import Topology

topology = Topology()
# download toolkit from GitHub
jdbc_toolkit_location = db.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, jdbc_toolkit_location)

Example that updates the topology with a specific version of the JDBC toolkit, given by its URL:

import streamsx.database as db
import streamsx.spl.toolkit

# topology is an existing streamsx.topology.topology.Topology
url171 = 'https://github.com/IBMStreams/streamsx.jdbc/releases/download/v1.7.1/streamsx.jdbc.toolkits-1.7.1-20190703-1017.tgz'
jdbc_toolkit_location = db.download_toolkit(url=url171)
streamsx.spl.toolkit.add_toolkit(topology, jdbc_toolkit_location)

Parameters:
  • url (str) – Link to the toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
  • target_dir (str) – The directory into which the toolkit is unpacked. If a relative path is given, it is appended to the system temporary directory, for example /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
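The handling of target_dir can be sketched as follows. resolve_target_dir is a hypothetical helper, not the download_toolkit implementation; in particular, creating a fresh directory with tempfile.mkdtemp for target_dir=None is an assumption, since only the parent location is documented:

```python
import os
import tempfile


def resolve_target_dir(target_dir=None):
    """Return the directory a toolkit archive would be unpacked into (hypothetical helper)."""
    tmp = tempfile.gettempdir()
    if target_dir is None:
        # Assumption: create a new, uniquely named directory under the temp directory
        return tempfile.mkdtemp(dir=tmp)
    if os.path.isabs(target_dir):
        # Absolute paths are used as given
        return target_dir
    # Relative paths are appended to the system temporary directory
    return os.path.join(tmp, target_dir)


print(resolve_target_dir("jdbc_toolkit"))
```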
Returns:

The location of the downloaded toolkit.

Return type:

str

Note

This function requires an outgoing Internet connection.

New in version 1.4.

streamsx.database.configure_connection(instance, name='database', credentials=None)

Configures IBM Streams for a database connection.

Creates or updates an application configuration object containing the required properties with connection information.

Example that creates an application configuration for a Streams instance with connection details:

from streamsx.rest import Instance
from streamsx.topology import context
from streamsx.database import configure_connection
from icpd_core import icpd_util

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
app_cfg = configure_connection(instance, credentials='my_credentials_json')

In Cloud Pak for Data, you can configure a connection to Db2 as described in Connecting to data sources. Example that uses such a configured external connection named ‘Db2-Cloud’ to create an application configuration for IBM Streams:

db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
app_cfg = configure_connection(instance, credentials=db_external_connection)

Parameters:
  • instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.
  • name (str) – Name of the application configuration, default name is ‘database’.
  • credentials (str|dict) – The service credentials, for example Db2 Warehouse service credentials.
Returns:

Name of the application configuration.
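The credentials argument of configure_connection accepts a str or a dict. The following minimal sketch shows how such a value could be normalized to JSON text before it is stored as an application configuration property. credentials_property is a hypothetical helper; the assumption that a str already contains the credentials JSON is not confirmed by this documentation, and the handling of Cloud Pak for Data external connections is not covered:

```python
import json


def credentials_property(credentials):
    """Return credentials as JSON text (hypothetical helper).

    A dict is serialized with json.dumps; a str is assumed to contain
    the credentials JSON already and is only checked for valid JSON.
    """
    if credentials is None:
        raise TypeError("credentials must be a str or dict")
    if isinstance(credentials, dict):
        return json.dumps(credentials)
    json.loads(credentials)  # raises ValueError if the text is not valid JSON
    return credentials


creds = {"username": "<JDBC_USER>", "password": "<JDBC_PASSWORD>", "jdbcurl": "<JDBC_URL>"}
print(credentials_property(creds))
```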

streamsx.database.run_statement(stream, credentials, schema=None, sql=None, sql_attribute=None, sql_params=None, transaction_size=1, jdbc_driver_class='com.ibm.db2.jcc.DB2Driver', jdbc_driver_lib=None, ssl_connection=None, truststore=None, truststore_password=None, keystore=None, keystore_password=None, keystore_type=None, truststore_type=None, plugin_name=None, security_mechanism=None, vm_arg=None, name=None)

Runs a SQL statement using the Db2 client driver and the JDBC database interface.

The statement is called once for each input tuple received. Result sets that are produced by the statement are emitted as output stream tuples.

This function includes the JDBC driver for the Db2 database (‘com.ibm.db2.jcc.DB2Driver’) in the application bundle by default.

Other drivers, for example for databases other than Db2, can be used; in that case, both the jdbc_driver_class and jdbc_driver_lib parameters must be specified.

Supports two ways to specify the statement:

  • Statement is part of the input stream. You can specify which input stream attribute contains the statement with the sql_attribute argument. If the input stream is of type CommonSchema.String, then you don’t need to specify the sql_attribute argument.
  • Statement is given with the sql argument. The statement can contain parameter markers that are set from the input stream attributes listed in the sql_params argument.

Example with an “insert” statement and values passed with the input stream, where the input stream “sample_stream” has the schema “sample_schema”:

import streamsx.database as db
from streamsx.topology.schema import StreamSchema

sample_schema = StreamSchema('tuple<rstring A, rstring B>')
...
sql_insert = 'INSERT INTO RUN_SAMPLE (A, B) VALUES (?, ?)'
inserts = db.run_statement(sample_stream, credentials=credentials, schema=sample_schema, sql=sql_insert, sql_params="A, B")

Example with a “select count” statement and an output schema whose attribute TOTAL holds the result of the query:

sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>')
sql_query = 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'
query = topo.source([sql_query]).as_string()
res = db.run_statement(query, credentials=credentials, schema=sample_schema)

Example that uses a configured external connection named ‘Db2-Cloud’ (Cloud Pak for Data only; see Connecting to data sources):

from icpd_core import icpd_util

db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
res = db.run_statement(query, credentials=db_external_connection, schema=sample_schema)

Parameters:
  • stream (streamsx.topology.topology.Stream) – Stream of tuples containing the SQL statements or SQL statement parameter values. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) or CommonSchema.String as input.
  • credentials (dict|str) – The credentials of the IBM Cloud Db2 Warehouse service as a dict, a configured external connection of kind “Db2 Warehouse” (Cloud Pak for Data only) as a dict, or the name of an application configuration.
  • schema (StreamSchema) – Schema for the returned stream. Defaults to the input stream schema if not set.
  • sql (str) – String containing the SQL statement. Use this as an alternative to the sql_attribute parameter.
  • sql_attribute (str) – Name of the input stream attribute containing the SQL statement. Use this as an alternative to the sql parameter.
  • sql_params (str) – The names of the input stream attributes whose values are set for the SQL statement parameter markers. The values and parameter markers are associated by position: the first parameter marker in the SQL statement is associated with the first sql_params value.
  • transaction_size (int) – The number of tuples to commit per transaction. The default value is 1.
  • jdbc_driver_class (str) – The class name of the JDBC driver. The default is the Db2 driver ‘com.ibm.db2.jcc.DB2Driver’.
  • jdbc_driver_lib (str) – Path to the JDBC driver library file: the jar file, given with its absolute path, that contains the class set with the jdbc_driver_class parameter. By default, ‘db2jcc4.jar’ is added to the ‘opt’ directory in the application bundle.
  • ssl_connection (bool) – Set to True to enable an SSL connection.
  • truststore (str) – Path to the trust store file for the SSL connection.
  • truststore_password (str) – Password for the trust store file given by the truststore parameter.
  • keystore (str) – Path to the key store file for the SSL connection.
  • keystore_password (str) – Password for the key store file given by the keystore parameter.
  • keystore_type (str) – Type of the key store file (JKS, PKCS12).
  • truststore_type (str) – Type of the trust store file (JKS, PKCS12).
  • plugin_name (str) – Name of the security plugin.
  • security_mechanism (int) – Value of the security mechanism.
  • vm_arg (str) – Arbitrary JVM arguments to pass to the Streams operator.
  • name (str) – Operator name in the Streams context, defaults to a generated name.
Returns:

Output Stream.

Return type:

streamsx.topology.topology.Stream

Deprecated since version 1.5.0: Use JDBCStatement instead.
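Because run_statement is deprecated, existing calls can be moved to JDBCStatement. Apart from stream, credentials, schema and name, the parameter names of run_statement match the JDBCStatement options: stream becomes the stream whose map() is called, credentials becomes the first JDBCStatement argument, and schema and name are passed to map(). The hypothetical helper below performs this split in plain Python; dropping None values so that the JDBCStatement defaults apply is a design choice of this sketch:

```python
# run_statement parameters that become arguments of Stream.map()
MAP_ARGS = ("schema", "name")


def split_run_statement_args(**kwargs):
    """Split run_statement keyword arguments into JDBCStatement options and map() arguments.

    Arguments set to None are dropped so that the JDBCStatement defaults apply.
    """
    options, map_kwargs = {}, {}
    for key, value in kwargs.items():
        if value is None:
            continue
        target = map_kwargs if key in MAP_ARGS else options
        target[key] = value
    return options, map_kwargs


options, map_kwargs = split_run_statement_args(
    schema=None,
    sql="INSERT INTO RUN_SAMPLE (A, B) VALUES (?, ?)",
    sql_params="A, B",
    name="INSERT",
)
print(options)
print(map_kwargs)
# With streamsx installed, the former run_statement call then becomes:
# inserts = sample_stream.map(db.JDBCStatement(credentials, **options), **map_kwargs)
```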
