# preprocessing_pipeline.py
from __future__ import absolute_import

import argparse
import os
import logging
from itertools import islice

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

from sciencebeam_gym.utils.collection import (
  extend_dict,
  remove_keys_from_dict
)

from sciencebeam_gym.utils.file_path import (
  relative_path,
  join_if_relative_path
)

from sciencebeam_gym.beam_utils.utils import (
  TransformAndCount,
  TransformAndLog,
  MapOrLog
)

from sciencebeam_gym.beam_utils.csv import (
  WriteDictCsv,
  ReadDictCsv
)

from sciencebeam_gym.beam_utils.io import (
  read_all_from_path,
  basename,
  save_file_content
)

from sciencebeam_gym.beam_utils.main import (
  add_cloud_args,
  process_cloud_args
)

from sciencebeam_gym.structured_document.svg import (
  SvgStructuredDocument
)

from sciencebeam_gym.preprocess.annotation.target_annotation import (
  parse_xml_mapping
)

from sciencebeam_gym.preprocess.color_map import (
  parse_color_map_from_file
)

from sciencebeam_gym.preprocess.annotation.annotation_evaluation import (
  evaluate_document_by_page,
  DEFAULT_EVALUATION_COLUMNS,
  to_csv_dict_rows as to_annotation_evaluation_csv_dict_rows
)

from sciencebeam_gym.preprocess.preprocessing_utils import (
  change_ext,
  find_file_pairs_grouped_by_parent_directory_or_name,
  convert_pdf_bytes_to_lxml,
  convert_and_annotate_lxml_content,
  pdf_bytes_to_png_pages,
  svg_page_to_blockified_png_bytes,
  save_pages,
  save_svg_roots,
  filter_list_props_by_indices,
  get_page_indices_with_min_annotation_percentage,
  parse_page_range
)

from sciencebeam_gym.preprocess.preprocessing_transforms import (
  WritePropsToTFRecord
)

def get_logger():
  """Return the logger scoped to this module's name."""
  module_name = __name__
  return logging.getLogger(module_name)

class MetricCounters(object):
  """Names of the beam metric counters reported by this pipeline."""
  # number of (pdf|lxml)/xml file pairs successfully read
  FILE_PAIR = 'file_pair_count'
  # total number of annotated svg pages produced
  PAGE = 'page_count'
  # number of pages kept after the min-annotation-percentage filter
  FILTERED_PAGE = 'filtered_page_count'
  # items dropped due to errors in the respective conversion steps
  CONVERT_PDF_TO_LXML_ERROR = 'ConvertPdfToLxml_error_count'
  CONVERT_PDF_TO_PNG_ERROR = 'ConvertPdfToPng_error_count'
  # NOTE(review): the counter value says "ConvertPdfToSvgAnnot" although the
  # step converts LXML to SVG; kept as-is since dashboards may rely on it
  CONVERT_LXML_TO_SVG_ANNOT_ERROR = 'ConvertPdfToSvgAnnot_error_count'

