tool for interconverting/querying xml/csv/json/jsonl/regex matches

    Mark Olson
    last edited Jun 20, 2023, 6:22 AM

    I’ve noticed enough general questions along the lines of “how do I convert between format X and format Y” (e.g., CSV to XML) that I figured I’d just make one catch-all tool for doing most transformations you might want inside NPP without needing to know any programming languages (although it does help).

    This tool requires PythonScript 3.0.13 or newer.

    Simply run the script and follow the instructions. When prompted for a query, you can accept the default (x, which selects all the data) if you want to transform the file without filtering.

    It has not been thoroughly tested, so bug reports are appreciated.
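    If you prefer the PythonScript console to the pop-up dialog, usage looks roughly like this (a sketch, assuming the script has already been run once and the active tab holds a CSV with a CITY column, as in the docstring example below):

        pyq("[row for row in x if row['CITY'].startswith('a')]")  # filter rows; output defaults to csv
        pyq("group_by(x, 'CITY', len)", 'json')                   # count rows per city, output as json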

    THE FILE IS PASTED IN TWO PARTS, SPLIT ACROSS THIS POST AND THE NEXT POST (due to the post length limit). YOU NEED TO INCLUDE BOTH PARTS IN THE SAME FILE FOR THIS TO WORK.

    PART 1

    '''
    Simple tool for querying CSVs, JSON, and tabular XML files with Python list comprehensions, etc.
    1. Use the PythonScript console.
    2. Call the pyq function on a string containing a query,
        using the variable "x" to represent the data file.
        You can also pass in an output type (defaults to DEFAULT_OUTPUT_TYPE, which is 'csv').
        Numbers are parsed as floats; everything else is parsed as a string.
        For example, pyq("[row for row in x if row['CITY'].startswith('a')]").
        The query can include:
        * any functions that are always available, like max, min, and sorted
        * the mean function (takes average of a list of numbers)
        * functions/classes from math, re, and collections.
        * a group_by function (see below)
        * an inner_join function for performing a SQL-style inner join (see below)
    3. The result of the query will be displayed in a new window.
        For best results, move the query results file to the other view
        so that one view has your tabular file
        and the other has the query results.
    '''
    from Npp import *
    
    import csv
    import collections
    from datetime import datetime
    from io import StringIO
    import json
    import math
    import os
    import re
    from xml.etree import ElementTree as ET
    
    ### SETTINGS
    DIALOG_ON_RUN = True # open a pop-up dialog to run queries when the script is run (alternative: run from PythonScript console)
    VALID_DATA_TYPES = {'csv', 'json', 'jsonl', 'xml', 'regex'}
    FILE_EXTENSION_TYPE_MAPS = { # keys are file extensions, values are in VALID_DATA_TYPES
        'json': 'json',
        'xml': 'xml',
        'jsonl': 'jsonl',
        'csv': 'csv',
        'tsv': 'csv',
    }
    TYPES_TO_TEST = ['json', 'jsonl', 'xml'] # try these data types if file extension not helpful
    DEFAULT_OUTPUT_TYPE = 'csv' # must be in VALID_DATA_TYPES
    CSV_DEFAULT_OUTPUT_DIALECT = 'excel-tab' # tab-separated; use 'excel' for comma-separated
    AUTODETECT_BOOLEANS = True
    BOOLEAN_PAIRS = [ # case insensitive
        {'F': False, 'T': True},
        {'FALSE': False, 'TRUE': True},
        {'NO': False, 'YES': True},
        {'N': False, 'Y': True},
    ]
    BOOLEAN_VALS = {k: pair for pair in BOOLEAN_PAIRS for k in pair}
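    # i.e., BOOLEAN_VALS maps each token to its whole pair, e.g. 'T' -> {'F': False, 'T': True}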
    DEFAULT_QUERY = 'x' # select all data
    DEFAULT_REGEX = '(?-s)^(.*)$' # select entire line
    ### END SETTINGS
    
    def now():
        return datetime.now().timestamp()
    
    def getFileExtension(fname):
        ii = fname.rfind('.')
        if ii <= 0: # no extension, or a dotfile like '.gitignore'
            return ''
        return fname[ii + 1:]
    
    PYQ_DIR = os.path.join(os.path.dirname(__file__), 'pyquery')
    
    num_regex = re.compile(r'[+-]?(?:(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][+-]?\d+)?|nan|inf)')
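    # full-matches strings like '1', '-2.5', '.5', '1e-3', 'nan', 'inf' (see postprocess_json below)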
    
    #### QUERY FUNCTIONS ####
    
    def mean(rows):
        return sum(rows)/len(rows)
    
    
    def inner_join(rows1, rows2, var1, var2=None):
        '''
        Standard SQL-style inner join. var2 is set to var1 if not specified.
        If rows1 and rows2 are lists of lists,
            returns a list of dicts
                where a 0-based index in a row is mapped to 'col{index}' for rows1
                                                          or 'col{index}_1' for rows2
        Otherwise, if rows2 has column names that duplicate column names in rows1,
            the duplicate names in rows2 are mangled by appending '_1'.
        EXAMPLE:
        >>> rows1 = [{'a': 1, 'b': 'foo', 'd': 5.2}, {'a': 3, 'b': 'bar', 'd': -1}]
        >>> rows2 = [{'a': 3, 'b': True, 'c': -2}, {'a': 1, 'b': False, 'c': -1}]
        >>> inner_join(rows1, rows2, 'a', 'b') # 1 and True are considered equal
        [{'a': 1, 'b': 'foo', 'd': 5.2, 'a_1': 3, 'b_1': True, 'c': -2}]
        >>> rows3 = [[1, 'a'], [2, 'b']]
        >>> rows4 = [[3, False, 'foo'], [2, True, 'bar'], [2, False, 'baz']]
        >>> inner_join(rows3, rows4, 0)
        [{'col0': 2, 'col1': 'b', 'col0_1': 2, 'col1_1': True, 'col2_1': 'bar'},
         {'col0': 2, 'col1': 'b', 'col0_1': 2, 'col1_1': False, 'col2_1': 'baz'}]
        >>> rows5 = [{'a': 1, 'b_1': 2, 'b': 1}]
        >>> rows6 = [{'a': 3, 'b': 1, 'c': -2, 'b_1': False}]
        >>> inner_join(rows5, rows6, 'a', 'b') # if k + '_1' is still duplicate, append '_1' until not duplicate
        [{'a': 1, 'b_1': 2, 'b': 1, 'a_1': 3, 'b_1_1': 1, 'c': -2, 'b_1_1_1': False}]
        '''
        if not (rows1 and rows2):
            return []
        if var2 is None:
            var2 = var1
        rows1_grouped = {}
        for row in rows1:
            val = row[var1]
            rows1_grouped.setdefault(val, []).append(row)
        rowtype = type(rows1[0])
        if rowtype == list:
            def row_combiner(row1, row2):
                out = {f'col{ii}': v for ii, v in enumerate(row1)}
                for ii, v in enumerate(row2):
                    out[f'col{ii}_1'] = v
                return out
        else:
            shared_keys = set(rows1[0]) & set(rows2[0])
            def row_combiner(row1, row2):
                out = {**row1}
                for k, v in row2.items():
                    if k in shared_keys:
                        mangled = k + '_1'
                        while mangled in out:
                            mangled += '_1'
                        out[mangled] = v
                    else:
                        out[k] = v
                return out
        final = []
        for row2 in rows2:
            val = row2[var2]
            rows1_related = rows1_grouped.get(val)
            if rows1_related:
                for row1 in rows1_related:
                    final.append(row_combiner(row1, row2))
        return final
    
    
    def group_by(rows, group_var, func, agg_var = None):
        '''
        rows: a list of dicts with str keys
        group_var: str, the column name to group by
        func: a function that operates on a list
            (must operate on list of dicts if agg_var is None)
            OR a list of such functions
        agg_var: a column name OR a list of column names.
        EXAMPLE:
        >>> group_by(rows, 'contaminated', max, 'zone')
        {
            "TRUE": 2.0,
            "FALSE": 4.0
        }
        >>> group_by(rows, 'contaminated', [mean, min], 'nums')
        {
            "TRUE": {
                "mean": 1.5,
                "min": 1.0
            },
            "FALSE": {
                "mean": 2.0,
                "min": 1.0
            }
        }
        >>> group_by(rows, 'names', [sum, min], ['nums', 'zone'])
        {
            "Bluds": {
                "nums": {"min": NaN, "sum": NaN},
                "zone": {"min": 1.0, "sum": 3.0}
            },
            "dfsd": {
                "nums": {"min": 0.5, "sum": 2.0},
                "zone": {"min": 2.0, "sum": 8.0}
            },
            "flodt": {
                "nums": {"min": 3.4, "sum": 3.4},
                "zone": {"min": 4.0, "sum": 4.0}
            }
        }
        '''
        row_groups = {}
        for row in rows:
            val = row[group_var]
            row_groups.setdefault(str(val), []).append(row)
        if isinstance(func, list):
            new_func = lambda x: {subfunc.__name__ : subfunc(x)
                                for subfunc in func}
        else:
            new_func = func
        if isinstance(agg_var, list):
            return {group_val:
                        {sub_var: new_func([row[sub_var] for row in rows])
                                    for sub_var in agg_var}
                      for group_val, rows in row_groups.items()}
        elif isinstance(agg_var, str):
            return {group_val: new_func([row[agg_var] for row in rows])
                      for group_val, rows in row_groups.items()}
        return {group_val: new_func(rows) for group_val, rows in row_groups.items()}
    
    
    #### TABULARIZING FUNCTIONS ####
    
    
    def is_list_like(x):
        return isinstance(x, (list, set, frozenset, tuple))
    
    
    def convert_to_list_of_dicts(rows):
        if is_list_like(rows):
            if not isinstance(rows, list):
                rows = list(rows)
            if not rows:
                return rows
            firstrow = rows[0]
            if isinstance(firstrow, dict):
                return rows
            elif not is_list_like(firstrow):
                return [{'col1': row} for row in rows]
            return [dict(flatten_row(row, [])) for row in rows]
        elif isinstance(rows, dict):
            if not rows:
                return rows
            lists, dicts, other_vars = [], [], []
            for k, v in rows.items():
                if is_list_like(v):
                    lists.append((k, v))
                elif isinstance(v, dict):
                    dicts.append((k, v))
                else:
                    other_vars.append((k, v))
            if not lists:
                return [dict(flatten_row(rows, []))]
            out_rows = []
            max_len = max(len(v) for k, v in lists)
            base_row = dict(other_vars)
            base_row.update(flatten_row(dict(dicts), []))
            for ii in range(max_len):
                new_row = base_row.copy()
                for list_var, list_ in lists:
                    new_row[list_var] = None if ii >= len(list_) else list_[ii]
                out_rows.append(dict(flatten_row(new_row, [])))
            return out_rows
        # rows is a scalar
        return [{'col1': rows}]
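    # Illustrative examples (added commentary):
    # convert_to_list_of_dicts([[1, 'a'], [2, 'b']])
    #     -> [{'col0': 1, 'col1': 'a'}, {'col0': 2, 'col1': 'b'}]
    # convert_to_list_of_dicts({'name': 'x', 'vals': [1, 2]})
    #     -> [{'name': 'x', 'vals': 1}, {'name': 'x', 'vals': 2}]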
    
    
    def flatten_row(row, current_key):
        if is_list_like(row):
            if not isinstance(row, list):
                row = list(row)
            for ii, v in enumerate(row):
                current_key.append(f'col{ii}')
                yield from flatten_row(v, current_key)
                current_key.pop()
        elif isinstance(row, dict):
            for k, v in row.items():
                current_key.append(k)
                yield from flatten_row(v, current_key)
                current_key.pop()
        elif not current_key:
            yield '_', row
        else:
            yield '.'.join(current_key), row
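    # e.g. (added commentary) dict(flatten_row({'a': [1, {'b': 2}], 'c': 3}, []))
    #     -> {'a.col0': 1, 'a.col1.b': 2, 'c': 3}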
    
    #### PROCESSING FUNCTIONS ####
    
    def csv_to_json(text):
        lines = [x.rstrip() for x in text.splitlines()]
        sniffer = csv.Sniffer()
        dialect = sniffer.sniff(text[:4092])
        reader = csv.DictReader(lines, dialect=dialect)
        rows = list(reader)
        postprocess_json(rows, AUTODETECT_BOOLEANS)
        return rows
    
    
    def xml_to_json(text):
        root = ET.fromstring(text)
        js = [{e.tag: e.text for e in child} for child in root]
        postprocess_json(js, AUTODETECT_BOOLEANS)
        return js
    
    
    def jsonl_to_json(text):
        lines = text.splitlines()
        return [json.loads(line) for line in lines]
    
    
    def postprocess_json(rows, autodetect_booleans):
        '''
        rows must be a list of dicts mapping str to str
        Performs the following in-place changes:
        1. Converts string representations of numbers (using '.' as decimal sep)
            to the corresponding numbers
        2. If autodetect_booleans, converts columns where all values are in the same
            boolean pair to the corresponding boolean value
            (e.g., columns of all 'T' and 'F' become True and False)
        '''
        bad_keys = set()
        boolean_pairs = {}
        if autodetect_booleans:
            for row in rows:
                for k, v in row.items():
                    if k not in bad_keys and isinstance(v, str):
                        # check if value is in the same boolean pair
                        # as all previous values in this column
                        current_boopair = boolean_pairs.get(k)
                        upv = v.upper()
                        boopair = BOOLEAN_VALS.get(upv)
                        if not boopair or (current_boopair and boopair is not current_boopair):
                            bad_keys.add(k)
                            if k in boolean_pairs:
                                del boolean_pairs[k]
                        else:
                            boolean_pairs[k] = boopair
        for row in rows:
            for k, v in row.items():
                boopair = boolean_pairs.get(k)
                if boopair:
                    row[k] = boopair[v.upper()]
                elif isinstance(v, str) and num_regex.fullmatch(v):
                    row[k] = float(v)
    
    
    #### OUTPUT FUNCTIONS ####
    
    def dump_json(js_):
        return json.dumps(js_, indent=4)
        
    def dump_jsonl(js_):
        rows = convert_to_list_of_dicts(js_)
        return '\n'.join(json.dumps(row) for row in rows)
        
    def dump_csv(js_):
        rows = convert_to_list_of_dicts(js_)
        fieldnames = sorted(set(k for row in rows for k in row),
            key=lambda x: x.upper())
        strio = StringIO()
        writer = csv.DictWriter(strio, fieldnames, dialect=CSV_DEFAULT_OUTPUT_DIALECT)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
        return strio.getvalue()
        
    def dump_xml(js_):
        root = ET.Element('Table')
        rows = convert_to_list_of_dicts(js_)
        for row in rows:
            rowelement = ET.Element('Row')
            root.append(rowelement)
            for k, v in row.items():
                velement = ET.Element(k)
                velement.text = '' if v is None else str(v)
                rowelement.append(velement)
        tree = ET.ElementTree(root)
        ET.indent(tree, ' ' * 4)
        strio = StringIO()
        tree.write(strio, encoding='unicode', xml_declaration=True, short_empty_elements=False)
        return strio.getvalue()
    
    def dump_to_regex_replace(js_, regex, bels):
        rows = convert_to_list_of_dicts(js_)
        # we want to use Boost regex syntax to parse our file
        notepad.new()
        newtext = '\n'.join(bels.join(str(v) for v in row.values()) for row in rows)
        editor.setText(newtext)
        search_regex = bels.join(f"(?'{k}'.*)" for k in rows[0])
        print(f'{search_regex = }, {regex = }')
        editor.rereplace(search_regex, regex)
        text = editor.getText()
        editor.setText('')
        notepad.close()
        return text
        
    
      Mark Olson
      last edited Jun 20, 2023, 6:24 AM

      PART 2 (ADD TO SCRIPT AFTER PART 1 ABOVE)

      
      #### HELPER CLASSES ####
      
      class TabDef:
          def __init__(self, rows, data_type):
              self.data_type = data_type
              self.refresh(rows)
      
          def query(self, query):
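              # note (added commentary): this evaluates arbitrary user-supplied text
              # with the globals below, so queries have full Python power;
              # only run queries you trust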
              return eval(query, {
                  'collections': collections,
                  'group_by': group_by,
                  'math': math,
                  'avg': mean,
                  'mean': mean,
                  'groupby': group_by,
                  'group_by': group_by,
                  'join': inner_join,
                  'inner_join': inner_join,
                  're': re,
                  'x': self.rows,
              })
      
          def refresh(self, rows):
              self.rows = rows
              self.last_refresh = now()
              self.get_column_types()
              
          def get_column_types(self):
              if (not self.rows) or self.data_type == 'json':
                  # json data need not have columns, can be tree-like
                  self.column_types = None
                  return
              self.column_types = {}
              for row in self.rows:
                  if is_list_like(row):
                      for ii, v in enumerate(row):
                          self.column_types.setdefault(ii, set()).add(type(v))
                  elif isinstance(row, dict):
                      for k, v in row.items():
                          self.column_types.setdefault(k, set()).add(type(v))
                          
          def column_types_repr(self):
              out = ['{']
              for k, types in self.column_types.items():
                  tlist = sorted('None' if t == type(None) else t.__name__
                      for t in types)
                  out.append(f"{repr(k)}: {'|'.join(tlist)}")
                  out.append(', ')
              out[-1] = '}'
              return ''.join(out)
      
      
      class PyQuery:
          def __init__(self):
              self.tabdefs = {}
              self.mtimes = {}
              self.id_to_be_renamed = None
              self.fname_to_be_renamed = None
              self.remembered_queries = {}
              self.remembered_regexes = {}
              self.remembered_output_formats = {}
              self.delimiting_bels = {} # count BEL chars for rereplace templating
              self.current_fname = notepad.getCurrentFilename()
              self.query_file_path = os.path.join(PYQ_DIR, 'PyQuery query file.pyq')
              self.remembered_queries_fname = 'queries.json'
              self.remembered_regexes_fname = 'regexes.json'
              self.remembered_output_formats_fname = 'output_formats.json'
              
          def dump_if_not_empty(self, dict_, fname):
              if not os.path.exists(PYQ_DIR) or not dict_:
                  return
              abspath = os.path.join(PYQ_DIR, fname)
              with open(abspath, 'w') as f:
                  json.dump(dict_, f)
              
          def on_shutdown(self, notif):
              self.dump_if_not_empty(self.remembered_queries, self.remembered_queries_fname)
              self.dump_if_not_empty(self.remembered_regexes, self.remembered_regexes_fname)
              self.dump_if_not_empty(self.remembered_output_formats, self.remembered_output_formats_fname)
              
          def get_remembered_thing(self, attr, fname, default):
              '''attr is the name of a dict attribute
              ('remembered_queries' or 'remembered_regexes' or 'remembered_output_formats')
              Returns the thing associated with fname.
              If it can't find the thing associated with fname, find the thing associated
              with that thing's file extension.
              '''
              if not getattr(self, attr):            
                  abspath = os.path.join(PYQ_DIR, fname)
                  if os.path.exists(abspath):
                      with open(abspath) as f:
                          setattr(self, attr, json.load(f))
              memo = getattr(self, attr)
              remembered_for_file = memo.get(self.current_fname)
              if remembered_for_file:
                  self.memorize_thing(remembered_for_file, memo, default)
                  return remembered_for_file
              exts = memo.get('extensions')
              if exts:
                  fname_ext = getFileExtension(self.current_fname)
                  return exts.get(fname_ext, default)
              return default
              
         
          def get_query(self):
              return self.get_remembered_thing('remembered_queries', self.remembered_queries_fname, DEFAULT_QUERY)
              
          def get_regex(self):
              return self.get_remembered_thing('remembered_regexes', self.remembered_regexes_fname, DEFAULT_REGEX)
              
          def get_output_format(self):
              return self.get_remembered_thing('remembered_output_formats',  self.remembered_output_formats_fname, DEFAULT_OUTPUT_TYPE)
      
          def memorize_thing(self, thing, dict_, default):
              if thing == default:
                  return
              dict_[self.current_fname] = thing
              ext = getFileExtension(self.current_fname)
              exts = dict_.setdefault('extensions', {})
              exts[ext] = thing
              
          def memorize_query(self, query):
              self.memorize_thing(query, self.remembered_queries, DEFAULT_QUERY)
              
          def memorize_output_format(self, output_format):
              self.memorize_thing(output_format, self.remembered_output_formats, DEFAULT_OUTPUT_TYPE)
              
          def memorize_regex(self, regex):
              self.memorize_thing(regex, self.remembered_regexes, DEFAULT_REGEX)
              
          def on_bufferactivated(self, notif):
              fname = notepad.getCurrentFilename()
              if fname == self.query_file_path:
                  return
              self.current_fname = fname
              last_mod_outside_npp = now()
              try: # check for modifications outside of Notepad++
                  last_mod_outside_npp = os.path.getmtime(self.current_fname)
              except FileNotFoundError:
                  return # an in-memory buffer, can't be modified outside of NPP
              last_mod_in_buffer = self.mtimes.get(self.current_fname, 0.)
              self.mtimes[self.current_fname] = max(last_mod_in_buffer, last_mod_outside_npp)
                  
          def on_filebeforerename(self, notif):
              self.id_to_be_renamed = notif['bufferID']
              self.fname_to_be_renamed = notepad.getBufferFilename(self.id_to_be_renamed)
          
          def on_filerenamecancel(self, notif):
              self.id_to_be_renamed = None
              self.fname_to_be_renamed = None
          
          def on_filerenamed(self, notif):
              if not self.id_to_be_renamed: # was cancelled
                  return
              fname = notepad.getBufferFilename(self.id_to_be_renamed)
              print(f'{self.current_fname = }, {fname = }, {self.fname_to_be_renamed = }, {self.id_to_be_renamed = }')
              if self.fname_to_be_renamed == self.current_fname:
                  self.current_fname = fname
              mtime = self.mtimes.get(self.fname_to_be_renamed)
              if mtime:
                  self.mtimes[fname] = self.mtimes[self.fname_to_be_renamed]
                  del self.mtimes[self.fname_to_be_renamed]
              else:
                  self.mtimes[fname] = now()
              tabdef = self.tabdefs.get(self.fname_to_be_renamed)
              if tabdef:
                  self.tabdefs[fname] = tabdef
                  del self.tabdefs[self.fname_to_be_renamed]
              self.id_to_be_renamed = None
              self.fname_to_be_renamed = None
      
          def on_modified(self, notif):
              if not notif['text']:
                  return
              self.mtimes[self.current_fname] = now()
      
          def get_tabdef(self):
              tabdef = self.tabdefs.get(self.current_fname)
              if tabdef:
                  mtime = self.mtimes[self.current_fname]
                  if mtime < tabdef.last_refresh:
                      return tabdef
              else:
                  tabdef = TabDef(None, None)
              notepad.open(self.current_fname)
              extension = getFileExtension(self.current_fname)
              if tabdef.data_type:
                  data_type = tabdef.data_type
              else:
                  data_type = FILE_EXTENSION_TYPE_MAPS.get(extension)
              text = editor.getText()
              if not data_type:
                  for typ in TYPES_TO_TEST: # don't try csv, it finds illogical column separators
                      proc = self.get_processor(typ)
                      try:
                          rows = proc(text)
                          return self.bind_tabdef(tabdef, typ, rows)
                      except Exception as ex: # not the correct way to parse
                          print(f'While parsing as {typ} in try loop, got error:\n{ex}')
                  return None # couldn't parse
              else:
                  processor = self.get_processor(data_type)
                  try:
                      rows = processor(text)
                  except Exception as ex:
                      print(f'While parsing as {data_type}, got error:\n{ex}')
                      return None
              return self.bind_tabdef(tabdef, data_type, rows)
              
          def bind_tabdef(self, tabdef, data_type, rows):
              tabdef.data_type = data_type
              self.mtimes[self.current_fname] = now()
              self.tabdefs[self.current_fname] = tabdef
              tabdef.refresh(rows)
              return tabdef
              
          def get_delimiting_bels(self):
              '''
              we use BEL chars to delimit fields in the intermediate file
              when the output mode is a regex-replace
              EXAMPLE:
              If the query result is [{'col1': 0, 'col2': 'a'}, {'col1': 1, 'col2': 'b'}]
              the intermediate file contains
              0{BEL}a
              1{BEL}b
              where {BEL} is the BEL character.
              But if BEL is in the file's text, we need to use more bels to delimit.
              E.g, if the query result is [{'col1': 0, 'col2': 'a{BEL}'}, {'col1': 1, 'col2': 'b'}]
              the intermediate file will contain
              0{BEL}{BEL}a{BEL}
              1{BEL}{BEL}b{BEL}
              '''
              delimiting_bels = self.delimiting_bels.get(self.current_fname)
              if delimiting_bels:
                  return delimiting_bels
              self._consecutive_bel_count = 0
              def on_bel_match(m):
                  bel_ct = m.end() - m.start()
                  print(f'{self._consecutive_bel_count = }, {bel_ct = }')
                  self._consecutive_bel_count = max(self._consecutive_bel_count, bel_ct)
              editor.research('\x07+', on_bel_match)
              print(f'After finds, {self._consecutive_bel_count = }')
              delimiting_bels = '\x07' * (self._consecutive_bel_count + 1)
              self.delimiting_bels[self.current_fname] = delimiting_bels
              return delimiting_bels
      
          def write_to_pyq_buffer(self, text):
              if not os.path.exists(PYQ_DIR):
                  os.mkdir(PYQ_DIR)
              if not os.path.exists(self.query_file_path):
                  with open(self.query_file_path, 'w') as f:
                      f.write('')
              notepad.open(self.query_file_path)
              editor.setText(text)
          
          def regex_matches_to_json(self, regex):
              notepad.open(self.current_fname)
              matches = []
              editor.research(regex, lambda m: matches.append(m.groups()))
              self.memorize_regex(regex)
              rows = convert_to_list_of_dicts(matches)
              postprocess_json(rows, AUTODETECT_BOOLEANS)
              return rows
          
          def get_processor(self, data_type):
              return {
                  'csv': csv_to_json,
                  'json': json.loads,
                  'jsonl': jsonl_to_json,
                  'xml': xml_to_json,
              }.get(data_type,
                  lambda text: self.regex_matches_to_json(data_type))
              
          def dump(self, output_type, data=None):
              if data is None:
                  tabdef = self.get_tabdef()
                  data = tabdef.rows
              dumper = {
                  'json': dump_json,
                  'jsonl': dump_jsonl,
                  'csv':  dump_csv,
                  'xml':  dump_xml,
              }.get(output_type,
                  lambda rows: dump_to_regex_replace(rows, output_type, self.get_delimiting_bels()))
              outstr = dumper(data)
              self.memorize_output_format(output_type)
              self.write_to_pyq_buffer(outstr)
              
          def dialog(self):
              tabdef = self.get_tabdef()
              dtype = None if not tabdef else tabdef.data_type
              if not tabdef or dtype not in VALID_DATA_TYPES:
                  default_regex = self.get_regex()
                  regex = notepad.prompt(
                      f"Couldn't parse current file as any of the types {TYPES_TO_TEST}\nEnter csv to parse as csv, or a regex if you wish to query regex matches",
                      "Couldn't parse file; enter csv or a regex",
                      default_regex
                  )
                  if not regex:
                      return
                  rows = self.regex_matches_to_json(regex)
                  tabdef = self.bind_tabdef(TabDef(None, None), regex, rows)
              msg = f'Enter Python query on {tabdef.data_type} file. x represents data.'
              coltypes = '' if not (tabdef and tabdef.column_types) \
                  else f'Col types: {tabdef.column_types_repr()}'
              default_query = self.get_query()
              query = notepad.prompt(coltypes, msg, default_query)
              if not query:
                  return
              output_type = self.get_output_format()
              data_type = notepad.prompt('Enter an output format (regex or one of json, jsonl, csv, xml)', 'Enter an output format', output_type)
              if not data_type:
                  return
              self.pyq(query, data_type)
              
          def pyq(self, query, output_type=None):
              tabdef = self.get_tabdef()
              if not tabdef:
                  self.write_to_pyq_buffer(f'{self.current_fname} could not be parsed as a file of any of the types {VALID_DATA_TYPES}')
                  return
              if not output_type:
                  output_type = DEFAULT_OUTPUT_TYPE
              try:
                  result = tabdef.query(query)
              except Exception as ex:
                  self.write_to_pyq_buffer(f'Failure while evaluating query:\n{ex}')
                  return
              self.memorize_query(query)
              self.dump(output_type, result)
      
      
      if __name__ == '__main__':
          try:
              INITIALIZED
          except NameError:
              INITIALIZED = True
              PYQ = PyQuery()
              pyq = PYQ.pyq
              dump = PYQ.dump
              notepad.callback(PYQ.on_bufferactivated, [NOTIFICATION.BUFFERACTIVATED])
              notepad.callback(PYQ.on_filebeforerename, [NOTIFICATION.FILEBEFORERENAME])
              notepad.callback(PYQ.on_filerenamecancel, [NOTIFICATION.FILERENAMECANCEL])
              notepad.callback(PYQ.on_filerenamed, [NOTIFICATION.FILERENAMED])
              notepad.callback(PYQ.on_shutdown, [NOTIFICATION.BEFORESHUTDOWN])
              editor.callback(PYQ.on_modified, [SCINTILLANOTIFICATION.MODIFIED])
          if DIALOG_ON_RUN:
              PYQ.dialog()
      
        Mark Olson
        last edited Jun 20, 2023, 7:44 AM

        I should probably mention some quality-of-life features that might make you consider using this tool:

        1. Numbers are automatically parsed as floating-point numbers; there’s no need for any annoying convert-to-number-if-it-passes-a-regex-test logic
        2. Automatically detects when a file is edited, and only re-parses it when required
        3. Automatic detection of booleans
        4. Regular expressions are supported in the query (Python re functions; see the example after this list)
        5. Conversion speed seems to be competitive with any other plugin I’ve seen
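        For instance, point 4 means a query like this is legal (an illustrative query against the foo/bar data below, not one from the walkthrough):

            pyq("[r for r in x if re.match('(?i)bl', r['col0'])]")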

        I’ll also just illustrate a regex-related use case that would normally require a bespoke script.
        Start with

        foo: tone bar: 27.1
        foo: bLue bar: 241.3
        foo: taco bar: 66.5
        foo: basst bar: 281.7
        foo: öyster bar: 144.0
        foo: oyster bar: 164.1
        foo: Spb bar: -102.1
        foo: blüe bar: 203.2
        foo: spä bar: 1.2E-3
        foo: spb bar: 84.2e4
        foo: blve bar: 183.2
        foo: blue bar: -222.2
        foo: baßk bar: 261.3
        foo: täco bar: 47.3e5
        foo: täco bar: 300.3e5
        

        In the first prompt, enter (?-s)^foo: (\w+) bar: (.+)$. This captures the word after foo: as col0 and the number after bar: as col1.

        Now we want to sort these lines by the number after bar (and secondarily by the word after foo) and eliminate any lines where the number after bar is less than 0.
        So in the second prompt enter sorted((r for r in x if r['col1'] > 0), key=lambda x: [x['col1'], x['col0'].upper()]) (note that the x in the lambda is the lambda’s own parameter, a single row, and shadows the x that represents the whole dataset).

        And let’s get it back in the original format. In the final prompt, enter foo: $1 bar: $2
        We get the filtered, sorted rows:

        foo: spä bar: 0.0012
        foo: tone bar: 27.1
        foo: taco bar: 66.5
        foo: öyster bar: 144.0
        foo: oyster bar: 164.1
        foo: blve bar: 183.2
        foo: blüe bar: 203.2
        foo: bLue bar: 241.3
        foo: baßk bar: 261.3
        foo: basst bar: 281.7
        foo: spb bar: 842000.0
        foo: täco bar: 4730000.0
        foo: täco bar: 30030000.0
        
          Lycan Thrope @Mark Olson
          last edited Jun 20, 2023, 7:30 PM

          @Mark-Olson ,
          For consistency, what should this file be named? :-) It’s always good to have that name so everyone knows what’s being talked about when referring to the script. :-)

            Mark Olson @Lycan Thrope
            last edited Jun 20, 2023, 8:34 PM

            @Lycan-Thrope
            pyquery.py

              Lycan Thrope @Mark Olson
              last edited Jun 21, 2023, 7:05 PM

              @Mark-Olson ,
              Thanks. I saw the reference to the filename at the end of the file, but wanted to be sure, for future reference. :-)

                Mark Olson
                last edited Jun 21, 2023, 11:18 PM

                Permanent link to the most up-to-date version

                Things fixed since my original post above:

                1. Caching problems (my original assertion that files were re-parsed only as needed was actually false)
                2. Originally, if you used $0 in your output format to get all the matched text (not just the capture groups), the output file would contain the BEL characters I used to delimit the capture groups in the intermediate file.
                3. Allow group_by and join to group by/join by multiple columns. See docs in file.
                4. Added byind and bykey convenience functions for easier sorting by index/key in each child (see the sketch below).
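                The new helpers aren’t pasted in this thread; a hypothetical sketch of what such key-function factories look like (the linked file has the actual definitions):

                    def byind(*indices):
                        '''key function for sorting lists of lists by the given indices'''
                        return lambda row: [row[i] for i in indices]

                    def bykey(*keys):
                        '''key function for sorting lists of dicts by the given keys'''
                        return lambda row: [row[k] for k in keys]

                    # e.g. sorted(x, key=bykey('col1', 'col0'))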