import java.io.*;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;


public class DBInputWritable implements Writable, DBWritable
{
        String symbol;
        String date;
        double open;
        double high;
        double low;
        double close;
        int volume;
        double adjClose;

          //private final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

          public void readFields(DataInput in) throws IOException
          {
           symbol=in.readLine();
           date=in.readLine();
           open=in.readDouble();
           high=in.readDouble();
           low=in.readDouble();
           close=in.readDouble();
           volume=in.readInt();
           adjClose=in.readDouble();

          }

          public void readFields(ResultSet rs) throws SQLException
          {
           symbol = rs.getString(2);
           date = rs.getString(3);
           open = rs.getDouble(4);
           high = rs.getDouble(5);
           low = rs.getDouble(6);
           close = rs.getDouble(7);
           volume = rs.getInt(8);
           adjClose = rs.getDouble(9);
          }

          public void write(DataOutput out) throws IOException
          {

          }

          public void write( PreparedStatement ps) throws SQLException
          {

          }

          public String getSymbol()
          {
            return symbol;
          }


          public  String getDate()
          {
            return date;
          }

          public double getOpen()
          {
            return open;
          }

          public double getHigh()
          {
            return high;
          }

          public double getLow()
          {
            return low;
          }

          public double getClose()
          {
            return close;
          }

          public int getVolume()
          {
            return volume;
          }

          public double getAdjClose()
           {
            return adjClose;
           }

}


public class DBOutputWritable implements Writable, DBWritable
{

         String symbol;
         String date;
         double open;
         double high;
         double low;
         double close;
         int volume;
         double adjClose;
         ;




       public DBOutputWritable(String symbol,String date,String open,String high,String low,String close,String volume,String adjClose)

       {

           this.symbol=symbol;
           this.date=date;
           this.open=Double.parseDouble(open);
           this.high=Double.parseDouble(high);
           this.low=Double.parseDouble(low);
           this.close=Double.parseDouble(close);
           this.volume=Integer.parseInt(volume);
           this.adjClose=Double.parseDouble(adjClose);

       }
       public void readFields(DataInput in) throws IOException
       {

       }


       public void readFields(ResultSet rs) throws SQLException
       {

       }


       public void write(DataOutput out) throws IOException
       {
         out.writeChars(symbol);
         out.writeChars(date);
         out.writeDouble(open);
         out.writeDouble(high);
         out.writeDouble(low);
         out.writeDouble(close);
         out.writeInt(volume);
         out.writeDouble(adjClose);

       }


       public void write(PreparedStatement ps) throws SQLException
       {

          ps.setString(1,symbol);
          ps.setString(2,date);
          ps.setDouble(3,open);
          ps.setDouble(4,high);
          ps.setDouble(5,low);
          ps.setDouble(6,close);
          ps.setInt(7,volume);
          ps.setDouble(8,adjClose);
         }



   }

    public class Map extends Mapper<LongWritable,DBInputWritable,Text,Text>
    {

    public void  map(LongWritable key,  DBInputWritable value, Context ctx)
    {
              try
                    {
                  Text set;
                  set= new Text(value.getDate());
                  String line = value.getSymbol()+","+value.getDate()+","+value.getOpen()+","+value.getHigh()+","+value.getLow()+","+value.getClose()+","+value.getVolume()+","+value.getAdjClose();

                       ctx.write(set,new Text(line));

                     }

         catch(IOException e)
          {
               e.printStackTrace();
          }

         catch(InterruptedException e)
          {
              e.printStackTrace();
          }

    }
    }

public class Reduce extends Reducer<Text, Text, DBOutputWritable, NullWritable>
{



   public void reduce(Text key, Text value, Context ctx)
   {

        try
             {
               String []line= value.toString().split(",");

                    String sym=line[0];
                    String dt=line[1];
                    String opn=line[2];
                    String hgh=line[3];
                    String lw=line[4];
                    String cls=line[5];
                    String vlm=line[6];
                    String adcls=line[7];


          ctx.write(new DBOutputWritable(sym,dt,opn,hgh,lw,cls,vlm,adcls),NullWritable.get());

                }

         catch(IOException e)
          {
            e.printStackTrace();
           }

          catch(InterruptedException e)
          {
            e.printStackTrace();
          }
}
   }



  public class Main
    {

         public static void main(String [] args) throws Exception

    {

     Configuration conf = new Configuration();

     DBConfiguration.configureDB(conf,
    "com.mysql.jdbc.Driver",             //Driver Class
    "jdbc:mysql://192.168.198.128:3306/testDb",   //DB URL
    "sqoopuser",      //USERNAME
    "passphrase");  //PASSWORD

    Job job = new Job(conf);
    job.setJarByClass(Main.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(DBOutputWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setInputFormatClass(DBInputFormat.class);
    job.setOutputFormatClass(DBOutputFormat.class);

    DBInputFormat.setInput(
    job,
    DBInputWritable.class,
    "aapldata",    //input table name
    null,
    null,
    new String[] {"stock","symbol", "date" ,"open", "high", "low", "close", "volume", "adjClose"}
    //Table Columns
    );

    DBOutputFormat.setOutput(
    job,
    "aapldatanew", //Output Table Name
    new String[] {"symbol", "date" ,"open", "high", "low", "close", "volume", "adjClose"}
    //Table Columns
    );

    System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

    }

我认为代码是完美的图片。但是我仍然遇到以下错误:
14/11/26 22:09:47 INFO mapred.JobClient:  map 100% reduce 0%
14/11/26 22:09:55 INFO mapred.JobClient:  map 100% reduce 33%
14/11/26 22:09:58 INFO mapred.JobClient: Task Id : attempt_201411262208_0001_r_000000_2, Status : FAILED
java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lidb.DBWritable
        at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:586)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

需要您的宝贵见解。

最佳答案

map类中,以文本而不是DBInputWritable的形式获取输入:

public class Map extends Mapper {
  public void map(LongWritable key,Text value, Context ctx)

08-27 16:28