[yunqa.de] Re: Read/Write compressed streams

  • From: Delphi Inspiration <delphi@xxxxxxxx>
  • To: yunqa@xxxxxxxxxxxxx
  • Date: Thu, 03 Jul 2008 09:25:12 +0200

Rolf Lampa wrote:

>Since I often read crunch HUGE xml files for testing "worst case" scenarios, 
>like enwiki xml dumps, I wonder if there's any VERY fast TFileStream based 
>readers/writers out there which can optionally read .bz2, .gzip, 7z or .zip 
>files directly, without unzipping them? (and, well, also write directly to 
>disk in the mentioned compressed fmts).

You can create your own xmlParserInputBuffer and pass a pointer to it 
to the xmlNewTextReader() function.

xmlParserInputBuffer looks like:

type
  xmlParserInputBuffer = packed record
    Context: C_void_ptr; { User data pointer; presumably handed back as the first argument of both callbacks - confirm against DIXml }
    readcallback: xmlInputReadCallback; { Called to fetch input; decompression takes place here }
    closecallback: xmlInputCloseCallback; { Invoked at the end of the reading }
    Encoder: xmlCharEncodingHandlerPtr; { I18N conversions to UTF-8 }
    Buffer: xmlBufferPtr; { Local buffer encoded in UTF-8 }
    raw: xmlBufferPtr; { if encoder != NULL buffer for raw input }
    Compressed: C_int; { -1=unknown, 0=not compressed, 1=compressed }
    Error: C_int; {}
    rawconsumed: C_unsigned_long; { amount consumed from raw }
  end;

Decompression takes place in the "readcallback" function, the "closecallback" 
is invoked at the end of the reading. You must supply both functions when you 
initialize the record. 

As an example, please find attached the BZip2-decompressing xmlParserInputBuffer 
which the WikiTaxi Importer uses to feed the compressed Wikipedia XML dumps to 
the XML parser. Please substitute DIBZip2Api.pas with your favourite BZip2 
Delphi implementation.

You create the xmlParserInputBuffer by calling BZip2FileInBuf(OpenFileHandle), 
where the file handle must already be open, obtained by FileOpen(), for example. 
The close callback calls FileClose() on that handle, so do not close it yourself.

