python
                
                sqlalchemy
                
                3 months, 2 weeks ago
                from sqlalchemy.schema import CreateTable
from sqlalchemy.dialects.mssql import dialect as mssql_dialect
from your_models_module import MyTable, AnotherModel
import re
# Registry of ORM model classes to check, keyed by table name.
# compare_sql_with_models looks up each table name extracted from the SQL
# file in this mapping; extract_create_table_ddl lower-cases those names, so
# the keys here must be lowercase to be found.
model_classes = {
    "mytable": MyTable,
    "another_table": AnotherModel,
}
def extract_create_table_ddl(sql_text: str) -> dict:
    """
    Extracts CREATE TABLE statements keyed by table name.

    Only statements written in the guarded form
    ``IF NOT EXISTS ... BEGIN CREATE TABLE <name> ( ... ) END`` are
    recognised. The table name may carry an optional schema qualifier,
    bracketed or not (``[dbo].[t]``, ``sales.t``); the qualifier is dropped.

    Args:
        sql_text: Full text of the SQL script.

    Returns:
        Dict mapping each lower-cased table name to the stripped text found
        between the outer parentheses of its CREATE TABLE statement. When a
        table name occurs more than once, the last occurrence wins.
    """
    table_regex = re.compile(
        r"IF NOT EXISTS.*?BEGIN\s+CREATE TABLE\s+"
        # Optional schema qualifier: any identifier, with or without brackets
        # (previously only the literal "[dbo]." was accepted).
        r"(?:\[?\w+\]?\.)?"
        r"\[?(\w+)\]?\s*"
        # Lazy body: ends at the first ')' that is followed by END, so the
        # parentheses of column types such as VARCHAR(200) stay in the body.
        r"\((.*?)\)\s*;?\s*END",
        re.DOTALL | re.IGNORECASE,
    )
    return {
        match.group(1).lower(): match.group(2).strip()
        for match in table_regex.finditer(sql_text)
    }
# Keywords that open a table-level constraint. The trailing \b stops column
# names that merely begin with a keyword (unique_code, check_date,
# constraint_type) from being mistaken for constraints.
_TABLE_CONSTRAINT_RE = re.compile(
    r"(?:PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE|CHECK|CONSTRAINT)\b",
    re.IGNORECASE,
)


def parse_columns(ddl_block: str) -> tuple:
    """
    Parses the DDL block (inside CREATE TABLE (...)) into column definitions
    and table-level constraints.

    The block is processed one line at a time: each non-blank line is
    stripped and has its trailing commas removed. A definition that spans
    several lines is therefore treated as several separate entries.

    Args:
        ddl_block: Text between the outer parentheses of a CREATE TABLE
            statement.

    Returns:
        A ``(column_definitions, table_constraints)`` tuple.
        ``column_definitions`` maps the lower-cased column name (surrounding
        brackets, backticks and double quotes removed) to the rest of its
        line with whitespace collapsed to single spaces.
        ``table_constraints`` lists, in order of appearance, the lines that
        start with PRIMARY KEY, UNIQUE, CHECK, FOREIGN KEY or CONSTRAINT.
    """
    column_definitions = {}
    table_constraints = []
    ddl_lines = [
        line.strip().rstrip(',')
        for line in ddl_block.splitlines()
        if line.strip()
    ]
    for line in ddl_lines:
        # Table-level constraints start with a known keyword as a whole word.
        if _TABLE_CONSTRAINT_RE.match(line):
            table_constraints.append(line)
            continue
        # Column-level definition (example: key_type VARCHAR(200) NOT NULL)
        tokens = line.split()
        if not tokens:
            # A line holding only commas becomes empty after rstrip(',').
            continue
        col_name = tokens[0].strip('[]`"')  # Remove brackets or quotes
        # Rest of the line holds the type and any column-level constraints.
        column_definitions[col_name.lower()] = ' '.join(tokens[1:])
    return column_definitions, table_constraints
def get_model_columns(model_ddl: str) -> tuple:
    """
    Parses the columns and constraints out of a full CREATE TABLE statement.

    Everything between the first '(' and the last ')' of ``model_ddl`` is
    handed to ``parse_columns``. When the text contains no parenthesised
    section, an empty result ``({}, [])`` is returned.
    """
    paren_body = re.search(r"\((.*)\)", model_ddl, re.DOTALL)
    if paren_body is None:
        return {}, []
    return parse_columns(paren_body.group(1))
