-
Notifications
You must be signed in to change notification settings - Fork 1
Add SnowflakeClient #56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a1809ae
42e7d02
bcff180
2bf8fa2
6a4982e
68b7644
fb65e44
ad8b676
8408520
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| import snowflake.connector as sc | ||
|
|
||
| from nypl_py_utils.functions.log_helper import create_log | ||
|
|
||
|
|
||
class SnowflakeClient:
    """Client for managing connections to Snowflake"""

    def __init__(self, account, user, private_key=None, password=None):
        """
        Stores the credentials. No connection is opened until connect() is
        called.

        Parameters
        ----------
        account:
            Passed to snowflake.connector.connect as `account`
        user:
            Passed to snowflake.connector.connect as `user`
        private_key: optional
            Passed to snowflake.connector.connect as `private_key`
        password: str, optional
            Password to be combined with an MFA code in connect()

        Raises
        ------
        SnowflakeClientError
            If both or neither of private_key and password are set
        """
        self.logger = create_log('snowflake_client')
        # Exactly one of the two authentication methods must be supplied
        if (password is None) == (private_key is None):
            raise SnowflakeClientError(
                'Either password or private key must be set (but not both)',
                self.logger
            ) from None

        self.conn = None
        self.account = account
        self.user = user
        self.private_key = private_key
        self.password = password

    def connect(self, mfa_code=None, **kwargs):
        """
        Connects to Snowflake using the given credentials. If you're connecting
        locally, you should be using the password and mfa_code. If the
        connection is for production code, a private_key should be set up.

        Parameters
        ----------
        mfa_code: str, optional
            The six-digit MFA code. Only necessary for connecting as a human
            user.
        kwargs:
            All possible arguments (such as which warehouse to use or how
            long to wait before timing out) can be found here:
            https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#connect

        Raises
        ------
        SnowflakeClientError
            If a password is used without an MFA code, or if the underlying
            connection attempt raises
        """
        self.logger.info('Connecting to Snowflake')
        if self.private_key is not None:
            credentials = {'private_key': self.private_key}
        else:
            if mfa_code is None:
                raise SnowflakeClientError(
                    'When using a password, an MFA code must also be provided',
                    self.logger
                ) from None
            # The MFA code is appended to the password, which is why
            # passcode_in_password is set
            credentials = {
                'password': f'{self.password}{mfa_code}',
                'passcode_in_password': True,
            }

        try:
            self.conn = sc.connect(
                account=self.account,
                user=self.user,
                **credentials,
                **kwargs)
        except Exception as e:
            raise SnowflakeClientError(
                f'Error connecting to Snowflake: {e}', self.logger
            ) from None

    def execute_query(self, query, **kwargs):
        """
        Executes an arbitrary query against the given connection.

        Note that:
            1) All results will be fetched by default, so this method is not
               suitable if you do not want to load all rows into memory
            2) AUTOCOMMIT is on by default, so this method is not suitable if
               you want to execute multiple queries in a single transaction
            3) This method can be used for both read and write queries, but
               it's not optimized for writing -- there is no parameter binding
               or executemany support, and the return value for write queries
               can be unpredictable.

        Design note: this method is primarily meant for reading data. Running
        arbitrary single SQL commands works the same way but is not the main
        use case. Transaction and executemany support were deliberately left
        out and may be added later, similar to the Redshift client.

        Parameters
        ----------
        query: str
            The SQL query to execute
        kwargs:
            All possible arguments (such as timeouts) can be found here:
            https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#execute

        Returns
        -------
        sequence
            A list of tuples

        Raises
        ------
        SnowflakeClientError
            If there is no open connection or if the query fails. On a query
            failure the connection is also closed.
        """
        self.logger.info('Querying Snowflake')
        if self.conn is None:
            raise SnowflakeClientError(
                'Error executing Snowflake query: no open connection',
                self.logger
            ) from None

        try:
            # Creating the cursor inside the try block ensures a failure here
            # is reported the same way as a failure to execute the query
            cursor = self.conn.cursor()
            try:
                cursor.execute(query, **kwargs)
                return cursor.fetchall()
            finally:
                cursor.close()
        except Exception as e:
            # If there was an error, also close the connection
            self.close_connection()

            # Truncate long queries so the error message stays readable
            short_q = str(query)
            if len(short_q) > 2500:
                short_q = short_q[:2497] + '...'
            raise SnowflakeClientError(
                f'Error executing Snowflake query {short_q}: {e}', self.logger
            ) from None

    def close_connection(self):
        """Closes the connection. Does nothing if no connection is open."""
        if self.conn is None:
            return
        self.logger.info('Closing Snowflake connection')
        try:
            self.conn.close()
        finally:
            # Drop the reference so a closed connection is never reused
            self.conn = None
|
|
||
|
|
||
class SnowflakeClientError(Exception):
    """
    Error raised by SnowflakeClient.

    Parameters
    ----------
    message: str, optional
        Description of the error
    logger: optional
        If given, the message is logged through logger.error when the
        exception is created
    """

    def __init__(self, message='', logger=None):
        # Pass only the message to the base class so that `args` does not
        # carry the logger, which would pollute repr() and break
        # pickling/copying of the exception
        super().__init__(message)
        self.message = message
        if logger is not None:
            logger.error(message)

    def __str__(self):
        # __str__ must return a str even if a non-str message was given
        return str(self.message)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,14 +10,13 @@ | |
