# Source code for regparser.citations

import logging
from itertools import chain

from regparser.grammar import unified as grammar
from regparser.tree.paragraph import p_levels
from regparser.tree.struct import Node

logger = logging.getLogger(__name__)


class Label(object):
    """A citation's location expressed as named fields ordered by a
    "schema" (a tuple of field names, most-significant first). Different
    schemas cover regulation text, appendices, and interpretations
    ("comments"), which are labeled differently."""
    # @TODO: subparts

    # Paragraph-depth fields p1..p9 shared by the non-comment schemas
    _p_markers = tuple('p{0}'.format(i) for i in range(1, 10))
    app_sect_schema = ('part', 'appendix', 'appendix_section') + _p_markers
    app_schema = ('part', 'appendix') + _p_markers
    regtext_schema = ('cfr_title', 'part', 'section') + _p_markers
    default_schema = regtext_schema
    # Extra fields present only when citing an interpretation paragraph
    comment_schema = ('comment', 'c1', 'c2', 'c3', 'c4')
    # Every field name appearing in any schema (used by match_to_label)
    SCHEMA_FIELDS = set(app_sect_schema + app_schema + regtext_schema +
                        comment_schema)

    @classmethod
    def from_node(cls, node):
        """Convert between a struct.Node and a Label; use heuristics to
        determine which schema to follow. Node labels aren't as expressive as
        Label objects"""
        if (node.node_type == Node.APPENDIX or
                (node.node_type == Node.INTERP and len(node.label) > 2 and
                 node.label[1].isalpha())):
            # Appendix-like: a digit in position 2 indicates a numbered
            # appendix section
            if len(node.label) > 2 and node.label[2].isdigit():
                schema = cls.app_sect_schema
            else:
                schema = cls.app_schema
        else:
            schema = cls.regtext_schema[1:]     # Nodes don't track CFR title
        settings = {'comment': node.node_type == Node.INTERP}
        for idx, value in enumerate(node.label):
            if value == 'Interp':
                # Add remaining bits as comment fields
                for cidx in range(idx + 1, len(node.label)):
                    comment_field = cls.comment_schema[cidx - idx]
                    settings[comment_field] = node.label[cidx]
                # Stop processing the prefix fields
                break
            settings[schema[idx]] = value
        return cls(**settings)

    @staticmethod
    def determine_schema(settings):
        """Pick the schema implied by the fields present in `settings`.
        Returns None when no schema-discriminating field is present."""
        if 'appendix_section' in settings:
            return Label.app_sect_schema
        elif 'appendix' in settings:
            return Label.app_schema
        elif 'section' in settings or 'cfr_title' in settings:
            return Label.regtext_schema

    def __init__(self, schema=None, **kwargs):
        """`schema` may be given explicitly; otherwise it is derived from
        the keyword fields, falling back to `default_schema` (recorded via
        `using_default_schema` so copies know the schema was a guess)."""
        self.using_default_schema = False
        if schema is None:
            schema = Label.determine_schema(kwargs)
        if schema is None:
            self.using_default_schema = True
            schema = Label.default_schema
        self.settings = kwargs
        self.schema = schema
        # True when any comment-schema field is set (truthy)
        self.comment = any(kwargs.get(field)
                           for field in Label.comment_schema)

    def copy(self, schema=None, **kwargs):
        """Keep any relevant prefix when copying"""
        kwschema = Label.determine_schema(kwargs)
        set_schema = bool(schema or kwschema or not self.using_default_schema)
        if schema is None:
            if kwschema:
                schema = kwschema
            else:
                schema = self.schema
        if set_schema:
            new_settings = {'schema': schema}
        else:
            new_settings = {}
        # Inherit fields from self only up to the first field that kwargs
        # overrides; everything at or after that point comes from kwargs
        found_start = False
        for field in schema + Label.comment_schema:
            if field in kwargs:
                found_start = True
                new_settings[field] = kwargs[field]
            if not found_start:
                new_settings[field] = self.settings.get(field)
        return Label(**new_settings)

    def to_list(self, for_node=True):
        """Convert a Label into a struct.Node style label list. Node labels
        don't contain CFR titles"""
        if for_node:
            lst = [self.settings.get(f) for f in self.schema
                   if f != 'cfr_title']
        else:
            lst = [self.settings.get(f) for f in self.schema]
        if self.comment:
            # NOTE(review): only c1-c3 are emitted though comment_schema
            # defines c4 — confirm this truncation is intentional
            lst.append(Node.INTERP_MARK)
            lst.append(self.settings.get('c1'))
            lst.append(self.settings.get('c2'))
            lst.append(self.settings.get('c3'))
        return [l for l in lst if l]

    def __repr__(self):
        fields = ', '.join(
            '{0}={1}'.format(field, repr(self.settings.get(field)))
            for field in self.schema)
        return 'Label({0})'.format(fields)

    def __eq__(self, other):
        """Equality if types match and fields match"""
        return (isinstance(other, Label) and
                self.using_default_schema == other.using_default_schema and
                self.settings == other.settings and
                self.schema == other.schema and
                self.comment == other.comment)

    def __hash__(self):
        # Hash derives from repr (schema fields only); equal Labels hash
        # equal, though Labels differing only in comment fields may collide
        return hash(repr(self))

    def __lt__(self, other):
        self_list = tuple(self.to_list(for_node=False))
        other_list = tuple(other.to_list(for_node=False))
        return self_list < other_list

    def labels_until(self, other):
        """Given `self` as a starting point and `other` as an end point,
        yield a `Label` for paragraphs in between. For example, if `self` is
        something like 123.45(a)(2) and end is 123.45(a)(6), this should
        emit 123.45(a)(3), (4), and (5)"""
        self_list = self.to_list(for_node=False)
        other_list = other.to_list(for_node=False)
        field = self.schema[len(self_list) - 1]
        start, end = self_list[-1], other_list[-1]
        # Find a paragraph-marker level containing both endpoints
        level = [lvl for lvl in p_levels if start in lvl and end in lvl]
        if (self.schema != other.schema or
                len(self_list) != len(other_list) or
                self_list[:-1] != other_list[:-1] or
                not level):
            logger.warning("Bad use of 'through': %s - %s", self, other)
        else:
            level = level[0]
            start_idx, end_idx = level.index(start), level.index(end)
            for marker in level[start_idx + 1:end_idx]:
                yield self.copy(**{field: marker})
