This article presents a potential solution for semantic code search in TrakCare using IRIS Vector Search.
Here's a brief overview of results from the TrakCare Semantic code search for the query: "Validation before database object save".
-
Code Embedding model
There are numerous embedding models designed for sentences and paragraphs, but they are not ideal for code-specific embeddings.
Three code-specific embedding models were evaluated: voyage-code-2, CodeBERT, and GraphCodeBERT. While none of these models were pre-trained on the ObjectScript language, they still outperformed general-purpose embedding models in this context.
CodeBERT was chosen as the embedding model for this solution, offering reliable performance without the need for an API key. 😁
class GraphCodeBERTEmbeddingModel:
    """Produce one embedding vector for a piece of text or code.

    Despite the class name, the default checkpoint is
    ``microsoft/codebert-base`` (CodeBERT). The tokenizer and encoder are
    loaded through ``RobertaTokenizer`` / ``RobertaModel``.
    """

    def __init__(self, model_name="microsoft/codebert-base"):
        """
        Load the tokenizer and the encoder.

        :param model_name: Checkpoint name handed to ``from_pretrained``.
        """
        self.tokenizer = RobertaTokenizer.from_pretrained(model_name)
        self.model = RobertaModel.from_pretrained(model_name)

    def get_embedding(self, text):
        """
        Generate a CodeBERT embedding for the given text.

        :param text: The query or code snippet to embed. Input longer than
            512 tokens is truncated.
        :return: NumPy array taken from the first ([CLS]) position of the
            last hidden state, with size-1 dimensions squeezed out.
        """
        # padding=True pads only up to the longest sequence in this call, so
        # a short query is no longer inflated to 512 tokens before encoding.
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding=True,
        )
        # Inference only: no gradients are needed.
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Use the [CLS] token embedding for the representation
        return outputs.last_hidden_state[:, 0, :].squeeze().numpy()
-
IRIS Vector database
A table is defined with a VECTOR-typed column to store the embeddings. Please note that a COLUMNAR index is not supported on a VECTOR-typed column.
CodeBERT embeddings have 768 dimensions. The model can process texts of at most 512 tokens.
-- One row per embedded code chunk.
CREATE TABLE TrakCareCodeVector (
    -- label of the chunk (presumably its source file; confirm against the loader)
    file          VARCHAR(150),
    -- text of the code chunk that was embedded
    codes         VARCHAR(2000),
    -- embedding of the chunk; 768 matches the CodeBERT embedding size
    codes_vector  VECTOR(DOUBLE, 768)
)
-
Python DB-API
The Python DB-API is used to establish a connection with the IRIS instance and execute the SQL statements.
- Build the vector database for the TrakCare source code.
- Retrieve the top_K most similar code embeddings from the IRIS vector database (the query below ranks them by VECTOR_COSINE).
# build IRIS vector database
import iris
import os
from dotenv import load_dotenv
load_dotenv()
class IrisConn:
    """Connection with an IRIS instance, used to execute SQL statements."""

    def __init__(self) -> None:
        """
        Open the connection and a cursor.

        Settings are read from the environment variables
        ``CONNECTION_STRING``, ``IRISUSERNAME`` and ``PASSWORD``.
        """
        connection_string = os.getenv("CONNECTION_STRING")
        username = os.getenv("IRISUSERNAME")
        password = os.getenv("PASSWORD")
        self.connection = iris.connect(
            connectionstr=connection_string,
            username=username,
            password=password,
            timeout=10000,
        )
        self.cursor = self.connection.cursor()

    def insert(self, params: list):
        """
        Insert one row into ``TrakCareCodeVector``.

        :param params: ``[file, codes, vector_text]`` bound to the three
            placeholders; the third is converted with ``TO_VECTOR``.

        Best effort: a failing insert is printed and skipped, so the caller
        is not interrupted by a single bad row.
        """
        sql = "INSERT INTO TrakCareCodeVector (file, codes, codes_vector) VALUES (?, ?, TO_VECTOR(?,double))"
        try:
            self.cursor.execute(sql, params)
        except Exception as ex:
            print(ex)

    def fetch_query(self, query: str, params=None):
        """
        Execute a query and return every row.

        :param query: SQL text, optionally containing ``?`` placeholders.
        :param params: Optional sequence of values for the placeholders.
            When omitted, the query is executed exactly as given.
        :return: Whatever ``cursor.fetchall()`` returns for the query.
        """
        if params is None:
            self.cursor.execute(query)
        else:
            self.cursor.execute(query, params)
        return self.cursor.fetchall()

    def close_db(self):
        """Close the cursor and then the connection."""
        # The connection must be released even when closing the cursor fails.
        try:
            self.cursor.close()
        finally:
            self.connection.close()