def configure_pipeline(p, opt):
  """Wire the preprocessing transforms onto beam pipeline `p`.

  Depending on `opt`, reads LXML/XML file pairs directly, or reads
  PDF/XML pairs and converts the PDFs to LXML first. The LXML is then
  converted to SVG pages annotated using the target XML, and the enabled
  outputs (lxml, svg, png, block png, evaluation csv, tfrecords) are
  written below ``opt.output_path``.

  `opt` is assumed to have been validated by `process_main_args`, which
  guarantees the option combinations this function relies on (e.g.
  --save-png implies a PDF source, so `pdf_xml_file_pairs` is defined
  wherever it is referenced).
  """
  # target size of rendered page images; None keeps the natural size
  image_size = (
    (opt.image_width, opt.image_height)
    if opt.image_width and opt.image_height
    else None
  )
  page_range = opt.pages
  first_page = page_range[0] if page_range else 1
  xml_mapping = parse_xml_mapping(opt.xml_mapping_path)

  if opt.lxml_path:
    # source documents have already been converted to LXML
    lxml_xml_file_pairs = (
      p |
      beam.Create([[
        join_if_relative_path(opt.base_data_path, s)
        for s in [opt.lxml_path, opt.xml_path]
      ]]) |
      "FindFilePairs" >> TransformAndLog(
        beam.FlatMap(
          lambda patterns: islice(
            find_file_pairs_grouped_by_parent_directory_or_name(patterns),
            opt.limit
          )
        ),
        log_prefix='file pairs: ',
        log_level='debug'
      ) |
      "ReadFileContent" >> beam.Map(lambda filenames: {
        'source_filename': filenames[0],
        'xml_filename': filenames[1],
        'lxml_content': read_all_from_path(filenames[0]),
        'xml_content': read_all_from_path(filenames[1])
      })
    )
  elif opt.pdf_path or opt.pdf_xml_file_list:
    if opt.pdf_xml_file_list:
      # explicit csv/tsv file list with one (source_url, xml_url) per row
      pdf_xml_url_pairs = (
        p |
        "ReadFilePairUrls" >> ReadDictCsv(opt.pdf_xml_file_list, limit=opt.limit) |
        "TranslateFilePairUrls" >> beam.Map(lambda row: (row['source_url'], row['xml_url']))
      )
    else:
      # discover pairs by matching parent directory or file name
      pdf_xml_url_pairs = (
        p |
        beam.Create([[
          join_if_relative_path(opt.base_data_path, s)
          for s in [opt.pdf_path, opt.xml_path]
        ]]) |
        "FindFilePairs" >> TransformAndLog(
          beam.FlatMap(
            lambda patterns: islice(
              find_file_pairs_grouped_by_parent_directory_or_name(patterns),
              opt.limit
            )
          ),
          log_prefix='file pairs: ',
          log_level='debug'
        )
      )
    pdf_xml_file_pairs = (
      pdf_xml_url_pairs |
      "ReadFileContent" >> TransformAndCount(
        beam.Map(lambda filenames: {
          'source_filename': filenames[0],
          'xml_filename': filenames[1],
          'pdf_content': read_all_from_path(filenames[0]),
          'xml_content': read_all_from_path(filenames[1])
        }),
        MetricCounters.FILE_PAIR
      )
    )

    lxml_xml_file_pairs = (
      pdf_xml_file_pairs |
      "ConvertPdfToLxml" >> MapOrLog(lambda v: remove_keys_from_dict(
        extend_dict(v, {
          'lxml_content': convert_pdf_bytes_to_lxml(
            v['pdf_content'], path=v['source_filename'],
            page_range=page_range
          )
        }),
        # we don't need the pdf_content unless we are writing tf_records
        None if opt.save_tfrecords else {'pdf_content'}
      ), log_fn=lambda e, v: (
        get_logger().warning(
          'caught exception (ignoring item): %s, pdf: %s, xml: %s',
          e, v['source_filename'], v['xml_filename'], exc_info=e
        )
      ), error_count=MetricCounters.CONVERT_PDF_TO_LXML_ERROR)
    )
  else:
    raise RuntimeError('either lxml-path or pdf-path required')

  if opt.save_png or opt.save_tfrecords:
    with_pdf_png_pages = (
      # for tfrecords the pdf_content was kept on the lxml pairs above
      (lxml_xml_file_pairs if opt.save_tfrecords else pdf_xml_file_pairs) |
      "ConvertPdfToPng" >> MapOrLog(lambda v: remove_keys_from_dict(
        extend_dict(v, {
          'pdf_png_pages': list(pdf_bytes_to_png_pages(
            v['pdf_content'],
            dpi=opt.png_dpi,
            image_size=image_size,
            page_range=page_range
          ))
        }),
        {'pdf_content'}  # we no longer need the pdf_content
      ), error_count=MetricCounters.CONVERT_PDF_TO_PNG_ERROR)
    )

    if opt.save_png:
      _ = (
        with_pdf_png_pages |
        "SavePdfToPng" >> TransformAndLog(
          beam.Map(lambda v: save_pages(
            FileSystems.join(
              opt.output_path,
              change_ext(
                relative_path(opt.base_data_path, v['source_filename']),
                None, '.png.zip'
              )
            ),
            '.png',
            v['pdf_png_pages']
          )),
          log_fn=lambda x: get_logger().info('saved result: %s', x)
        )
      )

  if opt.save_lxml:
    _ = (
      lxml_xml_file_pairs |
      "SaveLxml" >> TransformAndLog(
        beam.Map(lambda v: save_file_content(
          FileSystems.join(
            opt.output_path,
            change_ext(
              relative_path(opt.base_data_path, v['source_filename']),
              None, '.lxml.gz'
            )
          ),
          v['lxml_content']
        )),
        log_fn=lambda x: get_logger().info('saved lxml: %s', x)
      )
    )

  annotation_results = (
    (with_pdf_png_pages if opt.save_tfrecords else lxml_xml_file_pairs) |
    "ConvertLxmlToSvgAndAnnotate" >> TransformAndCount(
      MapOrLog(lambda v: remove_keys_from_dict(
        extend_dict(v, {
          'svg_pages': list(convert_and_annotate_lxml_content(
            v['lxml_content'], v['xml_content'], xml_mapping,
            name=v['source_filename']
          ))
        }),
        # we won't need the XML anymore
        {'lxml_content', 'xml_content'}
      ), log_fn=lambda e, v: (
        get_logger().warning(
          'caught exception (ignoring item): %s, source: %s, xml: %s',
          e, v['source_filename'], v['xml_filename'], exc_info=e
        )
      ), error_count=MetricCounters.CONVERT_LXML_TO_SVG_ANNOT_ERROR),
      MetricCounters.PAGE,
      lambda v: len(v['svg_pages'])
    )
  )

  if opt.save_svg:
    _ = (
      annotation_results |
      "SaveSvgPages" >> TransformAndLog(
        beam.Map(lambda v: save_svg_roots(
          FileSystems.join(
            opt.output_path,
            change_ext(
              relative_path(opt.base_data_path, v['source_filename']),
              None, '.svg.zip'
            )
          ),
          v['svg_pages']
        )),
        log_fn=lambda x: get_logger().info('saved result: %s', x)
      )
    )

  if opt.annotation_evaluation_csv or opt.min_annotation_percentage:
    annotation_evaluation_results = (
      annotation_results |
      "EvaluateAnnotations" >> TransformAndLog(
        beam.Map(lambda v: remove_keys_from_dict(
          extend_dict(v, {
            'annotation_evaluation': evaluate_document_by_page(
              SvgStructuredDocument(v['svg_pages'])
            )
          }),
          # the svg pages are still needed when filtering pages below
          None if opt.min_annotation_percentage else {'svg_pages'}
        )),
        log_fn=lambda x: get_logger().info(
          'annotation evaluation result: %s: %s',
          x['source_filename'], x['annotation_evaluation']
        )
      )
    )

  if opt.save_block_png or opt.save_tfrecords:
    color_map = parse_color_map_from_file(opt.color_map)
    with_block_png_pages = (
      (annotation_evaluation_results if opt.min_annotation_percentage else annotation_results) |
      "GenerateBlockPng" >> beam.Map(lambda v: remove_keys_from_dict(
        extend_dict(v, {
          'block_png_pages': [
            svg_page_to_blockified_png_bytes(svg_page, color_map, image_size=image_size)
            for svg_page in v['svg_pages']
          ]
        }),
        {'svg_pages'}
      ))
    )

    if opt.save_block_png:
      _ = (
        with_block_png_pages |
        "SaveBlockPng" >> TransformAndLog(
          beam.Map(lambda v: save_pages(
            FileSystems.join(
              opt.output_path,
              change_ext(
                relative_path(opt.base_data_path, v['source_filename']),
                None, '.block-png.zip'
              )
            ),
            '.png',
            v['block_png_pages']
          )),
          log_fn=lambda x: get_logger().info('saved result: %s', x)
        )
      )

    if opt.save_tfrecords:
      if opt.min_annotation_percentage:
        # drop pages whose annotation percentage is below the threshold
        filtered_pages = (
          with_block_png_pages |
          "FilterPages" >> TransformAndCount(
            beam.Map(
              lambda v: filter_list_props_by_indices(
                v,
                get_page_indices_with_min_annotation_percentage(
                  v['annotation_evaluation'],
                  opt.min_annotation_percentage
                ),
                {'pdf_png_pages', 'block_png_pages'}
              )
            ),
            MetricCounters.FILTERED_PAGE,
            lambda v: len(v['block_png_pages'])
          )
        )
      else:
        filtered_pages = with_block_png_pages
      # one tfrecord example per page, pairing the rendered pdf page with
      # its blockified annotation image
      _ = (
        filtered_pages |
        "WriteTFRecords" >> WritePropsToTFRecord(
          FileSystems.join(opt.output_path, 'data'),
          lambda v: (
            {
              'input_uri': v['source_filename'] + '#page%d' % (first_page + i),
              'input_image': pdf_png_page,
              'annotation_uri': (
                v['source_filename'] + '.annot' + '#page%d' % (first_page + i)
              ),
              'annotation_image': block_png_page,
              'page_no': first_page + i
            }
            for i, (pdf_png_page, block_png_page) in enumerate(
              zip(v['pdf_png_pages'], v['block_png_pages'])
            )
          )
        )
      )

  if opt.annotation_evaluation_csv:
    annotation_evaluation_csv_name, annotation_evaluation_ext = (
      os.path.splitext(opt.annotation_evaluation_csv)
    )
    _ = (
      annotation_evaluation_results |
      "FlattenAnotationEvaluationResults" >> beam.FlatMap(
        lambda v: to_annotation_evaluation_csv_dict_rows(
          v['annotation_evaluation'],
          document=basename(v['source_filename'])
        )
      ) |
      "WriteAnnotationEvaluationToCsv" >> WriteDictCsv(
        join_if_relative_path(opt.output_path, annotation_evaluation_csv_name),
        file_name_suffix=annotation_evaluation_ext,
        columns=DEFAULT_EVALUATION_COLUMNS
      )
    )