class ParagraphCitation(object):
    """A citation found in text: character offsets plus the Label it
    refers to. `full_start`/`full_end` include any leading marker text
    ("paragraph", "comment", etc.) while `start`/`end` cover only the
    citation itself."""

    def __init__(self, start, end, label, full_start=None, full_end=None,
                 in_clause=False):
        self.start = start
        self.end = end
        self.label = label
        self.full_start = start if full_start is None else full_start
        self.full_end = end if full_end is None else full_end
        self.in_clause = in_clause

    def __contains__(self, other):
        """Proper inclusion"""
        inside = (self.full_start <= other.full_start and
                  other.full_end <= self.full_end)
        identical_span = (other.full_start == self.full_start and
                          other.full_end == self.full_end)
        return inside and not identical_span

    def __repr__(self):
        return "ParagraphCitation(start={0}, end={1}, label={2} )".format(
            repr(self.start), repr(self.end), repr(self.label))
def match_to_label(match, initial_label, comment=False):
    """Return the citation and offsets for this match"""
    fields = {'comment': True} if comment else {}
    for name in Label.SCHEMA_FIELDS:
        # Each field may be captured directly or as a 'plaintext_' variant
        found = getattr(match, name) or getattr(match, 'plaintext_' + name)
        if found:
            fields[name] = found
    return initial_label.copy(**fields)
def single_citations(matches, initial_label, comment=False):
    """For each pyparsing match, yield the corresponding ParagraphCitation.

    :param matches: iterable of (match, start, end) from scanString
    :param initial_label: Label providing context to copy from
    :param comment: True when these are comment (interpretation) citations
    """
    for match, start, end in matches:
        full_start = start
        # Fixed: original used `is not ''` (identity, not equality) which is
        # implementation-dependent and a SyntaxWarning in modern CPython
        if match.marker != '':
            # Remove the marker from the beginning of the string
            start = match.marker.pos[1]
        yield ParagraphCitation(
            start, end, match_to_label(match, initial_label, comment),
            full_start=full_start)
def multiple_citations(matches, initial_label, comment=False,
                       include_fill=False):
    """Similar to single_citations save that we have a compound citation,
    such as "paragraphs (b), (d), and (f). Yield a ParagraphCitation for
    each sub-citation. We refer to the first match as "head" and all
    following as "tail" """
    for compound, c_start, c_end in matches:
        context = initial_label     # shared context between sub-citations
        for piece in chain([compound.head], compound.tail):
            inner = piece.match or piece    # might be wrapped
            resolved = match_to_label(inner, context, comment)
            if include_fill and piece.through:
                # "(a) through (d)" — synthesize the labels in between
                for gap in context.labels_until(resolved):
                    yield ParagraphCitation(c_start, c_end, gap,
                                            in_clause=True)
            yield ParagraphCitation(
                inner.pos.start, inner.pos.end, resolved,
                full_start=c_start, full_end=c_end, in_clause=True)
            context = resolved      # update the label to keep context
