Parse error can be caused due to NUL character included in records with Sqoop

Sqoopを使ってRDBMとHadoop間でデータ転送を行なっている。Sqoopは、MapReduceジョブを実行し複数のMapタスクで並列にインポートやエクスポートを行なう。また、Sqoopはコマンドラインからsqoopコマンドを実行するだけで、データ転送に必要なテーブルのメタ情報の取得やRDBMとHadoop間でデータのやり取りをするインターフェースプログラムの生成やコンパイル、各Mapタスクへのデータスプリット計算などを内部で行ってくれるので、実行も非常に容易である。コマンドラインオプションも豊富に用意されており、多くの場合はそれらの組み合わせで十分対応できる。

Parse Error with Export

で、今回通常どおりSqoopを使ってデータ転送処理を実行していたところ、Sqoop Exportの実行時に以下のようなエラーになってしまった(以下のエラーはローカルで再現させたもの)。成功したMapタスクのデータは転送に成功しているが、失敗したMapタスクのデータは完全に転送されないためデータが不完全な状態になってしまう。parseに失敗しているデータを調べたところ、対象レコードにNUL(\00)文字が含まれていたようだった。そこで、Sqoopの内部動作を追いながら回避方法を模索してみた。

16/07/17 23:17:52 ERROR mapreduce.TextExportMapper: 
16/07/17 23:17:52 ERROR mapreduce.TextExportMapper: Exception raised during data export
16/07/17 23:17:52 ERROR mapreduce.TextExportMapper: 
16/07/17 23:17:52 ERROR mapreduce.TextExportMapper: Exception: 
java.lang.RuntimeException: Can't parse input data: '0'
	at employee.__loadFromFields(employee.java:335)
	at employee.parse(employee.java:268)
	at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:83)
	at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:39)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
	at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at java.util.ArrayList$Itr.next(ArrayList.java:854)
	at employee.__loadFromFields(employee.java:330)
	... 13 more

テスト環境

今回の動作テストの環境は以下の通り。

  • sqoop-1.4.5-cdh5.3.3
  • ローカル環境

Sqoop Process Overview

まず、Sqoopの実行プロセス概要は以下のようになっている。RDBMSとHDFS間のデータ転送はMapReduceジョブで(Mapタスク)並列実行される。Mapタスクの数などはオプションで調整することができる。

sqoop-proess

Generated Sqoop ORM Class

Sqoopは、実行時にデータ転送のためのORMクラスを生成する。Sqoop Exportを実行してMySQLへデータ転送する例を挙げ、どのようなORMクラスが生成されるのかを見てみる。

まずは、以下のようなテーブルをMySQLに準備する。