def add_main_args(parser):
  """Add the pipeline's own command line arguments to `parser`.

  Option combinations are further validated in `process_main_args`.
  """
  parser.add_argument(
    '--data-path', type=str, required=True,
    help='base data path'
  )

  # exactly one of the following source options is required
  source_group = parser.add_mutually_exclusive_group(required=True)
  source_group.add_argument(
    '--lxml-path', type=str, required=False,
    help='path to lxml file(s)'
  )
  source_group.add_argument(
    '--pdf-path', type=str, required=False,
    help='path to pdf file(s) (alternative to lxml)'
  )
  source_group.add_argument(
    '--pdf-xml-file-list', type=str, required=False,
    help='path to pdf-xml csv/tsv file list'
  )
  parser.add_argument(
    '--limit', type=int, required=False,
    help='limit the number of file pairs to process'
  )

  # output options; at least one must be enabled (see process_main_args)
  parser.add_argument(
    '--save-lxml', default=False, action='store_true',
    help='save generated lxml (if using pdf as an input)'
  )

  parser.add_argument(
    '--save-svg', default=False, action='store_true',
    help='save svg pages with annotation tags'
  )

  parser.add_argument(
    '--save-png', default=False, action='store_true',
    help='save png pages of the original pdf'
  )
  parser.add_argument(
    '--png-dpi', type=int, default=90,
    help='dpi of rendered pdf pages'
  )

  # --image-width and --image-height must be specified together
  parser.add_argument(
    '--image-width', type=int, required=False,
    help='image width of resulting PNGs'
  )
  parser.add_argument(
    '--image-height', type=int, required=False,
    help='image height of resulting PNGs'
  )

  parser.add_argument(
    '--save-block-png', default=False, action='store_true',
    help='save blockified version of the svg as a png'
  )
  parser.add_argument(
    '--color-map', default='color_map.conf',
    help='color map to use (see save-block-png)'
  )

  parser.add_argument(
    '--xml-path', type=str, required=False,
    help='path to xml file(s)'
  )
  parser.add_argument(
    '--xml-mapping-path', type=str, default='annot-xml-front.conf',
    help='path to xml mapping file'
  )

  parser.add_argument(
    '--pages', type=parse_page_range, default=None,
    help='only processes the selected pages'
  )

  parser.add_argument(
    '--save-tfrecords', default=False, action='store_true',
    help='Save TFRecords with PDF PNG and Annotation PNG'
    ' (--image-width and --image-height recommended)'
  )

  parser.add_argument(
    '--min-annotation-percentage', type=float, required=False,
    help='Minimum percentage of annotations per page'
    ' (pages below that threshold will get dropped)'
  )

  parser.add_argument(
    '--annotation-evaluation-csv', type=str, required=False,
    help='Annotation evaluation CSV output file'
  )
  parser.add_argument(
    '--output-path', required=False,
    help='Output directory to write results to.'
  )

