diff --git a/pipelinewise/cli/commands.py b/pipelinewise/cli/commands.py
index 28d35b231..f2b4d6aeb 100644
--- a/pipelinewise/cli/commands.py
+++ b/pipelinewise/cli/commands.py
@@ -1,14 +1,17 @@
 """
 PipelineWise CLI - Commands
 """
+import contextlib
 import os
 import shlex
 import logging
 import json
 import time
+from typing import Callable, Optional

+import sherlock
 from dataclasses import dataclass
-from subprocess import PIPE, STDOUT, Popen
+from subprocess import PIPE, STDOUT, Popen, TimeoutExpired

 from . import utils
 from .errors import StreamBufferTooLargeException
@@ -478,13 +481,20 @@ def log_file_with_status(log_file: str, status: str) -> str:
     return f'{log_file}.{status}'


-# pylint: disable=too-many-locals
-def run_command(command: str, log_file: str = None, line_callback: callable = None):
+# TODO: This method is too complex! Reduce its complexity to below 15.
+# pylint: disable=too-many-locals,too-many-statements,too-many-branches
+def run_command(  # noqa: C901
+    command: str,
+    lock: Optional[sherlock.lock.BaseLock] = None,
+    log_file: Optional[str] = None,
+    line_callback: Optional[Callable[[str], str]] = None,
+):
     """
     Runs a shell command with or without log file with STDOUT and STDERR

     Args:
         command: A unix command to run
+        lock: An optional distributed lock ensuring only one instance of the command runs at a time
         log_file: Write stdout and stderr to log file
         line_callback: function to call on each line on stdout and stderr
     """
@@ -505,10 +515,15 @@ def run_command(command: str, log_file: str = None, line_callback: callable = No
         log_file_success = log_file_with_status(log_file, STATUS_SUCCESS)

         # Start command
-        with Popen(shlex.split(piped_command), stdout=PIPE, stderr=STDOUT) as proc:
-            with open(log_file_running, 'a+', encoding='utf-8') as logfile:
+        last_renew = time.time()
+        try:
+            with lock or contextlib.nullcontext() as lock_obj, \
+                    Popen(shlex.split(piped_command), stdout=PIPE, stderr=STDOUT) as proc, \
+                    open(log_file_running, 'a+', encoding='utf-8') as logfile:
+                # Prevent reads from stdout from blocking.
+                os.set_blocking(proc.stdout.fileno(), False)
                 stdout = ''
-                while True:
+                while proc.poll() is None:
                     line = proc.stdout.readline()
                     if line:
                         decoded_line = line.decode('utf-8')
@@ -520,8 +535,21 @@ def run_command(command: str, log_file: str = None, line_callback: callable = No
                         logfile.write(decoded_line)
                         logfile.flush()

-                    if proc.poll() is not None:
-                        break
+
+                    if lock_obj is not None:
+                        # Check if the lock is going to expire in the next 5 seconds
+                        # and renew it if so.
+                        now = time.time()
+                        if now - last_renew > lock_obj.expire - 5:
+                            if lock_obj.renew() is False:
+                                # We've failed to renew the lock for some reason, so we should
+                                # probably exit.
+                                proc.terminate()
+                            last_renew = now
+        except Exception:
+            # If the subprocess failed for any reason then make sure to rename the logfile.
+            os.rename(log_file_running, log_file_failed)
+            raise

         proc_rc = proc.poll()
         if proc_rc != 0:
@@ -542,13 +570,30 @@ def run_command(command: str, log_file: str = None, line_callback: callable = No
         return [proc_rc, stdout, None]

     # No logfile needed: STDOUT and STDERR returns in an array once the command finished
-    with Popen(shlex.split(piped_command), stdout=PIPE, stderr=PIPE) as proc:
-        proc_tuple = proc.communicate()
+    last_renew = time.time()
+    with lock or contextlib.nullcontext() as lock_obj, \
+            Popen(shlex.split(piped_command), stdout=PIPE, stderr=PIPE) as proc:
+        while True:
+            try:
+                proc_tuple = proc.communicate(timeout=1.0)
+                break
+            except TimeoutExpired:
+                if lock_obj is not None:
+                    # Check if the lock is going to expire in the next 5 seconds
+                    # and renew it if so.
+                    now = time.time()
+                    if now - last_renew > lock_obj.expire - 5:
+                        if lock_obj.renew() is False:
+                            # We've failed to renew the lock for some reason, so we should
+                            # probably exit.
+                            proc.terminate()
+                        last_renew = now
+
         proc_rc = proc.returncode
         stdout = proc_tuple[0].decode('utf-8')
         stderr = proc_tuple[1].decode('utf-8')

-    if proc_rc != 0:
-        LOGGER.error(stderr)
+        if proc_rc != 0:
+            LOGGER.error(stderr)

     return [proc_rc, stdout, stderr]
diff --git a/pipelinewise/cli/config.py b/pipelinewise/cli/config.py
index 229a6e3e4..e8528b9a8 100644
--- a/pipelinewise/cli/config.py
+++ b/pipelinewise/cli/config.py
@@ -156,7 +156,6 @@ def get_connector_files(connector_dir: str) -> Dict:
             'state': os.path.join(connector_dir, 'state.json'),
             'transformation': os.path.join(connector_dir, 'transformation.json'),
             'selection': os.path.join(connector_dir, 'selection.json'),
-            'pidfile': os.path.join(connector_dir, 'pipelinewise.pid'),
         }

     def save(self):
diff --git a/pipelinewise/cli/pipelinewise.py b/pipelinewise/cli/pipelinewise.py
index 8e90c1b89..8e063d5d2 100644
--- a/pipelinewise/cli/pipelinewise.py
+++ b/pipelinewise/cli/pipelinewise.py
@@ -3,13 +3,14 @@
 """
 import logging
 import os
+import pathlib
 import shutil
 import signal
 import sys
 import json
 import copy
 import psutil
-import pidfile
+import sherlock

 from datetime import datetime
 from time import time
@@ -60,6 +61,8 @@
 }


+DEFAULT_LOCK_EXPIRY = 30
+
 # pylint: disable=too-many-lines,too-many-instance-attributes,too-many-public-methods
 class PipelineWise:
@@ -74,6 +77,9 @@ class PipelineWise:

     def __init__(self, args, config_dir, venv_dir, profiling_dir=None):
+        # Locks to release on exit, keyed by '<target_id>__<tap_id>'.
+        self.locks: Dict[str, sherlock.lock.BaseLock] = {}
+
         self.profiling_mode = args.profiler
         self.profiling_dir = profiling_dir
         self.drop_pg_slot = False
@@ -111,6 +117,20 @@ def __init__(self, args, config_dir, venv_dir, profiling_dir=None):
         for sig in [signal.SIGINT, signal.SIGTERM]:
             signal.signal(sig, self.stop_tap)

+    def get_lock(self, target_id: str, tap_id: str) -> sherlock.lock.BaseLock:
+        """
+        Get a distributed lock for a given tap and cache it
+        """
+        key = f'{target_id}__{tap_id}'
+        if key not in self.locks:
+            self.locks[key] = sherlock.FileLock(
+                key,
+                client=pathlib.Path(self.temp_dir),
+                timeout=1,
+                expire=DEFAULT_LOCK_EXPIRY,
+            )
+        return self.locks[key]
+
     def send_alert(
         self, message: str, level: str = BaseAlertHandler.ERROR, exc: Exception = None
     ) -> dict:
@@ -940,15 +960,12 @@ def detect_tap_status(self, target_id, tap_id):
             status['currentStatus'] = 'not-configured'

         # Tap exists and has log in running status
-        elif (
-            os.path.isdir(log_dir)
-            and len(utils.search_files(log_dir, patterns=['*.log.running'])) > 0
-        ):
+        elif self.get_lock(target_id, tap_id).locked():
             status['currentStatus'] = 'running'

         # Configured and not running
         else:
             status['currentStatus'] = 'ready'

         # Get last run instance
         if os.path.isdir(log_dir):
@@ -1023,19 +1040,6 @@ def run_tap_singer(
             profiling_dir=self.profiling_dir,
         )

-        # Do not run if another instance is already running
-        log_dir = os.path.dirname(self.tap_run_log_file)
-        if (
-            os.path.isdir(log_dir)
-            and len(utils.search_files(log_dir, patterns=['*.log.running'])) > 0
-        ):
-            self.logger.info(
-                'Failed to run. Another instance of the same tap is already running. '
-                'Log file detected in running status at %s',
-                log_dir,
-            )
-            sys.exit(1)
-
         start = None
         state = None

@@ -1067,13 +1071,17 @@ def update_state_file_with_extra_log(line: str) -> str:
             sys.stdout.write(line)
             return update_state_file(line)

+        # We need to get a distributed lock to make sure that we're not running at the same time
+        # as another process.
+        lock = self.get_lock(target.target_id, tap.tap_id)
+
         # Run command with update_state_file as a callback to call for every stdout line
         if self.extra_log:
             commands.run_command(
-                command, self.tap_run_log_file, update_state_file_with_extra_log
+                command, lock, self.tap_run_log_file, update_state_file_with_extra_log
             )
         else:
-            commands.run_command(command, self.tap_run_log_file, update_state_file)
+            commands.run_command(command, lock, self.tap_run_log_file, update_state_file)

         # update the state file one last time to make sure it always has the last state message.
         if state is not None:
@@ -1100,14 +1108,18 @@ def add_partialsync_output_to_main_logger(line: str) -> str:
             sys.stdout.write(line)
             return line

+        # We need to get a distributed lock to make sure that we're not running at the same time
+        # as another process.
+        lock = self.get_lock(target.target_id, tap.tap_id)
+
         if self.extra_log:
             # Run command and copy partialsync output to main logger
             commands.run_command(
-                command, self.tap_run_log_file, add_partialsync_output_to_main_logger
+                command, lock, self.tap_run_log_file, add_partialsync_output_to_main_logger
             )
         else:
             # Run command
-            commands.run_command(command, self.tap_run_log_file)
+            commands.run_command(command, lock, self.tap_run_log_file)

     def run_tap_fastsync(
         self, tap: TapParams, target: TargetParams, transform: TransformParams
@@ -1128,19 +1140,6 @@ def run_tap_fastsync(
             drop_pg_slot=self.drop_pg_slot,
         )

-        # Do not run if another instance is already running
-        log_dir = os.path.dirname(self.tap_run_log_file)
-        if (
-            os.path.isdir(log_dir)
-            and len(utils.search_files(log_dir, patterns=['*.log.running'])) > 0
-        ):
-            self.logger.info(
-                'Failed to run. Another instance of the same tap is already running. '
-                'Log file detected in running status at %s',
-                log_dir,
-            )
-            sys.exit(1)
-
         # Fastsync is running in subprocess.
         # Collect the formatted logs and log it in the main PipelineWise process as well
         # Logs are already formatted at this stage so not using logging functions to avoid double formatting.
@@ -1148,14 +1147,18 @@ def add_fastsync_output_to_main_logger(line: str) -> str:
             sys.stdout.write(line)
             return line

+        # We need to get a distributed lock to make sure that we're not running at the same time
+        # as another process.
+        lock = self.get_lock(target.target_id, tap.tap_id)
+
         if self.extra_log:
             # Run command and copy fastsync output to main logger
             commands.run_command(
-                command, self.tap_run_log_file, add_fastsync_output_to_main_logger
+                command, lock, self.tap_run_log_file, add_fastsync_output_to_main_logger
            )
         else:
             # Run command
-            commands.run_command(command, self.tap_run_log_file)
+            commands.run_command(command, lock, self.tap_run_log_file)

     # pylint: disable=too-many-statements,too-many-locals
     def run_tap(self):
@@ -1233,79 +1236,78 @@ def run_tap(self):
         utils.create_backup_of_the_file(tap_state)

         start_time = datetime.now()
         try:
-            with pidfile.PIDFile(self.tap['files']['pidfile']):
-                target_params = TargetParams(
-                    target_id=target_id,
-                    type=target_type,
-                    bin=self.target_bin,
-                    python_bin=self.target_python_bin,
-                    config=cons_target_config,
-                )
+            target_params = TargetParams(
+                target_id=target_id,
+                type=target_type,
+                bin=self.target_bin,
+                python_bin=self.target_python_bin,
+                config=cons_target_config,
+            )
+
+            transform_params = TransformParams(
+                bin=self.transform_field_bin,
+                python_bin=self.transform_field_python_bin,
+                config=tap_transformation,
+                tap_id=tap_id,
+                target_id=target_id,
+            )

-                transform_params = TransformParams(
-                    bin=self.transform_field_bin,
-                    python_bin=self.transform_field_python_bin,
-                    config=tap_transformation,
+            # Run fastsync for FULL_TABLE replication method
+            if len(fastsync_stream_ids) > 0:
+                self.logger.info(
+                    'Table(s) selected to sync by fastsync: %s', fastsync_stream_ids
+                )
+                self.tap_run_log_file = os.path.join(
+                    log_dir, f'{target_id}-{tap_id}-{current_time}.fastsync.log'
+                )
+                tap_params = TapParams(
                     tap_id=tap_id,
-                    target_id=target_id,
+                    type=tap_type,
+                    bin=self.tap_bin,
+                    python_bin=self.tap_python_bin,
+                    config=tap_config,
+                    properties=tap_properties_fastsync,
+                    state=tap_state,
                 )

-                # Run fastsync for FULL_TABLE replication method
-                if len(fastsync_stream_ids) > 0:
-                    self.logger.info(
-                        'Table(s) selected to sync by fastsync: %s', fastsync_stream_ids
-                    )
-                    self.tap_run_log_file = os.path.join(
-                        log_dir, f'{target_id}-{tap_id}-{current_time}.fastsync.log'
-                    )
-                    tap_params = TapParams(
-                        tap_id=tap_id,
-                        type=tap_type,
-                        bin=self.tap_bin,
-                        python_bin=self.tap_python_bin,
-                        config=tap_config,
-                        properties=tap_properties_fastsync,
-                        state=tap_state,
-                    )
-
-                    self.run_tap_fastsync(
-                        tap=tap_params, target=target_params, transform=transform_params
-                    )
-                else:
-                    self.logger.info(
-                        'No table available that needs to be sync by fastsync'
-                    )
+                self.run_tap_fastsync(
+                    tap=tap_params, target=target_params, transform=transform_params
+                )
+            else:
+                self.logger.info(
+                    'No table available that needs to be sync by fastsync'
+                )

-                # Run singer tap for INCREMENTAL and LOG_BASED replication methods
-                if len(singer_stream_ids) > 0:
-                    self.logger.info(
-                        'Table(s) selected to sync by singer: %s', singer_stream_ids
-                    )
-                    self.tap_run_log_file = os.path.join(
-                        log_dir, f'{target_id}-{tap_id}-{current_time}.singer.log'
-                    )
-                    tap_params = TapParams(
-                        tap_id=tap_id,
-                        type=tap_type,
-                        bin=self.tap_bin,
-                        python_bin=self.tap_python_bin,
-                        config=tap_config,
-                        properties=tap_properties_singer,
-                        state=tap_state,
-                    )
+            # Run singer tap for INCREMENTAL and LOG_BASED replication methods
+            if len(singer_stream_ids) > 0:
+                self.logger.info(
+                    'Table(s) selected to sync by singer: %s', singer_stream_ids
+                )
+                self.tap_run_log_file = os.path.join(
+                    log_dir, f'{target_id}-{tap_id}-{current_time}.singer.log'
+                )
+                tap_params = TapParams(
+                    tap_id=tap_id,
+                    type=tap_type,
+                    bin=self.tap_bin,
+                    python_bin=self.tap_python_bin,
+                    config=tap_config,
+                    properties=tap_properties_singer,
+                    state=tap_state,
+                )

-                    self.run_tap_singer(
-                        tap=tap_params,
-                        target=target_params,
-                        transform=transform_params,
-                        stream_buffer_size=stream_buffer_size,
-                    )
-                else:
-                    self.logger.info(
-                        'No table available that needs to be sync by singer'
-                    )
+                self.run_tap_singer(
+                    tap=tap_params,
+                    target=target_params,
+                    transform=transform_params,
+                    stream_buffer_size=stream_buffer_size,
+                )
+            else:
+                self.logger.info(
+                    'No table available that needs to be sync by singer'
+                )

-        except pidfile.AlreadyRunningError:
+        except sherlock.LockTimeoutException:
             self.logger.error('Another instance of the tap is already running.')
             sys.exit(1)

         # Delete temp files if there is any
@@ -1329,53 +1331,53 @@ def stop_tap(self, sig=None, frame=None):
         """
         Stop running tap

-        The command finds the tap specific pidfile that was created by run_tap command and sends
-        a SIGTERM to the process.
+        The command sends a SIGTERM to all child processes and releases any locks held by this process.
         """
         self.logger.info('Trying to stop tap gracefully...')
-        pidfile_path = self.tap['files']['pidfile']
-        try:
-            with open(pidfile_path, encoding='utf-8') as pidf:
-                pid = int(pidf.read())
-                pgid = os.getpgid(pid)
-                parent = psutil.Process(pid)
-
-                # Terminate all the processes in the current process' process group.
-                for child in parent.children(recursive=True):
-                    if os.getpgid(child.pid) == pgid:
-                        self.logger.info('Sending SIGTERM to child pid %s...', child.pid)
-                        child.terminate()
-                        try:
-                            child.wait(timeout=5)
-                        except psutil.TimeoutExpired:
-                            child.kill()
-
-        except ProcessLookupError:
-            self.logger.error(
-                'Pid %s not found. Is the tap running on this machine? '
-                'Stopping taps remotely is not supported.',
-                pid,
-            )
-            sys.exit(1)
-
-        except FileNotFoundError:
-            self.logger.error(
-                'No pidfile found at %s. Tap does not seem to be running.', pidfile_path
-            )
-            sys.exit(1)
-
-        # Remove pidfile.
-        os.remove(pidfile_path)

         # Rename log files from running to terminated status
         if self.tap_run_log_file:
             tap_run_log_file_running = f'{self.tap_run_log_file}.running'
             tap_run_log_file_terminated = f'{self.tap_run_log_file}.terminated'

-            if os.path.isfile(tap_run_log_file_running):
+            try:
                 os.rename(tap_run_log_file_running, tap_run_log_file_terminated)
+            except FileNotFoundError:
+                self.logger.warning(
+                    'No logfile found at %s.', tap_run_log_file_running
+                )
+            except Exception:
+                self.logger.warning('Failed to rename logfile at %s', tap_run_log_file_running)

-        sys.exit(1)
+        # Terminate child processes
+        try:
+            pid = os.getpid()
+            pgid = os.getpgid(pid)
+            parent = psutil.Process(pid)
+
+            # Terminate all the processes in the current process' process group.
+            for child in parent.children(recursive=True):
+                if os.getpgid(child.pid) == pgid:
+                    self.logger.info('Sending SIGTERM to child pid %s...', child.pid)
+                    child.terminate()
+                    try:
+                        child.wait(timeout=1)
+                    except psutil.TimeoutExpired:
+                        child.kill()
+        except ProcessLookupError:
+            self.logger.error(
+                'Pid %s not found. Is the tap running on this machine? '
+                'Stopping taps remotely is not supported.',
+                pid,
+            )
+        finally:
+            # Release any locks that we hold.
+            for lock in self.locks.values():
+                try:
+                    lock.release()
+                except sherlock.LockException:
+                    self.logger.error('Failed to release lock: %s', lock)
+            sys.exit(1)

     # pylint: disable=too-many-locals
     def sync_tables(self):
@@ -1432,59 +1434,58 @@ def sync_tables(self):
             current_time = datetime.utcnow().strftime('%Y%m%d_%H%M%S')

             # sync_tables command always using fastsync
-            with pidfile.PIDFile(self.tap['files']['pidfile']):
-                self.tap_run_log_file = os.path.join(
-                    log_dir, f'{target_id}-{tap_id}-{current_time}.fastsync.log'
-                )
+            self.tap_run_log_file = os.path.join(
+                log_dir, f'{target_id}-{tap_id}-{current_time}.fastsync.log'
+            )

-                # Create parameters as NamedTuples
-                tap_params = TapParams(
-                    tap_id=tap_id,
-                    type=tap_type,
-                    bin=self.tap_bin,
-                    python_bin=self.tap_python_bin,
-                    config=tap_config,
-                    properties=tap_properties,
-                    state=tap_state,
-                )
+            # Create parameters as NamedTuples
+            tap_params = TapParams(
+                tap_id=tap_id,
+                type=tap_type,
+                bin=self.tap_bin,
+                python_bin=self.tap_python_bin,
+                config=tap_config,
+                properties=tap_properties,
+                state=tap_state,
+            )

-                target_params = TargetParams(
-                    target_id=target_id,
-                    type=target_type,
-                    bin=self.target_bin,
-                    python_bin=self.target_python_bin,
-                    config=cons_target_config,
-                )
+            target_params = TargetParams(
+                target_id=target_id,
+                type=target_type,
+                bin=self.target_bin,
+                python_bin=self.target_python_bin,
+                config=cons_target_config,
+            )

-                transform_params = TransformParams(
-                    bin=self.transform_field_bin,
-                    config=tap_transformation,
-                    python_bin=self.transform_field_python_bin,
-                    tap_id=tap_id,
-                    target_id=target_id,
-                )
+            transform_params = TransformParams(
+                bin=self.transform_field_bin,
+                config=tap_transformation,
+                python_bin=self.transform_field_python_bin,
+                tap_id=tap_id,
+                target_id=target_id,
+            )

-                if ConnectorType(target_type) in FASTSYNC_PAIRS.get(ConnectorType(tap_type), set()):
-                    self.run_tap_fastsync(
-                        tap=tap_params, target=target_params, transform=transform_params
-                    )
+            if ConnectorType(target_type) in FASTSYNC_PAIRS.get(ConnectorType(tap_type), set()):
+                self.run_tap_fastsync(
+                    tap=tap_params, target=target_params, transform=transform_params
+                )

-                else:
-                    self.tap_run_log_file = os.path.join(
-                        log_dir, f'{target_id}-{tap_id}-{current_time}.singer.log'
-                    )
-                    stream_buffer_size = self.tap.get(
-                        'stream_buffer_size', commands.DEFAULT_STREAM_BUFFER_SIZE
-                    )
+            else:
+                self.tap_run_log_file = os.path.join(
+                    log_dir, f'{target_id}-{tap_id}-{current_time}.singer.log'
+                )
+                stream_buffer_size = self.tap.get(
+                    'stream_buffer_size', commands.DEFAULT_STREAM_BUFFER_SIZE
+                )

-                    self.run_tap_singer(
-                        tap=tap_params,
-                        target=target_params,
-                        transform=transform_params,
-                        stream_buffer_size=stream_buffer_size,
-                    )
+                self.run_tap_singer(
+                    tap=tap_params,
+                    target=target_params,
+                    transform=transform_params,
+                    stream_buffer_size=stream_buffer_size,
+                )

-        except pidfile.AlreadyRunningError:
+        except sherlock.LockTimeoutException:
             self.logger.error('Another instance of the tap is already running.')
             sys.exit(1)

         # Delete temp file if there is any
@@ -1719,43 +1720,44 @@ def partial_sync_table(self):
             log_dir = self.get_tap_log_dir(target_id, tap_id)
             current_time = datetime.utcnow().strftime('%Y%m%d_%H%M%S')

-            with pidfile.PIDFile(self.tap['files']['pidfile']):
-                self.tap_run_log_file = os.path.join(
-                    log_dir, f'{target_id}-{tap_id}-{current_time}.partialsync.log'
-                )
+            self.tap_run_log_file = os.path.join(
+                log_dir, f'{target_id}-{tap_id}-{current_time}.partialsync.log'
+            )

-                # Create parameters as NamedTuples
-                tap_params = TapParams(
-                    tap_id=tap_id,
-                    type=tap_type,
-                    bin=self.tap_bin,
-                    python_bin=self.tap_python_bin,
-                    config=tap_config,
-                    properties=tap_properties,
-                    state=tap_state,
-                )
+            # Create parameters as NamedTuples
+            tap_params = TapParams(
+                tap_id=tap_id,
+                type=tap_type,
+                bin=self.tap_bin,
+                python_bin=self.tap_python_bin,
+                config=tap_config,
+                properties=tap_properties,
+                state=tap_state,
+            )

-                target_params = TargetParams(
-                    target_id=target_id,
-                    type=target_type,
-                    bin=self.target_bin,
-                    python_bin=self.target_python_bin,
-                    config=cons_target_config,
-                )
+            target_params = TargetParams(
+                target_id=target_id,
+                type=target_type,
+                bin=self.target_bin,
+                python_bin=self.target_python_bin,
+                config=cons_target_config,
+            )

-                transform_params = TransformParams(
-                    bin=self.transform_field_bin,
-                    config=tap_transformation,
-                    python_bin=self.transform_field_python_bin,
-                    tap_id=tap_id,
-                    target_id=target_id,
-                )
+            transform_params = TransformParams(
+                bin=self.transform_field_bin,
+                config=tap_transformation,
+                python_bin=self.transform_field_python_bin,
+                tap_id=tap_id,
+                target_id=target_id,
+            )

-                self.run_tap_partialsync(
-                    tap=tap_params, target=target_params, transform=transform_params,
-                )
+            self.run_tap_partialsync(
+                tap=tap_params,
+                target=target_params,
+                transform=transform_params,
+            )

-        except pidfile.AlreadyRunningError as exc:
+        except sherlock.LockTimeoutException as exc:
             self.logger.error('Another instance of the tap is already running.')
             raise SystemExit(1) from exc

         # Delete temp file if there is any
diff --git a/setup.py b/setup.py
index 1b206e08e..045df124e 100644
--- a/setup.py
+++ b/setup.py
@@ -34,7 +34,6 @@
         'pipelinewise-singer-python==1.*',
         'singer-encodings==0.0.*',
         'messytables==0.15.*',
-        'python-pidfile==3.0.0',
         'pymongo==3.12.3',
         'tzlocal>=2.0,<4.1',
         'slackclient==2.9.3',
@@ -43,6 +42,7 @@
         'dnspython==2.1.*',
         'boto3==1.21.*',
         'chardet==4.0.0',
+        'sherlock[filelock]==0.4.0',
     ],
     extras_require={
         'test': [
diff --git a/tests/units/cli/test_cli.py b/tests/units/cli/test_cli.py
index 3ad74e8c3..b06b6cdbd 100644
--- a/tests/units/cli/test_cli.py
+++ b/tests/units/cli/test_cli.py
@@ -4,7 +4,6 @@
 import time
 import shutil
 import psutil
-import pidfile
 import pytest

 from pathlib import Path
@@ -614,7 +613,8 @@ def test_command_stop_tap(self):
         # Stop tap command should stop all the child processes
         # 1. Start the pipelinewise mock executable that's running
         #    linux piped dummy tap and target connectors
-        with pidfile.PIDFile(pipelinewise.tap['files']['pidfile']):
+        pipelinewise.target = {'id': 'target_one'}
+        with pipelinewise.get_lock('target_one', 'tap_one'):
             os.spawnl(
                 os.P_NOWAIT,
                 f'{RESOURCES_DIR}/test_stop_tap/scheduler-mock.sh',
diff --git a/tests/units/cli/test_config.py b/tests/units/cli/test_config.py
index af82a2956..19efad4f4 100644
--- a/tests/units/cli/test_config.py
+++ b/tests/units/cli/test_config.py
@@ -34,7 +34,6 @@ def test_connector_files(self):
             'state': '/var/singer-connector/state.json',
             'transformation': '/var/singer-connector/transformation.json',
             'selection': '/var/singer-connector/selection.json',
-            'pidfile': '/var/singer-connector/pipelinewise.pid',
         }

     def test_from_yamls(self):
@@ -91,7 +90,6 @@ def test_from_yamls(self):
                 'selection': f'{PIPELINEWISE_TEST_HOME}/test_snowflake_target/selection.json',
                 'state': f'{PIPELINEWISE_TEST_HOME}/test_snowflake_target/state.json',
                 'transformation': f'{PIPELINEWISE_TEST_HOME}/test_snowflake_target/transformation.json',
-                'pidfile': f'{PIPELINEWISE_TEST_HOME}/test_snowflake_target/pipelinewise.pid',
             },
             'taps': [
                 {
@@ -122,7 +120,6 @@ def test_from_yamls(self):
                         'state': f'{PIPELINEWISE_TEST_HOME}/test_snowflake_target/mysql_sample/state.json',
                         'transformation': f'{PIPELINEWISE_TEST_HOME}'
                                           f'/test_snowflake_target/mysql_sample/transformation.json',
-                        'pidfile': f'{PIPELINEWISE_TEST_HOME}/test_snowflake_target/mysql_sample/pipelinewise.pid',
                     },
                     'schemas': [
                         {
@@ -226,7 +223,6 @@ def test_getters(self):
             'state': '/var/singer-connector/state.json',
             'transformation': '/var/singer-connector/transformation.json',
             'selection': '/var/singer-connector/selection.json',
-            'pidfile': '/var/singer-connector/pipelinewise.pid',
         }

     def test_save_config(self):
diff --git a/tests/units/cli/test_partial_sync.py b/tests/units/cli/test_partial_sync.py
index 9995dd992..0f54eb634 100644
--- a/tests/units/cli/test_partial_sync.py
+++ b/tests/units/cli/test_partial_sync.py
@@ -187,7 +187,7 @@ def test_if_calling_the_module_partial_sync_table_correctly(self, mocked_check):
         self._run_cli(arguments)

         call_args = mocked_run_command.call_args[0]
-        self.assertEqual(2, len(call_args))
+        self.assertEqual(3, len(call_args))

         # Because each instance of Pipelinewise has a random postfix for log filename, we test it in this way!
         self.assertRegex(
@@ -203,7 +203,7 @@ def test_if_calling_the_module_partial_sync_table_correctly(self, mocked_check):
             f'--start_value "{arguments["start_value"]}" --end_value "{arguments["end_value"]}"$'
         )

-        self.assertRegex(call_args[1], f'^{self.test_cli.CONFIG_DIR}/{arguments["target"]}/{arguments["tap"]}/log/'
+        self.assertRegex(call_args[2], f'^{self.test_cli.CONFIG_DIR}/{arguments["target"]}/{arguments["tap"]}/log/'
                                        f'{arguments["target"]}-{arguments["tap"]}-[0-9]{{8}}_[0-9]{{6}}'
                                        r'\.partialsync\.log')
@@ -225,7 +225,7 @@ def test_not_end_value_in_calling_command_if_no_end_value_in_cli_parameter(self,
         self._run_cli(arguments)

         call_args = mocked_run_command.call_args[0]
-        self.assertEqual(2, len(call_args))
+        self.assertEqual(3, len(call_args))
         # Because each instance of Pipelinewise has a random postfix for log filename, we test it in this way!
         self.assertRegex(
             call_args[0],
@@ -240,7 +240,7 @@ def test_not_end_value_in_calling_command_if_no_end_value_in_cli_parameter(self,
             f'--start_value "{arguments["start_value"]}"$'
         )

-        self.assertRegex(call_args[1], f'^{self.test_cli.CONFIG_DIR}/{arguments["target"]}/{arguments["tap"]}/log/'
+        self.assertRegex(call_args[2], f'^{self.test_cli.CONFIG_DIR}/{arguments["target"]}/{arguments["tap"]}/log/'
                                        f'{arguments["target"]}-{arguments["tap"]}-[0-9]{{8}}_[0-9]{{6}}'
                                        r'\.partialsync\.log')
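
Illustration of the locking pattern this patch relies on: a minimal, self-contained sketch that acquires a sherlock FileLock around a child process and renews it shortly before expiry, mirroring run_command() above. The lock key, lock directory and child command are invented for the example; only the sherlock calls used by the patch (FileLock, renew(), release(), locked(), the expire attribute) and the renewal arithmetic are taken from the diff itself.

# Illustration only (not part of the patch); key name, directory and child command are made up.
import pathlib
import subprocess
import time

import sherlock

lock_dir = pathlib.Path('/tmp/pipelinewise-locks')
lock_dir.mkdir(parents=True, exist_ok=True)

lock = sherlock.FileLock(
    'target_one__tap_one',   # same '<target_id>__<tap_id>' key format as PipelineWise.get_lock()
    client=lock_dir,
    timeout=1,               # give up acquiring after 1 second (LockTimeoutException)
    expire=30,               # the lock expires after 30 seconds unless renewed
)

with lock:                   # acquires on entry, releases on exit
    proc = subprocess.Popen(['sleep', '90'])
    last_renew = time.time()
    while proc.poll() is None:
        time.sleep(1)
        # Renew shortly before expiry; if renewal fails, another process may now
        # hold the lock, so stop the child instead of running concurrently.
        if time.time() - last_renew > lock.expire - 5:
            if lock.renew() is False:
                proc.terminate()
            last_renew = time.time()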