from transformers import AutoTokenizer, AutoModel, RobertaTokenizer, RobertaModel, logging
import torch
import numpy as np
import os
from db import IrisConn
from GraphcodebertEmbeddings import MethodEmbeddingGenerator
from IRISClassParser import parse_directory
import sys, getopt
class GraphCodeBERTEmbeddingModel:
    """Produce one embedding vector for a piece of text or code.

    Despite the class name, the default checkpoint is
    ``microsoft/codebert-base`` (CodeBERT). The tokenizer and encoder are
    loaded through ``RobertaTokenizer`` / ``RobertaModel``.
    """

    def __init__(self, model_name="microsoft/codebert-base"):
        """
        Load the tokenizer and the encoder.

        :param model_name: Checkpoint name handed to ``from_pretrained``.
        """
        self.tokenizer = RobertaTokenizer.from_pretrained(model_name)
        self.model = RobertaModel.from_pretrained(model_name)

    def get_embedding(self, text):
        """
        Generate a CodeBERT embedding for the given text.

        :param text: The query or code snippet to embed. Input longer than
            512 tokens is truncated.
        :return: NumPy array taken from the first ([CLS]) position of the
            last hidden state, with size-1 dimensions squeezed out.
        """
        # padding=True pads only up to the longest sequence in this call, so
        # a short query is no longer inflated to 512 tokens before encoding.
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding=True,
        )
        # Inference only: no gradients are needed.
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Use the [CLS] token embedding for the representation
        return outputs.last_hidden_state[:, 0, :].squeeze().numpy()
class IrisVectorDB:
    """Store and search code embeddings in the ``TrakCareCodeVector`` table."""

    def __init__(self, vector_dim):
        """
        Initialize the IRIS vector database.

        :param vector_dim: Embedding dimension; kept on the instance.
        """
        self.conn = IrisConn()
        self.vector_dim = vector_dim

    def insert(self, description: str, codes: str, vector):
        """
        Store one code chunk together with its embedding.

        :param description: Value written to the ``file`` column.
        :param codes: Code text written to the ``codes`` column.
        :param vector: Embedding exposing ``tolist()``; its list text is
            sent as the value converted by ``TO_VECTOR``.
        """
        params = [description, codes, str(vector.tolist())]
        self.conn.insert(params)

    def search(self, query_vector, top_k=5):
        """
        Return the rows whose embeddings are most similar to ``query_vector``.

        :param query_vector: Embedding exposing ``tolist()``.
        :param top_k: Number of rows to return; must be convertible to int.
        :return: ``(file, codes)`` rows ordered by descending
            ``VECTOR_COSINE`` similarity.
        :raises ValueError: If ``top_k`` is not a valid integer value.
        """
        # top_k ends up inside the SQL text, so force it to a plain integer
        # before building the statement.
        limit = int(top_k)
        vector_text = str(query_vector.tolist())
        query = f"SELECT TOP {limit} file,codes FROM TrakCareCodeVector ORDER BY VECTOR_COSINE(codes_vector, TO_VECTOR('{vector_text}',double)) DESC"
        return self.conn.fetch_query(query)