|
|
||
| def load_env_file(run_type, file_string): | ||
| """ | ||
| This method loads a YAML config file containing environment variables, | ||
| decrypts whichever are encrypted, and puts them all into os.environ as | ||
| strings. For a YAML variable containing a list of values, the list is | ||
| exported into os.environ as a json string and should be loaded as such. | ||
| This method reads a YAML config file containing environment variables and | ||
| loads them all into os.environ as strings. See _parse_yaml_dict for more. | ||
| It requires the YAML file to be split into a 'PLAINTEXT_VARIABLES' section | ||
| and an 'ENCRYPTED_VARIABLES' section. See config/sample.yaml for an example | ||
| config file. | ||
| If the config file is divided into 'PLAINTEXT_VARIABLES' and | ||
| 'ENCRYPTED_VARIABLES' sections (see config/sample.yaml for an example), the | ||
| 'ENCRYPTED_VARIABLES' variables will be decrypted first. Otherwise, all | ||
| variables will be loaded as is. | ||
| Parameters | ||
| ---------- | ||
|
|
@@ -36,31 +35,50 @@ def load_env_file(run_type, file_string): | |
| try: | ||
| env_dict = yaml.safe_load(env_stream) | ||
| except yaml.YAMLError: | ||
| logger.error('Invalid YAML file: {}'.format(open_file)) | ||
| raise ConfigHelperError( | ||
| 'Invalid YAML file: {}'.format(open_file)) from None | ||
| except FileNotFoundError: | ||
| logger.error('Could not find config file {}'.format(open_file)) | ||
| raise ConfigHelperError( | ||
| 'Could not find config file {}'.format(open_file)) from None | ||
|
|
||
| if env_dict: | ||
| for key, value in env_dict.get('PLAINTEXT_VARIABLES', {}).items(): | ||
| if type(value) is list: | ||
| os.environ[key] = json.dumps(value) | ||
| else: | ||
| os.environ[key] = str(value) | ||
| if ('PLAINTEXT_VARIABLES' in env_dict | ||
| or 'ENCRYPTED_VARIABLES' in env_dict): | ||
| _parse_yaml_dict(env_dict.get('PLAINTEXT_VARIABLES', {})) | ||
|
|
||
| kms_client = KmsClient() | ||
| for key, value in env_dict.get('ENCRYPTED_VARIABLES', {}).items(): | ||
| if type(value) is list: | ||
| decrypted_list = [kms_client.decrypt(v) for v in value] | ||
| os.environ[key] = json.dumps(decrypted_list) | ||
| else: | ||
| os.environ[key] = kms_client.decrypt(value) | ||
| kms_client.close() | ||
| kms_client = KmsClient() | ||
| _parse_yaml_dict(env_dict.get( | ||
| 'ENCRYPTED_VARIABLES', {}), kms_client) | ||
| kms_client.close() | ||
| else: | ||
| _parse_yaml_dict(env_dict) | ||
|
|
||
|
|
||
| def _parse_yaml_dict(yaml_dict, kms_client=None): | ||
| """ | ||
| Loads YAML dict into os.environ. All values are stored as strings to match | ||
| how AWS Lambda environment variables are stored. For list variables, the | ||
| list is exported into os.environ as a json string. | ||
| If kms_client is not empty, decrypts the variables first. | ||
| Does not allow for sub-dictionaries. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any chance we'd use sub-dictionaries in our configs in the future? Or is that frowned upon?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not against it! I just think the use case for this file is loading config variables as environment variables, and it's unclear what the user would expect in the case of a subdictionary. Do all of the sub-keys get loaded as their own env variables, or does the whole dict get loaded as a JSON string, or? In general, my feeling is I haven't had a use case that required using sub-dictionaries, so there's no use trying to over-engineer this for a hypothetical use case. |
||
| """ | ||
| for key, value in yaml_dict.items(): | ||
| if type(value) is dict: | ||
| raise ConfigHelperError( | ||
| 'Found sub-dictionary in YAML config') from None | ||
| elif type(value) is list: | ||
| val = [kms_client.decrypt(v) | ||
| for v in value] if kms_client else value | ||
| os.environ[key] = json.dumps(val) | ||
| else: | ||
| val = kms_client.decrypt(value) if kms_client else value | ||
| os.environ[key] = str(val) | ||
|
|
||
|
|
||
class ConfigHelperError(Exception):
    """
    Error raised by the config helper.

    Parameters
    ----------
    message: str, optional
        Description of the error. If given, it is also logged through the
        module-level logger when the exception is created.
    """

    def __init__(self, message=None):
        # Initialize the base class explicitly so that str(error) carries the
        # message even when it is passed as a keyword argument
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.message = message
        if message is not None:
            logger.error(message)
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI @yossariano -- I updated this again so that the name of the logger would be output. I also think this means that different struct loggers will now act a little more independently, which is good, although I could be wrong |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woah I've never seen a boolean like this before, nice thinking!