python
14 hours, 5 minutes ago
# test_efe_mongo_to_oracle.py
import unittest
from unittest.mock import Mock, patch
from efe_mongo_to_oracle import EfeMongoToOracle
from datetime import datetime, timezone
import pandas as pd
class TestEfeMongoToOracle(unittest.TestCase):
def setUp(self):
# Set up mock services and configuration
self.config = Mock()
self.mongo_service = Mock()
self.oracle_service = Mock()
self.moog_service = Mock()
# Initialize EfeMongoToOracle with mocked dependencies
self.efe_service = EfeMongoToOracle(
self.config, self.mongo_service, self.oracle_service, self.moog_service
)
@patch("efe_mongo_to_oracle.logging")
def test_get_mongo_data_by_runtimes_valid_data(self, mock_logging):
# Test successful fetching of MongoDB data by runtimes
runtimes = "2024-03-19 12:12:00"
namespace = "EFE"
process_name = "PROCESS_NAME"
column_mapping_df = pd.DataFrame({"SRC_TABLE": ["test_table"]})
# Mock MongoDB data
mock_data = [{"_id": 1, "value": "data1"}, {"_id": 2, "value": "data2"}]
self.mongo_service.execute_query.return_value = mock_data
# Execute the method
result = self.efe_service.get_mongo_data_by_runtimes(runtimes, namespace, process_name, column_mapping_df)
# Verify MongoDB query call and correct result returned
self.mongo_service.execute_query.assert_called_once_with("test_table", {"RUN_TIME": {"$in": [datetime(2024, 3, 19, 12, 12)]}})
self.assertEqual(result, mock_data)
@patch("efe_mongo_to_oracle.logging")
def test_get_mongo_data_by_runtimes_invalid_collection(self, mock_logging):
# Test handling of invalid MongoDB collection in column mapping
runtimes = "2024-03-19 12:12:00"
namespace = "EFE"
process_name = "PROCESS_NAME"
column_mapping_df = pd.DataFrame({"SRC_TABLE": ["table1", "table2"]})
# Execute the method
result = self.efe_service.get_mongo_data_by_runtimes(runtimes, namespace, process_name, column_mapping_df)
# Verify no MongoDB call and appropriate log message
self.mongo_service.execute_query.assert_not_called()
mock_logging.info.assert_called_with(
"Invalid data in AI_SRC_COLUMN_MAPPING. Did not find exactly one src table for namespace EFE and process name PROCESS_NAME"
)
self.assertIsNone(result)
def test_clean_mongo_data_efe(self):
# Test transformation and cleaning of MongoDB data
mongo_data = [{"SRC_COLUMN1": "SomeLongText", "SRC_COLUMN2": "data2"}]
column_mapping_df = pd.DataFrame({
"SRC_COLUMN": ["SRC_COLUMN1", "SRC_COLUMN2"],
"DEST_DATA_TYPE": ["STRING", "STRING"]
})
# Execute the method
result = self.efe_service.clean_mongo_data_efe(mongo_data, column_mapping_df)
# Verify data cleaning result
expected_result = [{"SRC_COLUMN1": "SomeLongText", "SRC_COLUMN2": "data2"}]
self.assertEqual(result, expected_result)
@patch("efe_mongo_to_oracle.logging")
def test_get_column_mapping_df(self, mock_logging):
# Test fetching and formatting of column mapping from Oracle
namespace = "EFE"
process_name = "PROCESS_NAME"
oracle_result = str([{
'SRC_TABLE': 'source_table', 'DEST_TABLE': 'dest_table',
'SRC_COLUMN': 'src_column', 'DEST_COLUMN': 'dest_column',
'SRC_DATA_TYPE': 'STRING', 'DEST_DATA_TYPE': 'STRING',
'SRC_FORMAT': None, 'DEST_FORMAT': None
}])
# Mock Oracle response
self.oracle_service.execute_select_query.return_value = oracle_result
# Execute the method
result = self.efe_service.get_column_mapping_df(namespace, process_name)
# Verify Oracle query call and expected DataFrame result
self.oracle_service.execute_select_query.assert_called_once()
self.assertIsInstance(result, pd.DataFrame)
self.assertIn("SRC_COLUMN", result.columns)
@patch("efe_mongo_to_oracle.logging")
def test_write_mongo_to_oracle_efe(self, mock_logging):
# Test complete process of writing Mongo data to Oracle
runtimes = "2024-03-19 12:12:00"
namespace = "EFE"
process_name = "PROCESS_NAME"
column_mapping_df = pd.DataFrame({
"SRC_TABLE": ["source_table"],
"DEST_TABLE": ["dest_table"],
"SRC_COLUMN": ["src_column"],
"DEST_COLUMN": ["dest_column"],
"DEST_DATA_TYPE": ["STRING"]
})
mongo_data = [{"src_column": "data1"}]
# Mock all dependencies in the method
self.efe_service.get_column_mapping_df = Mock(return_value=column_mapping_df)
self.efe_service.get_mongo_data_by_runtimes = Mock(return_value=mongo_data)
self.efe_service.clean_mongo_data_efe = Mock(return_value=mongo_data)
# Execute the method
self.efe_service.write_mongo_to_oracle_efe(runtimes, namespace, process_name)
# Verify logs and Oracle insertion call
self.oracle_service.execute_query.assert_called_once()
mock_logging.info.assert_any_call(
f"Finished inserting {len(mongo_data)} data into oracle dest_table"
)
@patch("efe_mongo_to_oracle.logging")
def test_write_mongo_to_oracle_efe_exception_handling(self, mock_logging):
# Test exception handling within write_mongo_to_oracle_efe
runtimes = "2024-03-19 12:12:00"
namespace = "EFE"
process_name = "PROCESS_NAME"
# Cause an exception in one of the method calls
self.efe_service.get_column_mapping_df.side_effect = Exception("Test exception")
# Execute the method and verify exception handling
self.efe_service.write_mongo_to_oracle_efe(runtimes, namespace, process_name)
mock_logging.info.assert_called()
self.moog_service.create_event.assert_called_once_with(
"Exception occurred in write_mongo_to_oracle_efe", "EFE Mongo To Oracle", 'write_mongo_to_oracle_efe'
)
0 Comments
Please Login to Comment Here