test_efe_intent_service

python 3 days, 22 hours ago
import unittest from unittest.mock import MagicMock, patch, mock_open from datetime import datetime import pytz from efe_intent_service import EfeIntentService class TestEfeIntentService(unittest.TestCase): def setUp(self): # Mock dependencies self.config = {"cloud-ai-urls": {"EFE_FORMATTED_LOAD_API": "http://mock-api"}} self.mongo_service = MagicMock() self.oracle_service = MagicMock() self.azure_service = MagicMock() self.llm_model_response_service = MagicMock() self.moog_service = MagicMock() # Instantiate the service self.efe_service = EfeIntentService( self.config, self.mongo_service, self.oracle_service, self.azure_service, self.llm_model_response_service, self.moog_service ) @patch("builtins.open", new_callable=mock_open, read_data="dummy,data\n1,2") def test_write_efe_spreadsheet_to_mongo(self, mock_file): # Mock the file input file = MagicMock() file.filename = "test.csv" file.file.read.return_value = b"dummy,data\n1,2" # Call the method self.efe_service.write_efe_spreadsheet_to_mongo(file) # Assertions self.mongo_service.insert_all_query.assert_called_once() self.mongo_service.insert_all_query.assert_called_with( "ai_efe_raw_events", [{'dummy': '1', 'data': '2'}] ) @patch("emona.util.common_util.chunker") def test_write_data_to_mongo_in_chunks_successful_insertion(self, mock_chunker): # Sample input mongo_bulk_write_list = [{"key": "value1"}, {"key": "value2"}, {"key": "value3"}] chunk_size = 2 mock_chunker.return_value = [[{"key": "value1"}, {"key": "value2"}], [{"key": "value3"}]] mongo_collection_name = "test_collection" # Call method self.efe_service.write_data_to_mongo_in_chunks(mongo_bulk_write_list, mongo_collection_name, chunk_size) # Assertions mock_chunker.assert_called_once_with(mongo_bulk_write_list, chunk_size) self.assertEqual(mongo_service.bulk_write.call_count, 2) # Two chunks mongo_service.bulk_write.assert_any_call( mongo_collection_name, [{"key": "value1"}, {"key": "value2"}] ) mongo_service.bulk_write.assert_any_call( mongo_collection_name, [{"key": "value3"}] ) @patch("emona.util.common_util.chunker") def test_write_data_to_mongo_in_chunks_with_failures(self, mock_chunker): mongo_bulk_write_list = [{"key": "value1"}, {"key": "value2"}, {"key": "value3"}] chunk_size = 2 mock_chunker.return_value = [[{"key": "value1"}, {"key": "value2"}], [{"key": "value3"}]] mongo_collection_name = "test_collection" # Simulate an exception for the first chunk mongo_service.bulk_write.side_effect = [Exception("Insertion failed"), None] # Call method self.efe_service.write_data_to_mongo_in_chunks(mongo_bulk_write_list, mongo_collection_name, chunk_size) # Assertions mock_chunker.assert_called_once_with(mongo_bulk_write_list, chunk_size) self.assertEqual(mongo_service.bulk_write.call_count, 2) # Two chunks attempted mongo_service.bulk_write.assert_any_call( mongo_collection_name, [{"key": "value1"}, {"key": "value2"}] ) mongo_service.bulk_write.assert_any_call( mongo_collection_name, [{"key": "value3"}] ) # Empty input mongo_bulk_write_list = [] chunk_size = 2 mock_chunker.return_value = [] mongo_collection_name = "test_collection" # Call method self.efe_service.write_data_to_mongo_in_chunks(mongo_bulk_write_list, mongo_collection_name, chunk_size) # Assertions mock_chunker.assert_called_once_with(mongo_bulk_write_list, chunk_size) mongo_service.bulk_write.assert_not_called() # No calls since the list is empty def test_run_efe_intent_success(self): # Mock inputs start_time_epoch = 1633035600 end_time_epoch = 1633122000 provider = "google" gcp_model_type = "text" scan_table = "scan_table" scan_id = "12345" # Mock responses self.oracle_service.execute_select_query.return_value = '[["MockPromptName", "MockPrompt"]]' self.mongo_service.execute_query.return_value = [{"userUtterance": "sample utterance"}] self.llm_model_response_service.call_google_text_llm.return_value = ["MockIntent"] # Call the method self.efe_service.run_efe_intent( start_time_epoch, end_time_epoch, provider, gcp_model_type, "text_model", "chat_model", "azure_model", 0.7, 100, 0.8, 40, "tuned_model", 0, 0, None, scan_table, scan_id, "application/json", "trace123" ) # Assertions self.oracle_service.execute_update_query.assert_called() # Ensure queries were called self.mongo_service.insert_all_query.assert_called_once() self.mongo_service.insert_all_query.assert_called_with( "ai_efe_intent", MagicMock() ) def test_get_prompt_and_prompt_name(self): # Mock response self.oracle_service.execute_select_query.return_value = '[["PromptName", "PromptContent"]]' # Call the method result = self.efe_service.get_prompt_and_prompt_name(14, "google") # Assertions self.assertEqual(result, {"prompt_name": "PromptName", "prompt": "PromptContent"}) def test_get_model_name_google_text(self): result = self.efe_service.get_model_name("Google", "text", "text_model", "chat_model", "azure_model") self.assertEqual(result, "text_model") def test_get_intent_for_user_utterances_google_text(self): # Mock response self.llm_model_response_service.call_google_text_llm.return_value = ["MockIntent"] # Call the method result = self.efe_service.get_intent_for_user_utterances( "user utterance", "Google", "text", "text_model", "chat_model", "azure_model", "prompt", "examples", 0.7, 100, 0.8, 40, "tuned_model", 0, 0, None, "application/json", "trace123" ) # Assertions self.assertEqual(result, "MockIntent") def test_trigger_data_move_to_oracle_formatted(self): # Mock response self.auth = MagicMock() self.auth.generate_assertion_token.return_value = "MockToken" with patch("requests.Session.post") as mock_post: mock_post.return_value.status_code = 200 mock_post.return_value.text = "Success" result = self.efe_service.trigger_data_move_to_oracle_formatted("2021-01-01 12:00:00", "EFE", "Process") self.assertEqual(result.status_code, 200) if __name__ == "__main__": unittest.main()
14
Posted By
Python Script to create AWS beanstalk
#!/usr/bin/python
  
