Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public ProgramController run(final Program program, ProgramOptions options) {
// tries to access cdap data. For example, writing to a FileSet will fail, as the yarn user will
// be running the job, but the data directory will be owned by cdap.
if (MapReduceTaskContextProvider.isLocal(hConf) || UserGroupInformation.isSecurityEnabled()) {
mapReduceRuntimeService.start();
mapReduceRuntimeService.startAsync().awaitRunning();
} else {
ProgramRunners.startAsUser(cConf.get(Constants.CFG_HDFS_USER), mapReduceRuntimeService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void destroy() {
}

@Override
protected String getServiceName() {
protected String serviceName() {
return "MapReduceRunner-" + specification.getName();
}

Expand Down Expand Up @@ -412,6 +412,7 @@ protected void run() throws Exception {
// If we don't sleep, the final stats may not get sent before shutdown.
TimeUnit.SECONDS.sleep(2L);


// If the job is not successful, throw exception so that this service will terminate with a failure state
// Shutdown will still get executed, but the service will notify failure after that.
// However, if it's the job is requested to stop (via triggerShutdown, meaning it's a user action), don't throw
Expand Down Expand Up @@ -454,7 +455,7 @@ public void run() {
}
});
t.setDaemon(true);
t.setName(getServiceName());
t.setName(serviceName());
t.start();
}
};
Expand Down Expand Up @@ -1012,7 +1013,7 @@ private Location createPluginArchive(Location targetDir) throws IOException {
*/
private Location copyFileToLocation(File file, Location targetDir) throws IOException {
Location targetLocation = targetDir.append(file.getName()).getTempFile(".jar");
Files.copy(file, Locations.newOutputSupplier(targetLocation));
Files.asByteSource(file).copyTo(Locations.newByteSink(targetLocation));
return targetLocation;
}

Expand All @@ -1024,8 +1025,7 @@ private Location copyFileToLocation(File file, Location targetDir) throws IOExce
private Location copyProgramJar(Location targetDir) throws IOException {
Location programJarCopy = targetDir.append("program.jar");

ByteStreams.copy(Locations.newInputSupplier(programJarLocation),
Locations.newOutputSupplier(programJarCopy));
Locations.newByteSource(programJarLocation).copyTo(Locations.newByteSink(programJarCopy));
LOG.debug("Copied Program Jar to {}, source: {}", programJarCopy, programJarLocation);
return programJarCopy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.common;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import io.cdap.cdap.api.common.HttpErrorStatusProvider;
Expand Down Expand Up @@ -76,14 +77,14 @@ public void handle(Throwable t, HttpRequest request, HttpResponder responder) {

// If it is not some known exception type, response with 500.
LOG.error("Unexpected error: request={} {} user={}:", request.method().name(), request.getUri(),
Objects.firstNonNull(SecurityRequestContext.getUserId(), "<null>"), t);
MoreObjects.firstNonNull(SecurityRequestContext.getUserId(), "<null>"), t);
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR,
Throwables.getRootCause(t).getMessage());
}

private void logWithTrace(HttpRequest request, Throwable t) {
LOG.trace("Error in handling request={} {} for user={}:", request.method().name(),
request.getUri(),
Objects.firstNonNull(SecurityRequestContext.getUserId(), "<null>"), t);
MoreObjects.firstNonNull(SecurityRequestContext.getUserId(), "<null>"), t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,28 +90,28 @@ private static final class ZkClientServiceProvider implements Provider<ZKClientS
@Override
public ZKClientService get() {
String kafkaZkQuorum = cConf.get(KafkaConstants.ConfigKeys.ZOOKEEPER_QUORUM);
ZKClientService zkClientService;
ZKClientService tmpZkClientService;
final AtomicInteger startedCount = new AtomicInteger();

if (kafkaZkQuorum == null) {
// If there is no separate zookeeper quorum, use the shared ZKClientService.
zkClientService = injector.getInstance(ZKClientService.class);
tmpZkClientService = injector.getInstance(ZKClientService.class);

String kafkaNamespace = cConf.get(KafkaConstants.ConfigKeys.ZOOKEEPER_NAMESPACE_CONFIG);
if (kafkaNamespace != null) {
if (!kafkaNamespace.startsWith("/")) {
kafkaNamespace = "/" + kafkaNamespace;
}
zkClientService = ZKClientServices.delegate(
ZKClients.namespace(zkClientService, kafkaNamespace));
tmpZkClientService = ZKClientServices.delegate(
ZKClients.namespace(tmpZkClientService, kafkaNamespace));
}

// Since it is the shared ZKClientService, we don't want the KafkaClientService or BrokerService to
// start/stop it, hence having the startedCount set to 1
startedCount.set(1);
} else {
// Otherwise create a new ZKClientService
zkClientService = ZKClientServices.delegate(
tmpZkClientService = ZKClientServices.delegate(
ZKClients.reWatchOnExpire(
ZKClients.retryOnFailure(
ZKClientService.Builder.of(kafkaZkQuorum)
Expand All @@ -124,26 +124,53 @@ public ZKClientService get() {
)
);
}
// Assign to a final variable for use in the inner class
final ZKClientService zkClientService = tmpZkClientService;

// Wrap the ZKClientService using simple reference counting for start/stop
// The logic doesn't need to be sophisticated since it is a private binding and only used by the
// wrapping KafkaClientService and BrokerService, which they will make sure no duplicate calls will be
// made to the start/stop methods.
return new ForwardingZKClientService(zkClientService) {
@Override
public ListenableFuture<State> start() {
public Service startAsync() {
if (startedCount.getAndIncrement() == 0) {
return super.start();
return zkClientService.startAsync();
}
return Futures.immediateFuture(State.RUNNING);
return this;
}

@Override
public ListenableFuture<State> stop() {
public Service stopAsync() {
if (startedCount.decrementAndGet() == 0) {
return super.stop();
return zkClientService.stopAsync();
}
return Futures.immediateFuture(State.TERMINATED);
return this;
}

@Override
public void awaitRunning() {
zkClientService.awaitRunning();
}

@Override
public void awaitRunning(long timeout, TimeUnit unit) throws java.util.concurrent.TimeoutException {
zkClientService.awaitRunning(timeout, unit);
}

@Override
public void awaitTerminated() {
zkClientService.awaitTerminated();
}

@Override
public void awaitTerminated(long timeout, TimeUnit unit) throws java.util.concurrent.TimeoutException {
zkClientService.awaitTerminated(timeout, unit);
}

@Override
public Throwable failureCause() {
return zkClientService.failureCause();
}
};
}
Expand All @@ -166,12 +193,12 @@ private abstract static class AbstractServiceWithZkClient<T extends Service> ext

@Override
protected final void startUp() throws Exception {
zkClientService.startAndWait();
zkClientService.startAsync().awaitRunning();
try {
delegate.startAndWait();
delegate.startAsync().awaitRunning();
} catch (Exception e) {
try {
zkClientService.stopAndWait();
zkClientService.stopAsync().awaitTerminated();
} catch (Exception se) {
e.addSuppressed(se);
}
Expand All @@ -182,16 +209,16 @@ protected final void startUp() throws Exception {
@Override
protected final void shutDown() throws Exception {
try {
delegate.stopAndWait();
delegate.stopAsync().awaitTerminated();
} catch (Exception e) {
try {
zkClientService.stopAndWait();
zkClientService.stopAsync().awaitTerminated();
} catch (Exception se) {
e.addSuppressed(se);
}
throw e;
}
zkClientService.stopAndWait();
zkClientService.stopAsync().awaitTerminated();
}

protected T getDelegate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ public final void handleError(Throwable cause) {
try {
LOG.error("Failed to handle upload", cause);
if (output != null) {
Closeables.closeQuietly(output);
try {
output.close();
} catch (IOException e) {
LOG.warn("Failed to close output stream for file {}", file, e);
}
}
onError(cause);
// The netty-http framework will response with 500, no need to response in here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,18 @@ public void chunk(ByteBuf request, HttpResponder responder) {

@Override
public void finished(HttpResponder responder) {
Closeables.closeQuietly(outputStream);
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
LOG.warn("Failed to close output stream", e);
}
}

try (InputStream is = new CombineInputStream(buffer, outputStream == null ? null : spillPath)) {
processInput(is, responder);
} catch (Exception e) {
Throwables.propagateIfPossible(e);
Throwables.throwIfUnchecked(e);
throw new RuntimeException(String.format("Failed to process input from buffer%s",
outputStream == null ? "" : " and spill path " + spillPath), e);
} finally {
Expand All @@ -103,7 +109,13 @@ public void finished(HttpResponder responder) {

@Override
public void handleError(Throwable cause) {
Closeables.closeQuietly(outputStream);
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
LOG.warn("Failed to close output stream", e);
}
}
cleanup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public String getSimpleName() {
String innerClassName = className.substring(lastDollarSign + 1);
// local and anonymous classes are prefixed with number (1,2,3...), anonymous classes are
// entirely numeric whereas local classes have the user supplied name as a suffix
return CharMatcher.DIGIT.trimLeadingFrom(innerClassName);
return CharMatcher.inRange('0', '9').trimLeadingFrom(innerClassName);
}
String packageName = getPackageName();
if (packageName.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,25 @@ public boolean seekToNewSource(long targetPos) throws IOException {

@Override
public void close() throws IOException {
Throwable error = null;
try {
super.close();
} catch (Throwable t) {
error = t;
throw t;
} finally {
if (sizeProvider instanceof Closeable) {
Closeables.closeQuietly((Closeable) sizeProvider);
try {
((Closeable) sizeProvider).close();
} catch (IOException e) {
if (error != null) {
error.addSuppressed(e);
} else {
// If super.close() succeeded but the provider failed,
// you should probably still know about it.
throw e;
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.cdap.cdap.common.utils.DirUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -123,7 +124,9 @@ void clearCache(String fileName, long lastModified) {
}

String getCacheName(Location location) {
return Hashing.md5().hashString(location.toURI().getPath()).toString() + "-"
return Hashing.md5()
.hashString(location.toURI().getPath(), StandardCharsets.UTF_8)
.toString() + "-"
+ location.getName();
}

Expand Down
Loading