leaf_focus.pdf.xpdf
Text extraction from pdf using xpdf tools.
1"""Text extraction from pdf using xpdf tools.""" 2 3from __future__ import annotations 4 5import dataclasses 6import json 7import logging 8import pathlib 9import subprocess 10 11from datetime import datetime 12 13from beartype import beartype, typing 14from defusedxml import ElementTree 15 16from leaf_focus import utils 17from leaf_focus.pdf import model 18 19 20logger = logging.getLogger(__name__) 21 22 23@beartype 24class XpdfProgram: 25 """Interact with xpdf tools.""" 26 27 OPTS_TEXT_ENCODING: tuple[str, str, str, str, str, str] = ( 28 "Latin1", 29 "ASCII7", 30 "Symbol", 31 "ZapfDingbats", 32 "UTF-8", 33 "UCS-2", 34 ) 35 OPTS_TEXT_LINE_ENDING: tuple[str, str, str] = ("unix", "dos", "mac") 36 OPTS_IMAGE_ROTATION: tuple[int, int, int, int] = (0, 90, 180, 270) 37 OPTS_IMAGE_FREETYPE: tuple[str, str] = ("yes", "no") 38 OPTS_IMAGE_ANTI_ALIAS: tuple[str, str] = ("yes", "no") 39 OPTS_IMAGE_VEC_ANTI_ALIAS: tuple[str, str] = ("yes", "no") 40 41 def __init__(self, directory: pathlib.Path) -> None: 42 """Create a new xpdf program class to interact with xpdf tools. 43 44 Args: 45 directory: The path to the directory containing xpdf tools. 46 """ 47 self._directory = directory 48 49 def info( 50 self, 51 pdf_path: pathlib.Path, 52 output_dir: pathlib.Path, 53 xpdf_args: model.XpdfInfoArgs, 54 ) -> model.XpdfInfoResult: 55 """Get information from a pdf file. 56 57 Args: 58 pdf_path: The path to the pdf file. 59 output_dir: The directory to save pdf info file. 60 xpdf_args: The program arguments. 61 62 Returns: 63 The pdf file information. 64 """ 65 # validation 66 enc = xpdf_args.encoding 67 utils.validate("text encoding", enc, self.OPTS_TEXT_ENCODING) 68 69 utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page) 70 71 if not pdf_path.exists(): 72 msg = f"Pdf file not found '{pdf_path}'." 
73 raise utils.LeafFocusError(msg) from FileNotFoundError(pdf_path) 74 75 output_file = utils.output_root(pdf_path, "info", output_dir) 76 output_file = output_file.with_suffix(".json") 77 78 if output_file.exists(): 79 logger.info("Loading existing pdf info file.") 80 with pathlib.Path.open(output_file, encoding="utf-8") as info_file: 81 data = json.load(info_file) 82 data["creation_date"] = utils.parse_date(data.get("creation_date")) 83 data["modification_date"] = utils.parse_date( 84 data.get("modification_date") 85 ) 86 return model.XpdfInfoResult(**data) 87 88 logger.info("Extracting pdf info and saving to file.") 89 90 # build command 91 exe_path = utils.select_exe(self._directory / "pdfinfo") 92 cmd = [str(exe_path)] 93 94 cmd_args = self.build_cmd(xpdf_args) 95 96 cmd.extend(cmd_args) 97 cmd.append(str(pdf_path.resolve())) 98 99 # execute program 100 result = subprocess.run( # noqa: S603 101 cmd, 102 capture_output=True, 103 check=True, 104 timeout=30, 105 text=True, 106 ) 107 lines = result.stdout.splitlines() 108 109 metadata_line_index, data = self.build_field_metadata( 110 pdf_path, 111 lines, 112 ) 113 114 # metadata 115 if metadata_line_index is not None: 116 start = metadata_line_index + 1 117 metadata = "\n".join(lines[start:]) 118 root = ElementTree.fromstring(metadata) 119 data["metadata"] = utils.xml_to_element(root).to_dict() 120 121 if output_dir and output_dir.exists(): 122 logger.debug("Saving pdf info to '%s'.", output_file) 123 output_file.write_text( 124 json.dumps(data, indent=2, cls=utils.CustomJsonEncoder), 125 ) 126 127 return model.XpdfInfoResult(**data) 128 129 def text( 130 self, 131 pdf_path: pathlib.Path, 132 output_path: pathlib.Path, 133 xpdf_args: model.XpdfTextArgs, 134 ) -> model.XpdfTextResult: 135 """Get the text from a pdf file. 136 137 Args: 138 pdf_path: The path to the pdf file. 139 output_path: The directory to save output files. 140 xpdf_args: The pdf program arguments. 
141 142 Returns: 143 The result from running the text extraction program. 144 """ 145 # validation 146 eol = xpdf_args.line_end_type 147 utils.validate("end of line", eol, self.OPTS_TEXT_LINE_ENDING) 148 149 utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page) 150 151 if not pdf_path.exists(): 152 msg = f"Pdf file not found '{pdf_path}'." 153 raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path)) 154 155 # build command 156 157 cmd_args = self.build_cmd(xpdf_args) 158 159 output_file = utils.output_root(pdf_path, "output", output_path, cmd_args) 160 output_file = output_file.with_suffix(".txt") 161 162 # check if embedded text file already exists 163 if output_file.exists(): 164 logger.info("Loading extracted embedded text from existing file.") 165 return model.XpdfTextResult( 166 stdout=[], 167 stderr=[], 168 output_path=output_file, 169 ) 170 171 if logger.isEnabledFor(logging.DEBUG): 172 logger.debug("Did not find expected output file '%s'", output_file.name) 173 logger.debug("Listing items in '%s'", output_file.parent) 174 item_count = 0 175 for item in output_file.parent.iterdir(): 176 item_count += 1 177 logger.debug("Found item '%s'", item) 178 logger.debug("Found %s items in dir.", item_count) 179 180 logger.info("Extracting pdf embedded text and saving to file.") 181 182 exe_path = utils.select_exe(self._directory / "pdftotext") 183 184 cmd = [str(exe_path)] 185 186 cmd.extend([*cmd_args, str(pdf_path), str(output_file)]) 187 188 # execute program 189 result = subprocess.run( # noqa: S603 190 cmd, 191 capture_output=True, 192 check=True, 193 timeout=30, 194 text=True, 195 ) 196 197 logger.debug("Saving pdf embedded text to '%s'.", output_file) 198 199 return model.XpdfTextResult( 200 stdout=(result.stdout or "").splitlines(), 201 stderr=(result.stderr or "").splitlines(), 202 output_path=output_file, 203 ) 204 205 def image( 206 self, 207 pdf_path: pathlib.Path, 208 output_path: pathlib.Path, 209 xpdf_args: model.XpdfImageArgs, 
210 ) -> model.XpdfImageResult: 211 """Create images of pdf pages. 212 213 Args: 214 pdf_path: The path to the pdf file. 215 output_path: The directory to save output files. 216 xpdf_args: The program arguments. 217 218 Returns: 219 The pdf file image info. 220 """ 221 # validation 222 rot = xpdf_args.rotation 223 utils.validate("rotation", rot, self.OPTS_IMAGE_ROTATION) 224 225 free_type = xpdf_args.free_type 226 utils.validate("freetype", free_type, self.OPTS_IMAGE_FREETYPE) 227 228 anti_alias = xpdf_args.anti_aliasing 229 utils.validate("anti-aliasing", anti_alias, self.OPTS_IMAGE_ANTI_ALIAS) 230 231 anti_alias_vec = xpdf_args.anti_aliasing 232 utils.validate( 233 "vector anti-aliasing", 234 anti_alias_vec, 235 self.OPTS_IMAGE_VEC_ANTI_ALIAS, 236 ) 237 238 utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page) 239 240 if not pdf_path.exists(): 241 msg = f"Pdf file not found '{pdf_path}'." 242 raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path)) 243 244 logger.info("Saving each pdf page as an image.") 245 246 # build command 247 cmd_args = self.build_cmd(xpdf_args) 248 249 output_type = "page-image" 250 251 # don't include the page limits when building the output prefix 252 xpdf_args.first_page = None 253 xpdf_args.last_page = None 254 output_cmd_args = self.build_cmd(xpdf_args) 255 output_dir = utils.output_root( 256 pdf_path, 257 output_type, 258 output_path, 259 output_cmd_args, 260 ) 261 262 for pdf_image_file in output_dir.parent.iterdir(): 263 if not pdf_image_file.name.startswith(output_dir.name): 264 continue 265 266 logger.info("Found existing pdf images.") 267 268 output_files = self.find_images(output_dir) 269 return model.XpdfImageResult( 270 stdout=[], 271 stderr=[], 272 output_dir=output_dir, 273 output_files=output_files, 274 ) 275 276 exe_path = utils.select_exe(self._directory / "pdftopng") 277 cmd = [str(exe_path)] 278 279 cmd.extend([*cmd_args, str(pdf_path), str(output_dir)]) 280 281 # execute program 282 result = 
subprocess.run( # noqa: S603 283 cmd, 284 capture_output=True, 285 check=True, 286 timeout=30, 287 text=True, 288 ) 289 290 logger.debug("Created pdf page images using prefix '%s'.", output_dir) 291 292 output_files = self.find_images(output_dir) 293 294 return model.XpdfImageResult( 295 stdout=(result.stdout or "").splitlines(), 296 stderr=(result.stderr or "").splitlines(), 297 output_dir=output_dir, 298 output_files=output_files, 299 ) 300 301 def build_cmd(self, tool_args: model.XpdfArgs) -> list[str]: 302 """Build the command arguments from a data class.""" 303 arg_class = tool_args.__class__ 304 cmd_args = [] 305 for field in dataclasses.fields(arg_class): 306 name = field.name 307 value = getattr(tool_args, name) 308 309 field_default = field.default 310 311 # TODO: account for default_factory 312 313 # validate the arg config 314 cmd_key = field.metadata.get("leaf_focus", {}).get("cmd") 315 if not cmd_key: 316 msg = f"Args incorrectly configured: missing 'cmd' for '{name}'." 317 raise ValueError(msg) 318 319 cmd_type = field.metadata.get("leaf_focus", {}).get("cmd_type") 320 if not cmd_type: 321 msg = f"Args incorrectly configured: missing 'cmd_type' for '{name}'." 322 raise ValueError(msg) 323 324 # add the arg 325 if cmd_type == "bool": 326 if value is not None and value is not True and value is not False: 327 msg = ( 328 f"Argument '{name}' must be None, True, or False, " 329 f"not '{value}'." 330 ) 331 raise ValueError(msg) 332 333 if value is True: 334 cmd_args.extend([str(cmd_key)]) 335 336 elif cmd_type == "single": 337 if ( 338 field_default is None 339 and value is not None 340 or field_default != value 341 ): 342 cmd_args.extend([str(cmd_key), str(value)]) 343 else: 344 # no need to add cmd 345 pass 346 else: 347 msg = ( 348 f"Argument '{name}' has unknown cmd_type '{cmd_type}'. " 349 "Expected one of 'bool, single'." 
350 ) 351 raise ValueError(msg) 352 353 return cmd_args 354 355 def find_images(self, output_dir: pathlib.Path) -> list[pathlib.Path]: 356 """Find image files in a directory.""" 357 stem_parts = 7 358 stem_digit_parts = 6 359 output_files = [] 360 for file_path in output_dir.parent.iterdir(): 361 if not file_path.is_file(): 362 continue 363 if not file_path.name.startswith(output_dir.stem): 364 continue 365 if file_path.suffix != ".png": 366 continue 367 if len(file_path.stem) < stem_parts: 368 continue 369 if file_path.stem[-stem_parts] != "-": 370 continue 371 if not all(i.isdigit() for i in file_path.stem[-stem_digit_parts:]): 372 continue 373 output_files.append(file_path) 374 375 if not output_files: 376 logger.warning("No page images found.") 377 378 return output_files 379 380 def build_field_metadata( 381 self, 382 pdf_path: pathlib.Path, 383 lines: typing.Iterable[str], 384 ) -> tuple[int | None, dict[str, typing.Any]]: 385 """Build metadata for a field.""" 386 fields_map = { 387 field.metadata.get("leaf_focus", {}).get("name"): field 388 for field in dataclasses.fields(model.XpdfInfoResult) 389 } 390 metadata_line_index: int | None = None 391 392 data: dict[str, typing.Any] = {i.name: None for i in fields_map.values()} 393 for index, line in enumerate(lines): 394 if line.startswith("Metadata:"): 395 metadata_line_index = index 396 break 397 398 value: typing.Any = None 399 key, value = line.split(":", maxsplit=1) 400 key = key.strip() 401 402 field = fields_map.get(key) 403 if not field: 404 msg = f"Unknown pdf info key '{key}' value '{value}' in '{pdf_path}'." 405 raise utils.LeafFocusError(msg) 406 407 data_key = field.name 408 if data.get(data_key) is not None: 409 msg = f"Duplicate pdf info key '{key}' in '{pdf_path}'." 
410 raise utils.LeafFocusError(msg) 411 412 typing_arg = typing.get_args(field.type) 413 types_str = [str, "str", "str | None"] 414 types_bool = [bool, "bool", "bool | None"] 415 types_int = [int, "int", "int | None"] 416 types_datetime = [datetime, "datetime", "datetime | None"] 417 418 if field.type in types_str or str in typing_arg: 419 value = value.strip() 420 elif field.type in types_datetime or datetime in typing_arg: 421 value = utils.parse_date(value.strip()) 422 elif field.type in types_bool or bool in typing_arg: 423 value = value.strip().lower() == "yes" 424 elif field.type in types_int or int in typing_arg: 425 if data_key == "file_size_bytes": 426 value = value.replace(" bytes", "") 427 value = int(value.strip().lower()) 428 else: 429 msg = f"Unknown key '{key}' type '{field.type}'" 430 raise ValueError(msg) 431 432 data[data_key] = value 433 434 return metadata_line_index, data
logger =
<Logger leaf_focus.pdf.xpdf (WARNING)>
@beartype
class
XpdfProgram:
@beartype
class XpdfProgram:
    """Interact with xpdf tools."""

    OPTS_TEXT_ENCODING: tuple[str, str, str, str, str, str] = (
        "Latin1",
        "ASCII7",
        "Symbol",
        "ZapfDingbats",
        "UTF-8",
        "UCS-2",
    )
    OPTS_TEXT_LINE_ENDING: tuple[str, str, str] = ("unix", "dos", "mac")
    OPTS_IMAGE_ROTATION: tuple[int, int, int, int] = (0, 90, 180, 270)
    OPTS_IMAGE_FREETYPE: tuple[str, str] = ("yes", "no")
    OPTS_IMAGE_ANTI_ALIAS: tuple[str, str] = ("yes", "no")
    OPTS_IMAGE_VEC_ANTI_ALIAS: tuple[str, str] = ("yes", "no")

    def __init__(self, directory: pathlib.Path) -> None:
        """Create a new xpdf program class to interact with xpdf tools.

        Args:
            directory: The path to the directory containing xpdf tools.
        """
        self._directory = directory

    def info(
        self,
        pdf_path: pathlib.Path,
        output_dir: pathlib.Path,
        xpdf_args: model.XpdfInfoArgs,
    ) -> model.XpdfInfoResult:
        """Get information from a pdf file.

        A previously saved info file is loaded when it exists. Otherwise the
        'pdfinfo' executable is run and its output is parsed.

        Args:
            pdf_path: The path to the pdf file.
            output_dir: The directory in which to save the pdf info file.
            xpdf_args: The program arguments.

        Returns:
            The pdf file information.

        Raises:
            utils.LeafFocusError: If the pdf file does not exist.
            subprocess.CalledProcessError: If the program exits with an error.
            subprocess.TimeoutExpired: If the program runs longer than 30 seconds.
        """
        # validation
        utils.validate("text encoding", xpdf_args.encoding, self.OPTS_TEXT_ENCODING)
        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)

        if not pdf_path.exists():
            msg = f"Pdf file not found '{pdf_path}'."
            raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))

        output_file = utils.output_root(pdf_path, "info", output_dir)
        output_file = output_file.with_suffix(".json")

        if output_file.exists():
            logger.info("Loading existing pdf info file.")
            with output_file.open(encoding="utf-8") as info_file:
                data = json.load(info_file)
            # dates are stored as text in the json file
            for date_key in ("creation_date", "modification_date"):
                data[date_key] = utils.parse_date(data.get(date_key))
            return model.XpdfInfoResult(**data)

        logger.info("Extracting pdf info and saving to file.")

        # build command
        exe_path = utils.select_exe(self._directory / "pdfinfo")
        cmd = [str(exe_path), *self.build_cmd(xpdf_args), str(pdf_path.resolve())]

        # execute program
        result = subprocess.run(  # noqa: S603
            cmd,
            capture_output=True,
            check=True,
            timeout=30,
            text=True,
        )
        lines = result.stdout.splitlines()

        metadata_line_index, data = self.build_field_metadata(pdf_path, lines)

        if metadata_line_index is not None:
            # everything after the 'Metadata:' line is parsed as one xml document
            metadata = "\n".join(lines[metadata_line_index + 1 :])
            root = ElementTree.fromstring(metadata)
            data["metadata"] = utils.xml_to_element(root).to_dict()

        if output_dir and output_dir.exists():
            logger.debug("Saving pdf info to '%s'.", output_file)
            # write with the same encoding that is used to read the file back
            output_file.write_text(
                json.dumps(data, indent=2, cls=utils.CustomJsonEncoder),
                encoding="utf-8",
            )

        return model.XpdfInfoResult(**data)

    def text(
        self,
        pdf_path: pathlib.Path,
        output_path: pathlib.Path,
        xpdf_args: model.XpdfTextArgs,
    ) -> model.XpdfTextResult:
        """Get the text from a pdf file.

        An existing output text file is reused. Otherwise the 'pdftotext'
        executable is run to create it.

        Args:
            pdf_path: The path to the pdf file.
            output_path: The directory to save output files.
            xpdf_args: The pdf program arguments.

        Returns:
            The result from running the text extraction program.

        Raises:
            utils.LeafFocusError: If the pdf file does not exist.
        """
        # validation
        utils.validate(
            "end of line",
            xpdf_args.line_end_type,
            self.OPTS_TEXT_LINE_ENDING,
        )
        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)

        if not pdf_path.exists():
            msg = f"Pdf file not found '{pdf_path}'."
            raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))

        # build command
        cmd_args = self.build_cmd(xpdf_args)

        output_file = utils.output_root(pdf_path, "output", output_path, cmd_args)
        output_file = output_file.with_suffix(".txt")

        # check if embedded text file already exists
        if output_file.exists():
            logger.info("Loading extracted embedded text from existing file.")
            return model.XpdfTextResult(
                stdout=[],
                stderr=[],
                output_path=output_file,
            )

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("Did not find expected output file '%s'", output_file.name)
            logger.debug("Listing items in '%s'", output_file.parent)
            item_count = 0
            for item_count, item in enumerate(output_file.parent.iterdir(), start=1):
                logger.debug("Found item '%s'", item)
            logger.debug("Found %s items in dir.", item_count)

        logger.info("Extracting pdf embedded text and saving to file.")

        exe_path = utils.select_exe(self._directory / "pdftotext")
        cmd = [str(exe_path), *cmd_args, str(pdf_path), str(output_file)]

        # execute program
        result = subprocess.run(  # noqa: S603
            cmd,
            capture_output=True,
            check=True,
            timeout=30,
            text=True,
        )

        logger.debug("Saving pdf embedded text to '%s'.", output_file)

        return model.XpdfTextResult(
            stdout=(result.stdout or "").splitlines(),
            stderr=(result.stderr or "").splitlines(),
            output_path=output_file,
        )

    def image(
        self,
        pdf_path: pathlib.Path,
        output_path: pathlib.Path,
        xpdf_args: model.XpdfImageArgs,
    ) -> model.XpdfImageResult:
        """Create images of pdf pages.

        Existing files that start with the output prefix are reused. Otherwise
        the 'pdftopng' executable is run to create the page images.

        Args:
            pdf_path: The path to the pdf file.
            output_path: The directory to save output files.
            xpdf_args: The program arguments. This object is not modified.

        Returns:
            The pdf file image info.

        Raises:
            utils.LeafFocusError: If the pdf file does not exist.
        """
        # validation
        utils.validate("rotation", xpdf_args.rotation, self.OPTS_IMAGE_ROTATION)
        utils.validate("freetype", xpdf_args.free_type, self.OPTS_IMAGE_FREETYPE)
        utils.validate(
            "anti-aliasing",
            xpdf_args.anti_aliasing,
            self.OPTS_IMAGE_ANTI_ALIAS,
        )

        # NOTE(review): this validates 'anti_aliasing' a second time; presumably
        # the vector anti-aliasing argument was intended - confirm against
        # model.XpdfImageArgs before changing.
        anti_alias_vec = xpdf_args.anti_aliasing
        utils.validate(
            "vector anti-aliasing",
            anti_alias_vec,
            self.OPTS_IMAGE_VEC_ANTI_ALIAS,
        )

        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)

        if not pdf_path.exists():
            msg = f"Pdf file not found '{pdf_path}'."
            raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))

        logger.info("Saving each pdf page as an image.")

        # build command
        cmd_args = self.build_cmd(xpdf_args)

        # Don't include the page limits when building the output prefix.
        # A copy is used so the caller's arguments keep their page limits.
        prefix_args = dataclasses.replace(xpdf_args, first_page=None, last_page=None)
        output_dir = utils.output_root(
            pdf_path,
            "page-image",
            output_path,
            self.build_cmd(prefix_args),
        )

        prefix_name = output_dir.name
        if any(
            item.name.startswith(prefix_name) for item in output_dir.parent.iterdir()
        ):
            logger.info("Found existing pdf images.")
            return model.XpdfImageResult(
                stdout=[],
                stderr=[],
                output_dir=output_dir,
                output_files=self.find_images(output_dir),
            )

        exe_path = utils.select_exe(self._directory / "pdftopng")
        cmd = [str(exe_path), *cmd_args, str(pdf_path), str(output_dir)]

        # execute program
        result = subprocess.run(  # noqa: S603
            cmd,
            capture_output=True,
            check=True,
            timeout=30,
            text=True,
        )

        logger.debug("Created pdf page images using prefix '%s'.", output_dir)

        return model.XpdfImageResult(
            stdout=(result.stdout or "").splitlines(),
            stderr=(result.stderr or "").splitlines(),
            output_dir=output_dir,
            output_files=self.find_images(output_dir),
        )

    def build_cmd(self, tool_args: model.XpdfArgs) -> list[str]:
        """Build the command arguments from a data class.

        Each dataclass field must carry 'leaf_focus' metadata with a 'cmd'
        (the command line option) and a 'cmd_type' ('bool' or 'single').

        Args:
            tool_args: The dataclass instance holding the argument values.

        Returns:
            The command line arguments, in field order.

        Raises:
            ValueError: If a field is missing its configuration, has an unknown
                'cmd_type', or a 'bool' field holds a non-boolean value.
        """
        cmd_args: list[str] = []
        for field in dataclasses.fields(tool_args.__class__):
            name = field.name
            value = getattr(tool_args, name)

            # TODO: account for default_factory
            field_default = field.default

            # validate the arg config
            arg_config = field.metadata.get("leaf_focus", {})

            cmd_key = arg_config.get("cmd")
            if not cmd_key:
                msg = f"Args incorrectly configured: missing 'cmd' for '{name}'."
                raise ValueError(msg)

            cmd_type = arg_config.get("cmd_type")
            if not cmd_type:
                msg = f"Args incorrectly configured: missing 'cmd_type' for '{name}'."
                raise ValueError(msg)

            # add the arg
            if cmd_type == "bool":
                if value is not None and value is not True and value is not False:
                    msg = (
                        f"Argument '{name}' must be None, True, or False, "
                        f"not '{value}'."
                    )
                    raise ValueError(msg)

                if value is True:
                    cmd_args.append(str(cmd_key))

            elif cmd_type == "single":
                # None means 'not set', so the text 'None' is never passed on.
                # A value equal to the field default is left out as well.
                if value is not None and value != field_default:
                    cmd_args.extend([str(cmd_key), str(value)])
            else:
                msg = (
                    f"Argument '{name}' has unknown cmd_type '{cmd_type}'. "
                    "Expected one of 'bool, single'."
                )
                raise ValueError(msg)

        return cmd_args

    def find_images(self, output_dir: pathlib.Path) -> list[pathlib.Path]:
        """Find the page image files that belong to an output prefix.

        A file matches when it is in the same directory as the prefix, its
        name starts with the prefix name, it has the '.png' suffix, and its
        stem ends with '-' followed by six digits.

        Args:
            output_dir: The output prefix path used to create the images.

        Returns:
            The matching image files, sorted by path.
        """
        marker_offset = 7  # position of the '-' counted from the end of the stem
        digit_count = 6

        # Use the full name: 'stem' would cut the prefix at its last '.'.
        prefix_name = output_dir.name

        output_files = []
        for file_path in output_dir.parent.iterdir():
            if not file_path.is_file() or file_path.suffix != ".png":
                continue
            if not file_path.name.startswith(prefix_name):
                continue
            stem = file_path.stem
            if len(stem) < marker_offset or stem[-marker_offset] != "-":
                continue
            if not stem[-digit_count:].isdigit():
                continue
            output_files.append(file_path)

        if not output_files:
            logger.warning("No page images found.")

        # directory listing order is arbitrary, sort for a stable result
        output_files.sort()
        return output_files

    def build_field_metadata(
        self,
        pdf_path: pathlib.Path,
        lines: typing.Iterable[str],
    ) -> tuple[int | None, dict[str, typing.Any]]:
        """Build the pdf info data from 'key: value' lines.

        Reading stops at the first line that starts with 'Metadata:'.

        Args:
            pdf_path: The path to the pdf file, used in error messages.
            lines: The lines of output to parse.

        Returns:
            The index of the 'Metadata:' line (None if there is no such line)
            and the data keyed by the XpdfInfoResult field names.

        Raises:
            utils.LeafFocusError: If a key is unknown or occurs more than once.
            ValueError: If a line has no ':' or a field has an unknown type.
        """
        fields_map = {
            field.metadata.get("leaf_focus", {}).get("name"): field
            for field in dataclasses.fields(model.XpdfInfoResult)
        }
        metadata_line_index: int | None = None

        # A field type may be the type itself or its annotation text.
        types_str = [str, "str", "str | None"]
        types_bool = [bool, "bool", "bool | None"]
        types_int = [int, "int", "int | None"]
        types_datetime = [datetime, "datetime", "datetime | None"]

        data: dict[str, typing.Any] = {i.name: None for i in fields_map.values()}
        for index, line in enumerate(lines):
            if line.startswith("Metadata:"):
                metadata_line_index = index
                break

            if not line.strip():
                # a blank line holds no key and value
                continue

            key, separator, value = line.partition(":")
            if not separator:
                msg = f"Invalid pdf info line '{line}' in '{pdf_path}'."
                raise ValueError(msg)
            key = key.strip()

            field = fields_map.get(key)
            if not field:
                msg = f"Unknown pdf info key '{key}' value '{value}' in '{pdf_path}'."
                raise utils.LeafFocusError(msg)

            data_key = field.name
            if data.get(data_key) is not None:
                msg = f"Duplicate pdf info key '{key}' in '{pdf_path}'."
                raise utils.LeafFocusError(msg)

            typing_arg = typing.get_args(field.type)

            parsed: typing.Any
            if field.type in types_str or str in typing_arg:
                parsed = value.strip()
            elif field.type in types_datetime or datetime in typing_arg:
                parsed = utils.parse_date(value.strip())
            elif field.type in types_bool or bool in typing_arg:
                parsed = value.strip().lower() == "yes"
            elif field.type in types_int or int in typing_arg:
                if data_key == "file_size_bytes":
                    value = value.replace(" bytes", "")
                parsed = int(value.strip().lower())
            else:
                msg = f"Unknown key '{key}' type '{field.type}'"
                raise ValueError(msg)

            data[data_key] = parsed

        return metadata_line_index, data
Interact with xpdf tools.
XpdfProgram(directory: pathlib.Path)
42 def __init__(self, directory: pathlib.Path) -> None: 43 """Create a new xpdf program class to interact with xpdf tools. 44 45 Args: 46 directory: The path to the directory containing xpdf tools. 47 """ 48 self._directory = directory
Create a new xpdf program class to interact with xpdf tools.
Arguments:
- directory: The path to the directory containing xpdf tools.
OPTS_TEXT_ENCODING: tuple[str, str, str, str, str, str] =
('Latin1', 'ASCII7', 'Symbol', 'ZapfDingbats', 'UTF-8', 'UCS-2')
def
info( self, pdf_path: pathlib.Path, output_dir: pathlib.Path, xpdf_args: leaf_focus.pdf.model.XpdfInfoArgs) -> leaf_focus.pdf.model.XpdfInfoResult:
def info(
    self,
    pdf_path: pathlib.Path,
    output_dir: pathlib.Path,
    xpdf_args: model.XpdfInfoArgs,
) -> model.XpdfInfoResult:
    """Get information from a pdf file.

    A previously saved info file is loaded when it exists. Otherwise the
    'pdfinfo' executable is run and its output is parsed.

    Args:
        pdf_path: The path to the pdf file.
        output_dir: The directory in which to save the pdf info file.
        xpdf_args: The program arguments.

    Returns:
        The pdf file information.

    Raises:
        utils.LeafFocusError: If the pdf file does not exist.
        subprocess.CalledProcessError: If the program exits with an error.
        subprocess.TimeoutExpired: If the program runs longer than 30 seconds.
    """
    # validation
    utils.validate("text encoding", xpdf_args.encoding, self.OPTS_TEXT_ENCODING)
    utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)

    if not pdf_path.exists():
        msg = f"Pdf file not found '{pdf_path}'."
        raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))

    output_file = utils.output_root(pdf_path, "info", output_dir)
    output_file = output_file.with_suffix(".json")

    if output_file.exists():
        logger.info("Loading existing pdf info file.")
        with output_file.open(encoding="utf-8") as info_file:
            data = json.load(info_file)
        # dates are stored as text in the json file
        for date_key in ("creation_date", "modification_date"):
            data[date_key] = utils.parse_date(data.get(date_key))
        return model.XpdfInfoResult(**data)

    logger.info("Extracting pdf info and saving to file.")

    # build command
    exe_path = utils.select_exe(self._directory / "pdfinfo")
    cmd = [str(exe_path), *self.build_cmd(xpdf_args), str(pdf_path.resolve())]

    # execute program
    result = subprocess.run(  # noqa: S603
        cmd,
        capture_output=True,
        check=True,
        timeout=30,
        text=True,
    )
    lines = result.stdout.splitlines()

    metadata_line_index, data = self.build_field_metadata(pdf_path, lines)

    if metadata_line_index is not None:
        # everything after the 'Metadata:' line is parsed as one xml document
        metadata = "\n".join(lines[metadata_line_index + 1 :])
        root = ElementTree.fromstring(metadata)
        data["metadata"] = utils.xml_to_element(root).to_dict()

    if output_dir and output_dir.exists():
        logger.debug("Saving pdf info to '%s'.", output_file)
        # write with the same encoding that is used to read the file back
        output_file.write_text(
            json.dumps(data, indent=2, cls=utils.CustomJsonEncoder),
            encoding="utf-8",
        )

    return model.XpdfInfoResult(**data)
Get information from a pdf file.
Arguments:
- pdf_path: The path to the pdf file.
- output_dir: The directory in which to save the pdf info file.
- xpdf_args: The program arguments.
Returns:
The pdf file information.
def
text( self, pdf_path: pathlib.Path, output_path: pathlib.Path, xpdf_args: leaf_focus.pdf.model.XpdfTextArgs) -> leaf_focus.pdf.model.XpdfTextResult:
def text(
    self,
    pdf_path: pathlib.Path,
    output_path: pathlib.Path,
    xpdf_args: model.XpdfTextArgs,
) -> model.XpdfTextResult:
    """Extract the embedded text of a pdf file into a text file.

    An existing output text file is reused. Otherwise the 'pdftotext'
    executable is run to create it.

    Args:
        pdf_path: The path to the pdf file.
        output_path: The directory to save output files.
        xpdf_args: The pdf program arguments.

    Returns:
        The result from running the text extraction program.
    """
    # check the arguments before touching the file system
    utils.validate(
        "end of line",
        xpdf_args.line_end_type,
        self.OPTS_TEXT_LINE_ENDING,
    )
    utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)

    if not pdf_path.exists():
        msg = f"Pdf file not found '{pdf_path}'."
        raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))

    # the program arguments are also part of the output file name
    cmd_args = self.build_cmd(xpdf_args)
    text_file = utils.output_root(
        pdf_path,
        "output",
        output_path,
        cmd_args,
    ).with_suffix(".txt")

    # reuse the text file from an earlier run
    if text_file.exists():
        logger.info("Loading extracted embedded text from existing file.")
        return model.XpdfTextResult(stdout=[], stderr=[], output_path=text_file)

    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("Did not find expected output file '%s'", text_file.name)
        logger.debug("Listing items in '%s'", text_file.parent)
        total = 0
        for total, entry in enumerate(text_file.parent.iterdir(), start=1):
            logger.debug("Found item '%s'", entry)
        logger.debug("Found %s items in dir.", total)

    logger.info("Extracting pdf embedded text and saving to file.")

    program = utils.select_exe(self._directory / "pdftotext")
    command = [str(program), *cmd_args, str(pdf_path), str(text_file)]

    # run the program
    completed = subprocess.run(  # noqa: S603
        command,
        capture_output=True,
        check=True,
        timeout=30,
        text=True,
    )

    logger.debug("Saving pdf embedded text to '%s'.", text_file)

    stdout_lines = (completed.stdout or "").splitlines()
    stderr_lines = (completed.stderr or "").splitlines()
    return model.XpdfTextResult(
        stdout=stdout_lines,
        stderr=stderr_lines,
        output_path=text_file,
    )
Get the text from a pdf file.
Arguments:
- pdf_path: The path to the pdf file.
- output_path: The directory to save output files.
- xpdf_args: The pdf program arguments.
Returns:
The result from running the text extraction program.
def
image( self, pdf_path: pathlib.Path, output_path: pathlib.Path, xpdf_args: leaf_focus.pdf.model.XpdfImageArgs) -> leaf_focus.pdf.model.XpdfImageResult:
def image(
    self,
    pdf_path: pathlib.Path,
    output_path: pathlib.Path,
    xpdf_args: model.XpdfImageArgs,
) -> model.XpdfImageResult:
    """Create images of pdf pages.

    Existing files that start with the output prefix are reused. Otherwise
    the 'pdftopng' executable is run to create the page images.

    Args:
        pdf_path: The path to the pdf file.
        output_path: The directory to save output files.
        xpdf_args: The program arguments. This object is not modified.

    Returns:
        The pdf file image info.

    Raises:
        utils.LeafFocusError: If the pdf file does not exist.
    """
    # validation
    utils.validate("rotation", xpdf_args.rotation, self.OPTS_IMAGE_ROTATION)
    utils.validate("freetype", xpdf_args.free_type, self.OPTS_IMAGE_FREETYPE)
    utils.validate(
        "anti-aliasing",
        xpdf_args.anti_aliasing,
        self.OPTS_IMAGE_ANTI_ALIAS,
    )

    # NOTE(review): this validates 'anti_aliasing' a second time; presumably
    # the vector anti-aliasing argument was intended - confirm against
    # model.XpdfImageArgs before changing.
    anti_alias_vec = xpdf_args.anti_aliasing
    utils.validate(
        "vector anti-aliasing",
        anti_alias_vec,
        self.OPTS_IMAGE_VEC_ANTI_ALIAS,
    )

    utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)

    if not pdf_path.exists():
        msg = f"Pdf file not found '{pdf_path}'."
        raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))

    logger.info("Saving each pdf page as an image.")

    # build command
    cmd_args = self.build_cmd(xpdf_args)

    # Don't include the page limits when building the output prefix.
    # A copy is used so the caller's arguments keep their page limits.
    prefix_args = dataclasses.replace(xpdf_args, first_page=None, last_page=None)
    output_dir = utils.output_root(
        pdf_path,
        "page-image",
        output_path,
        self.build_cmd(prefix_args),
    )

    prefix_name = output_dir.name
    if any(
        item.name.startswith(prefix_name) for item in output_dir.parent.iterdir()
    ):
        logger.info("Found existing pdf images.")
        return model.XpdfImageResult(
            stdout=[],
            stderr=[],
            output_dir=output_dir,
            output_files=self.find_images(output_dir),
        )

    exe_path = utils.select_exe(self._directory / "pdftopng")
    cmd = [str(exe_path), *cmd_args, str(pdf_path), str(output_dir)]

    # execute program
    result = subprocess.run(  # noqa: S603
        cmd,
        capture_output=True,
        check=True,
        timeout=30,
        text=True,
    )

    logger.debug("Created pdf page images using prefix '%s'.", output_dir)

    return model.XpdfImageResult(
        stdout=(result.stdout or "").splitlines(),
        stderr=(result.stderr or "").splitlines(),
        output_dir=output_dir,
        output_files=self.find_images(output_dir),
    )
Create images of pdf pages.
Arguments:
- pdf_path: The path to the pdf file.
- output_path: The directory to save output files.
- xpdf_args: The program arguments.
Returns:
The pdf file image info.
def build_cmd(self, tool_args: model.XpdfArgs) -> list[str]:
    """Build the command arguments from a data class.

    Every dataclass field must carry ``leaf_focus`` metadata with a ``cmd``
    entry (the command line flag) and a ``cmd_type`` entry, which is either
    ``"bool"`` or ``"single"``.

    A ``"bool"`` field adds only its flag, and only when the value is True.
    A ``"single"`` field adds its flag followed by the value as a string,
    and only when the value is not None and differs from the field default.

    Args:
        tool_args: The dataclass instance holding the program arguments.

    Returns:
        The command line arguments, in dataclass field order.

    Raises:
        ValueError: If a field has no ``cmd`` or ``cmd_type`` metadata, has
            an unknown ``cmd_type``, or a ``"bool"`` field holds a value
            other than None, True, or False.
    """
    cmd_args: list[str] = []
    for field in dataclasses.fields(tool_args.__class__):
        name = field.name
        value = getattr(tool_args, name)

        # TODO: account for default_factory
        field_default = field.default

        # validate the arg config
        arg_config = field.metadata.get("leaf_focus", {})

        cmd_key = arg_config.get("cmd")
        if not cmd_key:
            msg = f"Args incorrectly configured: missing 'cmd' for '{name}'."
            raise ValueError(msg)

        cmd_type = arg_config.get("cmd_type")
        if not cmd_type:
            msg = f"Args incorrectly configured: missing 'cmd_type' for '{name}'."
            raise ValueError(msg)

        # add the arg
        if cmd_type == "bool":
            if value is not None and value is not True and value is not False:
                msg = (
                    f"Argument '{name}' must be None, True, or False, "
                    f"not '{value}'."
                )
                raise ValueError(msg)

            if value is True:
                cmd_args.append(str(cmd_key))

        elif cmd_type == "single":
            # None means "not specified": never pass the text "None" to the
            # program, even when the field default is something else.
            if value is not None and value != field_default:
                cmd_args.extend([str(cmd_key), str(value)])

        else:
            msg = (
                f"Argument '{name}' has unknown cmd_type '{cmd_type}'. "
                "Expected one of 'bool, single'."
            )
            raise ValueError(msg)

    return cmd_args
Build the command arguments from a data class.
def
find_images(self, output_dir: pathlib.Path) -> list[pathlib.Path]:
def find_images(self, output_dir: pathlib.Path) -> list[pathlib.Path]:
    """Find page image files that belong to an output prefix.

    The parent directory of ``output_dir`` is searched (not recursively).
    A file is a page image when its name starts with the stem of
    ``output_dir``, it has a ``.png`` suffix, and its stem ends with a
    hyphen followed by exactly six digits.

    Args:
        output_dir: The output prefix path used when creating the images.

    Returns:
        The matching image paths, in directory iteration order.
    """
    digit_count = 6
    hyphen_offset = digit_count + 1
    prefix = output_dir.stem

    def is_page_image(candidate: pathlib.Path) -> bool:
        """Check a directory entry against the page image naming scheme."""
        if not candidate.is_file():
            return False
        if not candidate.name.startswith(prefix):
            return False
        if candidate.suffix != ".png":
            return False
        stem = candidate.stem
        if len(stem) < hyphen_offset or stem[-hyphen_offset] != "-":
            return False
        # the slice always holds exactly six characters at this point
        return all(char.isdigit() for char in stem[-digit_count:])

    page_images = [
        entry for entry in output_dir.parent.iterdir() if is_page_image(entry)
    ]

    if not page_images:
        logger.warning("No page images found.")

    return page_images
Find image files in a directory.
def
build_field_metadata(self, pdf_path: pathlib.Path, lines: typing.Iterable[str]) -> tuple[int | None, dict[str, typing.Any]]:
def build_field_metadata(
    self,
    pdf_path: pathlib.Path,
    lines: typing.Iterable[str],
) -> tuple[int | None, dict[str, typing.Any]]:
    """Parse ``key: value`` pdf info lines into result field values.

    Lines are read until one starts with ``Metadata:``. Each earlier line is
    split at the first colon, the key is matched to an ``XpdfInfoResult``
    field through the field's ``leaf_focus`` ``name`` metadata, and the
    value is converted according to the field's declared type.

    Args:
        pdf_path: The path to the pdf file; used in error messages.
        lines: The lines to parse.

    Returns:
        A tuple of the index of the ``Metadata:`` line (None when there is
        no such line) and a dict mapping field names to parsed values.
        Fields that were not present in the lines map to None.

    Raises:
        utils.LeafFocusError: If a key is unknown, or a key occurs again
            after it already produced a value.
        ValueError: If a line contains no colon, or a field's declared type
            is not one of str, datetime, bool, or int.
    """
    # field types may be real types or annotation strings
    text_types = (str, "str", "str | None")
    date_types = (datetime, "datetime", "datetime | None")
    flag_types = (bool, "bool", "bool | None")
    number_types = (int, "int", "int | None")

    known_fields = {
        info_field.metadata.get("leaf_focus", {}).get("name"): info_field
        for info_field in dataclasses.fields(model.XpdfInfoResult)
    }
    parsed: dict[str, typing.Any] = {
        info_field.name: None for info_field in known_fields.values()
    }
    metadata_line_index: int | None = None

    for position, raw_line in enumerate(lines):
        if raw_line.startswith("Metadata:"):
            # everything after this line is handled by the caller
            metadata_line_index = position
            break

        label, raw_value = raw_line.split(":", maxsplit=1)
        label = label.strip()

        info_field = known_fields.get(label)
        if info_field is None:
            msg = (
                f"Unknown pdf info key '{label}' value '{raw_value}' "
                f"in '{pdf_path}'."
            )
            raise utils.LeafFocusError(msg)

        attr_name = info_field.name
        if parsed.get(attr_name) is not None:
            msg = f"Duplicate pdf info key '{label}' in '{pdf_path}'."
            raise utils.LeafFocusError(msg)

        declared = info_field.type
        declared_args = typing.get_args(declared)

        converted: typing.Any
        if declared in text_types or str in declared_args:
            converted = raw_value.strip()
        elif declared in date_types or datetime in declared_args:
            converted = utils.parse_date(raw_value.strip())
        elif declared in flag_types or bool in declared_args:
            converted = raw_value.strip().lower() == "yes"
        elif declared in number_types or int in declared_args:
            number_text = raw_value
            if attr_name == "file_size_bytes":
                number_text = number_text.replace(" bytes", "")
            converted = int(number_text.strip().lower())
        else:
            msg = f"Unknown key '{label}' type '{declared}'"
            raise ValueError(msg)

        parsed[attr_name] = converted

    return metadata_line_index, parsed
Build metadata for a field.