def internal_citations(text, initial_label=None, require_marker=False,
                       title=None):
    """List of all internal citations in the text.

    :param text: the text to scan
    :param initial_label: Label providing context (e.g. current part)
    :param require_marker: require text be prepended by
        'comment'/'paragraphs'/etc.
    :param title: the CFR title (e.g. 11 for FEC, 12 for CFPB regs); used to
        correctly parse citations of the form 11 CFR 110.1 when 11 CFR 110
        is the regulation being parsed
    """
    if not initial_label:
        initial_label = Label()
    citations = []

    def single(gram, comment):
        citations.extend(single_citations(gram.scanString(text),
                                          initial_label, comment))

    def multiple(gram, comment):
        citations.extend(multiple_citations(gram.scanString(text),
                                            initial_label, comment))

    single(grammar.marker_comment, True)
    multiple(grammar.multiple_non_comments, False)
    multiple(grammar.multiple_appendix_section, False)
    multiple(grammar.multiple_comments, True)
    multiple(grammar.multiple_appendices, False)
    multiple(grammar.multiple_period_sections, False)
    single(grammar.marker_appendix, False)
    single(grammar.appendix_with_section, False)
    single(grammar.marker_paragraph, False)
    single(grammar.mps_paragraph, False)
    single(grammar.m_section_paragraph, False)
    if not require_marker:
        single(grammar.section_paragraph, False)
        single(grammar.part_section_paragraph, False)
        multiple(grammar.multiple_section_paragraphs, False)

    # Some appendix citations are... complex
    for match, start, end in grammar.appendix_with_part.scanString(text):
        full_start = start
        # Fixed: original used `is not ''` (identity, not equality)
        if match.marker != '':
            start = match.marker.pos[1]
        # Skip '.' separators; remaining tokens are paragraph markers
        label_parts = [l for l in list(match)[3:] if l != '.']
        label = dict(zip(['p1', 'p2', 'p3'], label_parts))
        citations.append(ParagraphCitation(
            start, end,
            initial_label.copy(appendix=match.appendix,
                               appendix_section=match.a1, **label),
            full_start=full_start))

    # Internal citations can sometimes be in the form XX CFR YY.ZZ
    # Check if this is a reference to the CFR title and part we are parsing
    for cit in cfr_citations(text):
        cit_title = cit.label.settings.get('cfr_title')
        cit_part = cit.label.settings.get('part')
        initial_part = initial_label.settings.get('part')
        if cit_title == title and cit_part == initial_part:
            citations.append(cit)

    return select_encompassing_citations(citations)
def select_encompassing_citations(citations):
    """The same citation might be found by multiple grammars; we take the
    most-encompassing of any overlaps"""
    # ParagraphCitation.__contains__ tests proper inclusion, so a citation
    # survives only when no other citation strictly encloses it
    return [cit for cit in citations
            if not any(cit in other for other in citations)]
def remove_citation_overlaps(text, possible_markers):
    """Given a list of markers, remove any that overlap with citations.

    :param text: the text the markers and citations were found in
    :param possible_markers: list of (marker, start, end) tuples
    :return: the subset of possible_markers not overlapping any citation
    """
    # Hoisted out of the loop: the original re-parsed every citation in
    # `text` once per marker, repeating the full grammar scan needlessly
    citations = internal_citations(text)

    def _overlaps(start, end):
        """True when any citation touches the [start, end] marker span."""
        return any((cit.start <= start and cit.end >= start) or
                   (cit.start <= end and cit.end >= end) or
                   (start <= cit.start and end >= cit.end)
                   for cit in citations)

    return [(m, start, end) for m, start, end in possible_markers
            if not _overlaps(start, end)]
def cfr_citations(text, include_fill=False):
    """Find all citations which include CFR title and part"""
    blank = Label()
    found = list(single_citations(grammar.cfr.scanString(text), blank))
    found.extend(single_citations(grammar.cfr_p.scanString(text), blank))
    found.extend(multiple_citations(
        grammar.multiple_cfr_p.scanString(text), blank,
        include_fill=include_fill))
    return select_encompassing_citations(found)