streamsx.database package
Database integration for IBM Streams
For details of implementing applications in Python for IBM Streams, including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud, see:
Overview
Provides classes and functions to run SQL statements against a database.
Credentials
Db2 Warehouse credentials are defined using service credentials JSON.
The mandatory JSON elements are “username”, “password” and “jdbcurl”:
{
"username": "<JDBC_USER>",
"password": "<JDBC_PASSWORD>",
"jdbcurl": "<JDBC_URL>"
}
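Before passing credentials to JDBCStatement or configure_connection, it can help to check that the JSON contains the mandatory elements. The following is a minimal sketch in plain Python; load_db2_credentials and REQUIRED_KEYS are hypothetical names used for illustration and are not part of the streamsx.database package.

```python
import json

# Mandatory elements of the service credentials JSON, as listed above.
REQUIRED_KEYS = ("username", "password", "jdbcurl")


def load_db2_credentials(text):
    """Parse service credentials JSON and check the mandatory elements.

    Hypothetical helper for illustration; not part of streamsx.database.
    """
    creds = json.loads(text)
    missing = [key for key in REQUIRED_KEYS if not creds.get(key)]
    if missing:
        raise ValueError("missing credential fields: " + ", ".join(missing))
    return creds


example = '''{
    "username": "<JDBC_USER>",
    "password": "<JDBC_PASSWORD>",
    "jdbcurl": "<JDBC_URL>"
}'''
credentials = load_db2_credentials(example)
print(sorted(credentials))  # ['jdbcurl', 'password', 'username']
```

The resulting dict is the form the credentials parameter accepts.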
class streamsx.database.JDBCStatement(credentials, **options)
Bases: streamsx.topology.composite.Map
Composite map transformation for a JDBC statement.
The statement is executed once for each input tuple received. Result sets produced by the statement are emitted as output stream tuples.
By default, the JDBC driver for the Db2 database (‘com.ibm.db2.jcc.DB2Driver’) is included in the application bundle.
To use a different driver, for example for a database other than Db2, set the jdbc_driver_lib and jdbc_driver_class properties.
There are two ways to specify the statement:
- The statement is part of the input stream. Specify the input stream attribute that contains the statement with the sql_attribute property. If the input stream is of type CommonSchema.String, you don’t need to specify the sql_attribute property.
- The statement is given with the sql property. The statement can contain parameter markers that are set from the input stream attributes specified by sql_params.
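The per-tuple behavior described above can be modeled without a Streams instance. The following sketch assumes Python’s built-in sqlite3 module as a stand-in for the JDBC connection and a plain list of strings as an input stream of type CommonSchema.String; run_per_tuple is a hypothetical helper, not the operator’s implementation. It executes one statement per input tuple and turns each row of a result set into an output tuple (here a dict keyed by column name).

```python
import sqlite3


def run_per_tuple(conn, statements):
    """Execute one SQL statement per input tuple and yield result rows.

    Hypothetical model of the documented behavior; sqlite3 stands in for JDBC.
    """
    for sql in statements:  # each input tuple carries one statement
        cursor = conn.execute(sql)
        if cursor.description is None:
            # No result set: this sketch emits nothing (the operator may differ).
            continue
        names = [col[0] for col in cursor.description]
        for row in cursor.fetchall():
            yield dict(zip(names, row))  # one output tuple per result row


conn = sqlite3.connect(":memory:")
input_stream = [
    "CREATE TABLE TAB1 (ID INTEGER)",
    "INSERT INTO TAB1 VALUES (1), (2), (3)",
    "SELECT COUNT(*) AS TOTAL FROM TAB1",
]
output = list(run_per_tuple(conn, input_stream))
print(output)  # [{'TOTAL': 3}]
```

The column alias TOTAL plays the same role as the TOTAL attribute of the output schema in the “select count” example.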
Example of a Streams application that inserts generated data as rows into a table:
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
from streamsx.topology.context import submit
import streamsx.database as db
import random
import time

# Generates some data with schema (ID, NAME, AGE)
def generate_data():
    counter = 0
    while True:
        # yield a random id, name and age
        counter = counter + 1
        yield {"NAME": "Name_" + str(random.randint(0, 500)), "ID": counter, "AGE": random.randint(10, 99)}
        time.sleep(0.10)

topo = Topology()
tuple_schema = StreamSchema("tuple<int64 ID, rstring NAME, int32 AGE>")
# Generates data for a stream of three attributes. Each attribute maps to the column with the same name in the Db2 database table.
sample_data = topo.source(generate_data, name="GeneratedData").map(lambda tpl: (tpl["ID"], tpl["NAME"], tpl["AGE"]), schema=tuple_schema)

# credentials must be defined beforehand (see Credentials)
statement = db.JDBCStatement(credentials)
statement.sql = 'INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (?, ?, ?)'
statement.sql_params = 'ID, NAME, AGE'
sample_data.map(statement, name='INSERT')

# Use for IBM Streams including IBM Cloud Pak for Data
# cfg must be defined beforehand, for example with icpd_util.get_service_instance_details()
submit('DISTRIBUTED', topo, cfg)
Example with keyword arguments for the options parameter:
config = {
    'sql': 'INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (?, ?, ?)',
    'sql_params': 'ID, NAME, AGE'
}
inserts = sample_stream.map(db.JDBCStatement(credentials, **config))
Example with a “select count” statement and a defined output schema whose attribute TOTAL holds the result of the query:
sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>')
sql_query = 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'
query = topo.source([sql_query]).as_string()
res = query.map(db.JDBCStatement(credentials), schema=sample_schema)
Example with a “drop table” statement and the default output schema (set to the input schema):
sql_drop = 'DROP TABLE RUN_SAMPLE'
s = topo.source([sql_drop]).as_string()
res_sql = s.map(db.JDBCStatement(credentials))
res_sql.print()
Example using a configured external connection with the name ‘Db2-Cloud’ (Cloud Pak for Data only), see Connecting to data sources:
from icpd_core import icpd_util
db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
res = query.map(db.JDBCStatement(db_external_connection), schema=sample_schema)
New in version 1.5.
credentials
The credentials of the IBM Cloud Db2 Warehouse service as a dict, a configured external connection of kind “Db2 Warehouse” (Cloud Pak for Data only) as a dict, or the name of an application configuration.
Type: dict|str
options
The additional optional parameters as variable keyword arguments.
Type: kwargs
jdbc_driver_class
Class name of the JDBC driver. The default is the Db2 driver com.ibm.db2.jcc.DB2Driver.
Type: str
jdbc_driver_lib
Path to the JDBC driver library file. Specify the jar filename with an absolute path; the jar must contain the class given with the jdbc_driver_class property. By default, db2jcc4.jar is added to the ‘opt’ directory in the application bundle.
Type: str
keystore
Path to the key store file for the SSL connection.
Type: str
keystore_type
Type of the key store file (JKS, PKCS12).
Type: str
plugin_name
Name of the security plugin.
Type: str
populate(topology, stream, schema, name, **options)
Populate the topology with this composite map transformation.
Parameters:
- topology – Topology containing the composite map.
- stream – Stream to be transformed.
- schema – Schema passed into map.
- name – Name passed into map.
- **options – Future options passed to map.
Returns: Single stream representing the transformation of stream.
Return type: Stream
security_mechanism
Value of the security mechanism.
Type: int
sql
String containing the SQL statement. Use this as an alternative to the sql_attribute property.
Type: str
sql_attribute
Name of the input stream attribute containing the SQL statement. Use this as an alternative to the sql property.
Type: str
sql_params
The input stream attributes that supply the values of the SQL statement parameters, given as a comma-separated list of attribute names. Values are associated with the parameter markers by position: the first parameter marker in the SQL statement is associated with the first sql_params value, the second with the second, and so on.
Type: str
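The positional association between sql_params and parameter markers can be illustrated with Python’s sqlite3 module, which also uses ? markers. sqlite3 is only a stand-in for the JDBC connection here, the sketch assumes sql_params is a comma-separated list of input attribute names as in the examples above, and bind_params is a hypothetical helper.

```python
import sqlite3


def bind_params(tuple_, sql_params):
    """Turn a comma-separated sql_params string into positional values.

    Hypothetical helper: values are taken in the order the attribute names
    are listed, so the first name fills the first '?' marker.
    """
    names = [name.strip() for name in sql_params.split(",")]
    return tuple(tuple_[name] for name in names)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE SAMPLE_DEMO (ID INTEGER, NAME TEXT, AGE INTEGER)")

sql = "INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (?, ?, ?)"
incoming = {"AGE": 42, "NAME": "Name_7", "ID": 1}  # attribute order does not matter
conn.execute(sql, bind_params(incoming, "ID, NAME, AGE"))

row = conn.execute("SELECT ID, NAME, AGE FROM SAMPLE_DEMO").fetchone()
print(row)  # (1, 'Name_7', 42)
```

Only the order of names in sql_params matters; listing them as "NAME, ID, AGE" would bind NAME to the first marker.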
ssl_connection
Set to True to enable an SSL connection.
Type: bool
transaction_size
The number of tuples to commit per transaction. The default value is 1.
Type: int
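To illustrate what transaction_size controls, the following sketch uses sqlite3 in place of a JDBC connection and commits after every transaction_size inserted rows. insert_with_batches is a hypothetical helper; committing a trailing partial batch when the input ends is an assumption of this sketch, and the Streams operator may handle a partial batch differently.

```python
import sqlite3


def insert_with_batches(conn, rows, transaction_size=1):
    """Insert rows, committing after every `transaction_size` rows.

    Hypothetical model of the transaction_size property using sqlite3.
    Returns the number of commits issued.
    """
    commits = 0
    pending = 0
    for row in rows:
        conn.execute("INSERT INTO T (ID) VALUES (?)", row)
        pending += 1
        if pending == transaction_size:
            conn.commit()
            commits += 1
            pending = 0
    if pending:  # assumption: flush a final partial batch
        conn.commit()
        commits += 1
    return commits


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE T (ID INTEGER)")
commits = insert_with_batches(conn, [(i,) for i in range(10)], transaction_size=4)
print(commits)  # 3  (batches of 4, 4 and 2 rows)
```

With the default value of 1, every tuple is committed on its own; larger values trade fewer commits for more uncommitted work per transaction.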
truststore
Path to the trust store file for the SSL connection.
Type: str
truststore_password
Password for the trust store file given by the truststore property.
Type: str
truststore_type
Type of the trust store file (JKS, PKCS12).
Type: str
vm_arg
Arbitrary JVM arguments passed to the Streams operator.
Type: str
streamsx.database.download_toolkit(url=None, target_dir=None)
Downloads the latest JDBC toolkit from GitHub.
Example for updating the JDBC toolkit for your topology with the latest toolkit from GitHub:
import streamsx.database as db
import streamsx.spl.toolkit
# download toolkit from GitHub
jdbc_toolkit_location = db.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, jdbc_toolkit_location)
Example for updating the topology with a specific version of the JDBC toolkit using a URL:
import streamsx.database as db
import streamsx.spl.toolkit
url171 = 'https://github.com/IBMStreams/streamsx.jdbc/releases/download/v1.7.1/streamsx.jdbc.toolkits-1.7.1-20190703-1017.tgz'
jdbc_toolkit_location = db.download_toolkit(url=url171)
streamsx.spl.toolkit.add_toolkit(topology, jdbc_toolkit_location)
Parameters:
- url (str) – Link to the toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
- target_dir (str) – The directory the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
Returns: The location of the downloaded toolkit.
Return type: str
Note
This function requires an outgoing Internet connection.
New in version 1.4.
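The documented target_dir rule can be sketched with the standard os and tempfile modules. resolve_target_dir is a hypothetical helper, the default subdirectory name jdbc_toolkit is an assumption (the name download_toolkit actually chooses is not specified here), and treating an absolute target_dir as used unchanged is likewise an assumption of this sketch.

```python
import os
import tempfile


def resolve_target_dir(target_dir=None, default_subdir="jdbc_toolkit"):
    """Model of the documented target_dir rule for download_toolkit.

    Hypothetical helper; 'jdbc_toolkit' is an assumed default name.
    """
    tmp = tempfile.gettempdir()  # e.g. /tmp on Unix/Linux
    if target_dir is None:
        return os.path.join(tmp, default_subdir)
    # os.path.join appends a relative target_dir to the temporary directory
    # and returns an absolute target_dir unchanged (assumed behavior).
    return os.path.join(tmp, target_dir)


print(resolve_target_dir("jdbc"))           # e.g. /tmp/jdbc on Unix/Linux
print(resolve_target_dir("/opt/toolkits"))  # /opt/toolkits on Unix/Linux
```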
streamsx.database.configure_connection(instance, name='database', credentials=None)
Configures IBM Streams for a given database connection.
Creates or updates an application configuration object containing the required properties with connection information.
Example for creating a configuration for a Streams instance with connection details:
from streamsx.rest import Instance
from streamsx.topology import context
from icpd_core import icpd_util
from streamsx.database import configure_connection

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
# credentials: the service credentials (str or dict), for example Db2 Warehouse service credentials
app_cfg = configure_connection(instance, credentials='my_credentials_json')
In Cloud Pak for Data, you can configure a connection to Db2 with Connecting to data sources. Example using this configured external connection with the name ‘Db2-Cloud’ to create an application configuration for IBM Streams:
db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
app_cfg = configure_connection(instance, credentials=db_external_connection)
Parameters:
- instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.
- name (str) – Name of the application configuration. The default name is ‘database’.
- credentials (str|dict) – The service credentials, for example Db2 Warehouse service credentials.
Returns: Name of the application configuration.
streamsx.database.run_statement(stream, credentials, schema=None, sql=None, sql_attribute=None, sql_params=None, transaction_size=1, jdbc_driver_class='com.ibm.db2.jcc.DB2Driver', jdbc_driver_lib=None, ssl_connection=None, truststore=None, truststore_password=None, keystore=None, keystore_password=None, keystore_type=None, truststore_type=None, plugin_name=None, security_mechanism=None, vm_arg=None, name=None)
Runs a SQL statement using the Db2 client driver and the JDBC database interface.
The statement is executed once for each input tuple received. Result sets produced by the statement are emitted as output stream tuples.
By default, the JDBC driver for the Db2 database (‘com.ibm.db2.jcc.DB2Driver’) is included in the application bundle.
To use a different driver, for example for a database other than Db2, specify the jdbc_driver_class and jdbc_driver_lib parameters.
There are two ways to specify the statement:
- The statement is part of the input stream. Specify the input stream attribute that contains the statement with the sql_attribute argument. If the input stream is of type CommonSchema.String, you don’t need to specify the sql_attribute argument.
- The statement is given with the sql argument. The statement can contain parameter markers that are set from the input stream attributes specified by the sql_params argument.
Example with an “insert” statement and values passed with the input stream, where the input stream “sample_stream” is of type “sample_schema”:
import streamsx.database as db
from streamsx.topology.schema import StreamSchema
sample_schema = StreamSchema('tuple<rstring A, rstring B>')
...
sql_insert = 'INSERT INTO RUN_SAMPLE (A, B) VALUES (?, ?)'
inserts = db.run_statement(sample_stream, credentials=credentials, schema=sample_schema, sql=sql_insert, sql_params="A, B")
Example with a “select count” statement and a defined output schema whose attribute TOTAL holds the result of the query:
sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>')
sql_query = 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'
query = topo.source([sql_query]).as_string()
res = db.run_statement(query, credentials=credentials, schema=sample_schema)
Example using a configured external connection with the name ‘Db2-Cloud’ (Cloud Pak for Data only), see Connecting to data sources:
from icpd_core import icpd_util
db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
res = db.run_statement(query, credentials=db_external_connection, schema=sample_schema)
Parameters:
- stream (streamsx.topology.topology.Stream) – Stream of tuples containing the SQL statements or SQL statement parameter values. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) or CommonSchema.String as input.
- credentials (dict|str) – The credentials of the IBM Cloud Db2 Warehouse service as a dict, a configured external connection of kind “Db2 Warehouse” (Cloud Pak for Data only) as a dict, or the name of an application configuration.
- schema (StreamSchema) – Schema for the returned stream. Defaults to the input stream schema if not set.
- sql (str) – String containing the SQL statement. Use this as an alternative to the sql_attribute parameter.
- sql_attribute (str) – Name of the input stream attribute containing the SQL statement. Use this as an alternative to the sql parameter.
- sql_params (str) – The input stream attributes that supply the values of the SQL statement parameters, given as a comma-separated list of attribute names. Values are associated with the parameter markers by position: the first parameter marker in the SQL statement is associated with the first sql_params value.
- transaction_size (int) – The number of tuples to commit per transaction. The default value is 1.
- jdbc_driver_class (str) – Class name of the JDBC driver. The default is the Db2 driver ‘com.ibm.db2.jcc.DB2Driver’.
- jdbc_driver_lib (str) – Path to the JDBC driver library file. Specify the jar filename with an absolute path; the jar must contain the class given with the jdbc_driver_class parameter. By default, ‘db2jcc4.jar’ is added to the ‘opt’ directory in the application bundle.
- ssl_connection (bool) – Set to True to enable an SSL connection.
- truststore (str) – Path to the trust store file for the SSL connection.
- truststore_password (str) – Password for the trust store file given by the truststore parameter.
- keystore (str) – Path to the key store file for the SSL connection.
- keystore_password (str) – Password for the key store file given by the keystore parameter.
- keystore_type (str) – Type of the key store file (JKS, PKCS12).
- truststore_type (str) – Type of the trust store file (JKS, PKCS12).
- plugin_name (str) – Name of the security plugin.
- security_mechanism (int) – Value of the security mechanism.
- vm_arg (str) – Arbitrary JVM arguments passed to the Streams operator.
- name (str) – Operator name in the Streams context, defaults to a generated name.
Returns: Output stream.
Return type: streamsx.topology.topology.Stream
Deprecated since version 1.5.0: Use JDBCStatement instead.