Preface
1. Origin
2. Invading the driver library
3. Obtaining the tenant ID
4. Modifying the SQL
Preface

As a project evolves, strange temporary requirements sometimes arise that touch every SQL statement, yet rewriting all of the project's SQL is not an option. Examples include controlling which tables different people may access, or, as in my case, a SaaS requirement: the SQL statements have to be modified at runtime according to the relevant conditions.
1. Origin

Recently the project has been preparing for a SaaS transformation. One characteristic of SaaS is multi-tenancy, and the data of each tenant must be isolated from the others. Common database isolation schemes are database-level isolation, table-level isolation, and field-level isolation; so far I have only used table and field isolation (database isolation works on a similar principle). Field isolation is relatively simple: only the query conditions differ, as in the SQL query below:
SELECT * FROM t_demo WHERE tenant_id='xxx' AND is_del=0
But for the sake of rigor, the requirement is that every SQL statement be checked to ensure the corresponding table is queried with a tenant_id condition.
Table isolation is a bit more troublesome: the data table has to be chosen at runtime according to the tenant ID. For example, given the following SQL query:
SELECT * FROM t_demo WHERE is_del=0
for tenant A, the query becomes:
SELECT * FROM t_demo_a WHERE is_del=0
and for tenant B it becomes:
SELECT * FROM t_demo_b WHERE is_del=0
If the number of tenants were fixed, an if-else in code would usually suffice, but in a typical SaaS application tenants are continually added, so the SQL logic ends up looking like this:
def sql_handle(tenant_id: str) -> str:
    table_name: str = f"t_demo_{tenant_id}"
    sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"
    return sql
But there are a few problems. For an ORM it was fine to create a single table object for t_demo, whereas now you would have to create one table object per tenant, which is unrealistic. Secondly, if you write raw SQL you usually rely on the IDE's inspection, but for SQL like this:
sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"
the IDE has no way to check it. The most serious problem, though, is that the project is already very large: adapting every call site of every affected table would be a huge amount of work. So the best solution is to intercept the user's SQL statement inside the driver library, before it is sent to the MySQL server, and change it automatically according to the tenant ID. To achieve this effect, we have to invade the MySQL driver library we use and modify its methods to meet our needs.
2. Invading the driver library

Whether you use dbutils or sqlalchemy, you can specify which driver library to use. At present the most commonly used driver is pymysql, so the examples below are given in terms of pymysql.
Since we have to invade the driver library, the first step is to determine which of its methods to modify. After reading the source code, I decided to change only the mogrify method of pymysql.cursors.Cursor:
def mogrify(self, query, args=None):
    """
    Returns the exact string that is sent to the database by calling the
    execute() method.

    This method follows the extension to the DB API 2.0 followed by Psycopg.
    """
    conn = self._get_db()
    if args is not None:
        query = query % self._escape_args(args, conn)
    return query
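For intuition, here is a minimal, dependency-free sketch of what this method does: escape the arguments, then interpolate them into the query with %-formatting. The fake_mogrify helper and its naive esc escaping are hypothetical simplifications; pymysql's real escaping is connection- and charset-aware.

```python
# Hypothetical, simplified stand-in for pymysql's Cursor.mogrify:
# escape each argument, then %-interpolate it into the query string.
def fake_mogrify(query, args=None):
    def esc(v):
        # Naive escaping for illustration only; pymysql's real escaping
        # is charset-aware and far more thorough.
        if isinstance(v, str):
            return "'" + v.replace("'", "''") + "'"
        return str(v)

    if args is not None:
        if isinstance(args, dict):
            args = {k: esc(v) for k, v in args.items()}
        else:
            args = tuple(esc(v) for v in args)
        query = query % args
    return query

print(fake_mogrify("SELECT * FROM t_demo WHERE is_del=%s", (0,)))
# SELECT * FROM t_demo WHERE is_del=0
```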
This method takes the user's SQL and parameters and generates the final SQL, which is exactly what we need, so through inheritance we can create a Cursor class of our own:
from typing import Union

import pymysql


class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        # Logic that processes the SQL before composition can go here
        mogrify_sql: str = super().mogrify(query, args)
        # Logic that processes the composed SQL can go here
        return mogrify_sql


class DictCursor(pymysql.cursors.DictCursorMixin, Cursor):
    """A cursor which returns results as a dictionary"""
    # Modifying the `mogrify` method of `Cursor` does not affect pymysql's own
    # `DictCursor`, so we also need to create a new `DictCursor` class.
After creating the Cursor class, we need to think about how to make pymysql use it. MySQL connection libraries generally let us pass in a custom Cursor class; with pymysql, for example:
import pymysql.cursors

# Connect to the database
connection = pymysql.connect(
    host='localhost',
    user='user',
    password='passwd',
    database='db',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)
The cursorclass argument is how we designate our own Cursor class. If the library you use does not support this, or it is otherwise inconvenient, you can fall back to monkey patching; for the specifics, see my earlier article on using Python probes to extract data from called libraries.
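As a sketch of the monkey-patch route, illustrated on a stand-in class rather than on pymysql itself so the mechanics are visible: keep a reference to the original method, then replace it on the class with a version that delegates to the original. The t_demo rewrite here is a hypothetical placeholder.

```python
class Cursor:
    """Stand-in for pymysql.cursors.Cursor, for illustration only."""
    def mogrify(self, query, args=None):
        return query if args is None else query % args

# Keep a reference to the original method, then patch the class.
_orig_mogrify = Cursor.mogrify

def patched_mogrify(self, query, args=None):
    query = query.replace("t_demo", "t_demo_a")  # hypothetical tenant rewrite
    return _orig_mogrify(self, query, args)

Cursor.mogrify = patched_mogrify

print(Cursor().mogrify("SELECT * FROM t_demo WHERE is_del=%s", (0,)))
# SELECT * FROM t_demo_a WHERE is_del=0
```

Because every instance looks the method up on the class, the patch also affects cursors created by third-party code that never heard of our subclass.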
3. Obtaining the tenant ID

Now that we have settled where to modify the SQL, the next question is how the mogrify method obtains the tenant ID and the set of tables to replace. When calling a piece of code there are generally two ways to pass parameters. One is sequence-style parameters:
with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0,))
and the other is dictionary-style parameters:
with conn.cursor() as cursor:
    cursor.execute("SELECT * FROM t_demo WHERE is_del=%(is_del)s", {"is_del": 0})
Most projects use both styles. Before execution, the driver processes the parameters and passes sql and args on to mogrify. With dictionary-style parameters we could embed the extra parameters we need and extract them again inside mogrify, but with sequence-style parameters, or when going through an ORM library, it is hard to pass parameters to mogrify directly. Instead we can pass them to mogrify implicitly through a context; for the detailed analysis and principle, see my earlier source-code analysis of Python's contextvars module.
Using a context is simple. First, create a class that encapsulates the context:
from contextvars import ContextVar, Token
from typing import Any, Dict, Optional, Set

context: ContextVar[Dict[str, Any]] = ContextVar("context", default={})


class Context(object):
    """Basis for context access, with Type Hints support"""
    tenant_id: str
    replace_table_set: Set[str]

    def __getattr__(self, key: str) -> Any:
        value: Any = context.get().get(key)
        return value

    def __setattr__(self, key: str, value: Any) -> None:
        context.get()[key] = value


class WithContext(Context):
    """Handles the token reset logic and context management; used only in business code"""

    def __init__(self) -> None:
        self._token: Optional[Token] = None

    def __enter__(self) -> "WithContext":
        self._token = context.set({})
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        if self._token:
            context.reset(self._token)
            self._token = None
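The token-reset mechanics that WithContext wraps can be seen with a bare ContextVar:

```python
from contextvars import ContextVar

ctx: ContextVar[dict] = ContextVar("ctx", default={})

token = ctx.set({"tenant_id": "a"})   # __enter__: install a fresh dict
assert ctx.get()["tenant_id"] == "a"  # business code reads/writes this dict
ctx.reset(token)                      # __exit__: restore the previous value
assert ctx.get() == {}                # back to the default
```

Each token remembers the value it replaced, so nested with blocks unwind correctly, and each asyncio task or thread sees its own copy of the context.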
Next, in the business code, pass the parameters of the current business through the context:
with WithContext() as context:
    context.tenant_id = "xxx"
    context.replace_table_set = {"t_demo"}
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0,))
Then, inside mogrify, the corresponding parameters can be obtained from the context:
from typing import Set, Union

import pymysql


class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        # `context` here is assumed to be a shared, module-level Context instance
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # Logic that processes the SQL before composition can go here
        mogrify_sql: str = super().mogrify(query, args)
        # Logic that processes the composed SQL can go here
        return mogrify_sql
4. Modifying the SQL

Now everything is in place; only the SQL-modification logic remains. In other projects I have worked on, the tables were named very consistently, all in the t_xxx format. That makes replacing table names easy: two substitutions cover most situations. The code is as follows:
from typing import Set, Union

import pymysql


class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        replace_table_set: Set[str] = context.replace_table_set
        # A simple example; a regular expression would actually be more efficient
        for replace_table in replace_table_set:
            if replace_table in query:
                # Replace the table name itself
                query = query.replace(f" {replace_table} ", f" {replace_table}_{tenant_id} ")
                # Replace the table name used as a column prefix in query conditions
                query = query.replace(f" {replace_table}.", f" {replace_table}_{tenant_id}.")
        mogrify_sql: str = super().mogrify(query, args)
        # Logic that processes the composed SQL can go here
        return mogrify_sql
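Extracted into a standalone function (a sketch for illustration; in the real code this lives inside mogrify), the two substitutions behave like this:

```python
from typing import Set

def rewrite(query: str, replace_table_set: Set[str], tenant_id: str) -> str:
    # The same two substitutions: the bare table name, and the table
    # name used as a column prefix in conditions.
    for replace_table in replace_table_set:
        if replace_table in query:
            query = query.replace(f" {replace_table} ", f" {replace_table}_{tenant_id} ")
            query = query.replace(f" {replace_table}.", f" {replace_table}_{tenant_id}.")
    return query

print(rewrite("SELECT * FROM t_demo WHERE is_del=0", {"t_demo"}, "a"))
# SELECT * FROM t_demo_a WHERE is_del=0
```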
But the SQL in the current project is not so well standardized: some table names are even MySQL keywords, so simple replacement is not feasible. At the same time, in this requirement some tables only need field isolation, where we must ensure the query carries the corresponding field. That means we need a library that can parse the SQL and return data telling us which table names and which query fields a statement contains.
Python has a well-known SQL parsing library, sqlparse, whose parsing engine turns a SQL statement into Python objects; with some grammar rules we can then determine which parts are SQL keywords, which are table names, which are query conditions, and so on. But the library only provides fairly low-level APIs; building relatively complete functionality on top of it requires a good understanding of both the library and SQL. Take the following three common SQL statements:
SELECT * FROM t_demo
SELECT * FROM t_demo AS demo
SELECT * FROM t_other AS other LEFT JOIN t_demo demo ON demo.xxx = other.xxx
Extracting the table name with sqlparse means handling all three cases, and writing code for every possible situation is laborious and error-prone. So we need another library, sql_metadata, which is built on top of sqlparse plus regular expressions and wraps a large number of common use cases: by calling the right function we can directly learn which table names and which query fields a SQL statement contains.
This library does have one known quirk: it automatically strips the quoting from identifiers. For example, when a table name is a keyword, it has to be wrapped in backticks:
SELECT * FROM `case`
but the table name obtained after parsing with sql_metadata is case rather than `case`, and that has to be handled manually. I would not call this a bug, though: if a table is created without following the naming conventions, who else is there to blame?
With sql_metadata, the functionality I need can be implemented. After adapting it to the requirements, the code looks like this (see the comments for explanations):
from typing import Set, Union

import pymysql
import sql_metadata


class Cursor(pymysql.cursors.Cursor):
    def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str:
        tenant_id: str = context.tenant_id
        # Generate a parsed SQL object
        sql_parse: sql_metadata.Parser = sql_metadata.Parser(query)

        # A new context attribute: the table names whose query conditions must be verified
        check_flag = False
        where_table_set: Set[str] = context.where_table_set
        # `tables` returns the tables used by the SQL as an array
        for table_name in sql_parse.tables:
            if table_name in where_table_set:
                if sql_parse.columns_dict:
                    # `columns_dict` returns the SQL's columns grouped by clause
                    # (select, join, where, etc.); only `where` matters here
                    for where_column in sql_parse.columns_dict.get("where", []):
                        # With joined tables the column looks like t_demo.tenant_id,
                        # so be compatible with that case
                        if "tenant_id" in where_column.lower().split("."):
                            check_flag = True
                            break
                if not check_flag:
                    # Raise an error if the check fails
                    raise RuntimeError()

        # Logic for changing table names
        replace_table_set: Set[str] = context.replace_table_set
        new_query: str = query
        for table_name in sql_parse.tables:
            if table_name in replace_table_set:
                new_query = ""
                # `tokens` holds the parsed data; for example, SELECT * FROM t_demo
                # parses into the four tokens [SELECT, *, FROM, t_demo]
                for token in sql_parse.tokens:
                    # Check whether the token is a table name
                    if token.is_potential_table_name:
                        # Extract the normalized table name
                        parse_table_name: str = token.stringified_token.strip()
                        if parse_table_name in replace_table_set:
                            new_table_name: str = f" {parse_table_name}_{tenant_id}"
                            # next_token is the next token of the SQL
                            if token.next_token.normalized != "AS":
                                # If the current table has no alias, use AS to set the
                                # pre-replacement table name as the alias of the new
                                # table name, so that code still using the old table
                                # name reads the corresponding tenant's table
                                new_table_name += f" AS {parse_table_name}"
                            new_query += new_table_name
                            continue
                    # Data obtained via stringified_token comes back padded with a
                    # leading space (e.g. `FROM` becomes ` FROM`), so there is no
                    # need to worry about spaces when concatenating
                    new_query += token.stringified_token
        mogrify_sql: str = super().mogrify(new_query, args)
        # Logic that processes the composed SQL can go here
        return mogrify_sql
This code is only a brief illustration. In practice, this logic runs for every SQL query, so we must make sure it is both correct and not too wasteful of performance, which means considering code splitting and optimization. For example, you will notice that our SQL conversion and checking happen before the parent class's Cursor.mogrify is invoked, which means that no matter what parameters our code passes to cursor.execute, for the same piece of code logic the query value arriving here is constant. Consider the following code:
def get_user_info(uid: str) -> Dict[str, Any]:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM t_user WHERE uid=%(uid)s", {"uid": uid})
        return cursor.fetchone() or {}
The query this code passes to Cursor.mogrify is always SELECT * FROM t_user WHERE uid=%(uid)s; only the uid inside args changes. Given that precondition, we can cache the verification and conversion results per query, avoiding the performance cost of re-parsing and re-checking the SQL on every call. How to implement the cache depends on your project: if it only ever executes a few hundred distinct SQL statements, a plain Python dict is enough; if it executes a great many, some very frequently and some rarely, consider an LRU cache instead.
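One possible sketch of the caching idea uses functools.lru_cache. The convert_query name and its trivial rewrite are hypothetical stand-ins for the real parse-check-rewrite step above, and the tenant ID is passed explicitly because lru_cache keys on the function arguments:

```python
from functools import lru_cache

@lru_cache(maxsize=1024)
def convert_query(query: str, tenant_id: str) -> str:
    # Stand-in for the expensive parse, check and rewrite step.
    return query.replace("t_user", f"t_user_{tenant_id}")

# The same (query, tenant) pair is converted once, then served from the cache.
a = convert_query("SELECT * FROM t_user WHERE uid=%(uid)s", "a")
b = convert_query("SELECT * FROM t_user WHERE uid=%(uid)s", "a")
assert a == b == "SELECT * FROM t_user_a WHERE uid=%(uid)s"
```

With maxsize set, rarely used statements are evicted first, which matches the mixed high/low-frequency workload described above.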
That concludes this article on modifying business SQL at runtime in Python.