import csv
from typing import TextIO

from smart_open import open


def byte_ranges(
    csvfile: str,
    chunk_size: int = 100,
    **kwargs,
) -> list[tuple[int, int]]:
    """Compute byte ranges for reading a CSV file in fixed-size line chunks.

    The file is scanned once in binary mode to record the byte offset at
    which every physical line ends; consecutive groups of ``chunk_size``
    lines are then turned into ``(start_byte, end_byte)`` pairs.

    Parameters
    ----------
    csvfile : str
        Path to the CSV file, opened in binary mode internally.
    chunk_size : int, optional
        Number of lines per chunk. Must be at least 1. Default is 100.
    **kwargs :
        Extra options passed to `open()`, e.g., buffering.

    Returns
    -------
    list of tuple[int, int]
        Byte ranges covering each chunk of lines. ``end_byte`` is
        inclusive (it is the offset of the chunk's last byte). The final
        chunk may hold fewer than ``chunk_size`` lines. An empty file
        yields an empty list.

    Raises
    ------
    ValueError
        If ``chunk_size`` is smaller than 1.

    Notes
    -----
    Chunks are built from physical lines as returned by ``readline()``:
    every line counts, including a header line if the file has one, and a
    quoted CSV field containing a newline is not treated specially, so
    such a record may straddle two chunks.

    Example
    -------
    >>> byte_ranges("users.csv", chunk_size=500)  # doctest: +SKIP
    [(0, 3125), (3126, 6150), (6151, 9124)]
    """
    # Fail fast, before scanning the file: a zero step would otherwise
    # surface as an opaque range() error and a negative one would
    # silently produce no ranges at all.
    if chunk_size < 1:
        raise ValueError(
            f"chunk_size must be a positive integer, got {chunk_size!r}"
        )

    # line_offsets[i] is the byte offset at which line i (0-based) starts;
    # the last entry is the offset just past the final line.
    line_offsets = [0]
    with open(csvfile, 'rb', **kwargs) as fp:
        # readline() returns b"" only at end of file.
        while fp.readline():
            line_offsets.append(fp.tell())

    total_lines = len(line_offsets) - 1

    ranges = []
    for first_line in range(0, total_lines, chunk_size):
        # Index one past the chunk's last line, bounded by total lines.
        stop_line = min(first_line + chunk_size, total_lines)
        # The next chunk starts at line_offsets[stop_line]; subtract one
        # to get the inclusive offset of this chunk's last byte.
        ranges.append(
            (line_offsets[first_line], line_offsets[stop_line] - 1)
        )

    return ranges


def detect_delimiter(sample: TextIO) -> str:
    """Detect the delimiter character used in a CSV file.

    The whole remaining content of ``sample`` is read into memory and
    handed to `csv.Sniffer`. The stream is rewound to position 0
    afterwards, whether or not detection succeeded.

    Parameters
    ----------
    sample : TextIO
        A file-like object opened in text mode (e.g., from
        `open('file.csv')`). Must be readable, seekable and at position 0.

    Returns
    -------
    str
        The detected delimiter character (e.g., ',', ';', '\\t').

    Raises
    ------
    csv.Error
        If the delimiter cannot be determined. This includes empty input.
    """
    text = sample.read()
    try:
        dialect = csv.Sniffer().sniff(text)
    finally:
        # Rewind even when sniffing fails so the caller's stream is not
        # left at end of file.
        sample.seek(0)
    return dialect.delimiter