# test_audio_deletion_service.py

python 14 hours, 9 minutes ago
# test_audio_deletion_service.py import unittest from unittest.mock import Mock, patch from audio_deletion_service import AudioDeletionService from datetime import datetime import pandas as pd class TestAudioDeletionService(unittest.TestCase): def setUp(self): # Mock configuration and external dependencies config = Mock() self.call_miner_util = Mock() self.oracle_service = Mock() self.mongo_service = Mock() self.s3_service = Mock() self.auth = Mock() self.config_service = Mock() self.cert_service = Mock() self.workflow_service = Mock() self.audit_service = Mock() # Initialize AudioDeletionService with mocked dependencies self.audio_service = AudioDeletionService( config, self.call_miner_util, self.oracle_service, self.mongo_service, self.s3_service, self.auth, self.config_service, self.cert_service, self.workflow_service, self.audit_service ) @patch("audio_deletion_service.logger") def test_delete_audio_from_s3_no_files(self, mock_logger): # Test when no files are found for deletion trace_id = "test_trace" start_time = int(datetime.now().timestamp()) end_time = start_time + 1000 collection = "test_collection" # Mock MongoDB query to return an empty DataFrame self.audio_service.get_audio_paths_from_mongo = Mock(return_value=pd.DataFrame()) # Execute the method self.audio_service.delete_audio_from_s3(collection, start_time, end_time, trace_id) # Verify that no deletion occurred and appropriate log was generated mock_logger.assert_any_call( 'info', trace_id, 'delete_audio_from_s3', f'No files in collection "{collection}" between {datetime.utcfromtimestamp(start_time)} and {datetime.utcfromtimestamp(end_time)}' ) self.s3_service.delete_from_bucket.assert_not_called() self.audio_service.check_and_update_scheduler.assert_called_once_with(None, None, trace_id, "Processing") @patch("audio_deletion_service.logger") def test_delete_audio_from_s3_successful_deletion(self, mock_logger): # Test successful deletion case trace_id = "test_trace" start_time = int(datetime.now().timestamp()) end_time = start_time + 1000 collection = "test_collection" # Mock MongoDB query to return audio paths audio_paths_df = pd.DataFrame({"audio_path": ["path1/audio1.mp3", "path2/audio2.mp3"]}) self.audio_service.get_audio_paths_from_mongo = Mock(return_value=audio_paths_df) # Mock S3 delete_from_bucket to return the number of deleted files self.s3_service.delete_from_bucket = Mock(return_value=2) # Execute the method self.audio_service.delete_audio_from_s3(collection, start_time, end_time, trace_id) # Verify logs and scheduler updates mock_logger.assert_any_call( 'info', trace_id, 'delete_audio_from_s3', f'Beginning deletion of 2 audio files in S3 between {datetime.utcfromtimestamp(start_time)} and {datetime.utcfromtimestamp(end_time)}' ) mock_logger.assert_any_call( 'info', trace_id, 'delete_audio_from_s3', 'Successfully deleted 2 audio files from S3' ) self.audio_service.check_and_update_scheduler.assert_any_call( None, None, trace_id, "Processing" ) self.audio_service.check_and_update_scheduler.assert_any_call( None, None, trace_id, "Completed", 2, 2, 0 ) @patch("audio_deletion_service.logger") def test_delete_audio_from_s3_exception_handling(self, mock_logger): # Test exception handling during audio deletion trace_id = "test_trace" start_time = int(datetime.now().timestamp()) end_time = start_time + 1000 collection = "test_collection" # Simulate an exception in delete_from_bucket self.s3_service.delete_from_bucket.side_effect = Exception("S3 Deletion Failed") # Mock MongoDB to return a non-empty DataFrame audio_paths_df = pd.DataFrame({"audio_path": ["path1/audio1.mp3", "path2/audio2.mp3"]}) self.audio_service.get_audio_paths_from_mongo = Mock(return_value=audio_paths_df) # Execute the method and verify error handling with self.assertRaises(Exception): self.audio_service.delete_audio_from_s3(collection, start_time, end_time, trace_id) mock_logger.assert_any_call("error", trace_id, "executor", "Exception occurred in delete_audio_from_s3") self.audio_service.check_and_update_scheduler.assert_any_call(None, None, trace_id, "Failed") def test_get_audio_paths_from_mongo(self): # Test get_audio_paths_from_mongo to ensure MongoDB query is executed and formatted correctly collection = "test_collection" start_time = datetime(2023, 1, 1) end_time = datetime(2023, 1, 2) # Mock the MongoDB response mock_data = [ {"audio_path": "path/to/audio1.mp3"}, {"audio_path": "path/to/audio2.mp3"} ] self.mongo_service.execute_query = Mock(return_value=mock_data) # Execute the method df = self.audio_service.get_audio_paths_from_mongo(collection, start_time, end_time) # Verify the result is a DataFrame with expected data expected_df = pd.DataFrame(mock_data) pd.testing.assert_frame_equal(df, expected_df) self.mongo_service.execute_query.assert_called_once_with( collection, {'client_capture_date': {"$gte": start_time, "$lte": end_time}}, {'audio_path': 1} )
9
Posted By
Python Script to create AWS beanstalk
#!/usr/bin/python
  
import boto
python aws beanstalk
sandeep sandeep
List all files and folders using python os mo
import os

def list_files_folders(path):
python python-os
kishore_kumar
Get current environment variables in python
import os
env = os.environ

python python-os
kishore_kumar
Get os details using python os
import os
print os.uname()
# Don't use os.system('uname -a'), its j
python python-os
kishore_kumar
Get stats ( lines, words, char count ) of fil
def file_stats(path):
    f = open(path, 'r')
    lines = f.readlines()
python
kishore_kumar
Use map function in python
def get_double(num):
    return num * 2

python
kishore_kumar
Python sample codes for beginners
print "Welcome to python"
python
gaya38 gaya38
Python program for even number checking
a=input("Enter a value:")
if (a%2==0):
    print "The given number is even numb
python
gaya38 gaya38
Python program for prime number check
a=input("Enter a value:")
k=0
b=(a/2)+1
python
gaya38 gaya38
Pass command line arguments in python
import sys
x=len(sys.argv)
a=[]
python
gaya38 gaya38
Python program for the largest number in an a
a = [1,43,98,5]#Dummy data
for l in range(len(a)-1):
        if (a[l]>a[l+1]):
python
gaya38 gaya38
print list of even numbers within a range
n=100
a=[10,20,30,40,50]
b=[60,70,80,90]
python
gaya38 gaya38
generate fibonacci series in python
n=input("Enter the constraint to print n
m=input("Enter the maximum value to prin
a=0
python
gaya38 gaya38
Generate Random number within the range in py
import random
print random.uniform(10,500)
python
gaya38 gaya38
Shuffle list elements in python
import random;
z = [1,90,4,2]
z = random.shuffle(z)
python
gaya38 gaya38
use python requests to get contents of url (
import requests

req = requests.get("https://httpbin.org/
python python-requests
kishore_kumar
how to iterate or get values in python dictio
sample_dict = { "number": 1, "fruits": [

for key in sample_dict:
python
kishore_kumar
create matrix and multiply using numpy in pyt
import numpy as np

matrix = [[1,2,3], [4,5,6], [7,8,9]]
python numpy
kishore_kumar
generate random numbers matrix with numpy pyt
import numpy as np

random_arr = np.random.randint(1,50,9)
python numpy
kishore_kumar
Find min , max and mean for numpy arrays
import numpy as np

random_arr = np.random.randint(1,50,9)
python numpy
kishore_kumar