create table employee (
  id int primary key,
  name text,
  group_id int,
  sex tinyint
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

続いてexportコマンドでORMクラスの生成及びexport処理を実行してみる。今回は、ローカル環境でコマンドラインでなくIDE上でJavaから直接Sqoopクラスをコールする形で実行した。

package sample;

import org.apache.sqoop.Sqoop;

public class SqoopExportTest {
    public static void main(String[] args) {
        exportToMySQL("sample", "employee");
    }

    static void exportToMySQL(String database, String table) {
        System.out.println("=> exportToMySQL");
        String[] args = new String[] {
            "export",
            "--connect", "jdbc:mysql://localhost/" + database,
            "--username", "sqoop",
            "--password", "sqoop",
            "--export-dir", "input",
            "--input-null-string", "\\N",
            "--input-null-non-string", "\\N",
            "--table", table,
            "--input-fields-terminated-by", "
package sample;
import org.apache.sqoop.Sqoop;
public class SqoopExportTest {
public static void main(String[] args) {
exportToMySQL("sample", "employee");
}
static void exportToMySQL(String database, String table) {
System.out.println("=> exportToMySQL");
String[] args = new String[] {
"export",
"--connect", "jdbc:mysql://localhost/" + database,
"--username", "sqoop",
"--password", "sqoop",
"--export-dir", "input",
"--input-null-string", "\\\\N",
"--input-null-non-string", "\\\\N",
"--table", table,
"--input-fields-terminated-by", "\001",
"--bindir", "output",
"--outdir", "output"
};
Sqoop.main(args);
}
}
1", "--bindir", "output", "--outdir", "output" }; Sqoop.main(args); } }

実行するとoutputディレクトリに以下のファイルが生成される。

$ tree output/
output/
├── employee.class
├── employee.jar
└── employee.java

0 directories, 3 files

employee.javaがORMクラスになり中身は以下のとおり。

// ORM class for table 'employee'
// WARNING: This class is AUTO-GENERATED. Modify at your own risk.
//
// Debug information:
// Generated date: Sun Jul 17 12:28:31 JST 2016
// For connector: org.apache.sqoop.manager.MySQLManager
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import com.cloudera.sqoop.lib.JdbcWritableBridge;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.lib.FieldFormatter;
import com.cloudera.sqoop.lib.RecordParser;
import com.cloudera.sqoop.lib.BooleanParser;
import com.cloudera.sqoop.lib.BlobRef;
import com.cloudera.sqoop.lib.ClobRef;
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.lib.SqoopRecord;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class employee extends SqoopRecord  implements DBWritable, Writable {
private final int PROTOCOL_VERSION = 3;
public int getClassFormatVersion() { return PROTOCOL_VERSION; }
protected ResultSet __cur_result_set;
private Integer id;
public Integer get_id() {
return id;
}
public void set_id(Integer id) {
this.id = id;
}
public employee with_id(Integer id) {
this.id = id;
return this;
}
private String name;
public String get_name() {
return name;
}
public void set_name(String name) {
this.name = name;
}
public employee with_name(String name) {
this.name = name;
return this;
}
private Integer group_id;
public Integer get_group_id() {
return group_id;
}
public void set_group_id(Integer group_id) {
this.group_id = group_id;
}
public employee with_group_id(Integer group_id) {
this.group_id = group_id;
return this;
}
private Integer sex;
public Integer get_sex() {
return sex;
}
public void set_sex(Integer sex) {
this.sex = sex;
}
public employee with_sex(Integer sex) {
this.sex = sex;
return this;
}
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof employee)) {
return false;
}
employee that = (employee) o;
boolean equal = true;
equal = equal && (this.id == null ? that.id == null : this.id.equals(that.id));
equal = equal && (this.name == null ? that.name == null : this.name.equals(that.name));
equal = equal && (this.group_id == null ? that.group_id == null : this.group_id.equals(that.group_id));
equal = equal && (this.sex == null ? that.sex == null : this.sex.equals(that.sex));
return equal;
}
public boolean equals0(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof employee)) {
return false;
}
employee that = (employee) o;
boolean equal = true;
equal = equal && (this.id == null ? that.id == null : this.id.equals(that.id));
equal = equal && (this.name == null ? that.name == null : this.name.equals(that.name));
equal = equal && (this.group_id == null ? that.group_id == null : this.group_id.equals(that.group_id));
equal = equal && (this.sex == null ? that.sex == null : this.sex.equals(that.sex));
return equal;
}
public void readFields(ResultSet __dbResults) throws SQLException {
this.__cur_result_set = __dbResults;
this.id = JdbcWritableBridge.readInteger(1, __dbResults);
this.name = JdbcWritableBridge.readString(2, __dbResults);
this.group_id = JdbcWritableBridge.readInteger(3, __dbResults);
this.sex = JdbcWritableBridge.readInteger(4, __dbResults);
}
public void readFields0(ResultSet __dbResults) throws SQLException {
this.id = JdbcWritableBridge.readInteger(1, __dbResults);
this.name = JdbcWritableBridge.readString(2, __dbResults);
this.group_id = JdbcWritableBridge.readInteger(3, __dbResults);
this.sex = JdbcWritableBridge.readInteger(4, __dbResults);
}
public void loadLargeObjects(LargeObjectLoader __loader)
throws SQLException, IOException, InterruptedException {
}
public void loadLargeObjects0(LargeObjectLoader __loader)
throws SQLException, IOException, InterruptedException {
}
public void write(PreparedStatement __dbStmt) throws SQLException {
write(__dbStmt, 0);
}
public int write(PreparedStatement __dbStmt, int __off) throws SQLException {
JdbcWritableBridge.writeInteger(id, 1 + __off, 4, __dbStmt);
JdbcWritableBridge.writeString(name, 2 + __off, -1, __dbStmt);
JdbcWritableBridge.writeInteger(group_id, 3 + __off, 4, __dbStmt);
JdbcWritableBridge.writeInteger(sex, 4 + __off, -6, __dbStmt);
return 4;
}
public void write0(PreparedStatement __dbStmt, int __off) throws SQLException {
JdbcWritableBridge.writeInteger(id, 1 + __off, 4, __dbStmt);
JdbcWritableBridge.writeString(name, 2 + __off, -1, __dbStmt);
JdbcWritableBridge.writeInteger(group_id, 3 + __off, 4, __dbStmt);
JdbcWritableBridge.writeInteger(sex, 4 + __off, -6, __dbStmt);
}
public void readFields(DataInput __dataIn) throws IOException {
this.readFields0(__dataIn);  }
public void readFields0(DataInput __dataIn) throws IOException {
if (__dataIn.readBoolean()) { 
this.id = null;
} else {
this.id = Integer.valueOf(__dataIn.readInt());
}
if (__dataIn.readBoolean()) { 
this.name = null;
} else {
this.name = Text.readString(__dataIn);
}
if (__dataIn.readBoolean()) { 
this.group_id = null;
} else {
this.group_id = Integer.valueOf(__dataIn.readInt());
}
if (__dataIn.readBoolean()) { 
this.sex = null;
} else {
this.sex = Integer.valueOf(__dataIn.readInt());
}
}
public void write(DataOutput __dataOut) throws IOException {
if (null == this.id) { 
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeInt(this.id);
}
if (null == this.name) { 
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
Text.writeString(__dataOut, name);
}
if (null == this.group_id) { 
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeInt(this.group_id);
}
if (null == this.sex) { 
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeInt(this.sex);
}
}
public void write0(DataOutput __dataOut) throws IOException {
if (null == this.id) { 
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeInt(this.id);
}
if (null == this.name) { 
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
Text.writeString(__dataOut, name);
}
if (null == this.group_id) { 
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeInt(this.group_id);
}
if (null == this.sex) { 
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeInt(this.sex);
}
}
private static final DelimiterSet __outputDelimiters = new DelimiterSet((char) 44, (char) 10, (char) 0, (char) 0, false);
public String toString() {
return toString(__outputDelimiters, true);
}
public String toString(DelimiterSet delimiters) {
return toString(delimiters, true);
}
public String toString(boolean useRecordDelim) {
return toString(__outputDelimiters, useRecordDelim);
}
public String toString(DelimiterSet delimiters, boolean useRecordDelim) {
StringBuilder __sb = new StringBuilder();
char fieldDelim = delimiters.getFieldsTerminatedBy();
__sb.append(FieldFormatter.escapeAndEnclose(id==null?"null":"" + id, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(name==null?"null":name, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(group_id==null?"null":"" + group_id, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(sex==null?"null":"" + sex, delimiters));
if (useRecordDelim) {
__sb.append(delimiters.getLinesTerminatedBy());
}
return __sb.toString();
}
public void toString0(DelimiterSet delimiters, StringBuilder __sb, char fieldDelim) {
__sb.append(FieldFormatter.escapeAndEnclose(id==null?"null":"" + id, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(name==null?"null":name, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(group_id==null?"null":"" + group_id, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(sex==null?"null":"" + sex, delimiters));
}
private static final DelimiterSet __inputDelimiters = new DelimiterSet((char) 1, (char) 10, (char) 0, (char) 0, false);
private RecordParser __parser;
public void parse(Text __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(CharSequence __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(byte [] __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(char [] __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(ByteBuffer __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(CharBuffer __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
private void __loadFromFields(List<String> fields) {
Iterator<String> __it = fields.listIterator();
String __cur_str = null;
try {
__cur_str = __it.next();
if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.id = null; } else {
this.id = Integer.valueOf(__cur_str);
}
__cur_str = __it.next();
if (__cur_str.equals("\\N")) { this.name = null; } else {
this.name = __cur_str;
}
__cur_str = __it.next();
if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.group_id = null; } else {
this.group_id = Integer.valueOf(__cur_str);
}
__cur_str = __it.next();
if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.sex = null; } else {
this.sex = Integer.valueOf(__cur_str);
}
} catch (RuntimeException e) {    throw new RuntimeException("Can't parse input data: '" + __cur_str + "'", e);    }  }
private void __loadFromFields0(Iterator<String> __it) {
String __cur_str = null;
try {
__cur_str = __it.next();
if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.id = null; } else {
this.id = Integer.valueOf(__cur_str);
}
__cur_str = __it.next();
if (__cur_str.equals("\\N")) { this.name = null; } else {
this.name = __cur_str;
}
__cur_str = __it.next();
if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.group_id = null; } else {
this.group_id = Integer.valueOf(__cur_str);
}
__cur_str = __it.next();
if (__cur_str.equals("\\N") || __cur_str.length() == 0) { this.sex = null; } else {
this.sex = Integer.valueOf(__cur_str);
}
} catch (RuntimeException e) {    throw new RuntimeException("Can't parse input data: '" + __cur_str + "'", e);    }  }
public Object clone() throws CloneNotSupportedException {
employee o = (employee) super.clone();
return o;
}
public void clone0(employee o) throws CloneNotSupportedException {
}
public Map<String, Object> getFieldMap() {
Map<String, Object> __sqoop$field_map = new TreeMap<String, Object>();
__sqoop$field_map.put("id", this.id);
__sqoop$field_map.put("name", this.name);
__sqoop$field_map.put("group_id", this.group_id);
__sqoop$field_map.put("sex", this.sex);
return __sqoop$field_map;
}
public void getFieldMap0(Map<String, Object> __sqoop$field_map) {
__sqoop$field_map.put("id", this.id);
__sqoop$field_map.put("name", this.name);
__sqoop$field_map.put("group_id", this.group_id);
__sqoop$field_map.put("sex", this.sex);
}
public void setField(String __fieldName, Object __fieldVal) {
if ("id".equals(__fieldName)) {
this.id = (Integer) __fieldVal;
}
else    if ("name".equals(__fieldName)) {
this.name = (String) __fieldVal;
}
else    if ("group_id".equals(__fieldName)) {
this.group_id = (Integer) __fieldVal;
}
else    if ("sex".equals(__fieldName)) {
this.sex = (Integer) __fieldVal;
}
else {
throw new RuntimeException("No such field: " + __fieldName);
}
}
public boolean setField0(String __fieldName, Object __fieldVal) {
if ("id".equals(__fieldName)) {
this.id = (Integer) __fieldVal;
return true;
}
else    if ("name".equals(__fieldName)) {
this.name = (String) __fieldVal;
return true;
}
else    if ("group_id".equals(__fieldName)) {
this.group_id = (Integer) __fieldVal;
return true;
}
else    if ("sex".equals(__fieldName)) {
this.sex = (Integer) __fieldVal;
return true;
}
else {
return false;    }
}
}

Export処理でのエラーを探る

今回エラーとなったNUL文字を含むデータとして以下のようなサンプルデータを用意した。^A^@は制御コードを示している。

入力データ

1^Ahaikikyou^A100^A0
2^A^@^A200^A1
3^A\N^A300^A1
4^Adeife^A400^A1
5^Adffife^A800^A1

レコードのparse

ソースの中でExceptionがスローされたのは以下の箇所。ソース中のコメントにも書いているが、レコードを行単位で処理していく(入力データは標準的なテキストファイルを想定)際にレコードをparseしてフィールドのリストとして返すところで、メソッドから返されたリストの要素数がテーブルメタ情報から取得したカラム数と一致していないことでエラーとなっていたもよう。__loadFromFieldsメソッドでIteratorを使って要素にアクセスするが、リストのフィールド数がカラム数より小さいため、リストの終端を超えた要素アクセスが発生しjava.util.NoSuchElementExceptionがスローされている。

...
// 入力レコードに関する区切り文字などの指定
private static final DelimiterSet __inputDelimiters = new DelimiterSet((char) 1, (char) 10, (char) 0, (char) 0, false);
private RecordParser __parser;
public void parse(Text __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
// RecordParserにより各フィールドごとに分割されたリストが返されるが
// NUL文字が含まれる場合、parseRecordで戻されるフィールド数がテーブル情報と
// 一致していない
List<String> __fields = this.__parser.parseRecord(__record);
// 各種フィールドをemployeeインスタンスメンバにリストアする
__loadFromFields(__fields);
}
...

さらに、RecordParser#parseRecordを追ってみる。parseRecordはステートマシンで実装されており、行レコードをパースしてフィールドのリストに分解する。以下は、メソッドの処理を簡略化したものになる。

...
public List<String> parseRecord(CharBuffer input) {
...
char enclosingChar = delimiters.getEnclosedBy();
char fieldDelim = delimiters.getFieldsTerminatedBy();
char recordDelim = delimiters.getLinesTerminatedBy();
char escapeChar = delimiters.getEscapedBy();
boolean enclosingRequired = delimiters.isEncloseRequired();
for (int pos = 0; pos < len; pos++) {
curChar = input.get();
switch (state) {
case FIELD_START:
// ready to start processing a new field.
if (null != sb) {
// We finished processing a previous field. Add to the list.
outputs.add(sb.toString());
}
sb = new StringBuilder();
if (enclosingChar == curChar) {
// got an opening encloser.
state = ParseState.ENCLOSED_FIELD;
} else if (escapeChar == curChar) {
state = ParseState.UNENCLOSED_ESCAPE;
} else if (fieldDelim == curChar) {
// we have a zero-length field. This is a no-op.
continue;
} else if (recordDelim == curChar) {
// we have a zero-length field, that ends processing.
pos = len;
} else {
// current char is part of the field.
state = ParseState.UNENCLOSED_FIELD;
sb.append(curChar);
if (enclosingRequired) {
throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
"Opening field-encloser expected at position " + pos);
}
}
break;
case ENCLOSED_FIELD:
...
break;
case UNENCLOSED_FIELD:
...
break;
case ENCLOSED_ESCAPE:
...
break;
case ENCLOSED_EXPECT_DELIMITER:
...
break;
case UNENCLOSED_ESCAPE:
...
break;
default:
throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
"Unexpected parser state: " + state);
}
}
...
}

上記のステートマシンの状態遷移図を書くと以下のようになるだろうか。

                 Start
X
+
fieldDelim |            fieldDelim       recordDelim
+-+   |  +---------------------+      +--+
| |   |  |                     |      |  |
+v-+---v--v-+               +---+------+--v--+
+----------->FIELD_START+--------------->UNENCLOSED_FIELD+------+
|           +^-+--+--+--+               +-----^----+--^--+      |
|            | |  |  |                        |    |  |         |
|            +-+  |  |                        |    +--+         |
|    recordDelim  |  |                        |                 |escapeChar
|                 |  |                        +--------------+  |
|                 |  |                                       |  |
|    enclosingChar|  |               escapeChar              |  |
|                 |  +------------------------------------+  |  |
|          +----+ |                                       |  |  |
|          |    | |                                       |  |  |
|          |  +-+-v----------+                         +--v--+--v--------+
|          +-->ENCLOSED_FIELD<------------+            |UNENCLOSED_ESCAPE|
|             +---+--+-------+            |            +-----------------+
|                 |  |                    |
|fieldDelim       |  |                    |
|                 |  |                    |
|                 |  |  escapeChar    +---+-----------+
|                 |  +---------------->ENCLOSED_ESCAPE|
|                 |                   +---------------+
|                 |enclosingChar
|                 |
|             +---v---------------------+        +-+
+-------------+ENCLOSED_EXPECT_DELIMITER+-------->X|throw ParseError
+-----------------^--+----+        +-+
|  |
+--+
recordDelim

この状態遷移図をもとに、今回ParseErrorとなったレコードの遷移を見てみる。

INPUT RECORD
2^A^@^A200^A1
-------------------------------------------------
(2) ^A
+----------------------+
|                      |
+-------v---+ (1) 2     +------+---------+
|FIELD_START+----------->UNENCLOSED_FIELD|
+-------+---+           +----------------+
|
(3) ^@ |
|
|  +--------------+
+-->ENCLOSED_FIELD|
+-----+--^-----+
|  |
+--+
(4)...

NUL文字はenclosingCharのデフォルト値として使われているため、今回のようにレコードにNUL文字が含まれていた場合、本来FILED_START -> UNENCLOSED_FIELDと遷移することを期待していたが、FIELD_STARTの状態時にENCLOSED_FIELDに遷移する入力(enclosingCharトークン)が出現したため期待と異なる状態へ遷移してしまっていた。この結果、最終的にListとして返されたデータは以下のようになっていた。

Expected -----
fields[0] = 2
fields[1] = ^@
fields[2] = 200
fields[3] = 1
UnExpected -----
fields[0] = 2
fields[1] = ^A200^A1

不正文字を含むレコードのParseErrorへの対処

ORMクラス内で文字列置換

なかなか難儀だが、自動生成されたORMクラス内でparseRecordされる前に置換する方法。修正したソースを事前にコンパイルして、exportやimportコマンド実行時に--jar-fileオプションで渡してやる。

  public void parse(Text __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
// replace invalid characters with '?'. 
// 不正文字の置換
__record = new Text(__record.toString().replaceAll("\001", "?"));
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}

enclosingChar、escapeCharの変更

もう1つは、極力sqoopの範囲で対応してあげる方法があるだろうか。デフォルトではNUL文字となっているenclosingCharやescapeCharを処理上影響のない文字に変更する。Exportの場合は、--input-escaped-by--input-optionally-enclosed-byで指定できる。例えば以下のようになる。

package sample;
import org.apache.sqoop.Sqoop;
public class SqoopExportTest {
public static void main(String[] args) {
exportToMySQL("sample", "employee");
}
static void exportToMySQL(String database, String table) {
System.out.println("=> exportToMySQL");
String[] args = new String[] {
"export",
"--connect", "jdbc:mysql://localhost/" + database,
"--username", "sqoop",
"--password", "sqoop",
"--export-dir", "input",
"--input-null-string", "\\\\N",
"--input-null-non-string", "\\\\N",
"--table", table,
"--input-fields-terminated-by", "\001",
"--bindir", "output/bin",
"--outdir", "output",
"--input-escaped-by", "\032",
"--input-optionally-enclosed-by", "\032",
"--verbose"
};
Sqoop.main(args);
}
}

実行するとmysqlには以下のようにデータが入っていることが確認できた。

mysql> select * from employee;
+----+-----------+----------+------+
| id | name      | group_id | sex  |
+----+-----------+----------+------+
|  1 | haikikyou |      100 |    0 |
|  2 |           |      200 |    1 |
|  3 | NULL      |      300 |    1 |
|  4 | deife     |      400 |    1 |
|  5 | dffife    |      800 |    1 |
+----+-----------+----------+------+
5 rows in set (0.00 sec)

例えば、RDBMSからHDFSへ--hive-drop-import-delimsオプション付きでhiveインポートして、さらに別のサービスで同テーブルデータを使いたくexportするような場合には、--hive-drop-import-delimsでドロップされた文字を--input-escaped-byなどのオプションに渡してあげれば、export時にドロップされた文字が出現しないことが保証されるだろう。

RDBMS ----------------> HDFS --------------> RDMBS
import with                   export with
--hive-drop-import-delims      --input-escaped-by "\n"
--input-optionally-enclosed-by "\n"

https://sqoop.apache.org/docs/1.4.5/SqoopUserGuide.html#_importing_data_into_hive

ということで、レコード内にsqoopのパースに影響のある文字列が含まれる場合は注意する。事前に不正な文字列が除去できているのが望ましいと思う。

参考リンク

  • https://sqoop.apache.org/docs/1.4.5/SqoopUserGuide.html
Hadoop 第3版

Hadoop 第3版

posted with amazlet at 16.07.17
Tom White
オライリージャパン
売り上げランキング: 246,670

byebyehaikikyou

日記やIT系関連のネタ、WordPressに関することなど様々な事柄を書き付けた雑記です。ITエンジニア経験があるのでプログラミングに関することなどが多いです。

シェアする

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

コメントする

Translate »