400 lines
11 KiB
Python
400 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import contextlib
|
|
import pathlib
|
|
import sys
|
|
import typing
|
|
from pathlib import Path
|
|
from typing import IO, BinaryIO, TextIO
|
|
|
|
import progressbar
|
|
|
|
|
|
def size_to_bytes(size_str: str) -> int:
|
|
"""
|
|
Convert a size string with suffixes 'k', 'm', etc., to bytes.
|
|
|
|
Note: This function also supports '@' as a prefix to a file path to get the
|
|
file size.
|
|
|
|
>>> size_to_bytes('1024k')
|
|
1048576
|
|
>>> size_to_bytes('1024m')
|
|
1073741824
|
|
>>> size_to_bytes('1024g')
|
|
1099511627776
|
|
>>> size_to_bytes('1024')
|
|
1024
|
|
>>> size_to_bytes('1024p')
|
|
1125899906842624
|
|
"""
|
|
|
|
# Define conversion rates
|
|
suffix_exponent = {
|
|
'k': 1,
|
|
'm': 2,
|
|
'g': 3,
|
|
't': 4,
|
|
'p': 5,
|
|
}
|
|
|
|
# Initialize the default exponent to 0 (for bytes)
|
|
exponent = 0
|
|
|
|
# Check if the size starts with '@' (for file sizes, not handled here)
|
|
if size_str.startswith('@'):
|
|
return pathlib.Path(size_str[1:]).stat().st_size
|
|
|
|
# Check if the last character is a known suffix and adjust the multiplier
|
|
if size_str[-1].lower() in suffix_exponent:
|
|
# Update exponent based on the suffix
|
|
exponent = suffix_exponent[size_str[-1].lower()]
|
|
# Remove the suffix from the size_str
|
|
size_str = size_str[:-1]
|
|
|
|
# Convert the size_str to an integer and apply the exponent
|
|
return int(size_str) * (1024**exponent)
|
|
|
|
|
|
def create_argument_parser() -> argparse.ArgumentParser:
|
|
"""
|
|
Create the argument parser for the `progressbar` command.
|
|
"""
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="""
|
|
Monitor the progress of data through a pipe.
|
|
|
|
Note that this is a Python implementation of the original `pv` command
|
|
that is functional but not yet feature complete.
|
|
"""
|
|
)
|
|
|
|
# Display switches
|
|
parser.add_argument(
|
|
'-p',
|
|
'--progress',
|
|
action='store_true',
|
|
help='Turn the progress bar on.',
|
|
)
|
|
parser.add_argument(
|
|
'-t', '--timer', action='store_true', help='Turn the timer on.'
|
|
)
|
|
parser.add_argument(
|
|
'-e', '--eta', action='store_true', help='Turn the ETA timer on.'
|
|
)
|
|
parser.add_argument(
|
|
'-I',
|
|
'--fineta',
|
|
action='store_true',
|
|
help='Display the ETA as local time of arrival.',
|
|
)
|
|
parser.add_argument(
|
|
'-r', '--rate', action='store_true', help='Turn the rate counter on.'
|
|
)
|
|
parser.add_argument(
|
|
'-a',
|
|
'--average-rate',
|
|
action='store_true',
|
|
help='Turn the average rate counter on.',
|
|
)
|
|
parser.add_argument(
|
|
'-b',
|
|
'--bytes',
|
|
action='store_true',
|
|
help='Turn the total byte counter on.',
|
|
)
|
|
parser.add_argument(
|
|
'-8',
|
|
'--bits',
|
|
action='store_true',
|
|
help='Display total bits instead of bytes.',
|
|
)
|
|
parser.add_argument(
|
|
'-T',
|
|
'--buffer-percent',
|
|
action='store_true',
|
|
help='Turn on the transfer buffer percentage display.',
|
|
)
|
|
parser.add_argument(
|
|
'-A',
|
|
'--last-written',
|
|
type=int,
|
|
help='Show the last NUM bytes written.',
|
|
)
|
|
parser.add_argument(
|
|
'-F',
|
|
'--format',
|
|
type=str,
|
|
help='Use the format string FORMAT for output format.',
|
|
)
|
|
parser.add_argument(
|
|
'-n', '--numeric', action='store_true', help='Numeric output.'
|
|
)
|
|
parser.add_argument(
|
|
'-q',
|
|
'--quiet',
|
|
action='store_true',
|
|
help='No output.',
|
|
)
|
|
|
|
# Output modifiers
|
|
parser.add_argument(
|
|
'-W',
|
|
'--wait',
|
|
action='store_true',
|
|
help='Wait until the first byte has been transferred.',
|
|
)
|
|
parser.add_argument('-D', '--delay-start', type=float, help='Delay start.')
|
|
parser.add_argument(
|
|
'-s', '--size', type=str, help='Assume total data size is SIZE.'
|
|
)
|
|
parser.add_argument(
|
|
'-l',
|
|
'--line-mode',
|
|
action='store_true',
|
|
help='Count lines instead of bytes.',
|
|
)
|
|
parser.add_argument(
|
|
'-0',
|
|
'--null',
|
|
action='store_true',
|
|
help='Count lines terminated with a zero byte.',
|
|
)
|
|
parser.add_argument(
|
|
'-i', '--interval', type=float, help='Interval between updates.'
|
|
)
|
|
parser.add_argument(
|
|
'-m',
|
|
'--average-rate-window',
|
|
type=int,
|
|
help='Window for average rate calculation.',
|
|
)
|
|
parser.add_argument(
|
|
'-w',
|
|
'--width',
|
|
type=int,
|
|
help='Assume terminal is WIDTH characters wide.',
|
|
)
|
|
parser.add_argument(
|
|
'-H', '--height', type=int, help='Assume terminal is HEIGHT rows high.'
|
|
)
|
|
parser.add_argument(
|
|
'-N', '--name', type=str, help='Prefix output information with NAME.'
|
|
)
|
|
parser.add_argument(
|
|
'-f', '--force', action='store_true', help='Force output.'
|
|
)
|
|
parser.add_argument(
|
|
'-c',
|
|
'--cursor',
|
|
action='store_true',
|
|
help='Use cursor positioning escape sequences.',
|
|
)
|
|
|
|
# Data transfer modifiers
|
|
parser.add_argument(
|
|
'-L',
|
|
'--rate-limit',
|
|
type=str,
|
|
help='Limit transfer to RATE bytes per second.',
|
|
)
|
|
parser.add_argument(
|
|
'-B',
|
|
'--buffer-size',
|
|
type=str,
|
|
help='Use transfer buffer size of BYTES.',
|
|
)
|
|
parser.add_argument(
|
|
'-C', '--no-splice', action='store_true', help='Never use splice.'
|
|
)
|
|
parser.add_argument(
|
|
'-E', '--skip-errors', action='store_true', help='Ignore read errors.'
|
|
)
|
|
parser.add_argument(
|
|
'-Z',
|
|
'--error-skip-block',
|
|
type=str,
|
|
help='Skip block size when ignoring errors.',
|
|
)
|
|
parser.add_argument(
|
|
'-S',
|
|
'--stop-at-size',
|
|
action='store_true',
|
|
help='Stop transferring after SIZE bytes.',
|
|
)
|
|
parser.add_argument(
|
|
'-Y',
|
|
'--sync',
|
|
action='store_true',
|
|
help='Synchronise buffer caches to disk after writes.',
|
|
)
|
|
parser.add_argument(
|
|
'-K',
|
|
'--direct-io',
|
|
action='store_true',
|
|
help='Set O_DIRECT flag on all inputs/outputs.',
|
|
)
|
|
parser.add_argument(
|
|
'-X',
|
|
'--discard',
|
|
action='store_true',
|
|
help='Discard input data instead of transferring it.',
|
|
)
|
|
parser.add_argument(
|
|
'-d', '--watchfd', type=str, help='Watch file descriptor of process.'
|
|
)
|
|
parser.add_argument(
|
|
'-R',
|
|
'--remote',
|
|
type=int,
|
|
help='Remote control another running instance of pv.',
|
|
)
|
|
|
|
# General options
|
|
parser.add_argument(
|
|
'-P', '--pidfile', type=pathlib.Path, help='Save process ID in FILE.'
|
|
)
|
|
parser.add_argument(
|
|
'input',
|
|
help='Input file path. Uses stdin if not specified.',
|
|
default='-',
|
|
nargs='*',
|
|
)
|
|
parser.add_argument(
|
|
'-o',
|
|
'--output',
|
|
default='-',
|
|
help='Output file path. Uses stdout if not specified.',
|
|
)
|
|
|
|
return parser
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> None: # noqa: C901
|
|
"""
|
|
Main function for the `progressbar` command.
|
|
|
|
Args:
|
|
argv (list[str] | None): Command-line arguments passed to the script.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
parser: argparse.ArgumentParser = create_argument_parser()
|
|
args: argparse.Namespace = parser.parse_args(argv)
|
|
|
|
with contextlib.ExitStack() as stack:
|
|
output_stream: typing.IO[typing.Any] = _get_output_stream(
|
|
args.output, args.line_mode, stack
|
|
)
|
|
|
|
input_paths: list[BinaryIO | TextIO | Path | IO[typing.Any]] = []
|
|
total_size: int = 0
|
|
filesize_available: bool = True
|
|
for filename in args.input:
|
|
input_path: typing.IO[typing.Any] | pathlib.Path
|
|
if filename == '-':
|
|
if args.line_mode:
|
|
input_path = sys.stdin
|
|
else:
|
|
input_path = sys.stdin.buffer
|
|
|
|
filesize_available = False
|
|
else:
|
|
input_path = pathlib.Path(filename)
|
|
if not input_path.exists():
|
|
parser.error(f'File not found: {filename}')
|
|
|
|
if not args.size:
|
|
total_size += input_path.stat().st_size
|
|
|
|
input_paths.append(input_path)
|
|
|
|
# Determine the size for the progress bar (if provided)
|
|
if args.size:
|
|
total_size = size_to_bytes(args.size)
|
|
filesize_available = True
|
|
|
|
if filesize_available:
|
|
# Create the progress bar components
|
|
widgets = [
|
|
progressbar.Percentage(),
|
|
' ',
|
|
progressbar.Bar(),
|
|
' ',
|
|
progressbar.Timer(),
|
|
' ',
|
|
progressbar.FileTransferSpeed(),
|
|
]
|
|
else:
|
|
widgets = [
|
|
progressbar.SimpleProgress(),
|
|
' ',
|
|
progressbar.DataSize(),
|
|
' ',
|
|
progressbar.Timer(),
|
|
]
|
|
|
|
if args.eta:
|
|
widgets.append(' ')
|
|
widgets.append(progressbar.AdaptiveETA())
|
|
|
|
# Initialize the progress bar
|
|
bar = progressbar.ProgressBar(
|
|
# widgets=widgets,
|
|
max_value=total_size or None,
|
|
max_error=False,
|
|
)
|
|
|
|
# Data processing and updating the progress bar
|
|
buffer_size = (
|
|
size_to_bytes(args.buffer_size) if args.buffer_size else 1024
|
|
)
|
|
total_transferred = 0
|
|
|
|
bar.start()
|
|
with contextlib.suppress(KeyboardInterrupt):
|
|
for input_path in input_paths:
|
|
if isinstance(input_path, pathlib.Path):
|
|
input_stream = stack.enter_context(
|
|
input_path.open('r' if args.line_mode else 'rb')
|
|
)
|
|
else:
|
|
input_stream = input_path
|
|
|
|
while True:
|
|
data: str | bytes
|
|
if args.line_mode:
|
|
data = input_stream.readline(buffer_size)
|
|
else:
|
|
data = input_stream.read(buffer_size)
|
|
|
|
if not data:
|
|
break
|
|
|
|
output_stream.write(data)
|
|
total_transferred += len(data)
|
|
bar.update(total_transferred)
|
|
|
|
bar.finish(dirty=True)
|
|
|
|
|
|
def _get_output_stream(
|
|
output: str | None,
|
|
line_mode: bool,
|
|
stack: contextlib.ExitStack,
|
|
) -> typing.IO[typing.Any]:
|
|
if output and output != '-':
|
|
mode = 'w' if line_mode else 'wb'
|
|
return stack.enter_context(open(output, mode)) # noqa: SIM115
|
|
elif line_mode:
|
|
return sys.stdout
|
|
else:
|
|
return sys.stdout.buffer
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|