streamsx.database package
Database integration for IBM Streams
For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:
Overview
Provides classes and functions to run SQL statements against a database.
Credentials
Db2 Warehouse credentials are defined using service credentials JSON.
The mandatory JSON elements are “username”, “password” and “jdbcurl”:
{
"username": "<JDBC_USER>",
"password": "<JDBC_PASSWORD>",
"jdbcurl": "<JDBC_URL>"
}
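A quick check of the credentials before they are handed to the package can catch a missing element early. The following is a minimal sketch using only the standard library; `load_credentials` and `REQUIRED_KEYS` are hypothetical names introduced here for illustration and are not part of streamsx.database.

```python
import json

# The three elements this section lists as mandatory.
REQUIRED_KEYS = ("username", "password", "jdbcurl")


def load_credentials(text):
    """Parse service credentials JSON and check the mandatory elements.

    Hypothetical helper for illustration; not part of streamsx.database.
    """
    credentials = json.loads(text)
    missing = [key for key in REQUIRED_KEYS if key not in credentials]
    if missing:
        raise ValueError("missing credential elements: " + ", ".join(missing))
    return credentials


# Placeholder values only; substitute the values of your own service.
example = '{"username": "<JDBC_USER>", "password": "<JDBC_PASSWORD>", "jdbcurl": "<JDBC_URL>"}'
credentials = load_credentials(example)
```

The resulting dict has the shape that the credentials parameters described in this package expect.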
class streamsx.database.JDBCStatement(credentials, **options)
Bases: streamsx.topology.composite.Map
Composite map transformation for a JDBC statement.
The statement is called once for each input tuple received. Result sets that are produced by the statement are emitted as output stream tuples.
By default, this class includes the JDBC driver for Db2 (‘com.ibm.db2.jcc.DB2Driver’) in the application bundle.
Different drivers, for example for databases other than Db2, can be applied with the properties jdbc_driver_lib and jdbc_driver_class.
There are two ways to specify the statement:
- The statement is part of the input stream. You can specify which input stream attribute contains the statement with sql_attribute. If the input stream is of type CommonSchema.String, then you don’t need to specify the sql_attribute property.
- The statement is given with the sql property. The statement can contain parameter markers that are set with the input stream attributes specified by sql_params.
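The two ways of specifying the statement can be modeled outside of Streams. The sketch below uses sqlite3 from the standard library as a stand-in for the JDBC connection, so it only mimics the described behavior and is not the JDBCStatement implementation; `run_statements` is a hypothetical helper, and plain dicts stand in for stream tuples.

```python
import sqlite3


def run_statements(conn, tuples, sql=None, sql_attribute=None, sql_params=None):
    """Run one statement per input tuple and yield any result rows.

    Hypothetical stand-in that mimics the described behavior with sqlite3.
    """
    # sql_params is a comma-separated list of attribute names.
    names = [n.strip() for n in sql_params.split(",")] if sql_params else []
    for tpl in tuples:
        if sql is not None:
            # Fixed statement: the first marker gets the first listed attribute, and so on.
            cursor = conn.execute(sql, [tpl[n] for n in names])
        else:
            # The statement itself arrives in an input attribute.
            cursor = conn.execute(tpl[sql_attribute])
        # Rows of a result set become output tuples; an INSERT produces none.
        yield from cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE SAMPLE_DEMO (ID INTEGER, NAME TEXT, AGE INTEGER)")

rows_in = [{"ID": 1, "NAME": "Name_1", "AGE": 30},
           {"ID": 2, "NAME": "Name_2", "AGE": 41}]
inserted = list(run_statements(
    conn, rows_in,
    sql="INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (?, ?, ?)",
    sql_params="ID, NAME, AGE"))

queries = [{"string": "SELECT COUNT(*) AS TOTAL FROM SAMPLE_DEMO"}]
counted = list(run_statements(conn, queries, sql_attribute="string"))
```

In this model the insert statements emit no rows, while the query passed as a tuple attribute emits one row holding the count.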
Example of a Streams application that inserts generated data as rows into a table:
    from streamsx.topology.topology import Topology
    from streamsx.topology.schema import StreamSchema
    from streamsx.topology.context import submit
    import streamsx.database as db
    import random
    import time

    # generates some data with schema (ID, NAME, AGE)
    def generate_data():
        counter = 0
        while True:
            # yield a random id, name and age
            counter = counter + 1
            yield {"NAME": "Name_" + str(random.randint(0,500)), "ID": counter, "AGE": random.randint(10,99)}
            time.sleep(0.10)

    topo = Topology()
    tuple_schema = StreamSchema("tuple<int64 ID, rstring NAME, int32 AGE>")
    # Generates data for a stream of three attributes. Each attribute maps to a column using the same name of the Db2 database table.
    sample_data = topo.source(generate_data, name="GeneratedData").map(lambda tpl: (tpl["ID"], tpl["NAME"], tpl["AGE"]), schema=tuple_schema)

    # credentials must be defined beforehand, see Credentials
    statement = db.JDBCStatement(credentials)
    statement.sql = 'INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (? , ?, ?)'
    statement.sql_params = 'ID, NAME, AGE'
    sample_data.map(statement, name='INSERT')

    # Use for IBM Streams including IBM Cloud Pak for Data
    # cfg is the submission configuration and must be defined beforehand
    submit('DISTRIBUTED', topo, cfg)
Example with keyword arguments for the options parameter:
    config = {
        'sql': 'INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (? , ?, ?)',
        'sql_params': 'ID, NAME, AGE'
    }
    inserts = sample_stream.map(db.JDBCStatement(credentials, **config))
Example with a “select count” statement and a defined output schema with the attribute TOTAL having the result of the query:
    sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>')
    sql_query = 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'
    query = topo.source([sql_query]).as_string()
    res = query.map(db.JDBCStatement(credentials), schema=sample_schema)
Example with a “drop table” statement and the default output schema (set to the input schema):
    sql_drop = 'DROP TABLE RUN_SAMPLE'
    s = topo.source([sql_drop]).as_string()
    res_sql = s.map(db.JDBCStatement(credentials))
    res_sql.print()
Example for using a configured external connection with the name ‘Db2-Cloud’ (Cloud Pak for Data only), see Connecting to data sources:
    from icpd_core import icpd_util

    db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
    res = query.map(db.JDBCStatement(db_external_connection), schema=sample_schema)
New in version 1.5.
credentials
The credentials of the IBM Cloud Db2 Warehouse service as dict, or a configured external connection of kind “Db2 Warehouse” (Cloud Pak for Data only) as dict, or the name of the application configuration.
- Type
dict|str
options
The additional optional parameters as variable keyword arguments.
- Type
kwargs
property batch_on_punct
Set to True to execute the batch when a window punctuation marker is received.
New in version 1.6.
- Type
bool
property batch_size
Number of statements transmitted in a batch.
New in version 1.6.
- Type
int
property commit_on_punct
Set to True to commit when a window punctuation marker is received.
New in version 1.6.
- Type
bool
property jdbc_driver_class
Set the class name of the JDBC driver. The default is the Db2 driver com.ibm.db2.jcc.DB2Driver.
- Type
str
property jdbc_driver_lib
Path to the JDBC driver library file. Specify the absolute path of the jar file that contains the class given with the jdbc_driver_class property. By default, db2jcc4.jar is added to the ‘opt’ directory in the application bundle.
- Type
str
property keystore
Path to the key store file for the SSL connection.
- Type
str
property keystore_password
Password for the key store file given by the keystore property.
- Type
str
property keystore_type
Type of the key store file (JKS, PKCS12).
- Type
str
property plugin_name
Name of the security plugin.
- Type
str
populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
    transformed_stream = input_stream.map(myMapComposite)
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
schema – Schema passed into map.
name – Name passed into map.
**options – Future options passed to map.
- Returns
Single stream representing the transformation of stream.
- Return type
Stream
property security_mechanism
Value of the security mechanism.
- Type
int
property sql
String containing the SQL statement. Use this as an alternative to the sql_attribute property.
- Type
str
property sql_attribute
Name of the input stream attribute containing the SQL statement. Use this as an alternative to the sql property.
- Type
str
property sql_params
The values of SQL statement parameters. These values and the SQL statement parameter markers are associated in order of appearance. For example, the first parameter marker in the SQL statement is associated with the first sql_params value.
- Type
str
property ssl_connection
Set to True to enable the SSL connection.
- Type
bool
property transaction_size
The number of tuples to commit per transaction. The default value is 1.
- Type
int
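The effect of a transaction size greater than 1 can be pictured as committing once per group of tuples instead of once per tuple. The following sketch illustrates that idea with sqlite3 from the standard library; it is an analogy rather than the operator's implementation. `insert_with_transaction_size` is a hypothetical helper, and the final commit of an incomplete group is an assumption of this sketch, not documented behavior.

```python
import sqlite3


def insert_with_transaction_size(conn, rows, transaction_size=1):
    """Insert rows, committing after every `transaction_size` tuples.

    Hypothetical illustration with sqlite3; returns the number of commits.
    """
    commits = 0
    pending = 0
    for row in rows:
        conn.execute("INSERT INTO RUN_SAMPLE (A, B) VALUES (?, ?)", row)
        pending += 1
        if pending == transaction_size:
            conn.commit()
            commits += 1
            pending = 0
    if pending:
        # Assumption of this sketch: flush the incomplete final group.
        conn.commit()
        commits += 1
    return commits


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE RUN_SAMPLE (A TEXT, B TEXT)")
rows = [("a%d" % i, "b%d" % i) for i in range(5)]
commits = insert_with_transaction_size(conn, rows, transaction_size=2)
total = conn.execute("SELECT COUNT(*) FROM RUN_SAMPLE").fetchone()[0]
```

With five tuples and a transaction size of 2, the sketch commits three times; with the default of 1 it would commit once per tuple.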
property truststore
Path to the trust store file for the SSL connection.
- Type
str
property truststore_password
Password for the trust store file given by the truststore property.
- Type
str
property truststore_type
Type of the trust store file (JKS, PKCS12).
- Type
str
property vm_arg
Arbitrary JVM arguments that are passed to the Streams operator.
- Type
str
streamsx.database.download_toolkit(url=None, target_dir=None)
Downloads the latest JDBC toolkit from GitHub.
Example for updating the JDBC toolkit for your topology with the latest toolkit from GitHub:
    import streamsx.database as db
    import streamsx.spl.toolkit

    # download toolkit from GitHub
    jdbc_toolkit_location = db.download_toolkit()
    # add the toolkit to topology
    streamsx.spl.toolkit.add_toolkit(topology, jdbc_toolkit_location)
Example for updating the topology with a specific version of the JDBC toolkit using a URL:
    import streamsx.database as db
    import streamsx.spl.toolkit

    url171 = 'https://github.com/IBMStreams/streamsx.jdbc/releases/download/v1.7.1/streamsx.jdbc.toolkits-1.7.1-20190703-1017.tgz'
    jdbc_toolkit_location = db.download_toolkit(url=url171)
    streamsx.spl.toolkit.add_toolkit(topology, jdbc_toolkit_location)
- Parameters
url (str) – Link to the toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
target_dir (str) – The directory where the toolkit is unpacked. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
- Returns
The location of the downloaded toolkit.
- Return type
str
Note
This function requires an outgoing Internet connection.
New in version 1.4.
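The rule for relative target_dir values stated under Parameters can be sketched with the standard library. `resolve_target_dir` is a hypothetical helper that only mirrors that rule; it is not a function of streamsx.database, and the location chosen when target_dir is None is not modeled here.

```python
import os
import tempfile


def resolve_target_dir(target_dir):
    """Return the absolute directory a given target_dir would map to.

    Hypothetical helper mirroring the documented rule for relative paths.
    """
    if os.path.isabs(target_dir):
        return target_dir
    # Relative paths are placed under the system temporary directory.
    return os.path.join(tempfile.gettempdir(), target_dir)


relative = resolve_target_dir("toolkits/jdbc")
absolute = resolve_target_dir(os.path.abspath("my_toolkits"))
```

On a typical Linux system the relative example would resolve to a path below /tmp, while an absolute path is used unchanged.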
streamsx.database.configure_connection(instance, name='database', credentials=None)
Configures IBM Streams for a certain connection.
Creates or updates an application configuration object containing the required properties with connection information.
Example for creating a configuration for a Streams instance with connection details:
    from icpd_core import icpd_util
    from streamsx.rest_primitives import Instance
    from streamsx.topology import context
    import streamsx.database as db

    cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
    cfg[context.ConfigParams.SSL_VERIFY] = False
    instance = Instance.of_service(cfg)
    app_cfg = db.configure_connection(instance, credentials='my_credentials_json')
In Cloud Pak for Data you can configure a connection to Db2 with Connecting to data sources. Example using this configured external connection with the name ‘Db2-Cloud’ to create an application configuration for IBM Streams:
    db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
    app_cfg = db.configure_connection(instance, credentials=db_external_connection)
- Parameters
instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.
name (str) – Name of the application configuration, default name is ‘database’.
credentials (str|dict) – The service credentials, for example Db2 Warehouse service credentials.
- Returns
Name of the application configuration.
streamsx.database.run_statement(stream, credentials, schema=None, sql=None, sql_attribute=None, sql_params=None, transaction_size=1, jdbc_driver_class='com.ibm.db2.jcc.DB2Driver', jdbc_driver_lib=None, ssl_connection=None, truststore=None, truststore_password=None, keystore=None, keystore_password=None, keystore_type=None, truststore_type=None, plugin_name=None, security_mechanism=None, vm_arg=None, name=None)
Runs a SQL statement using the Db2 client driver and the JDBC database interface.
The statement is called once for each input tuple received. Result sets that are produced by the statement are emitted as output stream tuples.
By default, this function includes the JDBC driver for Db2 (‘com.ibm.db2.jcc.DB2Driver’) in the application bundle.
To use a different driver, for example for a database other than Db2, specify the parameters jdbc_driver_class and jdbc_driver_lib.
There are two ways to specify the statement:
- The statement is part of the input stream. You can specify which input stream attribute contains the statement with the sql_attribute argument. If the input stream is of type CommonSchema.String, then you don’t need to specify the sql_attribute argument.
- The statement is given with the sql argument. The statement can contain parameter markers that are set with the input stream attributes specified by the sql_params argument.
Example with an “insert” statement and values passed with the input stream, where the input stream “sample_stream” is of type “sample_schema”:
    import streamsx.database as db
    from streamsx.topology.schema import StreamSchema

    sample_schema = StreamSchema('tuple<rstring A, rstring B>')
    ...
    sql_insert = 'INSERT INTO RUN_SAMPLE (A, B) VALUES (?, ?)'
    inserts = db.run_statement(sample_stream, credentials=credentials, schema=sample_schema, sql=sql_insert, sql_params="A, B")
Example with a “select count” statement and a defined output schema with the attribute TOTAL having the result of the query:
    sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>')
    sql_query = 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'
    query = topo.source([sql_query]).as_string()
    res = db.run_statement(query, credentials=credentials, schema=sample_schema)
Example for using a configured external connection with the name ‘Db2-Cloud’ (Cloud Pak for Data only), see Connecting to data sources:
    db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
    res = db.run_statement(query, credentials=db_external_connection, schema=sample_schema)
- Parameters
stream (streamsx.topology.topology.Stream) – Stream of tuples containing the SQL statements or SQL statement parameter values. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) or CommonSchema.String as input.
credentials (dict|str) – The credentials of the IBM Cloud Db2 Warehouse service as dict, or a configured external connection of kind “Db2 Warehouse” (Cloud Pak for Data only) as dict, or the name of the application configuration.
schema (StreamSchema) – Schema for the returned stream. Defaults to the input stream schema if not set.
sql (str) – String containing the SQL statement. Use this as an alternative to the sql_attribute parameter.
sql_attribute (str) – Name of the input stream attribute containing the SQL statement. Use this as an alternative to the sql parameter.
sql_params (str) – The values of SQL statement parameters. These values and the SQL statement parameter markers are associated in order of appearance. For example, the first parameter marker in the SQL statement is associated with the first sql_params value.
transaction_size (int) – The number of tuples to commit per transaction. The default value is 1.
jdbc_driver_class (str) – The class name of the JDBC driver. The default is the Db2 driver ‘com.ibm.db2.jcc.DB2Driver’.
jdbc_driver_lib (str) – Path to the JDBC driver library file. Specify the absolute path of the jar file that contains the class given with the jdbc_driver_class parameter. By default, ‘db2jcc4.jar’ is added to the ‘opt’ directory in the application bundle.
ssl_connection (bool) – Set to True to enable the SSL connection.
truststore (str) – Path to the trust store file for the SSL connection.
truststore_password (str) – Password for the trust store file given by the truststore parameter.
keystore (str) – Path to the key store file for the SSL connection.
keystore_password (str) – Password for the key store file given by the keystore parameter.
keystore_type (str) – Type of the key store file (JKS, PKCS12).
truststore_type (str) – Type of the trust store file (JKS, PKCS12).
plugin_name (str) – Name of the security plugin.
security_mechanism (int) – Value of the security mechanism.
vm_arg (str) – Arbitrary JVM arguments that are passed to the Streams operator.
name (str) – Sink name in the Streams context, defaults to a generated name.
- Returns
Output Stream.
- Return type
Stream
Deprecated since version 1.5.0: Use JDBCStatement instead.