import boto
python aws beanstalk
sandeep sandeep
List all files and folders using python os mo
import os

def list_files_folders(path):
python python-os
kishore_kumar
Get current environment variables in python
import os
env = os.environ

python python-os
kishore_kumar
Get os details using python os
import os
print os.uname()
# Don't use os.system('uname -a'), its j
python python-os
kishore_kumar
Get stats ( lines, words, char count ) of fil
def file_stats(path):
    f = open(path, 'r')
    lines = f.readlines()
python
kishore_kumar
Use map function in python
def get_double(num):
    return num * 2

python
kishore_kumar
Python sample codes for beginners
print "Welcome to python"
python
gaya38 gaya38
Python program for even number checking
a=input("Enter a value:")
if (a%2==0):
    print "The given number is even numb
python
gaya38 gaya38
Python program for prime number check
a=input("Enter a value:")
k=0
b=(a/2)+1
python
gaya38 gaya38
Pass command line arguments in python
import sys
x=len(sys.argv)
a=[]
python
gaya38 gaya38
Python program for the largest number in an a
a = [1,43,98,5]#Dummy data
for l in range(len(a)-1):
        if (a[l]>a[l+1]):
python
gaya38 gaya38
print list of even numbers within a range
n=100
a=[10,20,30,40,50]
b=[60,70,80,90]
python
gaya38 gaya38
generate fibonacci series in python
n=input("Enter the constraint to print n
m=input("Enter the maximum value to prin
a=0
python
gaya38 gaya38
Generate Random number within the range in py
import random
print random.uniform(10,500)
python
gaya38 gaya38
Shuffle list elements in python
import random;
z = [1,90,4,2]
z = random.shuffle(z)
python
gaya38 gaya38
use python requests to get contents of url (
import requests

req = requests.get("https://httpbin.org/
python python-requests
kishore_kumar
how to iterate or get values in python dictio
sample_dict = { "number": 1, "fruits": [

for key in sample_dict:
python
kishore_kumar
create matrix and multiply using numpy in pyt
import numpy as np

matrix = [[1,2,3], [4,5,6], [7,8,9]]
python numpy
kishore_kumar
generate random numbers matrix with numpy pyt
import numpy as np

random_arr = np.random.randint(1,50,9)
python numpy
kishore_kumar
Find min , max and mean for numpy arrays
import numpy as np

random_arr = np.random.randint(1,50,9)
python numpy
kishore_kumar