Parse error can be caused due to NUL character included in records with Sqoop

Sqoopを使ってRDBMとHadoop間でデータ転送を行なっている。Sqoopは、MapReduceジョブを実行し複数のMapタスクで並列にインポートやエクスポートを行なう。また、Sqoopはコマンドラインからsqoopコマンドを実行するだけで、データ転送に必要なテーブルのメタ情報の取得やRDBMとHadoop間でデータのやり取りをするインターフェースプログラムの生成やコンパイル、各Mapタスクへのデータスプリット計算などを内部で行ってくれるので、実行も非常に容易である。コマンドラインオプションも豊富に用意されており、多くの場合はそれらの組み合わせで十分対応できる。
目次
Parse Error with Export
で、今回通常どおりSqoopを使ってデータ転送処理を実行していたところ、Sqoop Exportの実行時に以下のようなエラーになってしまった(以下のエラーはローカルで再現させたもの)。成功したMapタスクのデータは転送に成功しているが、失敗したMapタスクのデータは完全に転送されないためデータが不完全な状態になってしまう。parseに失敗しているデータを調べたところ、対象レコードにNUL(\00)
文字が含まれていたようだった。そこで、Sqoopの内部動作を追いながら回避方法を模索してみた。
16/07/17 23:17:52 ERROR mapreduce.TextExportMapper: 16/07/17 23:17:52 ERROR mapreduce.TextExportMapper: Exception raised during data export 16/07/17 23:17:52 ERROR mapreduce.TextExportMapper: 16/07/17 23:17:52 ERROR mapreduce.TextExportMapper: Exception: java.lang.RuntimeException: Can't parse input data: '0' at employee.__loadFromFields(employee.java:335) at employee.parse(employee.java:268) at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:83) at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:39) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140) at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330) at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.NoSuchElementException at java.util.ArrayList$Itr.next(ArrayList.java:854) at employee.__loadFromFields(employee.java:330) ... 13 more
テスト環境
今回の動作テストの環境は以下の通り。
- sqoop-1.4.5-cdh5.3.3
- ローカル環境
Sqoop Process Overview
まず、Sqoopの実行プロセス概要は以下のようになっている。RDBMSとHDFS間のデータ転送はMapReduceジョブで(Mapタスク)並列実行される。Mapタスクの数などはオプションで調整することができる。
Generated Sqoop ORM Class
Sqoopは、実行時にデータ転送のためのORMクラスを生成する。Sqoop Exportを実行してMySQLへデータ転送する例を挙げ、どのようなORMクラスが生成されるのかを見てみる。
まずは、以下のようなテーブルをMySQLに準備する。
create table employee ( id int primary key, name text, group_id int, sex tinyint ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
続いてexportコマンドでORMクラスの生成及びexport処理を実行してみる。今回は、ローカル環境でコマンドラインでなくIDE上でJavaから直接Sqoopクラスをコールする形で実行した。
package sample; import org.apache.sqoop.Sqoop; public class SqoopExportTest { public static void main(String[] args) { exportToMySQL("sample", "employee"); } static void exportToMySQL(String database, String table) { System.out.println("=> exportToMySQL"); String[] args = new String[] { "export", "--connect", "jdbc:mysql://localhost/" + database, "--username", "sqoop", "--password", "sqoop", "--export-dir", "input", "--input-null-string", "\\N", "--input-null-non-string", "\\N", "--table", table, "--input-fields-terminated-by", "package sample; import org.apache.sqoop.Sqoop; public class SqoopExportTest { public static void main(String[] args) { exportToMySQL("sample", "employee"); } static void exportToMySQL(String database, String table) { System.out.println("=> exportToMySQL"); String[] args = new String[] { "export", "--connect", "jdbc:mysql://localhost/" + database, "--username", "sqoop", "--password", "sqoop", "--export-dir", "input", "--input-null-string", "\\\\N", "--input-null-non-string", "\\\\N", "--table", table, "--input-fields-terminated-by", "\001", "--bindir", "output", "--outdir", "output" }; Sqoop.main(args); } }1", "--bindir", "output", "--outdir", "output" }; Sqoop.main(args); } }
実行するとoutputディレクトリに以下のファイルが生成される。
$ tree output/ output/ ├── employee.class ├── employee.jar └── employee.java 0 directories, 3 files
employee.javaがORMクラスになり中身は以下のとおり。
// ORM class for table 'employee' // WARNING: This class is AUTO-GENERATED. Modify at your own risk. // // Debug information: // Generated date: Sun Jul 17 12:28:31 JST 2016 // For connector: org.apache.sqoop.manager.MySQLManager import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.lib.db.DBWritable; import com.cloudera.sqoop.lib.JdbcWritableBridge; import com.cloudera.sqoop.lib.DelimiterSet; import com.cloudera.sqoop.lib.FieldFormatter; import com.cloudera.sqoop.lib.RecordParser; import com.cloudera.sqoop.lib.BooleanParser; import com.cloudera.sqoop.lib.BlobRef; import com.cloudera.sqoop.lib.ClobRef; import com.cloudera.sqoop.lib.LargeObjectLoader; import com.cloudera.sqoop.lib.SqoopRecord; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; public class employee extends SqoopRecord implements DBWritable, Writable { private final int PROTOCOL_VERSION = 3; public int getClassFormatVersion() { return PROTOCOL_VERSION; } protected ResultSet __cur_result_set; private Integer id; public Integer get_id() { return id; } public void set_id(Integer id) { this.id = id; } public employee with_id(Integer id) { this.id = id; return this; } private String name; public String get_name() { return name; } public void set_name(String name) { this.name = name; } public employee with_name(String name) { this.name = name; return this; } private Integer group_id; public Integer get_group_id() { return group_id; } public void set_group_id(Integer group_id) { this.group_id = group_id; } public employee with_group_id(Integer group_id) { this.group_id = group_id; return this; } private Integer sex; public Integer get_sex() { return sex; } public void set_sex(Integer sex) { this.sex = sex; } public employee with_sex(Integer sex) { this.sex = sex; return this; } public boolean equals(Object o) { if (this == o) { return true; } if (!(o instanceof employee)) { return false; } employee that = (employee) o; boolean equal = true; equal = equal && (this.id == null ? that.id == null : this.id.equals(that.id)); equal = equal && (this.name == null ? that.name == null : this.name.equals(that.name)); equal = equal && (this.group_id == null ? that.group_id == null : this.group_id.equals(that.group_id)); equal = equal && (this.sex == null ? that.sex == null : this.sex.equals(that.sex)); return equal; } public boolean equals0(Object o) { if (this == o) { return true; } if (!(o instanceof employee)) { return false; } employee that = (employee) o; boolean equal = true; equal = equal && (this.id == null ? that.id == null : this.id.equals(that.id)); equal = equal && (this.name == null ? that.name == null : this.name.equals(that.name)); equal = equal && (this.group_id == null ? that.group_id == null : this.group_id.equals(that.group_id)); equal = equal && (this.sex == null ? that.sex == null : this.sex.equals(that.sex)); return equal; } public void readFields(ResultSet __dbResults) throws SQLException { this.__cur_result_set = __dbResults; this.id = JdbcWritableBridge.readInteger(1, __dbResults); this.name = JdbcWritableBridge.readString(2, __dbResults); this.group_id = JdbcWritableBridge.readInteger(3, __dbResults); this.sex = JdbcWritableBridge.readInteger(4, __dbResults); } public void readFields0(ResultSet __dbResults) throws SQLException { this.id = JdbcWritableBridge.readInteger(1, __dbResults); this.name = JdbcWritableBridge.readString(2, __dbResults); this.group_id = JdbcWritableBridge.readInteger(3, __dbResults); this.sex = JdbcWritableBridge.readInteger(4, __dbResults); } public void loadLargeObjects(LargeObjectLoader __loader) throws SQLException, IOException, InterruptedException { } public void loadLargeObjects0(LargeObjectLoader __loader) throws SQLException, IOException, InterruptedException { } public void write(PreparedStatement __dbStmt) throws SQLException { write(__dbStmt, 0); } public int write(PreparedStatement __dbStmt, int __off) throws SQLException { JdbcWritableBridge.writeInteger(id, 1 + __off, 4, __dbStmt); JdbcWritableBridge.writeString(name, 2 + __off, -1, __dbStmt); JdbcWritableBridge.writeInteger(group_id, 3 + __off, 4, __dbStmt); JdbcWritableBridge.writeInteger(sex, 4 + __off, -6, __dbStmt); return 4; } public void write0(PreparedStatement __dbStmt, int __off) throws SQLException { JdbcWritableBridge.writeInteger(id, 1 + __off, 4, __dbStmt); JdbcWritableBridge.writeString(name, 2 + __off, -1, __dbStmt); JdbcWritableBridge.writeInteger(group_id, 3 + __off, 4, __dbStmt); JdbcWritableBridge.writeInteger(sex, 4 + __off, -6, __dbStmt); } public void readFields(DataInput __dataIn) throws IOException { this.readFields0(__dataIn); } public void readFields0(DataInput __dataIn) throws IOException { if (__dataIn.readBoolean()) { this.id = null; } else { this.id = Integer.valueOf(__dataIn.readInt()); } if (__dataIn.readBoolean()) { this.name = null; } else { this.name = Text.readString(__dataIn); } if (__dataIn.readBoolean()) { this.group_id = null; } else { this.group_id = Integer.valueOf(__dataIn.readInt()); } if (__dataIn.readBoolean()) { this.sex = null; } else { this.sex = Integer.valueOf(__dataIn.readInt()); } } public void write(DataOutput __dataOut) throws IOException { if (null == this.id) { __dataOut.writeBoolean(true); } else { __dataOut.writeBoolean(false); __dataOut.writeInt(this.id); } if (null == this.name) { __dataOut.writeBoolean(true); } else { __dataOut.writeBoolean(false); Text.writeString(__dataOut, name); } if (null == this.group_id) { __dataOut.writeBoolean(true); } else { __dataOut.writeBoolean(false); __dataOut.writeInt(this.group_id); } if (null == this.sex) { __dataOut.writeBoolean(true); } else { __dataOut.writeBoolean(false); __dataOut.writeInt(this.sex); } } public void write0(DataOutput __dataOut) throws IOException { if (null == this.id) { __dataOut.writeBoolean(true); } else { __dataOut.writeBoolean(false); __dataOut.writeInt(this.id); } if (null == this.name) { __dataOut.writeBoolean(true); } else { __dataOut.writeBoolean(false); Text.writeString(__dataOut, name); } if (null == this.group_id) { __dataOut.writeBoolean(true); } else { __dataOut.writeBoolean(false); __dataOut.writeInt(this.group_id); } if (null == this.sex) { __dataOut.writeBoolean(true); } else { __dataOut.writeBoolean(false); __dataOut.writeInt(this.sex); } } private static final DelimiterSet __outputDelimiters = new DelimiterSet((char) 44, (char) 10, (char) 0, (char) 0, false); public String toString() { return toString(__outputDelimiters, true); } public String toString(DelimiterSet delimiters) { return toString(delimiters, true); } public String toString(boolean useRecordDelim) { return toString(__outputDelimiters, useRecordDelim); } public String toString(DelimiterSet delimiters, boolean useRecordDelim) { StringBuilder __sb = new StringBuilder(); char fieldDelim = delimiters.getFieldsTerminatedBy(); __sb.append(FieldFormatter.escapeAndEnclose(id==null?"null":"" + id, delimiters)); __sb.append(fieldDelim); __sb.append(FieldFormatter.escapeAndEnclose(name==null?"null":name, delimiters)); __sb.append(fieldDelim); __sb.append(FieldFormatter.escapeAndEnclose(group_id==null?"null":"" + group_id, delimiters)); __sb.append(fieldDelim); __sb.append(FieldFormatter.escapeAndEnclose(sex==null?"null":"" + sex, delimiters)); if (useRecordDelim) { __sb.append(delimiters.getLinesTerminatedBy()); } return __sb.toString(); } public void toString0(DelimiterSet delimiters, StringBuilder __sb, char fieldDelim) { __sb.append(FieldFormatter.escapeAndEnclose(id==null?"null":"" + id, delimiters)); __sb.append(fieldDelim); __sb.append(FieldFormatter.escapeAndEnclose(name==null?"null":name, delimiters)); __sb.append(fieldDelim); __sb.append(FieldFormatter.escapeAndEnclose(group_id==null?"null":"" + group_id, delimiters)); __sb.append(fieldDelim); __sb.append(FieldFormatter.escapeAndEnclose(sex==null?"null":"" + sex, delimiters)); } private static final DelimiterSet __inputDelimiters = new DelimiterSet((char) 1, (char) 10, (char) 0, (char) 0, false); private RecordParser __parser; public void parse(Text __record) throws RecordParser.ParseError { if (null == this.__parser) { this.__parser = new RecordParser(__inputDelimiters); } List<String> __fields = this.__parser.parseRecord(__record); __loadFromFields(__fields); } public void parse(CharSequence __record) throws RecordParser.ParseError { if (null == this.__parser) { this.__parser = new RecordParser(__inputDelimiters); } List<String> __fields = this.__parser.parseRecord(__record); __loadFromFields(__fields); } public void parse(byte [] __record) throws RecordParser.ParseError { if (null == this.__parser) { this.__parser = new RecordParser(__inputDelimiters); } List<String> __fields = this.__parser.parseRecord(__record); __loadFromFields(__fields); } public void parse(char [] __record) throws RecordParser.ParseError { if (null == this.__parser) { this.__parser = new RecordParser(__inputDelimiters); } List<String> __fields = this.__parser.parseRecord(__record); __loadFromFields(__fields); } public void parse(ByteBuffer __record) throws RecordParser.ParseError { if (null == this.__parser) { this.__parser = new RecordParser(__inputDelimiters); } List<String> __fields = this.__parser.parseRecord(__record); __loadFromFields(__fields); } public void parse(CharBuffer __record) throws RecordParser.ParseError { if (null == this.__parser) { this.__parser = new RecordParser(__inputDelimiters); } List<String> __fields = this.__parser.parseRecord(__record); __loadFromFields(__fields); } private void __loadFromFields(List<String> fields) { Iterator<String> __it = fields.listIterator(); String __cur_str = null; try { __cur_str = __it.next(); if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.id = null; } else { this.id = Integer.valueOf(__cur_str); } __cur_str = __it.next(); if (__cur_str.equals("\\N")) { this.name = null; } else { this.name = __cur_str; } __cur_str = __it.next(); if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.group_id = null; } else { this.group_id = Integer.valueOf(__cur_str); } __cur_str = __it.next(); if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.sex = null; } else { this.sex = Integer.valueOf(__cur_str); } } catch (RuntimeException e) { throw new RuntimeException("Can't parse input data: '" + __cur_str + "'", e); } } private void __loadFromFields0(Iterator<String> __it) { String __cur_str = null; try { __cur_str = __it.next(); if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.id = null; } else { this.id = Integer.valueOf(__cur_str); } __cur_str = __it.next(); if (__cur_str.equals("\\N")) { this.name = null; } else { this.name = __cur_str; } __cur_str = __it.next(); if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.group_id = null; } else { this.group_id = Integer.valueOf(__cur_str); } __cur_str = __it.next(); if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.sex = null; } else { this.sex = Integer.valueOf(__cur_str); } } catch (RuntimeException e) { throw new RuntimeException("Can't parse input data: '" + __cur_str + "'", e); } } public Object clone() throws CloneNotSupportedException { employee o = (employee) super.clone(); return o; } public void clone0(employee o) throws CloneNotSupportedException { } public Map<String, Object> getFieldMap() { Map<String, Object> __sqoop$field_map = new TreeMap<String, Object>(); __sqoop$field_map.put("id", this.id); __sqoop$field_map.put("name", this.name); __sqoop$field_map.put("group_id", this.group_id); __sqoop$field_map.put("sex", this.sex); return __sqoop$field_map; } public void getFieldMap0(Map<String, Object> __sqoop$field_map) { __sqoop$field_map.put("id", this.id); __sqoop$field_map.put("name", this.name); __sqoop$field_map.put("group_id", this.group_id); __sqoop$field_map.put("sex", this.sex); } public void setField(String __fieldName, Object __fieldVal) { if ("id".equals(__fieldName)) { this.id = (Integer) __fieldVal; } else if ("name".equals(__fieldName)) { this.name = (String) __fieldVal; } else if ("group_id".equals(__fieldName)) { this.group_id = (Integer) __fieldVal; } else if ("sex".equals(__fieldName)) { this.sex = (Integer) __fieldVal; } else { throw new RuntimeException("No such field: " + __fieldName); } } public boolean setField0(String __fieldName, Object __fieldVal) { if ("id".equals(__fieldName)) { this.id = (Integer) __fieldVal; return true; } else if ("name".equals(__fieldName)) { this.name = (String) __fieldVal; return true; } else if ("group_id".equals(__fieldName)) { this.group_id = (Integer) __fieldVal; return true; } else if ("sex".equals(__fieldName)) { this.sex = (Integer) __fieldVal; return true; } else { return false; } } }
Export処理でのエラーを探る
今回エラーとなったNUL文字を含むデータとして以下のようなサンプルデータを用意した。^A
や^@
は制御コードを示している。
入力データ
1^Ahaikikyou^A100^A0 2^A^@^A200^A1 3^A\N^A300^A1 4^Adeife^A400^A1 5^Adffife^A800^A1
レコードのparse
ソースの中でExceptionがスローされたのは以下の箇所。ソース中のコメントにも書いているが、レコードを行単位で処理していく(入力データは標準的なテキストファイルを想定)際にレコードをparseしてフィールドのリストとして返すところで、メソッドから返されたリストの要素数がテーブルメタ情報から取得したカラム数と一致していないことでエラーとなっていたもよう。__loadFromFieldsメソッドでIteratorを使って要素にアクセスするが、リストのフィールド数がカラム数より小さいため、リストの終端を超えた要素アクセスが発生しjava.util.NoSuchElementExceptionがスローされている。
... // 入力レコードに関する区切り文字などの指定 private static final DelimiterSet __inputDelimiters = new DelimiterSet((char) 1, (char) 10, (char) 0, (char) 0, false); private RecordParser __parser; public void parse(Text __record) throws RecordParser.ParseError { if (null == this.__parser) { this.__parser = new RecordParser(__inputDelimiters); } // RecordParserにより各フィールドごとに分割されたリストが返されるが // NUL文字が含まれる場合、parseRecordで戻されるフィールド数がテーブル情報と // 一致していない List<String> __fields = this.__parser.parseRecord(__record); // 各種フィールドをemployeeインスタンスメンバにリストアする __loadFromFields(__fields); } ...
さらに、RecordParser#parseRecordを追ってみる。parseRecordはステートマシンで実装されており、行レコードをパースしてフィールドのリストに分解する。以下は、メソッドの処理を簡略化したものになる。
... public List<String> parseRecord(CharBuffer input) { ... char enclosingChar = delimiters.getEnclosedBy(); char fieldDelim = delimiters.getFieldsTerminatedBy(); char recordDelim = delimiters.getLinesTerminatedBy(); char escapeChar = delimiters.getEscapedBy(); boolean enclosingRequired = delimiters.isEncloseRequired(); for (int pos = 0; pos < len; pos++) { curChar = input.get(); switch (state) { case FIELD_START: // ready to start processing a new field. if (null != sb) { // We finished processing a previous field. Add to the list. outputs.add(sb.toString()); } sb = new StringBuilder(); if (enclosingChar == curChar) { // got an opening encloser. state = ParseState.ENCLOSED_FIELD; } else if (escapeChar == curChar) { state = ParseState.UNENCLOSED_ESCAPE; } else if (fieldDelim == curChar) { // we have a zero-length field. This is a no-op. continue; } else if (recordDelim == curChar) { // we have a zero-length field, that ends processing. pos = len; } else { // current char is part of the field. state = ParseState.UNENCLOSED_FIELD; sb.append(curChar); if (enclosingRequired) { throw new com.cloudera.sqoop.lib.RecordParser.ParseError( "Opening field-encloser expected at position " + pos); } } break; case ENCLOSED_FIELD: ... break; case UNENCLOSED_FIELD: ... break; case ENCLOSED_ESCAPE: ... break; case ENCLOSED_EXPECT_DELIMITER: ... break; case UNENCLOSED_ESCAPE: ... break; default: throw new com.cloudera.sqoop.lib.RecordParser.ParseError( "Unexpected parser state: " + state); } } ... }
上記のステートマシンの状態遷移図を書くと以下のようになるだろうか。
Start X + fieldDelim | fieldDelim recordDelim +-+ | +---------------------+ +--+ | | | | | | | +v-+---v--v-+ +---+------+--v--+ +----------->FIELD_START+--------------->UNENCLOSED_FIELD+------+ | +^-+--+--+--+ +-----^----+--^--+ | | | | | | | | | | | +-+ | | | +--+ | | recordDelim | | | |escapeChar | | | +--------------+ | | | | | | | enclosingChar| | escapeChar | | | | +------------------------------------+ | | | +----+ | | | | | | | | | | | | | +-+-v----------+ +--v--+--v--------+ | +-->ENCLOSED_FIELD<------------+ |UNENCLOSED_ESCAPE| | +---+--+-------+ | +-----------------+ | | | | |fieldDelim | | | | | | | | | | escapeChar +---+-----------+ | | +---------------->ENCLOSED_ESCAPE| | | +---------------+ | |enclosingChar | | | +---v---------------------+ +-+ +-------------+ENCLOSED_EXPECT_DELIMITER+-------->X|throw ParseError +-----------------^--+----+ +-+ | | +--+ recordDelim
この状態遷移図をもとに、今回ParseErrorとなったレコードの遷移を見てみる。
INPUT RECORD 2^A^@^A200^A1 ------------------------------------------------- (2) ^A +----------------------+ | | +-------v---+ (1) 2 +------+---------+ |FIELD_START+----------->UNENCLOSED_FIELD| +-------+---+ +----------------+ | (3) ^@ | | | +--------------+ +-->ENCLOSED_FIELD| +-----+--^-----+ | | +--+ (4)...
NUL文字はenclosingCharのデフォルト値として使われているため、今回のようにレコードにNUL文字が含まれていた場合、本来FILED_START -> UNENCLOSED_FIELDと遷移することを期待していたが、FIELD_STARTの状態時にENCLOSED_FIELDに遷移する入力(enclosingCharトークン)が出現したため期待と異なる状態へ遷移してしまっていた。この結果、最終的にListとして返されたデータは以下のようになっていた。
Expected ----- fields[0] = 2 fields[1] = ^@ fields[2] = 200 fields[3] = 1 UnExpected ----- fields[0] = 2 fields[1] = ^A200^A1
不正文字を含むレコードのParseErrorへの対処
ORMクラス内で文字列置換
なかなか難儀だが、自動生成されたORMクラス内でparseRecordされる前に置換する方法。修正したソースを事前にコンパイルして、exportやimportコマンド実行時に--jar-file
オプションで渡してやる。
public void parse(Text __record) throws RecordParser.ParseError { if (null == this.__parser) { this.__parser = new RecordParser(__inputDelimiters); } // replace invalid characters with '?'. // 不正文字の置換 __record = new Text(__record.toString().replaceAll("\001", "?")); List<String> __fields = this.__parser.parseRecord(__record); __loadFromFields(__fields); }
enclosingChar、escapeCharの変更
もう1つは、極力sqoopの範囲で対応してあげる方法があるだろうか。デフォルトではNUL文字となっているenclosingCharやescapeCharを処理上影響のない文字に変更する。Exportの場合は、--input-escaped-by
、--input-optionally-enclosed-by
で指定できる。例えば以下のようになる。
package sample; import org.apache.sqoop.Sqoop; public class SqoopExportTest { public static void main(String[] args) { exportToMySQL("sample", "employee"); } static void exportToMySQL(String database, String table) { System.out.println("=> exportToMySQL"); String[] args = new String[] { "export", "--connect", "jdbc:mysql://localhost/" + database, "--username", "sqoop", "--password", "sqoop", "--export-dir", "input", "--input-null-string", "\\\\N", "--input-null-non-string", "\\\\N", "--table", table, "--input-fields-terminated-by", "\001", "--bindir", "output/bin", "--outdir", "output", "--input-escaped-by", "\032", "--input-optionally-enclosed-by", "\032", "--verbose" }; Sqoop.main(args); } }
実行するとmysqlには以下のようにデータが入っていることが確認できた。
mysql> select * from employee; +----+-----------+----------+------+ | id | name | group_id | sex | +----+-----------+----------+------+ | 1 | haikikyou | 100 | 0 | | 2 | | 200 | 1 | | 3 | NULL | 300 | 1 | | 4 | deife | 400 | 1 | | 5 | dffife | 800 | 1 | +----+-----------+----------+------+ 5 rows in set (0.00 sec)
例えば、RDBMSからHDFSへ--hive-drop-import-delims
オプション付きでhiveインポートして、さらに別のサービスで同テーブルデータを使いたくexportするような場合には、--hive-drop-import-delims
でドロップされた文字を--input-escaped-by
などのオプションに渡してあげれば、export時にドロップされた文字が出現しないことが保証されるだろう。
RDBMS ----------------> HDFS --------------> RDMBS import with export with --hive-drop-import-delims --input-escaped-by "\n" --input-optionally-enclosed-by "\n"
https://sqoop.apache.org/docs/1.4.5/SqoopUserGuide.html#_importing_data_into_hive
ということで、レコード内にsqoopのパースに影響のある文字列が含まれる場合は注意する。事前に不正な文字列が除去できているのが望ましいと思う。
参考リンク
- https://sqoop.apache.org/docs/1.4.5/SqoopUserGuide.html
オライリージャパン
売り上げランキング: 246,670