Source code for ibm_watson_machine_learning.connections

# (C) Copyright IBM Corp. 2020.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import requests
from ibm_watson_machine_learning.utils import SPACES_DETAILS_TYPE, INSTANCE_DETAILS_TYPE, MEMBER_DETAILS_TYPE,DATA_ASSETS_DETAILS_TYPE, STR_TYPE, STR_TYPE_NAME, docstring_parameter, meta_props_str_conv, str_type_conv, get_file_from_cos
from ibm_watson_machine_learning.metanames import ConnectionMetaNames
from ibm_watson_machine_learning.wml_resource import WMLResource
from ibm_watson_machine_learning.wml_client_error import WMLClientError, ApiRequestFailure
import os
import json

_DEFAULT_LIST_LENGTH = 0


[docs]class Connections(WMLResource): """ Store and manage your Connections. """ ConfigurationMetaNames = ConnectionMetaNames() """MetaNames for Connection creation.""" def __init__(self, client): WMLResource.__init__(self, __name__, client) self._ICP = client.ICP def _get_required_element_from_response(self, response_data): WMLResource._validate_type(response_data, u'connection_response', dict) try: if self._client.default_space_id is not None: new_el = {'metadata': {'id': response_data['metadata']['asset_id'], 'space_id': response_data['metadata']['space_id'], 'asset_type': response_data['metadata']['asset_type'], 'create_time': response_data['metadata']['create_time'], 'last_access_time': response_data['metadata']['usage'].get('last_access_time') }, 'entity': { 'datasource_type': response_data['entity']['datasource_type'], 'description': response_data['entity']['description'], 'name': response_data['entity']['name'], 'origin_country': response_data['entity']['origin_country'], 'owner_id': response_data['entity']['owner_id'], 'properties': response_data['entity']['properties'] } } elif self._client.default_project_id is not None: if self._client.WSD: # TODO : There is no owner id in connection api response of WSD1.0, this might need a check in WSD2.0 # kaganesa: add the below line in 2.0 # 'owner_id': response_data['entity']['owner_id'], new_el = {'metadata': {'id': response_data['metadata']['asset_id'], 'project_id': response_data['metadata']['project_id'], 'asset_type': response_data['metadata']['asset_type'], 'create_time': response_data['metadata']['create_time'], 'last_access_time': response_data['metadata']['usage'].get('last_access_time') }, 'entity': { 'datasource_type': response_data['entity']['datasource_type'], 'description': response_data['entity']['description'], 'name': response_data['entity']['name'], 'origin_country': response_data['entity']['origin_country'], # 'owner_id': response_data['entity']['owner_id'], 'properties': response_data['entity']['properties'] } } else: new_el = {'metadata': {'id': response_data['metadata']['asset_id'], 'project_id': response_data['metadata']['project_id'], 'asset_type': response_data['metadata']['asset_type'], 'create_time': response_data['metadata']['create_time'], 'last_access_time': response_data['metadata']['usage'].get('last_access_time') }, 'entity': { 'datasource_type': response_data['entity']['datasource_type'], 'description': response_data['entity']['description'], 'name': response_data['entity']['name'], 'origin_country': response_data['entity']['origin_country'], 'owner_id': response_data['entity']['owner_id'], 'properties': response_data['entity']['properties'] } } if 'href' in response_data['metadata']: if self._client.CLOUD_PLATFORM_SPACES or self._client.ICP_PLATFORM_SPACES: href_without_host = response_data['href'].split('.com')[-1] new_el[u'metadata'].update({'href': href_without_host}) else: new_el['metadata'].update({'href': response_data['href']}) return new_el except Exception as e: raise WMLClientError("Failed to read Response from down-stream service: " + response_data)
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def get_details(self, connection_id=None): """ Get connection details for the given unique Connection id. If no connection_id is passed, details for all connections will be returned. **Parameters** .. important:: #. **connection_id**: Unique id of Connection\n **type**: str\n **Output** .. important:: **returns**: Metadata of the stored Connection\n **return type**: dict\n **Example** >>> connection_details = client.connections.get_details(connection_id) >>> connection_details = client.connections.get_details() """ self._client._check_if_either_is_set() Connections._validate_type(connection_id, u'connection_id', STR_TYPE, False) if self._client.WSD: header_param = self._client._get_headers(wsdconnection_api_req=True) else: header_param = self._client._get_headers() verify = not self._client.ICP_30 and not self._client.ICP and not self._client.WSD if connection_id: response = requests.get(self._client.service_instance._href_definitions.get_connection_by_id_href(connection_id), params=self._client._params(), headers=header_param, verify=verify) if response.status_code == 200: return self._get_required_element_from_response( self._handle_response(200, u'get connection details', response)) else: return self._handle_response(200, u'get connection details', response) else: response = requests.get(self._client.service_instance._href_definitions.get_connections_href(), params=self._client._params(), headers=header_param, verify=verify) if response.status_code == 200: return {'resources': [self._get_required_element_from_response(x) for x in response.json()['resources']]} else: return self._handle_response(200, u'get connection details', response)
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def create(self, meta_props): """ Creates a connection. Input to PROPERTIES field examples 1. MySQL >>> client.connections.ConfigurationMetaNames.PROPERTIES: { >>> "database": "database", >>> "password": "password", >>> "port": "3306", >>> "host": "host url", >>> "ssl": "false", >>> "username": "username" >>> } 2. Google Big query a. Method1: Use service account json. The service account json generated can be provided as input as-is. Provide actual values in json. Example is only indicative to show the fields. Refer to Google big query documents how to generate the service account json\n >>> client.connections.ConfigurationMetaNames.PROPERTIES: { >>> "type": "service_account", >>> "project_id": "project_id", >>> "private_key_id": "private_key_id", >>> "private_key": "private key contents", >>> "client_email": "client_email", >>> "client_id": "client_id", >>> "auth_uri": "https://accounts.google.com/o/oauth2/auth", >>> "token_uri": "https://oauth2.googleapis.com/token", >>> "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", >>> "client_x509_cert_url": "client_x509_cert_url" >>> } b. Method2: Using OAuth Method. Refer to Google big query documents how to generate OAuth token\n >>> client.connections.ConfigurationMetaNames.PROPERTIES: { >>> "access_token": "access token generated for big query", >>> "refresh_token": "refresh token", >>> "project_id": "project_id", >>> "client_secret": "This is your gmail account password", >>> "client_id": "client_id" >>> } 3. MS SQL >>> client.connections.ConfigurationMetaNames.PROPERTIES: { >>> "database": "database", >>> "password": "password", >>> "port": "1433", >>> "host": "host", >>> "username": "username" >>> } 4. Tera data >>> client.connections.ConfigurationMetaNames.PROPERTIES: { >>> "database": "database", >>> "password": "password", >>> "port": "1433", >>> "host": "host", >>> "username": "username" >>> } **Parameters** .. important:: #. **meta_props**: meta data of the connection configuration. To see available meta names use:\n >>> client.connections.ConfigurationMetaNames.get() **type**: dict\n **Output** .. important:: **returns**: metadata of the stored connection\n **return type**: dict\n **Example** >>> sqlserver_data_source_type_id = client.connections.get_datasource_type_uid_by_name('sqlserver') >>> connections_details = client.connections.create({ >>> client.connections.ConfigurationMetaNames.NAME: "sqlserver connection", >>> client.connections.ConfigurationMetaNames.DESCRIPTION: "connection description", >>> client.connections.ConfigurationMetaNames.DATASOURCE_TYPE: sqlserver_data_source_type_id, >>> client.connections.ConfigurationMetaNames.PROPERTIES: { "database": "database", >>> "password": "password", >>> "port": "1433", >>> "host": "host", >>> "username": "username"} >>> }) """ WMLResource._chk_and_block_create_update_for_python36(self) connection_meta = self.ConfigurationMetaNames._generate_resource_metadata( meta_props, with_validation=True, client=self._client ) big_query_data_source_type_id = self.get_datasource_type_uid_by_name('bigquery') # Either service acct json credentials can be given or oauth json can be given # If service acct json, then we need to create a newline json with "credentials" key if connection_meta[u'datasource_type'] == big_query_data_source_type_id: if 'private_key' in connection_meta[u'properties']: result = json.dumps(connection_meta[u'properties'], separators=(',\n', ':')) newmap = {"credentials": result} connection_meta[u'properties'] = newmap connection_meta.update({'origin_country': 'US'}) #Step1 : Create an asset print("Creating connections...") if self._client.WSD: header_param = self._client._get_headers(wsdconnection_api_req=True) else: header_param = self._client._get_headers() if not self._client.ICP_30 and not self._client.ICP and not self._client.WSD: creation_response = requests.post( self._client.service_instance._href_definitions.get_connections_href(), headers=header_param, params = self._client._params(), json=connection_meta ) else: creation_response = requests.post( self._client.service_instance._href_definitions.get_connections_href(), headers=header_param, json=connection_meta, params=self._client._params(), verify=False ) connection_details = self._handle_response(201, u'creating new connection', creation_response) if creation_response.status_code == 201: connection_id = connection_details["metadata"]["asset_id"] print("SUCCESS") return self._get_required_element_from_response(connection_details) else: raise WMLClientError("Failed while creating a Connections. Try again.")
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def delete(self, connection_id): """ Delete a stored Connection. **Parameters** .. important:: #. **connection_id**: Unique id of the connection to be deleted.\n **type**: str\n **Output** .. important:: **returns**: status ("SUCCESS" or "FAILED")\n **return type**: str\n **Example** >>> client.connections.delete(connection_id) """ ##For CP4D, check if either spce or project ID is set self._client._check_if_either_is_set() pipeline_uid = str_type_conv(connection_id) Connections._validate_type(connection_id, u'connection_id', STR_TYPE, True) if self._client.WSD: header_param = self._client._get_headers(wsdconnection_api_req=True) else: header_param = self._client._get_headers() connection_endpoint = self._client.service_instance._href_definitions.get_connection_by_id_href(connection_id) if not self._ICP and not self._client.ICP_30 and not self._client.WSD: response_delete = requests.delete(connection_endpoint, params=self._client._params(), headers=header_param) else: response_delete = requests.delete(connection_endpoint, params=self._client._params(), headers=header_param, verify=False) return self._handle_response(204, u'connection deletion', response_delete, False)
[docs] @staticmethod @docstring_parameter({'str_type': STR_TYPE_NAME}) def get_uid(connection_details): """ Get Unique Id of stored connection. **Parameters** .. important:: #. **connection_details**: Metadata of the stored connection\n **type**: dict\n **Output** .. important:: **returns**: Unique Id of stored connection\n **return type**: str\n **Example** >>> connection_uid = client.connection.get_uid(connection_details) """ Connections._validate_type(connection_details, u'connection_details', object, True) return WMLResource._get_required_element_from_dict(connection_details, u'connection_details', [u'metadata', u'id'])
[docs] def list_datasource_types(self): """ List stored datasource types assets. **Output** .. important:: This method only prints the list of datasources type in a table format.\n **return type**: None\n **Example** >>> client.connections.list_datasource_types() """ if self._client.WSD: header_param = self._client._get_headers(wsdconnection_api_req=True) else: header_param = self._client._get_headers() if not self._client.ICP_30 and not self._client.ICP and not self._client.WSD: response = requests.get(self._client.service_instance._href_definitions.get_connection_data_types_href(), headers=header_param) else: response = requests.get(self._client.service_instance._href_definitions.get_connection_data_types_href(), headers=header_param, verify=False) datasource_details = self._handle_response(200, u'list datasource types', response)['resources'] space_values = [ (m[u'entity'][u'name'], m[u'metadata'][u'asset_id'], m[u'entity'][u'type'], m['entity']['status']) for m in datasource_details] self._list(space_values, [u'NAME', u'DATASOURCE_ID', u'TYPE', u'STATUS'], None, None)
[docs] def list(self): """ List all stored connections. **Output** .. important:: This method only prints the list of all connections in a table format.\n **return type**: None\n **Example** >>> client.connections.list() """ if self._client.WSD: header_param = self._client._get_headers(wsdconnection_api_req=True) else: header_param = self._client._get_headers() if not self._client.ICP_30 and not self._client.ICP and not self._client.WSD: response = requests.get(self._client.service_instance._href_definitions.get_connections_href(), params=self._client._params(), headers=header_param) else: response = requests.get(self._client.service_instance._href_definitions.get_connections_href(), params=self._client._params(), headers=header_param, verify=False) self._handle_response(200, u'list datasource type', response) datasource_details = self._handle_response(200, u'list datasource types', response)['resources'] space_values = [ (m[u'entity'][u'name'], m[u'metadata'][u'asset_id'],m['metadata']['create_time'], m[u'entity'][u'datasource_type']) for m in datasource_details] self._list(space_values, [u'NAME', u'ID', u'CREATED', u'DATASOURCE_TYPE_ID', ], None, None)
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def get_datasource_type_uid_by_name(self, name): """ Get stored datasource types id for the given datasource type name. **Parameters** .. important:: #. **name**: name of datasource type\n **type**: int\n **Output** .. important:: This method only prints the id of given datasource type name.\n **return type**: Str\n **Example** >>> client.connections.get_datasource_type_uid_by_name('cloudobjectstorage') """ Connections._validate_type(name, u'name', str, True) if self._client.WSD: header_param = self._client._get_headers(wsdconnection_api_req=True) else: header_param = self._client._get_headers() if not self._client.ICP_30 and not self._client.ICP and not self._client.WSD: response = requests.get(self._client.service_instance._href_definitions.get_connection_data_types_href(), headers=header_param) else: response = requests.get(self._client.service_instance._href_definitions.get_connection_data_types_href(), headers=header_param, verify=False) datasource_id = 'None' datasource_details = self._handle_response(200, u'list datasource types', response)['resources'] for i, ds_resource in enumerate(datasource_details): if ds_resource['entity']['name'] == name: datasource_id = ds_resource['metadata']['asset_id'] return datasource_id