# Chatbot for code retrieval
class CodeRetrieveChatbot:
    """Pair an embedding model with a vector store for code lookup."""

    def __init__(self, embedding_model, vector_db):
        """
        Keep references to the two collaborators.

        :param embedding_model: Object providing ``get_embedding(text)``.
        :param vector_db: Object providing ``insert(...)`` and ``search(...)``.
        """
        self.vector_db = vector_db
        self.embedding_model = embedding_model

    def add_to_database(self, description, code_snippet, embedding=None):
        """
        Store a code snippet, embedding it first when no vector is supplied.

        :param description: Label stored alongside the snippet.
        :param code_snippet: The code text to store.
        :param embedding: Precomputed vector; computed from the snippet
            when left as ``None``.
        """
        vector = (
            self.embedding_model.get_embedding(code_snippet)
            if embedding is None
            else embedding
        )
        self.vector_db.insert(description, code_snippet, vector)

    def retrieve_code(self, query, top_k=5):
        """
        Retrieve the most relevant code snippets for the given query.

        :param query: Search text to embed.
        :param top_k: Number of results requested from the vector store.
        :return: Whatever the vector store's ``search`` returns.
        """
        return self.vector_db.search(
            self.embedding_model.get_embedding(query), top_k
        )
-
Code Chunks
Since CodeBERT can process texts of at most 512 tokens, large classes and methods have to be chunked into smaller parts. Each chunk is then embedded and stored in the vector database.
from transformers import AutoTokenizer, AutoModel, RobertaTokenizer, RobertaModel
import torch
from IRISClassParser import parse_directory
class MethodEmbeddingGenerator:
    """Split method implementations into chunks and embed each chunk."""

    def __init__(self, model_name="microsoft/codebert-base"):
        """
        Initialize the embedding generator with CodeBERT.
        :param model_name: The name of the pretrained CodeBERT model.
        """
        self.tokenizer = RobertaTokenizer.from_pretrained(model_name)
        self.model = RobertaModel.from_pretrained(model_name)
        self.max_tokens = self.tokenizer.model_max_length  # Typically 512 for CodeBERT

    def chunk_method(self, method_implementation):
        """
        Split method implementation into chunks based on lines of code that approximate the token limit.

        Lines are never split: a single line that exceeds the budget on its
        own becomes its own chunk (and is truncated later when embedded).

        :param method_implementation: The method implementation as a string.
        :return: A list of non-empty chunks; empty input gives an empty list.
        """
        # Two tokens fewer than the model limit (presumably reserved for the
        # special tokens the tokenizer adds around each sequence).
        budget = self.max_tokens - 2
        chunks = []
        current_chunk = []
        current_length = 0
        for line in method_implementation.splitlines():
            # Estimate tokens of the line
            line_token_estimate = len(self.tokenizer.tokenize(line))
            if current_length + line_token_estimate <= budget:
                current_chunk.append(line)
                current_length += line_token_estimate
                continue
            # Flush only when something was collected; otherwise an oversized
            # first line would produce a spurious empty chunk.
            if current_chunk:
                chunks.append("\n".join(current_chunk))
            current_chunk = [line]
            current_length = line_token_estimate
        # Add the last chunk if it has content
        if current_chunk:
            chunks.append("\n".join(current_chunk))
        return chunks

    def get_embeddings(self, method_implementation):
        """
        Generate embeddings for a method implementation, handling large methods by chunking.
        :param method_implementation: The method implementation as a string.
        :return: A dictionary mapping each chunk's text to its embedding
            (NumPy array). Chunks with identical text share one entry.
        """
        embeddings = {}
        for chunk in self.chunk_method(method_implementation):
            inputs = self.tokenizer(
                chunk,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=self.max_tokens,
            )
            with torch.no_grad():
                outputs = self.model(**inputs)
            # Use the [CLS] token embedding (index 0) as the representation
            cls_embedding = outputs.last_hidden_state[:, 0, :].squeeze(0)
            embeddings[chunk] = cls_embedding.numpy()
        return embeddings

    def process_methods(self, methods):
        """
        Process a list of methods to generate embeddings for each.
        :param methods: A list of dictionaries with ``"name"`` and
            ``"implementation"`` keys.
        :return: A dictionary with method names as keys and, as values, the
            chunk-to-embedding dictionaries from ``get_embeddings``. A later
            method with the same name replaces the earlier entry.
        """
        method_embeddings = {}
        for method in methods:
            method_name = method["name"]
            implementation = method["implementation"]
            print(f"Processing method embedding: {method_name}")
            method_embeddings[method_name] = self.get_embeddings(implementation)
        return method_embeddings
-
UI - The Angular APP
The stack uses Angular as the frontend and Python (Flask) as the backend.
-
Future Directions
The search results are not perfect because the embedding model was not pre-trained on ObjectScript.