test_task_service

python 1 day, 8 hours ago
import unittest from unittest.mock import MagicMock, patch from task_service import TaskService from modelclasses.profiler_request import ProfilerRequest from modelclasses.statistics_profiler_request import StatisticsProfilerRequest class TestTaskService(unittest.TestCase): def setUp(self): # Mock dependencies self.mock_config = {} self.mock_oracle_service = MagicMock() self.mock_profiling_service = MagicMock() self.mock_statistics_profiler_service = MagicMock() # Set up environment variables patch.dict('os.environ', { 'GROUP_NAME': 'TestGroup', 'RUN_TYPE': 'profiler' }).start() # Create an instance of TaskService self.task_service = TaskService( self.mock_config, self.mock_oracle_service, self.mock_profiling_service, self.mock_statistics_profiler_service ) def tearDown(self): patch.stopall() def test_run_task_with_profiler(self): # Mock Oracle service responses self.mock_oracle_service.call_stored_proc.return_value = 'test-uuid' self.mock_oracle_service.execute_query.return_value = [('test-uuid', '{"attribute": "value"}')] self.mock_profiling_service.run_profiler.return_value = 'Success' # Run the task self.task_service.run_task() # Verify Oracle service and profiling service calls self.mock_oracle_service.call_stored_proc.assert_called_once() self.mock_oracle_service.execute_query.assert_called_once() self.mock_profiling_service.run_profiler.assert_called_once_with( profiler_request=ProfilerRequest(attribute='value') ) # Verify task status update self.mock_oracle_service.execute_update_query.assert_called_with( "UPDATE JOB_TASK SET UPDATE_TIME = systimestamp, STATUS = 'Completed' where UUID ='test-uuid'" ) def test_run_task_no_uuid_returned(self): # Simulate no UUID returned self.mock_oracle_service.call_stored_proc.return_value = None # Run the task result = self.task_service.run_task() # Verify no profiling task runs self.assertFalse(result) self.mock_profiling_service.run_profiler.assert_not_called() self.mock_oracle_service.execute_query.assert_not_called() def test_update_task_status(self): # Update task status self.task_service.task_dict = {"uuid": "test-uuid"} self.task_service.update_task_status("Completed") # Verify query execution self.mock_oracle_service.execute_update_query.assert_called_once_with( "UPDATE JOB_TASK SET UPDATE_TIME = systimestamp, STATUS = 'Completed' where UUID ='test-uuid'" ) def test_run_task_with_invalid_request(self): # Mock Oracle service responses with invalid data self.mock_oracle_service.call_stored_proc.return_value = 'test-uuid' self.mock_oracle_service.execute_query.return_value = [('test-uuid', '{"invalid": "data"}')] # Mock the profiler to raise a validation error self.mock_profiling_service.run_profiler.side_effect = ValidationError # Run the task self.task_service.run_task() # Verify failure status update self.mock_oracle_service.execute_update_query.assert_called_with( "UPDATE JOB_TASK SET UPDATE_TIME = systimestamp, STATUS = 'Failed' where UUID ='test-uuid'" ) def test_enrich_ss_request_object(self): # Mock Oracle responses for table and request info self.mock_oracle_service.execute_query.side_effect = [ [('table_id_1', 'table_name_1', 'BASE')], [('2023-01-01', '2023-12-31')] ] # Mock request object request = {'datasetId': 123, 'requestId': 456} # Enrich the request enriched_request = self.task_service.enrich_ss_request_object(request) # Verify enriched values self.assertEqual(enriched_request['table_id'], 'table_id_1') self.assertEqual(enriched_request['start_time'], '2023-01-01') self.assertEqual(enriched_request['end_time'], '2023-12-31')
16
Posted By
Python Script to create AWS beanstalk
#!/usr/bin/python
  
import boto
python aws beanstalk
sandeep sandeep
List all files and folders using python os mo
import os

def list_files_folders(path):
python python-os
kishore_kumar
Get current environment variables in python
import os
env = os.environ

python python-os
kishore_kumar
Get os details using python os
import os
print os.uname()
# Don't use os.system('uname -a'), its j
python python-os
kishore_kumar
Get stats ( lines, words, char count ) of fil
def file_stats(path):
    f = open(path, 'r')
    lines = f.readlines()
python
kishore_kumar
Use map function in python
def get_double(num):
    return num * 2

python
kishore_kumar
Python sample codes for beginners
print "Welcome to python"
python
gaya38 gaya38
Python program for even number checking
a=input("Enter a value:")
if (a%2==0):
    print "The given number is even numb
python
gaya38 gaya38
Python program for prime number check
a=input("Enter a value:")
k=0
b=(a/2)+1
python
gaya38 gaya38
Pass command line arguments in python
import sys
x=len(sys.argv)
a=[]
python
gaya38 gaya38
Python program for the largest number in an a
a = [1,43,98,5]#Dummy data
for l in range(len(a)-1):
        if (a[l]>a[l+1]):
python
gaya38 gaya38
print list of even numbers within a range
n=100
a=[10,20,30,40,50]
b=[60,70,80,90]
python
gaya38 gaya38
generate fibonacci series in python
n=input("Enter the constraint to print n
m=input("Enter the maximum value to prin
a=0
python
gaya38 gaya38
Generate Random number within the range in py
import random
print random.uniform(10,500)
python
gaya38 gaya38
Shuffle list elements in python
import random;
z = [1,90,4,2]
z = random.shuffle(z)
python
gaya38 gaya38
use python requests to get contents of url (
import requests

req = requests.get("https://httpbin.org/
python python-requests
kishore_kumar
how to iterate or get values in python dictio
sample_dict = { "number": 1, "fruits": [

for key in sample_dict:
python
kishore_kumar
create matrix and multiply using numpy in pyt
import numpy as np

matrix = [[1,2,3], [4,5,6], [7,8,9]]
python numpy
kishore_kumar
generate random numbers matrix with numpy pyt
import numpy as np

random_arr = np.random.randint(1,50,9)
python numpy
kishore_kumar
Find min , max and mean for numpy arrays
import numpy as np

random_arr = np.random.randint(1,50,9)
python numpy
kishore_kumar