'''
def compare_columns(table_name: str, file_cols: dict, model_cols: dict):
    all_keys = set(file_cols.keys()).union(set(model_cols.keys()))
    for col in sorted(all_keys):
        file_type = file_cols.get(col)
        model_type = model_cols.get(col)
        if file_type != model_type:
            print(f"[{table_name}] Column mismatch: '{col}': File={file_type}, Model={model_type}")
def compare_columns(table_name: str, file_cols: list, model_cols: list):
    for i in range(len(file_cols)):
        if file_cols[i]!=model_cols[i]:
            print(f"[{table_name}] mismatch:\n\tModel: {model_cols[i]}\n\tFile={file_cols[i]}")
'''
def compare_columns(table_name: str, file_cols: dict, model_cols: dict):
    """
    Prints every column whose definition differs between file and model.

    Columns are visited in sorted name order over the union of both
    mappings; a column missing on one side is shown with ``None`` for that
    side. Definitions are compared as exact strings.
    """
    for name in sorted({*file_cols, *model_cols}):
        from_file = file_cols.get(name)
        from_model = model_cols.get(name)
        if from_file == from_model:
            continue
        print(
            f"[{table_name}] Column mismatch: '{name}'",
            f"  - File : {from_file}",
            f"  - Model: {from_model}",
            sep="\n",
        )
def compare_table_constraints(table_name: str, file_constraints: list, model_constraints: list):
    """
    Prints table-level constraints that appear on only one side.

    Constraints are compared case-insensitively as whole strings (both
    lists are lower-cased first, and the lower-cased text is what gets
    printed). File-only constraints are reported first, then model-only
    ones.

    Args:
        table_name: Name used as the ``[table]`` prefix of each message.
        file_constraints: Constraint lines parsed from the SQL file.
        model_constraints: Constraint lines parsed from the model DDL.
    """
    file_set = {c.lower() for c in file_constraints}
    model_set = {c.lower() for c in model_constraints}
    # Sort each difference: set iteration order depends on string hashing and
    # can change between runs, which made the report order unstable.
    for constraint in sorted(file_set - model_set):
        print(f"[{table_name}] Constraint only in FILE: {constraint}")
    for constraint in sorted(model_set - file_set):
        print(f"[{table_name}] Constraint only in MODEL: {constraint}")
def compare_sql_with_models(sql_path: str, model_classes: dict):
    """
    Compares the CREATE TABLE statements in a SQL file with ORM models.

    For every table extracted from the file, the matching model is compiled
    to MSSQL DDL, both sides are parsed into columns and table-level
    constraints, and the differences are printed. Tables with no entry in
    ``model_classes`` are reported and skipped.

    Args:
        sql_path: Path of the SQL script to read.
        model_classes: Mapping of lower-cased table name to a model class
            exposing ``__table__``.
    """
    with open(sql_path, 'r') as f:
        sql_text = f.read()
    file_ddls = extract_create_table_ddl(sql_text)
    for table_name, file_ddl in file_ddls.items():
        if table_name not in model_classes:
            print(f"[{table_name}] Table not found in models.")
            continue
        model_cls = model_classes[table_name]
        # The file imports the dialect class as `mssql_dialect`; the name
        # `mssql` is never bound, so `mssql.dialect()` raised NameError.
        create_stmt = CreateTable(model_cls.__table__)
        model_ddl = str(create_stmt.compile(dialect=mssql_dialect()))
        file_cols, file_constraints = parse_columns(file_ddl)
        model_cols, model_constraints = get_model_columns(model_ddl)
        print(f"\n=== Comparing table: {table_name} ===")
        compare_columns(table_name, file_cols, model_cols)
        compare_table_constraints(table_name, file_constraints, model_constraints)
# Script entry point: compares the placeholder SQL file against the
# model_classes registry defined above.
# NOTE(review): this call also runs whenever the module is imported; consider
# an `if __name__ == "__main__":` guard if the module is meant to be imported.
compare_sql_with_models("your_sql_file.sql", model_classes)
                
                
                
                
                
                
                
             
            
0 Comments
Please Login to Comment Here