def process_main_args(parser, args):
  """Derive dependent options and validate argument combinations.

  Mutates `args` (sets `base_data_path`, defaults `output_path`) and
  calls `parser.error` (which exits) on invalid combinations.
  """
  args.base_data_path = args.data_path.replace('/*/', '/')

  # default the output path to a sibling '<base>-results' directory
  if not args.output_path:
    args.output_path = os.path.join(
      os.path.dirname(args.base_data_path),
      os.path.basename(args.base_data_path + '-results')
    )

  if not args.xml_path and not args.pdf_xml_file_list:
    parser.error('--xml-path required unless --pdf-xml-file-list is specified')

  has_pdf_source = args.pdf_path or args.pdf_xml_file_list

  # these outputs are only possible when starting from a pdf source
  for enabled, option_name in (
      (args.save_lxml, '--save-lxml'),
      (args.save_png, '--save-png'),
      (args.save_tfrecords, '--save-tfrecords')):
    if enabled and not has_pdf_source:
      parser.error(
        '%s only valid with --pdf-path or --pdf-xml-file-list' % option_name
      )

  if bool(args.image_width) != bool(args.image_height):
    parser.error('--image-width and --image-height need to be specified together')

  if not (args.save_lxml or args.save_svg or args.save_png or args.save_tfrecords):
    parser.error(
      'at least one of the output options required:'
      ' --save-lxml --save-svg --save-png or --save-tfrecords'
    )