>[Edit]: I do perform a lot of text manipulation on the texts read by the 
>stream object (although that is done once the text have been inserted into 
>objects), but some of the manipulations perhaps could be done directly in the 
>stream's read buffer? I'm thinking about some more advanced tricks where Regex 
>would apply. Speed is crucial though. (processing time like 12 days, to come 
>down to < 3 hours, is what I'm currently onto, so... ).

Less than 3 hours is very reasonable. To give yet another example: The WikiTaxi 
Importer completes the job in less than 2 hours for the English Wikipedia on a 
recent laptop system, performing these steps: read, uncompress, parse XML, 
recompress, store to database. Total data for the English Wikipedia is 3.7 GB 
compressed and about 14 GB after decompression.

Ralf 
unit DIXmlBZip2;

{$I DICompilers.inc}

interface

uses
  DIBZip2Api, DIXml;

// Creates an xmlParserInputBuffer that reads from an already open file
// handle and passes the file contents through the BZip2 decompressor.
//   AFileHandle: open file handle to read the compressed data from.
//   ABlockSize:  size in bytes of the buffer used for reading from the file.
// Returns the new input buffer, or nil if an exception occurred during setup.
function BZip2FileInBuf(
  const AFileHandle: Integer;
  const ABlockSize: Integer = $80000): xmlParserInputBufferPtr;

// Read callback installed by BZip2FileInBuf.
//   Context: pointer to the TBZip2Context created by BZip2FileInBuf.
//   Buffer:  destination for decompressed bytes.
//   Len:     capacity of Buffer in bytes.
// Returns the number of bytes placed in Buffer, 0 when no more input is
// available, or -1 if the decompressor reports an error.
function BZip2FileReadCallback(
  Context: Pointer;
  Buffer: PAnsiChar;
  Len: Integer): Integer;

// Close callback installed by BZip2FileInBuf. Shuts down the decompressor,
// frees the file buffer and the context, and closes the file handle.
// Always returns 0.
function BZip2FileCloseCallback(
  Context: Pointer): Integer;

type
  // State shared between BZip2FileInBuf and the read / close callbacks.
  // A pointer to this record is stored in the Context field of the
  // xmlParserInputBuffer.
  TBZip2Context = record
    bz: bz_stream; // BZip2 decompressor state.
    Buf: PAnsiChar; // File buffer: raw (still compressed) bytes read from the file, fed to bz.next_in.
    BufSize: Integer; // File buffer size in bytes.
    FileHandle: Integer; // File handle; closed by BZip2FileCloseCallback.
    FileRead: Int64; // Total bytes read from file so far (compressed size).
  end;
  PBZip2Context = ^TBZip2Context;

implementation

uses
  SysUtils;

// Read callback: fills Buffer with up to Len bytes of decompressed data.
//
//   Context: pointer to the TBZip2Context set up by BZip2FileInBuf.
//   Buffer:  destination for decompressed bytes.
//   Len:     capacity of Buffer in bytes.
//
// Returns the number of bytes written to Buffer, 0 when no more data is
// available, or -1 on a file read error, a decompression error or a
// negative Len.
//
// NOTE(review): a call made after BZ_STREAM_END while unread input remains
// (e.g. concatenated BZip2 streams) goes straight back into
// BZ2_bzDecompress; presumably that is reported as an error - confirm
// whether multi-stream files need to be supported.
function BZip2FileReadCallback(
  Context: Pointer;
  Buffer: PAnsiChar;
  Len: Integer): Integer;
var
  Ctx: PBZip2Context;
  BytesRead: Integer;
  e: Integer;
begin
  // A negative length would be stored into avail_out as a huge count.
  if Len < 0 then
    begin
      Result := -1;
      Exit;
    end;
  if Len = 0 then
    begin
      Result := 0;
      Exit;
    end;

  Ctx := Context;

  Ctx^.bz.next_out := Buffer;
  Ctx^.bz.avail_out := Len;

  repeat
    if Ctx^.bz.avail_in = 0 then // no more compressed data
      begin
        BytesRead := FileRead(Ctx^.FileHandle, Ctx^.Buf^, Ctx^.BufSize);
        if BytesRead < 0 then
          begin
            // File read error: never feed a negative count to the
            // decompressor or add it to the running total.
            Result := -1;
            Exit;
          end;
        if BytesRead = 0 then
          begin
            // End of file: hand back whatever has already been decompressed
            // during this call instead of dropping it. This is 0 when
            // nothing was produced, which signals the end of the input.
            Result := Cardinal(Len) - Ctx^.bz.avail_out;
            Exit;
          end;
        Inc(Ctx^.FileRead, BytesRead);
        Ctx^.bz.avail_in := BytesRead;
        Ctx^.bz.next_in := Ctx^.Buf;
      end;

    e := BZ2_bzDecompress(@Ctx^.bz);
    case e of
      BZ_OK:
        begin
          // Output buffer completely filled: return it. Otherwise loop to
          // fetch more compressed input.
          if Ctx^.bz.avail_out = 0 then
            begin
              Result := Len;
              Exit;
            end;
        end;
      BZ_STREAM_END:
        begin
          // End of the compressed stream: return the (possibly partial)
          // final chunk.
          Result := Cardinal(Len) - Ctx^.bz.avail_out;
          Exit;
        end;
    else
      Result := -1; // Flag error!
      Exit;
    end;
  until False;

end;

// Close callback: tears down everything BZip2FileInBuf created for this
// input buffer. Context is the TBZip2Context pointer. Always returns 0.
function BZip2FileCloseCallback(
  Context: Pointer): Integer;
var
  State: PBZip2Context;
begin
  State := PBZip2Context(Context);

  // Release the decompressor before the memory it was reading from.
  BZ2_bzDecompressEnd(@State^.bz);
  FreeMem(State^.Buf);

  // The file handle is owned by the context, so it is closed here.
  FileClose(State^.FileHandle);

  // Finally the context record itself.
  FreeMem(State);

  Result := 0; // No error.
end;

// Creates an xmlParserInputBuffer which reads BZip2 compressed data from
// AFileHandle and delivers the decompressed bytes to the XML parser.
//
//   AFileHandle: open file handle (e.g. from FileOpen). On success the
//                handle is closed by BZip2FileCloseCallback; on failure
//                it is left open and remains the caller's responsibility.
//   ABlockSize:  size in bytes of the buffer used for reading compressed
//                data from the file. Must be greater than zero.
//
// Returns the new input buffer, or nil if ABlockSize is not positive, the
// decompressor could not be initialized, the input buffer could not be
// allocated, or an exception occurred during setup. On failure everything
// allocated here is released again.
function BZip2FileInBuf(
  const AFileHandle: Integer;
  const ABlockSize: Integer = $80000): xmlParserInputBufferPtr;
var
  Ctx: PBZip2Context;
  BzReady: Boolean;
begin
  Result := nil;
  if ABlockSize <= 0 then Exit; // A zero-sized file buffer can never read anything.

  GetMem(Ctx, SizeOf(Ctx^));
  // GetMem does not clear memory. Zero the record so that the cleanup
  // below can tell which parts have actually been set up.
  FillChar(Ctx^, SizeOf(Ctx^), 0);
  BzReady := False;
  try
    GetMem(Ctx^.Buf, ABlockSize);
    Ctx^.FileHandle := AFileHandle;
    Ctx^.FileRead := 0;
    Ctx^.BufSize := ABlockSize;

    Ctx^.bz.bzalloc := nil;
    Ctx^.bz.bzfree := nil;
    Ctx^.bz.opaque := nil;
    if BZ2_bzDecompressInit(@Ctx^.bz, 0, 0) = BZ_OK then
      begin
        BzReady := True;
        // No compressed input yet: forces the read callback to fetch the
        // first block from the file.
        Ctx^.bz.avail_in := 0;

        Result := xmlAllocParserInputBuffer(XML_CHAR_ENCODING_NONE);
        if Result <> nil then
          begin
            Result^.readcallback := BZip2FileReadCallback;
            Result^.closecallback := BZip2FileCloseCallback;
            Result^.Context := Ctx;
            Exit; // Success: the context is now owned by the input buffer.
          end;
      end;
  except
    // Setup errors are reported to the caller as a nil result.
    Result := nil;
  end;

  // Failure: undo whatever has been set up so far.
  if BzReady then BZ2_bzDecompressEnd(@Ctx^.bz);
  if Ctx^.Buf <> nil then FreeMem(Ctx^.Buf);
  FreeMem(Ctx);
end;

end.

Other related posts: