Project

General

Profile

« Previous | Next » 

Revision 5637

Added by berkley over 13 years ago

more work on streaming mmp support

View differences:

ResourceHandler.java
65 65
import edu.ucsb.nceas.metacat.MetacatHandler;
66 66
import edu.ucsb.nceas.metacat.client.InsufficientKarmaException;
67 67
import edu.ucsb.nceas.metacat.dataone.CrudService;
68
import edu.ucsb.nceas.metacat.properties.PropertyService;
68 69
import edu.ucsb.nceas.metacat.service.SessionService;
69 70
import edu.ucsb.nceas.metacat.util.RequestUtil;
70 71
import edu.ucsb.nceas.metacat.util.SessionData;
72
import edu.ucsb.nceas.utilities.PropertyNotFoundException;
73

  
71 74
import org.dataone.service.streaming.util.StreamUtil;
72 75

  
73 76
import com.gc.iotools.stream.is.InputStreamFromOutputStream;
......
1058 1061
        return h;
1059 1062
    }
1060 1063
    */
1061
    protected InputStream processMMP(final InputStream is)
1062
      throws IOException
1064
    
1065
    /**
1066
     * write the parts of an MMP to files.  The first file in the vector is
1067
     * a system metadata file, the 2nd is the object.
1068
     */
1069
    private static Vector<File> writeMMPPartsToFiles(InputStream is)
1070
        throws IOException
1063 1071
    {
1064
        Hashtable<String, InputStream> partsHash = new Hashtable<String, InputStream>();
1065
        final InputStreamFromOutputStream<String> objectStream = 
1066
            new InputStreamFromOutputStream<String>() 
1072
        System.out.println("producing data");
1073
        Logger logMetacat = Logger.getLogger(ResourceHandler.class);
1074
        
1075
        String[] searchStrings = {"boundary=", 
1076
                "Content-Disposition: attachment; filename=systemmetadata",
1077
                "Content-Disposition: attachment; filename=object"};
1078
        
1079
        String boundary = "";
1080
        String searchSegment = null;
1081
        byte[] b = new byte[1024];
1082
        int numread = is.read(b, 0, 1024);
1083
        String s = new String(b, 0, numread);
1084
        int ssi = 0;
1085
        boolean doneWithCurrentArray = false;
1086
        boolean searchForBoundary = false;
1087
        boolean writeMarker = true;
1088
        int currentArrayIndex = 0;
1089
        
1090
        File tmpDir;
1091
        try
1067 1092
        {
1068
            @Override
1069
            public String produce(final OutputStream dataSink) throws Exception 
1093
            tmpDir = new File(PropertyService.getProperty("application.tempDir"));
1094
        }
1095
        catch(PropertyNotFoundException pnfe)
1096
        {
1097
            logMetacat.error("ResourceHandler.writeMMPPartstoFiles: " +
1098
                    "application.tmpDir not found.  Using /tmp instead.");
1099
            tmpDir = new File("/tmp");
1100
        }
1101
        long datetimetag = new Date().getTime();
1102
        File smFile = new File(tmpDir, "sm." + datetimetag + ".tmp");
1103
        File objFile = new File(tmpDir, "obj." + datetimetag + ".tmp");
1104
        FileOutputStream smFos = new FileOutputStream(smFile);
1105
        FileOutputStream objFos = new FileOutputStream(objFile);
1106
        
1107
        boolean writeSM = true;
1108
        
1109
        while(numread != -1)
1110
        {
1111
            if(ssi == 2 && writeSM)
1070 1112
            {
1071
                System.out.println("producing data");
1072
                String[] searchStrings = {"boundary=", "Content-Disposition: attachment; filename=systemmetadata",
1073
                "Content-Disposition: attachment; filename=object"};
1074
                String boundary = "";
1075
                String searchSegment = null;
1076
                byte[] b = new byte[1024];
1077
                int numread = is.read(b, 0, 1024);
1078
                String s = new String(b, 0, numread);
1079
                int ssi = 0;
1080
                boolean doneWithCurrentArray = false;
1081
                boolean searchForBoundary = false;
1082
                int currentArrayIndex = 0;
1083
                
1084
                while(numread != -1)
1113
                writeSM = false;
1114
            }
1115
            
1116
            String searchString;
1117
            if(!searchForBoundary)
1118
            {
1119
                if(ssi == searchStrings.length)
1085 1120
                {
1086
                    String searchString;
1087
                    if(!searchForBoundary)
1121
                    break;
1122
                }
1123
                searchString = searchStrings[ssi];
1124
            }
1125
            else
1126
            {
1127
                searchString = boundary;
1128
            }
1129
            
1130
            System.out.println("searchString: " + searchString);
1131
            System.out.println("done with array: " + doneWithCurrentArray);
1132
            System.out.println("searchForBoundary: " + searchForBoundary);
1133
            if(doneWithCurrentArray)
1134
            {
1135
                System.out.println("Getting new chunk");
1136
                numread = is.read(b, 0, 1024);
1137
                s = new String(b, 0, numread);
1138
            }
1139
            
1140
            System.out.println("/////////////////current chunk (s): \n" + s + "\n////////////end current chunk");
1141
            
1142
            int[] result = StreamUtil.lookForMatch(searchString, s);
1143
            System.out.println("got result from StreamUtil: " + result[0] + ", " + result[1]);
1144
            if(result[0] != -1 && result[1] == searchString.length())
1145
            { //we have the whole search string in this group of bytes
1146
                System.out.println("1");
1147
                if(searchForBoundary)
1148
                {
1149
                    System.out.println("2");
1150
                    //find the boundary, chop it off the end and inc ssi
1151
                    String strToWrite = s.substring(0, s.indexOf(boundary, currentArrayIndex));
1152
                    System.out.println("XXXX 1 writing string: " + strToWrite + "\nXXXX");
1153
                    
1154
                    //dataSink.write(strToWrite.getBytes());
1155
                    if(writeSM)
1088 1156
                    {
1089
                        if(ssi == searchStrings.length)
1090
                        {
1091
                            break;
1092
                        }
1093
                        searchString = searchStrings[ssi];
1157
                        smFos.write(strToWrite.getBytes());
1094 1158
                    }
1095 1159
                    else
1096 1160
                    {
1097
                        searchString = boundary;
1161
                        objFos.write(strToWrite.getBytes());
1098 1162
                    }
1163
                    doneWithCurrentArray = false; 
1164
                    currentArrayIndex = s.indexOf(boundary, currentArrayIndex) + 1;
1165
                    searchForBoundary = false;
1166
                    ssi++;
1167
                }
1168
                else if(ssi == 0)
1169
                { //we're looking for the boundary marker
1170
                    int searchStringIndex = s.indexOf(searchString);
1171
                    if(s.indexOf("\"", searchStringIndex + searchString.length() + 1) == -1)
1172
                    { //the end of the boundary is in the next byte array
1173
                        boundary = s.substring(searchStringIndex + searchString.length() + 1, s.length());
1174
                        doneWithCurrentArray = true;
1175
                    }
1176
                    else if(!boundary.startsWith("--"))
1177
                    { //we can read the whole boundary from this byte array
1178
                        boundary = s.substring(searchStringIndex + searchString.length() + 1, 
1179
                            s.indexOf("\"", searchStringIndex + searchString.length() + 1));
1180
                        boundary = "--" + boundary;
1181
                        ssi++;
1182
                        doneWithCurrentArray = false;
1183
                    }
1184
                    else
1185
                    { //we're now reading the 2nd byte array to get the rest of the boundary
1186
                        searchString = "\"";
1187
                        searchStringIndex = s.indexOf(searchString);
1188
                        boundary += s.substring(0, searchStringIndex);
1189
                        boundary = "--" + boundary;
1190
                        ssi++;
1191
                        doneWithCurrentArray = false;
1192
                    }
1193
                    System.out.println("boundary: " + boundary);
1194
                }
1195
                else if(ssi == 1 || ssi == 2)
1196
                { 
1197
                    int searchStringIndex = s.indexOf(searchString);
1198
                    //find the beginning of the system metadata or data object and 
1199
                    //start writing it to the dataSink
1200
                    int smBeginIndex = s.indexOf(searchString) + searchString.length() + 2;
1201
                    searchForBoundary = true;
1202
                    int boundaryIndex = s.indexOf(boundary, smBeginIndex);
1203
                    int end = s.length();
1204
                    if(boundaryIndex != -1)
1205
                    {
1206
                        end = boundaryIndex;
1207
                        ssi++;
1208
                        searchForBoundary = false;
1209
                    }
1210
                    currentArrayIndex = end + 1;
1211
                    String strToWrite = s.substring(smBeginIndex, end);
1212
                    System.out.println("XXXX 2 writing string: " + strToWrite + "\nXXXX");
1099 1213
                    
1100
                    System.out.println("searchString: " + searchString);
1101
                    System.out.println("done with array: " + doneWithCurrentArray);
1102
                    System.out.println("searchForBoundary: " + searchForBoundary);
1103
                    if(doneWithCurrentArray)
1214
                    //dataSink.write(strToWrite.getBytes());
1215
                    if(writeSM)
1104 1216
                    {
1105
                        System.out.println("Getting new chunk");
1106
                        numread = is.read(b, 0, 1024);
1107
                        s = new String(b, 0, numread);
1217
                        smFos.write(strToWrite.getBytes());
1108 1218
                    }
1219
                    else
1220
                    {
1221
                        objFos.write(strToWrite.getBytes());
1222
                    }
1109 1223
                    
1110
                    System.out.println("/////////////////current chunk (s): \n" + s + "\n////////////end current chunk");
1224
                    if(end == s.length())
1225
                    {
1226
                        System.out.println("DONE WITH ARRAY");
1227
                        doneWithCurrentArray = true;
1228
                        currentArrayIndex = 0;
1229
                    }
1230
                    else
1231
                    {
1232
                        System.out.println("NOT DONE WITH ARRAY");
1233
                        doneWithCurrentArray = false;
1234
                    }
1235
                }
1236
            }
1237
            else if(result[1] != searchString.length() && result[0] != 0)
1238
            {  //part of the search string is on the end of this byte group
1239
                doneWithCurrentArray = true;
1240
                searchForBoundary = false;
1241
                searchSegment = s.substring(result[0], s.length());
1242
                System.out.println("searchSegment1: " + searchSegment);
1243
            }
1244
            else if(result[1] != searchString.length() && result[0] == 0)
1245
            {  //we have the end of the searchString at the beginning of this byte group
1246
                if(searchSegment != null)
1247
                {
1248
                    searchSegment += s.substring(0, result[1]);
1249
                    System.out.println("searchSegment2: " + searchSegment);
1250
                    searchForBoundary = true;
1251
                    doneWithCurrentArray = true;
1252
                    ssi++;
1253
                    String strToWrite = s.substring(result[1] + 1, s.length());
1254
                    System.out.println("XXXX 3 writing string: " + strToWrite + "\nXXXX");
1111 1255
                    
1112
                    int[] result = StreamUtil.lookForMatch(searchString, s);
1113
                    System.out.println("got result from StreamUtil: " + result[0] + ", " + result[1]);
1114
                    if(result[0] != -1 && result[1] == searchString.length())
1115
                    { //we have the whole search string in this group of bytes
1116
                        System.out.println("1");
1117
                        if(searchForBoundary)
1118
                        {
1119
                            System.out.println("2");
1120
                            //find the boundary, chop it off the end and inc ssi
1121
                            String strToWrite = s.substring(0, s.indexOf(boundary, currentArrayIndex));
1122
                            System.out.println("XXXX 1 writing string: " + strToWrite + "\nXXXX");
1123
                            dataSink.write(strToWrite.getBytes());
1124
                            doneWithCurrentArray = false; 
1125
                            currentArrayIndex = s.indexOf(boundary, currentArrayIndex) + 1;
1126
                            searchForBoundary = false;
1127
                            ssi++;
1128
                        }
1129
                        else if(ssi == 0)
1130
                        { //we're looking for the boundary marker
1131
                            int searchStringIndex = s.indexOf(searchString);
1132
                            if(s.indexOf("\"", searchStringIndex + searchString.length() + 1) == -1)
1133
                            { //the end of the boundary is in the next byte array
1134
                                boundary = s.substring(searchStringIndex + searchString.length() + 1, s.length());
1135
                                doneWithCurrentArray = true;
1136
                            }
1137
                            else if(!boundary.startsWith("--"))
1138
                            { //we can read the whole boundary from this byte array
1139
                                boundary = s.substring(searchStringIndex + searchString.length() + 1, 
1140
                                    s.indexOf("\"", searchStringIndex + searchString.length() + 1));
1141
                                boundary = "--" + boundary;
1142
                                ssi++;
1143
                                doneWithCurrentArray = false;
1144
                            }
1145
                            else
1146
                            { //we're now reading the 2nd byte array to get the rest of the boundary
1147
                                searchString = "\"";
1148
                                searchStringIndex = s.indexOf(searchString);
1149
                                boundary += s.substring(0, searchStringIndex);
1150
                                boundary = "--" + boundary;
1151
                                ssi++;
1152
                                doneWithCurrentArray = false;
1153
                            }
1154
                            System.out.println("boundary: " + boundary);
1155
                        }
1156
                        else if(ssi == 1 || ssi == 2)
1157
                        { 
1158
                            int searchStringIndex = s.indexOf(searchString);
1159
                            //find the beginning of the system metadata or data object and 
1160
                            //start writing it to the dataSink
1161
                            int smBeginIndex = s.indexOf(searchString) + searchString.length() + 2;
1162
                            searchForBoundary = true;
1163
                            int boundaryIndex = s.indexOf(boundary, smBeginIndex);
1164
                            int end = s.length();
1165
                            if(boundaryIndex != -1)
1166
                            {
1167
                                end = boundaryIndex;
1168
                                ssi++;
1169
                                searchForBoundary = false;
1170
                            }
1171
                            currentArrayIndex = end + 1;
1172
                            String strToWrite = s.substring(smBeginIndex, end);
1173
                            System.out.println("XXXX 2 writing string: " + strToWrite + "\nXXXX");
1174
                            dataSink.write(strToWrite.getBytes());
1175
                            if(end == s.length())
1176
                            {
1177
                                System.out.println("DONE WITH ARRAY");
1178
                                doneWithCurrentArray = true;
1179
                                currentArrayIndex = 0;
1180
                            }
1181
                            else
1182
                            {
1183
                                System.out.println("NOT DONE WITH ARRAY");
1184
                                doneWithCurrentArray = false;
1185
                            }
1186
                        }
1256
                    //dataSink.write(strToWrite.getBytes());
1257
                    if(writeSM)
1258
                    {
1259
                        smFos.write(strToWrite.getBytes());
1187 1260
                    }
1188
                    else if(result[1] != searchString.length() && result[0] != 0)
1189
                    {  //part of the search string is on the end of this byte group
1190
                        doneWithCurrentArray = true;
1191
                        searchForBoundary = false;
1192
                        searchSegment = s.substring(result[0], s.length());
1193
                        System.out.println("searchSegment1: " + searchSegment);
1261
                    else
1262
                    {
1263
                        objFos.write(strToWrite.getBytes());
1194 1264
                    }
1195
                    else if(result[1] != searchString.length() && result[0] == 0)
1196
                    {  //we have the end of the searchString at the beginning of this byte group
1197
                        if(searchSegment != null)
1198
                        {
1199
                            searchSegment += s.substring(0, result[1]);
1200
                            System.out.println("searchSegment2: " + searchSegment);
1201
                            searchForBoundary = true;
1202
                            doneWithCurrentArray = true;
1203
                            ssi++;
1204
                            String strToWrite = s.substring(result[1] + 1, s.length());
1205
                            System.out.println("XXXX 3 writing string: " + strToWrite + "\nXXXX");
1206
                            dataSink.write(strToWrite.getBytes());
1207
                        }
1265
                }
1266
            }
1267
            else
1268
            {
1269
                if(searchForBoundary)
1270
                {
1271
                    //we're in between the start and end of a section 
1272
                    //so just write this to the stream
1273
                    System.out.println("XXXX 4 writing string: " + s + "\nXXXX");
1274
                    //dataSink.write(s.getBytes());
1275
                    if(writeSM)
1276
                    {
1277
                        smFos.write(s.getBytes());
1208 1278
                    }
1209 1279
                    else
1210 1280
                    {
1211
                        if(searchForBoundary)
1212
                        {
1213
                            //we're in between the start and end of a section 
1214
                            //so just write this to the stream
1215
                            System.out.println("XXXX 4 writing string: " + s + "\nXXXX");
1216
                            dataSink.write(s.getBytes());
1217
                        }
1281
                        objFos.write(s.getBytes());
1218 1282
                    }
1219 1283
                }
1220
                dataSink.flush();
1221
                return "Completed";
1222 1284
            }
1285
        }
1286
        
1287
        Vector<File> v = new Vector<File>();
1288
        objFos.flush();
1289
        smFos.flush();
1290
        v.add(smFile);
1291
        v.add(objFile);
1292
        return v;
1293
    }
1294
    
1295
    /**
1296
     * return a vector where the first element is a string that represents the system
1297
     * metadata and the 2nd element is an InputStream that is the object
1298
     */
1299
    protected Vector<File> processMMP(final InputStream is)
1300
      throws IOException
1301
    {
1302
     /*   final InputStreamFromOutputStream<String> objectStream = 
1303
            new InputStreamFromOutputStream<String>() 
1304
        {
1305
            @Override
1306
            public String produce(final OutputStream dataSink) throws Exception 
1307
            {
1308
                
1309
                return produceInputStream(dataSink, is);
1310
            }
1223 1311
        };
1224 1312
        
1225
        //return partsHash;
1226
        return objectStream;
1313
        return objectStream;*/
1314
        return writeMMPPartsToFiles(is);
1227 1315
    }
1228 1316
    
1229 1317
    /**

Also available in: Unified diff