def parse_args(argv=None):
  """Parse, post-process and validate command line arguments.

  :param argv: optional argument list (defaults to sys.argv)
  :return: the parsed and validated argparse namespace
  """
  parser = argparse.ArgumentParser()
  add_main_args(parser)
  add_cloud_args(parser)

  parsed_args = parser.parse_args(argv)

  process_main_args(parser, parsed_args)
  process_cloud_args(
    parsed_args, parsed_args.output_path,
    # NOTE(review): keeping the existing (misspelled) job name for
    # backward compatibility with any dashboards / job filters
    name='sciencbeam-gym-preprocessing'
  )

  get_logger().info('parsed_args: %s', parsed_args)

  return parsed_args

def run(argv=None):
  """Parse command line arguments and execute the preprocessing pipeline."""
  known_args = parse_args(argv)

  # save_main_session is needed because one or more DoFns in this workflow
  # rely on global context (e.g. modules imported at module level)
  pipeline_options = PipelineOptions.from_dictionary(vars(known_args))
  pipeline_options.view_as(SetupOptions).save_main_session = True

  # leaving the `with` block executes the pipeline and waits for completion
  with beam.Pipeline(known_args.runner, options=pipeline_options) as pipeline:
    configure_pipeline(pipeline, known_args)

if __name__ == '__main__':
  # default to INFO level logging when run as a script
  logging.basicConfig(level='INFO')

  run()